1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
27 alias Pleroma.Workers.BackgroundWorker
# Function head for `handle/2`. It only declares the default value of `meta`
# (an empty list); the clauses that follow pattern-match on the activity.
31 def handle(object, meta \\ [])
35 # - Sends a notification
41 "object" => follow_activity_id
46 with %Activity{actor: follower_id} = follow_activity <-
47 Activity.get_by_ap_id(follow_activity_id),
48 %User{} = followed <- User.get_cached_by_ap_id(actor),
49 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
50 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
51 {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do
52 Notification.update_notification_type(followed, follow_activity)
53 User.update_follower_count(followed)
54 User.update_following_count(follower)
61 # - Rejects all existing follow activities for this person
62 # - Updates the follow state
63 # - Dismisses notification
69 "object" => follow_activity_id
74 with %Activity{actor: follower_id} = follow_activity <-
75 Activity.get_by_ap_id(follow_activity_id),
76 %User{} = followed <- User.get_cached_by_ap_id(actor),
77 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
78 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
79 FollowingRelationship.update(follower, followed, :follow_reject)
80 Notification.dismiss(follow_activity)
87 # - Follows if possible
88 # - Sends a notification
89 # - Generates accept or reject if appropriate
95 "object" => followed_user,
96 "actor" => following_user
101 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
102 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
103 {_, {:ok, _}, _, _} <-
104 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
105 if followed.local && !followed.is_locked do
106 {:ok, accept_data, _} = Builder.accept(followed, object)
107 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
110 {:following, {:error, _}, _follower, followed} ->
111 {:ok, reject_data, _} = Builder.reject(followed, object)
112 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
118 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
122 |> add_notifications(notifications)
124 updated_object = Activity.get_by_ap_id(follow_id)
126 {:ok, updated_object, meta}
129 # Tasks this handles:
130 # - Unfollow and block
132 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
136 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
137 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
138 User.block(blocker, blocked)
144 # Tasks this handles:
147 # For a local user, we also get a changeset with the full information, so we
148 # can update non-federating, non-activitypub settings as well.
149 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
150 if changeset = Keyword.get(meta, :user_update_changeset) do
152 |> User.update_and_set_cache()
154 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
156 User.get_by_ap_id(updated_object["id"])
157 |> User.remote_user_changeset(new_user_data)
158 |> User.update_and_set_cache()
164 # Tasks this handles:
165 # - Add like to object
166 # - Set up notification
167 def handle(%{data: %{"type" => "Like"}} = object, meta) do
168 liked_object = Object.get_by_ap_id(object.data["object"])
169 Utils.add_like_to_object(object, liked_object)
171 Notification.create_notifications(object)
177 # - Actually create object
178 # - Rollback if we couldn't create it
179 # - Increase the user note count
180 # - Increase the reply count
181 # - Increase replies count
182 # - Set up ActivityExpiration
183 # - Set up notifications
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  # Creates the object described by `meta[:object_data]`, then applies the
  # follow-up effects of a `Create` activity. Returns `{:ok, activity, meta}`
  # with the freshly built notifications added to `meta`. If the object cannot
  # be created or the actor cannot be found, the non-matching value is handed
  # to `Repo.rollback/1`.
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    # Notifications are built with `do_send: false` and stored in `meta`
    # rather than delivered from here.
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)

    # The type check has to come first: `=` binds more loosely than `&&`, so
    # the variable receives the value of the whole `&&` expression. Written
    # this way `in_reply_to` is the "inReplyTo" value itself (or a falsy value
    # that skips the update) and never a bare boolean.
    if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
      Object.increase_replies_count(in_reply_to)
    end

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    meta =
      meta
      |> add_notifications(notifications)

    {:ok, activity, meta}
  else
    e -> Repo.rollback(e)
  end
