1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
13 alias Pleroma.Activity.Ir.Topics
15 alias Pleroma.Chat.MessageReference
16 alias Pleroma.FollowingRelationship
17 alias Pleroma.Notification
21 alias Pleroma.Web.ActivityPub.ActivityPub
22 alias Pleroma.Web.ActivityPub.Builder
23 alias Pleroma.Web.ActivityPub.Pipeline
24 alias Pleroma.Web.ActivityPub.Utils
25 alias Pleroma.Web.Push
26 alias Pleroma.Web.Streamer
30 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
32 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
35 def handle(object, meta \\ [])
39 # - Sends a notification
46 "object" => follow_activity_id
51 with %Activity{actor: follower_id} = follow_activity <-
52 Activity.get_by_ap_id(follow_activity_id),
53 %User{} = followed <- User.get_cached_by_ap_id(actor),
54 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
55 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
56 {:ok, _follower, followed} <-
57 FollowingRelationship.update(follower, followed, :follow_accept) do
58 Notification.update_notification_type(followed, follow_activity)
65 # - Rejects all existing follow activities for this person
66 # - Updates the follow state
67 # - Dismisses notification
74 "object" => follow_activity_id
79 with %Activity{actor: follower_id} = follow_activity <-
80 Activity.get_by_ap_id(follow_activity_id),
81 %User{} = followed <- User.get_cached_by_ap_id(actor),
82 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
83 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
84 FollowingRelationship.update(follower, followed, :follow_reject)
85 Notification.dismiss(follow_activity)
92 # - Follows if possible
93 # - Sends a notification
94 # - Generates accept or reject if appropriate
101 "object" => followed_user,
102 "actor" => following_user
107 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
108 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
109 {_, {:ok, _, _}, _, _} <-
110 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
111 if followed.local && !followed.is_locked do
112 {:ok, accept_data, _} = Builder.accept(followed, object)
113 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
116 {:following, {:error, _}, _follower, followed} ->
117 {:ok, reject_data, _} = Builder.reject(followed, object)
118 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
124 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
128 |> add_notifications(notifications)
130 updated_object = Activity.get_by_ap_id(follow_id)
132 {:ok, updated_object, meta}
135 # Tasks this handles:
136 # - Unfollow and block
139 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
143 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
144 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
145 User.block(blocker, blocked)
151 # Tasks this handles:
154 # For a local user, we also get a changeset with the full information, so we
155 # can update non-federating, non-activitypub settings as well.
157 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
158 if changeset = Keyword.get(meta, :user_update_changeset) do
160 |> User.update_and_set_cache()
162 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
164 User.get_by_ap_id(updated_object["id"])
165 |> User.remote_user_changeset(new_user_data)
166 |> User.update_and_set_cache()
172 # Tasks this handles:
173 # - Add like to object
174 # - Set up notification
176 def handle(%{data: %{"type" => "Like"}} = object, meta) do
177 liked_object = Object.get_by_ap_id(object.data["object"])
178 Utils.add_like_to_object(object, liked_object)
180 Notification.create_notifications(object)
186 # - Actually create object
187 # - Rollback if we couldn't create it
188 # - Increase the user note count
# - Increase the replies count of the object being replied to
191 # - Set up ActivityExpiration
192 # - Set up notifications
# Handles a `Create` activity.
#
# Creates the wrapped object via `handle_object_creation/2` (taken from
# `meta[:object_data]`), creates the notifications without sending them,
# bumps the author's note count and, for replies, the replies count of the
# replied-to object. Also kicks off a rich-media fetch for the activity.
#
# Returns `{:ok, activity, meta}` with the created notifications added to
# `meta`. If the object cannot be created or the actor cannot be found, the
# non-matching value is handed to `Repo.rollback/1`.
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    # `do_send: false`: the notifications are only stored in `meta` here.
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)

    # `=` binds looser than `&&`, so the type check has to come first;
    # otherwise `in_reply_to` would be bound to the boolean result of the
    # comparison instead of the replied-to id.
    if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
      Object.increase_replies_count(in_reply_to)
    end

    ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
      Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
    end)

    meta = add_notifications(meta, notifications)

    {:ok, activity, meta}
  else
    e -> Repo.rollback(e)
  end
