Merge branch 'streamer-worker-registry' into 'develop'
authorlain <lain@soykaf.club>
Thu, 7 May 2020 09:13:32 +0000 (09:13 +0000)
committerlain <lain@soykaf.club>
Thu, 7 May 2020 09:13:32 +0000 (09:13 +0000)
Streamer rework

See merge request pleroma/pleroma!2426

17 files changed:
lib/pleroma/application.ex
lib/pleroma/web/activity_pub/activity_pub.ex
lib/pleroma/web/mastodon_api/websocket_handler.ex
lib/pleroma/web/streamer/ping.ex [deleted file]
lib/pleroma/web/streamer/state.ex [deleted file]
lib/pleroma/web/streamer/streamer.ex
lib/pleroma/web/streamer/streamer_socket.ex [deleted file]
lib/pleroma/web/streamer/supervisor.ex [deleted file]
lib/pleroma/web/streamer/worker.ex [deleted file]
test/integration/mastodon_websocket_test.exs
test/notification_test.exs
test/support/builders/activity_builder.ex
test/support/conn_case.ex
test/support/data_case.ex
test/web/streamer/ping_test.exs [deleted file]
test/web/streamer/state_test.exs [deleted file]
test/web/streamer/streamer_test.exs

index 308d8cffa661ad3cf1ab8cf3dfb3845fcda1a8d4..a00bc06247e3125cc5157320b10361c616d1ce4a 100644 (file)
@@ -173,7 +173,14 @@ defmodule Pleroma.Application do
   defp streamer_child(env) when env in [:test, :benchmark], do: []
 
   defp streamer_child(_) do
-    [Pleroma.Web.Streamer.supervisor()]
+    [
+      {Registry,
+       [
+         name: Pleroma.Web.Streamer.registry(),
+         keys: :duplicate,
+         partitions: System.schedulers_online()
+       ]}
+    ]
   end
 
   defp chat_child(_env, true) do
index 099df58792562fc3766868760249fc8a762e087b..8baaf97ac822348975ae8ec7fb2a36b880cf93ee 100644 (file)
@@ -170,12 +170,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
       BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
 
-      Notification.create_notifications(activity)
-
-      conversation = create_or_bump_conversation(activity, map["actor"])
-      participations = get_participations(conversation)
-      stream_out(activity)
-      stream_out_participations(participations)
       {:ok, activity}
     else
       %Activity{} = activity ->
@@ -198,6 +192,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     end
   end
 
+  def notify_and_stream(activity) do
+    Notification.create_notifications(activity)
+
+    conversation = create_or_bump_conversation(activity, activity.actor)
+    participations = get_participations(conversation)
+    stream_out(activity)
+    stream_out_participations(participations)
+  end
+
   defp create_or_bump_conversation(activity, actor) do
     with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
          %User{} = user <- User.get_cached_by_ap_id(actor),
@@ -274,6 +277,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
          _ <- increase_poll_votes_if_vote(create_data),
          {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
          {:ok, _actor} <- increase_note_count_if_public(actor, activity),
+         _ <- notify_and_stream(activity),
          :ok <- maybe_federate(activity) do
       {:ok, activity}
     else
@@ -301,6 +305,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
              additional
            ),
          {:ok, activity} <- insert(listen_data, local),
+         _ <- notify_and_stream(activity),
          :ok <- maybe_federate(activity) do
       {:ok, activity}
     end
@@ -325,6 +330,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
            %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
            |> Utils.maybe_put("id", activity_id),
          {:ok, activity} <- insert(data, local),
+         _ <- notify_and_stream(activity),
          :ok <- maybe_federate(activity) do
       {:ok, activity}
     end
@@ -344,6 +350,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
          },
          data <- Utils.maybe_put(data, "id", activity_id),
          {:ok, activity} <- insert(data, local),
+         _ <- notify_and_stream(activity),
          :ok <- maybe_federate(activity) do
       {:ok, activity}
     end
@@ -365,6 +372,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
          reaction_data <- make_emoji_reaction_data(user, object, emoji, activity_id),
          {:ok, activity} <- insert(reaction_data, local),
          {:ok, object} <- add_emoji_reaction_to_object(activity, object),
+         _ <- notify_and_stream(activity),
          :ok <- maybe_federate(activity) do
       {:ok, activity, object}
     else
@@ -391,6 +399,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
          unreact_data <- make_undo_data(user, reaction_activity, activity_id),
          {:ok, activity} <- insert(unreact_data, local),
          {:ok, object} <- remove_emoji_reaction_from_object(reaction_activity, object),
+         _ <- notify_and_stream(activity),
          :ok <- maybe_federate(activity) do
       {:ok, activity, object}
     else
@@ -413,6 +422,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
          {:ok, unlike_activity} <- insert(unlike_data, local),
          {:ok, _activity} <- Repo.delete(like_activity),
          {:ok, object} <- remove_like_from_object(like_activity, object),
+         _ <- notify_and_stream(unlike_activity),
          :ok <- maybe_federate(unlike_activity) do
       {:ok, unlike_activity, like_activity, object}
     else
@@ -442,6 +452,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
          announce_data <- make_announce_data(user, object, activity_id, public),
          {:ok, activity} <- insert(announce_data, local),
          {:ok, object} <- add_announce_to_object(activity, object),
+         _ <- notify_and_stream(activity),
          :ok <- maybe_federate(activity) do
       {:ok, activity, object}
     else
@@ -468,6 +479,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
          unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
          {:ok, unannounce_activity} <- insert(unannounce_data, local),
+         _ <- notify_and_stream(unannounce_activity),
          :ok <- maybe_federate(unannounce_activity),
          {:ok, _activity} <- Repo.delete(announce_activity),
          {:ok, object} <- remove_announce_from_object(announce_activity, object) do
@@ -490,6 +502,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
   defp do_follow(follower, followed, activity_id, local) do
     with data <- make_follow_data(follower, followed, activity_id),
          {:ok, activity} <- insert(data, local),
+         _ <- notify_and_stream(activity),
          :ok <- maybe_federate(activity) do
       {:ok, activity}
     else
@@ -511,6 +524,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
          {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
          unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
          {:ok, activity} <- insert(unfollow_data, local),
+         _ <- notify_and_stream(activity),
          :ok <- maybe_federate(activity) do
       {:ok, activity}
     else
@@ -540,6 +554,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     with true <- outgoing_blocks,
          block_data <- make_block_data(blocker, blocked, activity_id),
          {:ok, activity} <- insert(block_data, local),
+         _ <- notify_and_stream(activity),
          :ok <- maybe_federate(activity) do
       {:ok, activity}
     else
@@ -560,6 +575,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
          unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
          {:ok, activity} <- insert(unblock_data, local),
+         _ <- notify_and_stream(activity),
          :ok <- maybe_federate(activity) do
       {:ok, activity}
     else
@@ -594,6 +610,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     with flag_data <- make_flag_data(params, additional),
          {:ok, activity} <- insert(flag_data, local),
          {:ok, stripped_activity} <- strip_report_status_data(activity),
+         _ <- notify_and_stream(activity),
          :ok <- maybe_federate(stripped_activity) do
       User.all_superusers()
       |> Enum.filter(fn user -> not is_nil(user.email) end)
@@ -617,7 +634,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     }
 
     with true <- origin.ap_id in target.also_known_as,
