Merge remote-tracking branch 'remotes/origin/develop' into 2168-media-preview-proxy
[akkoma] / lib / pleroma / web / streamer / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6 require Logger
7
8 alias Pleroma.Activity
9 alias Pleroma.Chat.MessageReference
10 alias Pleroma.Config
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Notification
13 alias Pleroma.Object
14 alias Pleroma.User
15 alias Pleroma.Web.ActivityPub.ActivityPub
16 alias Pleroma.Web.ActivityPub.Visibility
17 alias Pleroma.Web.CommonAPI
18 alias Pleroma.Web.StreamerView
19
# Build environment, captured once at compile time; it selects which
# `should_env_send?/0` implementation is compiled into the module.
@mix_env Mix.env()

# Name of the registry under which streaming processes register per topic.
@registry Pleroma.Web.StreamerRegistry

@doc "Returns the name of the registry used to track streaming subscribers."
@spec registry() :: module()
def registry do
  @registry
end

# Streams anyone may subscribe to, with or without a user.
@public_streams ~w(public public:local public:media public:local:media)

# Streams that are expanded with the subscribing user's id and require a user.
@user_streams ~w(user user:notification direct user:pleroma_chat)
27
@doc """
Expands and authorizes a stream, and registers the process for streaming.

The stream name is resolved with `get_topic/3`. On success the calling process
is registered for the resulting topic via `add_socket/2` and `{:ok, topic}` is
returned. Any error returned by `get_topic/3` is passed through unchanged and
nothing is registered.
"""
@spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, map() | nil) ::
        {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic_and_add_socket(stream, user, params \\ %{}) do
  # A non-matching result (any error tuple) falls straight through `with`.
  with {:ok, topic} <- get_topic(stream, user, params) do
    add_socket(topic, user)
  end
end
37
@doc """
Expands and authorizes a stream.

Resolves a client-supplied stream name (plus optional request params) into the
full topic name used for registration:

  * public streams are returned as-is;
  * `"hashtag"` becomes `"hashtag:<tag>"`, using the `"tag"` param;
  * user streams become `"<stream>:<user id>"` and require a user;
  * `"list"` becomes `"list:<id>"` when `Pleroma.List.get/2` finds the list
    for the given user.

Returns `{:ok, topic}`, `{:error, :unauthorized}` when a user-only stream is
requested without a user, or `{:error, :bad_topic}` for anything else.
"""
@spec get_topic(stream :: String.t(), User.t() | nil, map() | nil) ::
        {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic(stream, user, params \\ %{})

# Allow all public streams.
def get_topic(stream, _, _) when stream in @public_streams do
  {:ok, stream}
end

# Allow all hashtag streams.
# The guard keeps a non-binary "tag" param (e.g. a list or map) from raising
# in `<>`; such requests fall through to the `:bad_topic` clause instead.
def get_topic("hashtag", _, %{"tag" => tag}) when is_binary(tag) do
  {:ok, "hashtag:" <> tag}
end

# Expand user streams.
def get_topic(stream, %User{} = user, _) when stream in @user_streams do
  {:ok, stream <> ":" <> to_string(user.id)}
end

# User streams without a user are refused.
def get_topic(stream, _, _) when stream in @user_streams do
  {:error, :unauthorized}
end

# List streams.
def get_topic("list", %User{} = user, %{"list" => id}) do
  if Pleroma.List.get(id, user) do
    {:ok, "list:" <> to_string(id)}
  else
    {:error, :bad_topic}
  end
end

# List streams without a user (or without a "list" param) are refused.
def get_topic("list", _, _) do
  {:error, :unauthorized}
end

# Anything else is not a known stream.
def get_topic(_, _, _) do
  {:error, :bad_topic}
end
78
@doc """
Registers the calling process for streaming on `topic`.

Use `get_topic/3` to get the full authorized topic. The registry value is
`true` when a user is given and `nil` otherwise. Registration is skipped when
`should_env_send?/0` is false. Always returns `{:ok, topic}`.
"""
def add_socket(topic, user) do
  authenticated? =
    case user do
      absent when absent in [nil, false] -> nil
      _present -> true
    end

  if should_env_send?(), do: Registry.register(@registry, topic, authenticated?)

  {:ok, topic}
end
88
@doc """
Unregisters the calling process from `topic`.

Does nothing (and returns `nil`) when `should_env_send?/0` is false.
"""
def remove_socket(topic) do
  if should_env_send?() do
    Registry.unregister(@registry, topic)
  else
    nil
  end
end
92
@doc """
Streams every item to every topic.

`topics` and `items` may each be a single value or a list. One process is
spawned per `{topic, item}` pair to perform the delivery. Nothing is spawned
when `should_env_send?/0` is false. Always returns `:ok`.
"""
def stream(topics, items) do
  if should_env_send?() do
    for topic <- List.wrap(topics), item <- List.wrap(items) do
      spawn(fn -> do_stream(topic, item) end)
    end
  end

  :ok
end
106
@doc """
Tells whether `item` must be hidden from `user`.

Returns `false` only when every check passes: neither the activity's actor nor
the actor of the parent (the normalized object, or the activity itself when
there is none) is blocked or muted, an Announce does not come from a
reblog-muted actor, an Announce streamed as an `:activity` does not boost the
user's own object, no recipient is blocked or muted, neither actor's host
matches the user's domain blocks, thread containment allows it, and the thread
is not muted. Returns `true` otherwise.

A `Notification` is checked through its activity with the `:notification`
streamed type.
"""
def filtered_by_user?(user, item, streamed_type \\ :activity)

def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
  alias Pleroma.Web.ActivityPub.MRF

  %{block: blocked, mute: muted, reblog_mute: reblog_muted} =
    User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])

  hidden_recipients = MapSet.new(blocked ++ muted)
  item_recipients = MapSet.new(item.recipients)
  blocked_domains = MRF.subdomains_regex(user.domain_blocks)

  # True when the given AP id is neither blocked nor muted by the user.
  allowed_actor? = fn ap_id -> ap_id not in blocked and ap_id not in muted end

  parent = Object.normalize(item) || item

  with true <- allowed_actor?.(item.actor),
       true <- not (item.data["type"] == "Announce" and item.actor in reblog_muted),
       # Boosts of the user's own objects are dropped from activity streams only.
       true <-
         not (streamed_type == :activity and item.data["type"] == "Announce" and
                parent.data["actor"] == user.ap_id),
       true <- allowed_actor?.(parent.data["actor"]),
       true <- MapSet.disjoint?(item_recipients, hidden_recipients),
       %{host: item_host} <- URI.parse(item.actor),
       %{host: parent_host} <- URI.parse(parent.data["actor"]),
       false <- MRF.subdomain_match?(blocked_domains, item_host),
       false <- MRF.subdomain_match?(blocked_domains, parent_host),
       true <- thread_containment(item, user),
       false <- CommonAPI.thread_muted?(user, parent) do
    false
  else
    _failed_check -> true
  end
