1 defmodule Pleroma.Web.ActivityPub.SideEffects do
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
9 alias Pleroma.Activity.Ir.Topics
11 alias Pleroma.Chat.MessageReference
12 alias Pleroma.FollowingRelationship
13 alias Pleroma.Notification
17 alias Pleroma.Web.ActivityPub.ActivityPub
18 alias Pleroma.Web.ActivityPub.Builder
19 alias Pleroma.Web.ActivityPub.Pipeline
20 alias Pleroma.Web.ActivityPub.Utils
21 alias Pleroma.Web.Push
22 alias Pleroma.Web.Streamer
23 alias Pleroma.Workers.BackgroundWorker
27 def handle(object, meta \\ [])
31 # - Sends a notification
37 "object" => follow_activity_id
42 with %Activity{actor: follower_id} = follow_activity <-
43 Activity.get_by_ap_id(follow_activity_id),
44 %User{} = followed <- User.get_cached_by_ap_id(actor),
45 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
46 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
47 {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do
48 Notification.update_notification_type(followed, follow_activity)
49 User.update_follower_count(followed)
50 User.update_following_count(follower)
57 # - Rejects all existing follow activities for this person
58 # - Updates the follow state
59 # - Dismisses notification
65 "object" => follow_activity_id
70 with %Activity{actor: follower_id} = follow_activity <-
71 Activity.get_by_ap_id(follow_activity_id),
72 %User{} = followed <- User.get_cached_by_ap_id(actor),
73 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
74 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
75 FollowingRelationship.update(follower, followed, :follow_reject)
76 Notification.dismiss(follow_activity)
83 # - Follows if possible
84 # - Sends a notification
85 # - Generates accept or reject if appropriate
91 "object" => followed_user,
92 "actor" => following_user
97 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
98 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
99 {_, {:ok, _}, _, _} <-
100 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
101 if followed.local && !followed.locked do
102 {:ok, accept_data, _} = Builder.accept(followed, object)
103 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
106 {:following, {:error, _}, _follower, followed} ->
107 {:ok, reject_data, _} = Builder.reject(followed, object)
108 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
114 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
118 |> add_notifications(notifications)
120 updated_object = Activity.get_by_ap_id(follow_id)
122 {:ok, updated_object, meta}
125 # Tasks this handles:
126 # - Unfollow and block
128 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
132 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
133 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
134 User.block(blocker, blocked)
140 # Tasks this handles:
143 # For a local user, we also get a changeset with the full information, so we
144 # can update non-federating, non-activitypub settings as well.
145 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
146 if changeset = Keyword.get(meta, :user_update_changeset) do
148 |> User.update_and_set_cache()
150 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
152 User.get_by_ap_id(updated_object["id"])
153 |> User.remote_user_changeset(new_user_data)
154 |> User.update_and_set_cache()
160 # Tasks this handles:
161 # - Add like to object
162 # - Set up notification
163 def handle(%{data: %{"type" => "Like"}} = object, meta) do
164 liked_object = Object.get_by_ap_id(object.data["object"])
165 Utils.add_like_to_object(object, liked_object)
167 Notification.create_notifications(object)
173 # - Actually create object
174 # - Rollback if we couldn't create it
175 # - Increase the user note count
176 # - Increase the reply count
177 # - Increase replies count
178 # - Set up ActivityExpiration
179 # - Set up notifications
180 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
181 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
182 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
183 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
184 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
186 if in_reply_to = object.data["inReplyTo"] do
187 Object.increase_replies_count(in_reply_to)
190 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
194 |> add_notifications(notifications)
196 {:ok, activity, meta}
198 e -> Repo.rollback(e)
202 # Tasks this handles:
203 # - Add announce to object
204 # - Set up notification
205 # - Stream out the announce
206 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
207 announced_object = Object.get_by_ap_id(object.data["object"])
208 user = User.get_cached_by_ap_id(object.data["actor"])
210 Utils.add_announce_to_object(object, announced_object)
212 if !User.is_internal_user?(user) do
213 Notification.create_notifications(object)
216 |> Topics.get_activity_topics()
217 |> Streamer.stream(object)
223 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
224 with undone_object <- Activity.get_by_ap_id(undone_object),
225 :ok <- handle_undoing(undone_object) do
230 # Tasks this handles:
231 # - Add reaction to object
232 # - Set up notification
233 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
234 reacted_object = Object.get_by_ap_id(object.data["object"])
235 Utils.add_emoji_reaction_to_object(object, reacted_object)
237 Notification.create_notifications(object)
242 # Tasks this handles:
243 # - Delete and unpins the create activity
244 # - Replace object with Tombstone
245 # - Set up notification
246 # - Reduce the user note count
247 # - Reduce the reply count
248 # - Stream out the activity
249 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
251 Object.normalize(deleted_object, false) ||
252 User.get_cached_by_ap_id(deleted_object)
255 case deleted_object do
257 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
258 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
259 %User{} = user <- User.get_cached_by_ap_id(actor) do
260 User.remove_pinnned_activity(user, activity)
262 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
264 if in_reply_to = deleted_object.data["inReplyTo"] do
265 Object.decrease_replies_count(in_reply_to)
268 MessageReference.delete_for_object(deleted_object)
270 ActivityPub.stream_out(object)
271 ActivityPub.stream_out_participations(deleted_object, user)
275 Logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
280 with {:ok, _} <- User.delete(deleted_object) do
286 Notification.create_notifications(object)
294 def handle(object, meta) do
298 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
299 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
300 actor = User.get_cached_by_ap_id(object.data["actor"])
301 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
304 [[actor, recipient], [recipient, actor]]
305 |> Enum.map(fn [user, other_user] ->
307 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
308 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
311 ["user", "user:pleroma_chat"],
312 {user, %{cm_ref | chat: chat, object: object}}
320 |> add_streamables(streamables)
326 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
327 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
328 Object.increase_vote_count(
329 object.data["inReplyTo"],
338 def handle_object_creation(%{"type" => objtype} = object, meta)
339 when objtype in ~w[Audio Video Question Event] do
340 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
346 def handle_object_creation(object, meta) do
350 defp undo_like(nil, object), do: delete_object(object)
352 defp undo_like(%Object{} = liked_object, object) do
353 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
354 delete_object(object)
358 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
359 object.data["object"]
360 |> Object.get_by_ap_id()
364 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
365 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
366 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
367 {:ok, _} <- Repo.delete(object) do
372 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
373 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
374 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
375 {:ok, _} <- Repo.delete(object) do
381 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
383 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
384 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
385 {:ok, _} <- User.unblock(blocker, blocked),
386 {:ok, _} <- Repo.delete(object) do
391 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
393 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
394 defp delete_object(object) do
395 with {:ok, _} <- Repo.delete(object), do: :ok
398 defp send_notifications(meta) do
399 Keyword.get(meta, :notifications, [])
400 |> Enum.each(fn notification ->
401 Streamer.stream(["user", "user:notification"], notification)
402 Push.send(notification)
408 defp send_streamables(meta) do
409 Keyword.get(meta, :streamables, [])
410 |> Enum.each(fn {topics, items} ->
411 Streamer.stream(topics, items)
417 defp add_streamables(meta, streamables) do
418 existing = Keyword.get(meta, :streamables, [])
421 |> Keyword.put(:streamables, streamables ++ existing)
424 defp add_notifications(meta, notifications) do
425 existing = Keyword.get(meta, :notifications, [])
428 |> Keyword.put(:notifications, notifications ++ existing)
431 def handle_after_transaction(meta) do
433 |> send_notifications()
434 |> send_streamables()