-         {:ok, activity} <- insert(params, local) do
+         {:ok, activity} <- insert(params, local),
+         _ <- notify_and_stream(activity) do
       maybe_federate(activity)
 
       BackgroundWorker.enqueue("move_following", %{
index 5652a37c19f08c5c1431e79c46e66d63d58e9ebc..6ef3fe2dd75460655ba7ee27c7d9598a0c512ba7 100644 (file)
@@ -12,6 +12,11 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
 
   @behaviour :cowboy_websocket
 
+  # Cowboy timeout period.
+  @timeout :timer.seconds(30)
+  # Hibernate every X messages
+  @hibernate_every 100
+
   @streams [
     "public",
     "public:local",
@@ -25,9 +30,6 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
   ]
   @anonymous_streams ["public", "public:local", "hashtag"]
 
-  # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping.
-  @timeout :infinity
-
   def init(%{qs: qs} = req, state) do
     with params <- :cow_qs.parse_qs(qs),
          sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil),
@@ -42,7 +44,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
           req
         end
 
-      {:cowboy_websocket, req, %{user: user, topic: topic}, %{idle_timeout: @timeout}}
+      {:cowboy_websocket, req, %{user: user, topic: topic, count: 0}, %{idle_timeout: @timeout}}
     else
       {:error, code} ->
         Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}")
@@ -57,7 +59,13 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
   end
 
   def websocket_init(state) do
-    send(self(), :subscribe)
+    Logger.debug(
+      "#{__MODULE__} accepted websocket connection for user #{
+        (state.user || %{id: "anonymous"}).id
+      }, topic #{state.topic}"
+    )
+
+    Streamer.add_socket(state.topic, state.user)
     {:ok, state}
   end
 
@@ -66,19 +74,24 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
     {:ok, state}
   end
 
-  def websocket_info(:subscribe, state) do
-    Logger.debug(
-      "#{__MODULE__} accepted websocket connection for user #{
-        (state.user || %{id: "anonymous"}).id
-      }, topic #{state.topic}"
-    )
+  def websocket_info({:render_with_user, view, template, item}, state) do
+    user = %User{} = User.get_cached_by_ap_id(state.user.ap_id)
 
-    Streamer.add_socket(state.topic, streamer_socket(state))
-    {:ok, state}
+    unless Streamer.filtered_by_user?(user, item) do
+      websocket_info({:text, view.render(template, user, item)}, %{state | user: user})
+    else
+      {:ok, state}
+    end
   end
 
   def websocket_info({:text, message}, state) do
-    {:reply, {:text, message}, state}
+    # If the websocket processed X messages, force an hibernate/GC.
+    # We don't hibernate at every message to balance CPU usage/latency with RAM usage.
+    if state.count > @hibernate_every do
+      {:reply, {:text, message}, %{state | count: 0}, :hibernate}
+    else
+      {:reply, {:text, message}, %{state | count: state.count + 1}}
+    end
   end
 
   def terminate(reason, _req, state) do
@@ -88,7 +101,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
       }, topic #{state.topic || "?"}: #{inspect(reason)}"
     )
 
-    Streamer.remove_socket(state.topic, streamer_socket(state))
+    Streamer.remove_socket(state.topic)
     :ok
   end
 
@@ -136,8 +149,4 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
   end
 
   defp expand_topic(topic, _), do: topic
-
-  defp streamer_socket(state) do
-    %{transport_pid: self(), assigns: state}
-  end
 end
diff --git a/lib/pleroma/web/streamer/ping.ex b/lib/pleroma/web/streamer/ping.ex
deleted file mode 100644 (file)
index 7a08202..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer.Ping do
-  use GenServer
-  require Logger
-
-  alias Pleroma.Web.Streamer.State
-  alias Pleroma.Web.Streamer.StreamerSocket
-
-  @keepalive_interval :timer.seconds(30)
-
-  def start_link(opts) do
-    ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval)
-    GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__)
-  end
-
-  def init(%{ping_interval: ping_interval} = args) do
-    Process.send_after(self(), :ping, ping_interval)
-    {:ok, args}
-  end
-
-  def handle_info(:ping, %{ping_interval: ping_interval} = state) do
-    State.get_sockets()
-    |> Map.values()
-    |> List.flatten()
-    |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} ->
-      Logger.debug("Sending keepalive ping")
-      send(transport_pid, {:text, ""})
-    end)
-
-    Process.send_after(self(), :ping, ping_interval)
-
-    {:noreply, state}
-  end
-end
diff --git a/lib/pleroma/web/streamer/state.ex b/lib/pleroma/web/streamer/state.ex
deleted file mode 100644 (file)
index 999550b..0000000
+++ /dev/null
@@ -1,82 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer.State do
-  use GenServer
-  require Logger
-
-  alias Pleroma.Web.Streamer.StreamerSocket
-
-  @env Mix.env()
-
-  def start_link(_) do
-    GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__)
-  end
-
-  def add_socket(topic, socket) do
-    GenServer.call(__MODULE__, {:add, topic, socket})
-  end
-
-  def remove_socket(topic, socket) do
-    do_remove_socket(@env, topic, socket)
-  end
-
-  def get_sockets do
-    %{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state)
-    stream_sockets
-  end
-
-  def init(init_arg) do
-    {:ok, init_arg}
-  end
-
-  def handle_call(:get_state, _from, state) do
-    {:reply, state, state}
-  end
-
-  def handle_call({:add, topic, socket}, _from, %{sockets: sockets} = state) do
-    internal_topic = internal_topic(topic, socket)
-    stream_socket = StreamerSocket.from_socket(socket)
-
-    sockets_for_topic =
-      sockets
-      |> Map.get(internal_topic, [])
-      |> List.insert_at(0, stream_socket)
-      |> Enum.uniq()
-
-    state = put_in(state, [:sockets, internal_topic], sockets_for_topic)
-    Logger.debug("Got new conn for #{topic}")
-    {:reply, state, state}
-  end
-
-  def handle_call({:remove, topic, socket}, _from, %{sockets: sockets} = state) do
-    internal_topic = internal_topic(topic, socket)
-    stream_socket = StreamerSocket.from_socket(socket)
-
-    sockets_for_topic =
-      sockets
-      |> Map.get(internal_topic, [])
-      |> List.delete(stream_socket)
-
-    state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic)
-    {:reply, state, state}
-  end
-
-  defp do_remove_socket(:test, _, _) do
-    :ok
-  end
-
-  defp do_remove_socket(_env, topic, socket) do
-    GenServer.call(__MODULE__, {:remove, topic, socket})
-  end
-
-  defp internal_topic(topic, socket)
-       when topic in ~w[user user:notification direct] do
-    "#{topic}:#{socket.assigns[:user].id}"
-  end
-
-  defp internal_topic(topic, _) do
-    topic
-  end
-end
index 814d5a7292519e881b4d81cc28139e02211fce8b..5ad4aa9367a8c588f6b3c1f4440f48375967b4c6 100644 (file)
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Web.Streamer do
-  alias Pleroma.Web.Streamer.State
-  alias Pleroma.Web.Streamer.Worker
+  require Logger
+
+  alias Pleroma.Activity
+  alias Pleroma.Config
+  alias Pleroma.Conversation.Participation
+  alias Pleroma.Notification
+  alias Pleroma.Object
+  alias Pleroma.User
+  alias Pleroma.Web.ActivityPub.ActivityPub
+  alias Pleroma.Web.ActivityPub.Visibility
+  alias Pleroma.Web.CommonAPI
+  alias Pleroma.Web.StreamerView
 
-  @timeout 60_000
   @mix_env Mix.env()
+  @registry Pleroma.Web.StreamerRegistry
+
+  def registry, do: @registry
 
-  def add_socket(topic, socket) do
-    State.add_socket(topic, socket)
+  def add_socket(topic, %User{} = user) do
+    if should_env_send?(), do: Registry.register(@registry, user_topic(topic, user), true)
   end
 
