1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
# Collaborator modules used further down (@ap_streamer and @logger are called in
# the Delete handler). Module attributes are evaluated when the module is
# compiled, so these lookups are frozen at compile time and later config changes
# do not affect them. The second argument is presumably the fallback used when
# the key is unset — TODO confirm against Pleroma.Config.get/2.
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @ap_streamer Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
32 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
# The handle/2 clauses in this module are declared against this behaviour.
34 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
# Bodiless function head: declares the default for `meta` (an empty list) once,
# ahead of the individual handle/2 clauses.
37 def handle(object, meta \\ [])
# Fragment of a handle/2 clause whose head is not visible here. Judging by the
# "accept" state and :follow_accept arguments below, it processes the acceptance
# of a follow request; `actor` is presumably bound in the missing head.
41 # - Sends a notification
48 "object" => follow_activity_id
# Resolve the referenced follow activity and both users. A step that does not
# match stops the `with` early.
53 with %Activity{actor: follower_id} = follow_activity <-
54 Activity.get_by_ap_id(follow_activity_id),
55 %User{} = followed <- User.get_cached_by_ap_id(actor),
56 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
# Record the "accept" state on the follow activity, then on the relationship.
# `follow_activity` and `followed` are rebound to the returned values.
57 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
58 {:ok, _follower, followed} <-
59 FollowingRelationship.update(follower, followed, :follow_accept) do
# Reached only when every step above matched.
60 Notification.update_notification_type(followed, follow_activity)
# Fragment of a handle/2 clause whose head is not visible here. Judging by the
# "reject" state and :follow_reject arguments below, it processes the rejection
# of a follow request; `actor` is presumably bound in the missing head.
67 # - Rejects all existing follow activities for this person
68 # - Updates the follow state
69 # - Dismisses notification
76 "object" => follow_activity_id
# Resolve the referenced follow activity and both users, then record the
# "reject" state. A step that does not match stops the `with` early.
81 with %Activity{actor: follower_id} = follow_activity <-
82 Activity.get_by_ap_id(follow_activity_id),
83 %User{} = followed <- User.get_cached_by_ap_id(actor),
84 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
85 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
# NOTE(review): the result of FollowingRelationship.update/3 is not matched
# here, whereas the :follow_accept call elsewhere in this module matches on
# {:ok, _, _} — confirm that ignoring a failure is intentional.
86 FollowingRelationship.update(follower, followed, :follow_reject)
87 Notification.dismiss(follow_activity)
# Fragment of a handle/2 clause whose head is not visible here; it binds the
# followed and following users' AP ids from the activity's "object" and "actor"
# fields. `object` and `follow_id` are presumably bound in the missing head.
94 # - Follows if possible
95 # - Sends a notification
96 # - Generates accept or reject if appropriate
103 "object" => followed_user,
104 "actor" => following_user
109 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
110 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
# The follow attempt is wrapped in a :following-tagged tuple together with both
# users so that a failed attempt can be recognised by the pattern further down.
111 {_, {:ok, _, _}, _, _} <-
112 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
# A local, unlocked target gets an Accept built and run through the pipeline
# right away.
113 if followed.local && !followed.is_locked do
114 {:ok, accept_data, _} = Builder.accept(followed, object)
115 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
# Follow attempt returned {:error, _}: build a Reject and run it through the
# pipeline.
118 {:following, {:error, _}, _follower, followed} ->
119 {:ok, reject_data, _} = Builder.reject(followed, object)
120 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
# Notifications are created with `do_send: false` and stored in `meta` via
# add_notifications/2; send_notifications/1 later reads them back from `meta`.
126 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
130 |> add_notifications(notifications)
# The activity is looked up again by id before being returned, presumably
# because its state may have been changed by the steps above — TODO confirm.
132 updated_object = Activity.get_by_ap_id(follow_id)
134 {:ok, updated_object, meta}
# Fragment of the handle/2 clause for "Block" activities (its `def` line is not
# visible here).
137 # Tasks this handles:
138 # - Unfollow and block
141 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
# The block is applied only when both users resolve. The unfollow listed above
# presumably happens inside User.block/2 — TODO confirm.
145 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
146 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
147 User.block(blocker, blocked)
# Handles "Update" activities by writing new user data to the database and cache.
153 # Tasks this handles:
156 # For a local user, we also get a changeset with the full information, so we
157 # can update non-federating, non-activitypub settings as well.
159 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
# When the caller supplied a ready-made changeset in `meta`, that branch is
# taken (the start of its pipeline is not visible in this excerpt).
160 if changeset = Keyword.get(meta, :user_update_changeset) do
162 |> User.update_and_set_cache()
# Otherwise the user data is derived from the Update's embedded object and
# applied to the user found by that object's "id".
164 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
# NOTE(review): the lookup result is piped on unchecked; confirm the user is
# guaranteed to exist at this point.
166 User.get_by_ap_id(updated_object["id"])
167 |> User.remote_user_changeset(new_user_data)
168 |> User.update_and_set_cache()
174 # Tasks this handles:
175 # - Add like to object
176 # - Set up notification
178 def handle(%{data: %{"type" => "Like"}} = object, meta) do
# NOTE(review): the lookup result is passed on unchecked. undo_like/2 has a
# dedicated `nil` clause for the same lookup, so a missing object seems
# possible — confirm add_like_to_object/2 copes with nil.
179 liked_object = Object.get_by_ap_id(object.data["object"])
180 Utils.add_like_to_object(object, liked_object)
182 Notification.create_notifications(object)
# Tasks this handles:
# - Actually create object
# - Rollback if we couldn't create it
# - Increase the user note count
# - Increase the replies count of the parent (not for poll answers)
# - Enqueue fetches for the object's replies, within the allowed thread depth
# - Start a rich media fetch for the activity
# - Set up notifications
#
# Returns `{:ok, activity, meta}` with the created notifications added to
# `meta`. Any other outcome of the `with` is handed to `Repo.rollback/1`.
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    # Notifications are created but not sent here; they travel in `meta`.
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)

    # `=` binds looser than `&&`, so the type check has to come first: this way
    # `in_reply_to` holds the parent's AP id instead of the boolean `true`.
    if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
      Object.increase_replies_count(in_reply_to)
    end

    reply_depth = (meta[:depth] || 0) + 1

    # FIXME: Force inReplyTo to replies
    if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
         object.data["replies"] != nil do
      for reply_id <- object.data["replies"] do
        Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
          "id" => reply_id,
          "depth" => reply_depth
        })
      end
    end

    # The rich media fetch runs in its own task, gated by the concurrency limiter.
    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    meta =
      meta
      |> add_notifications(notifications)

    {:ok, activity, meta}
  else
    e -> Repo.rollback(e)
  end
