updates
[akkoma] / lib / pleroma / web / federator / retry_queue.ex
1 defmodule Pleroma.Web.Federator.RetryQueue do
2 use GenServer
3
4 require Logger
5
6 # initial timeout, 5 min
7 @initial_timeout 30_000
8 @max_retries 5
9
10 def init(args) do
11 {:ok, args}
12 end
13
14 def start_link() do
15 enabled = Pleroma.Config.get([:retry_queue, :enabled], false)
16
17 if enabled do
18 Logger.info("Starting retry queue")
19 GenServer.start_link(__MODULE__, %{delivered: 0, dropped: 0}, name: __MODULE__)
20 else
21 Logger.info("Retry queue disabled")
22 :ignore
23 end
24 end
25
26 def enqueue(data, transport, retries \\ 0) do
27 GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
28 end
29
30 def get_retry_params(retries) do
31 if retries > @max_retries do
32 {:drop, "Max retries reached"}
33 else
34 {:retry, growth_function(retries)}
35 end
36 end
37
38 def handle_cast({:maybe_enqueue, data, transport, retries}, %{dropped: drop_count} = state) do
39 case get_retry_params(retries) do
40 {:retry, timeout} ->
41 Process.send_after(
42 __MODULE__,
43 {:send, data, transport, retries},
44 timeout
45 )
46
47 {:noreply, state}
48
49 {:drop, message} ->
50 Logger.debug(message)
51 {:noreply, %{state | dropped: drop_count + 1}}
52 end
53 end
54
55 def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
56 case transport.publish_one(data) do
57 {:ok, _} ->
58 {:noreply, %{state | delivered: delivery_count + 1}}
59
60 {:error, _reason} ->
61 enqueue(data, transport, retries)
62 {:noreply, state}
63 end
64 end
65
66 def handle_info(unknown, state) do
67 Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
68 {:noreply, state}
69 end
70
71 defp growth_function(retries) do
72 round(@initial_timeout * :math.pow(retries, 3))
73 end
74 end