-  def remove_socket(topic, socket) do
-    State.remove_socket(topic, socket)
+  def add_socket(topic, _) do
+    if should_env_send?(), do: Registry.register(@registry, topic, false)
   end
 
-  def get_sockets do
-    State.get_sockets()
+  def remove_socket(topic) do
+    if should_env_send?(), do: Registry.unregister(@registry, topic)
   end
 
-  def stream(topics, items) do
-    if should_send?() do
-      Task.async(fn ->
-        :poolboy.transaction(
-          :streamer_worker,
-          &Worker.stream(&1, topics, items),
-          @timeout
-        )
+  def stream(topics, item) when is_list(topics) do
+    if should_env_send?() do
+      Enum.each(topics, fn t ->
+        spawn(fn -> do_stream(t, item) end)
       end)
     end
+
+    :ok
   end
 
-  def supervisor, do: Pleroma.Web.Streamer.Supervisor
+  def stream(topic, items) when is_list(items) do
+    if should_env_send?() do
+      Enum.each(items, fn i ->
+        spawn(fn -> do_stream(topic, i) end)
+      end)
 
-  defp should_send? do
-    handle_should_send(@mix_env)
+      :ok
+    end
   end
 
-  defp handle_should_send(:test) do
-    case Process.whereis(:streamer_worker) do
-      nil ->
-        false
+  def stream(topic, item) do
+    if should_env_send?() do
+      spawn(fn -> do_stream(topic, item) end)
+    end
+
+    :ok
+  end
 
-      pid ->
-        Process.alive?(pid)
+  def filtered_by_user?(%User{} = user, %Activity{} = item) do
+    %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
+      User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
+
+    recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
+    recipients = MapSet.new(item.recipients)
+    domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
+
+    with parent <- Object.normalize(item) || item,
+         true <-
+           Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
+         true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
+         true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
+         true <- MapSet.disjoint?(recipients, recipient_blocks),
+         %{host: item_host} <- URI.parse(item.actor),
+         %{host: parent_host} <- URI.parse(parent.data["actor"]),
+         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
+         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
+         true <- thread_containment(item, user),
+         false <- CommonAPI.thread_muted?(user, item) do
+      false
+    else
+      _ -> true
     end
   end
 
-  defp handle_should_send(:benchmark), do: false
+  def filtered_by_user?(%User{} = user, %Notification{activity: activity}) do
+    filtered_by_user?(user, activity)
+  end
+
+  defp do_stream("direct", item) do
+    recipient_topics =
+      User.get_recipients_from_activity(item)
+      |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
+
+    Enum.each(recipient_topics, fn user_topic ->
+      Logger.debug("Trying to push direct message to #{user_topic}\n\n")
+      push_to_socket(user_topic, item)
+    end)
+  end
+
+  defp do_stream("participation", participation) do
+    user_topic = "direct:#{participation.user_id}"
+    Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
 
-  defp handle_should_send(_), do: true
+    push_to_socket(user_topic, participation)
+  end
+
+  defp do_stream("list", item) do
+    # filter the recipient list if the activity is not public, see #270.
+    recipient_lists =
+      case Visibility.is_public?(item) do
+        true ->
+          Pleroma.List.get_lists_from_activity(item)
+
+        _ ->
+          Pleroma.List.get_lists_from_activity(item)
+          |> Enum.filter(fn list ->
+            owner = User.get_cached_by_id(list.user_id)
+
+            Visibility.visible_for_user?(item, owner)
+          end)
+      end
+
+    recipient_topics =
+      recipient_lists
+      |> Enum.map(fn %{id: id} -> "list:#{id}" end)
+
+    Enum.each(recipient_topics, fn list_topic ->
+      Logger.debug("Trying to push message to #{list_topic}\n\n")
+      push_to_socket(list_topic, item)
+    end)
+  end
+
+  defp do_stream(topic, %Notification{} = item)
+       when topic in ["user", "user:notification"] do
+    Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
+      Enum.each(list, fn {pid, _auth} ->
+        send(pid, {:render_with_user, StreamerView, "notification.json", item})
+      end)
+    end)
+  end
+
+  defp do_stream("user", item) do
+    Logger.debug("Trying to push to users")
+
+    recipient_topics =
+      User.get_recipients_from_activity(item)
+      |> Enum.map(fn %{id: id} -> "user:#{id}" end)
+
+    Enum.each(recipient_topics, fn topic ->
+      push_to_socket(topic, item)
+    end)
+  end
+
+  defp do_stream(topic, item) do
+    Logger.debug("Trying to push to #{topic}")
+    Logger.debug("Pushing item to #{topic}")
+    push_to_socket(topic, item)
+  end
+
+  defp push_to_socket(topic, %Participation{} = participation) do
+    rendered = StreamerView.render("conversation.json", participation)
+
+    Registry.dispatch(@registry, topic, fn list ->
+      Enum.each(list, fn {pid, _} ->
+        send(pid, {:text, rendered})
+      end)
+    end)
+  end
+
+  defp push_to_socket(topic, %Activity{
+         data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
+       }) do
+    rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
+
+    Registry.dispatch(@registry, topic, fn list ->
+      Enum.each(list, fn {pid, _} ->
+        send(pid, {:text, rendered})
+      end)
+    end)
+  end
+
+  defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
+
+  defp push_to_socket(topic, item) do
+    anon_render = StreamerView.render("update.json", item)
+
+    Registry.dispatch(@registry, topic, fn list ->
+      Enum.each(list, fn {pid, auth?} ->
+        if auth? do
+          send(pid, {:render_with_user, StreamerView, "update.json", item})
+        else
+          send(pid, {:text, anon_render})
+        end
+      end)
+    end)
+  end
+
+  defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
+
+  defp thread_containment(activity, user) do
+    if Config.get([:instance, :skip_thread_containment]) do
+      true
+    else
+      ActivityPub.contain_activity(activity, user)
+    end
+  end
+
+  # In test environement, only return true if the registry is started.
+  # In benchmark environment, returns false.
+  # In any other environment, always returns true.
+  cond do
+    @mix_env == :test ->
+      def should_env_send? do
+        case Process.whereis(@registry) do
+          nil ->
+            false
+
+          pid ->
+            Process.alive?(pid)
+        end
+      end
+
+    @mix_env == :benchmark ->
+      def should_env_send?, do: false
+
+    true ->
+      def should_env_send?, do: true
+  end
+
+  defp user_topic(topic, user)
+       when topic in ~w[user user:notification direct] do
+    "#{topic}:#{user.id}"
+  end
+
+  defp user_topic(topic, _) do
+    topic
+  end
 end
