1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.PollWorker
26 require Pleroma.Constants
29 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
31 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
33 defp ap_streamer, do: Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
36 def handle(object, meta \\ [])
40 # - Sends a notification
47 "object" => follow_activity_id
52 with %Activity{actor: follower_id} = follow_activity <-
53 Activity.get_by_ap_id(follow_activity_id),
54 %User{} = followed <- User.get_cached_by_ap_id(actor),
55 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
56 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
57 {:ok, _follower, followed} <-
58 FollowingRelationship.update(follower, followed, :follow_accept) do
59 Notification.update_notification_type(followed, follow_activity)
66 # - Rejects all existing follow activities for this person
67 # - Updates the follow state
68 # - Dismisses notification
75 "object" => follow_activity_id
80 with %Activity{actor: follower_id} = follow_activity <-
81 Activity.get_by_ap_id(follow_activity_id),
82 %User{} = followed <- User.get_cached_by_ap_id(actor),
83 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
84 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
85 FollowingRelationship.update(follower, followed, :follow_reject)
86 Notification.dismiss(follow_activity)
93 # - Follows if possible
94 # - Sends a notification
95 # - Generates accept or reject if appropriate
102 "object" => followed_user,
103 "actor" => following_user
108 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
109 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
110 {_, {:ok, _, _}, _, _} <-
111 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
112 if followed.local && !followed.is_locked do
113 {:ok, accept_data, _} = Builder.accept(followed, object)
114 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
117 {:following, {:error, _}, _follower, followed} ->
118 {:ok, reject_data, _} = Builder.reject(followed, object)
119 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
125 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
129 |> add_notifications(notifications)
131 updated_object = Activity.get_by_ap_id(follow_id)
133 {:ok, updated_object, meta}
136 # Tasks this handles:
137 # - Unfollow and block
140 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
144 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
145 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
146 User.block(blocker, blocked)
152 # Tasks this handles:
154 # - Update a non-user object (Note, Question, etc.)
156 # For a local user, we also get a changeset with the full information, so we
157 # can update non-federating, non-activitypub settings as well.
159 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
160 updated_object_id = updated_object["id"]
162 with {_, true} <- {:has_id, is_binary(updated_object_id)},
163 %{"type" => type} <- updated_object,
164 {_, is_user} <- {:is_user, type in Pleroma.Constants.actor_types()} do
166 handle_update_user(object, meta)
168 handle_update_object(object, meta)
176 # Tasks this handles:
177 # - Add like to object
178 # - Set up notification
180 def handle(%{data: %{"type" => "Like"}} = object, meta) do
181 liked_object = Object.get_by_ap_id(object.data["object"])
182 Utils.add_like_to_object(object, liked_object)
184 Notification.create_notifications(object)
190 # - Actually create object
191 # - Rollback if we couldn't create it
192 # - Increase the user note count
193 # - Increase the reply count
194 # - Increase replies count
195 # - Set up ActivityExpiration
196 # - Set up notifications
197 # - Index incoming posts for search (if needed)
199 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
200 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
201 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
202 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
203 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
204 {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
206 if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
207 Object.increase_replies_count(in_reply_to)
210 reply_depth = (meta[:depth] || 0) + 1
212 # FIXME: Force inReplyTo to replies
213 if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
214 object.data["replies"] != nil do
215 for reply_id <- object.data["replies"] do
216 Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
218 "depth" => reply_depth
223 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
224 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
227 Pleroma.Search.add_to_index(Map.put(activity, :object, object))
231 |> add_notifications(notifications)
233 ap_streamer().stream_out(activity)
235 {:ok, activity, meta}
237 e -> Repo.rollback(e)
241 # Tasks this handles:
242 # - Add announce to object
243 # - Set up notification
244 # - Stream out the announce
246 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
247 announced_object = Object.get_by_ap_id(object.data["object"])
248 user = User.get_cached_by_ap_id(object.data["actor"])
250 Utils.add_announce_to_object(object, announced_object)
252 if !User.is_internal_user?(user) do
253 Notification.create_notifications(object)
255 ap_streamer().stream_out(object)
262 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
263 with undone_object <- Activity.get_by_ap_id(undone_object),
264 :ok <- handle_undoing(undone_object) do
269 # Tasks this handles:
270 # - Add reaction to object
271 # - Set up notification
273 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
274 reacted_object = Object.get_by_ap_id(object.data["object"])
275 Utils.add_emoji_reaction_to_object(object, reacted_object)
277 Notification.create_notifications(object)
282 # Tasks this handles:
283 # - Delete and unpins the create activity
284 # - Replace object with Tombstone
285 # - Set up notification
286 # - Reduce the user note count
287 # - Reduce the reply count
288 # - Stream out the activity
289 # - Removes posts from search index (if needed)
291 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
293 Object.normalize(deleted_object, fetch: false) ||
294 User.get_cached_by_ap_id(deleted_object)
297 case deleted_object do
299 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
300 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
301 %User{} = user <- User.get_cached_by_ap_id(actor) do
302 User.remove_pinned_object_id(user, deleted_object.data["id"])
304 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
306 if in_reply_to = deleted_object.data["inReplyTo"] do
307 Object.decrease_replies_count(in_reply_to)
310 ap_streamer().stream_out(object)
311 ap_streamer().stream_out_participations(deleted_object, user)
315 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
320 with {:ok, _} <- User.delete(deleted_object) do
326 Notification.create_notifications(object)
328 # Only remove from index when deleting actual objects, not users or anything else
329 with %Pleroma.Object{} <- deleted_object do
330 Pleroma.Search.remove_from_index(deleted_object)
339 # Tasks this handles:
341 # - removes expiration job for pinned activity, if was set for expiration
343 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
344 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
345 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
346 # if pinned activity was scheduled for deletion, we remove job
347 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
348 Oban.cancel_job(expiration.id)
354 {:error, :user_not_found}
356 {:error, changeset} ->
357 if changeset.errors[:pinned_objects] do
358 {:error, :pinned_statuses_limit_reached}
365 # Tasks this handles:
366 # - removes pin from user
367 # - removes corresponding Add activity
368 # - if activity had expiration, recreates activity expiration job
370 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
371 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
372 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
374 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
377 # if pinned activity was scheduled for deletion, we reschedule it for deletion
378 if meta[:expires_at] do
379 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
381 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
383 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
384 activity_id: meta[:activity_id],
385 expires_at: expires_at
391 nil -> {:error, :user_not_found}
398 def handle(object, meta) do
402 defp handle_update_user(
403 %{data: %{"type" => "Update", "object" => updated_object}} = object,
406 if changeset = Keyword.get(meta, :user_update_changeset) do
408 |> User.update_and_set_cache()
410 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
412 User.get_by_ap_id(updated_object["id"])
413 |> User.remote_user_changeset(new_user_data)
414 |> User.update_and_set_cache()
420 defp handle_update_object(
421 %{data: %{"type" => "Update", "object" => updated_object}} = object,
424 orig_object_ap_id = updated_object["id"]
425 orig_object = Object.get_by_ap_id(orig_object_ap_id)
426 orig_object_data = orig_object.data
430 # If this is a local Update, we don't process it by transmogrifier,
431 # so we use the embedded object as-is.
437 if orig_object_data["type"] in Pleroma.Constants.updatable_object_types() do
439 updated_data: updated_object_data,
441 used_history_in_new_object?: used_history_in_new_object?
442 } = Object.Updater.make_new_object_data_from_update_object(orig_object_data, updated_object)
446 |> Repo.preload(:hashtags)
447 |> Object.change(%{data: updated_object_data})
449 with {:ok, new_object} <- Repo.update(changeset),
450 {:ok, _} <- Object.invalid_object_cache(new_object),
451 {:ok, _} <- Object.set_cache(new_object),
452 # The metadata/utils.ex uses the object id for the cache.
453 {:ok, _} <- Pleroma.Activity.HTML.invalidate_cache_for(new_object.id) do
454 if used_history_in_new_object? do
455 with create_activity when not is_nil(create_activity) <-
456 Pleroma.Activity.get_create_by_object_ap_id(orig_object_ap_id),
457 {:ok, _} <- Pleroma.Activity.HTML.invalidate_cache_for(create_activity.id) do
466 |> Activity.normalize()
467 |> ActivityPub.notify_and_stream()
475 def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
476 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
477 PollWorker.schedule_poll_end(activity)
482 def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
483 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
484 Object.increase_vote_count(
485 object.data["inReplyTo"],
494 def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
495 when objtype in ~w[Audio Video Event Article Note Page] do
496 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
502 def handle_object_creation(object, _activity, meta) do
506 defp undo_like(nil, object), do: delete_object(object)
508 defp undo_like(%Object{} = liked_object, object) do
509 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
510 delete_object(object)
514 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
515 object.data["object"]
516 |> Object.get_by_ap_id()
520 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
521 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
522 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
523 {:ok, _} <- Repo.delete(object) do
528 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
529 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
530 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
531 {:ok, _} <- Repo.delete(object) do
537 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
539 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
540 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
541 {:ok, _} <- User.unblock(blocker, blocked),
542 {:ok, _} <- Repo.delete(object) do
547 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
549 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
550 defp delete_object(object) do
551 with {:ok, _} <- Repo.delete(object), do: :ok
554 defp send_notifications(meta) do
555 Keyword.get(meta, :notifications, [])
556 |> Enum.each(fn notification ->
557 Streamer.stream(["user", "user:notification"], notification)
558 Push.send(notification)
564 defp send_streamables(meta) do
565 Keyword.get(meta, :streamables, [])
566 |> Enum.each(fn {topics, items} ->
567 Streamer.stream(topics, items)
573 defp add_notifications(meta, notifications) do
574 existing = Keyword.get(meta, :notifications, [])
577 |> Keyword.put(:notifications, notifications ++ existing)
581 def handle_after_transaction(meta) do
583 |> send_notifications()
584 |> send_streamables()