Merge branch 'migrate-legacy-tags' into 'develop'
[akkoma] / lib / pleroma / web / streamer / streamer.ex
index 5e37e2cf2d80c2299cff77a75f2bcb1dd2e43b87..d1d70e5561613a002a7003bdbd2451b2467660f3 100644 (file)
@@ -6,7 +6,7 @@ defmodule Pleroma.Web.Streamer do
   require Logger
 
   alias Pleroma.Activity
-  alias Pleroma.ChatMessageReference
+  alias Pleroma.Chat.MessageReference
   alias Pleroma.Config
   alias Pleroma.Conversation.Participation
   alias Pleroma.Notification
@@ -90,35 +90,23 @@ defmodule Pleroma.Web.Streamer do
     if should_env_send?(), do: Registry.unregister(@registry, topic)
   end
 
-  def stream(topics, item) when is_list(topics) do
+  def stream(topics, items) do
     if should_env_send?() do
-      Enum.each(topics, fn t ->
-        spawn(fn -> do_stream(t, item) end)
+      List.wrap(topics)
+      |> Enum.each(fn topic ->
+        List.wrap(items)
+        |> Enum.each(fn item ->
+          spawn(fn -> do_stream(topic, item) end)
+        end)
       end)
     end
 
     :ok
   end
 
-  def stream(topic, items) when is_list(items) do
-    if should_env_send?() do
-      Enum.each(items, fn i ->
-        spawn(fn -> do_stream(topic, i) end)
-      end)
-
-      :ok
-    end
-  end
-
-  def stream(topic, item) do
-    if should_env_send?() do
-      spawn(fn -> do_stream(topic, item) end)
-    end
-
-    :ok
-  end
+  def filtered_by_user?(user, item, streamed_type \\ :activity)
 
-  def filtered_by_user?(%User{} = user, %Activity{} = item) do
+  def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
     %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
       User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
 
@@ -130,6 +118,9 @@ defmodule Pleroma.Web.Streamer do
          true <-
            Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
          true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
+         true <-
+           !(streamed_type == :activity && item.data["type"] == "Announce" &&
+               parent.data["actor"] == user.ap_id),
          true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
          true <- MapSet.disjoint?(recipients, recipient_blocks),
          %{host: item_host} <- URI.parse(item.actor),
@@ -144,8 +135,8 @@ defmodule Pleroma.Web.Streamer do
     end
   end
 
-  def filtered_by_user?(%User{} = user, %Notification{activity: activity}) do
-    filtered_by_user?(user, activity)
+  def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do
+    filtered_by_user?(user, activity, :notification)
   end
 
   defp do_stream("direct", item) do
@@ -201,7 +192,7 @@ defmodule Pleroma.Web.Streamer do
     end)
   end
 
-  defp do_stream(topic, {user, %ChatMessageReference{} = cm_ref})
+  defp do_stream(topic, {user, %MessageReference{} = cm_ref})
        when topic in ["user", "user:pleroma_chat"] do
     topic = "#{topic}:#{user.id}"