diff --git a/lib/pleroma/web/streamer/streamer_socket.ex b/lib/pleroma/web/streamer/streamer_socket.ex
deleted file mode 100644 (file)
index 7d5dcd3..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer.StreamerSocket do
-  defstruct transport_pid: nil, user: nil
-
-  alias Pleroma.User
-  alias Pleroma.Web.Streamer.StreamerSocket
-
-  def from_socket(%{
-        transport_pid: transport_pid,
-        assigns: %{user: nil}
-      }) do
-    %StreamerSocket{
-      transport_pid: transport_pid
-    }
-  end
-
-  def from_socket(%{
-        transport_pid: transport_pid,
-        assigns: %{user: %User{} = user}
-      }) do
-    %StreamerSocket{
-      transport_pid: transport_pid,
-      user: user
-    }
-  end
-
-  def from_socket(%{transport_pid: transport_pid}) do
-    %StreamerSocket{
-      transport_pid: transport_pid
-    }
-  end
-end
diff --git a/lib/pleroma/web/streamer/supervisor.ex b/lib/pleroma/web/streamer/supervisor.ex
deleted file mode 100644 (file)
index bd9029b..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer.Supervisor do
-  use Supervisor
-
-  def start_link(opts) do
-    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
-  end
-
-  def init(args) do
-    children = [
-      {Pleroma.Web.Streamer.State, args},
-      {Pleroma.Web.Streamer.Ping, args},
-      :poolboy.child_spec(:streamer_worker, poolboy_config())
-    ]
-
-    opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor]
-    Supervisor.init(children, opts)
-  end
-
-  defp poolboy_config do
-    opts =
-      Pleroma.Config.get(:streamer,
-        workers: 3,
-        overflow_workers: 2
-      )
-
-    [
-      {:name, {:local, :streamer_worker}},
-      {:worker_module, Pleroma.Web.Streamer.Worker},
-      {:size, opts[:workers]},
-      {:max_overflow, opts[:overflow_workers]}
-    ]
-  end
-end
diff --git a/lib/pleroma/web/streamer/worker.ex b/lib/pleroma/web/streamer/worker.ex
deleted file mode 100644 (file)
index f6160fa..0000000
+++ /dev/null
@@ -1,208 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer.Worker do
-  use GenServer
-
-  require Logger
-
-  alias Pleroma.Activity
-  alias Pleroma.Config
-  alias Pleroma.Conversation.Participation
-  alias Pleroma.Notification
-  alias Pleroma.Object
-  alias Pleroma.User
-  alias Pleroma.Web.ActivityPub.ActivityPub
-  alias Pleroma.Web.ActivityPub.Visibility
-  alias Pleroma.Web.CommonAPI
-  alias Pleroma.Web.Streamer.State
-  alias Pleroma.Web.Streamer.StreamerSocket
-  alias Pleroma.Web.StreamerView
-
-  def start_link(_) do
-    GenServer.start_link(__MODULE__, %{}, [])
-  end
-
-  def init(init_arg) do
-    {:ok, init_arg}
-  end
-
-  def stream(pid, topics, items) do
-    GenServer.call(pid, {:stream, topics, items})
-  end
-
-  def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
-    Enum.each(topics, fn t ->
-      do_stream(%{topic: t, item: item})
-    end)
-
-    {:reply, state, state}
-  end
-
-  def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
-    Enum.each(items, fn i ->
-      do_stream(%{topic: topic, item: i})
-    end)
-
-    {:reply, state, state}
-  end
-
-  def handle_call({:stream, topic, item}, _from, state) do
-    do_stream(%{topic: topic, item: item})
-
-    {:reply, state, state}
-  end
-
-  defp do_stream(%{topic: "direct", item: item}) do
-    recipient_topics =
-      User.get_recipients_from_activity(item)
-      |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
-
-    Enum.each(recipient_topics, fn user_topic ->
-      Logger.debug("Trying to push direct message to #{user_topic}\n\n")
-      push_to_socket(State.get_sockets(), user_topic, item)
-    end)
-  end
-
-  defp do_stream(%{topic: "participation", item: participation}) do
-    user_topic = "direct:#{participation.user_id}"
-    Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
-
-    push_to_socket(State.get_sockets(), user_topic, participation)
-  end
-
-  defp do_stream(%{topic: "list", item: item}) do
-    # filter the recipient list if the activity is not public, see #270.
-    recipient_lists =
-      case Visibility.is_public?(item) do
-        true ->
-          Pleroma.List.get_lists_from_activity(item)
-
-        _ ->
-          Pleroma.List.get_lists_from_activity(item)
-          |> Enum.filter(fn list ->
-            owner = User.get_cached_by_id(list.user_id)
-
-            Visibility.visible_for_user?(item, owner)
-          end)
-      end
-
-    recipient_topics =
-      recipient_lists
-      |> Enum.map(fn %{id: id} -> "list:#{id}" end)
-
-    Enum.each(recipient_topics, fn list_topic ->
-      Logger.debug("Trying to push message to #{list_topic}\n\n")
-      push_to_socket(State.get_sockets(), list_topic, item)
-    end)
-  end
-
-  defp do_stream(%{topic: topic, item: %Notification{} = item})
-       when topic in ["user", "user:notification"] do
-    State.get_sockets()
-    |> Map.get("#{topic}:#{item.user_id}", [])
-    |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
-      with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
-           true <- should_send?(user, item) do
-        send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
-      end
-    end)
-  end
-
-  defp do_stream(%{topic: "user", item: item}) do
-    Logger.debug("Trying to push to users")
-
-    recipient_topics =
-      User.get_recipients_from_activity(item)
-      |> Enum.map(fn %{id: id} -> "user:#{id}" end)
-
-    Enum.each(recipient_topics, fn topic ->
-      push_to_socket(State.get_sockets(), topic, item)
-    end)
-  end
-
-  defp do_stream(%{topic: topic, item: item}) do
-    Logger.debug("Trying to push to #{topic}")
-    Logger.debug("Pushing item to #{topic}")
-    push_to_socket(State.get_sockets(), topic, item)
-  end
-
-  defp should_send?(%User{} = user, %Activity{} = item) do
-    %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
-      User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
-
-    recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
-    recipients = MapSet.new(item.recipients)
-    domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
-
-    with parent <- Object.normalize(item) || item,
-         true <-
-           Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
-         true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
-         true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
-         true <- MapSet.disjoint?(recipients, recipient_blocks),
-         %{host: item_host} <- URI.parse(item.actor),
-         %{host: parent_host} <- URI.parse(parent.data["actor"]),
-         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
-         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
-         true <- thread_containment(item, user),
-         false <- CommonAPI.thread_muted?(user, item) do
-      true
-    else
-      _ -> false
-    end
-  end
-
-  defp should_send?(%User{} = user, %Notification{activity: activity}) do
-    should_send?(user, activity)
-  end
-
-  def push_to_socket(topics, topic, %Participation{} = participation) do
-    Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
-      send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
-    end)
-  end
-
-  def push_to_socket(topics, topic, %Activity{
-        data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
-      }) do
-    Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
-      send(
-        transport_pid,
-        {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
-      )
-    end)
-  end
-
-  def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
-
-  def push_to_socket(topics, topic, item) do
-    Enum.each(topics[topic] || [], fn %StreamerSocket{
-                                        transport_pid: transport_pid,
-                                        user: socket_user
-                                      } ->
-      # Get the current user so we have up-to-date blocks etc.
-      if socket_user do
-        user = User.get_cached_by_ap_id(socket_user.ap_id)
-
-        if should_send?(user, item) do
-          send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
-        end
-      else
-        send(transport_pid, {:text, StreamerView.render("update.json", item)})
-      end
-    end)
-  end
-
-  @spec thread_containment(Activity.t(), User.t()) :: boolean()
-  defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
-
-  defp thread_containment(activity, user) do
-    if Config.get([:instance, :skip_thread_containment]) do
-      true
-    else
-      ActivityPub.contain_activity(activity, user)
-    end
-  end
-end
index bd229c55ff0e0448133a9a3d61ce82878c525512..109c7b4cb6dbda0714e8784f3b6275909279ab60 100644 (file)
@@ -12,17 +12,14 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
   alias Pleroma.Web.CommonAPI
   alias Pleroma.Web.OAuth
 
+  @moduletag needs_streamer: true, capture_log: true
+
   @path Pleroma.Web.Endpoint.url()
         |> URI.parse()
         |> Map.put(:scheme, "ws")
         |> Map.put(:path, "/api/v1/streaming")
         |> URI.to_string()
 
