X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Fstreamer.ex;h=0b6cc89e906b4f2d8858fb68261b0d288062e686;hb=35ba48494f5129d3a0010b045ff36d98e7e7984f;hp=86e2dc4ddc8e3d8c4deb1b5c67cbe8972a98689a;hpb=7271b2a954ee242e5d5ccf7602dc0772efadea21;p=akkoma diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex index 86e2dc4dd..0b6cc89e9 100644 --- a/lib/pleroma/web/streamer.ex +++ b/lib/pleroma/web/streamer.ex @@ -1,11 +1,12 @@ # Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors +# Copyright © 2017-2020 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Streamer do - use GenServer require Logger + alias Pleroma.Activity + alias Pleroma.Chat.MessageReference alias Pleroma.Config alias Pleroma.Conversation.Participation alias Pleroma.Notification @@ -14,76 +15,201 @@ defmodule Pleroma.Web.Streamer do alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.CommonAPI - alias Pleroma.Web.MastodonAPI.NotificationView + alias Pleroma.Web.OAuth.Token + alias Pleroma.Web.Plugs.OAuthScopesPlug + alias Pleroma.Web.StreamerView + + @mix_env Mix.env() + @registry Pleroma.Web.StreamerRegistry + + def registry, do: @registry + + @public_streams ["public", "public:local", "public:media", "public:local:media"] + @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"] + + @doc "Expands and authorizes a stream, and registers the process for streaming." + @spec get_topic_and_add_socket( + stream :: String.t(), + User.t() | nil, + Token.t() | nil, + Map.t() | nil + ) :: + {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized} + def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do + with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do + add_socket(topic, user) + end + end - @keepalive_interval :timer.seconds(30) + @doc "Expand and authorizes a stream" + @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) :: + {:ok, topic :: String.t()} | {:error, :bad_topic} + def get_topic(stream, user, oauth_token, params \\ %{}) - def start_link do - GenServer.start_link(__MODULE__, %{}, name: __MODULE__) + # Allow all public steams. + def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do + {:ok, stream} end - def add_socket(topic, socket) do - GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic}) + # Allow all hashtags streams. + def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params) do + {:ok, "hashtag:" <> tag} end - def remove_socket(topic, socket) do - GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic}) + # Allow remote instance streams. + def get_topic("public:remote", _user, _oauth_token, %{"instance" => instance} = _params) do + {:ok, "public:remote:" <> instance} end - def stream(topic, item) do - GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item}) + def get_topic("public:remote:media", _user, _oauth_token, %{"instance" => instance} = _params) do + {:ok, "public:remote:media:" <> instance} end - def init(args) do - spawn(fn -> - # 30 seconds - Process.sleep(@keepalive_interval) - GenServer.cast(__MODULE__, %{action: :ping}) - end) + # Expand user streams. + def get_topic( + stream, + %User{id: user_id} = user, + %Token{user_id: user_id} = oauth_token, + _params + ) + when stream in @user_streams do + # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope) + required_scopes = + if stream == "user:notification" do + ["read:notifications"] + else + ["read:statuses"] + end - {:ok, args} + if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do + {:error, :unauthorized} + else + {:ok, stream <> ":" <> to_string(user.id)} + end end - def handle_cast(%{action: :ping}, topics) do - Map.values(topics) - |> List.flatten() - |> Enum.each(fn socket -> - Logger.debug("Sending keepalive ping") - send(socket.transport_pid, {:text, ""}) - end) + def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do + {:error, :unauthorized} + end - spawn(fn -> - # 30 seconds - Process.sleep(@keepalive_interval) - GenServer.cast(__MODULE__, %{action: :ping}) - end) + # List streams. + def get_topic( + "list", + %User{id: user_id} = user, + %Token{user_id: user_id} = oauth_token, + %{"list" => id} + ) do + cond do + OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] -> + {:error, :unauthorized} + + Pleroma.List.get(id, user) -> + {:ok, "list:" <> to_string(id)} + + true -> + {:error, :bad_topic} + end + end + + def get_topic("list", _user, _oauth_token, _params) do + {:error, :unauthorized} + end + + def get_topic(_stream, _user, _oauth_token, _params) do + {:error, :bad_topic} + end + + @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic." + def add_socket(topic, user) do + if should_env_send?() do + auth? = if user, do: true + Registry.register(@registry, topic, auth?) + end + + {:ok, topic} + end + + def remove_socket(topic) do + if should_env_send?(), do: Registry.unregister(@registry, topic) + end + + def stream(topics, items) do + if should_env_send?() do + for topic <- List.wrap(topics), item <- List.wrap(items) do + spawn(fn -> do_stream(topic, item) end) + end + end + end + + def filtered_by_user?(user, item, streamed_type \\ :activity) + + def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do + %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} = + User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute]) + + recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids) + recipients = MapSet.new(item.recipients) + domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks) + + with parent <- Object.normalize(item) || item, + true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)), + true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids, + true <- + !(streamed_type == :activity && item.data["type"] == "Announce" && + parent.data["actor"] == user.ap_id), + true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)), + true <- MapSet.disjoint?(recipients, recipient_blocks), + %{host: item_host} <- URI.parse(item.actor), + %{host: parent_host} <- URI.parse(parent.data["actor"]), + false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), + false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), + true <- thread_containment(item, user), + false <- CommonAPI.thread_muted?(user, parent) do + false + else + _ -> true + end + end - {:noreply, topics} + def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do + filtered_by_user?(user, activity, :notification) end - def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do + defp do_stream("direct", item) do recipient_topics = User.get_recipients_from_activity(item) |> Enum.map(fn %{id: id} -> "direct:#{id}" end) - Enum.each(recipient_topics || [], fn user_topic -> + Enum.each(recipient_topics, fn user_topic -> Logger.debug("Trying to push direct message to #{user_topic}\n\n") - push_to_socket(topics, user_topic, item) + push_to_socket(user_topic, item) end) + end + + defp do_stream("relationships:update", item) do + text = StreamerView.render("relationships_update.json", item) + + [item.follower, item.following] + |> Enum.map(fn %{id: id} -> "user:#{id}" end) + |> Enum.each(fn user_topic -> + Logger.debug("Trying to push relationships:update to #{user_topic}\n\n") - {:noreply, topics} + Registry.dispatch(@registry, user_topic, fn list -> + Enum.each(list, fn {pid, _auth} -> + send(pid, {:text, text}) + end) + end) + end) end - def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do + defp do_stream("participation", participation) do user_topic = "direct:#{participation.user_id}" Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") - push_to_socket(topics, user_topic, participation) - - {:noreply, topics} + push_to_socket(user_topic, participation) end - def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do + defp do_stream("list", item) do # filter the recipient list if the activity is not public, see #270. recipient_lists = case Visibility.is_public?(item) do @@ -103,36 +229,35 @@ defmodule Pleroma.Web.Streamer do recipient_lists |> Enum.map(fn %{id: id} -> "list:#{id}" end) - Enum.each(recipient_topics || [], fn list_topic -> + Enum.each(recipient_topics, fn list_topic -> Logger.debug("Trying to push message to #{list_topic}\n\n") - push_to_socket(topics, list_topic, item) + push_to_socket(list_topic, item) end) - - {:noreply, topics} end - def handle_cast( - %{action: :stream, topic: topic, item: %Notification{} = item}, - topics - ) - when topic in ["user", "user:notification"] do - topics - |> Map.get("#{topic}:#{item.user_id}", []) - |> Enum.each(fn socket -> - with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id), - true <- should_send?(user, item), - false <- CommonAPI.thread_muted?(user, item.activity) do - send( - socket.transport_pid, - {:text, represent_notification(socket.assigns[:user], item)} - ) - end + defp do_stream(topic, %Notification{} = item) + when topic in ["user", "user:notification"] do + Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list -> + Enum.each(list, fn {pid, _auth} -> + send(pid, {:render_with_user, StreamerView, "notification.json", item}) + end) end) + end + + defp do_stream(topic, {user, %MessageReference{} = cm_ref}) + when topic in ["user", "user:pleroma_chat"] do + topic = "#{topic}:#{user.id}" - {:noreply, topics} + text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) + + Registry.dispatch(@registry, topic, fn list -> + Enum.each(list, fn {pid, _auth} -> + send(pid, {:text, text}) + end) + end) end - def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do + defp do_stream("user", item) do Logger.debug("Trying to push to users") recipient_topics = @@ -140,175 +265,55 @@ defmodule Pleroma.Web.Streamer do |> Enum.map(fn %{id: id} -> "user:#{id}" end) Enum.each(recipient_topics, fn topic -> - push_to_socket(topics, topic, item) + push_to_socket(topic, item) end) - - {:noreply, topics} end - def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do + defp do_stream(topic, item) do Logger.debug("Trying to push to #{topic}") Logger.debug("Pushing item to #{topic}") - push_to_socket(topics, topic, item) - {:noreply, topics} - end - - def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do - topic = internal_topic(topic, socket) - sockets_for_topic = sockets[topic] || [] - sockets_for_topic = Enum.uniq([socket | sockets_for_topic]) - sockets = Map.put(sockets, topic, sockets_for_topic) - Logger.debug("Got new conn for #{topic}") - {:noreply, sockets} - end - - def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do - topic = internal_topic(topic, socket) - sockets_for_topic = sockets[topic] || [] - sockets_for_topic = List.delete(sockets_for_topic, socket) - sockets = Map.put(sockets, topic, sockets_for_topic) - Logger.debug("Removed conn for #{topic}") - {:noreply, sockets} - end - - def handle_cast(m, state) do - Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}") - {:noreply, state} - end - - defp represent_update(%Activity{} = activity, %User{} = user) do - %{ - event: "update", - payload: - Pleroma.Web.MastodonAPI.StatusView.render( - "status.json", - activity: activity, - for: user - ) - |> Jason.encode!() - } - |> Jason.encode!() - end - - defp represent_update(%Activity{} = activity) do - %{ - event: "update", - payload: - Pleroma.Web.MastodonAPI.StatusView.render( - "status.json", - activity: activity - ) - |> Jason.encode!() - } - |> Jason.encode!() - end - - def represent_conversation(%Participation{} = participation) do - %{ - event: "conversation", - payload: - Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{ - participation: participation, - user: participation.user - }) - |> Jason.encode!() - } - |> Jason.encode!() - end - - @spec represent_notification(User.t(), Notification.t()) :: binary() - defp represent_notification(%User{} = user, %Notification{} = notify) do - %{ - event: "notification", - payload: - NotificationView.render( - "show.json", - %{notification: notify, for: user} - ) - |> Jason.encode!() - } - |> Jason.encode!() - end - - defp should_send?(%User{} = user, %Activity{} = item) do - blocks = user.info.blocks || [] - mutes = user.info.mutes || [] - reblog_mutes = user.info.muted_reblogs || [] - - with parent when not is_nil(parent) <- Object.normalize(item), - true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)), - true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)), - true <- thread_containment(item, user) do - true - else - _ -> false - end - end - - defp should_send?(%User{} = user, %Notification{activity: activity}) do - should_send?(user, activity) + push_to_socket(topic, item) end - def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do - Enum.each(topics[topic] || [], fn socket -> - # Get the current user so we have up-to-date blocks etc. - if socket.assigns[:user] do - user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) + defp push_to_socket(topic, %Participation{} = participation) do + rendered = StreamerView.render("conversation.json", participation) - if should_send?(user, item) do - send(socket.transport_pid, {:text, represent_update(item, user)}) - end - else - send(socket.transport_pid, {:text, represent_update(item)}) - end + Registry.dispatch(@registry, topic, fn list -> + Enum.each(list, fn {pid, _} -> + send(pid, {:text, rendered}) + end) end) end - def push_to_socket(topics, topic, %Participation{} = participation) do - Enum.each(topics[topic] || [], fn socket -> - send(socket.transport_pid, {:text, represent_conversation(participation)}) - end) - end + defp push_to_socket(topic, %Activity{ + data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} + }) do + rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)}) - def push_to_socket(topics, topic, %Activity{ - data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} - }) do - Enum.each(topics[topic] || [], fn socket -> - send( - socket.transport_pid, - {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()} - ) + Registry.dispatch(@registry, topic, fn list -> + Enum.each(list, fn {pid, _} -> + send(pid, {:text, rendered}) + end) end) end - def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop + defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop - def push_to_socket(topics, topic, item) do - Enum.each(topics[topic] || [], fn socket -> - # Get the current user so we have up-to-date blocks etc. - if socket.assigns[:user] do - user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) - blocks = user.info.blocks || [] - mutes = user.info.mutes || [] + defp push_to_socket(topic, item) do + anon_render = StreamerView.render("update.json", item) - with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)), - true <- thread_containment(item, user) do - send(socket.transport_pid, {:text, represent_update(item, user)}) + Registry.dispatch(@registry, topic, fn list -> + Enum.each(list, fn {pid, auth?} -> + if auth? do + send(pid, {:render_with_user, StreamerView, "update.json", item}) + else + send(pid, {:text, anon_render}) end - else - send(socket.transport_pid, {:text, represent_update(item)}) - end + end) end) end - defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do - "#{topic}:#{socket.assigns[:user].id}" - end - - defp internal_topic(topic, _), do: topic - - @spec thread_containment(Activity.t(), User.t()) :: boolean() - defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true + defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true defp thread_containment(activity, user) do if Config.get([:instance, :skip_thread_containment]) do @@ -317,4 +322,26 @@ defmodule Pleroma.Web.Streamer do ActivityPub.contain_activity(activity, user) end end + + # In test environement, only return true if the registry is started. + # In benchmark environment, returns false. + # In any other environment, always returns true. + cond do + @mix_env == :test -> + def should_env_send? do + case Process.whereis(@registry) do + nil -> + false + + pid -> + Process.alive?(pid) + end + end + + @mix_env == :benchmark -> + def should_env_send?, do: false + + true -> + def should_env_send?, do: true + end end