1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @ap_streamer Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
32 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
34 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
37 def handle(object, meta \\ [])
41 # - Sends a notification
48 "object" => follow_activity_id
53 with %Activity{actor: follower_id} = follow_activity <-
54 Activity.get_by_ap_id(follow_activity_id),
55 %User{} = followed <- User.get_cached_by_ap_id(actor),
56 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
57 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
58 {:ok, _follower, followed} <-
59 FollowingRelationship.update(follower, followed, :follow_accept) do
60 Notification.update_notification_type(followed, follow_activity)
67 # - Rejects all existing follow activities for this person
68 # - Updates the follow state
69 # - Dismisses notification
76 "object" => follow_activity_id
81 with %Activity{actor: follower_id} = follow_activity <-
82 Activity.get_by_ap_id(follow_activity_id),
83 %User{} = followed <- User.get_cached_by_ap_id(actor),
84 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
85 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
86 FollowingRelationship.update(follower, followed, :follow_reject)
87 Notification.dismiss(follow_activity)
94 # - Follows if possible
95 # - Sends a notification
96 # - Generates accept or reject if appropriate
103 "object" => followed_user,
104 "actor" => following_user
109 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
110 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
111 {_, {:ok, _, _}, _, _} <-
112 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
113 if followed.local && !followed.is_locked do
114 {:ok, accept_data, _} = Builder.accept(followed, object)
115 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
118 {:following, {:error, _}, _follower, followed} ->
119 {:ok, reject_data, _} = Builder.reject(followed, object)
120 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
126 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
130 |> add_notifications(notifications)
132 updated_object = Activity.get_by_ap_id(follow_id)
134 {:ok, updated_object, meta}
137 # Tasks this handles:
138 # - Unfollow and block
141 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
145 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
146 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
147 User.block(blocker, blocked)
153 # Tasks this handles:
156 # For a local user, we also get a changeset with the full information, so we
157 # can update non-federating, non-activitypub settings as well.
159 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
160 if changeset = Keyword.get(meta, :user_update_changeset) do
162 |> User.update_and_set_cache()
164 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
166 User.get_by_ap_id(updated_object["id"])
167 |> User.remote_user_changeset(new_user_data)
168 |> User.update_and_set_cache()
174 # Tasks this handles:
175 # - Add like to object
176 # - Set up notification
178 def handle(%{data: %{"type" => "Like"}} = object, meta) do
179 liked_object = Object.get_by_ap_id(object.data["object"])
180 Utils.add_like_to_object(object, liked_object)
182 Notification.create_notifications(object)
188 # - Actually create object
189 # - Rollback if we couldn't create it
190 # - Increase the user note count
191 # - Increase the reply count
192 # - Increase replies count
193 # - Set up ActivityExpiration
194 # - Set up notifications
196 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
197 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
198 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
199 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
200 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
202 if in_reply_to = object.data["inReplyTo"] && object.data["type"] != "Answer" do
203 Object.increase_replies_count(in_reply_to)
206 reply_depth = (meta[:depth] || 0) + 1
208 # FIXME: Force inReplyTo to replies
209 if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
210 object.data["replies"] != nil do
211 for reply_id <- object.data["replies"] do
212 Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
214 "depth" => reply_depth
219 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
220 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
225 |> add_notifications(notifications)
227 {:ok, activity, meta}
229 e -> Repo.rollback(e)
233 # Tasks this handles:
234 # - Add announce to object
235 # - Set up notification
236 # - Stream out the announce
238 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
239 announced_object = Object.get_by_ap_id(object.data["object"])
240 user = User.get_cached_by_ap_id(object.data["actor"])
242 Utils.add_announce_to_object(object, announced_object)
244 if !User.is_internal_user?(user) do
245 Notification.create_notifications(object)
248 |> Topics.get_activity_topics()
249 |> Streamer.stream(object)
256 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
257 with undone_object <- Activity.get_by_ap_id(undone_object),
258 :ok <- handle_undoing(undone_object) do
263 # Tasks this handles:
264 # - Add reaction to object
265 # - Set up notification
267 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
268 reacted_object = Object.get_by_ap_id(object.data["object"])
269 Utils.add_emoji_reaction_to_object(object, reacted_object)
271 Notification.create_notifications(object)
276 # Tasks this handles:
277 # - Delete and unpins the create activity
278 # - Replace object with Tombstone
279 # - Set up notification
280 # - Reduce the user note count
281 # - Reduce the reply count
282 # - Stream out the activity
284 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
286 Object.normalize(deleted_object, fetch: false) ||
287 User.get_cached_by_ap_id(deleted_object)
290 case deleted_object do
292 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
293 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
294 %User{} = user <- User.get_cached_by_ap_id(actor) do
295 User.remove_pinned_object_id(user, deleted_object.data["id"])
297 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
299 if in_reply_to = deleted_object.data["inReplyTo"] do
300 Object.decrease_replies_count(in_reply_to)
303 MessageReference.delete_for_object(deleted_object)
305 @ap_streamer.stream_out(object)
306 @ap_streamer.stream_out_participations(deleted_object, user)
310 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
315 with {:ok, _} <- User.delete(deleted_object) do
321 Notification.create_notifications(object)
328 # Tasks this handles:
330 # - removes expiration job for pinned activity, if was set for expiration
332 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
333 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
334 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
335 # if pinned activity was scheduled for deletion, we remove job
336 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
337 Oban.cancel_job(expiration.id)
343 {:error, :user_not_found}
345 {:error, changeset} ->
346 if changeset.errors[:pinned_objects] do
347 {:error, :pinned_statuses_limit_reached}
354 # Tasks this handles:
355 # - removes pin from user
356 # - removes corresponding Add activity
357 # - if activity had expiration, recreates activity expiration job
359 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
360 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
361 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
363 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
366 # if pinned activity was scheduled for deletion, we reschedule it for deletion
367 if meta[:expires_at] do
368 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
370 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
372 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
373 activity_id: meta[:activity_id],
374 expires_at: expires_at
380 nil -> {:error, :user_not_found}
387 def handle(object, meta) do
391 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
392 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
393 actor = User.get_cached_by_ap_id(object.data["actor"])
394 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
397 [[actor, recipient], [recipient, actor]]
399 |> Enum.map(fn [user, other_user] ->
401 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
402 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
405 :chat_message_id_idempotency_key_cache,
407 meta[:idempotency_key]
411 ["user", "user:pleroma_chat"],
412 {user, %{cm_ref | chat: chat, object: object}}
420 |> add_streamables(streamables)
426 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
427 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
428 Object.increase_vote_count(
429 object.data["inReplyTo"],
438 def handle_object_creation(%{"type" => objtype} = object, meta)
439 when objtype in ~w[Audio Video Question Event Article Note] do
440 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
446 def handle_object_creation(object, meta) do
450 defp undo_like(nil, object), do: delete_object(object)
452 defp undo_like(%Object{} = liked_object, object) do
453 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
454 delete_object(object)
458 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
459 object.data["object"]
460 |> Object.get_by_ap_id()
464 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
465 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
466 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
467 {:ok, _} <- Repo.delete(object) do
472 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
473 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
474 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
475 {:ok, _} <- Repo.delete(object) do
481 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
483 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
484 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
485 {:ok, _} <- User.unblock(blocker, blocked),
486 {:ok, _} <- Repo.delete(object) do
491 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
493 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
494 defp delete_object(object) do
495 with {:ok, _} <- Repo.delete(object), do: :ok
498 defp send_notifications(meta) do
499 Keyword.get(meta, :notifications, [])
500 |> Enum.each(fn notification ->
501 Streamer.stream(["user", "user:notification"], notification)
502 Push.send(notification)
508 defp send_streamables(meta) do
509 Keyword.get(meta, :streamables, [])
510 |> Enum.each(fn {topics, items} ->
511 Streamer.stream(topics, items)
517 defp add_streamables(meta, streamables) do
518 existing = Keyword.get(meta, :streamables, [])
521 |> Keyword.put(:streamables, streamables ++ existing)
524 defp add_notifications(meta, notifications) do
525 existing = Keyword.get(meta, :notifications, [])
528 |> Keyword.put(:notifications, notifications ++ existing)
532 def handle_after_transaction(meta) do
534 |> send_notifications()
535 |> send_streamables()