Revert "Merge branch 'revert-4fabf83a' into 'develop'"
authorEgor Kislitsyn <egor@kislitsyn.com>
Mon, 16 Sep 2019 10:03:37 +0000 (17:03 +0700)
committerAriadne Conill <ariadne@dereferenced.org>
Fri, 4 Oct 2019 16:01:31 +0000 (16:01 +0000)
This reverts commit fe7fd331263007e0fb2877ef7370a09a9704da36, reversing
changes made to 4fabf83ad01352442906d79187aeab4c777f4df8.

26 files changed:
.gitignore
config/config.exs
lib/pleroma/activity/ir/topics.ex [new file with mode: 0644]
lib/pleroma/application.ex
lib/pleroma/notification.ex
lib/pleroma/web/activity_pub/activity_pub.ex
lib/pleroma/web/mastodon_api/websocket_handler.ex
lib/pleroma/web/streamer.ex [deleted file]
lib/pleroma/web/streamer/ping.ex [new file with mode: 0644]
lib/pleroma/web/streamer/state.ex [new file with mode: 0644]
lib/pleroma/web/streamer/streamer.ex [new file with mode: 0644]
lib/pleroma/web/streamer/streamer_socket.ex [new file with mode: 0644]
lib/pleroma/web/streamer/supervisor.ex [new file with mode: 0644]
lib/pleroma/web/streamer/worker.ex [new file with mode: 0644]
lib/pleroma/web/views/streamer_view.ex [new file with mode: 0644]
mix.exs
mix.lock
test/activity/ir/topics_test.exs [new file with mode: 0644]
test/integration/mastodon_websocket_test.exs
test/notification_test.exs
test/support/conn_case.ex
test/support/data_case.ex
test/web/activity_pub/activity_pub_test.exs
test/web/streamer/ping_test.exs [new file with mode: 0644]
test/web/streamer/state_test.exs [new file with mode: 0644]
test/web/streamer/streamer_test.exs [moved from test/web/streamer_test.exs with 86% similarity]

index 4e71a7df0d37346ed4381f6f203e7a6163f2851e..3b0c7d3614100edebfe6b28a607599e70d58daab 100644 (file)
@@ -43,3 +43,7 @@ docs/generated_config.md
 # Code test coverage
 /cover
 /Elixir.*.coverdata
+
+.idea
+pleroma.iml
+
index 0e91df88660a83e3a40ab91bc6dc12ff321b8a24..4efadb82327175c2c9ed31a78c6f0e6b17abdd39 100644 (file)
@@ -313,6 +313,10 @@ config :pleroma, :activitypub,
   follow_handshake_timeout: 500,
   sign_object_fetches: true
 
+config :pleroma, :streamer,
+  workers: 3,
+  overflow_workers: 2
+
 config :pleroma, :user, deny_follow_blocked: true
 
 config :pleroma, :mrf_normalize_markup, scrub_policy: Pleroma.HTML.Scrubber.Default
diff --git a/lib/pleroma/activity/ir/topics.ex b/lib/pleroma/activity/ir/topics.ex
new file mode 100644 (file)
index 0000000..010897a
--- /dev/null
@@ -0,0 +1,63 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Activity.Ir.Topics do
+  alias Pleroma.Object
+  alias Pleroma.Web.ActivityPub.Visibility
+
+  def get_activity_topics(activity) do
+    activity
+    |> Object.normalize()
+    |> generate_topics(activity)
+    |> List.flatten()
+  end
+
+  defp generate_topics(%{data: %{"type" => "Answer"}}, _) do
+    []
+  end
+
+  defp generate_topics(object, activity) do
+    ["user", "list"] ++ visibility_tags(object, activity)
+  end
+
+  defp visibility_tags(object, activity) do
+    case Visibility.get_visibility(activity) do
+      "public" ->
+        if activity.local do
+          ["public", "public:local"]
+        else
+          ["public"]
+        end
+        |> item_creation_tags(object, activity)
+
+      "direct" ->
+        ["direct"]
+
+      _ ->
+        []
+    end
+  end
+
+  defp item_creation_tags(tags, %{data: %{"type" => "Create"}} = object, activity) do
+    tags ++ hashtags_to_topics(object) ++ attachment_topics(object, activity)
+  end
+
+  defp item_creation_tags(tags, _, _) do
+    tags
+  end
+
+  defp hashtags_to_topics(%{data: %{"tag" => tags}}) do
+    tags
+    |> Enum.filter(&is_bitstring(&1))
+    |> Enum.map(fn tag -> "hashtag:" <> tag end)
+  end
+
+  defp hashtags_to_topics(_), do: []
+
+  defp attachment_topics(%{data: %{"attachment" => []}}, _act), do: []
+
+  defp attachment_topics(_object, %{local: true}), do: ["public:media", "public:local:media"]
+
+  defp attachment_topics(_object, _act), do: ["public:media"]
+end
index 1d46925f80b990ec9bab9dee5546c16c8eaf041c..45ea83c3890a571754ec6f8134bca894894c3b44 100644 (file)
@@ -142,7 +142,7 @@ defmodule Pleroma.Application do
   defp streamer_child(:test), do: []
 
   defp streamer_child(_) do
-    [Pleroma.Web.Streamer]
+    [Pleroma.Web.Streamer.supervisor()]
   end
 
   defp oauth_cleanup_child(true),
index b7c880c51584da65c62d87e194ee6dbc3bdc1b1d..8012389ac3753d55b88307095c274f7bc1bdd297 100644 (file)
@@ -210,8 +210,10 @@ defmodule Pleroma.Notification do
     unless skip?(activity, user) do
       notification = %Notification{user_id: user.id, activity: activity}
       {:ok, notification} = Repo.insert(notification)
-      Streamer.stream("user", notification)
-      Streamer.stream("user:notification", notification)
+
+      ["user", "user:notification"]
+      |> Streamer.stream(notification)
+
       Push.send(notification)
       notification
     end
index d556b982fcc6c2b2e63736f3f81306918a8de999..c616373f5d44b2181b96195816d75f5609a34707 100644 (file)
@@ -4,6 +4,7 @@
 
 defmodule Pleroma.Web.ActivityPub.ActivityPub do
   alias Pleroma.Activity
+  alias Pleroma.Activity.Ir.Topics
   alias Pleroma.Config
   alias Pleroma.Conversation
   alias Pleroma.Notification
@@ -16,6 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
   alias Pleroma.User
   alias Pleroma.Web.ActivityPub.MRF
   alias Pleroma.Web.ActivityPub.Transmogrifier
+  alias Pleroma.Web.Streamer
   alias Pleroma.Web.WebFinger
 
   import Ecto.Query
@@ -186,9 +188,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
       participations
       |> Repo.preload(:user)
 
-    Enum.each(participations, fn participation ->
-      Pleroma.Web.Streamer.stream("participation", participation)
-    end)
+    Streamer.stream("participation", participations)
   end
 
   def stream_out_participations(%Object{data: %{"context" => context}}, user) do
