Remerge of hashtag following (#341)
[akkoma] / lib / pleroma / web / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6 require Logger
7
8 alias Pleroma.Activity
9 alias Pleroma.Config
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.User
14 alias Pleroma.Web.ActivityPub.ActivityPub
15 alias Pleroma.Web.ActivityPub.Visibility
16 alias Pleroma.Web.CommonAPI
17 alias Pleroma.Web.OAuth.Token
18 alias Pleroma.Web.Plugs.OAuthScopesPlug
19 alias Pleroma.Web.StreamerView
20 require Pleroma.Constants
21
22 @mix_env Mix.env()
23 @registry Pleroma.Web.StreamerRegistry
24
@doc "Returns the name of the `Registry` that streaming processes are registered in."
def registry do
  @registry
end
26
27 @public_streams ["public", "public:local", "public:media", "public:local:media"]
28 @user_streams ["user", "user:notification", "direct"]
29
@doc """
Expands and authorizes a stream, and registers the process for streaming.

The stream is first resolved with `get_topic/4`. When that succeeds the topic is
handed to `add_socket/2`, whose `{:ok, topic}` result is returned. Any other
result of `get_topic/4` (for example `{:error, :bad_topic}` or
`{:error, :unauthorized}`) is returned unchanged and nothing is registered.
"""
# `map()` is the built-in map type; `Map.t()` is not a defined type.
# A `nil` stream is accepted and resolves to the `:multi` topic (see `get_topic/4`).
@spec get_topic_and_add_socket(
        stream :: String.t() | nil,
        User.t() | nil,
        Token.t() | nil,
        map() | nil
      ) ::
        {:ok, topic :: String.t() | :multi} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
  with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
    add_socket(topic, oauth_token)
  end
end
43
@doc """
Expands and authorizes a stream.

Returns `{:ok, topic}` when the stream is known and the caller may subscribe to
it, `{:error, :unauthorized}` when a user or list stream is requested without a
matching user/token or without the required scopes, and `{:error, :bad_topic}`
for anything else. A `nil` stream resolves to `{:ok, :multi}`.

Parameterised streams read their argument from `params`:

  * `"hashtag"` uses `params["tag"]`
  * `"public:remote"` and `"public:remote:media"` use `params["instance"]`
  * `"list"` uses `params["list"]`
"""
# `map()` is the built-in map type; `Map.t()` is not a defined type.
@spec get_topic(stream :: String.t() | nil, User.t() | nil, Token.t() | nil, map() | nil) ::
        {:ok, topic :: String.t() | :multi} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic(stream, user, oauth_token, params \\ %{})

# Allow all public streams.
def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do
  {:ok, stream}
end

# Allow all hashtags streams.
# The guard keeps a non-binary "tag" (e.g. a list built from `tag[]=...`) from
# raising in `<>`; such requests fall through to the `:bad_topic` clause.
def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params)
    when is_binary(tag) do
  {:ok, "hashtag:" <> tag}
end

# Allow remote instance streams.
# Same guard as above: only a binary "instance" can be appended to the topic.
def get_topic("public:remote", _user, _oauth_token, %{"instance" => instance} = _params)
    when is_binary(instance) do
  {:ok, "public:remote:" <> instance}
end

def get_topic("public:remote:media", _user, _oauth_token, %{"instance" => instance} = _params)
    when is_binary(instance) do
  {:ok, "public:remote:media:" <> instance}
end
67
# Expand user streams.
# Only matches when the token belongs to the given user (both patterns bind the
# same `user_id`). The topic is the stream name suffixed with that user's id.
def get_topic(
      stream,
      %User{id: user_id},
      %Token{user_id: user_id, scopes: granted_scopes},
      _params
    )
    when stream in @user_streams do
  # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
  needed_scopes =
    case stream do
      "user:notification" -> ["read:notifications"]
      _other -> ["read:statuses"]
    end

  case OAuthScopesPlug.filter_descendants(needed_scopes, granted_scopes) do
    [] -> {:error, :unauthorized}
    _matching -> {:ok, "#{stream}:#{user_id}"}
  end
end

