1 defmodule Pleroma.Web.ActivityPub.SideEffects do
@moduledoc """
This module looks at an inserted object and executes the side effects that it
implies. For example, a `Like` activity will increase the like count on the
liked object, a `Follow` activity will add the user to the follower
collection, and so on.
"""
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.ActivityExpiration
12 alias Pleroma.Chat.MessageReference
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.BackgroundWorker
# Bodiless head shared by every `handle/2` clause: `meta` is optional and
# defaults to an empty keyword list.
def handle(object, meta \\ [])
30 # - Sends a notification
36 "object" => follow_activity_id
41 with %Activity{actor: follower_id} = follow_activity <-
42 Activity.get_by_ap_id(follow_activity_id),
43 %User{} = followed <- User.get_cached_by_ap_id(actor),
44 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
45 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
46 {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do
47 Notification.update_notification_type(followed, follow_activity)
48 User.update_follower_count(followed)
49 User.update_following_count(follower)
56 # - Rejects all existing follow activities for this person
57 # - Updates the follow state
58 # - Dismisses notification
64 "object" => follow_activity_id
69 with %Activity{actor: follower_id} = follow_activity <-
70 Activity.get_by_ap_id(follow_activity_id),
71 %User{} = followed <- User.get_cached_by_ap_id(actor),
72 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
73 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
74 FollowingRelationship.update(follower, followed, :follow_reject)
75 Notification.dismiss(follow_activity)
82 # - Follows if possible
83 # - Sends a notification
84 # - Generates accept or reject if appropriate
90 "object" => followed_user,
91 "actor" => following_user
96 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
97 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
98 {_, {:ok, _}, _, _} <-
99 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
100 if followed.local && !followed.locked do
101 {:ok, accept_data, _} = Builder.accept(followed, object)
102 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
105 {:following, {:error, _}, _follower, followed} ->
106 {:ok, reject_data, _} = Builder.reject(followed, object)
107 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
113 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
117 |> add_notifications(notifications)
119 updated_object = Activity.get_by_ap_id(follow_id)
121 {:ok, updated_object, meta}
124 # Tasks this handles:
125 # - Unfollow and block
127 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
131 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
132 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
133 User.block(blocker, blocked)
139 # Tasks this handles:
142 # For a local user, we also get a changeset with the full information, so we
143 # can update non-federating, non-activitypub settings as well.
144 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
145 if changeset = Keyword.get(meta, :user_update_changeset) do
147 |> User.update_and_set_cache()
149 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
151 User.get_by_ap_id(updated_object["id"])
152 |> User.remote_user_changeset(new_user_data)
153 |> User.update_and_set_cache()
159 # Tasks this handles:
160 # - Add like to object
161 # - Set up notification
162 def handle(%{data: %{"type" => "Like"}} = object, meta) do
163 liked_object = Object.get_by_ap_id(object.data["object"])
164 Utils.add_like_to_object(object, liked_object)
166 Notification.create_notifications(object)
# Tasks this handles:
# - Actually create object
# - Rollback if we couldn't create it
# - Increase the user note count
# - Increase the replies count of the object being replied to
# - Set up ActivityExpiration
# - Enqueue the background data fetch for the activity
# - Set up notifications
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  # `meta` is rebound to whatever `handle_object_creation/2` hands back, so
  # anything it attached is carried through to the result.
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    # Notifications are created with `do_send: false` and stored in `meta`
    # instead of being delivered from here.
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)

    if in_reply_to = object.data["inReplyTo"] do
      Object.increase_replies_count(in_reply_to)
    end

    if expires_at = activity.data["expires_at"] do
      ActivityExpiration.create(activity, expires_at)
    end

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    meta = add_notifications(meta, notifications)

    {:ok, activity, meta}
  else
    # Either step failing to match ends up here; the non-matching value is
    # passed to `Repo.rollback/1`.
    e -> Repo.rollback(e)
  end