@@ -207,41 +207,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
   def stream_out_participations(_, _), do: :noop
 
-  def stream_out(activity) do
-    if activity.data["type"] in ["Create", "Announce", "Delete"] do
-      object = Object.normalize(activity)
-      # Do not stream out poll replies
-      unless object.data["type"] == "Answer" do
-        Pleroma.Web.Streamer.stream("user", activity)
-        Pleroma.Web.Streamer.stream("list", activity)
-
-        if get_visibility(activity) == "public" do
-          Pleroma.Web.Streamer.stream("public", activity)
-
-          if activity.local do
-            Pleroma.Web.Streamer.stream("public:local", activity)
-          end
-
-          if activity.data["type"] in ["Create"] do
-            object.data
-            |> Map.get("tag", [])
-            |> Enum.filter(fn tag -> is_bitstring(tag) end)
-            |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
-
-            if object.data["attachment"] != [] do
-              Pleroma.Web.Streamer.stream("public:media", activity)
-
-              if activity.local do
-                Pleroma.Web.Streamer.stream("public:local:media", activity)
-              end
-            end
-          end
-        else
-          if get_visibility(activity) == "direct",
-            do: Pleroma.Web.Streamer.stream("direct", activity)
-        end
-      end
-    end
+  def stream_out(%Activity{data: %{"type" => data_type}} = activity)
+      when data_type in ["Create", "Announce", "Delete"] do
+    activity
+    |> Topics.get_activity_topics()
+    |> Streamer.stream(activity)
+  end
+
+  def stream_out(_activity) do
+    :noop
   end
 
   def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
index dbd3542eadcc57929addcbbad7f682fa95d34d2d..3c26eb4069b2a2fb7bad3986d680ee0ffe21fd01 100644 (file)
@@ -8,6 +8,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
   alias Pleroma.Repo
   alias Pleroma.User
   alias Pleroma.Web.OAuth.Token
+  alias Pleroma.Web.Streamer
 
   @behaviour :cowboy_websocket
 
@@ -24,7 +25,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
   ]
   @anonymous_streams ["public", "public:local", "hashtag"]
 
-  # Handled by periodic keepalive in Pleroma.Web.Streamer.
+  # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping.
   @timeout :infinity
 
   def init(%{qs: qs} = req, state) do
@@ -65,7 +66,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
       }, topic #{state.topic}"
     )
 
-    Pleroma.Web.Streamer.add_socket(state.topic, streamer_socket(state))
+    Streamer.add_socket(state.topic, streamer_socket(state))
     {:ok, state}
   end
 
@@ -80,7 +81,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
       }, topic #{state.topic || "?"}: #{inspect(reason)}"
     )
 
