Moving some background jobs into simple tasks
authorAlexander Strizhakov <alex.strizhakov@gmail.com>
Thu, 10 Sep 2020 07:54:57 +0000 (10:54 +0300)
committerAlexander Strizhakov <alex.strizhakov@gmail.com>
Wed, 11 Nov 2020 10:39:49 +0000 (13:39 +0300)
- fetching activity data
- attachment prefetching
- using limiter to prevent overload

lib/pleroma/application.ex
lib/pleroma/web/activity_pub/activity_pub.ex
lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex
lib/pleroma/web/activity_pub/side_effects.ex
lib/pleroma/web/rich_media/helpers.ex
lib/pleroma/workers/background_worker.ex
priv/repo/migrations/20200915095704_remove_background_jobs.exs [new file with mode: 0644]
test/pleroma/config/deprecation_warnings_test.exs
test/pleroma/web/activity_pub/mrf/media_proxy_warming_policy_test.exs
test/pleroma/web/mastodon_api/controllers/status_controller_test.exs
test/pleroma/web/pleroma_api/views/chat_message_reference_view_test.exs

index 7c4cd9626d04c30fd06d2bb54fedda9d99951603..769af1806cdfc0b7ae0a16caf3268cc58ebaf4cd 100644 (file)
@@ -57,6 +57,7 @@ defmodule Pleroma.Application do
     setup_instrumenters()
     load_custom_modules()
     Pleroma.Docs.JSON.compile()
+    limiters_setup()
 
     adapter = Application.get_env(:tesla, :adapter)
 
@@ -273,4 +274,9 @@ defmodule Pleroma.Application do
   end
 
   defp http_children(_, _), do: []
+
+  def limiters_setup do
+    [Pleroma.Web.RichMedia.Helpers, Pleroma.Web.MediaProxy]
+    |> Enum.each(&ConcurrentLimiter.new(&1, 1, 0))
+  end
 end
index d8f685d38112a88bdb9065a06ad8fa1166507e66..6008f2f4ab1e026c7fd0db5d768b948598b863ef 100644 (file)
@@ -123,7 +123,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
       # Splice in the child object if we have one.
       activity = Maps.put_if_present(activity, :object, object)
 
-      BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
+      ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
+        Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
+      end)
 
       {:ok, activity}
     else
index 0fb05d3c4be9dafa2cee8427b0922b55e6dcc48d..816cc89bfe2f9799ff62303bb64ce83801648365 100644 (file)
@@ -8,7 +8,6 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
 
   alias Pleroma.HTTP
   alias Pleroma.Web.MediaProxy
-  alias Pleroma.Workers.BackgroundWorker
 
   require Logger
 
@@ -17,7 +16,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
     recv_timeout: 10_000
   ]
 
-  def perform(:prefetch, url) do
+  defp prefetch(url) do
     # Fetching only proxiable resources
     if MediaProxy.enabled?() and MediaProxy.url_proxiable?(url) do
       # If preview proxy is enabled, it'll also hit media proxy (so we're caching both requests)
@@ -25,17 +24,25 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
 
       Logger.debug("Prefetching #{inspect(url)} as #{inspect(prefetch_url)}")
 
-      HTTP.get(prefetch_url, [], @adapter_options)
+      if Pleroma.Config.get(:env) == :test do
+        fetch(prefetch_url)
+      else
+        ConcurrentLimiter.limit(MediaProxy, fn ->
+          Task.start(fn -> fetch(prefetch_url) end)
+        end)
+      end
     end
   end
 
