1 defmodule Pleroma.Web.ActivityPub.SideEffects do
3 This module looks at an inserted object and executes the side effects that it
4 implies. For example, a `Like` activity will increase the like count on the
5 liked object, a `Follow` activity will add the user to the follower
9 alias Pleroma.Activity.Ir.Topics
10 alias Pleroma.ActivityExpiration
12 alias Pleroma.Chat.MessageReference
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.BackgroundWorker
25 alias Pleroma.FollowingRelationship
27 def handle(object, meta \\ [])
31 # - Sends a notification
37 "object" => follow_activity_id
42 with %Activity{actor: follower_id} = follow_activity <-
43 Activity.get_by_ap_id(follow_activity_id),
44 %User{} = followed <- User.get_cached_by_ap_id(actor),
45 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
46 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
47 {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do
48 Notification.update_notification_type(followed, follow_activity)
49 User.update_follower_count(followed)
50 User.update_following_count(follower)
57 # - Follows if possible
58 # - Sends a notification
59 # - Generates accept or reject if appropriate
65 "object" => followed_user,
66 "actor" => following_user
71 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
72 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
73 {_, {:ok, _}, _, _} <-
74 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
75 if followed.local && !followed.locked do
76 {:ok, accept_data, _} = Builder.accept(followed, object)
77 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
80 {:following, {:error, _}, follower, followed} ->
81 Utils.update_follow_state_for_all(object, "reject")
82 FollowingRelationship.update(follower, followed, :follow_reject)
91 |> ActivityPub.reject()
98 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
102 |> add_notifications(notifications)
104 updated_object = Activity.get_by_ap_id(follow_id)
106 {:ok, updated_object, meta}
109 # Tasks this handles:
110 # - Unfollow and block
112 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
116 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
117 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
118 User.block(blocker, blocked)
124 # Tasks this handles:
127 # For a local user, we also get a changeset with the full information, so we
128 # can update non-federating, non-activitypub settings as well.
129 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
130 if changeset = Keyword.get(meta, :user_update_changeset) do
132 |> User.update_and_set_cache()
134 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
136 User.get_by_ap_id(updated_object["id"])
137 |> User.remote_user_changeset(new_user_data)
138 |> User.update_and_set_cache()
144 # Tasks this handles:
145 # - Add like to object
146 # - Set up notification
147 def handle(%{data: %{"type" => "Like"}} = object, meta) do
148 liked_object = Object.get_by_ap_id(object.data["object"])
149 Utils.add_like_to_object(object, liked_object)
151 Notification.create_notifications(object)
157 # - Actually create object
158 # - Rollback if we couldn't create it
159 # - Increase the user note count
160 # - Increase the reply count
161 # - Increase replies count
162 # - Set up ActivityExpiration
163 # - Set up notifications
164 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
165 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
166 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
167 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
168 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
170 if in_reply_to = object.data["inReplyTo"] do
171 Object.increase_replies_count(in_reply_to)
174 if expires_at = activity.data["expires_at"] do
175 ActivityExpiration.create(activity, expires_at)
178 BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
182 |> add_notifications(notifications)
184 {:ok, activity, meta}
186 e -> Repo.rollback(e)
190 # Tasks this handles:
191 # - Add announce to object
192 # - Set up notification
193 # - Stream out the announce
194 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
195 announced_object = Object.get_by_ap_id(object.data["object"])
196 user = User.get_cached_by_ap_id(object.data["actor"])
198 Utils.add_announce_to_object(object, announced_object)
200 if !User.is_internal_user?(user) do
201 Notification.create_notifications(object)
204 |> Topics.get_activity_topics()
205 |> Streamer.stream(object)
211 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
212 with undone_object <- Activity.get_by_ap_id(undone_object),
213 :ok <- handle_undoing(undone_object) do
218 # Tasks this handles:
219 # - Add reaction to object
220 # - Set up notification
221 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
222 reacted_object = Object.get_by_ap_id(object.data["object"])
223 Utils.add_emoji_reaction_to_object(object, reacted_object)
225 Notification.create_notifications(object)
230 # Tasks this handles:
231 # - Delete and unpins the create activity
232 # - Replace object with Tombstone
233 # - Set up notification
234 # - Reduce the user note count
235 # - Reduce the reply count
236 # - Stream out the activity
237 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
239 Object.normalize(deleted_object, false) || User.get_cached_by_ap_id(deleted_object)
242 case deleted_object do
244 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
245 %User{} = user <- User.get_cached_by_ap_id(deleted_object.data["actor"]) do
246 User.remove_pinnned_activity(user, activity)
248 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
250 if in_reply_to = deleted_object.data["inReplyTo"] do
251 Object.decrease_replies_count(in_reply_to)
254 MessageReference.delete_for_object(deleted_object)
256 ActivityPub.stream_out(object)
257 ActivityPub.stream_out_participations(deleted_object, user)
262 with {:ok, _} <- User.delete(deleted_object) do
268 Notification.create_notifications(object)
276 def handle(object, meta) do
280 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
281 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
282 actor = User.get_cached_by_ap_id(object.data["actor"])
283 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
286 [[actor, recipient], [recipient, actor]]
287 |> Enum.map(fn [user, other_user] ->
289 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
290 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
293 ["user", "user:pleroma_chat"],
294 {user, %{cm_ref | chat: chat, object: object}}
302 |> add_streamables(streamables)
308 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
309 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
310 Object.increase_vote_count(
311 object.data["inReplyTo"],
320 def handle_object_creation(%{"type" => "Question"} = object, meta) do
321 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
327 def handle_object_creation(object, meta) do
331 defp undo_like(nil, object), do: delete_object(object)
333 defp undo_like(%Object{} = liked_object, object) do
334 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
335 delete_object(object)
339 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
340 object.data["object"]
341 |> Object.get_by_ap_id()
345 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
346 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
347 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
348 {:ok, _} <- Repo.delete(object) do
353 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
354 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
355 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
356 {:ok, _} <- Repo.delete(object) do
362 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
364 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
365 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
366 {:ok, _} <- User.unblock(blocker, blocked),
367 {:ok, _} <- Repo.delete(object) do
372 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
374 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
375 defp delete_object(object) do
376 with {:ok, _} <- Repo.delete(object), do: :ok
379 defp send_notifications(meta) do
380 Keyword.get(meta, :notifications, [])
381 |> Enum.each(fn notification ->
382 Streamer.stream(["user", "user:notification"], notification)
383 Push.send(notification)
389 defp send_streamables(meta) do
390 Keyword.get(meta, :streamables, [])
391 |> Enum.each(fn {topics, items} ->
392 Streamer.stream(topics, items)
398 defp add_streamables(meta, streamables) do
399 existing = Keyword.get(meta, :streamables, [])
402 |> Keyword.put(:streamables, streamables ++ existing)
405 defp add_notifications(meta, notifications) do
406 existing = Keyword.get(meta, :notifications, [])
409 |> Keyword.put(:notifications, notifications ++ existing)
412 def handle_after_transaction(meta) do
414 |> send_notifications()
415 |> send_streamables()