-    Pleroma.Web.Streamer.remove_socket(state.topic, streamer_socket(state))
+    Streamer.remove_socket(state.topic, streamer_socket(state))
     :ok
   end
 
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
deleted file mode 100644 (file)
index 587c43f..0000000
+++ /dev/null
@@ -1,318 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer do
-  use GenServer
-  require Logger
-  alias Pleroma.Activity
-  alias Pleroma.Config
-  alias Pleroma.Conversation.Participation
-  alias Pleroma.Notification
-  alias Pleroma.Object
-  alias Pleroma.User
-  alias Pleroma.Web.ActivityPub.ActivityPub
-  alias Pleroma.Web.ActivityPub.Visibility
-  alias Pleroma.Web.CommonAPI
-  alias Pleroma.Web.MastodonAPI.NotificationView
-
-  @keepalive_interval :timer.seconds(30)
-
-  def start_link(_) do
-    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
-  end
-
-  def add_socket(topic, socket) do
-    GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
-  end
-
-  def remove_socket(topic, socket) do
-    GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
-  end
-
-  def stream(topic, item) do
-    GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
-  end
-
-  def init(args) do
-    Process.send_after(self(), %{action: :ping}, @keepalive_interval)
-
-    {:ok, args}
-  end
-
-  def handle_info(%{action: :ping}, topics) do
-    topics
-    |> Map.values()
-    |> List.flatten()
-    |> Enum.each(fn socket ->
-      Logger.debug("Sending keepalive ping")
-      send(socket.transport_pid, {:text, ""})
-    end)
-
-    Process.send_after(self(), %{action: :ping}, @keepalive_interval)
-
-    {:noreply, topics}
-  end
-
-  def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
-    recipient_topics =
-      User.get_recipients_from_activity(item)
-      |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
-
-    Enum.each(recipient_topics || [], fn user_topic ->
-      Logger.debug("Trying to push direct message to #{user_topic}\n\n")
-      push_to_socket(topics, user_topic, item)
-    end)
-
-    {:noreply, topics}
-  end
-
-  def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do
-    user_topic = "direct:#{participation.user_id}"
-    Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
-
-    push_to_socket(topics, user_topic, participation)
-
-    {:noreply, topics}
-  end
-
-  def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
-    # filter the recipient list if the activity is not public, see #270.
-    recipient_lists =
-      case Visibility.is_public?(item) do
-        true ->
-          Pleroma.List.get_lists_from_activity(item)
-
-        _ ->
-          Pleroma.List.get_lists_from_activity(item)
-          |> Enum.filter(fn list ->
-            owner = User.get_cached_by_id(list.user_id)
-
-            Visibility.visible_for_user?(item, owner)
-          end)
-      end
-
-    recipient_topics =
-      recipient_lists
-      |> Enum.map(fn %{id: id} -> "list:#{id}" end)
-
-    Enum.each(recipient_topics || [], fn list_topic ->
-      Logger.debug("Trying to push message to #{list_topic}\n\n")
-      push_to_socket(topics, list_topic, item)
-    end)
-
-    {:noreply, topics}
-  end
-
-  def handle_cast(
-        %{action: :stream, topic: topic, item: %Notification{} = item},
-        topics
-      )
-      when topic in ["user", "user:notification"] do
-    topics
-    |> Map.get("#{topic}:#{item.user_id}", [])
-    |> Enum.each(fn socket ->
-      with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id),
-           true <- should_send?(user, item) do
-        send(
-          socket.transport_pid,
-          {:text, represent_notification(socket.assigns[:user], item)}
-        )
-      end
-    end)
-
-    {:noreply, topics}
-  end
-
-  def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
-    Logger.debug("Trying to push to users")
-
-    recipient_topics =
-      User.get_recipients_from_activity(item)
-      |> Enum.map(fn %{id: id} -> "user:#{id}" end)
-
-    Enum.each(recipient_topics, fn topic ->
-      push_to_socket(topics, topic, item)
-    end)
-
-    {:noreply, topics}
-  end
-
-  def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
-    Logger.debug("Trying to push to #{topic}")
-    Logger.debug("Pushing item to #{topic}")
-    push_to_socket(topics, topic, item)
-    {:noreply, topics}
-  end
-
-  def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
-    topic = internal_topic(topic, socket)
-    sockets_for_topic = sockets[topic] || []
-    sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
-    sockets = Map.put(sockets, topic, sockets_for_topic)
-    Logger.debug("Got new conn for #{topic}")
-    {:noreply, sockets}
-  end
-
-  def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
-    topic = internal_topic(topic, socket)
-    sockets_for_topic = sockets[topic] || []
-    sockets_for_topic = List.delete(sockets_for_topic, socket)
-    sockets = Map.put(sockets, topic, sockets_for_topic)
-    Logger.debug("Removed conn for #{topic}")
-    {:noreply, sockets}
-  end
-
-  def handle_cast(m, state) do
-    Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
-    {:noreply, state}
-  end
-
-  defp represent_update(%Activity{} = activity, %User{} = user) do
-    %{
-      event: "update",
-      payload:
-        Pleroma.Web.MastodonAPI.StatusView.render(
-          "status.json",
-          activity: activity,
-          for: user
-        )
-        |> Jason.encode!()
-    }
-    |> Jason.encode!()
-  end
-
-  defp represent_update(%Activity{} = activity) do
-    %{
-      event: "update",
-      payload:
-        Pleroma.Web.MastodonAPI.StatusView.render(
-          "status.json",
-          activity: activity
-        )
-        |> Jason.encode!()
-    }
-    |> Jason.encode!()
-  end
-
-  def represent_conversation(%Participation{} = participation) do
-    %{
-      event: "conversation",
-      payload:
-        Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
-          participation: participation,
-          for: participation.user
-        })
-        |> Jason.encode!()
-    }
-    |> Jason.encode!()
-  end
-
-  @spec represent_notification(User.t(), Notification.t()) :: binary()
-  defp represent_notification(%User{} = user, %Notification{} = notify) do
-    %{
-      event: "notification",
-      payload:
-        NotificationView.render(
-          "show.json",
-          %{notification: notify, for: user}
-        )
-        |> Jason.encode!()
-    }
-    |> Jason.encode!()
-  end
-
-  defp should_send?(%User{} = user, %Activity{} = item) do
-    blocks = user.info.blocks || []
-    mutes = user.info.mutes || []
-    reblog_mutes = user.info.muted_reblogs || []
-    domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
-
-    with parent when not is_nil(parent) <- Object.normalize(item),
-         true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
-         true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
-         %{host: item_host} <- URI.parse(item.actor),
-         %{host: parent_host} <- URI.parse(parent.data["actor"]),
-         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
-         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
-         true <- thread_containment(item, user),
-         false <- CommonAPI.thread_muted?(user, item) do
-      true
-    else
-      _ -> false
-    end
-  end
-
-  defp should_send?(%User{} = user, %Notification{activity: activity}) do
-    should_send?(user, activity)
-  end
-
-  def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
-    Enum.each(topics[topic] || [], fn socket ->
-      # Get the current user so we have up-to-date blocks etc.
-      if socket.assigns[:user] do
-        user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
-
-        if should_send?(user, item) do
-          send(socket.transport_pid, {:text, represent_update(item, user)})
-        end
-      else
-        send(socket.transport_pid, {:text, represent_update(item)})
-      end
-    end)
-  end
-
-  def push_to_socket(topics, topic, %Participation{} = participation) do
-    Enum.each(topics[topic] || [], fn socket ->
-      send(socket.transport_pid, {:text, represent_conversation(participation)})
-    end)
-  end
-
-  def push_to_socket(topics, topic, %Activity{
-        data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
-      }) do
-    Enum.each(topics[topic] || [], fn socket ->
-      send(
-        socket.transport_pid,
-        {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
-      )
-    end)
-  end
-
-  def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
-
-  def push_to_socket(topics, topic, item) do
-    Enum.each(topics[topic] || [], fn socket ->
-      # Get the current user so we have up-to-date blocks etc.
-      if socket.assigns[:user] do
-        user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
-        blocks = user.info.blocks || []
-        mutes = user.info.mutes || []
-
-        with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
-             true <- thread_containment(item, user) do
-          send(socket.transport_pid, {:text, represent_update(item, user)})
-        end
-      else
-        send(socket.transport_pid, {:text, represent_update(item)})
-      end
-    end)
-  end
-
-  defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do
-    "#{topic}:#{socket.assigns[:user].id}"
-  end
-
-  defp internal_topic(topic, _), do: topic
-
-  @spec thread_containment(Activity.t(), User.t()) :: boolean()
-  defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
-
-  defp thread_containment(activity, user) do
-    if Config.get([:instance, :skip_thread_containment]) do
-      true
-    else
-      ActivityPub.contain_activity(activity, user)
-    end
-  end
-end
diff --git a/lib/pleroma/web/streamer/ping.ex b/lib/pleroma/web/streamer/ping.ex
new file mode 100644 (file)
index 0000000..f77cbb9
--- /dev/null
@@ -0,0 +1,33 @@
+defmodule Pleroma.Web.Streamer.Ping do
+  use GenServer
+  require Logger
+
+  alias Pleroma.Web.Streamer.State
+  alias Pleroma.Web.Streamer.StreamerSocket
+
+  @keepalive_interval :timer.seconds(30)
+
+  def start_link(opts) do
+    ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval)
+    GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__)
+  end
+
+  def init(%{ping_interval: ping_interval} = args) do
+    Process.send_after(self(), :ping, ping_interval)
+    {:ok, args}
+  end
+
+  def handle_info(:ping, %{ping_interval: ping_interval} = state) do
+    State.get_sockets()
+    |> Map.values()
+    |> List.flatten()
+    |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} ->
+      Logger.debug("Sending keepalive ping")
+      send(transport_pid, {:text, ""})
+    end)
+
+    Process.send_after(self(), :ping, ping_interval)
+
+    {:noreply, state}
+  end
+end
diff --git a/lib/pleroma/web/streamer/state.ex b/lib/pleroma/web/streamer/state.ex
new file mode 100644 (file)
index 0000000..7b51990
--- /dev/null
@@ -0,0 +1,68 @@
+defmodule Pleroma.Web.Streamer.State do
+  use GenServer
+  require Logger
+
+  alias Pleroma.Web.Streamer.StreamerSocket
+
+  def start_link(_) do
+    GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__)
+  end
+
+  def add_socket(topic, socket) do
+    GenServer.call(__MODULE__, {:add, socket, topic})
+  end
+
+  def remove_socket(topic, socket) do
+    GenServer.call(__MODULE__, {:remove, socket, topic})
+  end
+
+  def get_sockets do
+    %{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state)
+    stream_sockets
+  end
+
+  def init(init_arg) do
+    {:ok, init_arg}
+  end
+
+  def handle_call(:get_state, _from, state) do
+    {:reply, state, state}
+  end
+
+  def handle_call({:add, socket, topic}, _from, %{sockets: sockets} = state) do
+    internal_topic = internal_topic(topic, socket)
+    stream_socket = StreamerSocket.from_socket(socket)
+
+    sockets_for_topic =
+      sockets
+      |> Map.get(internal_topic, [])
+      |> List.insert_at(0, stream_socket)
+      |> Enum.uniq()
+
+    state = put_in(state, [:sockets, internal_topic], sockets_for_topic)
+    Logger.debug("Got new conn for #{topic}")
+    {:reply, state, state}
+  end
+
+  def handle_call({:remove, socket, topic}, _from, %{sockets: sockets} = state) do
+    internal_topic = internal_topic(topic, socket)
+    stream_socket = StreamerSocket.from_socket(socket)
+
+    sockets_for_topic =
+      sockets
+      |> Map.get(internal_topic, [])
+      |> List.delete(stream_socket)
+
+    state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic)
+    {:reply, state, state}
+  end
+
+  defp internal_topic(topic, socket)
+       when topic in ~w[user user:notification direct] do
+    "#{topic}:#{socket.assigns[:user].id}"
+  end
+
+  defp internal_topic(topic, _) do
+    topic
+  end
+end
diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex
new file mode 100644 (file)
index 0000000..8cf7192
--- /dev/null
@@ -0,0 +1,55 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.Streamer do
+  alias Pleroma.Web.Streamer.State
+  alias Pleroma.Web.Streamer.Worker
+
+  @timeout 60_000
+  @mix_env Mix.env()
+
+  def add_socket(topic, socket) do
+    State.add_socket(topic, socket)
+  end
+
+  def remove_socket(topic, socket) do
+    State.remove_socket(topic, socket)
+  end
+
+  def get_sockets do
+    State.get_sockets()
+  end
+
+  def stream(topics, items) do
+    if should_send?() do
+      Task.async(fn ->
+        :poolboy.transaction(
+          :streamer_worker,
+          &Worker.stream(&1, topics, items),
+          @timeout
+        )
+      end)
+    end
+  end
+
+  def supervisor, do: Pleroma.Web.Streamer.Supervisor
+
+  defp should_send? do
+    handle_should_send(@mix_env)
+  end
+
+  defp handle_should_send(:test) do
+    case Process.whereis(:streamer_worker) do
+      nil ->
+        false
+
+      pid ->
+        Process.alive?(pid)
+    end
+  end
+
+  defp handle_should_send(_) do
+    true
+  end
+end
diff --git a/lib/pleroma/web/streamer/streamer_socket.ex b/lib/pleroma/web/streamer/streamer_socket.ex
new file mode 100644 (file)
index 0000000..f006c03
--- /dev/null
@@ -0,0 +1,31 @@
+defmodule Pleroma.Web.Streamer.StreamerSocket do
+  defstruct transport_pid: nil, user: nil
+
+  alias Pleroma.User
+  alias Pleroma.Web.Streamer.StreamerSocket
+
+  def from_socket(%{
+        transport_pid: transport_pid,
+        assigns: %{user: nil}
+      }) do
+    %StreamerSocket{
+      transport_pid: transport_pid
+    }
+  end
+
+  def from_socket(%{
+        transport_pid: transport_pid,
+        assigns: %{user: %User{} = user}
+      }) do
+    %StreamerSocket{
+      transport_pid: transport_pid,
+      user: user
+    }
+  end
+
+  def from_socket(%{transport_pid: transport_pid}) do
+    %StreamerSocket{
+      transport_pid: transport_pid
+    }
+  end
+end
diff --git a/lib/pleroma/web/streamer/supervisor.ex b/lib/pleroma/web/streamer/supervisor.ex
new file mode 100644 (file)
index 0000000..6afe193
--- /dev/null
@@ -0,0 +1,33 @@
+defmodule Pleroma.Web.Streamer.Supervisor do
+  use Supervisor
+
+  def start_link(opts) do
+    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
+  end
+
+  def init(args) do
+    children = [
+      {Pleroma.Web.Streamer.State, args},
+      {Pleroma.Web.Streamer.Ping, args},
+      :poolboy.child_spec(:streamer_worker, poolboy_config())
+    ]
+
+    opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor]
+    Supervisor.init(children, opts)
+  end
+
+  defp poolboy_config do
+    opts =
+      Pleroma.Config.get(:streamer,
+        workers: 3,
+        overflow_workers: 2
+      )
+
+    [
+      {:name, {:local, :streamer_worker}},
+      {:worker_module, Pleroma.Web.Streamer.Worker},
+      {:size, opts[:workers]},
+      {:max_overflow, opts[:overflow_workers]}
+    ]
+  end
+end
diff --git a/lib/pleroma/web/streamer/worker.ex b/lib/pleroma/web/streamer/worker.ex
new file mode 100644 (file)
index 0000000..5804508
--- /dev/null
@@ -0,0 +1,220 @@
+defmodule Pleroma.Web.Streamer.Worker do
+  use GenServer
+
+  require Logger
+
+  alias Pleroma.Activity
+  alias Pleroma.Config
+  alias Pleroma.Conversation.Participation
+  alias Pleroma.Notification
+  alias Pleroma.Object
+  alias Pleroma.User
+  alias Pleroma.Web.ActivityPub.ActivityPub
+  alias Pleroma.Web.ActivityPub.Visibility
+  alias Pleroma.Web.CommonAPI
+  alias Pleroma.Web.Streamer.State
+  alias Pleroma.Web.Streamer.StreamerSocket
+  alias Pleroma.Web.StreamerView
+
+  def start_link(_) do
+    GenServer.start_link(__MODULE__, %{}, [])
+  end
+
+  def init(init_arg) do
+    {:ok, init_arg}
+  end
+
+  def stream(pid, topics, items) do
+    GenServer.call(pid, {:stream, topics, items})
+  end
+
+  def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
+    Enum.each(topics, fn t ->
+      do_stream(%{topic: t, item: item})
+    end)
+
+    {:reply, state, state}
+  end
+
+  def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
+    Enum.each(items, fn i ->
+      do_stream(%{topic: topic, item: i})
+    end)
+
+    {:reply, state, state}
+  end
+
+  def handle_call({:stream, topic, item}, _from, state) do
+    do_stream(%{topic: topic, item: item})
+
+    {:reply, state, state}
+  end
+
+  defp do_stream(%{topic: "direct", item: item}) do
+    recipient_topics =
+      User.get_recipients_from_activity(item)
+      |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
+
+    Enum.each(recipient_topics, fn user_topic ->
+      Logger.debug("Trying to push direct message to #{user_topic}\n\n")
+      push_to_socket(State.get_sockets(), user_topic, item)
+    end)
+  end
+
+  defp do_stream(%{topic: "participation", item: participation}) do
+    user_topic = "direct:#{participation.user_id}"
+    Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
+
+    push_to_socket(State.get_sockets(), user_topic, participation)
+  end
+
+  defp do_stream(%{topic: "list", item: item}) do
+    # filter the recipient list if the activity is not public, see #270.
+    recipient_lists =
+      case Visibility.is_public?(item) do
+        true ->
+          Pleroma.List.get_lists_from_activity(item)
+
+        _ ->
+          Pleroma.List.get_lists_from_activity(item)
+          |> Enum.filter(fn list ->
+            owner = User.get_cached_by_id(list.user_id)
+
+            Visibility.visible_for_user?(item, owner)
+          end)
+      end
+
+    recipient_topics =
+      recipient_lists
+      |> Enum.map(fn %{id: id} -> "list:#{id}" end)
+
+    Enum.each(recipient_topics, fn list_topic ->
+      Logger.debug("Trying to push message to #{list_topic}\n\n")
+      push_to_socket(State.get_sockets(), list_topic, item)
+    end)
+  end
+
+  defp do_stream(%{topic: topic, item: %Notification{} = item})
+       when topic in ["user", "user:notification"] do
+    State.get_sockets()
+    |> Map.get("#{topic}:#{item.user_id}", [])
+    |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
+      with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
+           true <- should_send?(user, item) do
+        send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
+      end
+    end)
+  end
+
+  defp do_stream(%{topic: "user", item: item}) do
+    Logger.debug("Trying to push to users")
+
+    recipient_topics =
+      User.get_recipients_from_activity(item)
+      |> Enum.map(fn %{id: id} -> "user:#{id}" end)
+
+    Enum.each(recipient_topics, fn topic ->
+      push_to_socket(State.get_sockets(), topic, item)
+    end)
+  end
+
+  defp do_stream(%{topic: topic, item: item}) do
+    Logger.debug("Trying to push to #{topic}")
+    Logger.debug("Pushing item to #{topic}")
+    push_to_socket(State.get_sockets(), topic, item)
+  end
+
+  defp should_send?(%User{} = user, %Activity{} = item) do
+    blocks = user.info.blocks || []
+    mutes = user.info.mutes || []
+    reblog_mutes = user.info.muted_reblogs || []
+    domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
+
+    with parent when not is_nil(parent) <- Object.normalize(item),
+         true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
+         true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
+         %{host: item_host} <- URI.parse(item.actor),
+         %{host: parent_host} <- URI.parse(parent.data["actor"]),
+         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
+         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
+         true <- thread_containment(item, user),
+         false <- CommonAPI.thread_muted?(user, item) do
+      true
+    else
+      _ -> false
+    end
+  end
+
+  defp should_send?(%User{} = user, %Notification{activity: activity}) do
+    should_send?(user, activity)
+  end
+
+  def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
+    Enum.each(topics[topic] || [], fn %StreamerSocket{
+                                        transport_pid: transport_pid,
+                                        user: socket_user
+                                      } ->
+      # Get the current user so we have up-to-date blocks etc.
+      if socket_user do
+        user = User.get_cached_by_ap_id(socket_user.ap_id)
+
+        if should_send?(user, item) do
+          send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
+        end
+      else
+        send(transport_pid, {:text, StreamerView.render("update.json", item)})
+      end
+    end)
+  end
+
+  def push_to_socket(topics, topic, %Participation{} = participation) do
+    Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
+      send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
+    end)
+  end
+
+  def push_to_socket(topics, topic, %Activity{
+        data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
+      }) do
+    Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
+      send(
+        transport_pid,
+        {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
+      )
+    end)
+  end
+
+  def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
+
+  def push_to_socket(topics, topic, item) do
+    Enum.each(topics[topic] || [], fn %StreamerSocket{
+                                        transport_pid: transport_pid,
+                                        user: socket_user
+                                      } ->
+      # Get the current user so we have up-to-date blocks etc.
+      if socket_user do
+        user = User.get_cached_by_ap_id(socket_user.ap_id)
+        blocks = user.info.blocks || []
+        mutes = user.info.mutes || []
+
+        with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
+             true <- thread_containment(item, user) do
+          send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
+        end
+      else
+        send(transport_pid, {:text, StreamerView.render("update.json", item)})
+      end
+    end)
+  end
+
+  @spec thread_containment(Activity.t(), User.t()) :: boolean()
+  defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
+
+  defp thread_containment(activity, user) do
+    if Config.get([:instance, :skip_thread_containment]) do
+      true
+    else
+      ActivityPub.contain_activity(activity, user)
+    end
+  end
+end
diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex
new file mode 100644 (file)
index 0000000..b13030f
--- /dev/null
@@ -0,0 +1,66 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.StreamerView do
+  use Pleroma.Web, :view
+
+  alias Pleroma.Activity
+  alias Pleroma.Conversation.Participation
+  alias Pleroma.Notification
+  alias Pleroma.User
+  alias Pleroma.Web.MastodonAPI.NotificationView
+
+  def render("update.json", %Activity{} = activity, %User{} = user) do
+    %{
+      event: "update",
+      payload:
+        Pleroma.Web.MastodonAPI.StatusView.render(
+          "status.json",
+          activity: activity,
+          for: user
+        )
+        |> Jason.encode!()
+    }
+    |> Jason.encode!()
+  end
+
+  def render("notification.json", %User{} = user, %Notification{} = notify) do
+    %{
+      event: "notification",
+      payload:
+        NotificationView.render(
+          "show.json",
+          %{notification: notify, for: user}
+        )
+        |> Jason.encode!()
+    }
+    |> Jason.encode!()
+  end
+
+  def render("update.json", %Activity{} = activity) do
+    %{
+      event: "update",
+      payload:
+        Pleroma.Web.MastodonAPI.StatusView.render(
+          "status.json",
+          activity: activity
+        )
+        |> Jason.encode!()
+    }
+    |> Jason.encode!()
+  end
+
+  def render("conversation.json", %Participation{} = participation) do
+    %{
+      event: "conversation",
+      payload:
+        Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
+          participation: participation,
+          for: participation.user
+        })
+        |> Jason.encode!()
+    }
+    |> Jason.encode!()
+  end
+end
diff --git a/mix.exs b/mix.exs
index 706d97b8e144ed2acfcb1d24b5ebcafe793d7eb7..8ff0b289559d073bd44968ed27816043a881bca5 100644 (file)
--- a/mix.exs
+++ b/mix.exs
@@ -143,6 +143,7 @@ defmodule Pleroma.Mixfile do
        ref: "293d77bb6f4a67ac8bde1428735c3b42f22cbb30"},
       {:pleroma_job_queue, "~> 0.3"},
       {:telemetry, "~> 0.3"},
