1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
14 alias Pleroma.Chat.MessageReference
15 alias Pleroma.FollowingRelationship
16 alias Pleroma.Notification
20 alias Pleroma.Web.ActivityPub.ActivityPub
21 alias Pleroma.Web.ActivityPub.Builder
22 alias Pleroma.Web.ActivityPub.Pipeline
23 alias Pleroma.Web.ActivityPub.Utils
24 alias Pleroma.Web.Push
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Workers.PollWorker
# Cache provider module, read from config while the module body is evaluated;
# `Cachex` is passed as the fallback argument.
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
# Logger module, read the same way with `Logger` as the fallback argument;
# used further down as `@logger.error(...)`.
31 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
# This module implements the callbacks of the SideEffects.Handling behaviour.
33 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
# Returns the module used to stream activities out. It is looked up in config
# under [:side_effects, :ap_streamer] on every call, with ActivityPub passed as
# the fallback argument.
defp ap_streamer do
  Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
end
# Bodiless function head: declares the default for `meta` (an empty list)
# once for all handle/2 clauses.
38 def handle(object, meta \\ [])
42 # - Sends a notification
49 "object" => follow_activity_id
# Resolve the follow activity named by "object", the user looked up from
# `actor`, and the follower; a step that does not match ends the `with`.
54 with %Activity{actor: follower_id} = follow_activity <-
55 Activity.get_by_ap_id(follow_activity_id),
56 %User{} = followed <- User.get_cached_by_ap_id(actor),
57 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
# Set the follow state to "accept", then move the relationship to :follow_accept.
58 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
59 {:ok, _follower, followed} <-
60 FollowingRelationship.update(follower, followed, :follow_accept) do
# Adjust the type of the followed user's notification for this follow activity.
61 Notification.update_notification_type(followed, follow_activity)
68 # - Rejects all existing follow activities for this person
69 # - Updates the follow state
70 # - Dismisses the notification
77 "object" => follow_activity_id
# Resolve the follow activity named by "object" and both users involved;
# a step that does not match ends the `with`.
82 with %Activity{actor: follower_id} = follow_activity <-
83 Activity.get_by_ap_id(follow_activity_id),
84 %User{} = followed <- User.get_cached_by_ap_id(actor),
85 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
# Set the follow state to "reject" for the follow activity.
86 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
# The result of the relationship update is not matched here.
87 FollowingRelationship.update(follower, followed, :follow_reject)
88 Notification.dismiss(follow_activity)
95 # - Follows if possible
96 # - Sends a notification
97 # - Generates accept or reject if appropriate
104 "object" => followed_user,
105 "actor" => following_user
# Resolve both users, then attempt a pending follow. The result is wrapped in
# a {:following, ...} tuple so that a failed follow can be matched separately
# further down.
110 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
111 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
112 {_, {:ok, _, _}, _, _} <-
113 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
# A local account that is not locked accepts immediately: build an Accept
# and run it through the common pipeline as a local activity.
114 if followed.local && !followed.is_locked do
115 {:ok, accept_data, _} = Builder.accept(followed, object)
116 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
# The follow attempt returned an error: build a Reject on behalf of the
# followed user and run it through the common pipeline.
119 {:following, {:error, _}, _follower, followed} ->
120 {:ok, reject_data, _} = Builder.reject(followed, object)
121 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
# Notifications are created with do_send: false and stored in meta via
# add_notifications/2; send_notifications/1 later reads them from there.
127 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
131 |> add_notifications(notifications)
# Reload the follow activity by its AP id and return the reloaded copy.
133 updated_object = Activity.get_by_ap_id(follow_id)
135 {:ok, updated_object, meta}
138 # Tasks this handles:
139 # - Unfollow and block
142 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
# The block is only applied when both AP ids resolve to users.
146 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
147 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
148 User.block(blocker, blocked)
154 # Tasks this handles:
157 # For a local user, we also get a changeset with the full information, so we
158 # can update non-federating, non-activitypub settings as well.
160 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
# When the caller supplied a changeset under :user_update_changeset in meta,
# that changeset is what gets persisted and cached.
161 if changeset = Keyword.get(meta, :user_update_changeset) do
163 |> User.update_and_set_cache()
# Otherwise derive the user data from the incoming object. The match is
# assertive: anything other than {:ok, _} raises.
165 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
# NOTE(review): the lookup result is piped straight into the changeset
# builder; no nil check is visible here - confirm the user always exists.
167 User.get_by_ap_id(updated_object["id"])
168 |> User.remote_user_changeset(new_user_data)
169 |> User.update_and_set_cache()
175 # Tasks this handles:
176 # - Add like to object
177 # - Set up notification
179 def handle(%{data: %{"type" => "Like"}} = object, meta) do
# Resolve the liked object by the AP id in "object" and record the like on it.
180 liked_object = Object.get_by_ap_id(object.data["object"])
181 Utils.add_like_to_object(object, liked_object)
# Create notifications for the Like activity.
183 Notification.create_notifications(object)
189 # - Actually create object
190 # - Rollback if we couldn't create it
191 # - Increase the user note count
192 # - Increase the reply count
193 # - Increase replies count
194 # - Set up ActivityExpiration
195 # - Set up notifications
197 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
# Create the object from meta[:object_data] first; this and the actor lookup
# must both match for the body to run.
198 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
199 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
# Notifications are built with do_send: false and attached to meta below.
200 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
# Assertive matches: a result other than {:ok, _} raises.
201 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
202 {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
# `in_reply_to` is bound to the whole `!= ... && ...` expression: it is false
# for "Answer" objects and nil when "inReplyTo" is absent, so the replies
# counter is only increased for non-Answer replies.
204 if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
205 Object.increase_replies_count(in_reply_to)
# Depth passed on to the reply fetch jobs: one more than meta[:depth] (0 if unset).
208 reply_depth = (meta[:depth] || 0) + 1
210 # FIXME: Force inReplyTo to replies
# Enqueue a remote fetch per listed reply, provided the thread distance is
# allowed and the object carries a "replies" value.
211 if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
212 object.data["replies"] != nil do
213 for reply_id <- object.data["replies"] do
214 Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
216 "depth" => reply_depth
# Rich-media data is fetched in a separately started task, inside a
# ConcurrentLimiter.limit/2 call keyed on the RichMedia.Helpers module.
221 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
222 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
227 |> add_notifications(notifications)
229 ap_streamer().stream_out(activity)
231 {:ok, activity, meta}
# Fallback branch: whatever value arrives here is handed to Repo.rollback/1.
233 e -> Repo.rollback(e)
237 # Tasks this handles:
238 # - Add announce to object
239 # - Set up notification
240 # - Stream out the announce
242 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
# Resolve the announced object and the announcing user from the activity data.
243 announced_object = Object.get_by_ap_id(object.data["object"])
244 user = User.get_cached_by_ap_id(object.data["actor"])
246 Utils.add_announce_to_object(object, announced_object)
# Notifications are only created when the actor is not an internal user.
248 if !User.is_internal_user?(user) do
249 Notification.create_notifications(object)
251 ap_streamer().stream_out(object)
# Undo: delegates to handle_undoing/1 for the activity being undone.
258 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
# `undone_object` is rebound from the AP id to the activity fetched for it;
# the type-specific undo must return :ok for the body to run.
259 with undone_object <- Activity.get_by_ap_id(undone_object),
260 :ok <- handle_undoing(undone_object) do
265 # Tasks this handles:
266 # - Add reaction to object
267 # - Set up notification
269 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
# Resolve the reacted-to object by the AP id in "object" and add the reaction to it.
270 reacted_object = Object.get_by_ap_id(object.data["object"])
271 Utils.add_emoji_reaction_to_object(object, reacted_object)
# Create notifications for the EmojiReact activity.
273 Notification.create_notifications(object)
278 # Tasks this handles:
279 # - Deletes and unpins the create activity
280 # - Replace object with Tombstone
281 # - Set up notification
282 # - Reduce the user note count
283 # - Reduce the reply count
284 # - Stream out the activity
286 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
# The target may be an object or a user: try normalizing it as an object
# without fetching (fetch: false), and fall back to a cached user lookup.
288 Object.normalize(deleted_object, fetch: false) ||
289 User.get_cached_by_ap_id(deleted_object)
292 case deleted_object do
# Delete the object, then require a binary "actor" on it that resolves to a
# user. The {:actor, ...} tag marks the step that checks the actor value.
294 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
295 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
296 %User{} = user <- User.get_cached_by_ap_id(actor) do
297 User.remove_pinned_object_id(user, deleted_object.data["id"])
# Assertive match: a result other than {:ok, _} raises.
299 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
# If the deleted object was a reply, decrease the replies count of its parent.
301 if in_reply_to = deleted_object.data["inReplyTo"] do
302 Object.decrease_replies_count(in_reply_to)
305 MessageReference.delete_for_object(deleted_object)
307 ap_streamer().stream_out(object)
308 ap_streamer().stream_out_participations(deleted_object, user)
# Logged through the configurable @logger module.
312 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
# Here `deleted_object` is passed to User.delete/1.
317 with {:ok, _} <- User.delete(deleted_object) do
323 Notification.create_notifications(object)
330 # Tasks this handles:
332 # - removes expiration job for pinned activity, if it was set for expiration
334 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
# Resolve the actor and pin data["object"] for that user; both steps must
# match for the body to run.
335 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
336 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
337 # if the pinned activity was scheduled for deletion, we cancel the job
338 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
339 Oban.cancel_job(expiration.id)
345 {:error, :user_not_found}
# A changeset error on :pinned_objects is reported as the pin limit being reached.
347 {:error, changeset} ->
348 if changeset.errors[:pinned_objects] do
349 {:error, :pinned_statuses_limit_reached}
356 # Tasks this handles:
357 # - removes pin from user
358 # - removes corresponding Add activity
359 # - if activity had expiration, recreates activity expiration job
361 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
# Resolve the actor and unpin data["object"] for that user; both steps must
# match for the body to run.
362 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
363 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
# Query step scoped by the user's AP id and featured address.
365 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
368 # if pinned activity was scheduled for deletion, we reschedule it for deletion
369 if meta[:expires_at] do
370 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
# Cast meta[:expires_at] with the DateTime object-validator type.
372 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
# Re-enqueue the purge job for meta[:activity_id] with `expires_at`.
374 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
375 activity_id: meta[:activity_id],
376 expires_at: expires_at
# A nil result is reported as the user not being found.
382 nil -> {:error, :user_not_found}
# Catch-all clause: matches any `object`/`meta` that none of the
# type-specific handle/2 clauses above matched.
389 def handle(object, meta) do
# ChatMessage creation: runs the object through the common pipeline, then
# builds chat/message-reference data for both participants.
393 def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
394 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
# The sender, and the first entry of "to" as the recipient.
395 actor = User.get_cached_by_ap_id(object.data["actor"])
396 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
# Each pair is [user, other_user], so both sides are processed in turn.
399 [[actor, recipient], [recipient, actor]]
401 |> Enum.map(fn [user, other_user] ->
# Assertive matches: a result other than {:ok, _} raises.
403 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
# The third argument is true only for the user who is not the actor
# (presumably the "unread" flag - confirm against MessageReference.create/3).
404 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
407 :chat_message_id_idempotency_key_cache,
409 meta[:idempotency_key]
# Topics plus payload; presumably one {topics, items} entry of the shape
# send_streamables/1 consumes - confirm.
413 ["user", "user:pleroma_chat"],
414 {user, %{cm_ref | chat: chat, object: object}}
422 |> add_streamables(streamables)
# Question creation: once the object has gone through the common pipeline,
# schedule the poll end for the creating activity.
428 def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
429 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
430 PollWorker.schedule_poll_end(activity)
# Answer creation: once the object has gone through the common pipeline,
# increase the vote count on the object referenced by its "inReplyTo".
435 def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
436 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
437 Object.increase_vote_count(
438 object.data["inReplyTo"],
# Creation of Audio, Video, Event, Article, Note and Page objects: these
# types are run through the common pipeline.
447 def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
448 when objtype in ~w[Audio Video Event Article Note Page] do
449 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
# Catch-all clause for object data not matched by the type-specific clauses above.
455 def handle_object_creation(object, _activity, meta) do
# No liked object is available (first argument is nil), so there is nothing
# to update on it: only `object` itself is deleted.
defp undo_like(nil, object) do
  delete_object(object)
end
# The liked object exists: remove the like from it first, and delete `object`
# only when that removal returned {:ok, _}.
461 defp undo_like(%Object{} = liked_object, object) do
462 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
463 delete_object(object)
# Undo of a Like: starts by looking up the liked object via the AP id stored
# under "object"; the lookup result is the head of a pipeline.
467 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
468 object.data["object"]
469 |> Object.get_by_ap_id()
# Undo of an EmojiReact: the reacted-to object must exist, the reaction must
# be removed from it, and `object` must be deleted, in that order.
473 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
474 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
475 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
476 {:ok, _} <- Repo.delete(object) do
# Undo of an Announce: the announced object must exist, the announce must be
# removed from it, and `object` must be deleted, in that order.
481 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
# NOTE(review): despite its name, `liked_object` holds the announced object here.
482 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
483 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
484 {:ok, _} <- Repo.delete(object) do
490 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
# `blocker` and `blocked` are rebound from AP ids to the resolved users;
# then the block is lifted and `object` is deleted.
492 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
493 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
494 {:ok, _} <- User.unblock(blocker, blocked),
495 {:ok, _} <- Repo.delete(object) do
# Catch-all: nothing above knows how to undo this value, so an error tuple
# carrying the value itself is returned.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
# Deletes `object` through the Repo. Returns :ok when the delete returns
# {:ok, _}; any other result falls out of the `with` unchanged.
502 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
503 defp delete_object(object) do
504 with {:ok, _} <- Repo.delete(object), do: :ok
# For every notification stored under :notifications in `meta` (none if the
# key is absent): stream it to the "user" and "user:notification" topics and
# send a push for it.
507 defp send_notifications(meta) do
508 Keyword.get(meta, :notifications, [])
509 |> Enum.each(fn notification ->
510 Streamer.stream(["user", "user:notification"], notification)
511 Push.send(notification)
# Streams every {topics, items} pair stored under :streamables in `meta`
# (none if the key is absent).
517 defp send_streamables(meta) do
518 Keyword.get(meta, :streamables, [])
519 |> Enum.each(fn {topics, items} ->
520 Streamer.stream(topics, items)
# Stores `streamables` in `meta` under :streamables, placed in front of any
# entries that are already there.
526 defp add_streamables(meta, streamables) do
527 existing = Keyword.get(meta, :streamables, [])
530 |> Keyword.put(:streamables, streamables ++ existing)
# Stores `notifications` in `meta` under :notifications, placed in front of
# any entries that are already there.
533 defp add_notifications(meta, notifications) do
534 existing = Keyword.get(meta, :notifications, [])
537 |> Keyword.put(:notifications, notifications ++ existing)
# Create activities: hand the activity id to Pleroma.Elasticsearch.put_by_id/2
# under the :activity kind.
541 def handle_after_transaction(%Pleroma.Activity{data: %{"type" => "Create"}} = activity) do
542 Pleroma.Elasticsearch.put_by_id(:activity, activity.id)
# Delete activities that carry a "deleted_activity_id": hand that id to
# Pleroma.Elasticsearch.delete_by_id/2 under the :activity kind.
545 def handle_after_transaction(%Pleroma.Activity{data: %{"type" => "Delete", "deleted_activity_id" => id}}) do
546 Pleroma.Elasticsearch.delete_by_id(:activity, id)
# Clause for any other Activity struct (not matched by the Create/Delete clauses above).
549 def handle_after_transaction(%Pleroma.Activity{}) do
# Clause for Object structs.
553 def handle_after_transaction(%Pleroma.Object{}) do
# Clause for the `meta` keyword list: pipes through send_notifications/1 and
# then send_streamables/1, which read the :notifications and :streamables
# entries accumulated by add_notifications/2 and add_streamables/2.
557 def handle_after_transaction(meta) do
559 |> send_notifications()
560 |> send_streamables()