Merge branch 'optional-retry-queue' into 'develop'
[akkoma] / lib / pleroma / web / federator / retry_queue.ex
index 1d38cd5a3a2fbfdd82ba2dcc94ddca469b27dc69..13df40c809cd2723d41889f4614eda1eece30af3 100644 (file)
@@ -17,50 +17,53 @@ defmodule Pleroma.Web.Federator.RetryQueue do
   end
 
   def start_link() do
-    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
+    enabled = Pleroma.Config.get([:retry_queue, :enabled], false)
+
+    if enabled do
+      Logger.info("Starting retry queue")
+      GenServer.start_link(__MODULE__, %{delivered: 0, dropped: 0}, name: __MODULE__)
+    else
+      Logger.info("Retry queue disabled")
+      :ignore
+    end
   end
 
   def enqueue(data, transport, retries \\ 0) do
     GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
   end
 
-  def handle_cast({:maybe_enqueue, data, transport, retries}, state) do
+  def get_retry_params(retries) do
     if retries > @max_retries do
-      Logger.debug("Maximum retries reached on #{inspect(data)}")
-      {:noreply, state}
+      {:drop, "Max retries reached"}
     else
-      Process.send_after(
-        __MODULE__,
-        {:send, data, transport, retries},
-        growth_function(retries)
-      )
-
-      {:noreply, state}
+      {:retry, growth_function(retries)}
     end
   end
 
-  def handle_info({:send, %{topic: topic} = data, :websub, retries}, state) do
-    Logger.debug("RetryQueue: Retrying to send object #{topic}")
+  def handle_cast({:maybe_enqueue, data, transport, retries}, %{dropped: drop_count} = state) do
+    case get_retry_params(retries) do
+      {:retry, timeout} ->
+        Process.send_after(
+          __MODULE__,
+          {:send, data, transport, retries},
+          growth_function(retries)
+        )
 
-    case Websub.publish_one(data) do
-      {:ok, _} ->
         {:noreply, state}
 
-      {:error, reason} ->
-        enqueue(data, :websub, retries)
-        {:noreply, state}
+      {:drop, message} ->
+        Logger.debug(message)
+        {:noreply, %{state | dropped: drop_count + 1}}
     end
   end
 
-  def handle_info({:send, %{id: id} = data, :activitypub, retries}, state) do
-    Logger.debug("RetryQueue: Retrying to send object #{id}")
-
-    case ActivityPub.publish_one(data) do
+  def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
+    case transport.publish_one(data) do
       {:ok, _} ->
-        {:noreply, state}
+        {:noreply, %{state | delivered: delivery_count + 1}}
 
       {:error, reason} ->
-        enqueue(data, :activitypub, retries)
+        enqueue(data, transport, retries)
         {:noreply, state}
     end
   end