Merge remote-tracking branch 'remotes/origin/develop' into 2168-media-preview-proxy
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
index a34bf6a054143dbfba10152f386607192a8ee6b4..61feeae4d68fb4799634b58b03a0ceaad9c3800d 100644 (file)
@@ -6,7 +6,9 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   collection, and so on.
   """
   alias Pleroma.Activity
+  alias Pleroma.Activity.Ir.Topics
   alias Pleroma.Chat
+  alias Pleroma.Chat.MessageReference
   alias Pleroma.Notification
   alias Pleroma.Object
   alias Pleroma.Repo
@@ -14,10 +16,46 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.ActivityPub.Pipeline
   alias Pleroma.Web.ActivityPub.Utils
+  alias Pleroma.Web.Push
   alias Pleroma.Web.Streamer
 
   def handle(object, meta \\ [])
 
+  # Tasks this handles:
+  # - Unfollow and block
+  def handle(
+        %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
+          object,
+        meta
+      ) do
+    with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
+         %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
+      User.block(blocker, blocked)
+    end
+
+    {:ok, object, meta}
+  end
+
+  # Tasks this handles:
+  # - Update the user
+  #
+  # For a local user, we also get a changeset with the full information, so we
+  # can update non-federating, non-activitypub settings as well.
+  def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
+    if changeset = Keyword.get(meta, :user_update_changeset) do
+      changeset
+      |> User.update_and_set_cache()
+    else
+      {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
+
+      User.get_by_ap_id(updated_object["id"])
+      |> User.remote_user_changeset(new_user_data)
+      |> User.update_and_set_cache()
+    end
+
+    {:ok, object, meta}
+  end
+
   # Tasks this handles:
   # - Add like to object
   # - Set up notification
@@ -35,8 +73,13 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   # - Rollback if we couldn't create it
   # - Set up notifications
   def handle(%{data: %{"type" => "Create"}} = activity, meta) do
-    with {:ok, _object, _meta} <- handle_object_creation(meta[:object_data], meta) do
-      Notification.create_notifications(activity)
+    with {:ok, _object, meta} <- handle_object_creation(meta[:object_data], meta) do
+      {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
+
+      meta =
+        meta
+        |> add_notifications(notifications)
+
       {:ok, activity, meta}
     else
       e -> Repo.rollback(e)
@@ -55,7 +98,10 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
 
     if !User.is_internal_user?(user) do
       Notification.create_notifications(object)
-      ActivityPub.stream_out(object)
+
+      object
+      |> Topics.get_activity_topics()
+      |> Streamer.stream(object)
     end
 
     {:ok, object, meta}
@@ -104,6 +150,8 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
               Object.decrease_replies_count(in_reply_to)
             end
 
+            MessageReference.delete_for_object(deleted_object)
+
             ActivityPub.stream_out(object)
             ActivityPub.stream_out_participations(deleted_object, user)
             :ok
@@ -133,18 +181,25 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
       actor = User.get_cached_by_ap_id(object.data["actor"])
       recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
 
-      [[actor, recipient], [recipient, actor]]
-      |> Enum.each(fn [user, other_user] ->
-        if user.local do
-          if user.ap_id == actor.ap_id do
-            Chat.get_or_create(user.id, other_user.ap_id)
-          else
-            Chat.bump_or_create(user.id, other_user.ap_id)
+      streamables =
+        [[actor, recipient], [recipient, actor]]
+        |> Enum.map(fn [user, other_user] ->
+          if user.local do
+            {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
+            {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
+
+            {
+              ["user", "user:pleroma_chat"],
+              {user, %{cm_ref | chat: chat, object: object}}
+            }
           end
-        end
-      end)
+        end)
+        |> Enum.filter(& &1)
+
+      meta =
+        meta
+        |> add_streamables(streamables)
 
-      Streamer.stream(["user", "user:pleroma_chat"], object)
       {:ok, object, meta}
     end
   end
@@ -190,4 +245,43 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   end
 
   def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
+
+  defp send_notifications(meta) do
+    Keyword.get(meta, :notifications, [])
+    |> Enum.each(fn notification ->
+      Streamer.stream(["user", "user:notification"], notification)
+      Push.send(notification)
+    end)
+
+    meta
+  end
+
+  defp send_streamables(meta) do
+    Keyword.get(meta, :streamables, [])
+    |> Enum.each(fn {topics, items} ->
+      Streamer.stream(topics, items)
+    end)
+
+    meta
+  end
+
+  defp add_streamables(meta, streamables) do
+    existing = Keyword.get(meta, :streamables, [])
+
+    meta
+    |> Keyword.put(:streamables, streamables ++ existing)
+  end
+
+  defp add_notifications(meta, notifications) do
+    existing = Keyword.get(meta, :notifications, [])
+
+    meta
+    |> Keyword.put(:notifications, notifications ++ existing)
+  end
+
+  def handle_after_transaction(meta) do
+    meta
+    |> send_notifications()
+    |> send_streamables()
+  end
 end