Merge branch 'develop' into feature/relay-list
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
index 1d34c4d7ef8e9f1049f4064458b70ce9246c4e10..94c467b69e72196df67da628f450b063c22b8d05 100644 (file)
@@ -7,6 +7,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
   alias Pleroma.Activity.Ir.Topics
   alias Pleroma.Config
   alias Pleroma.Conversation
+  alias Pleroma.Conversation.Participation
   alias Pleroma.Notification
   alias Pleroma.Object
   alias Pleroma.Object.Containment
@@ -131,7 +132,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
          {:ok, map} <- MRF.filter(map),
          {recipients, _, _} = get_recipients(map),
          {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
-         :ok <- Containment.contain_child(map),
+         {:containment, :ok} <- {:containment, Containment.contain_child(map)},
          {:ok, map, object} <- insert_full_object(map) do
       {:ok, activity} =
         Repo.insert(%Activity{
@@ -153,11 +154,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
       Notification.create_notifications(activity)
 
-      participations =
-        activity
-        |> Conversation.create_or_bump_for()
-        |> get_participations()
-
+      conversation = create_or_bump_conversation(activity, map["actor"])
+      participations = get_participations(conversation)
       stream_out(activity)
       stream_out_participations(participations)
       {:ok, activity}
@@ -182,7 +180,20 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     end
   end
 
-  defp get_participations({:ok, %{participations: participations}}), do: participations
+  defp create_or_bump_conversation(activity, actor) do
+    with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
+         %User{} = user <- User.get_cached_by_ap_id(actor),
+         Participation.mark_as_read(user, conversation) do
+      {:ok, conversation}
+    end
+  end
+
+  defp get_participations({:ok, conversation}) do
+    conversation
+    |> Repo.preload(:participations, force: true)
+    |> Map.get(:participations)
+  end
+
   defp get_participations(_), do: []
 
   def stream_out_participations(participations) do
@@ -225,6 +236,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     # only accept false as false value
     local = !(params[:local] == false)
     published = params[:published]
+    quick_insert? = Pleroma.Config.get([:env]) == :benchmark
 
     with create_data <-
            make_create_data(
@@ -235,12 +247,16 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
          {:fake, false, activity} <- {:fake, fake, activity},
          _ <- increase_replies_count_if_reply(create_data),
          _ <- increase_poll_votes_if_vote(create_data),
+         {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
          # Changing note count prior to enqueuing federation task in order to avoid
          # race conditions on updating user.info
          {:ok, _actor} <- increase_note_count_if_public(actor, activity),
          :ok <- maybe_federate(activity) do
       {:ok, activity}
     else
+      {:quick_insert, true, activity} ->
+        {:ok, activity}
+
       {:fake, true, activity} ->
         {:ok, activity}
 
@@ -1203,7 +1219,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
          data <- maybe_update_follow_information(data) do
       {:ok, data}
     else
-      e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
+      e ->
+        Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
+        {:error, e}
     end
   end