+      {:poolboy, "~> 1.5"},
       {:prometheus_ex, "~> 3.0"},
       {:prometheus_plugs, "~> 1.1"},
       {:prometheus_phoenix, "~> 1.3"},
index 408778714c406150184b72998a09674efa5f1494..f6d6eafb66019ac5db74c8c2ce10eeca74a18640 100644 (file)
--- a/mix.lock
+++ b/mix.lock
@@ -70,6 +70,7 @@
   "plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"},
   "plug_static_index_html": {:hex, :plug_static_index_html, "1.0.0", "840123d4d3975585133485ea86af73cb2600afd7f2a976f9f5fd8b3808e636a0", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
   "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
+  "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm"},
   "postgrex": {:hex, :postgrex, "0.14.3", "5754dee2fdf6e9e508cbf49ab138df964278700b764177e8f3871e658b345a1e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"},
   "prometheus": {:hex, :prometheus, "4.4.1", "1e96073b3ed7788053768fea779cbc896ddc3bdd9ba60687f2ad50b252ac87d6", [:mix, :rebar3], [], "hexpm"},
   "prometheus_ecto": {:hex, :prometheus_ecto, "1.4.1", "6c768ea9654de871e5b32fab2eac348467b3021604ebebbcbd8bcbe806a65ed5", [:mix], [{:ecto, "~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.1 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm"},
diff --git a/test/activity/ir/topics_test.exs b/test/activity/ir/topics_test.exs
new file mode 100644 (file)
index 0000000..e75f835
--- /dev/null
@@ -0,0 +1,141 @@
+defmodule Pleroma.Activity.Ir.TopicsTest do
+  use Pleroma.DataCase
+
+  alias Pleroma.Activity
+  alias Pleroma.Activity.Ir.Topics
+  alias Pleroma.Object
+
+  require Pleroma.Constants
+
+  describe "poll answer" do
+    test "produce no topics" do
+      activity = %Activity{object: %Object{data: %{"type" => "Answer"}}}
+
+      assert [] == Topics.get_activity_topics(activity)
+    end
+  end
+
+  describe "non poll answer" do
+    test "always add user and list topics" do
+      activity = %Activity{object: %Object{data: %{"type" => "FooBar"}}}
+      topics = Topics.get_activity_topics(activity)
+
+      assert Enum.member?(topics, "user")
+      assert Enum.member?(topics, "list")
+    end
+  end
+
+  describe "public visibility" do
+    setup do
+      activity = %Activity{
+        object: %Object{data: %{"type" => "Note"}},
+        data: %{"to" => [Pleroma.Constants.as_public()]}
+      }
+
+      {:ok, activity: activity}
+    end
+
+    test "produces public topic", %{activity: activity} do
+      topics = Topics.get_activity_topics(activity)
+
+      assert Enum.member?(topics, "public")
+    end
+
+    test "local action produces public:local topic", %{activity: activity} do
+      activity = %{activity | local: true}
+      topics = Topics.get_activity_topics(activity)
+
+      assert Enum.member?(topics, "public:local")
+    end
+
+    test "non-local action does not produce public:local topic", %{activity: activity} do
+      activity = %{activity | local: false}
+      topics = Topics.get_activity_topics(activity)
+
+      refute Enum.member?(topics, "public:local")
+    end
+  end
+
+  describe "public visibility create events" do
+    setup do
+      activity = %Activity{
+        object: %Object{data: %{"type" => "Create", "attachment" => []}},
+        data: %{"to" => [Pleroma.Constants.as_public()]}
+      }
+
+      {:ok, activity: activity}
+    end
+
+    test "with no attachments doesn't produce public:media topics", %{activity: activity} do
+      topics = Topics.get_activity_topics(activity)
+
+      refute Enum.member?(topics, "public:media")
+      refute Enum.member?(topics, "public:local:media")
+    end
+
+    test "converts tags to hash tags", %{activity: %{object: %{data: data} = object} = activity} do
+      tagged_data = Map.put(data, "tag", ["foo", "bar"])
+      activity = %{activity | object: %{object | data: tagged_data}}
+
+      topics = Topics.get_activity_topics(activity)
+
+      assert Enum.member?(topics, "hashtag:foo")
+      assert Enum.member?(topics, "hashtag:bar")
+    end
+
+    test "only converts strinngs to hash tags", %{
+      activity: %{object: %{data: data} = object} = activity
+    } do
+      tagged_data = Map.put(data, "tag", [2])
+      activity = %{activity | object: %{object | data: tagged_data}}
+
+      topics = Topics.get_activity_topics(activity)
+
+      refute Enum.member?(topics, "hashtag:2")
+    end
+  end
+
+  describe "public visibility create events with attachments" do
+    setup do
+      activity = %Activity{
+        object: %Object{data: %{"type" => "Create", "attachment" => ["foo"]}},
+        data: %{"to" => [Pleroma.Constants.as_public()]}
+      }
+
+      {:ok, activity: activity}
+    end
+
+    test "produce public:media topics", %{activity: activity} do
+      topics = Topics.get_activity_topics(activity)
+
+      assert Enum.member?(topics, "public:media")
+    end
+
+    test "local produces public:local:media topics", %{activity: activity} do
+      topics = Topics.get_activity_topics(activity)
+
+      assert Enum.member?(topics, "public:local:media")
+    end
+
+    test "non-local doesn't produce public:local:media topics", %{activity: activity} do
+      activity = %{activity | local: false}
+
+      topics = Topics.get_activity_topics(activity)
+
+      refute Enum.member?(topics, "public:local:media")
+    end
+  end
+
+  describe "non-public visibility" do
+    test "produces direct topic" do
+      activity = %Activity{object: %Object{data: %{"type" => "Note"}}, data: %{"to" => []}}
+      topics = Topics.get_activity_topics(activity)
+
+      assert Enum.member?(topics, "direct")
+      refute Enum.member?(topics, "public")
+      refute Enum.member?(topics, "public:local")
+      refute Enum.member?(topics, "public:media")
+      refute Enum.member?(topics, "public:local:media")
+    end
+  end
+end
index 3975cdcd694b3a39bc14ebce532b89faaa021b55..40eff169c57f8490829420f2ad98d654d4ec0d47 100644 (file)
@@ -10,7 +10,6 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
   alias Pleroma.Integration.WebsocketClient
   alias Pleroma.Web.CommonAPI
   alias Pleroma.Web.OAuth
-  alias Pleroma.Web.Streamer
 
   @path Pleroma.Web.Endpoint.url()
         |> URI.parse()
@@ -18,16 +17,6 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
         |> Map.put(:path, "/api/v1/streaming")
         |> URI.to_string()
 
-  setup do
-    GenServer.start(Streamer, %{}, name: Streamer)
-
-    on_exit(fn ->
-      if pid = Process.whereis(Streamer) do
-        Process.exit(pid, :kill)
-      end
-    end)
-  end
-
   def start_socket(qs \\ nil, headers \\ []) do
     path =
       case qs do
@@ -48,12 +37,14 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
     assert {:error, {403, _}} = start_socket("?stream=user")
   end
 
+  @tag needs_streamer: true
   test "allows public streams without authentication" do
     assert {:ok, _} = start_socket("?stream=public")
     assert {:ok, _} = start_socket("?stream=public:local")
     assert {:ok, _} = start_socket("?stream=hashtag&tag=lain")
   end
 
+  @tag needs_streamer: true
   test "receives well formatted events" do
     user = insert(:user)
     {:ok, _} = start_socket("?stream=public")
@@ -98,16 +89,19 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
       assert {:ok, _} = start_socket("?stream=user&access_token=#{state.token.token}")
     end
 
+    @tag needs_streamer: true
     test "accepts the 'user' stream", %{token: token} = _state do
       assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
       assert {:error, {403, "Forbidden"}} = start_socket("?stream=user")
     end
 
+    @tag needs_streamer: true
     test "accepts the 'user:notification' stream", %{token: token} = _state do
       assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
       assert {:error, {403, "Forbidden"}} = start_socket("?stream=user:notification")
     end
 
+    @tag needs_streamer: true
     test "accepts valid token on Sec-WebSocket-Protocol header", %{token: token} do
       assert {:ok, _} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", token.token}])
 
index 2a52dad8d87f9a344d74a8a89c5e234cee5c6b73..d68d4485fa21cb883df0b396d874b95773ea08c4 100644 (file)
@@ -68,16 +68,7 @@ defmodule Pleroma.NotificationTest do
   end
 
   describe "create_notification" do
-    setup do
-      GenServer.start(Streamer, %{}, name: Streamer)
-
-      on_exit(fn ->
-        if pid = Process.whereis(Streamer) do
-          Process.exit(pid, :kill)
-        end
-      end)
-    end
-
+    @tag needs_streamer: true
     test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do
       user = insert(:user)
       task = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
index ec5892ff53f45ca182ef70d37472d2b20b7b4d8f..b39c706774e086872b4ebdfb27ac312a626cc880 100644 (file)
@@ -40,6 +40,10 @@ defmodule Pleroma.Web.ConnCase do
       Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()})
     end
 
