1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
33 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
# Returns the module used to stream out ActivityPub activities.
# Read from the `[:side_effects, :ap_streamer]` config key on every call,
# falling back to `ActivityPub` when the key is not set.
defp ap_streamer do
  Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
end
38 def handle(object, meta \\ [])
42 # - Sends a notification
49 "object" => follow_activity_id
54 with %Activity{actor: follower_id} = follow_activity <-
55 Activity.get_by_ap_id(follow_activity_id),
56 %User{} = followed <- User.get_cached_by_ap_id(actor),
57 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
58 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
59 {:ok, _follower, followed} <-
60 FollowingRelationship.update(follower, followed, :follow_accept) do
61 Notification.update_notification_type(followed, follow_activity)
68 # - Rejects all existing follow activities for this person
69 # - Updates the follow state
70 # - Dismisses notification
77 "object" => follow_activity_id
82 with %Activity{actor: follower_id} = follow_activity <-
83 Activity.get_by_ap_id(follow_activity_id),
84 %User{} = followed <- User.get_cached_by_ap_id(actor),
85 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
86 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
87 FollowingRelationship.update(follower, followed, :follow_reject)
88 Notification.dismiss(follow_activity)
95 # - Follows if possible
96 # - Sends a notification
97 # - Generates accept or reject if appropriate
104 "object" => followed_user,
105 "actor" => following_user
110 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
111 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
112 {_, {:ok, _, _}, _, _} <-
113 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
114 if followed.local && !followed.is_locked do
115 {:ok, accept_data, _} = Builder.accept(followed, object)
116 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
119 {:following, {:error, _}, _follower, followed} ->
120 {:ok, reject_data, _} = Builder.reject(followed, object)
121 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
127 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
131 |> add_notifications(notifications)
133 updated_object = Activity.get_by_ap_id(follow_id)
135 {:ok, updated_object, meta}
138 # Tasks this handles:
139 # - Unfollow and block
142 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
146 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
147 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
148 User.block(blocker, blocked)
154 # Tasks this handles:
157 # For a local user, we also get a changeset with the full information, so we
158 # can update non-federating, non-activitypub settings as well.
160 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
161 if changeset = Keyword.get(meta, :user_update_changeset) do
163 |> User.update_and_set_cache()
165 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
167 User.get_by_ap_id(updated_object["id"])
168 |> User.remote_user_changeset(new_user_data)
169 |> User.update_and_set_cache()
175 # Tasks this handles:
176 # - Add like to object
177 # - Set up notification
179 def handle(%{data: %{"type" => "Like"}} = object, meta) do
180 liked_object = Object.get_by_ap_id(object.data["object"])
181 Utils.add_like_to_object(object, liked_object)
183 Notification.create_notifications(object)
# Tasks this handles:
# - Actually create the object
# - Rollback if we couldn't create it
# - Increase the user note count
# - Increase the replies count of the object being replied to
#   (skipped when the created object is an "Answer")
# - Enqueue a remote fetch for every id listed in the object's "replies",
#   as long as the thread distance is still allowed
# - Start fetching rich media data for the activity
# - Set up notifications (collected in `meta`, not sent here)
# NOTE(review): this list used to mention "Set up ActivityExpiration";
# nothing in this clause does that directly — confirm where it happens.
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)

    # Bind the parent id before testing it: `=` binds looser than `&&`, so
    # `in_reply_to = id && cond` would bind the boolean result instead of the id.
    in_reply_to = object.data["inReplyTo"]

    if in_reply_to && object.data["type"] != "Answer" do
      Object.increase_replies_count(in_reply_to)
    end

    reply_depth = (meta[:depth] || 0) + 1

    # FIXME: Force inReplyTo to replies
    if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
         object.data["replies"] != nil do
      for reply_id <- object.data["replies"] do
        Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
          "id" => reply_id,
          "depth" => reply_depth
        })
      end
    end

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    meta = add_notifications(meta, notifications)

    {:ok, activity, meta}
  else
    # Anything that did not match above aborts the surrounding transaction.
    e -> Repo.rollback(e)
  end
