1 defmodule Pleroma.Web.ActivityPub.SideEffects do
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.ActivityExpiration
12 alias Pleroma.Chat.MessageReference
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.BackgroundWorker
# Bodiless function head for `handle/2`: it only declares that `meta` defaults
# to an empty list, so the default is stated once for all `handle` clauses.
28 def handle(object, meta \\ [])
# NOTE(review): the opening of this clause's head (the activity type and the
# binding of `actor`) is not visible in this excerpt. Judging by the `accept`
# state and `:follow_accept` used below, this is presumably the clause for an
# accepted follow - confirm against the full file.
32 # - Sends a notification
38 "object" => follow_activity_id
# The `with` resolves the referenced follow activity and both users, then marks
# the follow as accepted in the activities and in the following relationship.
# The lines after `do` only run when every one of these steps matches.
43 with %Activity{actor: follower_id} = follow_activity <-
44 Activity.get_by_ap_id(follow_activity_id),
45 %User{} = followed <- User.get_cached_by_ap_id(actor),
46 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
47 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
48 {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do
# Update the notification type for the followed user, then refresh the
# follower count of `followed` and the following count of `follower`.
49 Notification.update_notification_type(followed, follow_activity)
50 User.update_follower_count(followed)
51 User.update_following_count(follower)
# NOTE(review): the opening of this clause's head (the activity type and the
# binding of `actor`) is not visible in this excerpt. The `reject` state and
# `:follow_reject` below suggest this is the clause for a rejected follow -
# confirm against the full file.
58 # - Rejects all existing follow activities for this person
59 # - Updates the follow state
60 # - Dismisses notification
66 "object" => follow_activity_id
# Resolve the follow activity and both users, then mark the follow activities
# as rejected. The lines after `do` only run when all four steps match.
71 with %Activity{actor: follower_id} = follow_activity <-
72 Activity.get_by_ap_id(follow_activity_id),
73 %User{} = followed <- User.get_cached_by_ap_id(actor),
74 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
75 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
# The relationship update is called as a plain statement here: its result is
# not pattern-matched on this line, so a failure would not stop the dismissal.
76 FollowingRelationship.update(follower, followed, :follow_reject)
77 Notification.dismiss(follow_activity)
# NOTE(review): the start of this clause's head (including the activity type
# and the binding of `follow_id`, used near the end) is not visible in this
# excerpt - confirm against the full file.
84 # - Follows if possible
85 # - Sends a notification
86 # - Generates accept or reject if appropriate
92 "object" => followed_user,
93 "actor" => following_user
98 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
99 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
# The follow attempt is wrapped in a `:following`-tagged tuple that also
# carries both users, so a failed attempt can be matched together with
# `followed` by the `{:following, {:error, _}, ...}` pattern further down.
100 {_, {:ok, _}, _, _} <-
101 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
# A local account that is not locked gets an accept built for this follow and
# run through the common pipeline straight away.
102 if followed.local && !followed.locked do
103 {:ok, accept_data, _} = Builder.accept(followed, object)
104 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
# When `User.follow/3` returned an error tuple, a reject is built and run
# through the common pipeline instead.
107 {:following, {:error, _}, _follower, followed} ->
108 {:ok, reject_data, _} = Builder.reject(followed, object)
109 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
# Notifications are created with `do_send: false` and attached to `meta` via
# `add_notifications/2`. NOTE(review): presumably this defers delivery to
# `handle_after_transaction/1` - confirm what `do_send` controls.
115 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
119 |> add_notifications(notifications)
# The follow activity is fetched again by its AP id and that record, not the
# `object` argument, is what this clause returns.
121 updated_object = Activity.get_by_ap_id(follow_id)
123 {:ok, updated_object, meta}
126 # Tasks this handles:
127 # - Unfollow and block
# Matches `Block` activities, binding the AP ids found under `object` (the
# blocked user) and `actor` (the blocking user).
129 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
# Both ids are resolved to cached users; `User.block/2` is only called when
# both lookups return a `%User{}`.
133 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
134 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
135 User.block(blocker, blocked)
141 # Tasks this handles:
144 # For a local user, we also get a changeset with the full information, so we
145 # can update non-federating, non-activitypub settings as well.
146 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
# Two ways of applying the update are visible below. When `meta` carries a
# `:user_update_changeset`, the update goes through
# `User.update_and_set_cache/1`.
# NOTE(review): the value piped into that call and the `else` separating the
# two branches are not visible in this excerpt - confirm.
147 if changeset = Keyword.get(meta, :user_update_changeset) do
149 |> User.update_and_set_cache()
# Otherwise the user data is derived from the updated object itself. The
# `{:ok, _} =` match is assertive: any other result raises a `MatchError`.
151 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
# The user is looked up by the object's `id` and updated through a remote-user
# changeset built from the derived data.
153 User.get_by_ap_id(updated_object["id"])
154 |> User.remote_user_changeset(new_user_data)
155 |> User.update_and_set_cache()
161 # Tasks this handles:
162 # - Add like to object
163 # - Set up notification
164 def handle(%{data: %{"type" => "Like"}} = object, meta) do
# The liked object is looked up by the AP id stored under `object` in the
# activity data, and the like is recorded on it.
165 liked_object = Object.get_by_ap_id(object.data["object"])
166 Utils.add_like_to_object(object, liked_object)
# Notifications are created for the like activity itself.
# NOTE(review): the value this clause returns is not visible in this excerpt.
168 Notification.create_notifications(object)
174 # - Actually create object
175 # - Rollback if we couldn't create it
176 # - Increase the user note count
177 # - Increase the reply count
178 # - Increase replies count
179 # - Set up ActivityExpiration
180 # - Set up notifications
181 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
# The object to create is taken from `meta[:object_data]`; the body after `do`
# only runs when creation returns `{:ok, object, meta}` and the actor resolves
# to a cached user.
182 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
183 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
# Both matches below are assertive: a non-`:ok` result raises a `MatchError`.
# Notifications are created with `do_send: false` and stored on `meta` further
# down via `add_notifications/2`.
184 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
185 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
# Only objects that reply to something bump the parent's replies count.
187 if in_reply_to = object.data["inReplyTo"] do
188 Object.increase_replies_count(in_reply_to)
# Only activities carrying an `expires_at` value get an expiration record.
191 if expires_at = activity.data["expires_at"] do
192 ActivityExpiration.create(activity, expires_at)
# Enqueue a `fetch_data_for_activity` background job for this activity's id.
195 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
199 |> add_notifications(notifications)
201 {:ok, activity, meta}
# NOTE(review): presumably the `else` branch of the `with` above (the `else`
# line is not visible in this excerpt): any non-matching result is handed to
# `Repo.rollback/1`.
203 e -> Repo.rollback(e)
207 # Tasks this handles:
208 # - Add announce to object
209 # - Set up notification
210 # - Stream out the announce
211 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
# Resolve the announced object and the announcing user from the activity data.
212 announced_object = Object.get_by_ap_id(object.data["object"])
213 user = User.get_cached_by_ap_id(object.data["actor"])
215 Utils.add_announce_to_object(object, announced_object)
# Notifications are skipped when the announcing actor is an internal user.
# NOTE(review): where this `if` ends is not visible in this excerpt, so it is
# unclear from here whether the streaming below is also conditional - confirm.
217 if !User.is_internal_user?(user) do
218 Notification.create_notifications(object)
# The announce is streamed to the topics computed by
# `Topics.get_activity_topics/1`; the value piped into it is not visible here.
221 |> Topics.get_activity_topics()
222 |> Streamer.stream(object)
# Handles `Undo` activities by loading the activity being undone and delegating
# to `handle_undoing/1`.
228 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
# `undone_object <- ...` is a bare-variable pattern, so it always matches, even
# when the lookup returns `nil`. Only the `:ok <- handle_undoing(...)` step can
# make this `with` fall through.
229 with undone_object <- Activity.get_by_ap_id(undone_object),
230 :ok <- handle_undoing(undone_object) do
235 # Tasks this handles:
236 # - Add reaction to object
237 # - Set up notification
238 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
# The reacted-to object is looked up by the AP id stored under `object` in the
# activity data, and the reaction is recorded on it.
239 reacted_object = Object.get_by_ap_id(object.data["object"])
240 Utils.add_emoji_reaction_to_object(object, reacted_object)
# Notifications are created for the reaction activity itself.
# NOTE(review): the value this clause returns is not visible in this excerpt.
242 Notification.create_notifications(object)
247 # Tasks this handles:
248 # - Deletes and unpins the create activity
249 # - Replace object with Tombstone
250 # - Set up notification
251 # - Reduce the user note count
252 # - Reduce the reply count
253 # - Stream out the activity
254 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
# The delete target is resolved as an object first and, when that yields a
# falsy value, as a cached user.
256 Object.normalize(deleted_object, false) ||
257 User.get_cached_by_ap_id(deleted_object)
# NOTE(review): the `case` branch patterns are not visible in this excerpt.
# The calls below suggest one branch for objects and one for users - confirm.
260 case deleted_object do
# Delete the object, then require a binary actor id and a cached user for it.
# The actor check is tagged with `:actor`, presumably so the missing-actor case
# can be told apart and logged (see the `Logger.error` call below).
262 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
263 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
264 %User{} = user <- User.get_cached_by_ap_id(actor) do
265 User.remove_pinnned_activity(user, activity)
# `user` is rebound to the updated record, which is the one passed to
# `stream_out_participations/2` below. The match is assertive.
267 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
# Only objects that were replies reduce the parent's replies count.
269 if in_reply_to = deleted_object.data["inReplyTo"] do
270 Object.decrease_replies_count(in_reply_to)
273 MessageReference.delete_for_object(deleted_object)
# Stream out the delete activity and the participations for the deleted object.
275 ActivityPub.stream_out(object)
276 ActivityPub.stream_out_participations(deleted_object, user)
280 Logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
# Here the delete target is passed to `User.delete/1`.
285 with {:ok, _} <- User.delete(deleted_object) do
# NOTE(review): the condition under which these notifications are created is
# not visible in this excerpt.
291 Notification.create_notifications(object)
# Catch-all clause: both arguments are bare variables, so it matches any
# activity the more specific `handle/2` clauses before it did not.
# NOTE(review): its body is not visible in this excerpt.
299 def handle(object, meta) do
# Creates a `ChatMessage` object through the common pipeline and prepares the
# per-user chat bookkeeping and streamables for both participants.
303 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
304 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
305 actor = User.get_cached_by_ap_id(object.data["actor"])
# The recipient is the first entry of `to`; `hd/1` raises on an empty list.
306 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
# Run once from each participant's point of view: [owner, other party].
309 [[actor, recipient], [recipient, actor]]
310 |> Enum.map(fn [user, other_user] ->
# Both matches are assertive; a non-`:ok` result raises a `MatchError`.
312 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
# The third argument is `true` only for the participant who is not the actor.
# NOTE(review): presumably an unread flag - confirm.
313 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
# Each streamable pairs the topics with the user and the message reference,
# with `chat` and `object` filled in on the reference.
316 ["user", "user:pleroma_chat"],
317 {user, %{cm_ref | chat: chat, object: object}}
325 |> add_streamables(streamables)
# Creates an `Answer` object through the common pipeline and then bumps the
# vote count on the object it replies to (its `inReplyTo`).
# NOTE(review): the remaining arguments to `Object.increase_vote_count` and the
# return value are not visible in this excerpt.
331 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
332 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
333 Object.increase_vote_count(
334 object.data["inReplyTo"],
# Clause for `Audio`, `Question` and `Event` objects (selected by the guard):
# the object is run through the common pipeline.
# NOTE(review): what happens after a successful pipeline run is not visible in
# this excerpt.
343 def handle_object_creation(%{"type" => objtype} = object, meta)
344 when objtype in ~w[Audio Question Event] do
345 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
# Catch-all clause: both arguments are bare variables, so it matches any object
# not handled by the more specific `handle_object_creation/2` clauses before it.
# NOTE(review): its body is not visible in this excerpt.
351 def handle_object_creation(object, meta) do
# Undoes a like. The first argument is the liked object (or `nil`), the second
# is the like activity being undone.
#
# With no liked object there is nothing to update, so the like is just deleted.
355 defp undo_like(nil, object), do: delete_object(object)
# Otherwise the like is removed from the liked object first, and the like is
# only deleted when that removal returned `{:ok, _}`. Any other result falls
# through the `with` and is returned unchanged.
357 defp undo_like(%Object{} = liked_object, object) do
358 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
359 delete_object(object)
# Undoes a `Like`: the liked object is looked up by the AP id stored under
# `object` in the like's data.
# NOTE(review): the rest of the pipeline is not visible in this excerpt;
# presumably the result is handed to `undo_like/2` - confirm.
363 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
364 object.data["object"]
365 |> Object.get_by_ap_id()
# Undoes an `EmojiReact`: the reaction is removed from the reacted-to object
# and the reaction activity is deleted. Each step must match for the next to
# run; a non-matching result falls through the `with`.
# NOTE(review): the value returned on success is not visible in this excerpt.
369 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
370 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
371 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
372 {:ok, _} <- Repo.delete(object) do
# Undoes an `Announce`: the announce is removed from the announced object and
# the announce activity is deleted. Each step must match for the next to run.
# Note that the local name `liked_object` holds the announced object here.
# NOTE(review): the value returned on success is not visible in this excerpt.
377 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
378 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
379 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
380 {:ok, _} <- Repo.delete(object) do
# Head pattern for undoing a `Block`: binds the blocker and blocked AP ids.
# NOTE(review): the `def` line of this clause and the value returned on success
# are not visible in this excerpt.
386 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
# `blocker` and `blocked` are rebound from AP ids to the resolved users. The
# block is lifted first and the block activity is deleted only after that
# succeeded.
388 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
389 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
390 {:ok, _} <- User.unblock(blocker, blocked),
391 {:ok, _} <- Repo.delete(object) do
# Fallback for anything the more specific `handle_undoing/1` clauses did not
# match: returns an error tuple carrying a message and the offending value.
396 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
# Deletes the given record through `Repo.delete/1`.
#
# Returns `:ok` when the delete returned `{:ok, _}`. Any other result does not
# match the `with` clause and is returned unchanged.
398 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
399 defp delete_object(object) do
400 with {:ok, _} <- Repo.delete(object), do: :ok
# Delivers every notification stored under `:notifications` in `meta` (an empty
# list when the key is absent): each one is streamed to the `user` and
# `user:notification` topics and passed to `Push.send/1`.
# NOTE(review): the return value is not visible in this excerpt.
403 defp send_notifications(meta) do
404 Keyword.get(meta, :notifications, [])
405 |> Enum.each(fn notification ->
406 Streamer.stream(["user", "user:notification"], notification)
407 Push.send(notification)
# Streams every `{topics, items}` pair stored under `:streamables` in `meta`
# (an empty list when the key is absent) via `Streamer.stream/2`.
# NOTE(review): the return value is not visible in this excerpt.
413 defp send_streamables(meta) do
414 Keyword.get(meta, :streamables, [])
415 |> Enum.each(fn {topics, items} ->
416 Streamer.stream(topics, items)
# Stores `streamables` under the `:streamables` key, keeping anything already
# there: the new entries are placed in front of the existing ones.
# NOTE(review): the value piped into `Keyword.put/3` is not visible in this
# excerpt; presumably it is `meta` - confirm.
422 defp add_streamables(meta, streamables) do
423 existing = Keyword.get(meta, :streamables, [])
426 |> Keyword.put(:streamables, streamables ++ existing)
# Stores `notifications` under the `:notifications` key, keeping anything
# already there: the new entries are placed in front of the existing ones.
# NOTE(review): the value piped into `Keyword.put/3` is not visible in this
# excerpt; presumably it is `meta` - confirm.
429 defp add_notifications(meta, notifications) do
430 existing = Keyword.get(meta, :notifications, [])
433 |> Keyword.put(:notifications, notifications ++ existing)
# Runs the deferred deliveries: first the notifications, then the streamables.
# NOTE(review): the value that starts the pipeline is not visible in this
# excerpt (presumably `meta`), and the name suggests this is meant to be called
# once the surrounding transaction has finished - confirm both.
436 def handle_after_transaction(meta) do
438 |> send_notifications()
439 |> send_streamables()