[#582] Optimized federation retirement by reducing the number of SQL calls
authorIvan Tashkinov <ivantashkinov@gmail.com>
Sun, 3 Feb 2019 09:41:27 +0000 (12:41 +0300)
committerIvan Tashkinov <ivantashkinov@gmail.com>
Sun, 3 Feb 2019 09:41:27 +0000 (12:41 +0300)
(calling `Instances.set_reachable/1` only if instance had `unreachable_since`,
calling `Instances.set_unreachable/1` only if instance had nil `unreachable_since`).

lib/pleroma/instances/instance.ex
lib/pleroma/web/activity_pub/activity_pub.ex
lib/pleroma/web/federator/federator.ex
lib/pleroma/web/salmon/salmon.ex
lib/pleroma/web/websub/websub.ex
test/web/federator_test.exs
test/web/instances/instances_test.exs

index a87590d8b2e457bc7e6ab0edb8d266bfb49cb4e3..4a4ca26dd731b3b8a0ddec7e4e06346e2fcaaff3 100644 (file)
@@ -26,7 +26,7 @@ defmodule Pleroma.Instances.Instance do
     |> unique_constraint(:host)
   end
 
-  def filter_reachable([]), do: []
+  def filter_reachable([]), do: %{}
 
   def filter_reachable(urls_or_hosts) when is_list(urls_or_hosts) do
     hosts =
@@ -34,17 +34,28 @@ defmodule Pleroma.Instances.Instance do
       |> Enum.map(&(&1 && host(&1)))
       |> Enum.filter(&(to_string(&1) != ""))
 
-    unreachable_hosts =
+    unreachable_since_by_host =
       Repo.all(
         from(i in Instance,
-          where:
-            i.host in ^hosts and
-              i.unreachable_since <= ^Instances.reachability_datetime_threshold(),
-          select: i.host
+          where: i.host in ^hosts,
+          select: {i.host, i.unreachable_since}
         )
       )
+      |> Map.new(& &1)
 
-    Enum.filter(urls_or_hosts, &(&1 && host(&1) not in unreachable_hosts))
+    reachability_datetime_threshold = Instances.reachability_datetime_threshold()
+
+    for entry <- Enum.filter(urls_or_hosts, &is_binary/1) do
+      host = host(entry)
+      unreachable_since = unreachable_since_by_host[host]
+
+      if !unreachable_since ||
+           NaiveDateTime.compare(unreachable_since, reachability_datetime_threshold) == :gt do
+        {entry, unreachable_since}
+      end
+    end
+    |> Enum.filter(& &1)
+    |> Map.new(& &1)
   end
 
   def reachable?(url_or_host) when is_binary(url_or_host) do
index 06e8c3f1c28945370e547284d9cdc274cfdcd65e..5f6c8e7d33ff027a3c14d7114a3e1981853f8f32 100644 (file)
@@ -744,7 +744,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
     public = is_public?(activity)
 
-    remote_inboxes =
+    reachable_inboxes_metadata =
       (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
       |> Enum.filter(fn user -> User.ap_enabled?(user) end)
       |> Enum.map(fn %{info: %{source_data: data}} ->
@@ -757,17 +757,18 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
     json = Jason.encode!(data)
 
-    Enum.each(remote_inboxes, fn inbox ->
+    Enum.each(reachable_inboxes_metadata, fn {inbox, unreachable_since} ->
       Federator.enqueue(:publish_single_ap, %{
         inbox: inbox,
         json: json,
         actor: actor,
-        id: activity.data["id"]
+        id: activity.data["id"],
+        unreachable_since: unreachable_since
       })
     end)
   end
 
-  def publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do
+  def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
     Logger.info("Federating #{id} to #{inbox}")
     host = URI.parse(inbox).host
 
@@ -791,11 +792,11 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
                  {"digest", digest}
                ]
              ) do
-      Instances.set_reachable(inbox)
+      if params[:unreachable_since], do: Instances.set_reachable(inbox)
       result
     else
       {_post_result, response} ->
-        Instances.set_unreachable(inbox)
+        unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
         {:error, response}
     end
   end
index 46f7a497341a143c12a7b527f05ab800eee8cd18..bb7676cf00be5ab0b1d22c3fa9b5f33cc45f6a62 100644 (file)
@@ -124,8 +124,8 @@ defmodule Pleroma.Web.Federator do
     end
   end
 
-  def handle(:publish_single_salmon, {user_or_url, feed, poster}) do
-    Salmon.send_to_user(user_or_url, feed, poster)
+  def handle(:publish_single_salmon, params) do
+    Salmon.send_to_user(params)
   end
 
   def handle(:publish_single_ap, params) do
index 07ca42a5f0aef095c94909b3ad9c4f3b6792c709..4d519ece4b37b6c4b1879f3bf7b2048419c3e0ed 100644 (file)
@@ -162,30 +162,29 @@ defmodule Pleroma.Web.Salmon do
     |> Enum.filter(fn user -> user && !user.local end)
   end
 
-  # push an activity to remote accounts
-  #
-  def send_to_user(%{info: %{salmon: salmon}}, feed, poster),
-    do: send_to_user(salmon, feed, poster)
+  @doc "Pushes an activity to remote account."
+  def send_to_user(%{recipient: %{info: %{salmon: salmon}}} = params),
+    do: send_to_user(Map.put(params, :recipient, salmon))
 
-  def send_to_user(url, feed, poster) when is_binary(url) do
+  def send_to_user(%{recipient: url, feed: feed, poster: poster} = params) when is_binary(url) do
     with {:ok, %{status: code}} when code in 200..299 <-
            poster.(
              url,
              feed,
              [{"Content-Type", "application/magic-envelope+xml"}]
            ) do
-      Instances.set_reachable(url)
+      if params[:unreachable_since], do: Instances.set_reachable(url)
       Logger.debug(fn -> "Pushed to #{url}, code #{code}" end)
       :ok
     else
       e ->
-        Instances.set_unreachable(url)
+        unless params[:unreachable_since], do: Instances.set_reachable(url)
         Logger.debug(fn -> "Pushing Salmon to #{url} failed, #{inspect(e)}" end)
         :error
     end
   end
 
-  def send_to_user(_, _, _), do: :noop
+  def send_to_user(_), do: :noop
 
   @supported_activities [
     "Create",
@@ -218,13 +217,20 @@ defmodule Pleroma.Web.Salmon do
       remote_users = remote_users(activity)
 
       salmon_urls = Enum.map(remote_users, & &1.info.salmon)
-      reachable_salmon_urls = Instances.filter_reachable(salmon_urls)
+      reachable_urls_metadata = Instances.filter_reachable(salmon_urls)
+      reachable_urls = Map.keys(reachable_urls_metadata)
 
       remote_users
-      |> Enum.filter(&(&1.info.salmon in reachable_salmon_urls))
+      |> Enum.filter(&(&1.info.salmon in reachable_urls))
       |> Enum.each(fn remote_user ->
         Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
-        Pleroma.Web.Federator.enqueue(:publish_single_salmon, {remote_user, feed, poster})
+
+        Pleroma.Web.Federator.enqueue(:publish_single_salmon, %{
+          recipient: remote_user,
+          feed: feed,
+          poster: poster,
+          unreachable_since: reachable_urls_metadata[remote_user.info.salmon]
+        })
       end)
     end
   end
index 8f7d53b03f598da82d15e12d8cbd2b5da4ebf546..cf51dce7681a80e79ad6f8a20ce29e43b48bbdc0 100644 (file)
@@ -70,7 +70,8 @@ defmodule Pleroma.Web.Websub do
     subscriptions = Repo.all(query)
 
     callbacks = Enum.map(subscriptions, & &1.callback)
-    reachable_callbacks = Instances.filter_reachable(callbacks)
+    reachable_callbacks_metadata = Instances.filter_reachable(callbacks)
+    reachable_callbacks = Map.keys(reachable_callbacks_metadata)
 
     subscriptions
     |> Enum.filter(&(&1.callback in reachable_callbacks))
@@ -79,7 +80,8 @@ defmodule Pleroma.Web.Websub do
         xml: response,
         topic: topic,
         callback: sub.callback,
-        secret: sub.secret
+        secret: sub.secret,
+        unreachable_since: reachable_callbacks_metadata[sub.callback]
       }
 
       Pleroma.Web.Federator.enqueue(:publish_single_websub, data)
@@ -268,7 +270,7 @@ defmodule Pleroma.Web.Websub do
     end)
   end
 
-  def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret}) do
+  def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret} = params) do
     signature = sign(secret || "", xml)
     Logger.info(fn -> "Pushing #{topic} to #{callback}" end)
 
