add a job queue
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
index e5f2658174628d9c7b45e452284433eb1a692227..5be3598872b61e8788348930acee2c96406fe12c 100644 (file)
@@ -36,6 +36,14 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     {recipients, to, cc}
   end
 
+  defp get_recipients(%{"type" => "Create"} = data) do
+    to = data["to"] || []
+    cc = data["cc"] || []
+    actor = data["actor"] || []
+    recipients = (to ++ cc ++ [actor]) |> Enum.uniq()
+    {recipients, to, cc}
+  end
+
   defp get_recipients(data) do
     to = data["to"] || []
     cc = data["cc"] || []
@@ -140,8 +148,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
              additional
            ),
          {:ok, activity} <- insert(create_data, local),
-         :ok <- maybe_federate(activity),
-         {:ok, _actor} <- User.increase_note_count(actor) do
+         # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
+         {:ok, _actor} <- User.increase_note_count(actor),
+         :ok <- maybe_federate(activity) do
       {:ok, activity}
     end
   end
@@ -288,8 +297,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
     with {:ok, _} <- Object.delete(object),
          {:ok, activity} <- insert(data, local),
-         :ok <- maybe_federate(activity),
-         {:ok, _actor} <- User.decrease_note_count(user) do
+         # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
+         {:ok, _actor} <- User.decrease_note_count(user),
+         :ok <- maybe_federate(activity) do
       {:ok, activity}
     end
   end
@@ -408,6 +418,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     |> Enum.reverse()
   end
 
+  defp restrict_since(query, %{"since_id" => ""}), do: query
+
   defp restrict_since(query, %{"since_id" => since_id}) do
     from(activity in query, where: activity.id > ^since_id)
   end
@@ -463,6 +475,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
   defp restrict_local(query, _), do: query
 
+  defp restrict_max(query, %{"max_id" => ""}), do: query
+
   defp restrict_max(query, %{"max_id" => max_id}) do
     from(activity in query, where: activity.id < ^max_id)
   end
@@ -697,20 +711,18 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
     public = is_public?(activity)
 
-    remote_inboxes =
-      (Pleroma.Web.Salmon.remote_users(activity) ++ followers)
-      |> Enum.filter(fn user -> User.ap_enabled?(user) end)
-      |> Enum.map(fn %{info: %{source_data: data}} ->
-        (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
-      end)
-      |> Enum.uniq()
-      |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
-
     {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
     json = Jason.encode!(data)
 
-    Enum.each(remote_inboxes, fn inbox ->
-      Federator.enqueue(:publish_single_ap, %{
+    (Pleroma.Web.Salmon.remote_users(activity) ++ followers)
+    |> Enum.filter(fn user -> User.ap_enabled?(user) end)
+    |> Enum.map(fn %{info: %{source_data: data}} ->
+      (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
+    end)
+    |> Enum.uniq()
+    |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
+    |> Enum.each(fn inbox ->
+      Federator.publish_single_ap(%{
         inbox: inbox,
         json: json,
         actor: actor,