-  setup_all do
-    start_supervised(Pleroma.Web.Streamer.supervisor())
-    :ok
-  end
-
   def start_socket(qs \\ nil, headers \\ []) do
     path =
       case qs do
index 601a6c0ca1f4b3d70b16b13b8d8d7b242491b661..5c85f3368ba05376ecdecb20ead42b2e6128864a 100644 (file)
@@ -162,14 +162,18 @@ defmodule Pleroma.NotificationTest do
     @tag needs_streamer: true
     test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do
       user = insert(:user)
-      task = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
-      task_user_notification = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
-      Streamer.add_socket("user", %{transport_pid: task.pid, assigns: %{user: user}})
 
-      Streamer.add_socket(
-        "user:notification",
-        %{transport_pid: task_user_notification.pid, assigns: %{user: user}}
-      )
+      task =
+        Task.async(fn ->
+          Streamer.add_socket("user", user)
+          assert_receive {:render_with_user, _, _, _}, 4_000
+        end)
+
+      task_user_notification =
+        Task.async(fn ->
+          Streamer.add_socket("user:notification", user)
+          assert_receive {:render_with_user, _, _, _}, 4_000
+        end)
 
       activity = insert(:note_activity)
 
index 6e5a8e0594aab4a41e2980837aceca7b6158899e..7c4950bfa08a6b9559f587daa3e7bc5a49b77b24 100644 (file)
@@ -21,7 +21,15 @@ defmodule Pleroma.Builders.ActivityBuilder do
 
   def insert(data \\ %{}, opts \\ %{}) do
     activity = build(data, opts)
-    ActivityPub.insert(activity)
+
+    case ActivityPub.insert(activity) do
+      ok = {:ok, activity} ->
+        ActivityPub.notify_and_stream(activity)
+        ok
+
+      error ->
+        error
+    end
   end
 
   def insert_list(times, data \\ %{}, opts \\ %{}) do
index 91c03b1a88843fd4d3919251b3bb4025a4846960..b23918dd1d981d6596dd255fd55d3584139615d2 100644 (file)
@@ -139,7 +139,11 @@ defmodule Pleroma.Web.ConnCase do
     end
 
     if tags[:needs_streamer] do
-      start_supervised(Pleroma.Web.Streamer.supervisor())
+      start_supervised(%{
+        id: Pleroma.Web.Streamer.registry(),
+        start:
+          {Registry, :start_link, [[keys: :duplicate, name: Pleroma.Web.Streamer.registry()]]}
+      })
     end
 
     {:ok, conn: Phoenix.ConnTest.build_conn()}
index 1669f252034713d8a8c4fbac016d7d331c65d528..ba8848952e11b0d7e7e7895f05b8099194a7177f 100644 (file)
@@ -40,7 +40,11 @@ defmodule Pleroma.DataCase do
     end
 
     if tags[:needs_streamer] do
-      start_supervised(Pleroma.Web.Streamer.supervisor())
+      start_supervised(%{
+        id: Pleroma.Web.Streamer.registry(),
+        start:
+          {Registry, :start_link, [[keys: :duplicate, name: Pleroma.Web.Streamer.registry()]]}
+      })
     end
 
     :ok
diff --git a/test/web/streamer/ping_test.exs b/test/web/streamer/ping_test.exs
deleted file mode 100644 (file)
index 5df6c1c..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.PingTest do
-  use Pleroma.DataCase
-
-  import Pleroma.Factory
-  alias Pleroma.Web.Streamer
-
-  setup do
-    start_supervised({Streamer.supervisor(), [ping_interval: 30]})
-
-    :ok
-  end
-
-  describe "sockets" do
-    setup do
-      user = insert(:user)
-      {:ok, %{user: user}}
-    end
-
-    test "it sends pings", %{user: user} do
-      task =
-        Task.async(fn ->
-          assert_receive {:text, received_event}, 40
-          assert_receive {:text, received_event}, 40
-          assert_receive {:text, received_event}, 40
-        end)
-
-      Streamer.add_socket("public", %{transport_pid: task.pid, assigns: %{user: user}})
-
-      Task.await(task)
-    end
-  end
-end
diff --git a/test/web/streamer/state_test.exs b/test/web/streamer/state_test.exs
deleted file mode 100644 (file)
index a755e75..0000000
+++ /dev/null
@@ -1,54 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.StateTest do
-  use Pleroma.DataCase
-
-  import Pleroma.Factory
-  alias Pleroma.Web.Streamer
-  alias Pleroma.Web.Streamer.StreamerSocket
-
-  @moduletag needs_streamer: true
-
-  describe "sockets" do
-    setup do
-      user = insert(:user)
-      user2 = insert(:user)
-      {:ok, %{user: user, user2: user2}}
-    end
-
-    test "it can add a socket", %{user: user} do
-      Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}})
-
-      assert(%{"public" => [%StreamerSocket{transport_pid: 1}]} = Streamer.get_sockets())
-    end
-
-    test "it can add multiple sockets per user", %{user: user} do
-      Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}})
-      Streamer.add_socket("public", %{transport_pid: 2, assigns: %{user: user}})
-
-      assert(
-        %{
-          "public" => [
-            %StreamerSocket{transport_pid: 2},
-            %StreamerSocket{transport_pid: 1}
-          ]
-        } = Streamer.get_sockets()
-      )
-    end
-
-    test "it will not add a duplicate socket", %{user: user} do
-      Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}})
-      Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}})
-
-      assert(
-        %{
-          "activity" => [
-            %StreamerSocket{transport_pid: 1}
-          ]
-        } = Streamer.get_sockets()
-      )
-    end
-  end
-end
index 3c0f240f54168daaa4e91e8567f0bdb00be04a13..ee530f4e94515db157efd0e5264b9b63e3abc3ca 100644 (file)
@@ -12,13 +12,9 @@ defmodule Pleroma.Web.StreamerTest do
   alias Pleroma.User
   alias Pleroma.Web.CommonAPI
   alias Pleroma.Web.Streamer
-  alias Pleroma.Web.Streamer.StreamerSocket
-  alias Pleroma.Web.Streamer.Worker
 
   @moduletag needs_streamer: true, capture_log: true
 
-  @streamer_timeout 150
-  @streamer_start_wait 10
   setup do: clear_config([:instance, :skip_thread_containment])
 
   describe "user streams" do
@@ -29,69 +25,35 @@ defmodule Pleroma.Web.StreamerTest do
     end
 
     test "it streams the user's post in the 'user' stream", %{user: user} do
-      task =
-        Task.async(fn ->
-          assert_receive {:text, _}, @streamer_timeout
-        end)
-
-      Streamer.add_socket(
-        "user",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
-
+      Streamer.add_socket("user", user)
       {:ok, activity} = CommonAPI.post(user, %{"status" => "hey"})
-
-      Streamer.stream("user", activity)
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^activity}
+      refute Streamer.filtered_by_user?(user, activity)
     end
 
     test "it streams boosts of the user in the 'user' stream", %{user: user} do
-      task =
-        Task.async(fn ->
-          assert_receive {:text, _}, @streamer_timeout
-        end)
-
-      Streamer.add_socket(
-        "user",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      Streamer.add_socket("user", user)
 
       other_user = insert(:user)
       {:ok, activity} = CommonAPI.post(other_user, %{"status" => "hey"})
       {:ok, announce, _} = CommonAPI.repeat(activity.id, user)
 
-      Streamer.stream("user", announce)
-      Task.await(task)
+      assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce}
+      refute Streamer.filtered_by_user?(user, announce)
     end
 
     test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do