end

def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do
  filtered_by_user?(user, activity, :notification)
end
141
# Direct messages: push the item to the "direct:<id>" topic of every recipient
# of the activity.
defp do_stream("direct", item) do
  item
  |> User.get_recipients_from_activity()
  |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
  |> Enum.each(fn user_topic ->
    Logger.debug("Trying to push direct message to #{user_topic}\n\n")
    push_to_socket(user_topic, item)
  end)
end
152
# Conversation participations are delivered on the owning user's direct topic.
defp do_stream("participation", participation) do
  user_topic = "direct:" <> to_string(participation.user_id)

  Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
  push_to_socket(user_topic, participation)
end
159
# Lists: push the item to the "list:<id>" topic of every list returned for the
# activity.
defp do_stream("list", item) do
  public? = Visibility.is_public?(item) == true
  candidate_lists = Pleroma.List.get_lists_from_activity(item)

  # filter the recipient list if the activity is not public, see #270.
  recipient_lists =
    if public? do
      candidate_lists
    else
      Enum.filter(candidate_lists, fn list ->
        owner = User.get_cached_by_id(list.user_id)
        Visibility.visible_for_user?(item, owner)
      end)
    end

  recipient_lists
  |> Enum.map(fn %{id: id} -> "list:#{id}" end)
  |> Enum.each(fn list_topic ->
    Logger.debug("Trying to push message to #{list_topic}\n\n")
    push_to_socket(list_topic, item)
  end)
