"""
alias Pleroma.Activity
alias Pleroma.Chat
- alias Pleroma.ChatMessageReference
+ alias Pleroma.Chat.MessageReference
alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.Repo
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Pipeline
alias Pleroma.Web.ActivityPub.Utils
- alias Pleroma.Web.Streamer
alias Pleroma.Web.Push
+ alias Pleroma.Web.Streamer
def handle(object, meta \\ [])
# - Rollback if we couldn't create it
# - Set up notifications
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
- with {:ok, _object, _meta} <- handle_object_creation(meta[:object_data], meta) do
+ with {:ok, _object, meta} <- handle_object_creation(meta[:object_data], meta) do
{:ok, notifications} = Notification.create_notifications(activity, do_send: false)
meta =
Object.decrease_replies_count(in_reply_to)
end
- ChatMessageReference.delete_for_object(deleted_object)
+ MessageReference.delete_for_object(deleted_object)
ActivityPub.stream_out(object)
ActivityPub.stream_out_participations(deleted_object, user)
actor = User.get_cached_by_ap_id(object.data["actor"])
recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
- [[actor, recipient], [recipient, actor]]
- |> Enum.each(fn [user, other_user] ->
- if user.local do
- {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
- {:ok, cm_ref} = ChatMessageReference.create(chat, object, user.ap_id != actor.ap_id)
-
- # We add a cache of the unread value here so that it
- # doesn't change when being streamed out
- chat =
- chat
- |> Map.put(:unread, ChatMessageReference.unread_count_for_chat(chat))
-
- Streamer.stream(
- ["user", "user:pleroma_chat"],
- {user, %{cm_ref | chat: chat, object: object}}
- )
- end
- end)
+ streamables =
+ [[actor, recipient], [recipient, actor]]
+ |> Enum.map(fn [user, other_user] ->
+ if user.local do
+ {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
+ {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
+
+ {
+ ["user", "user:pleroma_chat"],
+ {user, %{cm_ref | chat: chat, object: object}}
+ }
+ end
+ end)
+ |> Enum.filter(& &1)
+
+ meta =
+ meta
+ |> add_streamables(streamables)
{:ok, object, meta}
end
def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
defp send_notifications(meta) do
- Keyword.get(meta, :created_notifications, [])
+ Keyword.get(meta, :notifications, [])
|> Enum.each(fn notification ->
Streamer.stream(["user", "user:notification"], notification)
Push.send(notification)
meta
end
+ defp send_streamables(meta) do
+ Keyword.get(meta, :streamables, [])
+ |> Enum.each(fn {topics, items} ->
+ Streamer.stream(topics, items)
+ end)
+
+ meta
+ end
+
+ defp add_streamables(meta, streamables) do
+ existing = Keyword.get(meta, :streamables, [])
+
+ meta
+ |> Keyword.put(:streamables, streamables ++ existing)
+ end
+
defp add_notifications(meta, notifications) do
- existing = Keyword.get(meta, :created_notifications, [])
+ existing = Keyword.get(meta, :notifications, [])
meta
- |> Keyword.put(:created_notifications, notifications ++ existing)
+ |> Keyword.put(:notifications, notifications ++ existing)
end
def handle_after_transaction(meta) do
meta
|> send_notifications()
+ |> send_streamables()
end
end