Delete attachments asynchronously
author Roman Chvanikov <chvanikoff@pm.me>
Sun, 19 Jan 2020 16:45:20 +0000 (19:45 +0300)
committer Roman Chvanikov <chvanikoff@pm.me>
Sun, 19 Jan 2020 16:45:20 +0000 (19:45 +0300)
lib/pleroma/object.ex
lib/pleroma/workers/attachments_cleanup_worker.ex [new file with mode: 0644]
test/object_test.exs

index 499339982b9c1b314a2d4eeded4ed01a8ff64e59..38e372f6ddf6817dc04615a0f014d279ddc90eaa 100644 (file)
@@ -19,6 +19,8 @@ defmodule Pleroma.Object do
 
   @type t() :: %__MODULE__{}
 
+  @derive {Jason.Encoder, only: [:data]}
+
   schema "objects" do
     field(:data, :map)
 
@@ -183,83 +185,14 @@ defmodule Pleroma.Object do
          deleted_activity = Activity.delete_all_by_object_ap_id(id),
          {:ok, true} <- Cachex.del(:object_cache, "object:#{id}"),
          {:ok, _} <- Cachex.del(:web_resp_cache, URI.parse(id).path),
-         :ok <- delete_attachments(object) do
+         {:ok, _} <-
+           Pleroma.Workers.AttachmentsCleanupWorker.enqueue("cleanup_attachments", %{
+             "object" => object
+           }) do
       {:ok, object, deleted_activity}
     end
   end
 
-  defp delete_attachments(%{data: %{"attachment" => [_ | _] = attachments, "actor" => actor}}) do
-    hrefs =
-      Enum.flat_map(attachments, fn attachment ->
-        Enum.map(attachment["url"], & &1["href"])
-      end)
-
-    names = Enum.map(attachments, & &1["name"])
-
-    uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])
-
-    # find all objects for copies of the attachments, name and actor doesn't matter here
-    delete_ids =
-      from(o in Object,
-        where:
-          fragment(
-            "to_jsonb(array(select jsonb_array_elements((?)#>'{url}') ->> 'href' where jsonb_typeof((?)#>'{url}') = 'array'))::jsonb \\?| (?)",
-            o.data,
-            o.data,
-            ^hrefs
-          )
-      )
-      |> Repo.all()
-      # we should delete 1 object for any given attachment, but don't delete files if
-      # there are more than 1 object for it
-      |> Enum.reduce(%{}, fn %{
-                               id: id,
-                               data: %{
-                                 "url" => [%{"href" => href}],
-                                 "actor" => obj_actor,
-                                 "name" => name
-                               }
-                             },
-                             acc ->
-        Map.update(acc, href, %{id: id, count: 1}, fn val ->
-          case obj_actor == actor and name in names do
-            true ->
-              # set id of the actor's object that will be deleted
-              %{val | id: id, count: val.count + 1}
-
-            false ->
-              # another actor's object, just increase count to not delete file
-              %{val | count: val.count + 1}
-          end
-        end)
-      end)
-      |> Enum.map(fn {href, %{id: id, count: count}} ->
-        # only delete files that have single instance
-        with 1 <- count do
-          prefix =
-            case Pleroma.Config.get([Pleroma.Upload, :base_url]) do
-              nil -> "media"
-              _ -> ""
-            end
-
-          base_url = Pleroma.Config.get([__MODULE__, :base_url], Pleroma.Web.base_url())
-
-          file_path = String.trim_leading(href, "#{base_url}/#{prefix}")
-
-          uploader.delete_file(file_path)
-        end
-
-        id
-      end)
-
-    from(o in Object, where: o.id in ^delete_ids)
-    |> Repo.delete_all()
-
-    :ok
-  end
-
-  defp delete_attachments(%{data: _data}), do: :ok
-
   def prune(%Object{data: %{"id" => id}} = object) do
     with {:ok, object} <- Repo.delete(object),
          {:ok, true} <- Cachex.del(:object_cache, "object:#{id}"),
