# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer do
- use GenServer
require Logger
+
alias Pleroma.Activity
+ alias Pleroma.Chat.MessageReference
+ alias Pleroma.Config
+ alias Pleroma.Conversation.Participation
alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Visibility
- alias Pleroma.Web.MastodonAPI.NotificationView
+ alias Pleroma.Web.CommonAPI
+ alias Pleroma.Web.OAuth.Token
+ alias Pleroma.Web.Plugs.OAuthScopesPlug
+ alias Pleroma.Web.StreamerView
+
+ @mix_env Mix.env()
+ @registry Pleroma.Web.StreamerRegistry
+
+ def registry, do: @registry
+
+ @public_streams ["public", "public:local", "public:media", "public:local:media"]
+ @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
+
+ @doc "Expands and authorizes a stream, and registers the process for streaming."
+ @spec get_topic_and_add_socket(
+ stream :: String.t(),
+ User.t() | nil,
+ Token.t() | nil,
+ Map.t() | nil
+ ) ::
+ {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
+ def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
+ with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
+ add_socket(topic, user)
+ end
+ end
- @keepalive_interval :timer.seconds(30)
+ @doc "Expand and authorizes a stream"
+ @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) ::
+ {:ok, topic :: String.t()} | {:error, :bad_topic}
+ def get_topic(stream, user, oauth_token, params \\ %{})
- def start_link do
- GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
+ # Allow all public steams.
+ def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do
+ {:ok, stream}
end
- def add_socket(topic, socket) do
- GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
+ # Allow all hashtags streams.
+ def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params) do
+ {:ok, "hashtag:" <> tag}
end
- def remove_socket(topic, socket) do
- GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
+ # Allow remote instance streams.
+ def get_topic("public:remote", _user, _oauth_token, %{"instance" => instance} = _params) do
+ {:ok, "public:remote:" <> instance}
end
- def stream(topic, item) do
- GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
+ def get_topic("public:remote:media", _user, _oauth_token, %{"instance" => instance} = _params) do
+ {:ok, "public:remote:media:" <> instance}
end
- def init(args) do
- spawn(fn ->
- # 30 seconds
- Process.sleep(@keepalive_interval)
- GenServer.cast(__MODULE__, %{action: :ping})
- end)
+ # Expand user streams.
+ def get_topic(
+ stream,
+ %User{id: user_id} = user,
+ %Token{user_id: user_id} = oauth_token,
+ _params
+ )
+ when stream in @user_streams do
+ # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
+ required_scopes =
+ if stream == "user:notification" do
+ ["read:notifications"]
+ else
+ ["read:statuses"]
+ end
- {:ok, args}
+ if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
+ {:error, :unauthorized}
+ else
+ {:ok, stream <> ":" <> to_string(user.id)}
+ end
end
- def handle_cast(%{action: :ping}, topics) do
- Map.values(topics)
- |> List.flatten()
- |> Enum.each(fn socket ->
- Logger.debug("Sending keepalive ping")
- send(socket.transport_pid, {:text, ""})
- end)
+ def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
+ {:error, :unauthorized}
+ end
- spawn(fn ->
- # 30 seconds
- Process.sleep(@keepalive_interval)
- GenServer.cast(__MODULE__, %{action: :ping})
- end)
+ # List streams.
+ def get_topic(
+ "list",
+ %User{id: user_id} = user,
+ %Token{user_id: user_id} = oauth_token,
+ %{"list" => id}
+ ) do
+ cond do
+ OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
+ {:error, :unauthorized}
+
+ Pleroma.List.get(id, user) ->
+ {:ok, "list:" <> to_string(id)}
+
+ true ->
+ {:error, :bad_topic}
+ end
+ end
+
+ def get_topic("list", _user, _oauth_token, _params) do
+ {:error, :unauthorized}
+ end
+
+ def get_topic(_stream, _user, _oauth_token, _params) do
+ {:error, :bad_topic}
+ end
+
+ @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
+ def add_socket(topic, user) do
+ if should_env_send?() do
+ auth? = if user, do: true
+ Registry.register(@registry, topic, auth?)
+ end
+
+ {:ok, topic}
+ end
- {:noreply, topics}
+ def remove_socket(topic) do
+ if should_env_send?(), do: Registry.unregister(@registry, topic)
+ end
+
+ def stream(topics, items) do
+ if should_env_send?() do
+ for topic <- List.wrap(topics), item <- List.wrap(items) do
+ spawn(fn -> do_stream(topic, item) end)
+ end
+ end
end
- def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
+ def filtered_by_user?(user, item, streamed_type \\ :activity)
+
+ def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
+ %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
+ User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
+
+ recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
+ recipients = MapSet.new(item.recipients)
+ domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
+
+ with parent <- Object.normalize(item) || item,
+ true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
+ true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
+ true <-
+ !(streamed_type == :activity && item.data["type"] == "Announce" &&
+ parent.data["actor"] == user.ap_id),
+ true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
+ true <- MapSet.disjoint?(recipients, recipient_blocks),
+ %{host: item_host} <- URI.parse(item.actor),
+ %{host: parent_host} <- URI.parse(parent.data["actor"]),
+ false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
+ false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
+ true <- thread_containment(item, user),
+ false <- CommonAPI.thread_muted?(user, parent) do
+ false
+ else
+ _ -> true
+ end
+ end
+
+ def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do
+ filtered_by_user?(user, activity, :notification)
+ end
+
+ defp do_stream("direct", item) do
recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "direct:#{id}" end)
- Enum.each(recipient_topics || [], fn user_topic ->
+ Enum.each(recipient_topics, fn user_topic ->
Logger.debug("Trying to push direct message to #{user_topic}\n\n")
- push_to_socket(topics, user_topic, item)
+ push_to_socket(user_topic, item)
end)
+ end
+
+ defp do_stream("follow_relationship", item) do
+ text = StreamerView.render("follow_relationships_update.json", item)
+ user_topic = "user:#{item.follower.id}"
- {:noreply, topics}
+ Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")
+
+ Registry.dispatch(@registry, user_topic, fn list ->
+ Enum.each(list, fn {pid, _auth} ->
+ send(pid, {:text, text})
+ end)
+ end)
end
- def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
+ defp do_stream("participation", participation) do
+ user_topic = "direct:#{participation.user_id}"
+ Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
+
+ push_to_socket(user_topic, participation)
+ end
+
+ defp do_stream("list", item) do
# filter the recipient list if the activity is not public, see #270.
recipient_lists =
case Visibility.is_public?(item) do
_ ->
Pleroma.List.get_lists_from_activity(item)
|> Enum.filter(fn list ->
- owner = User.get_by_id(list.user_id)
+ owner = User.get_cached_by_id(list.user_id)
Visibility.visible_for_user?(item, owner)
end)
recipient_lists
|> Enum.map(fn %{id: id} -> "list:#{id}" end)
- Enum.each(recipient_topics || [], fn list_topic ->
+ Enum.each(recipient_topics, fn list_topic ->
Logger.debug("Trying to push message to #{list_topic}\n\n")
- push_to_socket(topics, list_topic, item)
+ push_to_socket(list_topic, item)
end)
+ end
- {:noreply, topics}
+ defp do_stream(topic, %Notification{} = item)
+ when topic in ["user", "user:notification"] do
+ Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
+ Enum.each(list, fn {pid, _auth} ->
+ send(pid, {:render_with_user, StreamerView, "notification.json", item})
+ end)
+ end)
end
- def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
- topic = "user:#{item.user_id}"
+ defp do_stream(topic, {user, %MessageReference{} = cm_ref})
+ when topic in ["user", "user:pleroma_chat"] do
+ topic = "#{topic}:#{user.id}"
- Enum.each(topics[topic] || [], fn socket ->
- json =
- %{
- event: "notification",
- payload:
- NotificationView.render("show.json", %{
- notification: item,
- for: socket.assigns["user"]
- })
- |> Jason.encode!()
- }
- |> Jason.encode!()
+ text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
- send(socket.transport_pid, {:text, json})
+ Registry.dispatch(@registry, topic, fn list ->
+ Enum.each(list, fn {pid, _auth} ->
+ send(pid, {:text, text})
+ end)
end)
-
- {:noreply, topics}
end
- def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
+ defp do_stream("user", item) do
Logger.debug("Trying to push to users")
recipient_topics =
|> Enum.map(fn %{id: id} -> "user:#{id}" end)
Enum.each(recipient_topics, fn topic ->
- push_to_socket(topics, topic, item)
+ push_to_socket(topic, item)
end)
-
- {:noreply, topics}
end
- def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
+ defp do_stream(topic, item) do
Logger.debug("Trying to push to #{topic}")
Logger.debug("Pushing item to #{topic}")
- push_to_socket(topics, topic, item)
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
- topic = internal_topic(topic, socket)
- sockets_for_topic = sockets[topic] || []
- sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
- sockets = Map.put(sockets, topic, sockets_for_topic)
- Logger.debug("Got new conn for #{topic}")
- {:noreply, sockets}
- end
-
- def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
- topic = internal_topic(topic, socket)
- sockets_for_topic = sockets[topic] || []
- sockets_for_topic = List.delete(sockets_for_topic, socket)
- sockets = Map.put(sockets, topic, sockets_for_topic)
- Logger.debug("Removed conn for #{topic}")
- {:noreply, sockets}
- end
-
- def handle_cast(m, state) do
- Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
- {:noreply, state}
- end
-
- defp represent_update(%Activity{} = activity, %User{} = user) do
- %{
- event: "update",
- payload:
- Pleroma.Web.MastodonAPI.StatusView.render(
- "status.json",
- activity: activity,
- for: user
- )
- |> Jason.encode!()
- }
- |> Jason.encode!()
- end
-
- defp represent_update(%Activity{} = activity) do
- %{
- event: "update",
- payload:
- Pleroma.Web.MastodonAPI.StatusView.render(
- "status.json",
- activity: activity
- )
- |> Jason.encode!()
- }
- |> Jason.encode!()
- end
-
- def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
- Enum.each(topics[topic] || [], fn socket ->
- # Get the current user so we have up-to-date blocks etc.
- if socket.assigns[:user] do
- user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
- blocks = user.info.blocks || []
- mutes = user.info.mutes || []
- reblog_mutes = user.info.muted_reblogs || []
-
- parent = Object.normalize(item)
-
- unless is_nil(parent) or item.actor in blocks or item.actor in mutes or
- item.actor in reblog_mutes or not ActivityPub.contain_activity(item, user) or
- parent.data["actor"] in blocks or parent.data["actor"] in mutes do
- send(socket.transport_pid, {:text, represent_update(item, user)})
- end
- else
- send(socket.transport_pid, {:text, represent_update(item)})
- end
+ push_to_socket(topic, item)
+ end
+
+ defp push_to_socket(topic, %Participation{} = participation) do
+ rendered = StreamerView.render("conversation.json", participation)
+
+ Registry.dispatch(@registry, topic, fn list ->
+ Enum.each(list, fn {pid, _} ->
+ send(pid, {:text, rendered})
+ end)
end)
end
- def push_to_socket(topics, topic, %Activity{
- data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
- }) do
- Enum.each(topics[topic] || [], fn socket ->
- send(
- socket.transport_pid,
- {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
- )
+ defp push_to_socket(topic, %Activity{
+ data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
+ }) do
+ rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
+
+ Registry.dispatch(@registry, topic, fn list ->
+ Enum.each(list, fn {pid, _} ->
+ send(pid, {:text, rendered})
+ end)
end)
end
- def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
+ defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
- def push_to_socket(topics, topic, item) do
- Enum.each(topics[topic] || [], fn socket ->
- # Get the current user so we have up-to-date blocks etc.
- if socket.assigns[:user] do
- user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
- blocks = user.info.blocks || []
- mutes = user.info.mutes || []
+ defp push_to_socket(topic, item) do
+ anon_render = StreamerView.render("update.json", item)
- unless item.actor in blocks or item.actor in mutes or
- not ActivityPub.contain_activity(item, user) do
- send(socket.transport_pid, {:text, represent_update(item, user)})
+ Registry.dispatch(@registry, topic, fn list ->
+ Enum.each(list, fn {pid, auth?} ->
+ if auth? do
+ send(pid, {:render_with_user, StreamerView, "update.json", item})
+ else
+ send(pid, {:text, anon_render})
end
- else
- send(socket.transport_pid, {:text, represent_update(item)})
- end
+ end)
end)
end
- defp internal_topic(topic, socket) when topic in ~w[user direct] do
- "#{topic}:#{socket.assigns[:user].id}"
+ defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
+
+ defp thread_containment(activity, user) do
+ if Config.get([:instance, :skip_thread_containment]) do
+ true
+ else
+ ActivityPub.contain_activity(activity, user)
+ end
end
- defp internal_topic(topic, _), do: topic
+ # In test environement, only return true if the registry is started.
+ # In benchmark environment, returns false.
+ # In any other environment, always returns true.
+ cond do
+ @mix_env == :test ->
+ def should_env_send? do
+ case Process.whereis(@registry) do
+ nil ->
+ false
+
+ pid ->
+ Process.alive?(pid)
+ end
+ end
+
+ @mix_env == :benchmark ->
+ def should_env_send?, do: false
+
+ true ->
+ def should_env_send?, do: true
+ end
end