1 defmodule Pleroma.Web.ActivityPub.SideEffects do
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.ActivityExpiration
12 alias Pleroma.Chat.MessageReference
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.BackgroundWorker
28 def handle(object, meta \\ [])
32 # - Sends a notification
38 "object" => follow_activity_id
43 with %Activity{actor: follower_id} = follow_activity <-
44 Activity.get_by_ap_id(follow_activity_id),
45 %User{} = followed <- User.get_cached_by_ap_id(actor),
46 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
47 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
48 {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do
49 Notification.update_notification_type(followed, follow_activity)
50 User.update_follower_count(followed)
51 User.update_following_count(follower)
58 # - Rejects all existing follow activities for this person
59 # - Updates the follow state
60 # - Dismisses notification
66 "object" => follow_activity_id
71 with %Activity{actor: follower_id} = follow_activity <-
72 Activity.get_by_ap_id(follow_activity_id),
73 %User{} = followed <- User.get_cached_by_ap_id(actor),
74 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
75 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
76 FollowingRelationship.update(follower, followed, :follow_reject)
77 Notification.dismiss(follow_activity)
84 # - Follows if possible
85 # - Sends a notification
86 # - Generates accept or reject if appropriate
92 "object" => followed_user,
93 "actor" => following_user
98 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
99 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
100 {_, {:ok, _}, _, _} <-
101 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
102 if followed.local && !followed.locked do
103 {:ok, accept_data, _} = Builder.accept(followed, object)
104 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
107 {:following, {:error, _}, _follower, followed} ->
108 {:ok, reject_data, _} = Builder.reject(followed, object)
109 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
115 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
119 |> add_notifications(notifications)
121 updated_object = Activity.get_by_ap_id(follow_id)
123 {:ok, updated_object, meta}
126 # Tasks this handles:
127 # - Unfollow and block
129 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
133 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
134 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
135 User.block(blocker, blocked)
141 # Tasks this handles:
144 # For a local user, we also get a changeset with the full information, so we
145 # can update non-federating, non-activitypub settings as well.
146 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
147 if changeset = Keyword.get(meta, :user_update_changeset) do
149 |> User.update_and_set_cache()
151 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
153 User.get_by_ap_id(updated_object["id"])
154 |> User.remote_user_changeset(new_user_data)
155 |> User.update_and_set_cache()
161 # Tasks this handles:
162 # - Add like to object
163 # - Set up notification
164 def handle(%{data: %{"type" => "Like"}} = object, meta) do
165 liked_object = Object.get_by_ap_id(object.data["object"])
166 Utils.add_like_to_object(object, liked_object)
168 Notification.create_notifications(object)
174 # - Actually create object
175 # - Rollback if we couldn't create it
176 # - Increase the user note count
177 # - Increase the reply count
178 # - Increase replies count
179 # - Set up ActivityExpiration
180 # - Set up notifications
181 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
182 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
183 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
184 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
185 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
187 if in_reply_to = object.data["inReplyTo"] do
188 Object.increase_replies_count(in_reply_to)
191 if expires_at = activity.data["expires_at"] do
192 ActivityExpiration.create(activity, expires_at)
195 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
199 |> add_notifications(notifications)
201 {:ok, activity, meta}
203 e -> Repo.rollback(e)
207 # Tasks this handles:
208 # - Add announce to object
209 # - Set up notification
210 # - Stream out the announce
211 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
212 announced_object = Object.get_by_ap_id(object.data["object"])
213 user = User.get_cached_by_ap_id(object.data["actor"])
215 Utils.add_announce_to_object(object, announced_object)
217 if !User.is_internal_user?(user) do
218 Notification.create_notifications(object)
221 |> Topics.get_activity_topics()
222 |> Streamer.stream(object)
228 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
229 with undone_object <- Activity.get_by_ap_id(undone_object),
230 :ok <- handle_undoing(undone_object) do
235 # Tasks this handles:
236 # - Add reaction to object
237 # - Set up notification
238 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
239 reacted_object = Object.get_by_ap_id(object.data["object"])
240 Utils.add_emoji_reaction_to_object(object, reacted_object)
242 Notification.create_notifications(object)
247 # Tasks this handles:
248 # - Delete and unpins the create activity
249 # - Replace object with Tombstone
250 # - Set up notification
251 # - Reduce the user note count
252 # - Reduce the reply count
253 # - Stream out the activity
254 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
256 Object.normalize(deleted_object, false) ||
257 User.get_cached_by_ap_id(deleted_object)
260 case deleted_object do
262 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
263 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
264 %User{} = user <- User.get_cached_by_ap_id(actor) do
265 User.remove_pinnned_activity(user, activity)
267 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
269 if in_reply_to = deleted_object.data["inReplyTo"] do
270 Object.decrease_replies_count(in_reply_to)
273 MessageReference.delete_for_object(deleted_object)
275 ActivityPub.stream_out(object)
276 ActivityPub.stream_out_participations(deleted_object, user)
280 Logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
285 with {:ok, _} <- User.delete(deleted_object) do
291 Notification.create_notifications(object)
299 def handle(object, meta) do
303 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
304 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
305 actor = User.get_cached_by_ap_id(object.data["actor"])
306 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
309 [[actor, recipient], [recipient, actor]]
310 |> Enum.map(fn [user, other_user] ->
312 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
313 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
316 ["user", "user:pleroma_chat"],
317 {user, %{cm_ref | chat: chat, object: object}}
325 |> add_streamables(streamables)
331 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
332 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
333 Object.increase_vote_count(
334 object.data["inReplyTo"],
343 def handle_object_creation(%{"type" => "Question"} = object, meta) do
344 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
350 def handle_object_creation(object, meta) do
354 defp undo_like(nil, object), do: delete_object(object)
356 defp undo_like(%Object{} = liked_object, object) do
357 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
358 delete_object(object)
362 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
363 object.data["object"]
364 |> Object.get_by_ap_id()
368 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
369 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
370 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
371 {:ok, _} <- Repo.delete(object) do
376 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
377 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
378 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
379 {:ok, _} <- Repo.delete(object) do
385 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
387 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
388 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
389 {:ok, _} <- User.unblock(blocker, blocked),
390 {:ok, _} <- Repo.delete(object) do
395 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
397 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
398 defp delete_object(object) do
399 with {:ok, _} <- Repo.delete(object), do: :ok
402 defp send_notifications(meta) do
403 Keyword.get(meta, :notifications, [])
404 |> Enum.each(fn notification ->
405 Streamer.stream(["user", "user:notification"], notification)
406 Push.send(notification)
412 defp send_streamables(meta) do
413 Keyword.get(meta, :streamables, [])
414 |> Enum.each(fn {topics, items} ->
415 Streamer.stream(topics, items)
421 defp add_streamables(meta, streamables) do
422 existing = Keyword.get(meta, :streamables, [])
425 |> Keyword.put(:streamables, streamables ++ existing)
428 defp add_notifications(meta, notifications) do
429 existing = Keyword.get(meta, :notifications, [])
432 |> Keyword.put(:notifications, notifications ++ existing)
435 def handle_after_transaction(meta) do
437 |> send_notifications()
438 |> send_streamables()