Merge branch 'user_info_fixes_again' into 'develop'
[akkoma] / lib / mix / tasks / migrate_local_uploads.ex
1 defmodule Mix.Tasks.MigrateLocalUploads do
2 use Mix.Task
3 import Mix.Ecto
4 alias Pleroma.{Upload, Uploaders.Local, Uploaders.S3}
5 require Logger
6
7 @log_every 50
8 @shortdoc "Migrate uploads from local to remote storage"
9
10 def run([target_uploader | args]) do
11 delete? = Enum.member?(args, "--delete")
12 Application.ensure_all_started(:pleroma)
13
14 local_path = Pleroma.Config.get!([Local, :uploads])
15 uploader = Module.concat(Pleroma.Uploaders, target_uploader)
16
17 unless Code.ensure_loaded?(uploader) do
18 raise("The uploader #{inspect(uploader)} is not an existing/loaded module.")
19 end
20
21 target_enabled? = Pleroma.Config.get([Upload, :uploader]) == uploader
22
23 unless target_enabled? do
24 Pleroma.Config.put([Upload, :uploader], uploader)
25 end
26
27 Logger.info("Migrating files from local #{local_path} to #{to_string(uploader)}")
28
29 if delete? do
30 Logger.warn(
31 "Attention: uploaded files will be deleted, hope you have backups! (--delete ; cancel with ^C)"
32 )
33
34 :timer.sleep(:timer.seconds(5))
35 end
36
37 uploads =
38 File.ls!(local_path)
39 |> Enum.map(fn id ->
40 root_path = Path.join(local_path, id)
41
42 cond do
43 File.dir?(root_path) ->
44 files = for file <- File.ls!(root_path), do: {id, file, Path.join([root_path, file])}
45
46 case List.first(files) do
47 {id, file, path} ->
48 {%Pleroma.Upload{id: id, name: file, path: id <> "/" <> file, tempfile: path},
49 root_path}
50
51 _ ->
52 nil
53 end
54
55 File.exists?(root_path) ->
56 file = Path.basename(id)
57 [hash, ext] = String.split(id, ".")
58 {%Pleroma.Upload{id: hash, name: file, path: file, tempfile: root_path}, root_path}
59
60 true ->
61 nil
62 end
63 end)
64 |> Enum.filter(& &1)
65
66 total_count = length(uploads)
67 Logger.info("Found #{total_count} uploads")
68
69 uploads
70 |> Task.async_stream(
71 fn {upload, root_path} ->
72 case Upload.store(upload, uploader: uploader, filters: [], size_limit: nil) do
73 {:ok, _} ->
74 if delete?, do: File.rm_rf!(root_path)
75 Logger.debug("uploaded: #{inspect(upload.path)} #{inspect(upload)}")
76 :ok
77
78 error ->
79 Logger.error("failed to upload #{inspect(upload.path)}: #{inspect(error)}")
80 end
81 end,
82 timeout: 150_000
83 )
84 |> Stream.chunk_every(@log_every)
85 |> Enum.reduce(0, fn done, count ->
86 count = count + length(done)
87 Logger.info("Uploaded #{count}/#{total_count} files")
88 count
89 end)
90
91 Logger.info("Done!")
92 end
93
94 def run(_) do
95 Logger.error("Usage: migrate_local_uploads S3|Swift [--delete]")
96 end
97 end