[#210] [TwitterAPI] Made actor be stored for uploads. Added ownership check
[akkoma] / lib / pleroma / web / federator / retry_queue.ex
index 1d38cd5a3a2fbfdd82ba2dcc94ddca469b27dc69..06c094f265aa270ba2c62b0fa7a156e909a2d987 100644 (file)
@@ -17,50 +17,45 @@ defmodule Pleroma.Web.Federator.RetryQueue do
   end
 
   def start_link() do
-    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
+    GenServer.start_link(__MODULE__, %{delivered: 0, dropped: 0}, name: __MODULE__)
   end
 
   def enqueue(data, transport, retries \\ 0) do
     GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
   end
 
-  def handle_cast({:maybe_enqueue, data, transport, retries}, state) do
+  def get_retry_params(retries) do
     if retries > @max_retries do
-      Logger.debug("Maximum retries reached on #{inspect(data)}")
-      {:noreply, state}
+      {:drop, "Max retries reached"}
     else
-      Process.send_after(
-        __MODULE__,
-        {:send, data, transport, retries},
-        growth_function(retries)
-      )
-
-      {:noreply, state}
+      {:retry, growth_function(retries)}
     end
   end
 
-  def handle_info({:send, %{topic: topic} = data, :websub, retries}, state) do
-    Logger.debug("RetryQueue: Retrying to send object #{topic}")
+  def handle_cast({:maybe_enqueue, data, transport, retries}, %{dropped: drop_count} = state) do
+    case get_retry_params(retries) do
+      {:retry, timeout} ->
+        Process.send_after(
+          __MODULE__,
+          {:send, data, transport, retries},
+          growth_function(retries)
+        )
 
-    case Websub.publish_one(data) do
-      {:ok, _} ->
         {:noreply, state}
 
-      {:error, reason} ->
-        enqueue(data, :websub, retries)
-        {:noreply, state}
+      {:drop, message} ->
+        Logger.debug(message)
+        {:noreply, %{state | dropped: drop_count + 1}}
     end
   end
 
-  def handle_info({:send, %{id: id} = data, :activitypub, retries}, state) do
-    Logger.debug("RetryQueue: Retrying to send object #{id}")
-
-    case ActivityPub.publish_one(data) do
+  def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
+    case transport.publish_one(data) do
       {:ok, _} ->
-        {:noreply, state}
+        {:noreply, %{state | delivered: delivery_count + 1}}
 
       {:error, reason} ->
-        enqueue(data, :activitypub, retries)
+        enqueue(data, transport, retries)
         {:noreply, state}
     end
   end