# A user stream requested without a matching user/token pair is refused.
def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams,
  do: {:error, :unauthorized}
94
# List streams.
# Requires a token owned by the user with a list-reading scope; the list itself
# is then looked up with `Pleroma.List.get/2` for that user.
def get_topic(
      "list",
      %User{id: user_id} = user,
      %Token{user_id: user_id} = oauth_token,
      %{"list" => id}
    ) do
  granted = OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes)

  if granted == [] do
    {:error, :unauthorized}
  else
    case Pleroma.List.get(id, user) do
      missing when missing in [nil, false] -> {:error, :bad_topic}
      _list -> {:ok, "list:" <> to_string(id)}
    end
  end
end

# Any other "list" request (no user, foreign token, missing "list" param) is refused.
def get_topic("list", _user, _oauth_token, _params), do: {:error, :unauthorized}
117
# mastodon multi-topic WS
# No stream name at all resolves to the special `:multi` topic.
def get_topic(nil, _user, _oauth_token, _params), do: {:ok, :multi}

# Anything not handled by an earlier clause is an unknown topic.
def get_topic(_stream, _user, _oauth_token, _params), do: {:error, :bad_topic}
126
@doc """
Registers the process for streaming under `topic`.

The registry value is the OAuth token id, or `false` when no token is given.
Registration is skipped when `should_env_send?/0` is false. Always returns
`{:ok, topic}`. Use `get_topic/4` to obtain the full authorized topic first.
"""
def add_socket(topic, oauth_token) do
  if should_env_send?() do
    token_id =
      case oauth_token do
        absent when absent in [nil, false] -> false
        token -> token.id
      end

    Registry.register(@registry, topic, token_id)
  end

  {:ok, topic}
end
136
@doc """
Unregisters the process from `topic`.

Returns `nil` without touching the registry when `should_env_send?/0` is false.
"""
def remove_socket(topic) do
  case should_env_send?() do
    true -> Registry.unregister(@registry, topic)
    _ -> nil
  end
end
140
@doc """
Streams every item to every topic.

`topics` and `items` may each be a single value or a list. One process is
spawned per topic/item pair, and the list of spawned pids is returned. Returns
`nil` and spawns nothing when `should_env_send?/0` is false.
"""
def stream(topics, items) do
  if should_env_send?() do
    item_list = List.wrap(items)

    topics
    |> List.wrap()
    |> Enum.flat_map(fn topic ->
      Enum.map(item_list, fn item ->
        spawn(fn -> do_stream(topic, item) end)
      end)
    end)
  end
end
148
@doc """
Returns `true` when `item` must be hidden from `user`, `false` when it may be shown.

For an activity the checks are, in order: the actor is not blocked or muted;
an "Announce" does not come from a reblog-muted actor; an "Announce" of the
user's own object is hidden when streamed as a plain activity; the actor of
the normalized object is not blocked or muted; no recipient is blocked or
muted; neither actor's host matches the user's domain blocks; thread
containment passes; the thread is not muted. The first failing check hides the
item.

A notification is checked through its activity with `streamed_type`
`:notification`.
"""
def filtered_by_user?(user, item, streamed_type \\ :activity)

def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
  %{block: blocked, mute: muted, reblog_mute: reblog_muted} =
    User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])

  hidden_recipients = MapSet.new(blocked ++ muted)
  item_recipients = MapSet.new(item.recipients)
  blocked_domains = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)

  # Falls back to the activity itself when no object can be normalized.
  parent = Object.normalize(item, fetch: false) || item

  shown? =
    with true <- item.actor not in blocked and item.actor not in muted,
         true <- item.data["type"] != "Announce" or item.actor not in reblog_muted,
         parent_actor = parent.data["actor"],
         # De Morgan form of "not (plain activity and Announce and own object)".
         true <-
           streamed_type != :activity or item.data["type"] != "Announce" or
             parent_actor != user.ap_id,
         true <- parent_actor not in blocked and parent_actor not in muted,
         true <- MapSet.disjoint?(item_recipients, hidden_recipients),
         %{host: item_host} <- URI.parse(item.actor),
         %{host: parent_host} <- URI.parse(parent_actor),
         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(blocked_domains, item_host),
         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(blocked_domains, parent_host),
         true <- thread_containment(item, user),
         false <- CommonAPI.thread_muted?(user, parent) do
      true
    else
      _ -> false
    end

  not shown?
