d1d2c9b9c5f85bfa53bd48168a5796a8523c80b1
[akkoma] / lib / pleroma / web / streamer / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6 require Logger
7
8 alias Pleroma.Activity
9 alias Pleroma.Chat.MessageReference
10 alias Pleroma.Config
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Notification
13 alias Pleroma.Object
14 alias Pleroma.User
15 alias Pleroma.Web.ActivityPub.ActivityPub
16 alias Pleroma.Web.ActivityPub.Visibility
17 alias Pleroma.Web.CommonAPI
18 alias Pleroma.Web.StreamerView
19
20 @mix_env Mix.env()
21 @registry Pleroma.Web.StreamerRegistry
22
@doc "Returns the name of the registry used to track streaming processes."
@spec registry() :: module()
def registry do
  @registry
end
24
25 @public_streams ["public", "public:local", "public:media", "public:local:media"]
26 @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
27
@doc """
Expands and authorizes a stream, and registers the process for streaming.

Returns `{:ok, topic}` with the expanded topic on success. When the stream
cannot be expanded or the user is not allowed to subscribe to it, the error
tuple produced by `get_topic/3` is returned unchanged and nothing is
registered.
"""
@spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, map() | nil) ::
        {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic_and_add_socket(stream, user, params \\ %{}) do
  # Any non-`{:ok, topic}` result falls through `with` and is returned as-is.
  with {:ok, topic} <- get_topic(stream, user, params) do
    add_socket(topic, user)
  end
end
37
@doc """
Expands and authorizes a stream.

  * Public streams are returned as-is, with or without a user.
  * `"hashtag"` requires a binary `"tag"` param and expands to `"hashtag:<tag>"`.
  * User streams expand to `"<stream>:<user id>"` and require a user.
  * `"list"` requires a user and a `"list"` param for which
    `Pleroma.List.get/2` returns a value; it expands to `"list:<id>"`.

Returns `{:ok, topic}`, `{:error, :unauthorized}` when a user or list stream
is not accessible with the given arguments, or `{:error, :bad_topic}` for
anything else.
"""
@spec get_topic(stream :: String.t(), User.t() | nil, map() | nil) ::
        {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic(stream, user, params \\ %{})

# Allow all public streams.
def get_topic(stream, _, _) when stream in @public_streams do
  {:ok, stream}
end

# Allow all hashtag streams. The guard keeps a non-binary tag from raising in
# `<>`; such a request falls through to the catch-all clause instead.
def get_topic("hashtag", _, %{"tag" => tag}) when is_binary(tag) do
  {:ok, "hashtag:" <> tag}
end

# Expand user streams.
def get_topic(stream, %User{} = user, _) when stream in @user_streams do
  {:ok, stream <> ":" <> to_string(user.id)}
end

# User streams without a user are refused.
def get_topic(stream, _, _) when stream in @user_streams do
  {:error, :unauthorized}
end

# List streams.
def get_topic("list", %User{} = user, %{"list" => id}) do
  if Pleroma.List.get(id, user) do
    {:ok, "list:" <> to_string(id)}
  else
    {:error, :bad_topic}
  end
end

# List streams without a user (or without a "list" param) are refused.
def get_topic("list", _, _) do
  {:error, :unauthorized}
end

# Everything else is not a known stream.
def get_topic(_, _, _) do
  {:error, :bad_topic}
end
78
@doc """
Registers the process for streaming. Use `get_topic/3` to get the full authorized topic.

The registry value is `true` when a user is given and `nil` otherwise.
Registration is skipped when `should_env_send?/0` is false. Always returns
`{:ok, topic}`.
"""
def add_socket(topic, user) do
  if should_env_send?() do
    authenticated =
      if user do
        true
      else
        nil
      end

    Registry.register(@registry, topic, authenticated)
  end

  {:ok, topic}
end
88
@doc """
Unregisters the calling process from `topic`.

Returns the result of `Registry.unregister/2`, or `nil` when
`should_env_send?/0` is false.
"""
def remove_socket(topic) do
  if should_env_send?() do
    Registry.unregister(@registry, topic)
  end
end
92
@doc """
Streams every item to every topic.

Both `topics` and `items` may be a single value or a list. Each topic/item
pair is handled in its own spawned process. Nothing is spawned when
`should_env_send?/0` is false. Always returns `:ok`.
"""
def stream(topics, items) do
  if should_env_send?() do
    for topic <- List.wrap(topics), item <- List.wrap(items) do
      spawn(fn -> do_stream(topic, item) end)
    end
  end

  :ok
end
106
@doc """
Tells whether `item` must be hidden from `user`.

For an activity, returns `false` only when every check passes, in this order:

  * neither the activity actor nor the parent object's actor is blocked or muted;
  * an `Announce` does not come from a reblog-muted actor;
  * no recipient is blocked or muted;
  * neither actor's host matches the user's domain blocks;
  * thread containment allows the activity;
  * the thread is not muted.

Any failing check yields `true`. A notification is filtered according to its
activity.
"""
def filtered_by_user?(%User{} = user, %Activity{} = item) do
  %{block: blocked, mute: muted, reblog_mute: reblog_muted} =
    User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])

  hidden_actors = blocked ++ muted
  hidden_recipients = MapSet.new(hidden_actors)
  item_recipients = MapSet.new(item.recipients)
  blocked_domains = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)

  # Fall back to the activity itself when no object can be resolved.
  parent = Object.normalize(item) || item

  with false <- item.actor in hidden_actors,
       false <- item.data["type"] == "Announce" and item.actor in reblog_muted,
       false <- parent.data["actor"] in hidden_actors,
       true <- MapSet.disjoint?(item_recipients, hidden_recipients),
       %{host: item_host} <- URI.parse(item.actor),
       %{host: parent_host} <- URI.parse(parent.data["actor"]),
       false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(blocked_domains, item_host),
       false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(blocked_domains, parent_host),
       true <- thread_containment(item, user),
       false <- CommonAPI.thread_muted?(user, parent) do
    false
  else
    _ -> true
  end
