1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
31 @ap_streamer Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
32 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
34 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
37 def handle(object, meta \\ [])
41 # - Sends a notification
48 "object" => follow_activity_id
53 with %Activity{actor: follower_id} = follow_activity <-
54 Activity.get_by_ap_id(follow_activity_id),
55 %User{} = followed <- User.get_cached_by_ap_id(actor),
56 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
57 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
58 {:ok, _follower, followed} <-
59 FollowingRelationship.update(follower, followed, :follow_accept) do
60 Notification.update_notification_type(followed, follow_activity)
67 # - Rejects all existing follow activities for this person
68 # - Updates the follow state
69 # - Dismisses notification
76 "object" => follow_activity_id
81 with %Activity{actor: follower_id} = follow_activity <-
82 Activity.get_by_ap_id(follow_activity_id),
83 %User{} = followed <- User.get_cached_by_ap_id(actor),
84 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
85 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
86 FollowingRelationship.update(follower, followed, :follow_reject)
87 Notification.dismiss(follow_activity)
94 # - Follows if possible
95 # - Sends a notification
96 # - Generates accept or reject if appropriate
103 "object" => followed_user,
104 "actor" => following_user
109 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
110 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
111 {_, {:ok, _, _}, _, _} <-
112 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
113 if followed.local && !followed.is_locked do
114 {:ok, accept_data, _} = Builder.accept(followed, object)
115 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
118 {:following, {:error, _}, _follower, followed} ->
119 {:ok, reject_data, _} = Builder.reject(followed, object)
120 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
126 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
130 |> add_notifications(notifications)
132 updated_object = Activity.get_by_ap_id(follow_id)
134 {:ok, updated_object, meta}
137 # Tasks this handles:
138 # - Unfollow and block
141 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
145 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
146 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
147 User.block(blocker, blocked)
153 # Tasks this handles:
156 # For a local user, we also get a changeset with the full information, so we
157 # can update non-federating, non-activitypub settings as well.
159 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
160 if changeset = Keyword.get(meta, :user_update_changeset) do
162 |> User.update_and_set_cache()
164 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
166 User.get_by_ap_id(updated_object["id"])
167 |> User.remote_user_changeset(new_user_data)
168 |> User.update_and_set_cache()
174 # Tasks this handles:
175 # - Add like to object
176 # - Set up notification
178 def handle(%{data: %{"type" => "Like"}} = object, meta) do
179 liked_object = Object.get_by_ap_id(object.data["object"])
180 Utils.add_like_to_object(object, liked_object)
182 Notification.create_notifications(object)
188 # - Actually create object
189 # - Rollback if we couldn't create it
190 # - Increase the user note count
191 # - Increase the reply count
192 # - Increase replies count
193 # - Set up ActivityExpiration
194 # - Set up notifications
196 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
197 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
198 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
199 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
200 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
202 if in_reply_to = object.data["inReplyTo"] && object.data["type"] != "Answer" do
203 Object.increase_replies_count(in_reply_to)
206 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
207 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
212 |> add_notifications(notifications)
214 {:ok, activity, meta}
216 e -> Repo.rollback(e)
220 # Tasks this handles:
221 # - Add announce to object
222 # - Set up notification
223 # - Stream out the announce
225 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
226 announced_object = Object.get_by_ap_id(object.data["object"])
227 user = User.get_cached_by_ap_id(object.data["actor"])
229 Utils.add_announce_to_object(object, announced_object)
231 if !User.is_internal_user?(user) do
232 Notification.create_notifications(object)
235 |> Topics.get_activity_topics()
236 |> Streamer.stream(object)
243 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
244 with undone_object <- Activity.get_by_ap_id(undone_object),
245 :ok <- handle_undoing(undone_object) do
250 # Tasks this handles:
251 # - Add reaction to object
252 # - Set up notification
254 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
255 reacted_object = Object.get_by_ap_id(object.data["object"])
256 Utils.add_emoji_reaction_to_object(object, reacted_object)
258 Notification.create_notifications(object)
263 # Tasks this handles:
264 # - Delete and unpins the create activity
265 # - Replace object with Tombstone
266 # - Set up notification
267 # - Reduce the user note count
268 # - Reduce the reply count
269 # - Stream out the activity
271 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
273 Object.normalize(deleted_object, fetch: false) ||
274 User.get_cached_by_ap_id(deleted_object)
277 case deleted_object do
279 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
280 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
281 %User{} = user <- User.get_cached_by_ap_id(actor) do
282 User.remove_pinnned_activity(user, activity)
284 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
286 if in_reply_to = deleted_object.data["inReplyTo"] do
287 Object.decrease_replies_count(in_reply_to)
290 MessageReference.delete_for_object(deleted_object)
292 @ap_streamer.stream_out(object)
293 @ap_streamer.stream_out_participations(deleted_object, user)
297 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
302 with {:ok, _} <- User.delete(deleted_object) do
308 Notification.create_notifications(object)
317 def handle(object, meta) do
321 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
322 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
323 actor = User.get_cached_by_ap_id(object.data["actor"])
324 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
327 [[actor, recipient], [recipient, actor]]
329 |> Enum.map(fn [user, other_user] ->
331 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
332 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
335 :chat_message_id_idempotency_key_cache,
337 meta[:idempotency_key]
341 ["user", "user:pleroma_chat"],
342 {user, %{cm_ref | chat: chat, object: object}}
350 |> add_streamables(streamables)
356 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
357 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
358 Object.increase_vote_count(
359 object.data["inReplyTo"],
368 def handle_object_creation(%{"type" => objtype} = object, meta)
369 when objtype in ~w[Audio Video Question Event Article] do
370 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
376 def handle_object_creation(object, meta) do
380 defp undo_like(nil, object), do: delete_object(object)
382 defp undo_like(%Object{} = liked_object, object) do
383 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
384 delete_object(object)
388 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
389 object.data["object"]
390 |> Object.get_by_ap_id()
394 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
395 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
396 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
397 {:ok, _} <- Repo.delete(object) do
402 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
403 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
404 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
405 {:ok, _} <- Repo.delete(object) do
411 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
413 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
414 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
415 {:ok, _} <- User.unblock(blocker, blocked),
416 {:ok, _} <- Repo.delete(object) do
421 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
423 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
424 defp delete_object(object) do
425 with {:ok, _} <- Repo.delete(object), do: :ok
428 defp send_notifications(meta) do
429 Keyword.get(meta, :notifications, [])
430 |> Enum.each(fn notification ->
431 Streamer.stream(["user", "user:notification"], notification)
432 Push.send(notification)
438 defp send_streamables(meta) do
439 Keyword.get(meta, :streamables, [])
440 |> Enum.each(fn {topics, items} ->
441 Streamer.stream(topics, items)
447 defp add_streamables(meta, streamables) do
448 existing = Keyword.get(meta, :streamables, [])
451 |> Keyword.put(:streamables, streamables ++ existing)
454 defp add_notifications(meta, notifications) do
455 existing = Keyword.get(meta, :notifications, [])
458 |> Keyword.put(:notifications, notifications ++ existing)
462 def handle_after_transaction(meta) do
464 |> send_notifications()
465 |> send_streamables()