end
234 # Tasks this handles:
235 # - Add announce to object
236 # - Set up notification
237 # - Stream out the announce
239 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
240 announced_object = Object.get_by_ap_id(object.data["object"])
241 user = User.get_cached_by_ap_id(object.data["actor"])
243 Utils.add_announce_to_object(object, announced_object)
245 if !User.is_internal_user?(user) do
246 Notification.create_notifications(object)
249 |> Topics.get_activity_topics()
250 |> Streamer.stream(object)
257 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
258 with undone_object <- Activity.get_by_ap_id(undone_object),
259 :ok <- handle_undoing(undone_object) do
264 # Tasks this handles:
265 # - Add reaction to object
266 # - Set up notification
268 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
269 reacted_object = Object.get_by_ap_id(object.data["object"])
270 Utils.add_emoji_reaction_to_object(object, reacted_object)
272 Notification.create_notifications(object)
277 # Tasks this handles:
278 # - Delete and unpins the create activity
279 # - Replace object with Tombstone
280 # - Set up notification
281 # - Reduce the user note count
282 # - Reduce the reply count
283 # - Stream out the activity
285 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
287 Object.normalize(deleted_object, fetch: false) ||
288 User.get_cached_by_ap_id(deleted_object)
291 case deleted_object do
293 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
294 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
295 %User{} = user <- User.get_cached_by_ap_id(actor) do
296 User.remove_pinned_object_id(user, deleted_object.data["id"])
298 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
300 if in_reply_to = deleted_object.data["inReplyTo"] do
301 Object.decrease_replies_count(in_reply_to)
304 MessageReference.delete_for_object(deleted_object)
306 ap_streamer().stream_out(object)
307 ap_streamer().stream_out_participations(deleted_object, user)
311 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
316 with {:ok, _} <- User.delete(deleted_object) do
322 Notification.create_notifications(object)
329 # Tasks this handles:
331 # - removes expiration job for pinned activity, if was set for expiration
333 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
334 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
335 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
336 # if pinned activity was scheduled for deletion, we remove job
337 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
338 Oban.cancel_job(expiration.id)
344 {:error, :user_not_found}
346 {:error, changeset} ->
347 if changeset.errors[:pinned_objects] do
348 {:error, :pinned_statuses_limit_reached}
355 # Tasks this handles:
356 # - removes pin from user
357 # - removes corresponding Add activity
358 # - if activity had expiration, recreates activity expiration job
360 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
361 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
362 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
364 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
367 # if pinned activity was scheduled for deletion, we reschedule it for deletion
368 if meta[:expires_at] do
369 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
371 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
373 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
374 activity_id: meta[:activity_id],
375 expires_at: expires_at
381 nil -> {:error, :user_not_found}
388 def handle(object, meta) do
392 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
393 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
394 actor = User.get_cached_by_ap_id(object.data["actor"])
395 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
398 [[actor, recipient], [recipient, actor]]
400 |> Enum.map(fn [user, other_user] ->
402 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
403 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
406 :chat_message_id_idempotency_key_cache,
408 meta[:idempotency_key]
412 ["user", "user:pleroma_chat"],
413 {user, %{cm_ref | chat: chat, object: object}}
421 |> add_streamables(streamables)
427 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
428 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
429 Object.increase_vote_count(
430 object.data["inReplyTo"],
439 def handle_object_creation(%{"type" => objtype} = object, meta)
440 when objtype in ~w[Audio Video Question Event Article Note Page] do
441 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
447 def handle_object_creation(object, meta) do
# Reverts a like. The first argument is the liked object (or `nil` when it
# could not be found), the second is the like that is being undone.

# Liked object is gone: there is no like to remove from it, only delete the like.
defp undo_like(nil, object) do
  delete_object(object)
end

# Liked object exists: remove the like from it first, then delete the like.
# Any non-`{:ok, _}` result from the removal is returned unchanged.
defp undo_like(%Object{} = liked_object, object) do
  case Utils.remove_like_from_object(object, liked_object) do
    {:ok, _} -> delete_object(object)
    other -> other
  end
end
# Reverts the side effects of an activity that is being undone.
# Returns `:ok` on success; otherwise returns whatever step failed to match.

# Like: remove the like from the liked object (if it still exists) and delete it.
def handle_undoing(%{data: %{"type" => "Like"}} = object) do
  liked_object = Object.get_by_ap_id(object.data["object"])
  undo_like(liked_object, object)
end

# EmojiReact: remove the reaction from the reacted object, then delete it.
def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
  with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object) do
    delete_object(object)
  end
end

# Announce: remove the announce from the announced object, then delete it.
def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
  with %Object{} = announced_object <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object) do
    delete_object(object)
  end
end

# Block: unblock the blocked user, then delete the block.
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked) do
    delete_object(object)
  end
end

# Anything else cannot be undone here.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
# Deletes the given record through the repo.
# Returns `:ok` when the delete succeeds; any other result from
# `Repo.delete/1` is handed back to the caller unchanged.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _deleted} -> :ok
    failure -> failure
  end
end
# Streams and pushes every notification stored under `:notifications` in
# `meta` (nothing happens when the key is absent). Returns `meta` unchanged
# so the call can be chained.
defp send_notifications(meta) do
  for notification <- Keyword.get(meta, :notifications, []) do
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end

  meta
end
# Streams every `{topics, items}` pair stored under `:streamables` in `meta`
# (nothing happens when the key is absent). Returns `meta` unchanged so the
# call can be chained.
defp send_streamables(meta) do
  meta
  |> Keyword.get(:streamables, [])
  |> Enum.each(fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
# Puts `streamables` in front of whatever is already stored under
# `:streamables` in `meta` and returns the updated keyword list.
defp add_streamables(meta, streamables) do
  merged = streamables ++ Keyword.get(meta, :streamables, [])
  Keyword.put(meta, :streamables, merged)
end
# Puts `notifications` in front of whatever is already stored under
# `:notifications` in `meta` and returns the updated keyword list.
defp add_notifications(meta, notifications) do
  merged = notifications ++ Keyword.get(meta, :notifications, [])
  Keyword.put(meta, :notifications, merged)
end
# Delivers what was collected in `meta`: first the notifications,
# then the streamables. Returns the result of `send_streamables/1`.
def handle_after_transaction(meta) do
  notified_meta = send_notifications(meta)
  send_streamables(notified_meta)
end