Federator: add retry queue.
authoreal <eal@waifu.club>
Sun, 26 Aug 2018 18:17:13 +0000 (21:17 +0300)
committereal <eal@waifu.club>
Sun, 18 Nov 2018 15:46:29 +0000 (17:46 +0200)
lib/pleroma/application.ex
lib/pleroma/web/federator/federator.ex
lib/pleroma/web/federator/retry_queue.ex [new file with mode: 0644]
lib/pleroma/web/websub/websub.ex

index eedad767507a6b48adc88f9f6c81b0d6ca9cab66..707200737476da076a74c75b93d8670184757f5e 100644 (file)
@@ -57,8 +57,9 @@ defmodule Pleroma.Application do
           id: :cachex_idem
         ),
         worker(Pleroma.Web.Federator, []),
-        worker(Pleroma.Stats, []),
-        worker(Pleroma.Gopher.Server, [])
+        worker(Pleroma.Web.Federator.RetryQueue, []),
+        worker(Pleroma.Gopher.Server, []),
+        worker(Pleroma.Stats, [])
       ] ++
         if Mix.env() == :test,
           do: [],
index 6554fd2ef4d6ca89f1d014ec31bb3b9d2bd8a60c..eefc9b48342afc41d676c172785044004a886d54 100644 (file)
@@ -3,6 +3,7 @@ defmodule Pleroma.Web.Federator do
   alias Pleroma.User
   alias Pleroma.Activity
   alias Pleroma.Web.{WebFinger, Websub}
+  alias Pleroma.Web.Federator.RetryQueue
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.ActivityPub.Relay
   alias Pleroma.Web.ActivityPub.Transmogrifier
@@ -122,29 +123,25 @@ defmodule Pleroma.Web.Federator do
   end
 
   def handle(:publish_single_ap, params) do
-    ActivityPub.publish_one(params)
-  end
-
-  def handle(:publish_single_websub, %{xml: xml, topic: topic, callback: callback, secret: secret}) do
-    signature = @websub.sign(secret || "", xml)
-    Logger.debug(fn -> "Pushing #{topic} to #{callback}" end)
-
-    with {:ok, %{status_code: code}} <-
-           @httpoison.post(
-             callback,
-             xml,
-             [
-               {"Content-Type", "application/atom+xml"},
-               {"X-Hub-Signature", "sha1=#{signature}"}
-             ],
-             timeout: 10000,
-             recv_timeout: 20000,
-             hackney: [pool: :default]
-           ) do
-      Logger.debug(fn -> "Pushed to #{callback}, code #{code}" end)
-    else
-      e ->
-        Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end)
+    case ActivityPub.publish_one(params) do
+      {:ok, _} ->
+        :ok
+
+      {:error, _} ->
+        RetryQueue.enqueue(params, :activitypub)
+    end
+  end
+
+  def handle(
+        :publish_single_websub,
+        %{xml: xml, topic: topic, callback: callback, secret: secret} = params
+      ) do
+    case Websub.publish_one(params) do
+      {:ok, _} ->
+        :ok
+
+      {:error, _} ->
+        RetryQueue.enqueue(params, :websub)
     end
   end
 
diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex
new file mode 100644 (file)
index 0000000..1d38cd5
--- /dev/null
@@ -0,0 +1,76 @@
+defmodule Pleroma.Web.Federator.RetryQueue do
+  use GenServer
+  alias Pleroma.Web.{WebFinger, Websub}
+  alias Pleroma.Web.ActivityPub.ActivityPub
+  require Logger
+
+  @websub Application.get_env(:pleroma, :websub)
+  @ostatus Application.get_env(:pleroma, :websub)
+  @httpoison Application.get_env(:pleroma, :websub)
+  @instance Application.get_env(:pleroma, :websub)
+  # initial timeout, 5 min
+  @initial_timeout 30_000
+  @max_retries 5
+
+  def init(args) do
+    {:ok, args}
+  end
+
+  def start_link() do
+    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
+  end
+
+  def enqueue(data, transport, retries \\ 0) do
+    GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
+  end
+
+  def handle_cast({:maybe_enqueue, data, transport, retries}, state) do
+    if retries > @max_retries do
+      Logger.debug("Maximum retries reached on #{inspect(data)}")
+      {:noreply, state}
+    else
+      Process.send_after(
+        __MODULE__,
+        {:send, data, transport, retries},
+        growth_function(retries)
+      )
+
+      {:noreply, state}
+    end
+  end
+
+  def handle_info({:send, %{topic: topic} = data, :websub, retries}, state) do
+    Logger.debug("RetryQueue: Retrying to send object #{topic}")
+
+    case Websub.publish_one(data) do
+      {:ok, _} ->
+        {:noreply, state}
+
+      {:error, reason} ->
+        enqueue(data, :websub, retries)
+        {:noreply, state}
+    end
+  end
+
+  def handle_info({:send, %{id: id} = data, :activitypub, retries}, state) do
+    Logger.debug("RetryQueue: Retrying to send object #{id}")
+
+    case ActivityPub.publish_one(data) do
+      {:ok, _} ->
+        {:noreply, state}
+
+      {:error, reason} ->
+        enqueue(data, :activitypub, retries)
+        {:noreply, state}
+    end
+  end
+
+  def handle_info(unknown, state) do
+    Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
+    {:noreply, state}
+  end
+
+  defp growth_function(retries) do
+    round(@initial_timeout * :math.pow(retries, 3))
+  end
+end
index e494811f9f700cd5ee941f4f230942ff7da7552a..396dcf045265cde85d8340f4a22c5e5a188b526f 100644 (file)
@@ -252,4 +252,29 @@ defmodule Pleroma.Web.Websub do
       Pleroma.Web.Federator.enqueue(:request_subscription, sub)
     end)
   end
+
+  def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret}) do
+    signature = sign(secret || "", xml)
+    Logger.info(fn -> "Pushing #{topic} to #{callback}" end)
+
+    with {:ok, %{status_code: code}} <-
+           @httpoison.post(
+             callback,
+             xml,
+             [
+               {"Content-Type", "application/atom+xml"},
+               {"X-Hub-Signature", "sha1=#{signature}"}
+             ],
+             timeout: 10000,
+             recv_timeout: 20000,
+             hackney: [pool: :default]
+           ) do
+      Logger.info(fn -> "Pushed to #{callback}, code #{code}" end)
+      {:ok, code}
+    else
+      e ->
+        Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end)
+        {:error, e}
+    end
+  end
 end