end
185
# Notifications go to the notified user's topic; each subscriber is sent a
# `:render_with_user` message rather than pre-rendered text.
defp do_stream(topic, %Notification{} = item)
     when topic == "user" or topic == "user:notification" do
  message = {:render_with_user, StreamerView, "notification.json", item}

  Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn subscribers ->
    Enum.each(subscribers, fn {pid, _auth} -> send(pid, message) end)
  end)
end
194
# Chat message updates: rendered once and sent as text to every subscriber of
# the given user's topic.
defp do_stream(topic, {user, %MessageReference{} = cm_ref})
     when topic == "user" or topic == "user:pleroma_chat" do
  user_topic = "#{topic}:#{user.id}"
  payload = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})

  Registry.dispatch(@registry, user_topic, fn subscribers ->
    Enum.each(subscribers, fn {pid, _auth} -> send(pid, {:text, payload}) end)
  end)
end
207
# Any other item on the "user" stream: push it to the "user:<id>" topic of
# every recipient of the activity.
defp do_stream("user", item) do
  Logger.debug("Trying to push to users")

  item
  |> User.get_recipients_from_activity()
  |> Enum.map(fn %{id: id} -> "user:#{id}" end)
  |> Enum.each(fn user_topic -> push_to_socket(user_topic, item) end)
end
219
# Fallback: the topic needs no expansion, so the item is pushed to it as-is.
defp do_stream(topic, item) do
  Logger.debug("Trying to push to #{topic}")
  Logger.debug("Pushing item to #{topic}")

  push_to_socket(topic, item)
end
225
# Conversation participations are rendered once and sent as text to every
# subscriber of the topic.
defp push_to_socket(topic, %Participation{} = participation) do
  send_text(topic, StreamerView.render("conversation.json", participation))
end

# Deletes that carry the id of the deleted activity become a "delete" event.
defp push_to_socket(topic, %Activity{
       data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
     }) do
  send_text(topic, Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)}))
end

# Deletes without a deleted activity id are not streamed.
defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop

# Everything else: subscribers registered with a truthy value are asked to
# render the item themselves; the rest receive the shared pre-rendered text.
defp push_to_socket(topic, item) do
  anon_render = StreamerView.render("update.json", item)

  Registry.dispatch(@registry, topic, fn subscribers ->
    Enum.each(subscribers, fn
      {pid, auth?} when auth? in [nil, false] ->
        send(pid, {:text, anon_render})

      {pid, _auth?} ->
        send(pid, {:render_with_user, StreamerView, "update.json", item})
    end)
  end)
end

# Sends already-rendered text to every process registered under `topic`.
defp send_text(topic, rendered) do
  Registry.dispatch(@registry, topic, fn subscribers ->
    Enum.each(subscribers, fn {pid, _} -> send(pid, {:text, rendered}) end)
  end)
end
263
# Thread containment check. It is bypassed (returns `true`) when the user has
# `skip_thread_containment` set or when the instance-wide
# `[:instance, :skip_thread_containment]` setting is truthy; otherwise the
# decision is delegated to `ActivityPub.contain_activity/2`.
defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true

defp thread_containment(activity, user) do
  case Config.get([:instance, :skip_thread_containment]) do
    disabled when disabled in [nil, false] -> ActivityPub.contain_activity(activity, user)
    _enabled -> true
  end
end
273
# Exactly one `should_env_send?/0` is compiled, chosen by the build environment:
#   * test: true only if the registry process is started and alive;
#   * benchmark: always false;
#   * any other environment: always true.
case @mix_env do
  :test ->
    def should_env_send? do
      if pid = Process.whereis(@registry) do
        Process.alive?(pid)
      else
        false
      end
    end

  :benchmark ->
    def should_env_send?, do: false

  _other ->
    def should_env_send?, do: true
end
295 end