1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @ap_streamer Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
32 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
34 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
37 def handle(object, meta \\ [])
41 # - Sends a notification
48 "object" => follow_activity_id
53 with %Activity{actor: follower_id} = follow_activity <-
54 Activity.get_by_ap_id(follow_activity_id),
55 %User{} = followed <- User.get_cached_by_ap_id(actor),
56 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
57 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
58 {:ok, _follower, followed} <-
59 FollowingRelationship.update(follower, followed, :follow_accept) do
60 Notification.update_notification_type(followed, follow_activity)
67 # - Rejects all existing follow activities for this person
68 # - Updates the follow state
69 # - Dismisses notification
76 "object" => follow_activity_id
81 with %Activity{actor: follower_id} = follow_activity <-
82 Activity.get_by_ap_id(follow_activity_id),
83 %User{} = followed <- User.get_cached_by_ap_id(actor),
84 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
85 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
86 FollowingRelationship.update(follower, followed, :follow_reject)
87 Notification.dismiss(follow_activity)
94 # - Follows if possible
95 # - Sends a notification
96 # - Generates accept or reject if appropriate
103 "object" => followed_user,
104 "actor" => following_user
109 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
110 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
111 {_, {:ok, _, _}, _, _} <-
112 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
113 if followed.local && !followed.is_locked do
114 {:ok, accept_data, _} = Builder.accept(followed, object)
115 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
118 {:following, {:error, _}, _follower, followed} ->
119 {:ok, reject_data, _} = Builder.reject(followed, object)
120 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
126 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
130 |> add_notifications(notifications)
132 updated_object = Activity.get_by_ap_id(follow_id)
134 {:ok, updated_object, meta}
137 # Tasks this handles:
138 # - Unfollow and block
141 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
145 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
146 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
147 User.block(blocker, blocked)
153 # Tasks this handles:
156 # For a local user, we also get a changeset with the full information, so we
157 # can update non-federating, non-activitypub settings as well.
159 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
160 if changeset = Keyword.get(meta, :user_update_changeset) do
162 |> User.update_and_set_cache()
164 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
166 User.get_by_ap_id(updated_object["id"])
167 |> User.remote_user_changeset(new_user_data)
168 |> User.update_and_set_cache()
174 # Tasks this handles:
175 # - Add like to object
176 # - Set up notification
178 def handle(%{data: %{"type" => "Like"}} = object, meta) do
179 liked_object = Object.get_by_ap_id(object.data["object"])
180 Utils.add_like_to_object(object, liked_object)
182 Notification.create_notifications(object)
188 # - Actually create object
189 # - Rollback if we couldn't create it
190 # - Increase the user note count
191 # - Increase the reply count
192 # - Increase replies count
193 # - Set up ActivityExpiration
194 # - Set up notifications
196 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
197 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
198 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
199 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
200 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
202 if in_reply_to = object.data["inReplyTo"] && object.data["type"] != "Answer" do
203 Object.increase_replies_count(in_reply_to)
206 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
207 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
212 |> add_notifications(notifications)
214 {:ok, activity, meta}
216 e -> Repo.rollback(e)
220 # Tasks this handles:
221 # - Add announce to object
222 # - Set up notification
223 # - Stream out the announce
225 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
226 announced_object = Object.get_by_ap_id(object.data["object"])
227 user = User.get_cached_by_ap_id(object.data["actor"])
229 Utils.add_announce_to_object(object, announced_object)
231 if !User.is_internal_user?(user) do
232 Notification.create_notifications(object)
235 |> Topics.get_activity_topics()
236 |> Streamer.stream(object)
243 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
244 with undone_object <- Activity.get_by_ap_id(undone_object),
245 :ok <- handle_undoing(undone_object) do
250 # Tasks this handles:
251 # - Add reaction to object
252 # - Set up notification
254 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
255 reacted_object = Object.get_by_ap_id(object.data["object"])
256 Utils.add_emoji_reaction_to_object(object, reacted_object)
258 Notification.create_notifications(object)
263 # Tasks this handles:
264 # - Delete and unpins the create activity
265 # - Replace object with Tombstone
266 # - Set up notification
267 # - Reduce the user note count
268 # - Reduce the reply count
269 # - Stream out the activity
271 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
273 Object.normalize(deleted_object, fetch: false) ||
274 User.get_cached_by_ap_id(deleted_object)
277 case deleted_object do
279 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
280 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
281 %User{} = user <- User.get_cached_by_ap_id(actor) do
282 User.remove_pinned_object_id(user, deleted_object.data["id"])
284 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
286 if in_reply_to = deleted_object.data["inReplyTo"] do
287 Object.decrease_replies_count(in_reply_to)
290 MessageReference.delete_for_object(deleted_object)
292 @ap_streamer.stream_out(object)
293 @ap_streamer.stream_out_participations(deleted_object, user)
297 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
302 with {:ok, _} <- User.delete(deleted_object) do
308 Notification.create_notifications(object)
315 # Tasks this handles:
317 # - removes expiration job for pinned activity, if was set for expiration
319 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
320 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
321 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
322 # if pinned activity was scheduled for deletion, we remove job
323 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
324 Oban.cancel_job(expiration.id)
330 {:error, :user_not_found}
332 {:error, changeset} ->
333 if changeset.errors[:pinned_objects] do
334 {:error, :pinned_statuses_limit_reached}
341 # Tasks this handles:
342 # - removes pin from user
343 # - removes corresponding Add activity
344 # - if activity had expiration, recreates activity expiration job
346 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
347 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
348 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
350 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
353 # if pinned activity was scheduled for deletion, we reschedule it for deletion
354 if meta[:expires_at] do
355 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
357 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
359 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
360 activity_id: meta[:activity_id],
361 expires_at: expires_at
367 nil -> {:error, :user_not_found}
374 def handle(object, meta) do
378 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
379 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
380 actor = User.get_cached_by_ap_id(object.data["actor"])
381 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
384 [[actor, recipient], [recipient, actor]]
386 |> Enum.map(fn [user, other_user] ->
388 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
389 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
392 :chat_message_id_idempotency_key_cache,
394 meta[:idempotency_key]
398 ["user", "user:pleroma_chat"],
399 {user, %{cm_ref | chat: chat, object: object}}
407 |> add_streamables(streamables)
413 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
414 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
415 Object.increase_vote_count(
416 object.data["inReplyTo"],
425 def handle_object_creation(%{"type" => objtype} = object, meta)
426 when objtype in ~w[Audio Video Question Event Article] do
427 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
433 def handle_object_creation(object, meta) do
437 defp undo_like(nil, object), do: delete_object(object)
439 defp undo_like(%Object{} = liked_object, object) do
440 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
441 delete_object(object)
445 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
446 object.data["object"]
447 |> Object.get_by_ap_id()
451 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
452 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
453 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
454 {:ok, _} <- Repo.delete(object) do
459 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
460 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
461 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
462 {:ok, _} <- Repo.delete(object) do
468 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
470 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
471 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
472 {:ok, _} <- User.unblock(blocker, blocked),
473 {:ok, _} <- Repo.delete(object) do
478 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
480 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
481 defp delete_object(object) do
482 with {:ok, _} <- Repo.delete(object), do: :ok
485 defp send_notifications(meta) do
486 Keyword.get(meta, :notifications, [])
487 |> Enum.each(fn notification ->
488 Streamer.stream(["user", "user:notification"], notification)
489 Push.send(notification)
495 defp send_streamables(meta) do
496 Keyword.get(meta, :streamables, [])
497 |> Enum.each(fn {topics, items} ->
498 Streamer.stream(topics, items)
504 defp add_streamables(meta, streamables) do
505 existing = Keyword.get(meta, :streamables, [])
508 |> Keyword.put(:streamables, streamables ++ existing)
511 defp add_notifications(meta, notifications) do
512 existing = Keyword.get(meta, :notifications, [])
515 |> Keyword.put(:notifications, notifications ++ existing)
519 def handle_after_transaction(meta) do
521 |> send_notifications()
522 |> send_streamables()