1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.PollWorker
26 require Pleroma.Constants
29 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
31 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
# Returns the module used to stream activities out. It is looked up on every
# call under the `[:side_effects, :ap_streamer]` config key, with `ActivityPub`
# passed as the default, so the streamer can be swapped through configuration.
defp ap_streamer do
  Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
end
36 def handle(object, meta \\ [])
40 # - Sends a notification
47 "object" => follow_activity_id
52 with %Activity{actor: follower_id} = follow_activity <-
53 Activity.get_by_ap_id(follow_activity_id),
54 %User{} = followed <- User.get_cached_by_ap_id(actor),
55 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
56 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
57 {:ok, _follower, followed} <-
58 FollowingRelationship.update(follower, followed, :follow_accept) do
59 Notification.update_notification_type(followed, follow_activity)
66 # - Rejects all existing follow activities for this person
67 # - Updates the follow state
68 # - Dismisses notification
75 "object" => follow_activity_id
80 with %Activity{actor: follower_id} = follow_activity <-
81 Activity.get_by_ap_id(follow_activity_id),
82 %User{} = followed <- User.get_cached_by_ap_id(actor),
83 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
84 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
85 FollowingRelationship.update(follower, followed, :follow_reject)
86 Notification.dismiss(follow_activity)
93 # - Follows if possible
94 # - Sends a notification
95 # - Generates accept or reject if appropriate
102 "object" => followed_user,
103 "actor" => following_user
108 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
109 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
110 {_, {:ok, _, _}, _, _} <-
111 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
112 if followed.local && !followed.is_locked do
113 {:ok, accept_data, _} = Builder.accept(followed, object)
114 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
117 {:following, {:error, _}, _follower, followed} ->
118 {:ok, reject_data, _} = Builder.reject(followed, object)
119 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
125 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
129 |> add_notifications(notifications)
131 updated_object = Activity.get_by_ap_id(follow_id)
133 {:ok, updated_object, meta}
136 # Tasks this handles:
137 # - Unfollow and block
140 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
144 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
145 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
146 User.block(blocker, blocked)
152 # Tasks this handles:
154 # - Update a non-user object (Note, Question, etc.)
156 # For a local user, we also get a changeset with the full information, so we
157 # can update non-federating, non-activitypub settings as well.
159 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
160 updated_object_id = updated_object["id"]
162 with {_, true} <- {:has_id, is_binary(updated_object_id)},
163 %{"type" => type} <- updated_object,
164 {_, is_user} <- {:is_user, type in Pleroma.Constants.actor_types()} do
166 handle_update_user(object, meta)
168 handle_update_object(object, meta)
176 # Tasks this handles:
177 # - Add like to object
178 # - Set up notification
180 def handle(%{data: %{"type" => "Like"}} = object, meta) do
181 liked_object = Object.get_by_ap_id(object.data["object"])
182 Utils.add_like_to_object(object, liked_object)
184 Notification.create_notifications(object)
190 # - Actually create object
191 # - Rollback if we couldn't create it
192 # - Increase the user note count
# - Increase the replies count of the object being replied to
195 # - Ask for scraping of nodeinfo
196 # - Set up ActivityExpiration
197 # - Set up notifications
198 # - Index incoming posts for search (if needed)
200 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
201 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
202 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
203 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
204 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
205 {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
207 if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
208 Object.increase_replies_count(in_reply_to)
211 reply_depth = (meta[:depth] || 0) + 1
213 Pleroma.Workers.NodeInfoFetcherWorker.enqueue("process", %{
214 "source_url" => activity.data["actor"]
217 # FIXME: Force inReplyTo to replies
218 if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
219 object.data["replies"] != nil do
220 for reply_id <- object.data["replies"] do
221 Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
223 "depth" => reply_depth
228 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
229 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
232 Pleroma.Search.add_to_index(Map.put(activity, :object, object))
236 |> add_notifications(notifications)
238 ap_streamer().stream_out(activity)
240 {:ok, activity, meta}
243 Logger.error(inspect(e))
248 # Tasks this handles:
249 # - Add announce to object
250 # - Set up notification
251 # - Stream out the announce
253 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
254 announced_object = Object.get_by_ap_id(object.data["object"])
255 user = User.get_cached_by_ap_id(object.data["actor"])
257 Utils.add_announce_to_object(object, announced_object)
259 if !User.is_internal_user?(user) do
260 Notification.create_notifications(object)
262 ap_streamer().stream_out(object)
269 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
270 with undone_object <- Activity.get_by_ap_id(undone_object),
271 :ok <- handle_undoing(undone_object) do
276 # Tasks this handles:
277 # - Add reaction to object
278 # - Set up notification
280 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
281 reacted_object = Object.get_by_ap_id(object.data["object"])
282 Utils.add_emoji_reaction_to_object(object, reacted_object)
284 Notification.create_notifications(object)
289 # Tasks this handles:
# - Delete and unpin the create activity
291 # - Set up notification
292 # - Reduce the user note count
293 # - Reduce the reply count
294 # - Stream out the activity
295 # - Removes posts from search index (if needed)
297 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
299 Object.normalize(deleted_object, fetch: false) ||
300 User.get_cached_by_ap_id(deleted_object)
303 case deleted_object do
305 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
306 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
307 %User{} = user <- User.get_cached_by_ap_id(actor) do
308 User.remove_pinned_object_id(user, deleted_object.data["id"])
310 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
312 if in_reply_to = deleted_object.data["inReplyTo"] do
313 Object.decrease_replies_count(in_reply_to)
316 ap_streamer().stream_out(object)
317 ap_streamer().stream_out_participations(deleted_object, user)
321 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
326 with {:ok, _} <- User.delete(deleted_object) do
332 # Only remove from index when deleting actual objects, not users or anything else
333 with %Pleroma.Object{} <- deleted_object do
334 Pleroma.Search.remove_from_index(deleted_object)
343 # Tasks this handles:
# - removes the expiration job for the pinned activity, if one was scheduled
347 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
348 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
349 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
350 # if pinned activity was scheduled for deletion, we remove job
351 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
352 Oban.cancel_job(expiration.id)
358 {:error, :user_not_found}
360 {:error, changeset} ->
361 if changeset.errors[:pinned_objects] do
362 {:error, :pinned_statuses_limit_reached}
369 # Tasks this handles:
370 # - removes pin from user
371 # - removes corresponding Add activity
372 # - if activity had expiration, recreates activity expiration job
374 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
375 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
376 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
378 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
381 # if pinned activity was scheduled for deletion, we reschedule it for deletion
382 if meta[:expires_at] do
383 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
385 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
387 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
388 activity_id: meta[:activity_id],
389 expires_at: expires_at
395 nil -> {:error, :user_not_found}
402 def handle(object, meta) do
406 defp handle_update_user(
407 %{data: %{"type" => "Update", "object" => updated_object}} = object,
410 if changeset = Keyword.get(meta, :user_update_changeset) do
412 |> User.update_and_set_cache()
414 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
416 User.get_by_ap_id(updated_object["id"])
417 |> User.remote_user_changeset(new_user_data)
418 |> User.update_and_set_cache()
424 defp handle_update_object(
425 %{data: %{"type" => "Update", "object" => updated_object}} = object,
428 orig_object_ap_id = updated_object["id"]
429 orig_object = Object.get_by_ap_id(orig_object_ap_id)
430 orig_object_data = orig_object.data
434 # If this is a local Update, we don't process it by transmogrifier,
435 # so we use the embedded object as-is.
441 if orig_object_data["type"] in Pleroma.Constants.updatable_object_types() do
443 updated_data: updated_object_data,
445 used_history_in_new_object?: used_history_in_new_object?
446 } = Object.Updater.make_new_object_data_from_update_object(orig_object_data, updated_object)
450 |> Repo.preload(:hashtags)
451 |> Object.change(%{data: updated_object_data})
453 with {:ok, new_object} <- Repo.update(changeset),
454 {:ok, _} <- Object.invalid_object_cache(new_object),
455 {:ok, _} <- Object.set_cache(new_object),
456 # The metadata/utils.ex uses the object id for the cache.
457 {:ok, _} <- Pleroma.Activity.HTML.invalidate_cache_for(new_object.id) do
458 if used_history_in_new_object? do
459 with create_activity when not is_nil(create_activity) <-
460 Pleroma.Activity.get_create_by_object_ap_id(orig_object_ap_id),
461 {:ok, _} <- Pleroma.Activity.HTML.invalidate_cache_for(create_activity.id) do
470 |> Activity.normalize()
471 |> ActivityPub.notify_and_stream()
479 def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
480 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
481 PollWorker.schedule_poll_end(activity)
486 def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
487 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
488 Object.increase_vote_count(
489 object.data["inReplyTo"],
498 def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
499 when objtype in ~w[Audio Video Event Article Note Page] do
500 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
506 def handle_object_creation(object, _activity, meta) do
# Reverts a `Like` activity.
#
# The first argument is the result of looking up the liked object. When that
# lookup produced `nil` there is nothing to remove the like from, so only the
# activity itself is deleted.
defp undo_like(nil, object) do
  delete_object(object)
end

# When the liked object exists, the like is removed from it first and the
# activity is deleted only if that removal reports `{:ok, _}`. Any other
# result of the removal is returned to the caller unchanged.
defp undo_like(%Object{} = liked_object, object) do
  case Utils.remove_like_from_object(object, liked_object) do
    {:ok, _} -> delete_object(object)
    failure -> failure
  end
end
# Undoes a `Like` activity.
#
# The liked object is looked up by the AP id stored under the activity's
# "object" key. The lookup result (possibly `nil`) is handed to `undo_like/2`
# together with the activity, and whatever `undo_like/2` returns is the result.
def handle_undoing(%{data: %{"type" => "Like"}} = object) do
  liked_object = Object.get_by_ap_id(object.data["object"])
  undo_like(liked_object, object)
end
# Undoes an `EmojiReact` activity.
#
# Steps, each of which must succeed for the next one to run:
#   1. look up the reacted-to object by the AP id under the "object" key,
#   2. remove the emoji reaction from that object,
#   3. delete the activity itself (`delete_object/1` yields `:ok` on success).
#
# There is no `else`, so the first value that does not match is returned as-is.
# NOTE(review): unlike the `Like` clause, a `nil` lookup stops here and leaves
# the activity in place. Confirm that asymmetry is intended.
def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
  reacted_ap_id = object.data["object"]

  with %Object{} = reacted_object <- Object.get_by_ap_id(reacted_ap_id),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object) do
    delete_object(object)
  end
end
# Undoes an `Announce` activity.
#
# Steps, each of which must succeed for the next one to run:
#   1. look up the announced object by the AP id under the "object" key,
#   2. remove the announce from that object,
#   3. delete the activity itself (`delete_object/1` yields `:ok` on success).
#
# There is no `else`, so the first value that does not match is returned as-is.
# NOTE(review): unlike the `Like` clause, a `nil` lookup stops here and leaves
# the activity in place. Confirm that asymmetry is intended.
def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
  announced_ap_id = object.data["object"]

  with %Object{} = announced_object <- Object.get_by_ap_id(announced_ap_id),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object) do
    delete_object(object)
  end
end
# Undoes a `Block` activity.
#
# Both users are resolved from the AP ids carried by the activity ("actor" is
# the blocker, "object" is the blocked user). The block is then lifted and the
# activity itself deleted (`delete_object/1` yields `:ok` on success).
#
# There is no `else`, so the first value that does not match is returned
# as-is, for example `nil` when either user cannot be found.
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} = object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked) do
    delete_object(object)
  end
end
# Catch-all clause: anything not matched by a more specific `handle_undoing/1`
# clause cannot be undone. The offending value is echoed back inside the error
# so the caller can see what was rejected.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
# Deletes the given record through the repo. A successful delete is collapsed
# to a bare `:ok`; any other result from `Repo.delete/1` is returned untouched.
#
# NOTE(review): the callers visible in this file pass the activity being undone
# rather than an `Object`, so the `Object.t()` argument type below may be too
# narrow. Confirm before tightening or relying on it.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
# Delivers every notification queued under the `:notifications` key of `meta`
# (an absent key is treated as an empty list). Each notification is streamed
# to the "user" and "user:notification" topics and then passed to
# `Push.send/1`.
#
# Returns `meta` unchanged so the call can sit in the middle of a pipeline.
defp send_notifications(meta) do
  meta
  |> Keyword.get(:notifications, [])
  |> Enum.each(fn queued ->
    Streamer.stream(["user", "user:notification"], queued)
    Push.send(queued)
  end)

  meta
end
# Streams every `{topics, items}` pair queued under the `:streamables` key of
# `meta` (an absent key is treated as an empty list) through
# `Streamer.stream/2`.
#
# Returns `meta` unchanged so the call can sit in a pipeline.
defp send_streamables(meta) do
  meta
  |> Keyword.get(:streamables, [])
  |> Enum.each(fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
# Queues `notifications` in `meta` for later delivery. The new notifications
# are placed in front of whatever is already stored under `:notifications`
# (an absent key counts as an empty list), and the combined list replaces the
# previous entry.
defp add_notifications(meta, notifications) do
  already_queued = Keyword.get(meta, :notifications, [])
  Keyword.put(meta, :notifications, notifications ++ already_queued)
end
# Runs the deferred deliveries collected in `meta`: first the queued
# notifications, then the queued streamables. The result of
# `send_streamables/1` is returned.
def handle_after_transaction(meta) do
  notified_meta = send_notifications(meta)
  send_streamables(notified_meta)
end