1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.PollWorker
26 require Pleroma.Constants
29 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
31 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
33 defp ap_streamer, do: Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
36 def handle(object, meta \\ [])
40 # - Sends a notification
47 "object" => follow_activity_id
52 with %Activity{actor: follower_id} = follow_activity <-
53 Activity.get_by_ap_id(follow_activity_id),
54 %User{} = followed <- User.get_cached_by_ap_id(actor),
55 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
56 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
57 {:ok, _follower, followed} <-
58 FollowingRelationship.update(follower, followed, :follow_accept) do
59 Notification.update_notification_type(followed, follow_activity)
66 # - Rejects all existing follow activities for this person
67 # - Updates the follow state
68 # - Dismisses notification
75 "object" => follow_activity_id
80 with %Activity{actor: follower_id} = follow_activity <-
81 Activity.get_by_ap_id(follow_activity_id),
82 %User{} = followed <- User.get_cached_by_ap_id(actor),
83 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
84 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
85 FollowingRelationship.update(follower, followed, :follow_reject)
86 Notification.dismiss(follow_activity)
93 # - Follows if possible
94 # - Sends a notification
95 # - Generates accept or reject if appropriate
102 "object" => followed_user,
103 "actor" => following_user
108 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
109 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
110 {_, {:ok, _, _}, _, _} <-
111 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
112 if followed.local && !followed.is_locked do
113 {:ok, accept_data, _} = Builder.accept(followed, object)
114 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
117 {:following, {:error, _}, _follower, followed} ->
118 {:ok, reject_data, _} = Builder.reject(followed, object)
119 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
125 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
129 |> add_notifications(notifications)
131 updated_object = Activity.get_by_ap_id(follow_id)
133 {:ok, updated_object, meta}
136 # Tasks this handles:
137 # - Unfollow and block
140 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
144 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
145 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
146 User.block(blocker, blocked)
152 # Tasks this handles:
154 # - Update a non-user object (Note, Question, etc.)
156 # For a local user, we also get a changeset with the full information, so we
157 # can update non-federating, non-activitypub settings as well.
159 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
160 updated_object_id = updated_object["id"]
162 with {_, true} <- {:has_id, is_binary(updated_object_id)},
163 %{"type" => type} <- updated_object,
164 {_, is_user} <- {:is_user, type in Pleroma.Constants.actor_types()} do
166 handle_update_user(object, meta)
168 handle_update_object(object, meta)
176 # Tasks this handles:
177 # - Add like to object
178 # - Set up notification
180 def handle(%{data: %{"type" => "Like"}} = object, meta) do
181 liked_object = Object.get_by_ap_id(object.data["object"])
182 Utils.add_like_to_object(object, liked_object)
184 Notification.create_notifications(object)
190 # - Actually create object
191 # - Rollback if we couldn't create it
192 # - Increase the user note count
193 # - Increase the reply count
194 # - Increase replies count
195 # - Ask for scraping of nodeinfo
196 # - Set up ActivityExpiration
197 # - Set up notifications
198 # - Index incoming posts for search (if needed)
200 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
201 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
202 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
203 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
204 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
205 {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
207 if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
208 Object.increase_replies_count(in_reply_to)
211 reply_depth = (meta[:depth] || 0) + 1
213 Pleroma.Workers.NodeInfoFetcherWorker.enqueue("process", %{
214 "source_url" => activity.data["actor"]
217 # FIXME: Force inReplyTo to replies
218 if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
219 object.data["replies"] != nil do
220 for reply_id <- object.data["replies"] do
221 Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
223 "depth" => reply_depth
228 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
229 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
232 Pleroma.Search.add_to_index(Map.put(activity, :object, object))
236 |> add_notifications(notifications)
238 ap_streamer().stream_out(activity)
240 {:ok, activity, meta}
243 Logger.error(inspect(e))
248 # Tasks this handles:
249 # - Add announce to object
250 # - Set up notification
251 # - Stream out the announce
253 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
254 announced_object = Object.get_by_ap_id(object.data["object"])
255 user = User.get_cached_by_ap_id(object.data["actor"])
257 Utils.add_announce_to_object(object, announced_object)
259 if !User.is_internal_user?(user) do
260 Notification.create_notifications(object)
262 ap_streamer().stream_out(object)
269 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
270 with undone_object <- Activity.get_by_ap_id(undone_object),
271 :ok <- handle_undoing(undone_object) do
276 # Tasks this handles:
277 # - Add reaction to object
278 # - Set up notification
280 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
281 reacted_object = Object.get_by_ap_id(object.data["object"])
282 Utils.add_emoji_reaction_to_object(object, reacted_object)
284 Notification.create_notifications(object)
289 # Tasks this handles:
290 # - Delete and unpins the create activity
291 # - Replace object with Tombstone
292 # - Set up notification
293 # - Reduce the user note count
294 # - Reduce the reply count
295 # - Stream out the activity
296 # - Removes posts from search index (if needed)
298 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
300 Object.normalize(deleted_object, fetch: false) ||
301 User.get_cached_by_ap_id(deleted_object)
304 case deleted_object do
306 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
307 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
308 %User{} = user <- User.get_cached_by_ap_id(actor) do
309 User.remove_pinned_object_id(user, deleted_object.data["id"])
311 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
313 if in_reply_to = deleted_object.data["inReplyTo"] do
314 Object.decrease_replies_count(in_reply_to)
317 ap_streamer().stream_out(object)
318 ap_streamer().stream_out_participations(deleted_object, user)
322 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
327 with {:ok, _} <- User.delete(deleted_object) do
333 # Only remove from index when deleting actual objects, not users or anything else
334 with %Pleroma.Object{} <- deleted_object do
335 Pleroma.Search.remove_from_index(deleted_object)
344 # Tasks this handles:
346 # - removes expiration job for pinned activity, if was set for expiration
348 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
349 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
350 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
351 # if pinned activity was scheduled for deletion, we remove job
352 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
353 Oban.cancel_job(expiration.id)
359 {:error, :user_not_found}
361 {:error, changeset} ->
362 if changeset.errors[:pinned_objects] do
363 {:error, :pinned_statuses_limit_reached}
370 # Tasks this handles:
371 # - removes pin from user
372 # - removes corresponding Add activity
373 # - if activity had expiration, recreates activity expiration job
375 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
376 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
377 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
379 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
382 # if pinned activity was scheduled for deletion, we reschedule it for deletion
383 if meta[:expires_at] do
384 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
386 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
388 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
389 activity_id: meta[:activity_id],
390 expires_at: expires_at
396 nil -> {:error, :user_not_found}
403 def handle(object, meta) do
407 defp handle_update_user(
408 %{data: %{"type" => "Update", "object" => updated_object}} = object,
411 if changeset = Keyword.get(meta, :user_update_changeset) do
413 |> User.update_and_set_cache()
415 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
417 User.get_by_ap_id(updated_object["id"])
418 |> User.remote_user_changeset(new_user_data)
419 |> User.update_and_set_cache()
425 defp handle_update_object(
426 %{data: %{"type" => "Update", "object" => updated_object}} = object,
429 orig_object_ap_id = updated_object["id"]
430 orig_object = Object.get_by_ap_id(orig_object_ap_id)
431 orig_object_data = orig_object.data
435 # If this is a local Update, we don't process it by transmogrifier,
436 # so we use the embedded object as-is.
442 if orig_object_data["type"] in Pleroma.Constants.updatable_object_types() do
444 updated_data: updated_object_data,
446 used_history_in_new_object?: used_history_in_new_object?
447 } = Object.Updater.make_new_object_data_from_update_object(orig_object_data, updated_object)
451 |> Repo.preload(:hashtags)
452 |> Object.change(%{data: updated_object_data})
454 with {:ok, new_object} <- Repo.update(changeset),
455 {:ok, _} <- Object.invalid_object_cache(new_object),
456 {:ok, _} <- Object.set_cache(new_object),
457 # The metadata/utils.ex uses the object id for the cache.
458 {:ok, _} <- Pleroma.Activity.HTML.invalidate_cache_for(new_object.id) do
459 if used_history_in_new_object? do
460 with create_activity when not is_nil(create_activity) <-
461 Pleroma.Activity.get_create_by_object_ap_id(orig_object_ap_id),
462 {:ok, _} <- Pleroma.Activity.HTML.invalidate_cache_for(create_activity.id) do
471 |> Activity.normalize()
472 |> ActivityPub.notify_and_stream()
480 def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
481 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
482 PollWorker.schedule_poll_end(activity)
487 def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
488 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
489 Object.increase_vote_count(
490 object.data["inReplyTo"],
499 def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
500 when objtype in ~w[Audio Video Event Article Note Page] do
501 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
507 def handle_object_creation(object, _activity, meta) do
511 defp undo_like(nil, object), do: delete_object(object)
513 defp undo_like(%Object{} = liked_object, object) do
514 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
515 delete_object(object)
519 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
520 object.data["object"]
521 |> Object.get_by_ap_id()
525 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
526 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
527 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
528 {:ok, _} <- Repo.delete(object) do
533 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
534 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
535 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
536 {:ok, _} <- Repo.delete(object) do
542 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
544 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
545 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
546 {:ok, _} <- User.unblock(blocker, blocked),
547 {:ok, _} <- Repo.delete(object) do
552 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
554 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
555 defp delete_object(object) do
556 with {:ok, _} <- Repo.delete(object), do: :ok
559 defp send_notifications(meta) do
560 Keyword.get(meta, :notifications, [])
561 |> Enum.each(fn notification ->
562 Streamer.stream(["user", "user:notification"], notification)
563 Push.send(notification)
569 defp send_streamables(meta) do
570 Keyword.get(meta, :streamables, [])
571 |> Enum.each(fn {topics, items} ->
572 Streamer.stream(topics, items)
578 defp add_notifications(meta, notifications) do
579 existing = Keyword.get(meta, :notifications, [])
582 |> Keyword.put(:notifications, notifications ++ existing)
586 def handle_after_transaction(meta) do
588 |> send_notifications()
589 |> send_streamables()