end

def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _streamed_type) do
  filtered_by_user?(user, activity, :notification)
end
182
# Pushes a direct message to the "direct:<id>" topic of every recipient of the activity.
defp do_stream("direct", item) do
  item
  |> User.get_recipients_from_activity()
  |> Enum.each(fn %{id: id} ->
    user_topic = "direct:#{id}"
    Logger.debug("Trying to push direct message to #{user_topic}\n\n")
    push_to_socket(user_topic, item)
  end)
end
193
# Sends a rendered follow-relationship update to every socket on the follower's
# "user:<id>" topic.
defp do_stream("follow_relationship", item) do
  user_topic = "user:" <> to_string(item.follower.id)
  payload = StreamerView.render("follow_relationships_update.json", item, user_topic)

  Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")

  Registry.dispatch(@registry, user_topic, fn subscribers ->
    Enum.each(subscribers, fn {pid, _auth} -> send(pid, {:text, payload}) end)
  end)
end
206
# Pushes a conversation participation to its owner's "direct:<user_id>" topic.
defp do_stream("participation", participation) do
  user_topic = "direct:" <> to_string(participation.user_id)

  Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
  push_to_socket(user_topic, participation)
end
213
# Pushes an activity to the "list:<id>" topic of every list it belongs to.
defp do_stream("list", item) do
  public? = Visibility.is_public?(item) == true
  candidate_lists = Pleroma.List.get_lists_from_activity(item)

  # filter the recipient list if the activity is not public, see #270.
  target_lists =
    if public? do
      candidate_lists
    else
      Enum.filter(candidate_lists, fn list ->
        list_owner = User.get_cached_by_id(list.user_id)
        Visibility.visible_for_user?(item, list_owner)
      end)
    end

  target_lists
  |> Enum.map(fn %{id: id} -> "list:#{id}" end)
  |> Enum.each(fn list_topic ->
    Logger.debug("Trying to push message to #{list_topic}\n\n")
    push_to_socket(list_topic, item)
  end)
end
239
# Notifications go only to the notified user's own topic. Each socket receives a
# `:render_with_user` message carrying the view, template, item and topic.
defp do_stream(topic, %Notification{} = item)
     when topic in ["user", "user:notification"] do
  user_topic = "#{topic}:#{item.user_id}"
  message = {:render_with_user, StreamerView, "notification.json", item, topic}

  Registry.dispatch(@registry, user_topic, fn subscribers ->
    Enum.each(subscribers, fn {pid, _auth} -> send(pid, message) end)
  end)
end
248
# Pushes an activity to the "user:<id>" topic of each recipient. When the
# activity is addressed to the public collection, the ids returned by
# `Pleroma.Hashtag.get_recipients_for_activity/1` are added as well. Each topic
# is pushed to once.
defp do_stream("user", item) do
  Logger.debug("Trying to push to users")

  recipient_topics =
    item
    |> User.get_recipients_from_activity()
    |> Enum.map(fn %{id: id} -> "user:" <> to_string(id) end)

  hashtag_topics =
    case Enum.member?(item.recipients, Pleroma.Constants.as_public()) do
      true ->
        item
        |> Pleroma.Hashtag.get_recipients_for_activity()
        |> Enum.map(fn id -> "user:" <> to_string(id) end)

      false ->
        []
    end

  (recipient_topics ++ hashtag_topics)
  |> Enum.uniq()
  |> Enum.each(fn topic -> push_to_socket(topic, item) end)
end

# Any other topic: the item is pushed to the topic as given.
defp do_stream(topic, item) do
  Logger.debug("Trying to push to #{topic}")
  Logger.debug("Pushing item to #{topic}")

  push_to_socket(topic, item)
end
276
# Conversation participations are rendered once and sent as text to every
# socket registered on the topic.
defp push_to_socket(topic, %Participation{} = participation) do
  payload = StreamerView.render("conversation.json", participation, topic)

  Registry.dispatch(@registry, topic, fn subscribers ->
    Enum.each(subscribers, fn {pid, _} -> send(pid, {:text, payload}) end)
  end)