-  def perform(:preload, %{"object" => %{"attachment" => attachments}} = _message) do
+  defp fetch(url), do: HTTP.get(url, [], @adapter_options)
+
+  defp preload(%{"object" => %{"attachment" => attachments}} = _message) do
     Enum.each(attachments, fn
       %{"url" => url} when is_list(url) ->
         url
         |> Enum.each(fn
           %{"href" => href} ->
-            BackgroundWorker.enqueue("media_proxy_prefetch", %{"url" => href})
+            prefetch(href)
 
           x ->
             Logger.debug("Unhandled attachment URL object #{inspect(x)}")
@@ -51,7 +58,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
         %{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message
       )
       when is_list(attachments) and length(attachments) > 0 do
-    BackgroundWorker.enqueue("media_proxy_preload", %{"message" => message})
+    preload(message)
 
     {:ok, message}
   end
index bbff35c360fd9b6f85083530f2a04b32c5649f00..4d8fb721e9465dc4eb384d1bbe56f23e20e55b91 100644 (file)
@@ -24,7 +24,6 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   alias Pleroma.Web.ActivityPub.Utils
   alias Pleroma.Web.Push
   alias Pleroma.Web.Streamer
-  alias Pleroma.Workers.BackgroundWorker
 
   require Logger
 
@@ -191,7 +190,9 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
         Object.increase_replies_count(in_reply_to)
       end
 
-      BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
+      ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
+        Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
+      end)
 
       meta =
         meta
index d67b594b57dd10a2edce2bc9362e8ff1b7dcd219..442bf99957a9aa952518f5777bbed9dff806bba4 100644 (file)
@@ -78,11 +78,6 @@ defmodule Pleroma.Web.RichMedia.Helpers do
 
   def fetch_data_for_activity(_), do: %{}
 
-  def perform(:fetch, %Activity{} = activity) do
-    fetch_data_for_activity(activity)
-    :ok
-  end
-
   def rich_media_get(url) do
     headers = [{"user-agent", Pleroma.Application.user_agent() <> "; Bot"}]
 
index 55b5a13d9e451950a5bbf8560a3b7e0e3371a3fd..0647c65ae2b25022e7639d8802ae1f6a8eb1b1c9 100644 (file)
@@ -3,9 +3,7 @@
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Workers.BackgroundWorker do
-  alias Pleroma.Activity
   alias Pleroma.User
-  alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
 
   use Pleroma.Workers.WorkerHelper, queue: "background"
 
@@ -32,19 +30,6 @@ defmodule Pleroma.Workers.BackgroundWorker do
     {:ok, User.Import.perform(String.to_atom(op), user, identifiers)}
   end
 
-  def perform(%Job{args: %{"op" => "media_proxy_preload", "message" => message}}) do
-    MediaProxyWarmingPolicy.perform(:preload, message)
-  end
-
-  def perform(%Job{args: %{"op" => "media_proxy_prefetch", "url" => url}}) do
-    MediaProxyWarmingPolicy.perform(:prefetch, url)
-  end
-
-  def perform(%Job{args: %{"op" => "fetch_data_for_activity", "activity_id" => activity_id}}) do
-    activity = Activity.get_by_id(activity_id)
-    Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
-  end
-
   def perform(%Job{
         args: %{"op" => "move_following", "origin_id" => origin_id, "target_id" => target_id}
       }) do
diff --git a/priv/repo/migrations/20200915095704_remove_background_jobs.exs b/priv/repo/migrations/20200915095704_remove_background_jobs.exs
new file mode 100644 (file)
index 0000000..9785bfb
--- /dev/null
@@ -0,0 +1,22 @@
+defmodule Pleroma.Repo.Migrations.RemoveBackgroundJobs do
+  use Ecto.Migration
+
+  import Ecto.Query, only: [from: 2]
+
+  def up do
+    from(j in "oban_jobs",
+      where:
+        j.queue == ^"background" and
+          fragment("?->>'op'", j.args) in ^[
+            "fetch_data_for_activity",
+            "media_proxy_prefetch",
+            "media_proxy_preload"
+          ] and
+          j.worker == ^"Pleroma.Workers.BackgroundWorker",
+      select: [:id]
+    )
+    |> Pleroma.Repo.delete_all()
+  end
+
+  def down, do: :ok
+end
index 0cfed45557dd65ecd1d5159e508971d94f3082c8..f52629f8ab5842f4f921bc63ae812c9f69e69474 100644 (file)
@@ -12,7 +12,7 @@ defmodule Pleroma.Config.DeprecationWarningsTest do
   alias Pleroma.Config.DeprecationWarnings
 
   test "check_old_mrf_config/0" do
-    clear_config([:instance, :rewrite_policy], Pleroma.Web.ActivityPub.MRF.NoOpPolicy)
+    clear_config([:instance, :rewrite_policy], [])
     clear_config([:instance, :mrf_transparency], true)
     clear_config([:instance, :mrf_transparency_exclusions], [])
 
index 1710c4d2ae98e975bd17aebf9fdafaa985732315..84362ce78520ea5c7a8b8c82a352a34e175b8c7d 100644 (file)
@@ -3,10 +3,10 @@
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do
-  use Pleroma.DataCase
+  use ExUnit.Case
+  use Pleroma.Tests.Helpers
 
   alias Pleroma.HTTP
-  alias Pleroma.Tests.ObanHelpers
   alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
 
   import Mock
@@ -25,13 +25,13 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do
   setup do: clear_config([:media_proxy, :enabled], true)
 
   test "it prefetches media proxy URIs" do
+    Tesla.Mock.mock(fn %{method: :get, url: "http://example.com/image.jpg"} ->
+      {:ok, %Tesla.Env{status: 200, body: ""}}
+    end)
+
     with_mock HTTP, get: fn _, _, _ -> {:ok, []} end do
       MediaProxyWarmingPolicy.filter(@message)
 
-      ObanHelpers.perform_all()
-      # Performing jobs which has been just enqueued
-      ObanHelpers.perform_all()
-
       assert called(HTTP.get(:_, :_, :_))
     end
   end
index 436608e515094a3733a02d5ec294b25e6cac582d..252cae6a9376350b27b272b80eccfd7d238048a4 100644 (file)
@@ -328,7 +328,7 @@ defmodule Pleroma.Web.MastodonAPI.StatusControllerTest do
     end
 
     test "posting a status with OGP link preview", %{conn: conn} do
-      Tesla.Mock.mock(fn env -> apply(HttpRequestMock, :request, [env]) end)
+      Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
       clear_config([:rich_media, :enabled], true)
 
       conn =
@@ -1197,7 +1197,7 @@ defmodule Pleroma.Web.MastodonAPI.StatusControllerTest do
     end
 
     test "returns rich-media card", %{conn: conn, user: user} do
-      Tesla.Mock.mock(fn env -> apply(HttpRequestMock, :request, [env]) end)
+      Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
 
       {:ok, activity} = CommonAPI.post(user, %{status: "https://example.com/ogp"})
 
@@ -1242,7 +1242,7 @@ defmodule Pleroma.Web.MastodonAPI.StatusControllerTest do
     end
 
     test "replaces missing description with an empty string", %{conn: conn, user: user} do
-      Tesla.Mock.mock(fn env -> apply(HttpRequestMock, :request, [env]) end)
+      Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
 
       {:ok, activity} = CommonAPI.post(user, %{status: "https://example.com/ogp-missing-data"})
 
index ae825787083603217fe6a3deea8e17892cfa0fca..93eef00a2c550b4c045545916d0568218f02d004 100644 (file)
@@ -48,7 +48,7 @@ defmodule Pleroma.Web.PleromaAPI.ChatMessageReferenceViewTest do
 
     clear_config([:rich_media, :enabled], true)
 
-    Tesla.Mock.mock(fn
+    Tesla.Mock.mock_global(fn
       %{url: "https://example.com/ogp"} ->
         %Tesla.Env{status: 200, body: File.read!("test/fixtures/rich_media/ogp.html")}
     end)