-      task =
-        Task.async(fn ->
-          assert_receive {:text, _}, @streamer_timeout
-        end)
-
-      Streamer.add_socket(
-        "user",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
-
+      Streamer.add_socket("user", user)
       Streamer.stream("user", notify)
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^notify}
+      refute Streamer.filtered_by_user?(user, notify)
     end
 
     test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do
-      task =
-        Task.async(fn ->
-          assert_receive {:text, _}, @streamer_timeout
-        end)
-
-      Streamer.add_socket(
-        "user:notification",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
-
+      Streamer.add_socket("user:notification", user)
       Streamer.stream("user:notification", notify)
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^notify}
+      refute Streamer.filtered_by_user?(user, notify)
     end
 
     test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
@@ -100,18 +62,12 @@ defmodule Pleroma.Web.StreamerTest do
       blocked = insert(:user)
       {:ok, _user_relationship} = User.block(user, blocked)
 
-      task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
-
-      Streamer.add_socket(
-        "user:notification",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      Streamer.add_socket("user:notification", user)
 
       {:ok, activity} = CommonAPI.post(user, %{"status" => ":("})
-      {:ok, notif} = CommonAPI.favorite(blocked, activity.id)
+      {:ok, _} = CommonAPI.favorite(blocked, activity.id)
 
-      Streamer.stream("user:notification", notif)
-      Task.await(task)
+      refute_receive _
     end
 
     test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
@@ -119,45 +75,50 @@ defmodule Pleroma.Web.StreamerTest do
     } do
       user2 = insert(:user)
 
-      task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
+      {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
+      {:ok, _} = CommonAPI.add_mute(user, activity)
 
-      Streamer.add_socket(
-        "user:notification",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      Streamer.add_socket("user:notification", user)
 
-      {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
-      {:ok, activity} = CommonAPI.add_mute(user, activity)
-      {:ok, notif} = CommonAPI.favorite(user2, activity.id)
+      {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
 
-      Streamer.stream("user:notification", notif)
-      Task.await(task)
+      refute_receive _
+      assert Streamer.filtered_by_user?(user, favorite_activity)
     end
 
-    test "it doesn't send notify to the 'user:notification' stream' when a domain is blocked", %{
+    test "it sends favorite to 'user:notification' stream'", %{
       user: user
     } do
       user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
 
-      task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
+      {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
+      Streamer.add_socket("user:notification", user)
+      {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
+
+      assert_receive {:render_with_user, _, "notification.json", notif}
+      assert notif.activity.id == favorite_activity.id
+      refute Streamer.filtered_by_user?(user, notif)
+    end
 
-      Streamer.add_socket(
-        "user:notification",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+    test "it doesn't send the 'user:notification' stream' when a domain is blocked", %{
+      user: user
+    } do
+      user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
 
       {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
       {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
-      {:ok, notif} = CommonAPI.favorite(user2, activity.id)
+      Streamer.add_socket("user:notification", user)
+      {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
 
-      Streamer.stream("user:notification", notif)
-      Task.await(task)
+      refute_receive _
+      assert Streamer.filtered_by_user?(user, favorite_activity)
     end
 
     test "it sends follow activities to the 'user:notification' stream", %{
       user: user
     } do
       user_url = user.ap_id
+      user2 = insert(:user)
 
       body =
         File.read!("test/fixtures/users_mock/localhost.json")
@@ -169,47 +130,24 @@ defmodule Pleroma.Web.StreamerTest do
           %Tesla.Env{status: 200, body: body}
       end)
 
-      user2 = insert(:user)
-      task = Task.async(fn -> assert_receive {:text, _}, @streamer_timeout end)
-
-      Process.sleep(@streamer_start_wait)
-
-      Streamer.add_socket(
-        "user:notification",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      Streamer.add_socket("user:notification", user)
+      {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user)
 
-      {:ok, _follower, _followed, _activity} = CommonAPI.follow(user2, user)
-
-      # We don't directly pipe the notification to the streamer as it's already
-      # generated as a side effect of CommonAPI.follow().
-      Task.await(task)
+      assert_receive {:render_with_user, _, "notification.json", notif}
+      assert notif.activity.id == follow_activity.id
+      refute Streamer.filtered_by_user?(user, notif)
     end
   end
 
-  test "it sends to public" do
+  test "it sends to public authenticated" do
     user = insert(:user)
     other_user = insert(:user)
 
-    task =
-      Task.async(fn ->
-        assert_receive {:text, _}, @streamer_timeout
-      end)
-
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user
-    }
-
-    {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
+    Streamer.add_socket("public", other_user)
 
-    topics = %{
-      "public" => [fake_socket]
-    }
-
-    Worker.push_to_socket(topics, "public", activity)
-
-    Task.await(task)
+    {:ok, activity} = CommonAPI.post(user, %{"status" => "Test"})
+    assert_receive {:render_with_user, _, _, ^activity}
+    refute Streamer.filtered_by_user?(user, activity)
   end
 
   test "works for deletions" do
@@ -217,37 +155,32 @@ defmodule Pleroma.Web.StreamerTest do
     other_user = insert(:user)
     {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
 
-    task =
-      Task.async(fn ->
-        expected_event =
-          %{
-            "event" => "delete",
-            "payload" => activity.id
-          }
-          |> Jason.encode!()
-
-        assert_receive {:text, received_event}, @streamer_timeout
-        assert received_event == expected_event
-      end)
+    Streamer.add_socket("public", user)
 
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user
-    }
+    {:ok, _} = CommonAPI.delete(activity.id, other_user)
+    activity_id = activity.id
+    assert_receive {:text, event}
+    assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
+  end
 
-    {:ok, activity} = CommonAPI.delete(activity.id, other_user)
+  test "it sends to public unauthenticated" do
+    user = insert(:user)
 
-    topics = %{
-      "public" => [fake_socket]
-    }
+    Streamer.add_socket("public", nil)
 
-    Worker.push_to_socket(topics, "public", activity)
+    {:ok, activity} = CommonAPI.post(user, %{"status" => "Test"})
+    activity_id = activity.id
+    assert_receive {:text, event}
+    assert %{"event" => "update", "payload" => payload} = Jason.decode!(event)
+    assert %{"id" => ^activity_id} = Jason.decode!(payload)
 
-    Task.await(task)
+    {:ok, _} = CommonAPI.delete(activity.id, user)
+    assert_receive {:text, event}
+    assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
   end
 
   describe "thread_containment" do
-    test "it doesn't send to user if recipients invalid and thread containment is enabled" do
+    test "it filters to user if recipients invalid and thread containment is enabled" do
       Pleroma.Config.put([:instance, :skip_thread_containment], false)
       author = insert(:user)
       user = insert(:user)
@@ -262,12 +195,10 @@ defmodule Pleroma.Web.StreamerTest do
             )
         )
 
-      task = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
-      fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
-      topics = %{"public" => [fake_socket]}
-      Worker.push_to_socket(topics, "public", activity)
-
-      Task.await(task)
+      Streamer.add_socket("public", user)
+      Streamer.stream("public", activity)
+      assert_receive {:render_with_user, _, _, ^activity}
+      assert Streamer.filtered_by_user?(user, activity)
     end
 
     test "it sends message if recipients invalid and thread containment is disabled" do
@@ -285,12 +216,11 @@ defmodule Pleroma.Web.StreamerTest do
             )
         )
 
-      task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
-      fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
-      topics = %{"public" => [fake_socket]}
-      Worker.push_to_socket(topics, "public", activity)
+      Streamer.add_socket("public", user)
+      Streamer.stream("public", activity)
 
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^activity}
+      refute Streamer.filtered_by_user?(user, activity)
     end
 
     test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
@@ -308,255 +238,168 @@ defmodule Pleroma.Web.StreamerTest do
             )
         )
 
-      task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
-      fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
-      topics = %{"public" => [fake_socket]}
-      Worker.push_to_socket(topics, "public", activity)
+      Streamer.add_socket("public", user)
+      Streamer.stream("public", activity)
 
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^activity}
+      refute Streamer.filtered_by_user?(user, activity)
     end
   end
 
   describe "blocks" do
-    test "it doesn't send messages involving blocked users" do
+    test "it filters messages involving blocked users" do
       user = insert(:user)
       blocked_user = insert(:user)
       {:ok, _user_relationship} = User.block(user, blocked_user)
 
+      Streamer.add_socket("public", user)
       {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})
-
-      task =
-        Task.async(fn ->
-          refute_receive {:text, _}, 1_000
-        end)
-
-      fake_socket = %StreamerSocket{
-        transport_pid: task.pid,
-        user: user
-      }
-
-      topics = %{
-        "public" => [fake_socket]
-      }
-
-      Worker.push_to_socket(topics, "public", activity)
-
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^activity}
+      assert Streamer.filtered_by_user?(user, activity)
     end
 
-    test "it doesn't send messages transitively involving blocked users" do
+    test "it filters messages transitively involving blocked users" do
       blocker = insert(:user)
       blockee = insert(:user)
       friend = insert(:user)
 
-      task =
-        Task.async(fn ->
-          refute_receive {:text, _}, 1_000
-        end)
-
-      fake_socket = %StreamerSocket{
-        transport_pid: task.pid,
-        user: blocker
-      }
-
-      topics = %{
-        "public" => [fake_socket]
-      }
+      Streamer.add_socket("public", blocker)
 
       {:ok, _user_relationship} = User.block(blocker, blockee)
 
       {:ok, activity_one} = CommonAPI.post(friend, %{"status" => "hey! @#{blockee.nickname}"})
 
-      Worker.push_to_socket(topics, "public", activity_one)
+      assert_receive {:render_with_user, _, _, ^activity_one}
+      assert Streamer.filtered_by_user?(blocker, activity_one)
 
       {:ok, activity_two} = CommonAPI.post(blockee, %{"status" => "hey! @#{friend.nickname}"})
 
-      Worker.push_to_socket(topics, "public", activity_two)
+      assert_receive {:render_with_user, _, _, ^activity_two}
+      assert Streamer.filtered_by_user?(blocker, activity_two)
 
       {:ok, activity_three} = CommonAPI.post(blockee, %{"status" => "hey! @#{blocker.nickname}"})
 
-      Worker.push_to_socket(topics, "public", activity_three)
-
-      Task.await(task)
+      assert_receive {:render_with_user, _, _, ^activity_three}
+      assert Streamer.filtered_by_user?(blocker, activity_three)
     end
   end
 
-  test "it doesn't send unwanted DMs to list" do
-    user_a = insert(:user)
-    user_b = insert(:user)
-    user_c = insert(:user)
-
-    {:ok, user_a} = User.follow(user_a, user_b)
-
-    {:ok, list} = List.create("Test", user_a)
-    {:ok, list} = List.follow(list, user_b)
-
-    {:ok, activity} =
-      CommonAPI.post(user_b, %{
-        "status" => "@#{user_c.nickname} Test",
-        "visibility" => "direct"
-      })
-
-    task =
-      Task.async(fn ->
-        refute_receive {:text, _}, 1_000
-      end)
-
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user_a
-    }
-
-    topics = %{
-      "list:#{list.id}" => [fake_socket]
-    }
-
-    Worker.handle_call({:stream, "list", activity}, self(), topics)
-
-    Task.await(task)
-  end
-
-  test "it doesn't send unwanted private posts to list" do
-    user_a = insert(:user)
-    user_b = insert(:user)
+  describe "lists" do
+    test "it doesn't send unwanted DMs to list" do
+      user_a = insert(:user)
+      user_b = insert(:user)
+      user_c = insert(:user)
 
