1 defmodule Pleroma.Web.ActivityPub.SideEffects do
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.ActivityExpiration
12 alias Pleroma.Chat.MessageReference
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.BackgroundWorker
26 def handle(object, meta \\ [])
30 # - Sends a notification
36 "object" => follow_activity_id
41 with %Activity{actor: follower_id} = follow_activity <-
42 Activity.get_by_ap_id(follow_activity_id),
43 %User{} = followed <- User.get_cached_by_ap_id(actor),
44 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
45 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
46 {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do
47 Notification.update_notification_type(followed, follow_activity)
48 User.update_follower_count(followed)
49 User.update_following_count(follower)
56 # - Follows if possible
57 # - Sends a notification
58 # - Generates accept or reject if appropriate
64 "object" => followed_user,
65 "actor" => following_user
70 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
71 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
72 {_, {:ok, _}, _, _} <-
73 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
74 if followed.local && !followed.locked do
75 {:ok, accept_data, _} = Builder.accept(followed, object)
76 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
79 {:following, {:error, _}, follower, followed} ->
80 Utils.update_follow_state_for_all(object, "reject")
81 FollowingRelationship.update(follower, followed, :follow_reject)
90 |> ActivityPub.reject()
97 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
101 |> add_notifications(notifications)
103 updated_object = Activity.get_by_ap_id(follow_id)
105 {:ok, updated_object, meta}
108 # Tasks this handles:
109 # - Unfollow and block
111 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
115 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
116 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
117 User.block(blocker, blocked)
123 # Tasks this handles:
126 # For a local user, we also get a changeset with the full information, so we
127 # can update non-federating, non-ActivityPub settings as well.
128 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
129 if changeset = Keyword.get(meta, :user_update_changeset) do
131 |> User.update_and_set_cache()
133 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
135 User.get_by_ap_id(updated_object["id"])
136 |> User.remote_user_changeset(new_user_data)
137 |> User.update_and_set_cache()
143 # Tasks this handles:
144 # - Add like to object
145 # - Set up notification
146 def handle(%{data: %{"type" => "Like"}} = object, meta) do
147 liked_object = Object.get_by_ap_id(object.data["object"])
148 Utils.add_like_to_object(object, liked_object)
150 Notification.create_notifications(object)
156 # - Actually create object
157 # - Rollback if we couldn't create it
158 # - Increase the user note count
159 # - Increase the replies count of the object being replied to
161 # - Set up ActivityExpiration
162 # - Set up notifications
163 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
164 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
165 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
166 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
167 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
169 if in_reply_to = object.data["inReplyTo"] do
170 Object.increase_replies_count(in_reply_to)
173 if expires_at = activity.data["expires_at"] do
174 ActivityExpiration.create(activity, expires_at)
177 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
181 |> add_notifications(notifications)
183 {:ok, activity, meta}
185 e -> Repo.rollback(e)
189 # Tasks this handles:
190 # - Add announce to object
191 # - Set up notification
192 # - Stream out the announce
193 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
194 announced_object = Object.get_by_ap_id(object.data["object"])
195 user = User.get_cached_by_ap_id(object.data["actor"])
197 Utils.add_announce_to_object(object, announced_object)
199 if !User.is_internal_user?(user) do
200 Notification.create_notifications(object)
203 |> Topics.get_activity_topics()
204 |> Streamer.stream(object)
210 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
211 with undone_object <- Activity.get_by_ap_id(undone_object),
212 :ok <- handle_undoing(undone_object) do
217 # Tasks this handles:
218 # - Add reaction to object
219 # - Set up notification
220 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
221 reacted_object = Object.get_by_ap_id(object.data["object"])
222 Utils.add_emoji_reaction_to_object(object, reacted_object)
224 Notification.create_notifications(object)
229 # Tasks this handles:
230 # - Deletes and unpins the create activity
231 # - Replace object with Tombstone
232 # - Set up notification
233 # - Reduce the user note count
234 # - Reduce the reply count
235 # - Stream out the activity
236 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
238 Object.normalize(deleted_object, false) || User.get_cached_by_ap_id(deleted_object)
241 case deleted_object do
243 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
244 %User{} = user <- User.get_cached_by_ap_id(deleted_object.data["actor"]) do
245 User.remove_pinnned_activity(user, activity)
247 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
249 if in_reply_to = deleted_object.data["inReplyTo"] do
250 Object.decrease_replies_count(in_reply_to)
253 MessageReference.delete_for_object(deleted_object)
255 ActivityPub.stream_out(object)
256 ActivityPub.stream_out_participations(deleted_object, user)
261 with {:ok, _} <- User.delete(deleted_object) do
267 Notification.create_notifications(object)
275 def handle(object, meta) do
279 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
280 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
281 actor = User.get_cached_by_ap_id(object.data["actor"])
282 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
285 [[actor, recipient], [recipient, actor]]
286 |> Enum.map(fn [user, other_user] ->
288 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
289 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
292 ["user", "user:pleroma_chat"],
293 {user, %{cm_ref | chat: chat, object: object}}
301 |> add_streamables(streamables)
307 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
308 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
309 Object.increase_vote_count(
310 object.data["inReplyTo"],
319 def handle_object_creation(%{"type" => "Question"} = object, meta) do
320 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
326 def handle_object_creation(object, meta) do
330 defp undo_like(nil, object), do: delete_object(object)
332 defp undo_like(%Object{} = liked_object, object) do
333 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
334 delete_object(object)
338 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
339 object.data["object"]
340 |> Object.get_by_ap_id()
344 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
345 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
346 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
347 {:ok, _} <- Repo.delete(object) do
352 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
353 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
354 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
355 {:ok, _} <- Repo.delete(object) do
361 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
363 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
364 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
365 {:ok, _} <- User.unblock(blocker, blocked),
366 {:ok, _} <- Repo.delete(object) do
371 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
373 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
374 defp delete_object(object) do
375 with {:ok, _} <- Repo.delete(object), do: :ok
378 defp send_notifications(meta) do
379 Keyword.get(meta, :notifications, [])
380 |> Enum.each(fn notification ->
381 Streamer.stream(["user", "user:notification"], notification)
382 Push.send(notification)
388 defp send_streamables(meta) do
389 Keyword.get(meta, :streamables, [])
390 |> Enum.each(fn {topics, items} ->
391 Streamer.stream(topics, items)
397 defp add_streamables(meta, streamables) do
398 existing = Keyword.get(meta, :streamables, [])
401 |> Keyword.put(:streamables, streamables ++ existing)
404 defp add_notifications(meta, notifications) do
405 existing = Keyword.get(meta, :notifications, [])
408 |> Keyword.put(:notifications, notifications ++ existing)
411 def handle_after_transaction(meta) do
413 |> send_notifications()
414 |> send_streamables()