1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
14 alias Pleroma.Chat.MessageReference
15 alias Pleroma.FollowingRelationship
16 alias Pleroma.Notification
20 alias Pleroma.Web.ActivityPub.ActivityPub
21 alias Pleroma.Web.ActivityPub.Builder
22 alias Pleroma.Web.ActivityPub.Pipeline
23 alias Pleroma.Web.ActivityPub.Utils
24 alias Pleroma.Web.Push
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Workers.PollWorker
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
33 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
35 defp ap_streamer, do: Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
38 def handle(object, meta \\ [])
42 # - Sends a notification
49 "object" => follow_activity_id
54 with %Activity{actor: follower_id} = follow_activity <-
55 Activity.get_by_ap_id(follow_activity_id),
56 %User{} = followed <- User.get_cached_by_ap_id(actor),
57 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
58 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
59 {:ok, _follower, followed} <-
60 FollowingRelationship.update(follower, followed, :follow_accept) do
61 Notification.update_notification_type(followed, follow_activity)
68 # - Rejects all existing follow activities for this person
69 # - Updates the follow state
70 # - Dismisses notification
77 "object" => follow_activity_id
82 with %Activity{actor: follower_id} = follow_activity <-
83 Activity.get_by_ap_id(follow_activity_id),
84 %User{} = followed <- User.get_cached_by_ap_id(actor),
85 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
86 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
87 FollowingRelationship.update(follower, followed, :follow_reject)
88 Notification.dismiss(follow_activity)
95 # - Follows if possible
96 # - Sends a notification
97 # - Generates accept or reject if appropriate
104 "object" => followed_user,
105 "actor" => following_user
110 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
111 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
112 {_, {:ok, _, _}, _, _} <-
113 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
114 if followed.local && !followed.is_locked do
115 {:ok, accept_data, _} = Builder.accept(followed, object)
116 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
119 {:following, {:error, _}, _follower, followed} ->
120 {:ok, reject_data, _} = Builder.reject(followed, object)
121 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
127 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
131 |> add_notifications(notifications)
133 updated_object = Activity.get_by_ap_id(follow_id)
135 {:ok, updated_object, meta}
138 # Tasks this handles:
139 # - Unfollow and block
142 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
146 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
147 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
148 User.block(blocker, blocked)
154 # Tasks this handles:
157 # For a local user, we also get a changeset with the full information, so we
158 # can update non-federating, non-activitypub settings as well.
160 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
161 if changeset = Keyword.get(meta, :user_update_changeset) do
163 |> User.update_and_set_cache()
165 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
167 User.get_by_ap_id(updated_object["id"])
168 |> User.remote_user_changeset(new_user_data)
169 |> User.update_and_set_cache()
175 # Tasks this handles:
176 # - Add like to object
177 # - Set up notification
179 def handle(%{data: %{"type" => "Like"}} = object, meta) do
180 liked_object = Object.get_by_ap_id(object.data["object"])
181 Utils.add_like_to_object(object, liked_object)
183 Notification.create_notifications(object)
189 # - Actually create object
190 # - Rollback if we couldn't create it
191 # - Increase the user note count
192 # - Increase the reply count
193 # - Increase replies count
194 # - Set up ActivityExpiration
195 # - Set up notifications
196 # - Index incoming posts for search (if needed)
198 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
199 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
200 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
201 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
202 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
203 {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
205 if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
206 Object.increase_replies_count(in_reply_to)
209 reply_depth = (meta[:depth] || 0) + 1
211 # FIXME: Force inReplyTo to replies
212 if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
213 object.data["replies"] != nil do
214 for reply_id <- object.data["replies"] do
215 Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
217 "depth" => reply_depth
222 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
223 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
226 Pleroma.Search.add_to_index(Map.put(activity, :object, object))
230 |> add_notifications(notifications)
232 ap_streamer().stream_out(activity)
234 {:ok, activity, meta}
236 e -> Repo.rollback(e)
240 # Tasks this handles:
241 # - Add announce to object
242 # - Set up notification
243 # - Stream out the announce
245 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
246 announced_object = Object.get_by_ap_id(object.data["object"])
247 user = User.get_cached_by_ap_id(object.data["actor"])
249 Utils.add_announce_to_object(object, announced_object)
251 if !User.is_internal_user?(user) do
252 Notification.create_notifications(object)
254 ap_streamer().stream_out(object)
261 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
262 with undone_object <- Activity.get_by_ap_id(undone_object),
263 :ok <- handle_undoing(undone_object) do
268 # Tasks this handles:
269 # - Add reaction to object
270 # - Set up notification
272 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
273 reacted_object = Object.get_by_ap_id(object.data["object"])
274 Utils.add_emoji_reaction_to_object(object, reacted_object)
275 Notification.create_notifications(object)
280 # Tasks this handles:
281 # - Delete and unpins the create activity
282 # - Replace object with Tombstone
283 # - Set up notification
284 # - Reduce the user note count
285 # - Reduce the reply count
286 # - Stream out the activity
287 # - Removes posts from search index (if needed)
289 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
291 Object.normalize(deleted_object, fetch: false) ||
292 User.get_cached_by_ap_id(deleted_object)
295 case deleted_object do
297 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
298 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
299 %User{} = user <- User.get_cached_by_ap_id(actor) do
300 User.remove_pinned_object_id(user, deleted_object.data["id"])
302 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
304 if in_reply_to = deleted_object.data["inReplyTo"] do
305 Object.decrease_replies_count(in_reply_to)
308 MessageReference.delete_for_object(deleted_object)
310 ap_streamer().stream_out(object)
311 ap_streamer().stream_out_participations(deleted_object, user)
315 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
320 with {:ok, _} <- User.delete(deleted_object) do
326 Notification.create_notifications(object)
328 # Only remove from index when deleting actual objects, not users or anything else
329 with %Pleroma.Object{} <- deleted_object do
330 Pleroma.Search.remove_from_index(deleted_object)
339 # Tasks this handles:
341 # - removes expiration job for pinned activity, if was set for expiration
343 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
344 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
345 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
346 # if pinned activity was scheduled for deletion, we remove job
347 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
348 Oban.cancel_job(expiration.id)
354 {:error, :user_not_found}
356 {:error, changeset} ->
357 if changeset.errors[:pinned_objects] do
358 {:error, :pinned_statuses_limit_reached}
365 # Tasks this handles:
366 # - removes pin from user
367 # - removes corresponding Add activity
368 # - if activity had expiration, recreates activity expiration job
370 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
371 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
372 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
374 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
377 # if pinned activity was scheduled for deletion, we reschedule it for deletion
378 if meta[:expires_at] do
379 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
381 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
383 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
384 activity_id: meta[:activity_id],
385 expires_at: expires_at
391 nil -> {:error, :user_not_found}
398 def handle(object, meta) do
402 def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
403 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
404 actor = User.get_cached_by_ap_id(object.data["actor"])
405 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
408 [[actor, recipient], [recipient, actor]]
410 |> Enum.map(fn [user, other_user] ->
412 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
413 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
416 :chat_message_id_idempotency_key_cache,
418 meta[:idempotency_key]
422 ["user", "user:pleroma_chat"],
423 {user, %{cm_ref | chat: chat, object: object}}
431 |> add_streamables(streamables)
437 def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
438 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
439 PollWorker.schedule_poll_end(activity)
444 def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
445 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
446 Object.increase_vote_count(
447 object.data["inReplyTo"],
456 def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
457 when objtype in ~w[Audio Video Event Article Note Page] do
458 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
464 def handle_object_creation(object, _activity, meta) do
468 defp undo_like(nil, object), do: delete_object(object)
470 defp undo_like(%Object{} = liked_object, object) do
471 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
472 delete_object(object)
476 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
477 object.data["object"]
478 |> Object.get_by_ap_id()
482 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
483 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
484 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
485 {:ok, _} <- Repo.delete(object) do
490 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
491 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
492 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
493 {:ok, _} <- Repo.delete(object) do
499 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
501 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
502 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
503 {:ok, _} <- User.unblock(blocker, blocked),
504 {:ok, _} <- Repo.delete(object) do
509 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
511 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
512 defp delete_object(object) do
513 with {:ok, _} <- Repo.delete(object), do: :ok
516 defp send_notifications(meta) do
517 Keyword.get(meta, :notifications, [])
518 |> Enum.each(fn notification ->
519 Streamer.stream(["user", "user:notification"], notification)
520 Push.send(notification)
526 defp send_streamables(meta) do
527 Keyword.get(meta, :streamables, [])
528 |> Enum.each(fn {topics, items} ->
529 Streamer.stream(topics, items)
535 defp add_streamables(meta, streamables) do
536 existing = Keyword.get(meta, :streamables, [])
539 |> Keyword.put(:streamables, streamables ++ existing)
542 defp add_notifications(meta, notifications) do
543 existing = Keyword.get(meta, :notifications, [])
546 |> Keyword.put(:notifications, notifications ++ existing)
550 def handle_after_transaction(%Pleroma.Activity{data: %{"type" => "Create"}} = activity) do
551 Pleroma.Elasticsearch.put_by_id(:activity, activity.id)
554 def handle_after_transaction(%Pleroma.Activity{
555 data: %{"type" => "Delete", "deleted_activity_id" => id}
557 Pleroma.Elasticsearch.delete_by_id(:activity, id)
560 def handle_after_transaction(%Pleroma.Activity{}) do
564 def handle_after_transaction(%Pleroma.Object{}) do
568 def handle_after_transaction(meta) do
570 |> send_notifications()
571 |> send_streamables()