-    {:ok, list} = List.create("Test", user_a)
-    {:ok, list} = List.follow(list, user_b)
+      {:ok, user_a} = User.follow(user_a, user_b)
 
-    {:ok, activity} =
-      CommonAPI.post(user_b, %{
-        "status" => "Test",
-        "visibility" => "private"
-      })
+      {:ok, list} = List.create("Test", user_a)
+      {:ok, list} = List.follow(list, user_b)
 
-    task =
-      Task.async(fn ->
-        refute_receive {:text, _}, 1_000
-      end)
+      Streamer.add_socket("list:#{list.id}", user_a)
 
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user_a
-    }
+      {:ok, _activity} =
+        CommonAPI.post(user_b, %{
+          "status" => "@#{user_c.nickname} Test",
+          "visibility" => "direct"
+        })
 
-    topics = %{
-      "list:#{list.id}" => [fake_socket]
-    }
+      refute_receive _
+    end
 
-    Worker.handle_call({:stream, "list", activity}, self(), topics)
+    test "it doesn't send unwanted private posts to list" do
+      user_a = insert(:user)
+      user_b = insert(:user)
 
-    Task.await(task)
-  end
+      {:ok, list} = List.create("Test", user_a)
+      {:ok, list} = List.follow(list, user_b)
 
-  test "it sends wanted private posts to list" do
-    user_a = insert(:user)
-    user_b = insert(:user)
+      Streamer.add_socket("list:#{list.id}", user_a)
 
-    {:ok, user_a} = User.follow(user_a, user_b)
+      {:ok, _activity} =
+        CommonAPI.post(user_b, %{
+          "status" => "Test",
+          "visibility" => "private"
+        })
 
-    {:ok, list} = List.create("Test", user_a)
-    {:ok, list} = List.follow(list, user_b)
+      refute_receive _
+    end
 
-    {:ok, activity} =
-      CommonAPI.post(user_b, %{
-        "status" => "Test",
-        "visibility" => "private"
-      })
+    test "it sends wanted private posts to list" do
+      user_a = insert(:user)
+      user_b = insert(:user)
 
-    task =
-      Task.async(fn ->
-        assert_receive {:text, _}, 1_000
-      end)
+      {:ok, user_a} = User.follow(user_a, user_b)
 
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user_a
-    }
+      {:ok, list} = List.create("Test", user_a)
+      {:ok, list} = List.follow(list, user_b)
 
-    Streamer.add_socket(
-      "list:#{list.id}",
-      fake_socket
-    )
+      Streamer.add_socket("list:#{list.id}", user_a)
 
-    Worker.handle_call({:stream, "list", activity}, self(), %{})
+      {:ok, activity} =
+        CommonAPI.post(user_b, %{
+          "status" => "Test",
+          "visibility" => "private"
+        })
 
-    Task.await(task)
+      assert_receive {:render_with_user, _, _, ^activity}
+      refute Streamer.filtered_by_user?(user_a, activity)
+    end
   end
 
-  test "it doesn't send muted reblogs" do
-    user1 = insert(:user)
-    user2 = insert(:user)
-    user3 = insert(:user)
-    CommonAPI.hide_reblogs(user1, user2)
-
-    {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
-    {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, user2)
-
-    task =
-      Task.async(fn ->
-        refute_receive {:text, _}, 1_000
-      end)
-
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user1
-    }
-
-    topics = %{
-      "public" => [fake_socket]
-    }
-
-    Worker.push_to_socket(topics, "public", announce_activity)
+  describe "muted reblogs" do
+    test "it filters muted reblogs" do
+      user1 = insert(:user)
+      user2 = insert(:user)
+      user3 = insert(:user)
+      CommonAPI.follow(user1, user2)
+      CommonAPI.hide_reblogs(user1, user2)
 
