[#534] Federation publish requests status control (enforced 2xx response code check).
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
index 68b684c4b9abddb5e8d76df13a08e04f313efc97..10155ff5a3c2cc72a527bd2fa3fedbd7afd1c174 100644 (file)
@@ -3,7 +3,7 @@
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Web.ActivityPub.ActivityPub do
-  alias Pleroma.{Activity, Repo, Object, Upload, User, Notification}
+  alias Pleroma.{Activity, Repo, Object, Upload, User, Notification, Instances}
   alias Pleroma.Web.ActivityPub.{Transmogrifier, MRF}
   alias Pleroma.Web.WebFinger
   alias Pleroma.Web.Federator
@@ -92,7 +92,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
   def stream_out(activity) do
     public = "https://www.w3.org/ns/activitystreams#Public"
 
-    if activity.data["type"] in ["Create", "Announce"] do
+    if activity.data["type"] in ["Create", "Announce", "Delete"] do
       Pleroma.Web.Streamer.stream("user", activity)
       Pleroma.Web.Streamer.stream("list", activity)
 
@@ -103,16 +103,18 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
           Pleroma.Web.Streamer.stream("public:local", activity)
         end
 
-        activity.data["object"]
-        |> Map.get("tag", [])
-        |> Enum.filter(fn tag -> is_bitstring(tag) end)
-        |> Enum.map(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
+        if activity.data["type"] in ["Create"] do
+          activity.data["object"]
+          |> Map.get("tag", [])
+          |> Enum.filter(fn tag -> is_bitstring(tag) end)
+          |> Enum.map(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
 
-        if activity.data["object"]["attachment"] != [] do
-          Pleroma.Web.Streamer.stream("public:media", activity)
+          if activity.data["object"]["attachment"] != [] do
+            Pleroma.Web.Streamer.stream("public:media", activity)
 
-          if activity.local do
-            Pleroma.Web.Streamer.stream("public:local:media", activity)
+            if activity.local do
+              Pleroma.Web.Streamer.stream("public:local:media", activity)
+            end
           end
         end
       else
@@ -138,8 +140,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
              additional
            ),
          {:ok, activity} <- insert(create_data, local),
-         :ok <- maybe_federate(activity),
-         {:ok, _actor} <- User.increase_note_count(actor) do
+         # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
+         {:ok, _actor} <- User.increase_note_count(actor),
+         :ok <- maybe_federate(activity) do
       {:ok, activity}
     end
   end
@@ -286,8 +289,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
     with {:ok, _} <- Object.delete(object),
          {:ok, activity} <- insert(data, local),
-         :ok <- maybe_federate(activity),
-         {:ok, _actor} <- User.decrease_note_count(user) do
+         # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
+         {:ok, _actor} <- User.decrease_note_count(user),
+         :ok <- maybe_federate(activity) do
       {:ok, activity}
     end
   end
@@ -717,7 +721,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     end)
   end
 
-  def publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do
+  def publish_one(%{inbox: inbox} = activity) do
+    if Instances.reachable?(inbox) do
+      do_publish_one(activity)
+    else
+      {:error, :noop}
+    end
+  end
+
+  defp do_publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do
     Logger.info("Federating #{id} to #{inbox}")
     host = URI.parse(inbox).host
 
@@ -730,15 +742,24 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
         digest: digest
       })
 
-    @httpoison.post(
-      inbox,
-      json,
-      [
-        {"Content-Type", "application/activity+json"},
-        {"signature", signature},
-        {"digest", digest}
-      ]
-    )
+    with {:ok, %{status: code}} when code in 200..299 <-
+           result =
+             @httpoison.post(
+               inbox,
+               json,
+               [
+                 {"Content-Type", "application/activity+json"},
+                 {"signature", signature},
+                 {"digest", digest}
+               ]
+             ) do
+      Instances.set_reachable(inbox)
+      result
+    else
+      e ->
+        Instances.set_unreachable(inbox)
+        e
+    end
   end
 
   # TODO: