migrate to oban 2.0-rc1
[akkoma] / lib / pleroma / workers / attachments_cleanup_worker.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.Workers.AttachmentsCleanupWorker do
  @moduledoc """
  Oban worker that cleans up the attachments of an object.

  For a `"cleanup_attachments"` job whose object carries a non-empty
  `"attachment"` list, the worker looks up every `Pleroma.Object` whose `url`
  entries reference one of the attachment hrefs, deletes one such object per
  href, and asks the configured uploader to delete the underlying file only
  when a single object references that href.
  """

  import Ecto.Query

  alias Pleroma.Object
  alias Pleroma.Repo

  use Pleroma.Workers.WorkerHelper, queue: "attachments_cleanup"

  # Returns `{:ok, :success}` after running the cleanup pipeline, or
  # `{:ok, :skip}` when the object has no non-empty attachment list / actor.
  @impl Oban.Worker
  def perform(%Job{
        args: %{
          "op" => "cleanup_attachments",
          "object" => %{"data" => %{"attachment" => [_ | _] = attachments, "actor" => actor}}
        }
      }) do
    attachments
    |> Enum.flat_map(fn item -> Enum.map(item["url"], & &1["href"]) end)
    |> fetch_objects()
    |> prepare_objects(actor, Enum.map(attachments, & &1["name"]))
    |> filter_objects()
    |> do_clean()

    {:ok, :success}
  end

  def perform(%Job{args: %{"op" => "cleanup_attachments", "object" => _object}}), do: {:ok, :skip}

  # Deletes the files behind `attachment_urls` through the configured uploader,
  # then deletes the objects identified by `object_ids`.
  defp do_clean({object_ids, attachment_urls}) do
    uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])

    # Without an explicit upload base URL, hrefs are expected to live under
    # "<instance base url>/media"; with one, directly under that base URL.
    prefix =
      case Pleroma.Config.get([Pleroma.Upload, :base_url]) do
        nil -> "media"
        _ -> ""
      end

    base_url =
      String.trim_trailing(
        Pleroma.Config.get([Pleroma.Upload, :base_url], Pleroma.Web.base_url()),
        "/"
      )

    # Loop-invariant, so build it once.
    url_prefix = "#{base_url}/#{prefix}"

    Enum.each(attachment_urls, fn href ->
      href
      # Strip the prefix exactly once; `String.trim_leading/2` would strip
      # every repeated leading occurrence.
      |> String.replace_prefix(url_prefix, "")
      |> uploader.delete_file()
    end)

    delete_objects(object_ids)
  end

  # Deletes all objects with the given ids; a no-op for anything but a
  # non-empty list.
  defp delete_objects([_ | _] = object_ids) do
    Repo.delete_all(from(o in Object, where: o.id in ^object_ids))
  end

  defp delete_objects(_), do: :ok

  # we should delete 1 object for any given attachment, but don't delete
  # files if there are more than 1 object for it
  #
  # Takes the `href => %{id: id, count: count}` map built by prepare_objects/3
  # and returns `{object_ids, hrefs}`: every entry contributes its id, but only
  # entries with a count of exactly 1 contribute their href.
  defp filter_objects(objects) do
    {ids, hrefs} =
      Enum.reduce(objects, {[], []}, fn
        {href, %{id: id, count: 1}}, {ids, hrefs} -> {[id | ids], [href | hrefs]}
        {_href, %{id: id, count: _count}}, {ids, hrefs} -> {[id | ids], hrefs}
      end)

    # Prepending keeps the reduce linear; reverse once to restore input order.
    {Enum.reverse(ids), Enum.reverse(hrefs)}
  end

  # Groups the fetched objects by href into `%{href => %{id: id, count: n}}`.
  # The first object seen for an href seeds the entry with its id and a count
  # of 1; each further object for the same href increments the count.
  defp prepare_objects(objects, actor, names) do
    objects
    |> Enum.reduce(%{}, fn %{
                             id: id,
                             data: %{
                               "url" => [%{"href" => href}],
                               "actor" => obj_actor,
                               "name" => name
                             }
                           },
                           acc ->
      Map.update(acc, href, %{id: id, count: 1}, fn val ->
        case obj_actor == actor and name in names do
          true ->
            # set id of the actor's object that will be deleted
            %{val | id: id, count: val.count + 1}

          false ->
            # another actor's object, just increase count to not delete file
            %{val | count: val.count + 1}
        end
      end)
    end)
  end

  # Loads every object whose `data.url` array contains at least one entry with
  # an `href` present in `hrefs`.
  defp fetch_objects(hrefs) do
    from(o in Object,
      where:
        fragment(
          "to_jsonb(array(select jsonb_array_elements((?)#>'{url}') ->> 'href' where jsonb_typeof((?)#>'{url}') = 'array'))::jsonb \\?| (?)",
          o.data,
          o.data,
          ^hrefs
        )
    )
    # The query above can be time-consuming on large instances until we
    # refactor how uploads are stored, hence no query timeout.
    |> Repo.all(timeout: :infinity)
  end
end