end
218 # Tasks this handles:
219 # - Add announce to object
220 # - Set up notification
221 # - Stream out the announce
223 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
224 announced_object = Object.get_by_ap_id(object.data["object"])
225 user = User.get_cached_by_ap_id(object.data["actor"])
227 Utils.add_announce_to_object(object, announced_object)
229 if !User.is_internal_user?(user) do
230 Notification.create_notifications(object)
233 |> Topics.get_activity_topics()
234 |> Streamer.stream(object)
241 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
242 with undone_object <- Activity.get_by_ap_id(undone_object),
243 :ok <- handle_undoing(undone_object) do
248 # Tasks this handles:
249 # - Add reaction to object
250 # - Set up notification
252 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
253 reacted_object = Object.get_by_ap_id(object.data["object"])
254 Utils.add_emoji_reaction_to_object(object, reacted_object)
256 Notification.create_notifications(object)
261 # Tasks this handles:
# - Deletes and unpins the create activity
263 # - Replace object with Tombstone
264 # - Set up notification
265 # - Reduce the user note count
266 # - Reduce the reply count
267 # - Stream out the activity
269 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
271 Object.normalize(deleted_object, fetch: false) ||
272 User.get_cached_by_ap_id(deleted_object)
275 case deleted_object do
277 with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
278 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
279 %User{} = user <- User.get_cached_by_ap_id(actor) do
280 User.remove_pinnned_activity(user, activity)
282 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
284 if in_reply_to = deleted_object.data["inReplyTo"] do
285 Object.decrease_replies_count(in_reply_to)
288 MessageReference.delete_for_object(deleted_object)
290 ActivityPub.stream_out(object)
291 ActivityPub.stream_out_participations(deleted_object, user)
295 Logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
300 with {:ok, _} <- User.delete(deleted_object) do
306 Notification.create_notifications(object)
315 def handle(object, meta) do
319 def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
320 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
321 actor = User.get_cached_by_ap_id(object.data["actor"])
322 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
325 [[actor, recipient], [recipient, actor]]
327 |> Enum.map(fn [user, other_user] ->
329 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
330 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
333 :chat_message_id_idempotency_key_cache,
335 meta[:idempotency_key]
339 ["user", "user:pleroma_chat"],
340 {user, %{cm_ref | chat: chat, object: object}}
348 |> add_streamables(streamables)
354 def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
355 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
356 Object.increase_vote_count(
357 object.data["inReplyTo"],
366 def handle_object_creation(%{"type" => objtype} = object, meta)
367 when objtype in ~w[Audio Video Question Event Article] do
368 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
374 def handle_object_creation(object, meta) do
# Reverts a like.
#
# When the liked object is `nil`, `object` is deleted right away. Otherwise
# the like is first removed from the liked object and `object` is deleted
# afterwards; any non-`{:ok, _}` result of
# `Utils.remove_like_from_object/2` is returned unchanged.
defp undo_like(nil, object), do: delete_object(object)

defp undo_like(%Object{} = liked_object, object) do
  case Utils.remove_like_from_object(object, liked_object) do
    {:ok, _} -> delete_object(object)
    failure -> failure
  end
end
386 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
387 object.data["object"]
388 |> Object.get_by_ap_id()
392 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
393 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
394 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
395 {:ok, _} <- Repo.delete(object) do
400 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
401 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
402 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
403 {:ok, _} <- Repo.delete(object) do
409 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
411 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
412 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
413 {:ok, _} <- User.unblock(blocker, blocked),
414 {:ok, _} <- Repo.delete(object) do
# Fallback clause: none of the other `handle_undoing/1` clauses matched, so
# the object is handed back to the caller inside an error tuple.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
# Deletes `object` through the repo. A successful delete is collapsed to
# `:ok`; any other result of `Repo.delete/1` is returned untouched.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
426 defp send_notifications(meta) do
427 Keyword.get(meta, :notifications, [])
428 |> Enum.each(fn notification ->
429 Streamer.stream(["user", "user:notification"], notification)
430 Push.send(notification)
436 defp send_streamables(meta) do
437 Keyword.get(meta, :streamables, [])
438 |> Enum.each(fn {topics, items} ->
439 Streamer.stream(topics, items)
445 defp add_streamables(meta, streamables) do
446 existing = Keyword.get(meta, :streamables, [])
449 |> Keyword.put(:streamables, streamables ++ existing)
452 defp add_notifications(meta, notifications) do
453 existing = Keyword.get(meta, :notifications, [])
456 |> Keyword.put(:notifications, notifications ++ existing)
460 def handle_after_transaction(meta) do
462 |> send_notifications()
463 |> send_streamables()