+    if tags[:needs_streamer] do
+      start_supervised(Pleroma.Web.Streamer.supervisor())
+    end
+
     {:ok, conn: Phoenix.ConnTest.build_conn()}
   end
 end
index f3d98e7e3173f46bf19bbd7b2c3235eaa2b0291d..17fa15214082c7e533f840d67c35c751060bb5bf 100644 (file)
@@ -39,6 +39,10 @@ defmodule Pleroma.DataCase do
       Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()})
     end
 
+    if tags[:needs_streamer] do
+      start_supervised(Pleroma.Web.Streamer.supervisor())
+    end
+
     :ok
   end
 
index f72b44aed4c0f531b6ebda36e778da5fc4e7c825..ed6f201050b625eae885078800a38d15fb8925fc 100644 (file)
@@ -38,9 +38,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubTest do
         stream: fn _, _ -> nil end do
         ActivityPub.stream_out_participations(conversation.participations)
 
-        Enum.each(participations, fn participation ->
-          assert called(Pleroma.Web.Streamer.stream("participation", participation))
-        end)
+        assert called(Pleroma.Web.Streamer.stream("participation", participations))
       end
     end
   end
diff --git a/test/web/streamer/ping_test.exs b/test/web/streamer/ping_test.exs
new file mode 100644 (file)
index 0000000..3d52c00
--- /dev/null
@@ -0,0 +1,36 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.PingTest do
+  use Pleroma.DataCase
+
+  import Pleroma.Factory
+  alias Pleroma.Web.Streamer
+
+  setup do
+    start_supervised({Streamer.supervisor(), [ping_interval: 30]})
+
+    :ok
+  end
+
+  describe "sockets" do
+    setup do
+      user = insert(:user)
+      {:ok, %{user: user}}
+    end
+
+    test "it sends pings", %{user: user} do
+      task =
+        Task.async(fn ->
+          assert_receive {:text, received_event}, 40
+          assert_receive {:text, received_event}, 40
+          assert_receive {:text, received_event}, 40
+        end)
+
+      Streamer.add_socket("public", %{transport_pid: task.pid, assigns: %{user: user}})
+
+      Task.await(task)
+    end
+  end
+end
diff --git a/test/web/streamer/state_test.exs b/test/web/streamer/state_test.exs
new file mode 100644 (file)
index 0000000..d1aeac5
--- /dev/null
@@ -0,0 +1,54 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.StateTest do
+  use Pleroma.DataCase
+
+  import Pleroma.Factory
+  alias Pleroma.Web.Streamer
+  alias Pleroma.Web.Streamer.StreamerSocket
+
+  @moduletag needs_streamer: true
+
+  describe "sockets" do
+    setup do
+      user = insert(:user)
+      user2 = insert(:user)
+      {:ok, %{user: user, user2: user2}}
+    end
+
+    test "it can add a socket", %{user: user} do
+      Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}})
+
+      assert(%{"public" => [%StreamerSocket{transport_pid: 1}]} = Streamer.get_sockets())
+    end
+
+    test "it can add multiple sockets per user", %{user: user} do
+      Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}})
+      Streamer.add_socket("public", %{transport_pid: 2, assigns: %{user: user}})
+
+      assert(
+        %{
+          "public" => [
+            %StreamerSocket{transport_pid: 2},
+            %StreamerSocket{transport_pid: 1}
+          ]
+        } = Streamer.get_sockets()
+      )
+    end
+
+    test "it will not add a duplicate socket", %{user: user} do
+      Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}})
+      Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}})
+
+      assert(
+        %{
+          "activity" => [
+            %StreamerSocket{transport_pid: 1}
+          ]
+        } = Streamer.get_sockets()
+      )
+    end
+  end
+end
similarity index 86%
rename from test/web/streamer_test.exs
rename to test/web/streamer/streamer_test.exs
index 96fa7645f982b1f596af704dafb1d378a4db3706..88847e20f5c2223f2726de12a72e7f8b68c7aefa 100644 (file)
@@ -5,24 +5,20 @@
 defmodule Pleroma.Web.StreamerTest do
   use Pleroma.DataCase
 