end
233 # Tasks this handles:
234 # - Add announce to object
235 # - Set up notification
236 # - Stream out the announce
238 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
# Both lookups are used unchecked below.
239 announced_object = Object.get_by_ap_id(object.data["object"])
240 user = User.get_cached_by_ap_id(object.data["actor"])
242 Utils.add_announce_to_object(object, announced_object)
# Announces made by internal users do not create notifications.
244 if !User.is_internal_user?(user) do
245 Notification.create_notifications(object)
# The announce is streamed to the topics computed for it (the value piped into
# get_activity_topics/1 is not visible in this excerpt).
248 |> Topics.get_activity_topics()
249 |> Streamer.stream(object)
# Handles "Undo" activities by delegating to handle_undoing/1 for the activity
# being undone.
256 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
# `undone_object` starts as the value of the "object" field and is rebound to
# the lookup result. A bare variable pattern always matches, so a failed lookup
# also reaches handle_undoing/1, whose catch-all clause returns an error tuple.
257 with undone_object <- Activity.get_by_ap_id(undone_object),
258 :ok <- handle_undoing(undone_object) do
263 # Tasks this handles:
264 # - Add reaction to object
265 # - Set up notification
267 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
# The reacted-to object is looked up by the activity's "object" field and
# passed on unchecked.
268 reacted_object = Object.get_by_ap_id(object.data["object"])
269 Utils.add_emoji_reaction_to_object(object, reacted_object)
271 Notification.create_notifications(object)
276 # Tasks this handles:
277 # - Deletes and unpins the create activity
278 # - Replace object with Tombstone
279 # - Set up notification
280 # - Reduce the user note count
281 # - Reduce the reply count
282 # - Stream out the activity
284 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
# The deleted thing is resolved as a normalized object first (with
# `fetch: false`), and only if that yields nothing as a cached user.
286 Object.normalize(deleted_object, fetch: false) ||
287 User.get_cached_by_ap_id(deleted_object)
# The branch patterns of this `case` are not visible in this excerpt. The code
# below suggests one branch deletes an object and another deletes a user.
290 case deleted_object do
# The actor is wrapped in an :actor-tagged tuple and guarded with is_binary/1
# so that a missing actor can be told apart from the other failure modes.
292 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
293 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
294 %User{} = user <- User.get_cached_by_ap_id(actor) do
295 User.remove_pinnned_activity(user, activity)
297 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
299 if in_reply_to = deleted_object.data["inReplyTo"] do
300 Object.decrease_replies_count(in_reply_to)
303 MessageReference.delete_for_object(deleted_object)
# Streaming goes through the compile-time configured @ap_streamer module.
305 @ap_streamer.stream_out(object)
306 @ap_streamer.stream_out_participations(deleted_object, user)
# Presumably the `else` branch for the :actor-tagged mismatch — TODO confirm.
310 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
315 with {:ok, _} <- User.delete(deleted_object) do
321 Notification.create_notifications(object)
# Catch-all clause: matches any activity the type-specific clauses above did
# not (its body is not visible in this excerpt).
330 def handle(object, meta) do
# Creates a "ChatMessage" object through the common pipeline, then creates a
# message reference in each participant's chat and queues items for streaming.
334 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
335 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
336 actor = User.get_cached_by_ap_id(object.data["actor"])
# NOTE(review): hd/1 raises on an empty list — confirm "to" is validated as
# non-empty before this point.
337 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
# Each pair is [chat owner, other participant], so both sides are processed.
340 [[actor, recipient], [recipient, actor]]
342 |> Enum.map(fn [user, other_user] ->
344 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
# The third argument is true when `user` is not the sender; presumably it marks
# the reference as unread for the recipient — TODO confirm.
345 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
# Cache name and key of a call whose remaining lines are not visible here.
348 :chat_message_id_idempotency_key_cache,
350 meta[:idempotency_key]
# Topics and payload for streaming; send_streamables/1 later passes such pairs
# to Streamer.stream/2.
354 ["user", "user:pleroma_chat"],
355 {user, %{cm_ref | chat: chat, object: object}}
363 |> add_streamables(streamables)
# Creates an "Answer" object through the common pipeline and increases a vote
# count on the object it replies to (the remaining arguments of that call are
# not visible in this excerpt).
369 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
370 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
371 Object.increase_vote_count(
372 object.data["inReplyTo"],
# Creates objects of the listed types through the common pipeline (the body of
# the `with` is not visible in this excerpt).
381 def handle_object_creation(%{"type" => objtype} = object, meta)
382 when objtype in ~w[Audio Video Question Event Article Note] do
383 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
# Catch-all clause for object data not matched by the clauses above (its body
# is not visible in this excerpt).
389 def handle_object_creation(object, meta) do
# Undoes a like. The first argument is the liked object or nil, the second is
# the Like activity being undone.
# With no liked object, only the Like activity itself is deleted.
393 defp undo_like(nil, object), do: delete_object(object)
# With a liked object, the like is removed from it first; the activity is
# deleted only if that removal returns {:ok, _}.
395 defp undo_like(%Object{} = liked_object, object) do
396 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
397 delete_object(object)
# Undoes a "Like" activity: looks up the liked object by the activity's
# "object" field. The rest of the pipeline is not visible here; presumably the
# result is passed to undo_like/2 — TODO confirm.
401 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
402 object.data["object"]
403 |> Object.get_by_ap_id()
# Undoes an "EmojiReact" activity: removes the reaction from the reacted object,
# then deletes the activity record. A step that does not match stops the `with`.
407 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
408 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
409 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
410 {:ok, _} <- Repo.delete(object) do
# Undoes an "Announce" activity: removes the announce from the announced object,
# then deletes the activity record. A step that does not match stops the `with`.
415 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
# NOTE(review): `liked_object` actually holds the announced object; the name
# looks carried over from the Like clause.
416 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
417 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
418 {:ok, _} <- Repo.delete(object) do
# Fragment of the handle_undoing/1 clause for "Block" activities (its `def` line
# is not visible here): resolves both users, unblocks, then deletes the Block
# activity record.
424 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
# `blocker` and `blocked` start as the values of the activity's "actor" and
# "object" fields and are rebound to the users found for them.
426 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
427 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
428 {:ok, _} <- User.unblock(blocker, blocked),
429 {:ok, _} <- Repo.delete(object) do
# Catch-all clause: anything the type-specific clauses above could not undo is
# reported back as an error tuple that carries the offending value.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
# Deletes the given record through the Repo. A successful delete is collapsed
# to a bare `:ok`; any other result is returned to the caller unchanged.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
# Delivers every notification stored under :notifications in `meta` (none when
# the key is absent): each is streamed to the "user" and "user:notification"
# topics and handed to Push.send/1. The function's return value is not visible
# in this excerpt.
441 defp send_notifications(meta) do
442 Keyword.get(meta, :notifications, [])
443 |> Enum.each(fn notification ->
444 Streamer.stream(["user", "user:notification"], notification)
445 Push.send(notification)
# Streams every {topics, items} pair stored under :streamables in `meta` (none
# when the key is absent). The function's return value is not visible in this
# excerpt.
451 defp send_streamables(meta) do
452 Keyword.get(meta, :streamables, [])
453 |> Enum.each(fn {topics, items} ->
454 Streamer.stream(topics, items)
# Stores `streamables` under :streamables in `meta`, placed in front of any
# entries already there (the value piped into Keyword.put/3 is not visible in
# this excerpt; presumably `meta`).
460 defp add_streamables(meta, streamables) do
461 existing = Keyword.get(meta, :streamables, [])
464 |> Keyword.put(:streamables, streamables ++ existing)
# Stores `notifications` under :notifications in `meta`, placed in front of any
# entries already there (the value piped into Keyword.put/3 is not visible in
# this excerpt; presumably `meta`).
467 defp add_notifications(meta, notifications) do
468 existing = Keyword.get(meta, :notifications, [])
471 |> Keyword.put(:notifications, notifications ++ existing)
# Sends the notifications and then the streamables collected in `meta` (the
# start of the pipeline is not visible in this excerpt; presumably `meta`).
# The name suggests this runs once the surrounding database transaction has
# finished — TODO confirm at the call site.
475 def handle_after_transaction(meta) do
477 |> send_notifications()
478 |> send_streamables()