[#1149] Oban jobs implementation for :federator_incoming and :federator_outgoing...
[akkoma] / lib / pleroma / web / federator / federator.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Federator do
6 alias Pleroma.Workers.Publisher, as: PublisherWorker
7 alias Pleroma.Workers.Receiver, as: ReceiverWorker
8 alias Pleroma.Workers.Subscriber, as: SubscriberWorker
9
10 require Logger
11
12 def init do
13 # 1 minute
14 refresh_subscriptions(schedule_in: 60)
15 end
16
17 @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)"
18 # credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength
19 def allowed_incoming_reply_depth?(depth) do
20 max_replies_depth = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth])
21
22 if max_replies_depth do
23 (depth || 1) <= max_replies_depth
24 else
25 true
26 end
27 end
28
29 # Client API
30
31 def incoming_doc(doc) do
32 %{"op" => "incoming_doc", "body" => doc}
33 |> ReceiverWorker.new(worker_args(:federator_incoming))
34 |> Pleroma.Repo.insert()
35 end
36
37 def incoming_ap_doc(params) do
38 %{"op" => "incoming_ap_doc", "params" => params}
39 |> ReceiverWorker.new(worker_args(:federator_incoming))
40 |> Pleroma.Repo.insert()
41 end
42
43 def publish(%{id: "pleroma:fakeid"} = activity) do
44 PublisherWorker.perform_publish(activity)
45 end
46
47 def publish(activity) do
48 %{"op" => "publish", "activity_id" => activity.id}
49 |> PublisherWorker.new(worker_args(:federator_outgoing))
50 |> Pleroma.Repo.insert()
51 end
52
53 def verify_websub(websub) do
54 %{"op" => "verify_websub", "websub_id" => websub.id}
55 |> SubscriberWorker.new(worker_args(:federator_outgoing))
56 |> Pleroma.Repo.insert()
57 end
58
59 def request_subscription(websub) do
60 %{"op" => "request_subscription", "websub_id" => websub.id}
61 |> SubscriberWorker.new(worker_args(:federator_outgoing))
62 |> Pleroma.Repo.insert()
63 end
64
65 def refresh_subscriptions(worker_args \\ []) do
66 %{"op" => "refresh_subscriptions"}
67 |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing))
68 |> Pleroma.Repo.insert()
69 end
70
71 defp worker_args(queue) do
72 if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do
73 [max_attempts: max_attempts]
74 else
75 []
76 end
77 end
78 end