+  import Pleroma.Factory
+
   alias Pleroma.List
   alias Pleroma.User
   alias Pleroma.Web.CommonAPI
   alias Pleroma.Web.Streamer
-  import Pleroma.Factory
+  alias Pleroma.Web.Streamer.StreamerSocket
+  alias Pleroma.Web.Streamer.Worker
 
+  @moduletag needs_streamer: true
   clear_config_all([:instance, :skip_thread_containment])
 
   describe "user streams" do
     setup do
-      GenServer.start(Streamer, %{}, name: Streamer)
-
-      on_exit(fn ->
-        if pid = Process.whereis(Streamer) do
-          Process.exit(pid, :kill)
-        end
-      end)
-
       user = insert(:user)
       notify = insert(:notification, user: user, activity: build(:note_activity))
       {:ok, %{user: user, notify: notify}}
@@ -125,11 +121,9 @@ defmodule Pleroma.Web.StreamerTest do
         assert_receive {:text, _}, 4_000
       end)
 
-    fake_socket = %{
+    fake_socket = %StreamerSocket{
       transport_pid: task.pid,
-      assigns: %{
-        user: user
-      }
+      user: user
     }
 
     {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
@@ -138,7 +132,7 @@ defmodule Pleroma.Web.StreamerTest do
       "public" => [fake_socket]
     }
 
-    Streamer.push_to_socket(topics, "public", activity)
+    Worker.push_to_socket(topics, "public", activity)
 
     Task.await(task)
 
@@ -155,11 +149,9 @@ defmodule Pleroma.Web.StreamerTest do
         assert received_event == expected_event
       end)
 