@@ -281,12 +283,12 @@ defmodule Pleroma.Web.Websub do
                {"X-Hub-Signature", "sha1=#{signature}"}
              ]
            ) do
-      Instances.set_reachable(callback)
+      if params[:unreachable_since], do: Instances.set_reachable(callback)
       Logger.info(fn -> "Pushed to #{callback}, code #{code}" end)
       {:ok, code}
     else
       {_post_result, response} ->
-        Instances.set_unreachable(callback)
+        unless params[:unreachable_since], do: Instances.set_reachable(callback)
         Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(response)}" end)
         {:error, response}
     end
index c6d10ef7809edcd6a47d964fac488b3aacfc3def..7bb249d7482675884d170f3a2a7abd2b0fc9b53b 100644 (file)
@@ -163,8 +163,8 @@ defmodule Pleroma.Web.FederatorTest do
       {:ok, _activity} =
         CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
 
-      assert called(Federator.enqueue(:publish_single_salmon, {remote_user2, :_, :_}))
-      refute called(Federator.enqueue(:publish_single_websub, {remote_user1, :_, :_}))
+      assert called(Federator.enqueue(:publish_single_salmon, %{recipient: remote_user2}))
+      refute called(Federator.enqueue(:publish_single_websub, %{recipient: remote_user1}))
     end
   end
 
