1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
14 alias Pleroma.Chat.MessageReference
15 alias Pleroma.FollowingRelationship
16 alias Pleroma.Notification
20 alias Pleroma.Web.ActivityPub.ActivityPub
21 alias Pleroma.Web.ActivityPub.Builder
22 alias Pleroma.Web.ActivityPub.Pipeline
23 alias Pleroma.Web.ActivityPub.Utils
24 alias Pleroma.Web.Push
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Workers.PollWorker
# Cache provider module read from config (`[:cachex, :provider]`), defaulting to `Cachex`.
# NOTE(review): as a module attribute this is evaluated when the module is
# compiled, so the value is fixed at compile time — confirm that is intended.
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
# Logger module read from config (`[:side_effects, :logger]`), defaulting to `Logger`.
31 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
# Declares this module as an implementation of the `SideEffects.Handling` behaviour.
33 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
# Returns the module used to stream activities out: the value configured under
# `[:side_effects, :ap_streamer]`, or `ActivityPub` when nothing is configured.
# The config is read on every call.
35 defp ap_streamer, do: Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
# Bodiless function head: declares the default for `meta` (an empty list) once
# for all `handle/2` clauses.
38 def handle(object, meta \\ [])
42 # - Sends a notification
49 "object" => follow_activity_id
54 with %Activity{actor: follower_id} = follow_activity <-
55 Activity.get_by_ap_id(follow_activity_id),
56 %User{} = followed <- User.get_cached_by_ap_id(actor),
57 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
58 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
59 {:ok, _follower, followed} <-
60 FollowingRelationship.update(follower, followed, :follow_accept) do
61 Notification.update_notification_type(followed, follow_activity)
68 # - Rejects all existing follow activities for this person
69 # - Updates the follow state
70 # - Dismisses notification
77 "object" => follow_activity_id
82 with %Activity{actor: follower_id} = follow_activity <-
83 Activity.get_by_ap_id(follow_activity_id),
84 %User{} = followed <- User.get_cached_by_ap_id(actor),
85 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
86 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
87 FollowingRelationship.update(follower, followed, :follow_reject)
88 Notification.dismiss(follow_activity)
95 # - Follows if possible
96 # - Sends a notification
97 # - Generates accept or reject if appropriate
104 "object" => followed_user,
105 "actor" => following_user
110 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
111 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
112 {_, {:ok, _, _}, _, _} <-
113 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
114 if followed.local && !followed.is_locked do
115 {:ok, accept_data, _} = Builder.accept(followed, object)
116 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
119 {:following, {:error, _}, _follower, followed} ->
120 {:ok, reject_data, _} = Builder.reject(followed, object)
121 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
127 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
131 |> add_notifications(notifications)
133 updated_object = Activity.get_by_ap_id(follow_id)
135 {:ok, updated_object, meta}
138 # Tasks this handles:
139 # - Unfollow and block
142 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
146 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
147 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
148 User.block(blocker, blocked)
154 # Tasks this handles:
157 # For a local user, we also get a changeset with the full information, so we
158 # can update non-federating, non-activitypub settings as well.
# Handles `Update` activities; `updated_object` is the activity's "object" value.
160 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
# A `:user_update_changeset` entry in `meta` selects the changeset-based path;
# `Keyword.get/2` yields nil (falsy) when the key is absent.
161 if changeset = Keyword.get(meta, :user_update_changeset) do
163 |> User.update_and_set_cache()
# Assertive match: any result other than `{:ok, _}` raises a MatchError.
165 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
# The user is looked up by the object's "id", run through the remote-user
# changeset with the freshly built data, and handed to `User.update_and_set_cache/1`.
167 User.get_by_ap_id(updated_object["id"])
168 |> User.remote_user_changeset(new_user_data)
169 |> User.update_and_set_cache()
175 # Tasks this handles:
176 # - Add like to object
177 # - Set up notification
# Handles `Like` activities.
179 def handle(%{data: %{"type" => "Like"}} = object, meta) do
# The liked object is resolved from the activity's "object" id; the lookup
# result is passed on as-is, without a nil check.
180 liked_object = Object.get_by_ap_id(object.data["object"])
181 Utils.add_like_to_object(object, liked_object)
# Called with no options here, unlike the `do_send: false` call in the
# `Create` clause.
183 Notification.create_notifications(object)
189 # - Actually create object
190 # - Rollback if we couldn't create it
191 # - Increase the user note count
192 # - Increase the reply count
193 # - Increase replies count
194 # - Set up ActivityExpiration
195 # - Set up notifications
196 # - Index incoming posts for search (if needed)
# Handles `Create` activities. On the success path visible here it returns
# `{:ok, activity, meta}`.
198 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
# `handle_object_creation/3` dispatches on the "type" of `meta[:object_data]`;
# the actor must resolve to a `%User{}` for the body to run.
199 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
200 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
# Notifications are created with `do_send: false` and attached to `meta`
# further down via `add_notifications/2`.
201 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
# Both calls must return `{:ok, _}`; anything else raises a MatchError.
202 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
203 {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
# `!=` binds tighter than `&&`, so `in_reply_to` is false for "Answer" objects
# and otherwise the object's "inReplyTo" value (nil when absent).
205 if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
206 Object.increase_replies_count(in_reply_to)
# Depth sent along with the reply fetch jobs: one more than `meta[:depth]`,
# which counts as 0 when missing.
209 reply_depth = (meta[:depth] || 0) + 1
211 # FIXME: Force inReplyTo to replies
# Each id under "replies" is enqueued as a "fetch_remote" job, but only when
# the thread distance check passes and the object has a non-nil "replies" value.
212 if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
213 object.data["replies"] != nil do
214 for reply_id <- object.data["replies"] do
215 Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
217 "depth" => reply_depth
# Rich media data is fetched in a separate task, started from inside
# `ConcurrentLimiter.limit/2`.
222 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
223 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
# The activity is indexed with the created object attached under `:object`.
226 Pleroma.Search.add_to_index(Map.put(activity, :object, object))
230 |> add_notifications(notifications)
232 ap_streamer().stream_out(activity)
234 {:ok, activity, meta}
# Whatever value reaches this clause is handed to `Repo.rollback/1`
# (presumably the fallback branch of the `with` above — confirm).
236 e -> Repo.rollback(e)
240 # Tasks this handles:
241 # - Add announce to object
242 # - Set up notification
243 # - Stream out the announce
# Handles `Announce` activities.
245 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
# Both lookups are used without a nil check on the lines shown here.
246 announced_object = Object.get_by_ap_id(object.data["object"])
247 user = User.get_cached_by_ap_id(object.data["actor"])
249 Utils.add_announce_to_object(object, announced_object)
# Announces made by internal users do not create notifications.
251 if !User.is_internal_user?(user) do
252 Notification.create_notifications(object)
254 ap_streamer().stream_out(object)
# Handles `Undo` activities by delegating to `handle_undoing/1`.
261 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
# The first clause binds a bare variable, so it always matches: a nil lookup
# result is passed on to `handle_undoing/1`, whose catch-all clause returns an
# `{:error, _}` tuple. Only an `:ok` from `handle_undoing/1` continues.
262 with undone_object <- Activity.get_by_ap_id(undone_object),
263 :ok <- handle_undoing(undone_object) do
268 # Tasks this handles:
269 # - Add reaction to object
270 # - Set up notification
# Handles `EmojiReact` activities.
272 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
# The reacted-to object is resolved from the activity's "object" id and passed
# on without a nil check.
273 reacted_object = Object.get_by_ap_id(object.data["object"])
274 Utils.add_emoji_reaction_to_object(object, reacted_object)
275 Notification.create_notifications(object)
280 # Tasks this handles:
281 # - Deletes and unpins the create activity
282 # - Replace object with Tombstone
283 # - Set up notification
284 # - Reduce the user note count
285 # - Reduce the reply count
286 # - Stream out the activity
287 # - Removes posts from search index (if needed)
# Handles `Delete` activities; `deleted_object` starts out as the activity's
# "object" value.
289 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
# Resolve the target: first via `Object.normalize/2` with `fetch: false`, and
# if that is falsy, as a cached user with that AP id.
291 Object.normalize(deleted_object, fetch: false) ||
292 User.get_cached_by_ap_id(deleted_object)
295 case deleted_object do
# The `:actor` tag makes a failed match on this step come out as
# `{:actor, value}`, so it can be told apart from the other failing steps.
297 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
298 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
299 %User{} = user <- User.get_cached_by_ap_id(actor) do
300 User.remove_pinned_object_id(user, deleted_object.data["id"])
# Assertive match: a non-`{:ok, _}` result raises; `user` is rebound to the
# returned value.
302 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
304 if in_reply_to = deleted_object.data["inReplyTo"] do
305 Object.decrease_replies_count(in_reply_to)
308 MessageReference.delete_for_object(deleted_object)
310 ap_streamer().stream_out(object)
311 ap_streamer().stream_out_participations(deleted_object, user)
# Logged through the configurable `@logger` module.
315 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
# User deletion path: `deleted_object` is passed to `User.delete/1` here.
320 with {:ok, _} <- User.delete(deleted_object) do
326 Notification.create_notifications(object)
328 Pleroma.Search.remove_from_index(deleted_object)
336 # Tasks this handles:
338 # - removes expiration job for pinned activity, if it was set for expiration
# Handles `Add` activities: pins the activity's "object" for the acting user.
340 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
341 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
342 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
343 # if pinned activity was scheduled for deletion, we remove job
# The expiration job is looked up by `meta[:activity_id]`; a nil/false result
# skips the cancellation.
344 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
345 Oban.cancel_job(expiration.id)
351 {:error, :user_not_found}
# A changeset error on `:pinned_objects` is reported as the pin limit being reached.
353 {:error, changeset} ->
354 if changeset.errors[:pinned_objects] do
355 {:error, :pinned_statuses_limit_reached}
362 # Tasks this handles:
363 # - removes pin from user
364 # - removes corresponding Add activity
365 # - if activity had expiration, recreates activity expiration job
# Handles `Remove` activities: unpins the activity's "object" for the acting user.
367 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
368 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
369 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
# The query is narrowed using the user's AP id and featured address.
371 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
374 # if pinned activity was scheduled for deletion, we reschedule it for deletion
375 if meta[:expires_at] do
376 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
378 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
# Re-enqueues the purge job for `meta[:activity_id]` with the `expires_at` value.
380 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
381 activity_id: meta[:activity_id],
382 expires_at: expires_at
# A nil result (no cached user for the actor) is reported as `:user_not_found`.
388 nil -> {:error, :user_not_found}
# Catch-all clause: matches any `object`/`meta` pair that no earlier
# `handle/2` clause matched.
395 def handle(object, meta) do
# Creates the object carried by a `Create` activity; clauses are selected by
# the object's "type".
#
# ChatMessage: runs the object through `Pipeline.common_pipeline/2`, then
# updates the chat state for both participants.
399 def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
400 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
401 actor = User.get_cached_by_ap_id(object.data["actor"])
# The recipient is the first entry of "to"; `hd/1` raises if that list is empty.
402 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
# Both directions are processed: once from the actor's side, once from the
# recipient's.
405 [[actor, recipient], [recipient, actor]]
407 |> Enum.map(fn [user, other_user] ->
# Assertive matches: a non-`{:ok, _}` result from either call raises.
409 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
# The third argument is true only for the participant who is not the author
# (presumably an "unread" flag — confirm against `MessageReference.create/3`).
410 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
413 :chat_message_id_idempotency_key_cache,
415 meta[:idempotency_key]
# Streamable entry: topics paired with the user and the message reference,
# with `chat` and `object` filled in on the reference.
419 ["user", "user:pleroma_chat"],
420 {user, %{cm_ref | chat: chat, object: object}}
428 |> add_streamables(streamables)
# Question: after the common pipeline, schedules the poll end for the activity.
434 def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
435 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
436 PollWorker.schedule_poll_end(activity)
# Answer: after the common pipeline, increases the vote count on the object
# referenced by "inReplyTo".
441 def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
442 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
443 Object.increase_vote_count(
444 object.data["inReplyTo"],
# Audio, Video, Event, Article, Note and Page objects only go through the
# common pipeline.
453 def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
454 when objtype in ~w[Audio Video Event Article Note Page] do
455 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
# Catch-all clause for any other object.
461 def handle_object_creation(object, _activity, meta) do
# Undoes a like. When the liked object is nil, only the like itself is deleted.
465 defp undo_like(nil, object), do: delete_object(object)
# When the liked object exists, the like is first removed from it; the like is
# deleted only if that removal returns `{:ok, _}`. Any other result is returned
# unchanged by the `with`.
467 defp undo_like(%Object{} = liked_object, object) do
468 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
469 delete_object(object)
# Reverts the effects of a previously handled activity; clauses are selected
# by the activity's "type".
#
# Like: resolves the liked object from the activity's "object" id.
473 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
474 object.data["object"]
475 |> Object.get_by_ap_id()
# EmojiReact: the reacted-to object must exist; the reaction is removed from
# it and the reaction activity is then deleted.
479 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
480 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
481 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
482 {:ok, _} <- Repo.delete(object) do
# Announce: same shape as the EmojiReact clause.
# NOTE(review): `liked_object` actually holds the announced object here; the
# name looks like a leftover from the Like handling.
487 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
488 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
489 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
490 {:ok, _} <- Repo.delete(object) do
# Block: both users must resolve; the block is lifted and the block activity
# is then deleted. `blocker` and `blocked` are rebound from AP ids to users.
496 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
498 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
499 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
500 {:ok, _} <- User.unblock(blocker, blocked),
501 {:ok, _} <- Repo.delete(object) do
# Catch-all: anything else (including nil) yields an error tuple carrying the
# offending value.
506 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
# Deletes `object` through `Repo.delete/1`. Returns `:ok` when the delete
# returns `{:ok, _}`; any other result is returned unchanged by the `with`.
# NOTE(review): callers appear to pass the undone activity rather than an
# `Object` — confirm the argument type in the spec.
508 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
509 defp delete_object(object) do
510 with {:ok, _} <- Repo.delete(object), do: :ok
# Delivers every notification stored under `:notifications` in `meta`
# (an empty list when the key is absent): each one is streamed to the
# "user" and "user:notification" topics and passed to `Push.send/1`.
513 defp send_notifications(meta) do
514 Keyword.get(meta, :notifications, [])
515 |> Enum.each(fn notification ->
516 Streamer.stream(["user", "user:notification"], notification)
517 Push.send(notification)
# Streams every `{topics, items}` pair stored under `:streamables` in `meta`
# (an empty list when the key is absent).
523 defp send_streamables(meta) do
524 Keyword.get(meta, :streamables, [])
525 |> Enum.each(fn {topics, items} ->
526 Streamer.stream(topics, items)
# Stores `streamables` under `:streamables` in `meta`, placing them in front of
# any entries already stored there.
532 defp add_streamables(meta, streamables) do
533 existing = Keyword.get(meta, :streamables, [])
536 |> Keyword.put(:streamables, streamables ++ existing)
# Stores `notifications` under `:notifications` in `meta`, placing them in
# front of any entries already stored there.
539 defp add_notifications(meta, notifications) do
540 existing = Keyword.get(meta, :notifications, [])
543 |> Keyword.put(:notifications, notifications ++ existing)
# Work done by `handle_after_transaction/1`; clauses are selected by the
# argument's shape.
#
# `Create` activity: the activity id is passed to `Pleroma.Elasticsearch.put_by_id/2`.
547 def handle_after_transaction(%Pleroma.Activity{data: %{"type" => "Create"}} = activity) do
548 Pleroma.Elasticsearch.put_by_id(:activity, activity.id)
# `Delete` activity carrying a "deleted_activity_id": that id is passed to
# `Pleroma.Elasticsearch.delete_by_id/2`.
551 def handle_after_transaction(%Pleroma.Activity{
552 data: %{"type" => "Delete", "deleted_activity_id" => id}
554 Pleroma.Elasticsearch.delete_by_id(:activity, id)
# Any other activity.
557 def handle_after_transaction(%Pleroma.Activity{}) do
# Any object.
561 def handle_after_transaction(%Pleroma.Object{}) do
# Anything else is treated as `meta`: the pipeline sends the collected
# notifications first, then the collected streamables.
565 def handle_after_transaction(meta) do
567 |> send_notifications()
568 |> send_streamables()