1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
# Function head only: declares `[]` as the default for `meta` once, so the
# individual `handle/2` clauses can pattern-match on the activity type
# without each repeating the default.
30 def handle(object, meta \\ [])
34 # - Sends a notification
40 "object" => follow_activity_id
45 with %Activity{actor: follower_id} = follow_activity <-
46 Activity.get_by_ap_id(follow_activity_id),
47 %User{} = followed <- User.get_cached_by_ap_id(actor),
48 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
49 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
50 {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do
51 Notification.update_notification_type(followed, follow_activity)
52 User.update_follower_count(followed)
53 User.update_following_count(follower)
60 # - Rejects all existing follow activities for this person
61 # - Updates the follow state
62 # - Dismisses notification
68 "object" => follow_activity_id
73 with %Activity{actor: follower_id} = follow_activity <-
74 Activity.get_by_ap_id(follow_activity_id),
75 %User{} = followed <- User.get_cached_by_ap_id(actor),
76 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
77 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
78 FollowingRelationship.update(follower, followed, :follow_reject)
79 Notification.dismiss(follow_activity)
86 # - Follows if possible
87 # - Sends a notification
88 # - Generates accept or reject if appropriate
94 "object" => followed_user,
95 "actor" => following_user
100 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
101 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
102 {_, {:ok, _}, _, _} <-
103 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
104 if followed.local && !followed.is_locked do
105 {:ok, accept_data, _} = Builder.accept(followed, object)
106 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
109 {:following, {:error, _}, _follower, followed} ->
110 {:ok, reject_data, _} = Builder.reject(followed, object)
111 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
117 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
121 |> add_notifications(notifications)
123 updated_object = Activity.get_by_ap_id(follow_id)
125 {:ok, updated_object, meta}
128 # Tasks this handles:
129 # - Unfollow and block
131 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
135 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
136 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
137 User.block(blocker, blocked)
143 # Tasks this handles:
146 # For a local user, we also get a changeset with the full information, so we
147 # can update non-federating, non-activitypub settings as well.
148 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
149 if changeset = Keyword.get(meta, :user_update_changeset) do
151 |> User.update_and_set_cache()
153 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
155 User.get_by_ap_id(updated_object["id"])
156 |> User.remote_user_changeset(new_user_data)
157 |> User.update_and_set_cache()
163 # Tasks this handles:
164 # - Add like to object
165 # - Set up notification
166 def handle(%{data: %{"type" => "Like"}} = object, meta) do
167 liked_object = Object.get_by_ap_id(object.data["object"])
168 Utils.add_like_to_object(object, liked_object)
170 Notification.create_notifications(object)
176 # - Actually create object
177 # - Rollback if we couldn't create it
178 # - Increase the user note count
179 # - Increase the reply count
180 # - Increase replies count
181 # - Set up ActivityExpiration
182 # - Set up notifications
183 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
184 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
185 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
186 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
187 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
189 if in_reply_to = object.data["inReplyTo"] && object.data["type"] != "Answer" do
190 Object.increase_replies_count(in_reply_to)
193 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
194 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
199 |> add_notifications(notifications)
201 {:ok, activity, meta}
203 e -> Repo.rollback(e)
207 # Tasks this handles:
208 # - Add announce to object
209 # - Set up notification
210 # - Stream out the announce
211 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
212 announced_object = Object.get_by_ap_id(object.data["object"])
213 user = User.get_cached_by_ap_id(object.data["actor"])
215 Utils.add_announce_to_object(object, announced_object)
217 if !User.is_internal_user?(user) do
218 Notification.create_notifications(object)
221 |> Topics.get_activity_topics()
222 |> Streamer.stream(object)
228 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
229 with undone_object <- Activity.get_by_ap_id(undone_object),
230 :ok <- handle_undoing(undone_object) do
235 # Tasks this handles:
236 # - Add reaction to object
237 # - Set up notification
238 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
239 reacted_object = Object.get_by_ap_id(object.data["object"])
240 Utils.add_emoji_reaction_to_object(object, reacted_object)
242 Notification.create_notifications(object)
247 # Tasks this handles:
248 # - Deletes and unpins the create activity
249 # - Replace object with Tombstone
250 # - Set up notification
251 # - Reduce the user note count
252 # - Reduce the reply count
253 # - Stream out the activity
254 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
256 Object.normalize(deleted_object, false) ||
257 User.get_cached_by_ap_id(deleted_object)
260 case deleted_object do
262 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
263 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
264 %User{} = user <- User.get_cached_by_ap_id(actor) do
265 User.remove_pinnned_activity(user, activity)
267 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
269 if in_reply_to = deleted_object.data["inReplyTo"] do
270 Object.decrease_replies_count(in_reply_to)
273 MessageReference.delete_for_object(deleted_object)
275 ActivityPub.stream_out(object)
276 ActivityPub.stream_out_participations(deleted_object, user)
280 Logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
285 with {:ok, _} <- User.delete(deleted_object) do
291 Notification.create_notifications(object)
299 def handle(object, meta) do
303 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
304 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
305 actor = User.get_cached_by_ap_id(object.data["actor"])
306 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
309 [[actor, recipient], [recipient, actor]]
311 |> Enum.map(fn [user, other_user] ->
313 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
314 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
317 :chat_message_id_idempotency_key_cache,
319 meta[:idempotency_key]
323 ["user", "user:pleroma_chat"],
324 {user, %{cm_ref | chat: chat, object: object}}
332 |> add_streamables(streamables)
338 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
339 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
340 Object.increase_vote_count(
341 object.data["inReplyTo"],
350 def handle_object_creation(%{"type" => objtype} = object, meta)
351 when objtype in ~w[Audio Video Question Event Article] do
352 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
358 def handle_object_creation(object, meta) do
# Clause for when the first argument is `nil`: there is nothing to remove a
# like from, so `object` is deleted directly via `delete_object/1`.
362 defp undo_like(nil, object), do: delete_object(object)
364 defp undo_like(%Object{} = liked_object, object) do
365 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
366 delete_object(object)
370 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
371 object.data["object"]
372 |> Object.get_by_ap_id()
376 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
377 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
378 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
379 {:ok, _} <- Repo.delete(object) do
384 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
385 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
386 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
387 {:ok, _} <- Repo.delete(object) do
393 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
395 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
396 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
397 {:ok, _} <- User.unblock(blocker, blocked),
398 {:ok, _} <- Repo.delete(object) do
# Catch-all clause: any object not matched by an earlier `handle_undoing/1`
# clause is reported as an error tuple that carries the offending object.
403 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
405 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
406 defp delete_object(object) do
407 with {:ok, _} <- Repo.delete(object), do: :ok
410 defp send_notifications(meta) do
411 Keyword.get(meta, :notifications, [])
412 |> Enum.each(fn notification ->
413 Streamer.stream(["user", "user:notification"], notification)
414 Push.send(notification)
420 defp send_streamables(meta) do
421 Keyword.get(meta, :streamables, [])
422 |> Enum.each(fn {topics, items} ->
423 Streamer.stream(topics, items)
429 defp add_streamables(meta, streamables) do
430 existing = Keyword.get(meta, :streamables, [])
433 |> Keyword.put(:streamables, streamables ++ existing)
436 defp add_notifications(meta, notifications) do
437 existing = Keyword.get(meta, :notifications, [])
440 |> Keyword.put(:notifications, notifications ++ existing)
443 def handle_after_transaction(meta) do
445 |> send_notifications()
446 |> send_streamables()