Streamer: Add a chat message stream.
[akkoma] / lib / pleroma / web / streamer / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6 require Logger
7
8 alias Pleroma.Activity
9 alias Pleroma.Config
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.Repo
14 alias Pleroma.User
15 alias Pleroma.Web.ActivityPub.ActivityPub
16 alias Pleroma.Web.ActivityPub.Visibility
17 alias Pleroma.Web.CommonAPI
18 alias Pleroma.Web.StreamerView
19
# Build environment captured at compile time; it selects which
# `should_env_send?/0` implementation gets compiled into this module.
@mix_env Mix.env()

# Name of the registry in which streaming processes are registered per topic.
@registry Pleroma.Web.StreamerRegistry

@doc "Returns the name of the registry used for streaming processes."
def registry do
  @registry
end

# Streams that `get_topic/3` accepts without requiring a user.
@public_streams ~w(public public:local public:media public:local:media)
# Streams that `get_topic/3` expands with the id of the requesting user.
@user_streams ~w(user user:notification direct user:pleroma_chat)
27
@doc """
Expands and authorizes a stream, and registers the process for streaming.

Delegates to `get_topic/3`; on success the calling process is registered for
the resulting topic via `add_socket/2`. Any error returned by `get_topic/3`
is passed through unchanged.
"""
@spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, map() | nil) ::
        {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic_and_add_socket(stream, user, params \\ %{}) do
  # A non-matching result (an error tuple) falls through `with` untouched.
  with {:ok, topic} <- get_topic(stream, user, params) do
    add_socket(topic, user)
  end
end
37
@doc """
Expands and authorizes a stream.

Returns:

  * `{:ok, topic}` - the stream is known and the caller may subscribe to it.
    Public streams are returned as-is, `"hashtag"` becomes `"hashtag:<tag>"`,
    user streams are suffixed with the user's id and `"list"` becomes
    `"list:<id>"`.
  * `{:error, :unauthorized}` - a user or list stream was requested without
    a user.
  * `{:error, :bad_topic}` - the stream is unknown, the required parameter is
    missing or malformed, or the list cannot be fetched for this user.
"""
@spec get_topic(stream :: String.t(), User.t() | nil, map() | nil) ::
        {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic(stream, user, params \\ %{})

# Allow all public streams.
def get_topic(stream, _, _) when stream in @public_streams do
  {:ok, stream}
end

# Allow all hashtag streams. The guard keeps a non-binary "tag" parameter
# from raising in `<>`; such requests end up in the `:bad_topic` fallback.
def get_topic("hashtag", _, %{"tag" => tag}) when is_binary(tag) do
  {:ok, "hashtag:" <> tag}
end

# Expand user streams.
def get_topic(stream, %User{} = user, _) when stream in @user_streams do
  {:ok, stream <> ":" <> to_string(user.id)}
end

def get_topic(stream, _, _) when stream in @user_streams do
  {:error, :unauthorized}
end

# List streams.
def get_topic("list", %User{} = user, %{"list" => id}) do
  if Pleroma.List.get(id, user) do
    {:ok, "list:" <> to_string(id)}
  else
    {:error, :bad_topic}
  end
end

def get_topic("list", _, _) do
  {:error, :unauthorized}
end

# Anything else is not a stream we know about.
def get_topic(_, _, _) do
  {:error, :bad_topic}
end
78
@doc """
Registers the process for streaming. Use `get_topic/3` to get the full authorized topic.

The registration is skipped when `should_env_send?/0` is false; `{:ok, topic}`
is returned either way.
"""
def add_socket(topic, user) do
  if should_env_send?() do
    # The value stored next to the topic is `true` when a user is present and
    # `nil` otherwise.
    authenticated? =
      case user do
        absent when absent in [nil, false] -> nil
        _present -> true
      end

    Registry.register(@registry, topic, authenticated?)
  end

  {:ok, topic}
end
88
@doc """
Unregisters the calling process from `topic`.

Does nothing (and returns `nil`) when `should_env_send?/0` is false.
"""
def remove_socket(topic) do
  if should_env_send?() do
    Registry.unregister(@registry, topic)
  end
end
92
@doc """
Streams items to topics.

Accepts either a list of topics with a single item, a single topic with a
list of items, or a single topic with a single item. Each topic/item pair is
handled in its own process started with `spawn/1`, so this function does not
wait for delivery.

Nothing is spawned when `should_env_send?/0` is false. Always returns `:ok`.
"""
def stream(topics, item) when is_list(topics) do
  if should_env_send?() do
    Enum.each(topics, fn topic ->
      spawn(fn -> do_stream(topic, item) end)
    end)
  end

  :ok
end

def stream(topic, items) when is_list(items) do
  if should_env_send?() do
    Enum.each(items, fn item ->
      spawn(fn -> do_stream(topic, item) end)
    end)
  end

  # Returned outside of the `if` so the result is `:ok` even when sending is
  # disabled, like in the other clauses.
  :ok
end

def stream(topic, item) do
  if should_env_send?() do
    spawn(fn -> do_stream(topic, item) end)
  end

  :ok
end
120
@doc """
Tells whether `item` must be withheld from `user`.

Returns `false` only when every check passes: neither the item's actor nor
the actor of the normalized object is blocked or muted, an `Announce` does not
come from a reblog-muted actor, no recipient is blocked or muted, neither
actor's host matches the user's domain blocks, thread containment allows the
item and the thread is not muted. Any failing check yields `true`.

A `Notification` is judged by its activity.
"""
def filtered_by_user?(%User{} = user, %Activity{} = item) do
  alias Pleroma.Web.ActivityPub.MRF

  %{block: blocked, mute: muted, reblog_mute: reblog_muted} =
    User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])

  blocked_or_muted = MapSet.new(blocked ++ muted)
  item_recipients = MapSet.new(item.recipients)
  blocked_domains = MRF.subdomains_regex(user.domain_blocks)

  # The order of the checks is kept as is: the later ones only run when all
  # earlier ones have passed.
  with parent <- Object.normalize(item) || item,
       actor = item.actor,
       true <- actor not in blocked and actor not in muted,
       false <- item.data["type"] == "Announce" and actor in reblog_muted,
       parent_actor = parent.data["actor"],
       true <- parent_actor not in blocked and parent_actor not in muted,
       true <- MapSet.disjoint?(item_recipients, blocked_or_muted),
       %{host: actor_host} <- URI.parse(actor),
       %{host: parent_actor_host} <- URI.parse(parent_actor),
       false <- MRF.subdomain_match?(blocked_domains, actor_host),
       false <- MRF.subdomain_match?(blocked_domains, parent_actor_host),
       true <- thread_containment(item, user),
       false <- CommonAPI.thread_muted?(user, item) do
    false
  else
    _ -> true
  end
