Stream follow updates
[akkoma] / lib / pleroma / web / streamer.ex
index bbaddd8529f530e22b1fea21103541a27e7e50fb..0b6cc89e906b4f2d8858fb68261b0d288062e686 100644 (file)
@@ -1,11 +1,12 @@
 # Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Web.Streamer do
-  use GenServer
   require Logger
+
   alias Pleroma.Activity
+  alias Pleroma.Chat.MessageReference
   alias Pleroma.Config
   alias Pleroma.Conversation.Participation
   alias Pleroma.Notification
@@ -14,69 +15,201 @@ defmodule Pleroma.Web.Streamer do
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.ActivityPub.Visibility
   alias Pleroma.Web.CommonAPI
-  alias Pleroma.Web.MastodonAPI.NotificationView
+  alias Pleroma.Web.OAuth.Token
+  alias Pleroma.Web.Plugs.OAuthScopesPlug
+  alias Pleroma.Web.StreamerView
+
+  @mix_env Mix.env()
+  @registry Pleroma.Web.StreamerRegistry
+
+  def registry, do: @registry
+
+  @public_streams ["public", "public:local", "public:media", "public:local:media"]
+  @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
+
+  @doc "Expands and authorizes a stream, and registers the process for streaming."
+  @spec get_topic_and_add_socket(
+          stream :: String.t(),
+          User.t() | nil,
+          Token.t() | nil,
+          Map.t() | nil
+        ) ::
+          {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
+  def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
+    with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
+      add_socket(topic, user)
+    end
+  end
 
-  @keepalive_interval :timer.seconds(30)
+  @doc "Expand and authorizes a stream"
+  @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) ::
+          {:ok, topic :: String.t()} | {:error, :bad_topic}
+  def get_topic(stream, user, oauth_token, params \\ %{})
 
-  def start_link(_) do
-    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
+  # Allow all public steams.
+  def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do
+    {:ok, stream}
   end
 
-  def add_socket(topic, socket) do
-    GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
+  # Allow all hashtags streams.
+  def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params) do
+    {:ok, "hashtag:" <> tag}
   end
 
-  def remove_socket(topic, socket) do
-    GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
+  # Allow remote instance streams.
+  def get_topic("public:remote", _user, _oauth_token, %{"instance" => instance} = _params) do
+    {:ok, "public:remote:" <> instance}
   end
 
-  def stream(topic, item) do
-    GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
+  def get_topic("public:remote:media", _user, _oauth_token, %{"instance" => instance} = _params) do
+    {:ok, "public:remote:media:" <> instance}
   end
 
-  def init(args) do
-    Process.send_after(self(), %{action: :ping}, @keepalive_interval)
+  # Expand user streams.
+  def get_topic(
+        stream,
+        %User{id: user_id} = user,
+        %Token{user_id: user_id} = oauth_token,
+        _params
+      )
+      when stream in @user_streams do
+    # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
+    required_scopes =
+      if stream == "user:notification" do
+        ["read:notifications"]
+      else
+        ["read:statuses"]
+      end
+
+    if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
+      {:error, :unauthorized}
+    else
+      {:ok, stream <> ":" <> to_string(user.id)}
+    end
+  end
 
-    {:ok, args}
+  def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
+    {:error, :unauthorized}
   end
 
-  def handle_info(%{action: :ping}, topics) do
-    topics
-    |> Map.values()
-    |> List.flatten()
-    |> Enum.each(fn socket ->
-      Logger.debug("Sending keepalive ping")
-      send(socket.transport_pid, {:text, ""})
-    end)
+  # List streams.
+  def get_topic(
+        "list",
+        %User{id: user_id} = user,
+        %Token{user_id: user_id} = oauth_token,
+        %{"list" => id}
+      ) do
+    cond do
+      OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
+        {:error, :unauthorized}
+
+      Pleroma.List.get(id, user) ->
+        {:ok, "list:" <> to_string(id)}
+
+      true ->
+        {:error, :bad_topic}
+    end
+  end
+
+  def get_topic("list", _user, _oauth_token, _params) do
+    {:error, :unauthorized}
+  end
+
+  def get_topic(_stream, _user, _oauth_token, _params) do
+    {:error, :bad_topic}
+  end
+
+  @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
+  def add_socket(topic, user) do
+    if should_env_send?() do
+      auth? = if user, do: true
+      Registry.register(@registry, topic, auth?)
+    end
+
+    {:ok, topic}
+  end
 
