Merge remote-tracking branch 'remotes/origin/develop' into 2168-media-preview-proxy
[akkoma] / lib / pleroma / web / streamer / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6 require Logger
7
8 alias Pleroma.Activity
9 alias Pleroma.Chat.MessageReference
10 alias Pleroma.Config
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Notification
13 alias Pleroma.Object
14 alias Pleroma.User
15 alias Pleroma.Web.ActivityPub.ActivityPub
16 alias Pleroma.Web.ActivityPub.Visibility
17 alias Pleroma.Web.CommonAPI
18 alias Pleroma.Web.StreamerView
19
20 @mix_env Mix.env()
21 @registry Pleroma.Web.StreamerRegistry
22
23 def registry, do: @registry
24
25 @public_streams ["public", "public:local", "public:media", "public:local:media"]
26 @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
27
28 @doc "Expands and authorizes a stream, and registers the process for streaming."
29 @spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) ::
30 {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
31 def get_topic_and_add_socket(stream, user, params \\ %{}) do
32 case get_topic(stream, user, params) do
33 {:ok, topic} -> add_socket(topic, user)
34 error -> error
35 end
36 end
37
38 @doc "Expand and authorizes a stream"
39 @spec get_topic(stream :: String.t(), User.t() | nil, Map.t()) ::
40 {:ok, topic :: String.t()} | {:error, :bad_topic}
41 def get_topic(stream, user, params \\ %{})
42
43 # Allow all public steams.
44 def get_topic(stream, _, _) when stream in @public_streams do
45 {:ok, stream}
46 end
47
48 # Allow all hashtags streams.
49 def get_topic("hashtag", _, %{"tag" => tag}) do
50 {:ok, "hashtag:" <> tag}
51 end
52
53 # Expand user streams.
54 def get_topic(stream, %User{} = user, _) when stream in @user_streams do
55 {:ok, stream <> ":" <> to_string(user.id)}
56 end
57
58 def get_topic(stream, _, _) when stream in @user_streams do
59 {:error, :unauthorized}
60 end
61
62 # List streams.
63 def get_topic("list", %User{} = user, %{"list" => id}) do
64 if Pleroma.List.get(id, user) do
65 {:ok, "list:" <> to_string(id)}
66 else
67 {:error, :bad_topic}
68 end
69 end
70
71 def get_topic("list", _, _) do
72 {:error, :unauthorized}
73 end
74
75 def get_topic(_, _, _) do
76 {:error, :bad_topic}
77 end
78
79 @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
80 def add_socket(topic, user) do
81 if should_env_send?() do
82 auth? = if user, do: true
83 Registry.register(@registry, topic, auth?)
84 end
85
86 {:ok, topic}
87 end
88
89 def remove_socket(topic) do
90 if should_env_send?(), do: Registry.unregister(@registry, topic)
91 end
92
93 def stream(topics, items) do
94 if should_env_send?() do
95 List.wrap(topics)
96 |> Enum.each(fn topic ->
97 List.wrap(items)
98 |> Enum.each(fn item ->
99 spawn(fn -> do_stream(topic, item) end)
100 end)
101 end)
102 end
103
104 :ok
105 end
106
107 def filtered_by_user?(%User{} = user, %Activity{} = item) do
108 %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
109 User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
110
111 recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
112 recipients = MapSet.new(item.recipients)
113 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
114
115 with parent <- Object.normalize(item) || item,
116 true <-
117 Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
118 true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
119 true <- !(item.data["type"] == "Announce" && parent.data["actor"] == user.ap_id),
120 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
121 true <- MapSet.disjoint?(recipients, recipient_blocks),
122 %{host: item_host} <- URI.parse(item.actor),
123 %{host: parent_host} <- URI.parse(parent.data["actor"]),
124 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
125 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
126 true <- thread_containment(item, user),
127 false <- CommonAPI.thread_muted?(user, parent) do
128 false
129 else
130 _ -> true
131 end
132 end
133
134 def filtered_by_user?(%User{} = user, %Notification{activity: activity}) do
135 filtered_by_user?(user, activity)
136 end
137
138 defp do_stream("direct", item) do
139 recipient_topics =
140 User.get_recipients_from_activity(item)
141 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
142
143 Enum.each(recipient_topics, fn user_topic ->
144 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
145 push_to_socket(user_topic, item)
146 end)
147 end
148
149 defp do_stream("participation", participation) do
150 user_topic = "direct:#{participation.user_id}"
151 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
152
153 push_to_socket(user_topic, participation)
154 end
155
156 defp do_stream("list", item) do
157 # filter the recipient list if the activity is not public, see #270.
158 recipient_lists =
159 case Visibility.is_public?(item) do
160 true ->
161 Pleroma.List.get_lists_from_activity(item)
162
163 _ ->
164 Pleroma.List.get_lists_from_activity(item)
165 |> Enum.filter(fn list ->
166 owner = User.get_cached_by_id(list.user_id)
167
168 Visibility.visible_for_user?(item, owner)
169 end)
170 end
171
172 recipient_topics =
173 recipient_lists
174 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
175
176 Enum.each(recipient_topics, fn list_topic ->
177 Logger.debug("Trying to push message to #{list_topic}\n\n")
178 push_to_socket(list_topic, item)
179 end)
180 end
181
182 defp do_stream(topic, %Notification{} = item)
183 when topic in ["user", "user:notification"] do
184 Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
185 Enum.each(list, fn {pid, _auth} ->
186 send(pid, {:render_with_user, StreamerView, "notification.json", item})
187 end)
188 end)
189 end
190
191 defp do_stream(topic, {user, %MessageReference{} = cm_ref})
192 when topic in ["user", "user:pleroma_chat"] do
193 topic = "#{topic}:#{user.id}"
194
195 text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
196
197 Registry.dispatch(@registry, topic, fn list ->
198 Enum.each(list, fn {pid, _auth} ->
199 send(pid, {:text, text})
200 end)
201 end)
202 end
203
204 defp do_stream("user", item) do
205 Logger.debug("Trying to push to users")
206
207 recipient_topics =
208 User.get_recipients_from_activity(item)
209 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
210
211 Enum.each(recipient_topics, fn topic ->
212 push_to_socket(topic, item)
213 end)
214 end
215
216 defp do_stream(topic, item) do
217 Logger.debug("Trying to push to #{topic}")
218 Logger.debug("Pushing item to #{topic}")
219 push_to_socket(topic, item)
220 end
221
222 defp push_to_socket(topic, %Participation{} = participation) do
223 rendered = StreamerView.render("conversation.json", participation)
224
225 Registry.dispatch(@registry, topic, fn list ->
226 Enum.each(list, fn {pid, _} ->
227 send(pid, {:text, rendered})
228 end)
229 end)
230 end
231
232 defp push_to_socket(topic, %Activity{
233 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
234 }) do
235 rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
236
237 Registry.dispatch(@registry, topic, fn list ->
238 Enum.each(list, fn {pid, _} ->
239 send(pid, {:text, rendered})
240 end)
241 end)
242 end
243
244 defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
245
246 defp push_to_socket(topic, item) do
247 anon_render = StreamerView.render("update.json", item)
248
249 Registry.dispatch(@registry, topic, fn list ->
250 Enum.each(list, fn {pid, auth?} ->
251 if auth? do
252 send(pid, {:render_with_user, StreamerView, "update.json", item})
253 else
254 send(pid, {:text, anon_render})
255 end
256 end)
257 end)
258 end
259
260 defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
261
262 defp thread_containment(activity, user) do
263 if Config.get([:instance, :skip_thread_containment]) do
264 true
265 else
266 ActivityPub.contain_activity(activity, user)
267 end
268 end
269
270 # In test environement, only return true if the registry is started.
271 # In benchmark environment, returns false.
272 # In any other environment, always returns true.
273 cond do
274 @mix_env == :test ->
275 def should_env_send? do
276 case Process.whereis(@registry) do
277 nil ->
278 false
279
280 pid ->
281 Process.alive?(pid)
282 end
283 end
284
285 @mix_env == :benchmark ->
286 def should_env_send?, do: false
287
288 true ->
289 def should_env_send?, do: true
290 end
291 end