end

def filtered_by_user?(%User{} = user, %Notification{activity: activity}),
  do: filtered_by_user?(user, activity)
136
# Pushes a direct message to the "direct:<id>" topic of every recipient of the activity.
defp do_stream("direct", item) do
  item
  |> User.get_recipients_from_activity()
  |> Enum.map(fn %{id: recipient_id} -> "direct:#{recipient_id}" end)
  |> Enum.each(fn direct_topic ->
    Logger.debug("Trying to push direct message to #{direct_topic}\n\n")
    push_to_socket(direct_topic, item)
  end)
end
147
# Pushes a conversation participation to the owning user's "direct:<id>" topic.
defp do_stream("participation", participation) do
  direct_topic = "direct:#{participation.user_id}"

  Logger.debug("Trying to push a conversation participation to #{direct_topic}\n\n")
  push_to_socket(direct_topic, participation)
end
154
# Pushes an activity to the "list:<id>" topic of every list returned for it.
defp do_stream("list", item) do
  public? = Visibility.is_public?(item) == true
  lists = Pleroma.List.get_lists_from_activity(item)

  # filter the recipient list if the activity is not public, see #270.
  visible_lists =
    if public? do
      lists
    else
      Enum.filter(lists, fn list ->
        list_owner = User.get_cached_by_id(list.user_id)
        Visibility.visible_for_user?(item, list_owner)
      end)
    end

  visible_lists
  |> Enum.map(fn %{id: list_id} -> "list:#{list_id}" end)
  |> Enum.each(fn list_topic ->
    Logger.debug("Trying to push message to #{list_topic}\n\n")
    push_to_socket(list_topic, item)
  end)
end
180
# Asks every process registered on the notified user's topic to render the
# notification itself (the `:render_with_user` message).
defp do_stream(topic, %Notification{} = item)
     when topic in ["user", "user:notification"] do
  user_topic = "#{topic}:#{item.user_id}"
  message = {:render_with_user, StreamerView, "notification.json", item}

  Registry.dispatch(@registry, user_topic, fn entries ->
    Enum.each(entries, fn {pid, _auth} -> send(pid, message) end)
  end)
end
189
# Renders a chat update once and sends the same text to every process
# registered on the user's topic.
defp do_stream(topic, {user, %MessageReference{} = cm_ref})
     when topic in ["user", "user:pleroma_chat"] do
  user_topic = "#{topic}:#{user.id}"
  payload = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
  message = {:text, payload}

  Registry.dispatch(@registry, user_topic, fn entries ->
    Enum.each(entries, fn {pid, _auth} -> send(pid, message) end)
  end)
end
202
# Pushes an activity to the "user:<id>" topic of every recipient of the activity.
defp do_stream("user", item) do
  Logger.debug("Trying to push to users")

  item
  |> User.get_recipients_from_activity()
  |> Enum.map(fn %{id: recipient_id} -> "user:#{recipient_id}" end)
  |> Enum.each(fn user_topic -> push_to_socket(user_topic, item) end)
end
214
# Fallback: the topic is used verbatim as the registry key.
defp do_stream(topic, item) do
  Enum.each(
    ["Trying to push to #{topic}", "Pushing item to #{topic}"],
    fn message -> Logger.debug(message) end
  )

  push_to_socket(topic, item)
end
220
# Conversation participations are rendered once and sent as text to every
# process registered on the topic.
defp push_to_socket(topic, %Participation{} = participation) do
  broadcast_text(topic, StreamerView.render("conversation.json", participation))
end

# Deletes that carry the id of the deleted activity become a "delete" event.
defp push_to_socket(topic, %Activity{
       data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
     }) do
  payload = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
  broadcast_text(topic, payload)
end

# Deletes without a deleted activity id are not streamed.
defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop

# Everything else: processes registered with a truthy value are asked to
# render the item themselves; the others receive a shared rendering that is
# produced once, before dispatching.
defp push_to_socket(topic, item) do
  anon_message = {:text, StreamerView.render("update.json", item)}
  auth_message = {:render_with_user, StreamerView, "update.json", item}

  Registry.dispatch(@registry, topic, fn entries ->
    Enum.each(entries, fn {pid, auth?} ->
      message = if auth?, do: auth_message, else: anon_message
      send(pid, message)
    end)
  end)
end

# Sends `{:text, text}` to every process registered on `topic`.
defp broadcast_text(topic, text) do
  Registry.dispatch(@registry, topic, fn entries ->
    Enum.each(entries, fn {pid, _} -> send(pid, {:text, text}) end)
  end)
end
258
# Users who opted out of thread containment always pass.
defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true

# Otherwise the instance-wide setting decides: any truthy value skips the
# check, a falsy one delegates to `ActivityPub.contain_activity/2`.
defp thread_containment(activity, user) do
  case Config.get([:instance, :skip_thread_containment]) do
    skip when skip in [nil, false] -> ActivityPub.contain_activity(activity, user)
    _skip -> true
  end
end
268
# `should_env_send?/0` is selected at compile time from the Mix environment:
#   * test: true only while the registry process is registered and alive;
#   * benchmark: always false;
#   * any other environment: always true.
if @mix_env == :test do
  def should_env_send? do
    if registry_pid = Process.whereis(@registry) do
      Process.alive?(registry_pid)
    else
      false
    end
  end
else
  if @mix_env == :benchmark do
    def should_env_send?, do: false
  else
    def should_env_send?, do: true
  end
end
290 end