-    Process.send_after(self(), %{action: :ping}, @keepalive_interval)
+  def remove_socket(topic) do
+    if should_env_send?(), do: Registry.unregister(@registry, topic)
+  end
 
-    {:noreply, topics}
+  def stream(topics, items) do
+    if should_env_send?() do
+      for topic <- List.wrap(topics), item <- List.wrap(items) do
+        spawn(fn -> do_stream(topic, item) end)
+      end
+    end
   end
 
-  def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
+  def filtered_by_user?(user, item, streamed_type \\ :activity)
+
+  def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
+    %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
+      User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
+
+    recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
+    recipients = MapSet.new(item.recipients)
+    domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
+
+    with parent <- Object.normalize(item) || item,
+         true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
+         true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
+         true <-
+           !(streamed_type == :activity && item.data["type"] == "Announce" &&
+               parent.data["actor"] == user.ap_id),
+         true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
+         true <- MapSet.disjoint?(recipients, recipient_blocks),
+         %{host: item_host} <- URI.parse(item.actor),
+         %{host: parent_host} <- URI.parse(parent.data["actor"]),
+         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
+         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
+         true <- thread_containment(item, user),
+         false <- CommonAPI.thread_muted?(user, parent) do
+      false
+    else
+      _ -> true
+    end
+  end
+
+  def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do
+    filtered_by_user?(user, activity, :notification)
+  end
+
+  defp do_stream("direct", item) do
     recipient_topics =
       User.get_recipients_from_activity(item)
       |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
 