end

def filtered_by_user?(%User{} = user, %Notification{activity: activity}) do
  filtered_by_user?(user, activity)
end
150
# Pushes a direct message to the "direct:<id>" topic of every recipient
# returned by `User.get_recipients_from_activity/1`.
defp do_stream("direct", item) do
  item
  |> User.get_recipients_from_activity()
  |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
  |> Enum.each(fn user_topic ->
    Logger.debug("Trying to push direct message to #{user_topic}\n\n")
    push_to_socket(user_topic, item)
  end)
end
161
# Pushes a conversation participation to the "direct:<user_id>" topic of the
# user the participation belongs to.
defp do_stream("participation", participation) do
  owner_id = participation.user_id
  user_topic = "direct:" <> to_string(owner_id)

  Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
  push_to_socket(user_topic, participation)
end
168
# Pushes an activity to the "list:<id>" topic of every list it belongs to.
defp do_stream("list", item) do
  # filter the recipient list if the activity is not public, see #270.
  target_lists =
    case Visibility.is_public?(item) do
      true ->
        Pleroma.List.get_lists_from_activity(item)

      _ ->
        # Keep only the lists whose owner is allowed to see the activity.
        for list <- Pleroma.List.get_lists_from_activity(item),
            Visibility.visible_for_user?(item, User.get_cached_by_id(list.user_id)) do
          list
        end
    end

  list_topics = Enum.map(target_lists, fn %{id: id} -> "list:#{id}" end)

  Enum.each(list_topics, fn list_topic ->
    Logger.debug("Trying to push message to #{list_topic}\n\n")
    push_to_socket(list_topic, item)
  end)
