+
+ def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
+ with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
+ actor = User.get_cached_by_ap_id(object.data["actor"])
+ recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
+
+ streamables =
+ [[actor, recipient], [recipient, actor]]
+ |> Enum.uniq()
+ |> Enum.map(fn [user, other_user] ->
+ if user.local do
+ {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
+ {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
+
+ @cachex.put(
+ :chat_message_id_idempotency_key_cache,
+ cm_ref.id,
+ meta[:idempotency_key]
+ )
+
+ {
+ ["user", "user:pleroma_chat"],
+ {user, %{cm_ref | chat: chat, object: object}}
+ }
+ end
+ end)
+ |> Enum.filter(& &1)
+
+ meta =
+ meta
+ |> add_streamables(streamables)
+
+ {:ok, object, meta}
+ end
+ end
+
+ def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
+ with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
+ Object.increase_vote_count(
+ object.data["inReplyTo"],
+ object.data["name"],
+ object.data["actor"]
+ )
+
+ {:ok, object, meta}
+ end
+ end
+
+ def handle_object_creation(%{"type" => objtype} = object, meta)
+ when objtype in ~w[Audio Video Question Event Article] do
+ with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
+ {:ok, object, meta}
+ end
+ end
+
+ # Nothing to do
+ def handle_object_creation(object, meta) do
+ {:ok, object, meta}
+ end
+
+ defp undo_like(nil, object), do: delete_object(object)
+
+ defp undo_like(%Object{} = liked_object, object) do
+ with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
+ delete_object(object)
+ end
+ end
+
+ def handle_undoing(%{data: %{"type" => "Like"}} = object) do
+ object.data["object"]
+ |> Object.get_by_ap_id()
+ |> undo_like(object)
+ end
+
+ def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
+ with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
+ {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
+ {:ok, _} <- Repo.delete(object) do
+ :ok
+ end
+ end
+
+ def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
+ with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
+ {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
+ {:ok, _} <- Repo.delete(object) do
+ :ok
+ end
+ end
+
+ def handle_undoing(
+ %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
+ ) do
+ with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
+ %User{} = blocked <- User.get_cached_by_ap_id(blocked),
+ {:ok, _} <- User.unblock(blocker, blocked),
+ {:ok, _} <- Repo.delete(object) do
+ :ok
+ end
+ end
+
+ def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
+
+ @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
+ defp delete_object(object) do
+ with {:ok, _} <- Repo.delete(object), do: :ok
+ end
+
+ defp send_notifications(meta) do
+ Keyword.get(meta, :notifications, [])
+ |> Enum.each(fn notification ->
+ Streamer.stream(["user", "user:notification"], notification)
+ Push.send(notification)
+ end)
+
+ meta
+ end
+
+ defp send_streamables(meta) do
+ Keyword.get(meta, :streamables, [])
+ |> Enum.each(fn {topics, items} ->
+ Streamer.stream(topics, items)
+ end)
+
+ meta
+ end
+
+ defp add_streamables(meta, streamables) do
+ existing = Keyword.get(meta, :streamables, [])
+
+ meta
+ |> Keyword.put(:streamables, streamables ++ existing)
+ end
+
+ defp add_notifications(meta, notifications) do
+ existing = Keyword.get(meta, :notifications, [])
+
+ meta
+ |> Keyword.put(:notifications, notifications ++ existing)
+ end
+
+ @impl true
+ def handle_after_transaction(meta) do
+ meta
+ |> send_notifications()
+ |> send_streamables()
+ end