end

# A Delete carrying "deleted_activity_id" becomes a "delete" event whose payload
# is that id as a string.
defp push_to_socket(topic, %Activity{
       data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
     }) do
  payload = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})

  Registry.dispatch(@registry, topic, fn subscribers ->
    Enum.each(subscribers, fn {pid, _} -> send(pid, {:text, payload}) end)
  end)
end

# A Delete without "deleted_activity_id" is not streamed.
defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}) do
  :noop
end
300
# Pushes an "Update" activity as a "status_update.json" event.
#
# The Create activity of the updated object is looked up by the object's id and
# given the updated object, so the status is rendered with its new content.
# Sockets registered with a truthy value (an OAuth token id, see `add_socket/2`)
# get a `:render_with_user` message; the others get the pre-rendered anonymous
# text.
#
# When the lookup returns `nil` there is nothing to render and `:noop` is
# returned, as for Delete activities without a "deleted_activity_id".
# Previously the `nil` was passed to `Map.put/3`, which raised `BadMapError`.
defp push_to_socket(topic, %Activity{data: %{"type" => "Update"}} = item) do
  case Activity.get_create_by_object_ap_id(item.object.data["id"]) do
    nil ->
      :noop

    found_activity ->
      create_activity = Map.put(found_activity, :object, item.object)
      anon_render = StreamerView.render("status_update.json", create_activity, topic)

      Registry.dispatch(@registry, topic, fn list ->
        Enum.each(list, fn {pid, auth?} ->
          if auth? do
            send(
              pid,
              {:render_with_user, StreamerView, "status_update.json", create_activity, topic}
            )
          else
            send(pid, {:text, anon_render})
          end
        end)
      end)
  end
end
321
# Default push: the item is rendered once as "update.json" for sockets
# registered with a falsy value. Sockets registered with a truthy value (an
# OAuth token id, see `add_socket/2`) get a `:render_with_user` message instead.
defp push_to_socket(topic, item) do
  anonymous_message = {:text, StreamerView.render("update.json", item, topic)}
  authenticated_message = {:render_with_user, StreamerView, "update.json", item, topic}

  Registry.dispatch(@registry, topic, fn subscribers ->
    Enum.each(subscribers, fn {pid, token_id} ->
      message = if token_id, do: authenticated_message, else: anonymous_message
      send(pid, message)
    end)
  end)
end
335
# Users with `skip_thread_containment: true` always pass.
defp thread_containment(_activity, %User{skip_thread_containment: true}) do
  true
end

# Otherwise the instance-wide `[:instance, :skip_thread_containment]` setting
# decides: truthy passes, anything else defers to `ActivityPub.contain_activity/2`.
defp thread_containment(activity, user) do
  case Config.get([:instance, :skip_thread_containment]) do
    off when off in [nil, false] -> ActivityPub.contain_activity(activity, user)
    _on -> true
  end
end
345
@doc """
Sends `:close` to every process registered with the id of `oauth_token`.

Returns `nil` without querying the registry when `should_env_send?/0` is false.
"""
def close_streams_by_oauth_token(oauth_token) do
  if should_env_send?() do
    # Entries are {key, pid, value}: select the pid where the value is the token id.
    match_spec = [
      {{:"$1", :"$2", :"$3"}, [{:==, :"$3", oauth_token.id}], [:"$2"]}
    ]

    @registry
    |> Registry.select(match_spec)
    |> Enum.each(&send(&1, :close))
  end
end
361
# `should_env_send?/0` is picked at compile time from `@mix_env`:
# - benchmark environment: always false.
# - test environment: true only while the registry process is registered and alive.
# - any other environment: always true.
cond do
  @mix_env == :benchmark ->
    def should_env_send? do
      false
    end

  @mix_env == :test ->
    def should_env_send? do
      registry_pid = Process.whereis(@registry)
      not is_nil(registry_pid) and Process.alive?(registry_pid)
    end

  true ->
    def should_env_send? do
      true
    end
end
383 end