end
194
# Sends a notification to every process registered on the notified user's
# topic; the message asks the receiver to render "notification.json" itself.
defp do_stream(topic, %Notification{} = item)
     when topic in ["user", "user:notification"] do
  user_topic = "#{topic}:#{item.user_id}"
  message = {:render_with_user, StreamerView, "notification.json", item}

  Registry.dispatch(@registry, user_topic, fn entries ->
    Enum.each(entries, fn {pid, _auth} -> send(pid, message) end)
  end)
end
203
# Sends a chat message update to the local users among the message's actor
# and its "to" recipients, on each user's "<topic>:<id>" topic.
defp do_stream(topic, %{data: %{"type" => "ChatMessage"}} = object)
     when topic in ["user", "user:pleroma_chat"] do
  recipients = [object.data["actor"] | object.data["to"]]

  %{ap_id: recipients, local: true}
  |> Pleroma.User.Query.build()
  |> Repo.all()
  |> Enum.each(fn %{id: id} = user ->
    Registry.dispatch(@registry, "#{topic}:#{id}", fn sockets ->
      # The payload depends on the user but not on the individual socket, so
      # it is rendered once per dispatch instead of once per socket. The
      # callback only runs when there are registered entries, so nothing is
      # rendered for users without open sockets.
      text = StreamerView.render("chat_update.json", object, user, recipients)

      Enum.each(sockets, fn {pid, _auth} ->
        send(pid, {:text, text})
      end)
    end)
  end)
end
223
# Pushes an item to the "user:<id>" topic of every recipient returned by
# `User.get_recipients_from_activity/1`.
defp do_stream("user", item) do
  Logger.debug("Trying to push to users")

  item
  |> User.get_recipients_from_activity()
  |> Enum.map(fn %{id: id} -> "user:#{id}" end)
  |> Enum.each(fn user_topic -> push_to_socket(user_topic, item) end)
end
235
# Fallback: the topic needs no expansion, so the item is pushed to it as is.
defp do_stream(topic, item) do
  Logger.debug("Trying to push to #{topic}")
  Logger.debug("Pushing item to #{topic}")

  push_to_socket(topic, item)
end
241
# Delivers `item` to every process registered under `topic`.

# Conversation participations are rendered once and sent as text.
defp push_to_socket(topic, %Participation{} = participation) do
  message = {:text, StreamerView.render("conversation.json", participation)}

  Registry.dispatch(@registry, topic, fn entries ->
    Enum.each(entries, fn {pid, _} -> send(pid, message) end)
  end)
end

# Deletions that carry the id of the deleted activity become a "delete" event.
defp push_to_socket(topic, %Activity{
       data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
     }) do
  payload = to_string(deleted_activity_id)
  message = {:text, Jason.encode!(%{event: "delete", payload: payload})}

  Registry.dispatch(@registry, topic, fn entries ->
    Enum.each(entries, fn {pid, _} -> send(pid, message) end)
  end)
end

# Deletions without a deleted activity id are not streamed.
defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop

# Everything else: entries registered with a truthy value are asked to render
# "update.json" themselves, the others receive the render made here.
defp push_to_socket(topic, item) do
  anonymous_message = {:text, StreamerView.render("update.json", item)}
  user_message = {:render_with_user, StreamerView, "update.json", item}

  Registry.dispatch(@registry, topic, fn entries ->
    Enum.each(entries, fn {pid, auth?} ->
      message = if auth?, do: user_message, else: anonymous_message
      send(pid, message)
    end)
  end)
end
279
# Thread containment check used by `filtered_by_user?/2`.
# Users with `skip_thread_containment` set always pass.
defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true

# Otherwise the check passes when the instance setting skips containment, and
# is delegated to `ActivityPub.contain_activity/2` when it does not.
defp thread_containment(activity, user) do
  case Config.get([:instance, :skip_thread_containment]) do
    unset when unset in [nil, false] -> ActivityPub.contain_activity(activity, user)
    _enabled -> true
  end
end
289
# Exactly one `should_env_send?/0` is compiled, chosen by the build environment:
# - in the test environment it returns true only if the registry process is
#   started and alive;
# - in the benchmark environment it returns false;
# - in any other environment it always returns true.
case @mix_env do
  :test ->
    def should_env_send? do
      registry_pid = Process.whereis(@registry)

      if is_nil(registry_pid) do
        false
      else
        Process.alive?(registry_pid)
      end
    end

  :benchmark ->
    def should_env_send?, do: false

  _ ->
    def should_env_send?, do: true
end
311 end