-    Task.await(task)
-  end
+      {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
 
-  test "it does send non-reblog notification for reblog-muted actors" do
-    user1 = insert(:user)
-    user2 = insert(:user)
-    user3 = insert(:user)
-    CommonAPI.hide_reblogs(user1, user2)
+      Streamer.add_socket("user", user1)
+      {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, user2)
+      assert_receive {:render_with_user, _, _, ^announce_activity}
+      assert Streamer.filtered_by_user?(user1, announce_activity)
+    end
 
-    {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
-    {:ok, favorite_activity} = CommonAPI.favorite(user2, create_activity.id)
+    test "it filters reblog notification for reblog-muted actors" do
+      user1 = insert(:user)
+      user2 = insert(:user)
+      CommonAPI.follow(user1, user2)
+      CommonAPI.hide_reblogs(user1, user2)
 
-    task =
-      Task.async(fn ->
-        assert_receive {:text, _}, 1_000
-      end)
+      {:ok, create_activity} = CommonAPI.post(user1, %{"status" => "I'm kawen"})
+      Streamer.add_socket("user", user1)
+      {:ok, _favorite_activity, _} = CommonAPI.repeat(create_activity.id, user2)
 
-    fake_socket = %StreamerSocket{
-      transport_pid: task.pid,
-      user: user1
-    }
+      assert_receive {:render_with_user, _, "notification.json", notif}
+      assert Streamer.filtered_by_user?(user1, notif)
+    end
 
-    topics = %{
-      "public" => [fake_socket]
-    }
+    test "it send non-reblog notification for reblog-muted actors" do
+      user1 = insert(:user)
+      user2 = insert(:user)
+      CommonAPI.follow(user1, user2)
+      CommonAPI.hide_reblogs(user1, user2)
 
-    Worker.push_to_socket(topics, "public", favorite_activity)
+      {:ok, create_activity} = CommonAPI.post(user1, %{"status" => "I'm kawen"})
+      Streamer.add_socket("user", user1)
+      {:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id)
 
-    Task.await(task)
+      assert_receive {:render_with_user, _, "notification.json", notif}
+      refute Streamer.filtered_by_user?(user1, notif)
+    end
   end
 
-  test "it doesn't send posts from muted threads" do
+  test "it filters posts from muted threads" do
     user = insert(:user)
     user2 = insert(:user)
+    Streamer.add_socket("user", user2)
     {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)
-
     {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
-
-    {:ok, activity} = CommonAPI.add_mute(user2, activity)
-
-    task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
-
-    Streamer.add_socket(
-      "user",
-      %{transport_pid: task.pid, assigns: %{user: user2}}
-    )
-
-    Streamer.stream("user", activity)
-    Task.await(task)
+    {:ok, _} = CommonAPI.add_mute(user2, activity)
+    assert_receive {:render_with_user, _, _, ^activity}
+    assert Streamer.filtered_by_user?(user2, activity)
   end
 
   describe "direct streams" do
@@ -568,22 +411,7 @@ defmodule Pleroma.Web.StreamerTest do
       user = insert(:user)
       another_user = insert(:user)
 
-      task =
-        Task.async(fn ->
-          assert_receive {:text, received_event}, @streamer_timeout
-
-          assert %{"event" => "conversation", "payload" => received_payload} =
-                   Jason.decode!(received_event)
-
-          assert %{"last_status" => last_status} = Jason.decode!(received_payload)
-          [participation] = Participation.for_user(user)
-          assert last_status["pleroma"]["direct_conversation_id"] == participation.id
-        end)
-
-      Streamer.add_socket(
-        "direct",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      Streamer.add_socket("direct", user)
 
       {:ok, _create_activity} =
         CommonAPI.post(another_user, %{
@@ -591,42 +419,47 @@ defmodule Pleroma.Web.StreamerTest do
           "visibility" => "direct"
         })
 
-      Task.await(task)
+      assert_receive {:text, received_event}
+
+      assert %{"event" => "conversation", "payload" => received_payload} =
+               Jason.decode!(received_event)
+
+      assert %{"last_status" => last_status} = Jason.decode!(received_payload)
+      [participation] = Participation.for_user(user)
+      assert last_status["pleroma"]["direct_conversation_id"] == participation.id
     end
 
     test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do
       user = insert(:user)
       another_user = insert(:user)
 
+      Streamer.add_socket("direct", user)
+
       {:ok, create_activity} =
         CommonAPI.post(another_user, %{
           "status" => "hi @#{user.nickname}",
           "visibility" => "direct"
         })
 
-      task =
-        Task.async(fn ->
-          assert_receive {:text, received_event}, @streamer_timeout
-          assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
+      create_activity_id = create_activity.id
+      assert_receive {:render_with_user, _, _, ^create_activity}
+      assert_receive {:text, received_conversation1}
+      assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
 
-          refute_receive {:text, _}, @streamer_timeout
-        end)
+      {:ok, _} = CommonAPI.delete(create_activity_id, another_user)
 
-      Process.sleep(@streamer_start_wait)
+      assert_receive {:text, received_event}
 
-      Streamer.add_socket(
-        "direct",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      assert %{"event" => "delete", "payload" => ^create_activity_id} =
+               Jason.decode!(received_event)
 
-      {:ok, _} = CommonAPI.delete(create_activity.id, another_user)
-
-      Task.await(task)
+      refute_receive _
     end
 
     test "it sends conversation update to the 'direct' stream when a message is deleted" do
       user = insert(:user)
       another_user = insert(:user)
+      Streamer.add_socket("direct", user)
 
       {:ok, create_activity} =
         CommonAPI.post(another_user, %{
@@ -636,35 +469,30 @@ defmodule Pleroma.Web.StreamerTest do
 
       {:ok, create_activity2} =
         CommonAPI.post(another_user, %{
-          "status" => "hi @#{user.nickname}",
+          "status" => "hi @#{user.nickname} 2",
           "in_reply_to_status_id" => create_activity.id,
           "visibility" => "direct"
         })
 
-      task =
-        Task.async(fn ->
-          assert_receive {:text, received_event}, @streamer_timeout
-          assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
-
-          assert_receive {:text, received_event}, @streamer_timeout
+      assert_receive {:render_with_user, _, _, ^create_activity}
+      assert_receive {:render_with_user, _, _, ^create_activity2}
+      assert_receive {:text, received_conversation1}
+      assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
+      assert_receive {:text, received_conversation1}
+      assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
 
-          assert %{"event" => "conversation", "payload" => received_payload} =
-                   Jason.decode!(received_event)
-
-          assert %{"last_status" => last_status} = Jason.decode!(received_payload)
-          assert last_status["id"] == to_string(create_activity.id)
-        end)
+      {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)
 
-      Process.sleep(@streamer_start_wait)
+      assert_receive {:text, received_event}
+      assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
 
-      Streamer.add_socket(
-        "direct",
-        %{transport_pid: task.pid, assigns: %{user: user}}
-      )
+      assert_receive {:text, received_event}
 
-      {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)
+      assert %{"event" => "conversation", "payload" => received_payload} =
+               Jason.decode!(received_event)
 
-      Task.await(task)
+      assert %{"last_status" => last_status} = Jason.decode!(received_payload)
+      assert last_status["id"] == to_string(create_activity.id)
     end
   end
 end