-    fake_socket = %{
+    fake_socket = %StreamerSocket{
       transport_pid: task.pid,
-      assigns: %{
-        user: user
-      }
+      user: user
     }
 
     {:ok, activity} = CommonAPI.delete(activity.id, other_user)
@@ -168,7 +160,7 @@ defmodule Pleroma.Web.StreamerTest do
       "public" => [fake_socket]
     }
 
-    Streamer.push_to_socket(topics, "public", activity)
+    Worker.push_to_socket(topics, "public", activity)
 
     Task.await(task)
   end
@@ -189,9 +181,9 @@ defmodule Pleroma.Web.StreamerTest do
         )
 
       task = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
-      fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}
+      fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
       topics = %{"public" => [fake_socket]}
-      Streamer.push_to_socket(topics, "public", activity)
+      Worker.push_to_socket(topics, "public", activity)
 
       Task.await(task)
     end
@@ -211,9 +203,9 @@ defmodule Pleroma.Web.StreamerTest do
         )
 
       task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
-      fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}
+      fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
       topics = %{"public" => [fake_socket]}
-      Streamer.push_to_socket(topics, "public", activity)
+      Worker.push_to_socket(topics, "public", activity)
 
       Task.await(task)
     end
@@ -233,9 +225,9 @@ defmodule Pleroma.Web.StreamerTest do
         )
 
       task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