-    Enum.each(recipient_topics || [], fn user_topic ->
+    Enum.each(recipient_topics, fn user_topic ->
       Logger.debug("Trying to push direct message to #{user_topic}\n\n")
-      push_to_socket(topics, user_topic, item)
+      push_to_socket(user_topic, item)
     end)
+  end
+
+  defp do_stream("relationships:update", item) do
+    text = StreamerView.render("relationships_update.json", item)
 
-    {:noreply, topics}
+    [item.follower, item.following]
+    |> Enum.map(fn %{id: id} -> "user:#{id}" end)
+    |> Enum.each(fn user_topic ->
+      Logger.debug("Trying to push relationships:update to #{user_topic}\n\n")
+
+      Registry.dispatch(@registry, user_topic, fn list ->
+        Enum.each(list, fn {pid, _auth} ->
+          send(pid, {:text, text})
+        end)
+      end)
+    end)
   end
 
-  def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do
+  defp do_stream("participation", participation) do
     user_topic = "direct:#{participation.user_id}"
     Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
 
-    push_to_socket(topics, user_topic, participation)
-
-    {:noreply, topics}
+    push_to_socket(user_topic, participation)
   end
 
-  def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
+  defp do_stream("list", item) do
     # filter the recipient list if the activity is not public, see #270.
     recipient_lists =
       case Visibility.is_public?(item) do
@@ -96,36 +229,35 @@ defmodule Pleroma.Web.Streamer do
       recipient_lists
       |> Enum.map(fn %{id: id} -> "list:#{id}" end)
 
-    Enum.each(recipient_topics || [], fn list_topic ->
+    Enum.each(recipient_topics, fn list_topic ->
       Logger.debug("Trying to push message to #{list_topic}\n\n")
-      push_to_socket(topics, list_topic, item)
+      push_to_socket(list_topic, item)
     end)
-
-    {:noreply, topics}
   end
 
-  def handle_cast(
-        %{action: :stream, topic: topic, item: %Notification{} = item},
-        topics
-      )
-      when topic in ["user", "user:notification"] do
-    topics
-    |> Map.get("#{topic}:#{item.user_id}", [])
-    |> Enum.each(fn socket ->
-      with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id),
-           true <- should_send?(user, item),
-           false <- CommonAPI.thread_muted?(user, item.activity) do
-        send(
-          socket.transport_pid,
-          {:text, represent_notification(socket.assigns[:user], item)}
-        )
-      end
+  defp do_stream(topic, %Notification{} = item)
+       when topic in ["user", "user:notification"] do
+    Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
+      Enum.each(list, fn {pid, _auth} ->
+        send(pid, {:render_with_user, StreamerView, "notification.json", item})
+      end)
     end)
+  end
+
+  defp do_stream(topic, {user, %MessageReference{} = cm_ref})
+       when topic in ["user", "user:pleroma_chat"] do
+    topic = "#{topic}:#{user.id}"
 
-    {:noreply, topics}
+    text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
+
+    Registry.dispatch(@registry, topic, fn list ->
+      Enum.each(list, fn {pid, _auth} ->
+        send(pid, {:text, text})
+      end)
+    end)
   end
 
-  def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
+  defp do_stream("user", item) do
     Logger.debug("Trying to push to users")
 
     recipient_topics =
@@ -133,180 +265,55 @@ defmodule Pleroma.Web.Streamer do
       |> Enum.map(fn %{id: id} -> "user:#{id}" end)
 
     Enum.each(recipient_topics, fn topic ->
-      push_to_socket(topics, topic, item)
+      push_to_socket(topic, item)
     end)
-
-    {:noreply, topics}
   end
 
-  def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
+  defp do_stream(topic, item) do
     Logger.debug("Trying to push to #{topic}")
     Logger.debug("Pushing item to #{topic}")
-    push_to_socket(topics, topic, item)
-    {:noreply, topics}
-  end
-
-  def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
-    topic = internal_topic(topic, socket)
-    sockets_for_topic = sockets[topic] || []
-    sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
-    sockets = Map.put(sockets, topic, sockets_for_topic)
-    Logger.debug("Got new conn for #{topic}")
-    {:noreply, sockets}
-  end
-
-  def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
-    topic = internal_topic(topic, socket)
-    sockets_for_topic = sockets[topic] || []
-    sockets_for_topic = List.delete(sockets_for_topic, socket)
-    sockets = Map.put(sockets, topic, sockets_for_topic)
-    Logger.debug("Removed conn for #{topic}")
-    {:noreply, sockets}
-  end
-
-  def handle_cast(m, state) do
-    Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
-    {:noreply, state}
-  end
-
-  defp represent_update(%Activity{} = activity, %User{} = user) do
-    %{
-      event: "update",
-      payload:
-        Pleroma.Web.MastodonAPI.StatusView.render(
-          "status.json",
-          activity: activity,
-          for: user
-        )
-        |> Jason.encode!()
-    }
-    |> Jason.encode!()
-  end
-
-  defp represent_update(%Activity{} = activity) do
-    %{
-      event: "update",
-      payload:
-        Pleroma.Web.MastodonAPI.StatusView.render(
-          "status.json",
-          activity: activity
-        )
-        |> Jason.encode!()
-    }
-    |> Jason.encode!()
-  end
-
-  def represent_conversation(%Participation{} = participation) do
-    %{
-      event: "conversation",
-      payload:
-        Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
-          participation: participation,
-          user: participation.user
-        })
-        |> Jason.encode!()
-    }
-    |> Jason.encode!()
-  end
-
-  @spec represent_notification(User.t(), Notification.t()) :: binary()
-  defp represent_notification(%User{} = user, %Notification{} = notify) do
-    %{
-      event: "notification",
-      payload:
-        NotificationView.render(
-          "show.json",
-          %{notification: notify, for: user}
-        )
-        |> Jason.encode!()
-    }
-    |> Jason.encode!()
-  end
-
-  defp should_send?(%User{} = user, %Activity{} = item) do
-    blocks = user.info.blocks || []
-    mutes = user.info.mutes || []
-    reblog_mutes = user.info.muted_reblogs || []
-    domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
-
-    with parent when not is_nil(parent) <- Object.normalize(item),
-         true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
-         true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
-         %{host: item_host} <- URI.parse(item.actor),
-         %{host: parent_host} <- URI.parse(parent.data["actor"]),
-         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
-         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
-         true <- thread_containment(item, user) do
-      true
-    else
-      _ -> false
-    end
-  end
-
-  defp should_send?(%User{} = user, %Notification{activity: activity}) do
-    should_send?(user, activity)
+    push_to_socket(topic, item)
   end
 