end
206 # Tasks this handles:
207 # - Add announce to object
208 # - Set up notification
209 # - Stream out the announce
210 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
211 announced_object = Object.get_by_ap_id(object.data["object"])
212 user = User.get_cached_by_ap_id(object.data["actor"])
214 Utils.add_announce_to_object(object, announced_object)
216 if !User.is_internal_user?(user) do
217 Notification.create_notifications(object)
220 |> Topics.get_activity_topics()
221 |> Streamer.stream(object)
227 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
228 with undone_object <- Activity.get_by_ap_id(undone_object),
229 :ok <- handle_undoing(undone_object) do
234 # Tasks this handles:
235 # - Add reaction to object
236 # - Set up notification
237 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
238 reacted_object = Object.get_by_ap_id(object.data["object"])
239 Utils.add_emoji_reaction_to_object(object, reacted_object)
241 Notification.create_notifications(object)
246 # Tasks this handles:
247 # - Deletes and unpins the create activity
248 # - Replace object with Tombstone
249 # - Set up notification
250 # - Reduce the user note count
251 # - Reduce the reply count
252 # - Stream out the activity
253 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
255 Object.normalize(deleted_object, false) ||
256 User.get_cached_by_ap_id(deleted_object)
259 case deleted_object do
261 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
262 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
263 %User{} = user <- User.get_cached_by_ap_id(actor) do
264 User.remove_pinnned_activity(user, activity)
266 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
268 if in_reply_to = deleted_object.data["inReplyTo"] do
269 Object.decrease_replies_count(in_reply_to)
272 MessageReference.delete_for_object(deleted_object)
274 ActivityPub.stream_out(object)
275 ActivityPub.stream_out_participations(deleted_object, user)
279 Logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
284 with {:ok, _} <- User.delete(deleted_object) do
290 Notification.create_notifications(object)
298 def handle(object, meta) do
302 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
303 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
304 actor = User.get_cached_by_ap_id(object.data["actor"])
305 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
308 [[actor, recipient], [recipient, actor]]
310 |> Enum.map(fn [user, other_user] ->
312 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
313 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
316 :chat_message_id_idempotency_key_cache,
318 meta[:idempotency_key]
322 ["user", "user:pleroma_chat"],
323 {user, %{cm_ref | chat: chat, object: object}}
331 |> add_streamables(streamables)
337 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
338 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
339 Object.increase_vote_count(
340 object.data["inReplyTo"],
349 def handle_object_creation(%{"type" => objtype} = object, meta)
350 when objtype in ~w[Audio Video Question Event Article] do
351 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
357 def handle_object_creation(object, meta) do
# First argument is `nil`: there is nothing to take the like off, so only
# `object` itself is deleted.
defp undo_like(nil, object) do
  delete_object(object)
end

# First argument is an `%Object{}`: the like is removed from it first, and
# `object` is deleted only when that removal returned `{:ok, _}`. Any other
# result of `Utils.remove_like_from_object/2` is returned unchanged.
defp undo_like(%Object{} = target, like) do
  case Utils.remove_like_from_object(like, target) do
    {:ok, _} -> delete_object(like)
    failure -> failure
  end
end
369 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
370 object.data["object"]
371 |> Object.get_by_ap_id()
375 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
376 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
377 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
378 {:ok, _} <- Repo.delete(object) do
383 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
384 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
385 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
386 {:ok, _} <- Repo.delete(object) do
392 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
394 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
395 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
396 {:ok, _} <- User.unblock(blocker, blocked),
397 {:ok, _} <- Repo.delete(object) do
# Fallback clause: reached when no more specific `handle_undoing/1` clause
# matched. Returns an error tuple that carries the unhandled value.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
# Deletes `object` through `Repo.delete/1`. A `{:ok, _}` result is collapsed
# to a bare `:ok`; any other result is passed back to the caller unchanged.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    other -> other
  end
end
409 defp send_notifications(meta) do
410 Keyword.get(meta, :notifications, [])
411 |> Enum.each(fn notification ->
412 Streamer.stream(["user", "user:notification"], notification)
413 Push.send(notification)
419 defp send_streamables(meta) do
420 Keyword.get(meta, :streamables, [])
421 |> Enum.each(fn {topics, items} ->
422 Streamer.stream(topics, items)
# Returns `meta` with `streamables` placed in front of whatever is already
# stored under the `:streamables` key (an empty list when the key is absent).
defp add_streamables(meta, streamables) do
  already_queued = Keyword.get(meta, :streamables, [])
  combined = streamables ++ already_queued

  Keyword.put(meta, :streamables, combined)
end
# Returns `meta` with `notifications` placed in front of whatever is already
# stored under the `:notifications` key (an empty list when the key is absent).
defp add_notifications(meta, notifications) do
  already_queued = Keyword.get(meta, :notifications, [])
  combined = notifications ++ already_queued

  Keyword.put(meta, :notifications, combined)
end
442 def handle_after_transaction(meta) do
444 |> send_notifications()
445 |> send_streamables()