-      fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}
+      fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
       topics = %{"public" => [fake_socket]}
-      Streamer.push_to_socket(topics, "public", activity)
+      Worker.push_to_socket(topics, "public", activity)
 
       Task.await(task)
     end
@@ -251,11 +243,9 @@ defmodule Pleroma.Web.StreamerTest do
         refute_receive {:text, _}, 1_000
       end)
 
-    fake_socket = %{
+    fake_socket = %StreamerSocket{
       transport_pid: task.pid,
-      assigns: %{
-        user: user
-      }
+      user: user
     }
 
     {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})
@@ -264,7 +254,7 @@ defmodule Pleroma.Web.StreamerTest do
       "public" => [fake_socket]
     }
 
-    Streamer.push_to_socket(topics, "public", activity)
+    Worker.push_to_socket(topics, "public", activity)
 
     Task.await(task)
   end
@@ -284,11 +274,9 @@ defmodule Pleroma.Web.StreamerTest do
         refute_receive {:text, _}, 1_000
       end)
 
-    fake_socket = %{
+    fake_socket = %StreamerSocket{
       transport_pid: task.pid,
-      assigns: %{
-        user: user_a
-      }
+      user: user_a
     }
 
     {:ok, activity} =
@@ -301,7 +289,7 @@ defmodule Pleroma.Web.StreamerTest do
       "list:#{list.id}" => [fake_socket]
     }
 
-    Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
+    Worker.handle_call({:stream, "list", activity}, self(), topics)
 
     Task.await(task)
   end
@@ -318,11 +306,9 @@ defmodule Pleroma.Web.StreamerTest do
         refute_receive {:text, _}, 1_000
       end)
 
-    fake_socket = %{
+    fake_socket = %StreamerSocket{
       transport_pid: task.pid,
-      assigns: %{
-        user: user_a
-      }
+      user: user_a
     }
 
     {:ok, activity} =
@@ -335,12 +321,12 @@ defmodule Pleroma.Web.StreamerTest do
       "list:#{list.id}" => [fake_socket]
     }
 
-    Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
+    Worker.handle_call({:stream, "list", activity}, self(), topics)
 
     Task.await(task)
   end
 
-  test "it send wanted private posts to list" do
+  test "it sends wanted private posts to list" do
     user_a = insert(:user)
     user_b = insert(:user)
 
@@ -354,11 +340,9 @@ defmodule Pleroma.Web.StreamerTest do
         assert_receive {:text, _}, 1_000
       end)
 
-    fake_socket = %{
+    fake_socket = %StreamerSocket{
       transport_pid: task.pid,
-      assigns: %{
-        user: user_a
-      }
+      user: user_a
     }
 
     {:ok, activity} =
@@ -367,11 +351,12 @@ defmodule Pleroma.Web.StreamerTest do
         "visibility" => "private"
       })
 
-    topics = %{
-      "list:#{list.id}" => [fake_socket]
-    }
+    Streamer.add_socket(
+      "list:#{list.id}",
+      fake_socket
+    )
 
-    Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
+    Worker.handle_call({:stream, "list", activity}, self(), %{})
 
     Task.await(task)
   end
@@ -387,11 +372,9 @@ defmodule Pleroma.Web.StreamerTest do
         refute_receive {:text, _}, 1_000
       end)
 
-    fake_socket = %{
+    fake_socket = %StreamerSocket{
       transport_pid: task.pid,
-      assigns: %{
-        user: user1
-      }
+      user: user1
     }
 
     {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
@@ -401,7 +384,7 @@ defmodule Pleroma.Web.StreamerTest do
       "public" => [fake_socket]
     }
 
-    Streamer.push_to_socket(topics, "public", announce_activity)
+    Worker.push_to_socket(topics, "public", announce_activity)
 
     Task.await(task)
   end
@@ -417,6 +400,8 @@ defmodule Pleroma.Web.StreamerTest do
 
     task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
 
+    Process.sleep(4000)
+
     Streamer.add_socket(
       "user",
       %{transport_pid: task.pid, assigns: %{user: user2}}
@@ -428,14 +413,6 @@ defmodule Pleroma.Web.StreamerTest do
 
   describe "direct streams" do
     setup do
-      GenServer.start(Streamer, %{}, name: Streamer)
-
-      on_exit(fn ->
-        if pid = Process.whereis(Streamer) do
-          Process.exit(pid, :kill)
-        end
-      end)
-
       :ok
     end
 
@@ -480,6 +457,8 @@ defmodule Pleroma.Web.StreamerTest do
           refute_receive {:text, _}, 4_000
         end)
 
+      Process.sleep(1000)
+
       Streamer.add_socket(
         "direct",
         %{transport_pid: task.pid, assigns: %{user: user}}
@@ -521,6 +500,8 @@ defmodule Pleroma.Web.StreamerTest do
           assert last_status["id"] == to_string(create_activity.id)
         end)
 
+      Process.sleep(1000)
+
       Streamer.add_socket(
         "direct",
         %{transport_pid: task.pid, assigns: %{user: user}}