1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects.Handling do
6 @callback handle(map(), keyword()) :: {:ok, map(), keyword()} | {:error, any()}
7 @callback handle_after_transaction(map()) :: map()
10 defmodule Pleroma.Web.ActivityPub.SideEffects do
12 This module looks at an inserted object and executes the side effects that it
13 implies. For example, a `Like` activity will increase the like count on the
14 liked object, a `Follow` activity will add the user to the follower
15 collection, and so on.
17 alias Pleroma.Activity
18 alias Pleroma.Activity.Ir.Topics
20 alias Pleroma.Chat.MessageReference
21 alias Pleroma.FollowingRelationship
22 alias Pleroma.Notification
26 alias Pleroma.Web.ActivityPub.ActivityPub
27 alias Pleroma.Web.ActivityPub.Builder
28 alias Pleroma.Web.ActivityPub.Pipeline
29 alias Pleroma.Web.ActivityPub.Utils
30 alias Pleroma.Web.Push
31 alias Pleroma.Web.Streamer
35 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
37 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
40 def handle(object, meta \\ [])
44 # - Sends a notification
51 "object" => follow_activity_id
56 with %Activity{actor: follower_id} = follow_activity <-
57 Activity.get_by_ap_id(follow_activity_id),
58 %User{} = followed <- User.get_cached_by_ap_id(actor),
59 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
60 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
61 {:ok, _follower, followed} <-
62 FollowingRelationship.update(follower, followed, :follow_accept) do
63 Notification.update_notification_type(followed, follow_activity)
70 # - Rejects all existing follow activities for this person
71 # - Updates the follow state
72 # - Dismisses notification
79 "object" => follow_activity_id
84 with %Activity{actor: follower_id} = follow_activity <-
85 Activity.get_by_ap_id(follow_activity_id),
86 %User{} = followed <- User.get_cached_by_ap_id(actor),
87 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
88 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
89 FollowingRelationship.update(follower, followed, :follow_reject)
90 Notification.dismiss(follow_activity)
97 # - Follows if possible
98 # - Sends a notification
99 # - Generates accept or reject if appropriate
106 "object" => followed_user,
107 "actor" => following_user
112 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
113 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
114 {_, {:ok, _, _}, _, _} <-
115 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
116 if followed.local && !followed.is_locked do
117 {:ok, accept_data, _} = Builder.accept(followed, object)
118 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
121 {:following, {:error, _}, _follower, followed} ->
122 {:ok, reject_data, _} = Builder.reject(followed, object)
123 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
129 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
133 |> add_notifications(notifications)
135 updated_object = Activity.get_by_ap_id(follow_id)
137 {:ok, updated_object, meta}
140 # Tasks this handles:
141 # - Unfollow and block
144 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
148 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
149 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
150 User.block(blocker, blocked)
156 # Tasks this handles:
159 # For a local user, we also get a changeset with the full information, so we
160 # can update non-federating, non-activitypub settings as well.
162 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
163 if changeset = Keyword.get(meta, :user_update_changeset) do
165 |> User.update_and_set_cache()
167 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
169 User.get_by_ap_id(updated_object["id"])
170 |> User.remote_user_changeset(new_user_data)
171 |> User.update_and_set_cache()
177 # Tasks this handles:
178 # - Add like to object
179 # - Set up notification
181 def handle(%{data: %{"type" => "Like"}} = object, meta) do
182 liked_object = Object.get_by_ap_id(object.data["object"])
183 Utils.add_like_to_object(object, liked_object)
185 Notification.create_notifications(object)
191 # - Actually create object
192 # - Rollback if we couldn't create it
193 # - Increase the user note count
194 # - Increase the reply count
195 # - Increase replies count
196 # - Set up ActivityExpiration
197 # - Set up notifications
199 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
200 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
201 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
202 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
203 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
205 if in_reply_to = object.data["inReplyTo"] && object.data["type"] != "Answer" do
206 Object.increase_replies_count(in_reply_to)
209 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
210 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
215 |> add_notifications(notifications)
217 {:ok, activity, meta}
219 e -> Repo.rollback(e)
223 # Tasks this handles:
224 # - Add announce to object
225 # - Set up notification
226 # - Stream out the announce
228 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
229 announced_object = Object.get_by_ap_id(object.data["object"])
230 user = User.get_cached_by_ap_id(object.data["actor"])
232 Utils.add_announce_to_object(object, announced_object)
234 if !User.is_internal_user?(user) do
235 Notification.create_notifications(object)
238 |> Topics.get_activity_topics()
239 |> Streamer.stream(object)
246 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
247 with undone_object <- Activity.get_by_ap_id(undone_object),
248 :ok <- handle_undoing(undone_object) do
253 # Tasks this handles:
254 # - Add reaction to object
255 # - Set up notification
257 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
258 reacted_object = Object.get_by_ap_id(object.data["object"])
259 Utils.add_emoji_reaction_to_object(object, reacted_object)
261 Notification.create_notifications(object)
266 # Tasks this handles:
267 # - Delete and unpins the create activity
268 # - Replace object with Tombstone
269 # - Set up notification
270 # - Reduce the user note count
271 # - Reduce the reply count
272 # - Stream out the activity
274 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
276 Object.normalize(deleted_object, false) ||
277 User.get_cached_by_ap_id(deleted_object)
280 case deleted_object do
282 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
283 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
284 %User{} = user <- User.get_cached_by_ap_id(actor) do
285 User.remove_pinnned_activity(user, activity)
287 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
289 if in_reply_to = deleted_object.data["inReplyTo"] do
290 Object.decrease_replies_count(in_reply_to)
293 MessageReference.delete_for_object(deleted_object)
295 ActivityPub.stream_out(object)
296 ActivityPub.stream_out_participations(deleted_object, user)
300 Logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
305 with {:ok, _} <- User.delete(deleted_object) do
311 Notification.create_notifications(object)
320 def handle(object, meta) do
324 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
325 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
326 actor = User.get_cached_by_ap_id(object.data["actor"])
327 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
330 [[actor, recipient], [recipient, actor]]
332 |> Enum.map(fn [user, other_user] ->
334 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
335 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
338 :chat_message_id_idempotency_key_cache,
340 meta[:idempotency_key]
344 ["user", "user:pleroma_chat"],
345 {user, %{cm_ref | chat: chat, object: object}}
353 |> add_streamables(streamables)
359 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
360 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
361 Object.increase_vote_count(
362 object.data["inReplyTo"],
371 def handle_object_creation(%{"type" => objtype} = object, meta)
372 when objtype in ~w[Audio Video Question Event Article] do
373 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
379 def handle_object_creation(object, meta) do
383 defp undo_like(nil, object), do: delete_object(object)
385 defp undo_like(%Object{} = liked_object, object) do
386 with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
387 delete_object(object)
391 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
392 object.data["object"]
393 |> Object.get_by_ap_id()
397 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
398 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
399 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
400 {:ok, _} <- Repo.delete(object) do
405 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
406 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
407 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
408 {:ok, _} <- Repo.delete(object) do
414 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
416 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
417 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
418 {:ok, _} <- User.unblock(blocker, blocked),
419 {:ok, _} <- Repo.delete(object) do
424 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
426 @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
427 defp delete_object(object) do
428 with {:ok, _} <- Repo.delete(object), do: :ok
431 defp send_notifications(meta) do
432 Keyword.get(meta, :notifications, [])
433 |> Enum.each(fn notification ->
434 Streamer.stream(["user", "user:notification"], notification)
435 Push.send(notification)
441 defp send_streamables(meta) do
442 Keyword.get(meta, :streamables, [])
443 |> Enum.each(fn {topics, items} ->
444 Streamer.stream(topics, items)
450 defp add_streamables(meta, streamables) do
451 existing = Keyword.get(meta, :streamables, [])
454 |> Keyword.put(:streamables, streamables ++ existing)
457 defp add_notifications(meta, notifications) do
458 existing = Keyword.get(meta, :notifications, [])
461 |> Keyword.put(:notifications, notifications ++ existing)
465 def handle_after_transaction(meta) do
467 |> send_notifications()
468 |> send_streamables()