1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.Streamer do
9 alias Pleroma.Chat.MessageReference
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Notification
14 alias Pleroma.Plugs.OAuthScopesPlug
16 alias Pleroma.Web.ActivityPub.ActivityPub
17 alias Pleroma.Web.ActivityPub.Visibility
18 alias Pleroma.Web.CommonAPI
19 alias Pleroma.Web.OAuth.Token
20 alias Pleroma.Web.StreamerView
# Name of the Registry used by this module: processes are registered, unregistered and
# dispatched to per topic through it (see the Registry.* calls further down).
23 @registry Pleroma.Web.StreamerRegistry
# Public accessor returning the registry name.
25 def registry, do: @registry
# Stream names that get_topic accepts without looking at the user or the token.
27 @public_streams ["public", "public:local", "public:media", "public:local:media"]
# Stream names that get_topic only authorizes for the user who owns the supplied OAuth token.
28 @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
# Resolves `stream` to an authorized topic with get_topic/4 and, when that yields
# {:ok, topic}, registers the process for it through add_socket/2.
# NOTE(review): the remaining `case` branch and the argument part of the @spec are not
# visible in this listing; the visible return type names :bad_topic and :unauthorized errors.
30 @doc "Expands and authorizes a stream, and registers the process for streaming."
31 @spec get_topic_and_add_socket(
37 {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
38 def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
39 case get_topic(stream, user, oauth_token, params) do
40 {:ok, topic} -> add_socket(topic, user)
# Bodiless head that declares the `params \\ %{}` default shared by every get_topic clause.
# The spec lists {:error, :unauthorized} because the user-stream and list clauses of this
# function return it, and uses the built-in `map()` type (the Map module defines no `t()`).
45 @doc "Expands and authorizes a stream."
46 @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, map()) ::
47 {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
48 def get_topic(stream, user, oauth_token, params \\ %{})
50 # Allow all public streams.
# Neither the user nor the token is inspected; membership in @public_streams is the only check.
# NOTE(review): the body of this clause is not visible in this listing.
51 def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do
55 # Allow all hashtag streams.
# Requires a "tag" key in params; the topic is "hashtag:" followed by that tag.
56 def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params) do
57 {:ok, "hashtag:" <> tag}
60 # Expand user streams.
# This clause only matches when `stream` is one of @user_streams and the user's id equals
# the token's user_id (the clause head itself is not visible in this listing).
63 %User{id: user_id} = user,
64 %Token{user_id: token_user_id} = oauth_token,
67 when stream in @user_streams and user_id == token_user_id do
68 # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
# "user:notification" requires "read:notifications".
# NOTE(review): the scopes required for the other user streams are on lines not shown here.
70 if stream == "user:notification" do
71 ["read:notifications"]
# An empty result from filter_descendants/2 means the token carries none of the required
# scopes, which is reported as :unauthorized.
76 if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
77 {:error, :unauthorized}
# Success value: the topic is "<stream>:<user id>".
79 {:ok, stream <> ":" <> to_string(user.id)}
# Any other request for a user stream (arguments that did not match the clause above)
# is rejected.
83 def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
84 {:error, :unauthorized}
# NOTE(review): the head of this clause is not visible in this listing; judging by the
# "list:" topic built below and the "list" fallback clause, it presumably handles the
# "list" stream — confirm. As for user streams, the user's id must equal the token's user_id.
90 %User{id: user_id} = user,
91 %Token{user_id: token_user_id} = oauth_token,
94 when user_id == token_user_id do
# Rejected when filter_descendants/2 finds neither "read" nor "read:lists" in the token scopes.
96 OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
97 {:error, :unauthorized}
# The topic is granted when Pleroma.List.get(id, user) returns something for this user.
99 Pleroma.List.get(id, user) ->
100 {:ok, "list:" <> to_string(id)}
# Any other "list" request is unauthorized.
107 def get_topic("list", _user, _oauth_token, _params) do
108 {:error, :unauthorized}
# Catch-all for every stream name not handled above.
# NOTE(review): the body is not visible here; the @spec suggests {:error, :bad_topic} — confirm.
111 def get_topic(_stream, _user, _oauth_token, _params) do
# Registers the current process in @registry under `topic`, but only when
# should_env_send?/0 is true. The value stored with the registration is `true` when a
# user is given and `nil` otherwise (`if` without `else`); push_to_socket/2 later reads
# that value back as `auth?`.
# NOTE(review): the function's final return expression is not visible in this listing.
115 @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
116 def add_socket(topic, user) do
117 if should_env_send?() do
118 auth? = if user, do: true
119 Registry.register(@registry, topic, auth?)
# Unregisters the current process from `topic` in @registry. When should_env_send?/0 is
# false nothing is unregistered and the `if` evaluates to nil.
125 def remove_socket(topic) do
126 if should_env_send?(), do: Registry.unregister(@registry, topic)
# Fans items out to topics: when should_env_send?/0 is true, each (topic, item) pair is
# handed to do_stream/2 in its own process started with spawn/1 (not linked to the caller).
# NOTE(review): the expressions piped into the two Enum.each calls are not visible in this
# listing — presumably `topics` and `items`, possibly wrapped into lists; confirm.
129 def stream(topics, items) do
130 if should_env_send?() do
132 |> Enum.each(fn topic ->
134 |> Enum.each(fn item ->
135 spawn(fn -> do_stream(topic, item) end)
# Decides whether `item` should be withheld from `user`'s stream.
# NOTE(review): the `do`/`else` results of the `with` and two matcher lines (before the
# Enum.all? check and before the negated Announce check) are not visible in this listing.
# The comments below assume those matchers are `true <-` like their neighbours, and that a
# fully passing chain means "not filtered" — confirm against the full file.
143 def filtered_by_user?(user, item, streamed_type \\ :activity)
145 def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
# AP ids of accounts the user blocks, mutes, or has reblog-muted.
146 %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
147 User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
# Blocks and mutes are treated alike when checking the activity's recipients.
149 recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
150 recipients = MapSet.new(item.recipients)
151 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
# `parent` is the normalized object, falling back to the activity itself.
153 with parent <- Object.normalize(item) || item,
# The activity's actor must be neither blocked nor muted.
155 Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
# An Announce only passes when its actor is not reblog-muted.
156 true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
# When streamed as :activity, an Announce whose parent actor is the user does not pass.
158 !(streamed_type == :activity && item.data["type"] == "Announce" &&
159 parent.data["actor"] == user.ap_id),
# The parent's actor and every recipient must be neither blocked nor muted.
160 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
161 true <- MapSet.disjoint?(recipients, recipient_blocks),
# Neither the activity actor's host nor the parent actor's host may match a domain block.
162 %{host: item_host} <- URI.parse(item.actor),
163 %{host: parent_host} <- URI.parse(parent.data["actor"]),
164 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
165 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
# Thread containment and thread mutes are checked last.
166 true <- thread_containment(item, user),
167 false <- CommonAPI.thread_muted?(user, parent) do
# Notifications are checked through their activity, with streamed_type :notification.
174 def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do
175 filtered_by_user?(user, activity, :notification)
# "direct": builds one "direct:<user id>" topic per recipient returned by
# User.get_recipients_from_activity/1 and pushes the item to each of them.
178 defp do_stream("direct", item) do
180 User.get_recipients_from_activity(item)
181 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
183 Enum.each(recipient_topics, fn user_topic ->
184 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
185 push_to_socket(user_topic, item)
# "participation": a conversation participation goes to the "direct:<user id>" topic of
# the participation's own user.
189 defp do_stream("participation", participation) do
190 user_topic = "direct:#{participation.user_id}"
191 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
193 push_to_socket(user_topic, participation)
# "list": pushes the item to the "list:<id>" topic of every list returned by
# Pleroma.List.get_lists_from_activity/1.
196 defp do_stream("list", item) do
197 # filter the recipient list if the activity is not public, see #270.
# NOTE(review): the `case` branch patterns are not visible in this listing; per the comment
# above, the unfiltered branch is the public one and the filtered branch the non-public one.
199 case Visibility.is_public?(item) do
201 Pleroma.List.get_lists_from_activity(item)
# Keep only lists whose owner is allowed to see the item.
204 Pleroma.List.get_lists_from_activity(item)
205 |> Enum.filter(fn list ->
206 owner = User.get_cached_by_id(list.user_id)
208 Visibility.visible_for_user?(item, owner)
214 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
216 Enum.each(recipient_topics, fn list_topic ->
217 Logger.debug("Trying to push message to #{list_topic}\n\n")
218 push_to_socket(list_topic, item)
# Notifications on "user" / "user:notification": each process registered under
# "<topic>:<notification user id>" receives a :render_with_user message carrying the view,
# the template name and the notification, rather than a pre-rendered text.
222 defp do_stream(topic, %Notification{} = item)
223 when topic in ["user", "user:notification"] do
224 Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
225 Enum.each(list, fn {pid, _auth} ->
226 send(pid, {:render_with_user, StreamerView, "notification.json", item})
# Chat updates on "user" / "user:pleroma_chat": the payload is rendered once and the same
# text is sent to every process registered under "<topic>:<user id>".
231 defp do_stream(topic, {user, %MessageReference{} = cm_ref})
232 when topic in ["user", "user:pleroma_chat"] do
233 topic = "#{topic}:#{user.id}"
235 text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
237 Registry.dispatch(@registry, topic, fn list ->
238 Enum.each(list, fn {pid, _auth} ->
239 send(pid, {:text, text})
# Any other item on "user": pushed to the "user:<id>" topic of each recipient returned by
# User.get_recipients_from_activity/1.
244 defp do_stream("user", item) do
245 Logger.debug("Trying to push to users")
248 User.get_recipients_from_activity(item)
249 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
251 Enum.each(recipient_topics, fn topic ->
252 push_to_socket(topic, item)
# Fallback: the topic is used as given.
256 defp do_stream(topic, item) do
257 Logger.debug("Trying to push to #{topic}")
258 Logger.debug("Pushing item to #{topic}")
259 push_to_socket(topic, item)
# Conversation participations: rendered once with "conversation.json" and sent as
# {:text, rendered} to every process registered under `topic`.
262 defp push_to_socket(topic, %Participation{} = participation) do
263 rendered = StreamerView.render("conversation.json", participation)
265 Registry.dispatch(@registry, topic, fn list ->
266 Enum.each(list, fn {pid, _} ->
267 send(pid, {:text, rendered})
# Delete activities that carry "deleted_activity_id": a JSON "delete" event whose payload
# is the stringified id is sent to every registered process.
272 defp push_to_socket(topic, %Activity{
273 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
275 rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
277 Registry.dispatch(@registry, topic, fn list ->
278 Enum.each(list, fn {pid, _} ->
279 send(pid, {:text, rendered})
# Delete activities without "deleted_activity_id" are not streamed.
284 defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
# Everything else: an anonymous "update.json" rendering is prepared once; each registered
# process gets either a :render_with_user message or that pre-rendered text.
# NOTE(review): the condition choosing between the two sends is not visible in this
# listing — presumably the `auth?` value stored at registration; confirm.
286 defp push_to_socket(topic, item) do
287 anon_render = StreamerView.render("update.json", item)
289 Registry.dispatch(@registry, topic, fn list ->
290 Enum.each(list, fn {pid, auth?} ->
292 send(pid, {:render_with_user, StreamerView, "update.json", item})
294 send(pid, {:text, anon_render})
# Users with skip_thread_containment set always pass the containment check.
300 defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
# Otherwise the instance-wide :skip_thread_containment setting is consulted, and
# ActivityPub.contain_activity/2 makes the decision.
# NOTE(review): the value returned when the instance setting is enabled is not visible in
# this listing — presumably `true`, mirroring the clause above; confirm.
302 defp thread_containment(activity, user) do
303 if Config.get([:instance, :skip_thread_containment]) do
306 ActivityPub.contain_activity(activity, user)
310 # In test environment, only return true if the registry is started.
311 # In benchmark environment, returns false.
312 # In any other environment, always returns true.
# NOTE(review): the surrounding conditional is only partly visible in this listing; the
# definitions appear to be selected at compile time from @mix_env, so that a single
# should_env_send?/0 ends up in the compiled module — confirm against the full file.
# Process.whereis/1 returns nil when no process is registered under the @registry name.
315 def should_env_send? do
316 case Process.whereis(@registry) do
325 @mix_env == :benchmark ->
326 def should_env_send?, do: false
329 def should_env_send?, do: true