[#534] Optimized bulk publish ops to filter on reachability early. `Instance` refactoring.
author Ivan Tashkinov <ivantashkinov@gmail.com>
Thu, 24 Jan 2019 16:15:23 +0000 (19:15 +0300)
committer Ivan Tashkinov <ivantashkinov@gmail.com>
Thu, 24 Jan 2019 16:15:23 +0000 (19:15 +0300)
lib/pleroma/instances.ex
lib/pleroma/instances/instance.ex
lib/pleroma/web/activity_pub/activity_pub.ex
lib/pleroma/web/salmon/salmon.ex
lib/pleroma/web/websub/websub.ex

index 25b739520ae5d6d5a2c782409cb62e56a7028049..6d445d6b3a075eb600d5a3f25ccba4fee184628a 100644 (file)
@@ -3,10 +3,19 @@ defmodule Pleroma.Instances do
 
   @adapter Pleroma.Instances.Instance
 
+  defdelegate filter_reachable(urls), to: @adapter
   defdelegate reachable?(url), to: @adapter
   defdelegate set_reachable(url), to: @adapter
   defdelegate set_unreachable(url, unreachable_since \\ nil), to: @adapter
 
   def reachability_time_threshold,
     do: NaiveDateTime.add(NaiveDateTime.utc_now(), -30 * 24 * 3600, :second)
+
+  def host(url_or_host) when is_binary(url_or_host) do
+    if url_or_host =~ ~r/^http/i do
+      URI.parse(url_or_host).host
+    else
+      url_or_host
+    end
+  end
 end
index fe52331a359c97eb620dc3d8cc106208c5314462..a17c8dab10ffa139500bbcb36d75564b3ffcfb5d 100644 (file)
@@ -18,12 +18,35 @@ defmodule Pleroma.Instances.Instance do
     timestamps()
   end
 
-  def update_changeset(struct, params \\ %{}) do
+  defdelegate host(url), to: Instances
+
+  def changeset(struct, params \\ %{}) do
     struct
     |> cast(params, [:host, :unreachable_since, :reachability_checked_at])
+    |> validate_required([:host])
     |> unique_constraint(:host)
   end
 
+  def filter_reachable([]), do: []
+
+  def filter_reachable(urls) when is_list(urls) do
+    hosts =
+      urls
+      |> Enum.map(&(&1 && host(&1)))
+      |> Enum.filter(&(to_string(&1) != ""))
+
+    unreachable_hosts =
+      Repo.all(
+        from(i in Instance,
+          where:
+            i.host in ^hosts and i.unreachable_since <= ^Instances.reachability_time_threshold(),
+          select: i.host
+        )
+      )
+
+    Enum.filter(urls, &(&1 && host(&1) not in unreachable_hosts))
+  end
+
   def reachable?(url) when is_binary(url) do
     !Repo.one(
       from(i in Instance,
@@ -37,13 +60,13 @@ defmodule Pleroma.Instances.Instance do
   def reachable?(_), do: true
 
   def set_reachable(url) when is_binary(url) do
-    Repo.update_all(
-      from(i in Instance, where: i.host == ^host(url)),
-      set: [
-        unreachable_since: nil,
-        reachability_checked_at: DateTime.utc_now()
-      ]
-    )
+    with host <- host(url),
+         %Instance{} = existing_record <- Repo.get_by(Instance, %{host: host}) do
+      {:ok, _instance} =
+        existing_record
+        |> changeset(%{unreachable_since: nil, reachability_checked_at: DateTime.utc_now()})
+        |> Repo.update()
+    end
   end
 
   def set_reachable(_), do: {0, :noop}
@@ -67,19 +90,17 @@ defmodule Pleroma.Instances.Instance do
            do: Map.delete(changes, :unreachable_since),
            else: changes
 
-      {:ok, _instance} = Repo.update(update_changeset(existing_record, update_changes))
+      {:ok, _instance} =
+        existing_record
+        |> changeset(update_changes)
+        |> Repo.update()
     else
-      {:ok, _instance} = Repo.insert(update_changeset(%Instance{}, Map.put(changes, :host, host)))
+      {:ok, _instance} =
+        %Instance{}
+        |> changeset(Map.put(changes, :host, host))
+        |> Repo.insert()
     end
   end
 
   def set_unreachable(_, _), do: {0, :noop}
-
-  defp host(url_or_host) do
-    if url_or_host =~ ~r/^http/i do
-      URI.parse(url_or_host).host
-    else
-      url_or_host
-    end
-  end
 end
index 44c295d658445c4d9777bdd883cd679e3113e166..4b34334a0908d339239f174b201245ec24fdb5d2 100644 (file)
@@ -689,7 +689,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
   end
 
   def publish(actor, activity) do
-    followers =
+    remote_followers =
       if actor.follower_address in activity.recipients do
         {:ok, followers} = User.get_followers(actor)
         followers |> Enum.filter(&(!&1.local))
@@ -700,13 +700,14 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     public = is_public?(activity)
 
     remote_inboxes =
-      (Pleroma.Web.Salmon.remote_users(activity) ++ followers)
+      (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
       |> Enum.filter(fn user -> User.ap_enabled?(user) end)
       |> Enum.map(fn %{info: %{source_data: data}} ->
         (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
       end)
       |> Enum.uniq()
       |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
+      |> Instances.filter_reachable()
 
     {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
     json = Jason.encode!(data)
index e4d2d95171432413b71530c33de479e38ec460a4..848131d52769be2636349698d042411fde1f6105 100644 (file)
@@ -221,7 +221,13 @@ defmodule Pleroma.Web.Salmon do
       {:ok, private, _} = keys_from_pem(keys)
       {:ok, feed} = encode(private, feed)
 
-      remote_users(activity)
+      remote_users = remote_users(activity)
+
+      salmon_urls = Enum.map(remote_users, & &1.info.salmon)
+      reachable_salmon_urls = Instances.filter_reachable(salmon_urls)
+
+      remote_users
+      |> Enum.filter(&(&1.info.salmon in reachable_salmon_urls))
       |> Enum.each(fn remote_user ->
         Task.start(fn ->
           Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
index ac8903913c1f9ee4f81bc41154186f7992adf0c6..bb44425916ec5469acb7ec8c088a1c99dc8e1d44 100644 (file)
@@ -54,7 +54,12 @@ defmodule Pleroma.Web.Websub do
   ]
   def publish(topic, user, %{data: %{"type" => type}} = activity)
       when type in @supported_activities do
-    # TODO: Only send to still valid subscriptions.
+    response =
+      user
+      |> FeedRepresenter.to_simple_form([activity], [user])
+      |> :xmerl.export_simple(:xmerl_xml)
+      |> to_string
+
     query =
       from(
         sub in WebsubServerSubscription,
@@ -64,13 +69,12 @@ defmodule Pleroma.Web.Websub do
 
     subscriptions = Repo.all(query)
 
-    Enum.each(subscriptions, fn sub ->
-      response =
-        user
-        |> FeedRepresenter.to_simple_form([activity], [user])
-        |> :xmerl.export_simple(:xmerl_xml)
-        |> to_string
+    callbacks = Enum.map(subscriptions, & &1.callback)
+    reachable_callbacks = Instances.filter_reachable(callbacks)
 
+    subscriptions
+    |> Enum.filter(&(&1.callback in reachable_callbacks))
+    |> Enum.each(fn sub ->
       data = %{
         xml: response,
         topic: topic,