[#1149] Added more oban workers. Refactoring.
[akkoma] / lib / pleroma / web / activity_pub / mrf / mediaproxy_warming_policy.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
6 @moduledoc "Preloads any attachments in the MediaProxy cache by prefetching them"
7 @behaviour Pleroma.Web.ActivityPub.MRF
8
9 alias Pleroma.HTTP
10 alias Pleroma.Repo
11 alias Pleroma.Web.MediaProxy
12 alias Pleroma.Workers.BackgroundWorker
13
14 require Logger
15
16 @hackney_options [
17 pool: :media,
18 recv_timeout: 10_000
19 ]
20
21 defdelegate worker_args(queue), to: Pleroma.Workers.Helper
22
23 def perform(:prefetch, url) do
24 Logger.info("Prefetching #{inspect(url)}")
25
26 url
27 |> MediaProxy.url()
28 |> HTTP.get([], adapter: @hackney_options)
29 end
30
31 def perform(:preload, %{"object" => %{"attachment" => attachments}} = _message) do
32 Enum.each(attachments, fn
33 %{"url" => url} when is_list(url) ->
34 url
35 |> Enum.each(fn
36 %{"href" => href} ->
37 %{"op" => "media_proxy_prefetch", "url" => href}
38 |> BackgroundWorker.new(worker_args(:background))
39 |> Repo.insert()
40
41 x ->
42 Logger.debug("Unhandled attachment URL object #{inspect(x)}")
43 end)
44
45 x ->
46 Logger.debug("Unhandled attachment #{inspect(x)}")
47 end)
48 end
49
50 @impl true
51 def filter(
52 %{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message
53 )
54 when is_list(attachments) and length(attachments) > 0 do
55 %{"op" => "media_proxy_preload", "message" => message}
56 |> BackgroundWorker.new(worker_args(:background))
57 |> Repo.insert()
58
59 {:ok, message}
60 end
61
62 @impl true
63 def filter(message), do: {:ok, message}
64 end