1 defmodule Pleroma.Web.Federator.RetryQueue do
3 alias Pleroma.Web.{WebFinger, Websub}
4 alias Pleroma.Web.ActivityPub.ActivityPub
7 @websub Application.get_env(:pleroma, :websub)
8 @ostatus Application.get_env(:pleroma, :websub)
9 @httpoison Application.get_env(:pleroma, :websub)
10 @instance Application.get_env(:pleroma, :websub)
11 # initial timeout, 5 min
12 @initial_timeout 30_000
20 enabled = Pleroma.Config.get([:retry_queue, :enabled], false)
23 Logger.info("Starting retry queue")
24 GenServer.start_link(__MODULE__, %{delivered: 0, dropped: 0}, name: __MODULE__)
26 Logger.info("Retry queue disabled")
31 def enqueue(data, transport, retries \\ 0) do
32 GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
35 def get_retry_params(retries) do
36 if retries > @max_retries do
37 {:drop, "Max retries reached"}
39 {:retry, growth_function(retries)}
43 def handle_cast({:maybe_enqueue, data, transport, retries}, %{dropped: drop_count} = state) do
44 case get_retry_params(retries) do
48 {:send, data, transport, retries},
49 growth_function(retries)
56 {:noreply, %{state | dropped: drop_count + 1}}
60 def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
61 case transport.publish_one(data) do
63 {:noreply, %{state | delivered: delivery_count + 1}}
66 enqueue(data, transport, retries)
71 def handle_info(unknown, state) do
72 Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
76 defp growth_function(retries) do
77 round(@initial_timeout * :math.pow(retries, 3))