X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Fstreamer%2Fstreamer.ex;h=73ee3e1e11d79883ac55d83150ae2f6a5e1a4a5a;hb=4c044f62258b33a95b8281e1eb42a5e0ce47d42d;hp=2201cbfef87690eaab47557021a47c247ad7bede;hpb=904295d53b82fcfd7dfefd4c970815b36f06faf0;p=akkoma diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex index 2201cbfef..73ee3e1e1 100644 --- a/lib/pleroma/web/streamer/streamer.ex +++ b/lib/pleroma/web/streamer/streamer.ex @@ -6,11 +6,11 @@ defmodule Pleroma.Web.Streamer do require Logger alias Pleroma.Activity + alias Pleroma.Chat.MessageReference alias Pleroma.Config alias Pleroma.Conversation.Participation alias Pleroma.Notification alias Pleroma.Object - alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.Visibility @@ -90,29 +90,15 @@ defmodule Pleroma.Web.Streamer do if should_env_send?(), do: Registry.unregister(@registry, topic) end - def stream(topics, item) when is_list(topics) do + def stream(topics, items) do if should_env_send?() do - Enum.each(topics, fn t -> - spawn(fn -> do_stream(t, item) end) - end) - end - - :ok - end - - def stream(topic, items) when is_list(items) do - if should_env_send?() do - Enum.each(items, fn i -> - spawn(fn -> do_stream(topic, i) end) + List.wrap(topics) + |> Enum.each(fn topic -> + List.wrap(items) + |> Enum.each(fn item -> + spawn(fn -> do_stream(topic, item) end) + end) end) - - :ok - end - end - - def stream(topic, item) do - if should_env_send?() do - spawn(fn -> do_stream(topic, item) end) end :ok @@ -130,6 +116,7 @@ defmodule Pleroma.Web.Streamer do true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)), true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids, + true <- !(item.data["type"] == "Announce" && parent.data["actor"] == user.ap_id), true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)), true <- MapSet.disjoint?(recipients, recipient_blocks), %{host: item_host} <- URI.parse(item.actor), @@ -201,22 +188,15 @@ defmodule Pleroma.Web.Streamer do end) end - defp do_stream(topic, %{data: %{"type" => "ChatMessage"}} = object) + defp do_stream(topic, {user, %MessageReference{} = cm_ref}) when topic in ["user", "user:pleroma_chat"] do - recipients = [object.data["actor"] | object.data["to"]] - - topics = - %{ap_id: recipients, local: true} - |> Pleroma.User.Query.build() - |> Repo.all() - |> Enum.map(fn %{id: id} = user -> {user, "#{topic}:#{id}"} end) - - Enum.each(topics, fn {user, topic} -> - Registry.dispatch(@registry, topic, fn list -> - Enum.each(list, fn {pid, _auth} -> - text = StreamerView.render("chat_update.json", object, user, recipients) - send(pid, {:text, text}) - end) + topic = "#{topic}:#{user.id}" + + text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) + + Registry.dispatch(@registry, topic, fn list -> + Enum.each(list, fn {pid, _auth} -> + send(pid, {:text, text}) end) end) end