add license boilerplate to pleroma core
[akkoma] / lib / pleroma / web / federator / retry_queue.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.Web.Federator.RetryQueue do
  @moduledoc """
  GenServer that re-attempts failed `publish_one/1` deliveries with a growing delay.

  A failed delivery is handed to `enqueue/3`. Depending on how many attempts
  have already been made, the server either schedules another call to
  `transport.publish_one(data)` or drops the delivery.

  The delay before attempt `n` is `30_000 * n^3` milliseconds, i.e. roughly
  30 s, 4 min, 13.5 min, 32 min and 62.5 min for attempts 1 through 5. Once the
  attempt counter exceeds 5 the delivery is dropped.

  The server state is a map with two counters, `:delivered` and `:dropped`.
  The queue is only started when the `[:retry_queue, :enabled]` config value is
  truthy; see `start_link/0`.
  """

  use GenServer

  require Logger

  # Base delay in milliseconds (30 seconds); scaled by growth_function/1.
  @initial_timeout 30_000
  # Deliveries whose attempt counter exceeds this value are dropped.
  @max_retries 5

  @doc """
  Starts the retry queue, registered under the module name.

  Returns `:ignore` without starting a process when the
  `[:retry_queue, :enabled]` config value is `false` or `nil` (the default).
  """
  @spec start_link() :: GenServer.on_start()
  def start_link() do
    enabled = Pleroma.Config.get([:retry_queue, :enabled], false)

    if enabled do
      Logger.info("Starting retry queue")
      GenServer.start_link(__MODULE__, %{delivered: 0, dropped: 0}, name: __MODULE__)
    else
      Logger.info("Retry queue disabled")
      :ignore
    end
  end

  @doc """
  Asks the queue to schedule another delivery attempt of `data` via `transport`.

  `retries` is the number of attempts already made; it is incremented before
  being handed to the server. `transport` must be a module exporting
  `publish_one/1`.

  This is a cast, so it always returns `:ok`, even when the queue process is
  not running (in which case the request is discarded).
  """
  @spec enqueue(term(), module(), integer()) :: :ok
  def enqueue(data, transport, retries \\ 0) do
    GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
  end

  @doc """
  Decides what to do with a delivery that is on attempt number `retries`.

  Returns `{:retry, delay_ms}` while `retries` does not exceed the maximum,
  and `{:drop, message}` afterwards.
  """
  @spec get_retry_params(integer()) :: {:retry, integer()} | {:drop, String.t()}
  def get_retry_params(retries) do
    if retries > @max_retries do
      {:drop, "Max retries reached"}
    else
      {:retry, growth_function(retries)}
    end
  end

  @impl true
  def init(args) do
    {:ok, args}
  end

  # Either arms a timer for the next attempt or counts the delivery as dropped.
  @impl true
  def handle_cast({:maybe_enqueue, data, transport, retries}, %{dropped: drop_count} = state) do
    case get_retry_params(retries) do
      {:retry, timeout} ->
        # The timer message is addressed to the registered name, not a pid.
        Process.send_after(
          __MODULE__,
          {:send, data, transport, retries},
          timeout
        )

        {:noreply, state}

      {:drop, message} ->
        Logger.debug(message)
        {:noreply, %{state | dropped: drop_count + 1}}
    end
  end

  # Timer fired: attempt the delivery. A failure goes back through enqueue/3,
  # which bumps the attempt counter. Only `{:ok, _}` and `{:error, _}` results
  # are matched; any other return value from the transport raises.
  @impl true
  def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
    case transport.publish_one(data) do
      {:ok, _} ->
        {:noreply, %{state | delivered: delivery_count + 1}}

      {:error, _reason} ->
        enqueue(data, transport, retries)
        {:noreply, state}
    end
  end

  def handle_info(unknown, state) do
    Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
    {:noreply, state}
  end

  # Cubic back-off: the base delay multiplied by retries^3, rounded to an integer.
  defp growth_function(retries) do
    round(@initial_timeout * :math.pow(retries, 3))
  end
end