end
205 # Tasks this handles:
206 # - Add announce to object
207 # - Set up notification
208 # - Stream out the announce
209 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
210 announced_object = Object.get_by_ap_id(object.data["object"])
211 user = User.get_cached_by_ap_id(object.data["actor"])
213 Utils.add_announce_to_object(object, announced_object)
215 if !User.is_internal_user?(user) do
216 Notification.create_notifications(object)
219 |> Topics.get_activity_topics()
220 |> Streamer.stream(object)
226 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
227 with undone_object <- Activity.get_by_ap_id(undone_object),
228 :ok <- handle_undoing(undone_object) do
233 # Tasks this handles:
234 # - Add reaction to object
235 # - Set up notification
236 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
237 reacted_object = Object.get_by_ap_id(object.data["object"])
238 Utils.add_emoji_reaction_to_object(object, reacted_object)
240 Notification.create_notifications(object)
245 # Tasks this handles:
246 # - Delete and unpins the create activity
247 # - Replace object with Tombstone
248 # - Set up notification
249 # - Reduce the user note count
250 # - Reduce the reply count
251 # - Stream out the activity
252 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
254 Object.normalize(deleted_object, false) || User.get_cached_by_ap_id(deleted_object)
257 case deleted_object do
259 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
260 %User{} = user <- User.get_cached_by_ap_id(deleted_object.data["actor"]) do
261 User.remove_pinnned_activity(user, activity)
263 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
265 if in_reply_to = deleted_object.data["inReplyTo"] do
266 Object.decrease_replies_count(in_reply_to)
269 MessageReference.delete_for_object(deleted_object)
271 ActivityPub.stream_out(object)
272 ActivityPub.stream_out_participations(deleted_object, user)
277 with {:ok, _} <- User.delete(deleted_object) do
283 Notification.create_notifications(object)
291 def handle(object, meta) do
295 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
296 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
297 actor = User.get_cached_by_ap_id(object.data["actor"])
298 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
301 [[actor, recipient], [recipient, actor]]
302 |> Enum.map(fn [user, other_user] ->
304 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
305 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
308 ["user", "user:pleroma_chat"],
309 {user, %{cm_ref | chat: chat, object: object}}
317 |> add_streamables(streamables)
323 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
324 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
325 Object.increase_vote_count(
326 object.data["inReplyTo"],
335 def handle_object_creation(%{"type" => "Question"} = object, meta) do
336 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
342 def handle_object_creation(object, meta) do
# Undoes a like. The first argument is the liked object (or `nil` when the
# lookup found nothing), the second is the `Like` activity being undone.
#
# With no liked object there is nothing to update, so only the activity is
# deleted.
defp undo_like(nil, object), do: delete_object(object)

# Removes the like from the liked object first; the activity is deleted only
# when that succeeds. A non-`{:ok, _}` result from the removal is returned
# unchanged.
defp undo_like(%Object{} = liked_object, object) do
  with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
    delete_object(object)
  end
end
# Undoes a `Like`: looks up the liked object by the AP id stored under
# `"object"` and hands it, together with the activity, to `undo_like/2`
# (which also copes with the lookup returning `nil`).
def handle_undoing(%{data: %{"type" => "Like"}} = object) do
  object.data["object"]
  |> Object.get_by_ap_id()
  |> undo_like(object)
end
# Undoes an `EmojiReact`: removes the reaction from the reacted-to object and
# then deletes the activity. Returns `:ok` on success; the first step that
# does not match is returned unchanged.
def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
  with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object) do
    # Same delete-and-collapse-to-`:ok` step that `undo_like/2` uses.
    delete_object(object)
  end
end
# Undoes an `Announce`: removes the announce from the announced object and
# then deletes the activity. Returns `:ok` on success; the first step that
# does not match is returned unchanged.
def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
  with %Object{} = announced_object <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object) do
    # Same delete-and-collapse-to-`:ok` step that `undo_like/2` uses.
    delete_object(object)
  end
end
# Undoes a `Block`: resolves both users from their AP ids, unblocks, and then
# deletes the activity. Returns `:ok` on success; the first step that does
# not match is returned unchanged.
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked) do
    # Same delete-and-collapse-to-`:ok` step that `undo_like/2` uses.
    delete_object(object)
  end
end
# Fallback clause: returns an error tuple that carries the term nobody knew
# how to undo.
def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
# Deletes `object` through `Repo.delete/1` and collapses a successful
# `{:ok, _}` into a bare `:ok`. Any other result is returned unchanged.
#
# NOTE(review): the callers in this module pass the activity that is being
# undone, so the argument type in the spec may need to be widened -- confirm.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  with {:ok, _} <- Repo.delete(object), do: :ok
end
# Streams and pushes every notification stored under `:notifications` in
# `meta` (none when the key is absent). `meta` itself is returned unchanged so
# the call can sit in the middle of a pipeline.
defp send_notifications(meta) do
  meta
  |> Keyword.get(:notifications, [])
  |> Enum.each(fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end
# Streams every `{topics, items}` pair stored under `:streamables` in `meta`
# (none when the key is absent) and returns `meta` unchanged.
defp send_streamables(meta) do
  meta
  |> Keyword.get(:streamables, [])
  |> Enum.each(fn {topics, items} ->
    Streamer.stream(topics, items)
  end)

  meta
end
# Returns `meta` with `streamables` placed in front of whatever is already
# stored under `:streamables` (an absent key counts as an empty list).
defp add_streamables(meta, streamables) do
  existing = Keyword.get(meta, :streamables, [])

  Keyword.put(meta, :streamables, streamables ++ existing)
end
# Returns `meta` with `notifications` placed in front of whatever is already
# stored under `:notifications` (an absent key counts as an empty list).
defp add_notifications(meta, notifications) do
  existing = Keyword.get(meta, :notifications, [])

  Keyword.put(meta, :notifications, notifications ++ existing)
end
@doc """
Delivers what was collected in `meta`.

Passes `meta` through `send_notifications/1` and then `send_streamables/1`,
and returns the result of the latter.
"""
def handle_after_transaction(meta) do
  meta
  |> send_notifications()
  |> send_streamables()
end