1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
14 alias Pleroma.Chat.MessageReference
15 alias Pleroma.FollowingRelationship
16 alias Pleroma.Notification
20 alias Pleroma.Web.ActivityPub.ActivityPub
21 alias Pleroma.Web.ActivityPub.Builder
22 alias Pleroma.Web.ActivityPub.Pipeline
23 alias Pleroma.Web.ActivityPub.Utils
24 alias Pleroma.Web.Push
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Workers.PollWorker
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
33 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
35 defp ap_streamer, do: Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
38 def handle(object, meta \\ [])
42 # - Sends a notification
49 "object" => follow_activity_id
54 with %Activity{actor: follower_id} = follow_activity <-
55 Activity.get_by_ap_id(follow_activity_id),
56 %User{} = followed <- User.get_cached_by_ap_id(actor),
57 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
58 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
59 {:ok, _follower, followed} <-
60 FollowingRelationship.update(follower, followed, :follow_accept) do
61 Notification.update_notification_type(followed, follow_activity)
68 # - Rejects all existing follow activities for this person
69 # - Updates the follow state
70 # - Dismisses notification
77 "object" => follow_activity_id
82 with %Activity{actor: follower_id} = follow_activity <-
83 Activity.get_by_ap_id(follow_activity_id),
84 %User{} = followed <- User.get_cached_by_ap_id(actor),
85 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
86 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
87 FollowingRelationship.update(follower, followed, :follow_reject)
88 Notification.dismiss(follow_activity)
95 # - Follows if possible
96 # - Sends a notification
97 # - Generates accept or reject if appropriate
104 "object" => followed_user,
105 "actor" => following_user
110 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
111 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
112 {_, {:ok, _, _}, _, _} <-
113 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
114 if followed.local && !followed.is_locked do
115 {:ok, accept_data, _} = Builder.accept(followed, object)
116 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
119 {:following, {:error, _}, _follower, followed} ->
120 {:ok, reject_data, _} = Builder.reject(followed, object)
121 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
127 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
131 |> add_notifications(notifications)
133 updated_object = Activity.get_by_ap_id(follow_id)
135 {:ok, updated_object, meta}
138 # Tasks this handles:
139 # - Unfollow and block
142 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
146 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
147 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
148 User.block(blocker, blocked)
154 # Tasks this handles:
157 # For a local user, we also get a changeset with the full information, so we
158 # can update non-federating, non-activitypub settings as well.
160 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
161 if changeset = Keyword.get(meta, :user_update_changeset) do
163 |> User.update_and_set_cache()
165 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
167 User.get_by_ap_id(updated_object["id"])
168 |> User.remote_user_changeset(new_user_data)
169 |> User.update_and_set_cache()
175 # Tasks this handles:
176 # - Add like to object
177 # - Set up notification
179 def handle(%{data: %{"type" => "Like"}} = object, meta) do
180 liked_object = Object.get_by_ap_id(object.data["object"])
181 Utils.add_like_to_object(object, liked_object)
183 Notification.create_notifications(object)
189 # - Actually create object
190 # - Rollback if we couldn't create it
191 # - Increase the user note count
192 # - Increase the reply count
193 # - Increase replies count
194 # - Set up ActivityExpiration
195 # - Set up notifications
197 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
198 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
199 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
200 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
201 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
203 if in_reply_to = object.data["inReplyTo"] && object.data["type"] != "Answer" do
204 Object.increase_replies_count(in_reply_to)
207 reply_depth = (meta[:depth] || 0) + 1
209 # FIXME: Force inReplyTo to replies
210 if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
211 object.data["replies"] != nil do
212 for reply_id <- object.data["replies"] do
213 Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
215 "depth" => reply_depth
220 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
221 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
226 |> add_notifications(notifications)
228 ap_streamer().stream_out(activity)
230 {:ok, activity, meta}
232 e -> Repo.rollback(e)
236 # Tasks this handles:
237 # - Add announce to object
238 # - Set up notification
239 # - Stream out the announce
241 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
242 announced_object = Object.get_by_ap_id(object.data["object"])
243 user = User.get_cached_by_ap_id(object.data["actor"])
245 Utils.add_announce_to_object(object, announced_object)
247 if !User.is_internal_user?(user) do
248 Notification.create_notifications(object)
250 ap_streamer().stream_out(object)
257 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
258 with undone_object <- Activity.get_by_ap_id(undone_object),
259 :ok <- handle_undoing(undone_object) do
264 # Tasks this handles:
265 # - Add reaction to object
266 # - Set up notification
268 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
269 reacted_object = Object.get_by_ap_id(object.data["object"])
270 Utils.add_emoji_reaction_to_object(object, reacted_object)
272 Notification.create_notifications(object)
277 # Tasks this handles:
278 # - Delete and unpins the create activity
279 # - Replace object with Tombstone
280 # - Set up notification
281 # - Reduce the user note count
282 # - Reduce the reply count
283 # - Stream out the activity
285 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
287 Object.normalize(deleted_object, fetch: false) ||
288 User.get_cached_by_ap_id(deleted_object)
291 case deleted_object do
293 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
294 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
295 %User{} = user <- User.get_cached_by_ap_id(actor) do
296 User.remove_pinned_object_id(user, deleted_object.data["id"])
298 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
300 if in_reply_to = deleted_object.data["inReplyTo"] do
301 Object.decrease_replies_count(in_reply_to)
304 MessageReference.delete_for_object(deleted_object)
306 ap_streamer().stream_out(object)
307 ap_streamer().stream_out_participations(deleted_object, user)
311 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
316 with {:ok, _} <- User.delete(deleted_object) do
322 Notification.create_notifications(object)
329 # Tasks this handles:
331 # - removes expiration job for pinned activity, if was set for expiration
333 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
334 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
335 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
336 # if pinned activity was scheduled for deletion, we remove job
337 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
338 Oban.cancel_job(expiration.id)
344 {:error, :user_not_found}
346 {:error, changeset} ->
347 if changeset.errors[:pinned_objects] do
348 {:error, :pinned_statuses_limit_reached}
355 # Tasks this handles:
356 # - removes pin from user
357 # - removes corresponding Add activity
358 # - if activity had expiration, recreates activity expiration job
360 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
361 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
362 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
364 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
367 # if pinned activity was scheduled for deletion, we reschedule it for deletion
368 if meta[:expires_at] do
369 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
371 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
373 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
374 activity_id: meta[:activity_id],
375 expires_at: expires_at
381 nil -> {:error, :user_not_found}
388 def handle(object, meta) do
392 def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
393 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
394 actor = User.get_cached_by_ap_id(object.data["actor"])
395 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
398 [[actor, recipient], [recipient, actor]]
400 |> Enum.map(fn [user, other_user] ->
402 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
403 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
406 :chat_message_id_idempotency_key_cache,
408 meta[:idempotency_key]
412 ["user", "user:pleroma_chat"],
413 {user, %{cm_ref | chat: chat, object: object}}
421 |> add_streamables(streamables)
427 def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
428 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
429 PollWorker.schedule_poll_end(activity)
434 def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
435 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
436 Object.increase_vote_count(
437 object.data["inReplyTo"],
446 def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
447 when objtype in ~w[Audio Video Event Article Note Page] do
448 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
454 def handle_object_creation(object, _activity, meta) do
458 defp undo_like(nil, object), do: delete_object(object)
460 defp undo_like(%Object{} = liked_object, object) do
461 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
462 delete_object(object)
466 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
467 object.data["object"]
468 |> Object.get_by_ap_id()
472 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
473 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
474 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
475 {:ok, _} <- Repo.delete(object) do
480 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
481 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
482 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
483 {:ok, _} <- Repo.delete(object) do
489 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
491 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
492 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
493 {:ok, _} <- User.unblock(blocker, blocked),
494 {:ok, _} <- Repo.delete(object) do
499 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
501 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
502 defp delete_object(object) do
503 with {:ok, _} <- Repo.delete(object), do: :ok
506 defp send_notifications(meta) do
507 Keyword.get(meta, :notifications, [])
508 |> Enum.each(fn notification ->
509 Streamer.stream(["user", "user:notification"], notification)
510 Push.send(notification)
516 defp send_streamables(meta) do
517 Keyword.get(meta, :streamables, [])
518 |> Enum.each(fn {topics, items} ->
519 Streamer.stream(topics, items)
525 defp add_streamables(meta, streamables) do
526 existing = Keyword.get(meta, :streamables, [])
529 |> Keyword.put(:streamables, streamables ++ existing)
532 defp add_notifications(meta, notifications) do
533 existing = Keyword.get(meta, :notifications, [])
536 |> Keyword.put(:notifications, notifications ++ existing)
540 def handle_after_transaction(activity, meta) do
541 Pleroma.Elasticsearch.put_by_id(activity.id)
544 |> send_notifications()
545 |> send_streamables()