1 defmodule Pleroma.Web.ActivityPub.SideEffects do
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
9 alias Pleroma.Activity.Ir.Topics
11 alias Pleroma.Chat.MessageReference
12 alias Pleroma.FollowingRelationship
13 alias Pleroma.Notification
17 alias Pleroma.Web.ActivityPub.ActivityPub
18 alias Pleroma.Web.ActivityPub.Builder
19 alias Pleroma.Web.ActivityPub.Pipeline
20 alias Pleroma.Web.ActivityPub.Utils
21 alias Pleroma.Web.Push
22 alias Pleroma.Web.Streamer
23 alias Pleroma.Workers.BackgroundWorker
27 def handle(object, meta \\ [])
31 # - Sends a notification
37 "object" => follow_activity_id
42 with %Activity{actor: follower_id} = follow_activity <-
43 Activity.get_by_ap_id(follow_activity_id),
44 %User{} = followed <- User.get_cached_by_ap_id(actor),
45 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
46 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
47 {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do
48 Notification.update_notification_type(followed, follow_activity)
49 User.update_follower_count(followed)
50 User.update_following_count(follower)
57 # - Rejects all existing follow activities for this person
58 # - Updates the follow state
59 # - Dismisses notification
65 "object" => follow_activity_id
70 with %Activity{actor: follower_id} = follow_activity <-
71 Activity.get_by_ap_id(follow_activity_id),
72 %User{} = followed <- User.get_cached_by_ap_id(actor),
73 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
74 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
75 FollowingRelationship.update(follower, followed, :follow_reject)
76 Notification.dismiss(follow_activity)
83 # - Follows if possible
84 # - Sends a notification
85 # - Generates accept or reject if appropriate
91 "object" => followed_user,
92 "actor" => following_user
97 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
98 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
99 {_, {:ok, _}, _, _} <-
100 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
101 if followed.local && !followed.locked do
102 {:ok, accept_data, _} = Builder.accept(followed, object)
103 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
106 {:following, {:error, _}, _follower, followed} ->
107 {:ok, reject_data, _} = Builder.reject(followed, object)
108 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
114 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
118 |> add_notifications(notifications)
120 updated_object = Activity.get_by_ap_id(follow_id)
122 {:ok, updated_object, meta}
125 # Tasks this handles:
126 # - Unfollow and block
128 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
132 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
133 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
134 User.block(blocker, blocked)
140 # Tasks this handles:
143 # For a local user, we also get a changeset with the full information, so we
144 # can update non-federating, non-activitypub settings as well.
145 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
146 if changeset = Keyword.get(meta, :user_update_changeset) do
148 |> User.update_and_set_cache()
150 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
152 User.get_by_ap_id(updated_object["id"])
153 |> User.remote_user_changeset(new_user_data)
154 |> User.update_and_set_cache()
160 # Tasks this handles:
161 # - Add like to object
162 # - Set up notification
163 def handle(%{data: %{"type" => "Like"}} = object, meta) do
164 liked_object = Object.get_by_ap_id(object.data["object"])
165 Utils.add_like_to_object(object, liked_object)
167 Notification.create_notifications(object)
173 # - Actually create object
174 # - Rollback if we couldn't create it
175 # - Increase the user note count
176 # - Increase the reply count
177 # - Increase replies count
178 # - Set up ActivityExpiration
179 # - Set up notifications
180 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
181 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
182 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
183 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
184 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
186 if in_reply_to = object.data["inReplyTo"] do
187 Object.increase_replies_count(in_reply_to)
190 if expires_at = activity.data["expires_at"] do
191 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
192 activity_id: activity.id,
193 expires_at: expires_at
197 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
201 |> add_notifications(notifications)
203 {:ok, activity, meta}
205 e -> Repo.rollback(e)
209 # Tasks this handles:
210 # - Add announce to object
211 # - Set up notification
212 # - Stream out the announce
213 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
214 announced_object = Object.get_by_ap_id(object.data["object"])
215 user = User.get_cached_by_ap_id(object.data["actor"])
217 Utils.add_announce_to_object(object, announced_object)
219 if !User.is_internal_user?(user) do
220 Notification.create_notifications(object)
223 |> Topics.get_activity_topics()
224 |> Streamer.stream(object)
230 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
231 with undone_object <- Activity.get_by_ap_id(undone_object),
232 :ok <- handle_undoing(undone_object) do
237 # Tasks this handles:
238 # - Add reaction to object
239 # - Set up notification
240 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
241 reacted_object = Object.get_by_ap_id(object.data["object"])
242 Utils.add_emoji_reaction_to_object(object, reacted_object)
244 Notification.create_notifications(object)
249 # Tasks this handles:
250 # - Delete and unpins the create activity
251 # - Replace object with Tombstone
252 # - Set up notification
253 # - Reduce the user note count
254 # - Reduce the reply count
255 # - Stream out the activity
256 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
258 Object.normalize(deleted_object, false) ||
259 User.get_cached_by_ap_id(deleted_object)
262 case deleted_object do
264 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
265 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
266 %User{} = user <- User.get_cached_by_ap_id(actor) do
267 User.remove_pinnned_activity(user, activity)
269 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
271 if in_reply_to = deleted_object.data["inReplyTo"] do
272 Object.decrease_replies_count(in_reply_to)
275 MessageReference.delete_for_object(deleted_object)
277 ActivityPub.stream_out(object)
278 ActivityPub.stream_out_participations(deleted_object, user)
282 Logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
287 with {:ok, _} <- User.delete(deleted_object) do
293 Notification.create_notifications(object)
301 def handle(object, meta) do
305 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
306 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
307 actor = User.get_cached_by_ap_id(object.data["actor"])
308 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
311 [[actor, recipient], [recipient, actor]]
312 |> Enum.map(fn [user, other_user] ->
314 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
315 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
318 ["user", "user:pleroma_chat"],
319 {user, %{cm_ref | chat: chat, object: object}}
327 |> add_streamables(streamables)
333 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
334 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
335 Object.increase_vote_count(
336 object.data["inReplyTo"],
345 def handle_object_creation(%{"type" => objtype} = object, meta)
346 when objtype in ~w[Audio Question Event] do
347 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
353 def handle_object_creation(object, meta) do
357 defp undo_like(nil, object), do: delete_object(object)
359 defp undo_like(%Object{} = liked_object, object) do
360 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
361 delete_object(object)
365 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
366 object.data["object"]
367 |> Object.get_by_ap_id()
371 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
372 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
373 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
374 {:ok, _} <- Repo.delete(object) do
379 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
380 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
381 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
382 {:ok, _} <- Repo.delete(object) do
388 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
390 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
391 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
392 {:ok, _} <- User.unblock(blocker, blocked),
393 {:ok, _} <- Repo.delete(object) do
398 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
400 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
401 defp delete_object(object) do
402 with {:ok, _} <- Repo.delete(object), do: :ok
405 defp send_notifications(meta) do
406 Keyword.get(meta, :notifications, [])
407 |> Enum.each(fn notification ->
408 Streamer.stream(["user", "user:notification"], notification)
409 Push.send(notification)
415 defp send_streamables(meta) do
416 Keyword.get(meta, :streamables, [])
417 |> Enum.each(fn {topics, items} ->
418 Streamer.stream(topics, items)
424 defp add_streamables(meta, streamables) do
425 existing = Keyword.get(meta, :streamables, [])
428 |> Keyword.put(:streamables, streamables ++ existing)
431 defp add_notifications(meta, notifications) do
432 existing = Keyword.get(meta, :notifications, [])
435 |> Keyword.put(:notifications, notifications ++ existing)
438 def handle_after_transaction(meta) do
440 |> send_notifications()
441 |> send_streamables()