-  def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
-    Enum.each(topics[topic] || [], fn socket ->
-      # Get the current user so we have up-to-date blocks etc.
-      if socket.assigns[:user] do
-        user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
+  defp push_to_socket(topic, %Participation{} = participation) do
+    rendered = StreamerView.render("conversation.json", participation)
 
-        if should_send?(user, item) do
-          send(socket.transport_pid, {:text, represent_update(item, user)})
-        end
-      else
-        send(socket.transport_pid, {:text, represent_update(item)})
-      end
+    Registry.dispatch(@registry, topic, fn list ->
+      Enum.each(list, fn {pid, _} ->
+        send(pid, {:text, rendered})
+      end)
     end)
   end
 
-  def push_to_socket(topics, topic, %Participation{} = participation) do
-    Enum.each(topics[topic] || [], fn socket ->
-      send(socket.transport_pid, {:text, represent_conversation(participation)})
-    end)
-  end
+  defp push_to_socket(topic, %Activity{
+         data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
+       }) do
+    rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
 
-  def push_to_socket(topics, topic, %Activity{
-        data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
-      }) do
-    Enum.each(topics[topic] || [], fn socket ->
-      send(
-        socket.transport_pid,
-        {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
-      )
+    Registry.dispatch(@registry, topic, fn list ->
+      Enum.each(list, fn {pid, _} ->
+        send(pid, {:text, rendered})
+      end)
     end)
   end
 
-  def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
+  defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
 
-  def push_to_socket(topics, topic, item) do
-    Enum.each(topics[topic] || [], fn socket ->
-      # Get the current user so we have up-to-date blocks etc.
-      if socket.assigns[:user] do
-        user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
-        blocks = user.info.blocks || []
-        mutes = user.info.mutes || []
+  defp push_to_socket(topic, item) do
+    anon_render = StreamerView.render("update.json", item)
 
-        with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
-             true <- thread_containment(item, user) do
-          send(socket.transport_pid, {:text, represent_update(item, user)})
+    Registry.dispatch(@registry, topic, fn list ->
+      Enum.each(list, fn {pid, auth?} ->
+        if auth? do
+          send(pid, {:render_with_user, StreamerView, "update.json", item})
+        else
+          send(pid, {:text, anon_render})
         end
-      else
-        send(socket.transport_pid, {:text, represent_update(item)})
-      end
+      end)
     end)
   end
 
-  defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do
-    "#{topic}:#{socket.assigns[:user].id}"
-  end
-
-  defp internal_topic(topic, _), do: topic
-
-  @spec thread_containment(Activity.t(), User.t()) :: boolean()
-  defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
+  defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
 
   defp thread_containment(activity, user) do
     if Config.get([:instance, :skip_thread_containment]) do
@@ -315,4 +322,26 @@ defmodule Pleroma.Web.Streamer do
       ActivityPub.contain_activity(activity, user)
     end
   end
+
+  # In test environement, only return true if the registry is started.
+  # In benchmark environment, returns false.
+  # In any other environment, always returns true.
+  cond do
+    @mix_env == :test ->
+      def should_env_send? do
+        case Process.whereis(@registry) do
+          nil ->
+            false
+
+          pid ->
+            Process.alive?(pid)
+        end
+      end
+
+    @mix_env == :benchmark ->
+      def should_env_send?, do: false
+
+    true ->
+      def should_env_send?, do: true
+  end
 end