Mix tasks: improve uploads.ex moduledoc
[akkoma] / lib / mix / tasks / pleroma / uploads.ex
1 defmodule Mix.Tasks.Pleroma.Uploads do
2 use Mix.Task
3 import Mix.Ecto
4 alias Pleroma.{Upload, Uploaders.Local, Uploaders.S3}
5 alias Mix.Tasks.Pleroma.Common
6 require Logger
7
8 @log_every 50
9 @shortdoc "Manages uploads"
10 @moduledoc """
11 Manages uploads.
12
13 ## Migrate uploads from local to remote storage
14 mix pleroma.uploads migrate_local TARGET_UPLOADER [OPTIONS...]
15 Options:
16 - `--delete` - delete local uploads after migrating them to the target uploader
17
18
19 A list of avalible uploaders can be seen in config.exs
20 """
21
22 def run(["migrate_local", target_uploader | args]) do
23 delete? = Enum.member?(args, "--delete")
24 Common.start_pleroma()
25 local_path = Pleroma.Config.get!([Local, :uploads])
26 uploader = Module.concat(Pleroma.Uploaders, target_uploader)
27
28 unless Code.ensure_loaded?(uploader) do
29 raise("The uploader #{inspect(uploader)} is not an existing/loaded module.")
30 end
31
32 target_enabled? = Pleroma.Config.get([Upload, :uploader]) == uploader
33
34 unless target_enabled? do
35 Pleroma.Config.put([Upload, :uploader], uploader)
36 end
37
38 Mix.shell().info("Migrating files from local #{local_path} to #{to_string(uploader)}")
39
40 if delete? do
41 Mix.shell().info(
42 "Attention: uploaded files will be deleted, hope you have backups! (--delete ; cancel with ^C)"
43 )
44
45 :timer.sleep(:timer.seconds(5))
46 end
47
48 uploads =
49 File.ls!(local_path)
50 |> Enum.map(fn id ->
51 root_path = Path.join(local_path, id)
52
53 cond do
54 File.dir?(root_path) ->
55 files = for file <- File.ls!(root_path), do: {id, file, Path.join([root_path, file])}
56
57 case List.first(files) do
58 {id, file, path} ->
59 {%Pleroma.Upload{id: id, name: file, path: id <> "/" <> file, tempfile: path},
60 root_path}
61
62 _ ->
63 nil
64 end
65
66 File.exists?(root_path) ->
67 file = Path.basename(id)
68 [hash, ext] = String.split(id, ".")
69 {%Pleroma.Upload{id: hash, name: file, path: file, tempfile: root_path}, root_path}
70
71 true ->
72 nil
73 end
74 end)
75 |> Enum.filter(& &1)
76
77 total_count = length(uploads)
78 Mix.shell().info("Found #{total_count} uploads")
79
80 uploads
81 |> Task.async_stream(
82 fn {upload, root_path} ->
83 case Upload.store(upload, uploader: uploader, filters: [], size_limit: nil) do
84 {:ok, _} ->
85 if delete?, do: File.rm_rf!(root_path)
86 Logger.debug("uploaded: #{inspect(upload.path)} #{inspect(upload)}")
87 :ok
88
89 error ->
90 Mix.shell().error("failed to upload #{inspect(upload.path)}: #{inspect(error)}")
91 end
92 end,
93 timeout: 150_000
94 )
95 |> Stream.chunk_every(@log_every)
96 |> Enum.reduce(0, fn done, count ->
97 count = count + length(done)
98 Mix.shell().info("Uploaded #{count}/#{total_count} files")
99 count
100 end)
101
102 Mix.shell().info("Done!")
103 end
104 end