1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
@moduledoc """
This module looks at an inserted object and executes the side effects that it
implies. For example, a `Like` activity will increase the like count on the
liked object, a `Follow` activity will add the user to the follower
collection, and so on.
"""
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
32 def handle(object, meta \\ [])
# Side effects of an `Accept` activity whose object is a follow activity:
# - sets the follow state to "accept" via `Utils.update_follow_state_for_all/2`
# - updates the following relationship to `:follow_accept`
# - updates the type of the notification that belongs to the follow
#
# Any failing step short-circuits the `with`; its result is discarded and the
# clause still answers `{:ok, object, meta}`.
def handle(
      %{
        data: %{
          "actor" => actor,
          "type" => "Accept",
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: requester_ap_id} = pending_follow <-
         Activity.get_by_ap_id(follow_ap_id),
       %User{} = accepting_user <- User.get_cached_by_ap_id(actor),
       %User{} = requesting_user <- User.get_cached_by_ap_id(requester_ap_id),
       {:ok, accepted_follow} <- Utils.update_follow_state_for_all(pending_follow, "accept"),
       {:ok, _requesting_user, accepting_user} <-
         FollowingRelationship.update(requesting_user, accepting_user, :follow_accept) do
    Notification.update_notification_type(accepting_user, accepted_follow)
  end

  {:ok, object, meta}
end
# Side effects of a `Reject` activity whose object is a follow activity:
# - Rejects all existing follow activities for this person
# - Updates the follow state
# - Dismisses notification
#
# A failed lookup or state update short-circuits the `with`; the clause still
# answers `{:ok, object, meta}`.
def handle(
      %{
        data: %{
          "actor" => actor,
          "type" => "Reject",
          "object" => follow_ap_id
        }
      } = object,
      meta
    ) do
  with %Activity{actor: requester_ap_id} = pending_follow <-
         Activity.get_by_ap_id(follow_ap_id),
       %User{} = rejecting_user <- User.get_cached_by_ap_id(actor),
       %User{} = requesting_user <- User.get_cached_by_ap_id(requester_ap_id),
       {:ok, _rejected_follow} <- Utils.update_follow_state_for_all(pending_follow, "reject") do
    FollowingRelationship.update(requesting_user, rejecting_user, :follow_reject)
    # The original (pre-update) follow activity is the one passed to dismiss.
    Notification.dismiss(pending_follow)
  end

  {:ok, object, meta}
end
# Side effects of a `Follow` activity:
# - Follows if possible (registered as `:follow_pending`)
# - Generates accept or reject if appropriate
# - Sends a notification (created here with `do_send: false` and queued in
#   `meta` for later delivery)
#
# Returns the follow activity re-read from the database, because the generated
# accept/reject may have changed it.
def handle(
      %{
        data: %{
          "id" => follow_id,
          "type" => "Follow",
          "object" => followed_user,
          "actor" => following_user
        }
      } = object,
      meta
    ) do
  with %User{} = requester <- User.get_cached_by_ap_id(following_user),
       %User{} = target <- User.get_cached_by_ap_id(followed_user),
       {_, {:ok, _, _}, _, _} <-
         {:following, User.follow(requester, target, :follow_pending), requester, target} do
    # Local, unlocked accounts accept the follow straight away.
    if target.local && !target.is_locked do
      {:ok, accept_data, _} = Builder.accept(target, object)
      {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
    end
  else
    # The follow itself failed: answer with a `Reject` on behalf of the target.
    {:following, {:error, _}, _requester, target} ->
      {:ok, reject_data, _} = Builder.reject(target, object)
      {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)

    _ ->
      nil
  end

  {:ok, notifications} = Notification.create_notifications(object, do_send: false)

  {:ok, Activity.get_by_ap_id(follow_id), add_notifications(meta, notifications)}
end
# Tasks this handles:
# - Unfollow and block (both delegated to `User.block/2`)
#
# If either user cannot be resolved the block is skipped; the clause answers
# `{:ok, object, meta}` either way.
def handle(
      %{data: %{"type" => "Block", "object" => blocked_ap_id, "actor" => blocker_ap_id}} =
        object,
      meta
    ) do
  with %User{} = blocking_user <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked_user <- User.get_cached_by_ap_id(blocked_ap_id) do
    User.block(blocking_user, blocked_user)
  end

  {:ok, object, meta}
end
# Tasks this handles:
# - Update the user
#
# For a local user, we also get a changeset with the full information, so we
# can update non-federating, non-activitypub settings as well. That changeset
# arrives in `meta[:user_update_changeset]`; without it the user is rebuilt
# from the ActivityPub user object carried by the activity.
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  case Keyword.get(meta, :user_update_changeset) do
    absent when absent in [nil, false] ->
      {:ok, fresh_user_data} = ActivityPub.user_data_from_user_object(updated_object)
      existing_user = User.get_by_ap_id(updated_object["id"])

      existing_user
      |> User.remote_user_changeset(fresh_user_data)
      |> User.update_and_set_cache()

    changeset ->
      User.update_and_set_cache(changeset)
  end

  {:ok, object, meta}
end
# Tasks this handles:
# - Add like to object
# - Set up notification
def handle(%{data: %{"type" => "Like"}} = object, meta) do
  liked_ap_id = object.data["object"]
  target = Object.get_by_ap_id(liked_ap_id)

  Utils.add_like_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
# Tasks this handles:
# - Actually create object
# - Rollback if we couldn't create it
# - Increase the user note count
# - Increase the replies count of the object being replied to (skipped for
#   `Answer` objects)
# - Start the rich media fetch for the activity
# - Set up ActivityExpiration (not visible in this clause -- presumably done
#   elsewhere in the pipeline; TODO confirm)
# - Set up notifications (created with `do_send: false` and queued in `meta`)
#
# Returns `{:ok, activity, meta}`. Any failure of object creation or actor
# lookup is handed to `Repo.rollback/1`.
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)

    # Bind the parent id *before* testing it. `=` binds looser than `&&`, so
    # `if in_reply_to = a && b do` binds the boolean result of `a && b` and
    # the replies counter would be asked to bump the object `true`.
    in_reply_to = object.data["inReplyTo"]

    if in_reply_to && object.data["type"] != "Answer" do
      Object.increase_replies_count(in_reply_to)
    end

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    meta =
      meta
      |> add_notifications(notifications)

    {:ok, activity, meta}
  else
    e -> Repo.rollback(e)
  end
end
# Tasks this handles:
# - Add announce to object
# - Set up notification
# - Stream out the announce
#
# Notification and streaming are skipped when `User.is_internal_user?/1` is
# truthy for the announcing actor.
def handle(%{data: %{"type" => "Announce"}} = object, meta) do
  target = Object.get_by_ap_id(object.data["object"])
  announcer = User.get_cached_by_ap_id(object.data["actor"])

  Utils.add_announce_to_object(object, target)

  if !User.is_internal_user?(announcer) do
    Notification.create_notifications(object)

    topics = Topics.get_activity_topics(object)
    Streamer.stream(topics, object)
  end

  {:ok, object, meta}
end
# Side effects of an `Undo`: looks up the activity being undone and delegates
# to `handle_undoing/1`. Anything other than `:ok` from that call is returned
# to the caller unchanged.
def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
  undone_activity = Activity.get_by_ap_id(undone_object)

  with :ok <- handle_undoing(undone_activity) do
    {:ok, object, meta}
  end
end
# Tasks this handles:
# - Add reaction to object
# - Set up notification
def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
  reacted_ap_id = object.data["object"]
  target = Object.get_by_ap_id(reacted_ap_id)

  Utils.add_emoji_reaction_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
# Tasks this handles:
# - Delete and unpins the create activity
# - Replace object with Tombstone
# - Set up notification
# - Reduce the user note count
# - Reduce the reply count
# - Stream out the activity
#
# The `Delete` may point at an object or at a user: the id is first resolved
# with `Object.normalize/2` and, failing that, with
# `User.get_cached_by_ap_id/1`. Returns `{:ok, object, meta}` on success and
# `{:error, reason}` otherwise.
def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
  target =
    Object.normalize(deleted_object, false) ||
      User.get_cached_by_ap_id(deleted_object)

  outcome =
    case target do
      %Object{} ->
        with {:ok, deleted, create_activity} <- Object.delete(target),
             {_, actor} when is_binary(actor) <- {:actor, deleted.data["actor"]},
             %User{} = author <- User.get_cached_by_ap_id(actor) do
          User.remove_pinnned_activity(author, create_activity)

          {:ok, author} = ActivityPub.decrease_note_count_if_public(author, deleted)

          if in_reply_to = deleted.data["inReplyTo"] do
            Object.decrease_replies_count(in_reply_to)
          end

          MessageReference.delete_for_object(deleted)

          ActivityPub.stream_out(object)
          ActivityPub.stream_out_participations(deleted, author)
          :ok
        else
          {:actor, _} ->
            # `with` bindings are not visible here, so the object logged is
            # the one resolved before `Object.delete/1` ran.
            Logger.error("The object doesn't have an actor: #{inspect(target)}")
            :no_object_actor
        end

      %User{} ->
        with {:ok, _} <- User.delete(target) do
          :ok
        end
    end

  case outcome do
    :ok ->
      Notification.create_notifications(object)
      {:ok, object, meta}

    failure ->
      {:error, failure}
  end
end
# Nothing to do: activities not matched by an earlier clause pass through
# unchanged.
def handle(object, meta), do: {:ok, object, meta}
# Creates a `ChatMessage` object and prepares what has to be streamed for it.
#
# For each local participant (actor and first `to` recipient, deduplicated so
# a message to oneself is handled once) this:
# - bumps or creates the chat with the other participant
# - creates a message reference; the third argument is `true` only for a
#   participant other than the actor
# - caches `meta[:idempotency_key]` under the message reference id
# - queues a streamable for the "user" and "user:pleroma_chat" topics
#
# The streamables are only added to `meta` here; nothing is streamed yet.
def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
  with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
    actor = User.get_cached_by_ap_id(object.data["actor"])
    recipient = User.get_cached_by_ap_id(hd(object.data["to"]))

    participant_pairs = Enum.uniq([[actor, recipient], [recipient, actor]])

    streamables =
      for [user, other_user] <- participant_pairs, user.local do
        {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
        {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)

        @cachex.put(
          :chat_message_id_idempotency_key_cache,
          cm_ref.id,
          meta[:idempotency_key]
        )

        {
          ["user", "user:pleroma_chat"],
          {user, %{cm_ref | chat: chat, object: object}}
        }
      end

    {:ok, object, add_streamables(meta, streamables)}
  end
end
# Creates an `Answer` object and then calls `Object.increase_vote_count/3`
# with the answer's `inReplyTo`, `name` and `actor` values.
def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
  with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
    in_reply_to = object.data["inReplyTo"]
    name = object.data["name"]
    actor = object.data["actor"]

    Object.increase_vote_count(in_reply_to, name, actor)

    {:ok, object, meta}
  end
end
# Object types that need nothing beyond the common pipeline: its result
# (success or failure) is returned as is.
def handle_object_creation(%{"type" => objtype} = object, meta)
    when objtype in ~w[Audio Video Question Event Article] do
  Pipeline.common_pipeline(object, meta)
end
# Nothing to do: any other object data is passed through unchanged.
def handle_object_creation(object, meta), do: {:ok, object, meta}
# Undoes a like: removes it from the liked object (when that object was found)
# and deletes the like itself. A failure to remove the like is returned as is
# and the like is then left in place.
defp undo_like(nil, like), do: delete_object(like)

defp undo_like(%Object{} = liked_object, like) do
  case Utils.remove_like_from_object(like, liked_object) do
    {:ok, _} -> delete_object(like)
    failure -> failure
  end
end
# Reverts the side effects of the activity being undone and deletes it.
# Returns `:ok` on success. A failing step is returned as is, and unsupported
# input gets `{:error, ["don't know how to handle", object]}`.
def handle_undoing(%{data: %{"type" => "Like"}} = object) do
  liked_object = Object.get_by_ap_id(object.data["object"])
  undo_like(liked_object, object)
end

def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
  with %Object{} = target <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, target),
       {:ok, _} <- Repo.delete(object) do
    :ok
  end
end

def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
  with %Object{} = announced_object <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object),
       {:ok, _} <- Repo.delete(object) do
    :ok
  end
end

def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked),
       {:ok, _} <- Repo.delete(object) do
    :ok
  end
end

def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
# Deletes the record through the repo, collapsing `{:ok, _}` to `:ok` and
# passing any other result through unchanged.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
# Streams and pushes every notification queued under `:notifications` in
# `meta`. Returns `meta` unchanged so the call can be piped.
defp send_notifications(meta) do
  queued = Keyword.get(meta, :notifications, [])

  Enum.each(queued, fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end
# Streams every `{topics, items}` pair queued under `:streamables` in `meta`.
# Returns `meta` unchanged so the call can be piped.
defp send_streamables(meta) do
  queued = Keyword.get(meta, :streamables, [])

  Enum.each(queued, fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
# Queues `streamables` in `meta`, in front of any that are already queued
# under the `:streamables` key.
defp add_streamables(meta, streamables) do
  already_queued = Keyword.get(meta, :streamables, [])
  Keyword.put(meta, :streamables, streamables ++ already_queued)
end
# Queues `notifications` in `meta`, in front of any that are already queued
# under the `:notifications` key.
defp add_notifications(meta, notifications) do
  already_queued = Keyword.get(meta, :notifications, [])
  Keyword.put(meta, :notifications, notifications ++ already_queued)
end
444 def handle_after_transaction(meta) do
446 |> send_notifications()
447 |> send_streamables()