diff --git a/lib/pleroma/workers/attachments_cleanup_worker.ex b/lib/pleroma/workers/attachments_cleanup_worker.ex
new file mode 100644 (file)
index 0000000..f8239ec
--- /dev/null
@@ -0,0 +1,110 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.AttachmentsCleanupWorker do
+  @moduledoc """
+  Oban worker that cleans up the attachments of a deleted object.
+
+  The job arguments carry the JSON-encoded object under the `"object"` key. For
+  every attachment href the worker looks up the stored objects referencing that
+  href, removes the uploaded file when exactly one object references it, and
+  finally deletes the collected attachment objects from the database.
+  """
+
+  import Ecto.Query
+
+  alias Pleroma.Object
+  alias Pleroma.Repo
+
+  use Pleroma.Workers.WorkerHelper, queue: "attachments_cleanup"
+
+  @doc """
+  Deletes the files and objects backing the attachments of the given object.
+
+  Objects without a non-empty `"attachment"` list are ignored. Returns `:ok`.
+  """
+  @impl Oban.Worker
+  def perform(
+        %{"object" => %{"data" => %{"attachment" => [_ | _] = attachments, "actor" => actor}}},
+        _job
+      ) do
+    hrefs =
+      Enum.flat_map(attachments, fn attachment ->
+        Enum.map(attachment["url"], & &1["href"])
+      end)
+
+    names = Enum.map(attachments, & &1["name"])
+
+    uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])
+    url_prefix = upload_url_prefix()
+
+    # find all objects for copies of the attachments, name and actor doesn't matter here
+    delete_ids =
+      from(o in Object,
+        where:
+          fragment(
+            "to_jsonb(array(select jsonb_array_elements((?)#>'{url}') ->> 'href' where jsonb_typeof((?)#>'{url}') = 'array'))::jsonb \\?| (?)",
+            o.data,
+            o.data,
+            ^hrefs
+          )
+      )
+      # The query above can be time-consuming on large instances until we refactor how
+      # uploads are stored, so it runs without a query timeout
+      |> Repo.all(timeout: :infinity)
+      # we should delete 1 object for any given attachment, but don't delete files if
+      # there are more than 1 object for it
+      |> Enum.reduce(%{}, fn %{
+                               id: id,
+                               data: %{
+                                 "url" => [%{"href" => href}],
+                                 "actor" => obj_actor,
+                                 "name" => name
+                               }
+                             },
+                             acc ->
+        Map.update(acc, href, %{id: id, count: 1}, fn val ->
+          case obj_actor == actor and name in names do
+            true ->
+              # set id of the actor's object that will be deleted
+              %{val | id: id, count: val.count + 1}
+
+            false ->
+              # another actor's object, just increase count to not delete file
+              %{val | count: val.count + 1}
+          end
+        end)
+      end)
+      |> Enum.map(fn {href, %{id: id, count: count}} ->
+        # only delete files that have single instance
+        if count == 1 do
+          uploader.delete_file(String.trim_leading(href, url_prefix))
+        end
+
+        id
+      end)
+
+    from(o in Object, where: o.id in ^delete_ids)
+    |> Repo.delete_all()
+
+    # same return value as the clause below, so every job reports success the same way
+    :ok
+  end
+
+  def perform(%{"object" => _object}, _job), do: :ok
+
+  # Builds the URL prefix that is stripped from an attachment href to obtain the
+  # path handed to the uploader.
+  defp upload_url_prefix do
+    {base_url, prefix} =
+      case Pleroma.Config.get([Pleroma.Upload, :base_url]) do
+        # no dedicated upload URL configured: fall back to the instance URL + "media"
+        nil -> {Pleroma.Web.base_url(), "media"}
+        upload_base_url -> {upload_base_url, ""}
+      end
+
+    # tolerate a base URL configured with a trailing slash
+    "#{String.trim_trailing(base_url, "/")}/#{prefix}"
+  end
+end
index b002c2bae96ebf2fb0b5cd4f1778c73f83d824ce..997ec9691ea1b90562f9e9380e956b6dd3f07129 100644 (file)
@@ -4,12 +4,14 @@
 
 defmodule Pleroma.ObjectTest do
   use Pleroma.DataCase
+  use Oban.Testing, repo: Pleroma.Repo
   import ExUnit.CaptureLog
   import Pleroma.Factory
   import Tesla.Mock
   alias Pleroma.Activity
   alias Pleroma.Object
   alias Pleroma.Repo
+  alias Pleroma.Tests.ObanHelpers
   alias Pleroma.Web.CommonAPI
 
   setup do
@@ -99,6 +101,8 @@ defmodule Pleroma.ObjectTest do
 
       Object.delete(note)
 
+      ObanHelpers.perform(all_enqueued(worker: Pleroma.Workers.AttachmentsCleanupWorker))
+
       assert Object.get_by_id(attachment.id) == nil
 
       assert {:ok, []} == File.ls("#{uploads_dir}/#{path}")
@@ -133,6 +137,8 @@ defmodule Pleroma.ObjectTest do
 
       Object.delete(note)
 
+      ObanHelpers.perform(all_enqueued(worker: Pleroma.Workers.AttachmentsCleanupWorker))
+
       assert Object.get_by_id(attachment.id) == nil
       assert {:ok, files} = File.ls(uploads_dir)
       refute filename in files