index adb8560a7823a49c69663b53da78282d155da1d1..2530c09fe8fd20a0594cadf5526e609532bdb203 100644 (file)
@@ -47,7 +47,7 @@ defmodule Pleroma.InstancesTest do
   end
 
   describe "filter_reachable/1" do
-    test "keeps only reachable elements of supplied list" do
+    setup do
       host = "consistently-unreachable.name"
       url1 = "http://eventually-unreachable.com/path"
       url2 = "http://domain.com/path"
@@ -55,7 +55,26 @@ defmodule Pleroma.InstancesTest do
       Instances.set_consistently_unreachable(host)
       Instances.set_unreachable(url1)
 
-      assert [url1, url2] == Instances.filter_reachable([host, url1, url2])
+      result = Instances.filter_reachable([host, url1, url2, nil])
+      %{result: result, url1: url1, url2: url2}
+    end
+
+    test "returns a map with keys containing 'not marked consistently unreachable' elements of supplied list",
+         %{result: result, url1: url1, url2: url2} do
+      assert is_map(result)
+      assert Enum.sort([url1, url2]) == result |> Map.keys() |> Enum.sort()
+    end
+
+    test "returns a map with `unreachable_since` values for keys",
+         %{result: result, url1: url1, url2: url2} do
+      assert is_map(result)
+      assert %NaiveDateTime{} = result[url1]
+      assert is_nil(result[url2])
+    end
+
+    test "returns an empty map for empty list or list containing no hosts / url" do
+      assert %{} == Instances.filter_reachable([])
+      assert %{} == Instances.filter_reachable([nil])
     end
   end