1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
13 alias Pleroma.FollowingRelationship
14 alias Pleroma.Notification
18 alias Pleroma.Web.ActivityPub.ActivityPub
19 alias Pleroma.Web.ActivityPub.Builder
20 alias Pleroma.Web.ActivityPub.Pipeline
21 alias Pleroma.Web.ActivityPub.Utils
22 alias Pleroma.Web.Push
23 alias Pleroma.Web.Streamer
24 alias Pleroma.Workers.PollWorker
28 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
30 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
# Resolves the module used to stream activities out. The lookup of
# [:side_effects, :ap_streamer] happens on every call; ActivityPub is passed as
# the second argument (presumably the default when the key is unset — confirm).
32 defp ap_streamer, do: Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
# Bodiless head for handle/2: declares `[]` as the default value of `meta`.
35 def handle(object, meta \\ [])
# NOTE(review): the head of this clause is not visible here; `actor` is
# presumably bound by its pattern alongside `follow_activity_id` — confirm.
39 # - Sends a notification
46 "object" => follow_activity_id
# Each step has to match before the next one runs: the referenced follow
# activity must exist, and both `actor` (the followed user) and the follow
# activity's actor (the follower) must resolve to users.
51 with %Activity{actor: follower_id} = follow_activity <-
52 Activity.get_by_ap_id(follow_activity_id),
53 %User{} = followed <- User.get_cached_by_ap_id(actor),
54 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
# `follow_activity` and `followed` are rebound to the values returned by the
# next two steps.
55 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
56 {:ok, _follower, followed} <-
57 FollowingRelationship.update(follower, followed, :follow_accept) do
# Runs with the rebound (updated) `followed` and `follow_activity`.
58 Notification.update_notification_type(followed, follow_activity)
# NOTE(review): the head of this clause is not visible here; `actor` is
# presumably bound by its pattern alongside `follow_activity_id` — confirm.
65 # - Rejects all existing follow activities for this person
66 # - Updates the follow state
67 # - Dismisses notification
74 "object" => follow_activity_id
# Same lookup chain as the accept handler: follow activity, followed user,
# follower.
79 with %Activity{actor: follower_id} = follow_activity <-
80 Activity.get_by_ap_id(follow_activity_id),
81 %User{} = followed <- User.get_cached_by_ap_id(actor),
82 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
83 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
# Unlike the `with` steps above, the result of this update is not matched.
84 FollowingRelationship.update(follower, followed, :follow_reject)
# Uses the originally fetched `follow_activity`; the updated one was discarded
# as `_follow_activity`.
85 Notification.dismiss(follow_activity)
# NOTE(review): the head of this clause is not visible here; `object` and
# `follow_id` are bound outside the visible lines.
92 # - Follows if possible
93 # - Sends a notification
94 # - Generates accept or reject if appropriate
101 "object" => followed_user,
102 "actor" => following_user
107 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
108 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
# The result of User.follow/3 is wrapped in a :following-tagged tuple together
# with both users, so a failed follow can be recognised by its tag below.
109 {_, {:ok, _, _}, _, _} <-
110 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
# A local followed user that is not locked gets an Accept built and run through
# the pipeline right away; both results are matched assertively.
111 if followed.local && !followed.is_locked do
112 {:ok, accept_data, _} = Builder.accept(followed, object)
113 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
# The follow attempt returned an error: build a Reject from the followed user
# and run it through the pipeline.
116 {:following, {:error, _}, _follower, followed} ->
117 {:ok, reject_data, _} = Builder.reject(followed, object)
118 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
# Notifications are created with `do_send: false` and handed to
# add_notifications/2 (presumably to be delivered later from `meta` — confirm).
124 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
128 |> add_notifications(notifications)
# The activity is looked up again by id and that result is what gets returned.
130 updated_object = Activity.get_by_ap_id(follow_id)
132 {:ok, updated_object, meta}
135 # Tasks this handles:
136 # - Unfollow and block
139 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
# User.block/2 is only called when both AP ids resolve to users.
143 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
144 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
145 User.block(blocker, blocked)
151 # Tasks this handles:
154 # For a local user, we also get a changeset with the full information, so we
155 # can update non-federating, non-activitypub settings as well.
157 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
# A changeset supplied under meta[:user_update_changeset] is applied through
# User.update_and_set_cache/1.
158 if changeset = Keyword.get(meta, :user_update_changeset) do
160 |> User.update_and_set_cache()
# Otherwise (presumably the `else` branch, which is not visible here) the user
# data is rebuilt from the updated object and applied as a remote-user changeset.
162 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
# NOTE(review): the lookup result is piped on without a nil check — confirm the
# user is guaranteed to exist at this point.
164 User.get_by_ap_id(updated_object["id"])
165 |> User.remote_user_changeset(new_user_data)
166 |> User.update_and_set_cache()
172 # Tasks this handles:
173 # - Add like to object
174 # - Set up notification
176 def handle(%{data: %{"type" => "Like"}} = object, meta) do
# The activity's "object" field is used as the AP id of the liked object.
# NOTE(review): the lookup result is passed on unchecked — confirm it cannot be
# nil here.
177 liked_object = Object.get_by_ap_id(object.data["object"])
178 Utils.add_like_to_object(object, liked_object)
# Called without `do_send: false`, unlike the Create handler.
180 Notification.create_notifications(object)
186 # - Actually create object
187 # - Rollback if we couldn't create it
188 # - Increase the user note count
189 # - Increase the reply count
190 # - Increase replies count
191 # - Set up ActivityExpiration
192 # - Set up notifications
193 # - Index incoming posts for search (if needed)
195 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
# meta[:object_data] is the object map handed to handle_object_creation/3;
# `meta` is rebound to whatever that call returns.
196 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
197 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
# Assertive matches: any result other than {:ok, _} raises.
198 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
199 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
200 {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
# Precedence: `!=` binds tighter than `&&`, which binds tighter than `=`.
# in_reply_to is therefore `false` for "Answer" objects and the "inReplyTo"
# value otherwise (nil when absent), so Answers never bump the replies count.
202 if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
203 Object.increase_replies_count(in_reply_to)
# Replies are fetched one level deeper than the current depth (0 when meta
# carries no :depth).
206 reply_depth = (meta[:depth] || 0) + 1
208 # FIXME: Force inReplyTo to replies
# One "fetch_remote" job is enqueued per entry of object.data["replies"], but
# only when the thread distance is allowed and "replies" is not nil.
209 if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
210 object.data["replies"] != nil do
211 for reply_id <- object.data["replies"] do
212 Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
214 "depth" => reply_depth
# The rich-media fetch is started in a separate process via Task.start/1,
# inside ConcurrentLimiter.limit/2.
219 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
220 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
# The activity is indexed with the created object attached under :object.
223 Pleroma.Search.add_to_index(Map.put(activity, :object, object))
227 |> add_notifications(notifications)
229 ap_streamer().stream_out(activity)
231 {:ok, activity, meta}
# Any other value is passed to Repo.rollback/1 (presumably the `else` branch of
# the `with` above).
233 e -> Repo.rollback(e)
237 # Tasks this handles:
238 # - Add announce to object
239 # - Set up notification
240 # - Stream out the announce
242 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
# Both lookups are used unchecked below.
243 announced_object = Object.get_by_ap_id(object.data["object"])
244 user = User.get_cached_by_ap_id(object.data["actor"])
246 Utils.add_announce_to_object(object, announced_object)
# Notifications are created only when the announcing actor is not an internal
# user.
248 if !User.is_internal_user?(user) do
249 Notification.create_notifications(object)
251 ap_streamer().stream_out(object)
# Handles "Undo": resolves the undone activity and delegates to handle_undoing/1.
258 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
# `undone_object` is rebound from the AP id to the looked-up activity. A bare
# variable pattern always matches, so a nil lookup also reaches
# handle_undoing/1, where only its catch-all clause can match it.
259 with undone_object <- Activity.get_by_ap_id(undone_object),
260 :ok <- handle_undoing(undone_object) do
265 # Tasks this handles:
266 # - Add reaction to object
267 # - Set up notification
269 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
# Same shape as the Like handler: look up the target by the activity's "object"
# field, record the reaction on it, then create notifications.
270 reacted_object = Object.get_by_ap_id(object.data["object"])
271 Utils.add_emoji_reaction_to_object(object, reacted_object)
273 Notification.create_notifications(object)
278 # Tasks this handles:
279 # - Deletes and unpins the create activity
280 # - Replace object with Tombstone
281 # - Set up notification
282 # - Reduce the user note count
283 # - Reduce the reply count
284 # - Stream out the activity
285 # - Removes posts from search index (if needed)
287 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
# The deleted id may name an object or a user: the object is tried first
# (with `fetch: false`), then the user cache.
289 Object.normalize(deleted_object, fetch: false) ||
290 User.get_cached_by_ap_id(deleted_object)
293 case deleted_object do
# Object branch. `deleted_object` is rebound to the value returned by
# Object.delete/1 for the rest of this branch.
295 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
# The actor is wrapped in an :actor-tagged tuple and guarded with is_binary/1,
# so a missing or non-string actor fails this step in a recognisable way.
296 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
297 %User{} = user <- User.get_cached_by_ap_id(actor) do
298 User.remove_pinned_object_id(user, deleted_object.data["id"])
# `user` is rebound to the returned user; that value is what
# stream_out_participations receives below.
300 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
302 if in_reply_to = deleted_object.data["inReplyTo"] do
303 Object.decrease_replies_count(in_reply_to)
306 ap_streamer().stream_out(object)
307 ap_streamer().stream_out_participations(deleted_object, user)
# Logged through the configurable @logger module attribute.
311 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
# User branch: here the deleted thing is passed to User.delete/1.
316 with {:ok, _} <- User.delete(deleted_object) do
322 Notification.create_notifications(object)
324 # Only remove from index when deleting actual objects, not users or anything else
325 with %Pleroma.Object{} <- deleted_object do
326 Pleroma.Search.remove_from_index(deleted_object)
335 # Tasks this handles:
337 # - removes expiration job for pinned activity, if it was set for expiration
339 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
# Pins data["object"] for the acting user; both steps must succeed.
340 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
341 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
342 # if pinned activity was scheduled for deletion, we remove job
# The expiration job is looked up by meta[:activity_id] and cancelled by its id.
343 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
344 Oban.cancel_job(expiration.id)
350 {:error, :user_not_found}
# A changeset error on :pinned_objects is reported as
# :pinned_statuses_limit_reached.
352 {:error, changeset} ->
353 if changeset.errors[:pinned_objects] do
354 {:error, :pinned_statuses_limit_reached}
361 # Tasks this handles:
362 # - removes pin from user
363 # - removes corresponding Add activity
364 # - if activity had expiration, recreates activity expiration job
366 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
367 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
368 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
# Builds a query keyed on the user's AP id and featured address. The start and
# end of this pipeline are not visible here; presumably it removes the matching
# Add activity, as the task list above says — confirm.
370 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
373 # if pinned activity was scheduled for deletion, we reschedule it for deletion
374 if meta[:expires_at] do
375 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
# meta[:expires_at] is cast before use. The binding of `expires_at` is outside
# the visible lines (presumably the result of this cast — confirm).
377 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
379 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
380 activity_id: meta[:activity_id],
381 expires_at: expires_at
# A nil result (presumably from the user lookup above) is reported as
# :user_not_found.
387 nil -> {:error, :user_not_found}
# Catch-all clause: matches any activity not claimed by an earlier handle/2
# clause. Its body is not visible here.
394 def handle(object, meta) do
# Creates the object carried by a Create activity, dispatching on the object
# map's "type". Each typed clause runs the map through
# Pipeline.common_pipeline/2 first.
#
# "Question": additionally schedules the poll-end job for the activity.
398 def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
399 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
400 PollWorker.schedule_poll_end(activity)
# "Answer": additionally bumps the vote count on the object named by
# "inReplyTo" (the remaining arguments are not visible here).
405 def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
406 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
407 Object.increase_vote_count(
408 object.data["inReplyTo"],
# Remaining known object types: pipeline only, no type-specific step visible.
417 def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
418 when objtype in ~w[Audio Video Event Article Note Page] do
419 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
# Catch-all for any other input; its body is not visible here.
425 def handle_object_creation(object, _activity, meta) do
# Undoes a like. The first argument is the liked object (or nil), the second
# the like activity being undone.
#
# When the liked object is nil there is no like to remove from it; the activity
# is simply deleted.
429 defp undo_like(nil, object), do: delete_object(object)
# Otherwise the like is removed from the object first, and the activity is
# deleted only if that succeeds.
431 defp undo_like(%Object{} = liked_object, object) do
432 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
433 delete_object(object)
# Reverses the effects of a previously handled activity, dispatching on its
# "type".
#
# Like: looks up the liked object by the activity's "object" field. The final
# pipeline step is not visible here; presumably it feeds undo_like/2 — confirm.
437 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
438 object.data["object"]
439 |> Object.get_by_ap_id()
# EmojiReact: unlike undo_like/2, the target object must exist; the reaction is
# removed from it and then the activity is deleted.
443 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
444 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
445 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
446 {:ok, _} <- Repo.delete(object) do
# Announce: same shape as EmojiReact.
# NOTE(review): the binding is named `liked_object` but holds the announced
# object.
451 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
452 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
453 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
454 {:ok, _} <- Repo.delete(object) do
# Block: `blocker` and `blocked` are rebound from AP ids to users, the block is
# lifted, and the activity is deleted.
460 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
462 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
463 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
464 {:ok, _} <- User.unblock(blocker, blocked),
465 {:ok, _} <- Repo.delete(object) do
# Catch-all: anything else (including nil) is returned as an error carrying the
# unhandled value.
470 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
# Deletes `object` via Repo.delete/1. Returns :ok on {:ok, _}; any other result
# falls through the `with` and is returned unchanged.
# NOTE(review): the spec names Object.t(), but undo_like/2 passes the activity
# being undone — confirm the argument type.
472 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
473 defp delete_object(object) do
474 with {:ok, _} <- Repo.delete(object), do: :ok
# Delivers every notification stored under :notifications in `meta` (none when
# the key is absent): each one is streamed to the "user" and
# "user:notification" topics and handed to Push.send/1.
# The return value is not visible here; handle_after_transaction/1 pipes it
# into send_streamables/1, so it is presumably `meta` — confirm.
477 defp send_notifications(meta) do
478 Keyword.get(meta, :notifications, [])
479 |> Enum.each(fn notification ->
480 Streamer.stream(["user", "user:notification"], notification)
481 Push.send(notification)
# Streams every {topics, items} pair stored under :streamables in `meta` (none
# when the key is absent) via Streamer.stream/2.
487 defp send_streamables(meta) do
488 Keyword.get(meta, :streamables, [])
489 |> Enum.each(fn {topics, items} ->
490 Streamer.stream(topics, items)
# Stores `notifications` in a keyword list under :notifications, placing them
# in front of any notifications already stored in `meta`.
496 defp add_notifications(meta, notifications) do
497 existing = Keyword.get(meta, :notifications, [])
# The start of this pipeline is not visible here (presumably `meta` — confirm).
500 |> Keyword.put(:notifications, notifications ++ existing)
# Runs the deferred deliveries: notifications first, then streamables.
# The start of the pipeline is not visible here (presumably `meta` — confirm).
504 def handle_after_transaction(meta) do
506 |> send_notifications()
507 |> send_streamables()