Merge remote-tracking branch 'origin' into follower-hiding
[akkoma] / lib / pleroma / web / federator / retry_queue.ex
defmodule Pleroma.Web.Federator.RetryQueue do
  @moduledoc """
  GenServer that re-attempts failed federation deliveries with a growing delay.

  A failed delivery is handed to `enqueue/3` together with the `transport`
  module that should perform it. The server schedules a delayed `:send`
  message to itself; when the timer fires it calls
  `transport.publish_one(data)`. A `{:ok, _}` result bumps the `:delivered`
  counter, an `{:error, _}` result re-enqueues the delivery with an
  incremented attempt number. Once the attempt number exceeds the maximum,
  the delivery is discarded and the `:dropped` counter is bumped.

  The server state is a map with the two counters `:delivered` and `:dropped`.
  """

  use GenServer
  alias Pleroma.Web.{WebFinger, Websub}
  alias Pleroma.Web.ActivityPub.ActivityPub
  require Logger

  # Base delay in milliseconds (30 seconds); scaled by `growth_function/1`.
  @initial_timeout 30_000
  # Highest attempt number that is still scheduled; anything above is dropped.
  @max_retries 5

  @impl true
  def init(args) do
    {:ok, args}
  end

  @doc """
  Starts the queue, registered under the module name, when it is enabled.

  Reads `[:retry_queue, :enabled]` from `Pleroma.Config` (default `false`).
  Returns the result of `GenServer.start_link/3` when enabled and `:ignore`
  otherwise.
  """
  def start_link() do
    enabled = Pleroma.Config.get([:retry_queue, :enabled], false)

    if enabled do
      Logger.info("Starting retry queue")
      GenServer.start_link(__MODULE__, %{delivered: 0, dropped: 0}, name: __MODULE__)
    else
      Logger.info("Retry queue disabled")
      :ignore
    end
  end

  @doc """
  Asks the queue to retry delivering `data` through `transport`.

  `retries` is the number of attempts already made; it is incremented by one
  before being handed to the server. `transport` must be a module exporting
  `publish_one/1` that returns `{:ok, term}` or `{:error, term}`.

  This is a cast to the process registered under the module name, so it
  returns `:ok` immediately.
  """
  @spec enqueue(term(), module(), non_neg_integer()) :: :ok
  def enqueue(data, transport, retries \\ 0) do
    GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
  end

  @doc """
  Decides what to do with a delivery that is on attempt number `retries`.

  Returns `{:retry, delay_in_ms}` while `retries` does not exceed the maximum
  of #{@max_retries}, and `{:drop, message}` afterwards.
  """
  @spec get_retry_params(number()) :: {:retry, integer()} | {:drop, String.t()}
  def get_retry_params(retries) do
    if retries > @max_retries do
      {:drop, "Max retries reached"}
    else
      {:retry, growth_function(retries)}
    end
  end

  @impl true
  def handle_cast({:maybe_enqueue, data, transport, retries}, %{dropped: drop_count} = state) do
    case get_retry_params(retries) do
      {:retry, timeout} ->
        # Reuse the delay computed by get_retry_params/1 instead of
        # recalculating it.
        Process.send_after(__MODULE__, {:send, data, transport, retries}, timeout)
        {:noreply, state}

      {:drop, message} ->
        Logger.debug(message)
        {:noreply, %{state | dropped: drop_count + 1}}
    end
  end

  @impl true
  def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
    case transport.publish_one(data) do
      {:ok, _} ->
        {:noreply, %{state | delivered: delivery_count + 1}}

      {:error, _reason} ->
        # enqueue/3 increments the attempt number, so repeated failures
        # eventually hit the drop branch in handle_cast/2.
        enqueue(data, transport, retries)
        {:noreply, state}
    end
  end

  def handle_info(unknown, state) do
    Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
    {:noreply, state}
  end

  # Cubic backoff: the base delay multiplied by retries^3, rounded to an
  # integer number of milliseconds (30 s, 4 min, 13.5 min, 32 min, 62.5 min
  # for attempts 1 through 5).
  defp growth_function(retries) do
    round(@initial_timeout * :math.pow(retries, 3))
  end
end