Merge branch 'feature/retry-queue' into 'develop'
authorHaelwenn <git.pleroma.social@hacktivis.me>
Tue, 20 Nov 2018 19:04:52 +0000 (19:04 +0000)
committerHaelwenn <git.pleroma.social@hacktivis.me>
Tue, 20 Nov 2018 19:04:52 +0000 (19:04 +0000)
Federator: add retry queue.

See merge request pleroma/pleroma!323

lib/pleroma/application.ex
lib/pleroma/web/federator/federator.ex
lib/pleroma/web/federator/retry_queue.ex [new file with mode: 0644]
lib/pleroma/web/websub/websub.ex
test/web/retry_queue_test.exs [new file with mode: 0644]

index eedad767507a6b48adc88f9f6c81b0d6ca9cab66..707200737476da076a74c75b93d8670184757f5e 100644 (file)
@@ -57,8 +57,9 @@ defmodule Pleroma.Application do
           id: :cachex_idem
         ),
         worker(Pleroma.Web.Federator, []),
-        worker(Pleroma.Stats, []),
-        worker(Pleroma.Gopher.Server, [])
+        worker(Pleroma.Web.Federator.RetryQueue, []),
+        worker(Pleroma.Gopher.Server, []),
+        worker(Pleroma.Stats, [])
       ] ++
         if Mix.env() == :test,
           do: [],
index 6554fd2ef4d6ca89f1d014ec31bb3b9d2bd8a60c..000883cc20a5aa74b8e46b255e43d479878f3e4c 100644 (file)
@@ -3,6 +3,7 @@ defmodule Pleroma.Web.Federator do
   alias Pleroma.User
   alias Pleroma.Activity
   alias Pleroma.Web.{WebFinger, Websub}
+  alias Pleroma.Web.Federator.RetryQueue
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.ActivityPub.Relay
   alias Pleroma.Web.ActivityPub.Transmogrifier
@@ -122,29 +123,25 @@ defmodule Pleroma.Web.Federator do
   end
 
   def handle(:publish_single_ap, params) do
-    ActivityPub.publish_one(params)
-  end
-
-  def handle(:publish_single_websub, %{xml: xml, topic: topic, callback: callback, secret: secret}) do
-    signature = @websub.sign(secret || "", xml)
-    Logger.debug(fn -> "Pushing #{topic} to #{callback}" end)
-
-    with {:ok, %{status_code: code}} <-
-           @httpoison.post(
-             callback,
-             xml,
-             [
-               {"Content-Type", "application/atom+xml"},
-               {"X-Hub-Signature", "sha1=#{signature}"}
-             ],
-             timeout: 10000,
-             recv_timeout: 20000,
-             hackney: [pool: :default]
-           ) do
-      Logger.debug(fn -> "Pushed to #{callback}, code #{code}" end)
-    else
-      e ->
-        Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end)
+    case ActivityPub.publish_one(params) do
+      {:ok, _} ->
+        :ok
+
+      {:error, _} ->
+        RetryQueue.enqueue(params, ActivityPub)
+    end
+  end
+
+  def handle(
+        :publish_single_websub,
+        %{xml: xml, topic: topic, callback: callback, secret: secret} = params
+      ) do
+    case Websub.publish_one(params) do
+      {:ok, _} ->
+        :ok
+
+      {:error, _} ->
+        RetryQueue.enqueue(params, Websub)
     end
   end
 
diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex
new file mode 100644 (file)
index 0000000..06c094f
--- /dev/null
@@ -0,0 +1,71 @@
+defmodule Pleroma.Web.Federator.RetryQueue do
+  use GenServer
+  alias Pleroma.Web.{WebFinger, Websub}
+  alias Pleroma.Web.ActivityPub.ActivityPub
+  require Logger
+
+  @websub Application.get_env(:pleroma, :websub)
+  @ostatus Application.get_env(:pleroma, :websub)
+  @httpoison Application.get_env(:pleroma, :websub)
+  @instance Application.get_env(:pleroma, :websub)
+  # initial timeout, 5 min
+  @initial_timeout 30_000
+  @max_retries 5
+
+  def init(args) do
+    {:ok, args}
+  end
+
+  def start_link() do
+    GenServer.start_link(__MODULE__, %{delivered: 0, dropped: 0}, name: __MODULE__)
+  end
+
+  def enqueue(data, transport, retries \\ 0) do
+    GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
+  end
+
+  def get_retry_params(retries) do
+    if retries > @max_retries do
+      {:drop, "Max retries reached"}
+    else
+      {:retry, growth_function(retries)}
+    end
+  end
+
+  def handle_cast({:maybe_enqueue, data, transport, retries}, %{dropped: drop_count} = state) do
+    case get_retry_params(retries) do
+      {:retry, timeout} ->
+        Process.send_after(
+          __MODULE__,
+          {:send, data, transport, retries},
+          growth_function(retries)
+        )
+
+        {:noreply, state}
+
+      {:drop, message} ->
+        Logger.debug(message)
+        {:noreply, %{state | dropped: drop_count + 1}}
+    end
+  end
+
+  def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
+    case transport.publish_one(data) do
+      {:ok, _} ->
+        {:noreply, %{state | delivered: delivery_count + 1}}
+
+      {:error, reason} ->
+        enqueue(data, transport, retries)
+        {:noreply, state}
+    end
+  end
+
+  def handle_info(unknown, state) do
+    Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
+    {:noreply, state}
+  end
+
+  defp growth_function(retries) do
+    round(@initial_timeout * :math.pow(retries, 3))
+  end
+end
index e494811f9f700cd5ee941f4f230942ff7da7552a..396dcf045265cde85d8340f4a22c5e5a188b526f 100644 (file)
@@ -252,4 +252,29 @@ defmodule Pleroma.Web.Websub do
       Pleroma.Web.Federator.enqueue(:request_subscription, sub)
     end)
   end
+
+  def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret}) do
+    signature = sign(secret || "", xml)
+    Logger.info(fn -> "Pushing #{topic} to #{callback}" end)
+
+    with {:ok, %{status_code: code}} <-
+           @httpoison.post(
+             callback,
+             xml,
+             [
+               {"Content-Type", "application/atom+xml"},
+               {"X-Hub-Signature", "sha1=#{signature}"}
+             ],
+             timeout: 10000,
+             recv_timeout: 20000,
+             hackney: [pool: :default]
+           ) do
+      Logger.info(fn -> "Pushed to #{callback}, code #{code}" end)
+      {:ok, code}
+    else
+      e ->
+        Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end)
+        {:error, e}
+    end
+  end
 end
diff --git a/test/web/retry_queue_test.exs b/test/web/retry_queue_test.exs
new file mode 100644 (file)
index 0000000..ce29649
--- /dev/null
@@ -0,0 +1,31 @@
+defmodule MockActivityPub do
+  def publish_one(ret) do
+    {ret, "success"}
+  end
+end
+
+defmodule Pleroma.ActivityTest do
+  use Pleroma.DataCase
+  alias Pleroma.Web.Federator.RetryQueue
+
+  @small_retry_count 0
+  @hopeless_retry_count 10
+
+  test "failed posts are retried" do
+    {:retry, _timeout} = RetryQueue.get_retry_params(@small_retry_count)
+
+    assert {:noreply, %{delivered: 1}} ==
+             RetryQueue.handle_info({:send, :ok, MockActivityPub, @small_retry_count}, %{
+               delivered: 0
+             })
+  end
+
+  test "posts that have been tried too many times are dropped" do
+    {:drop, _timeout} = RetryQueue.get_retry_params(@hopeless_retry_count)
+
+    assert {:noreply, %{dropped: 1}} ==
+             RetryQueue.handle_cast({:maybe_enqueue, %{}, nil, @hopeless_retry_count}, %{
+               dropped: 0
+             })
+  end
+end