ChatMessageReferences: Change seen -> unread
[akkoma] / lib / pleroma / web / streamer / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6 require Logger
7
8 alias Pleroma.Activity
9 alias Pleroma.ChatMessageReference
10 alias Pleroma.Config
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Notification
13 alias Pleroma.Object
14 alias Pleroma.User
15 alias Pleroma.Web.ActivityPub.ActivityPub
16 alias Pleroma.Web.ActivityPub.Visibility
17 alias Pleroma.Web.CommonAPI
18 alias Pleroma.Web.StreamerView
19
20 @mix_env Mix.env()
21 @registry Pleroma.Web.StreamerRegistry
22
23 def registry, do: @registry
24
25 @public_streams ["public", "public:local", "public:media", "public:local:media"]
26 @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
27
28 @doc "Expands and authorizes a stream, and registers the process for streaming."
29 @spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) ::
30 {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
31 def get_topic_and_add_socket(stream, user, params \\ %{}) do
32 case get_topic(stream, user, params) do
33 {:ok, topic} -> add_socket(topic, user)
34 error -> error
35 end
36 end
37
38 @doc "Expand and authorizes a stream"
39 @spec get_topic(stream :: String.t(), User.t() | nil, Map.t()) ::
40 {:ok, topic :: String.t()} | {:error, :bad_topic}
41 def get_topic(stream, user, params \\ %{})
42
43 # Allow all public steams.
44 def get_topic(stream, _, _) when stream in @public_streams do
45 {:ok, stream}
46 end
47
48 # Allow all hashtags streams.
49 def get_topic("hashtag", _, %{"tag" => tag}) do
50 {:ok, "hashtag:" <> tag}
51 end
52
53 # Expand user streams.
54 def get_topic(stream, %User{} = user, _) when stream in @user_streams do
55 {:ok, stream <> ":" <> to_string(user.id)}
56 end
57
58 def get_topic(stream, _, _) when stream in @user_streams do
59 {:error, :unauthorized}
60 end
61
62 # List streams.
63 def get_topic("list", %User{} = user, %{"list" => id}) do
64 if Pleroma.List.get(id, user) do
65 {:ok, "list:" <> to_string(id)}
66 else
67 {:error, :bad_topic}
68 end
69 end
70
71 def get_topic("list", _, _) do
72 {:error, :unauthorized}
73 end
74
75 def get_topic(_, _, _) do
76 {:error, :bad_topic}
77 end
78
79 @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
80 def add_socket(topic, user) do
81 if should_env_send?() do
82 auth? = if user, do: true
83 Registry.register(@registry, topic, auth?)
84 end
85
86 {:ok, topic}
87 end
88
89 def remove_socket(topic) do
90 if should_env_send?(), do: Registry.unregister(@registry, topic)
91 end
92
93 def stream(topics, item) when is_list(topics) do
94 if should_env_send?() do
95 Enum.each(topics, fn t ->
96 spawn(fn -> do_stream(t, item) end)
97 end)
98 end
99
100 :ok
101 end
102
103 def stream(topic, items) when is_list(items) do
104 if should_env_send?() do
105 Enum.each(items, fn i ->
106 spawn(fn -> do_stream(topic, i) end)
107 end)
108
109 :ok
110 end
111 end
112
113 def stream(topic, item) do
114 if should_env_send?() do
115 spawn(fn -> do_stream(topic, item) end)
116 end
117
118 :ok
119 end
120
121 def filtered_by_user?(%User{} = user, %Activity{} = item) do
122 %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
123 User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
124
125 recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
126 recipients = MapSet.new(item.recipients)
127 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
128
129 with parent <- Object.normalize(item) || item,
130 true <-
131 Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
132 true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
133 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
134 true <- MapSet.disjoint?(recipients, recipient_blocks),
135 %{host: item_host} <- URI.parse(item.actor),
136 %{host: parent_host} <- URI.parse(parent.data["actor"]),
137 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
138 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
139 true <- thread_containment(item, user),
140 false <- CommonAPI.thread_muted?(user, parent) do
141 false
142 else
143 _ -> true
144 end
145 end
146
147 def filtered_by_user?(%User{} = user, %Notification{activity: activity}) do
148 filtered_by_user?(user, activity)
149 end
150
151 defp do_stream("direct", item) do
152 recipient_topics =
153 User.get_recipients_from_activity(item)
154 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
155
156 Enum.each(recipient_topics, fn user_topic ->
157 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
158 push_to_socket(user_topic, item)
159 end)
160 end
161
162 defp do_stream("participation", participation) do
163 user_topic = "direct:#{participation.user_id}"
164 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
165
166 push_to_socket(user_topic, participation)
167 end
168
169 defp do_stream("list", item) do
170 # filter the recipient list if the activity is not public, see #270.
171 recipient_lists =
172 case Visibility.is_public?(item) do
173 true ->
174 Pleroma.List.get_lists_from_activity(item)
175
176 _ ->
177 Pleroma.List.get_lists_from_activity(item)
178 |> Enum.filter(fn list ->
179 owner = User.get_cached_by_id(list.user_id)
180
181 Visibility.visible_for_user?(item, owner)
182 end)
183 end
184
185 recipient_topics =
186 recipient_lists
187 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
188
189 Enum.each(recipient_topics, fn list_topic ->
190 Logger.debug("Trying to push message to #{list_topic}\n\n")
191 push_to_socket(list_topic, item)
192 end)
193 end
194
195 defp do_stream(topic, %Notification{} = item)
196 when topic in ["user", "user:notification"] do
197 Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
198 Enum.each(list, fn {pid, _auth} ->
199 send(pid, {:render_with_user, StreamerView, "notification.json", item})
200 end)
201 end)
202 end
203
204 defp do_stream(topic, {user, %ChatMessageReference{} = cm_ref})
205 when topic in ["user", "user:pleroma_chat"] do
206 topic = "#{topic}:#{user.id}"
207
208 text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
209
210 Registry.dispatch(@registry, topic, fn list ->
211 Enum.each(list, fn {pid, _auth} ->
212 send(pid, {:text, text})
213 end)
214 end)
215 end
216
217 defp do_stream("user", item) do
218 Logger.debug("Trying to push to users")
219
220 recipient_topics =
221 User.get_recipients_from_activity(item)
222 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
223
224 Enum.each(recipient_topics, fn topic ->
225 push_to_socket(topic, item)
226 end)
227 end
228
229 defp do_stream(topic, item) do
230 Logger.debug("Trying to push to #{topic}")
231 Logger.debug("Pushing item to #{topic}")
232 push_to_socket(topic, item)
233 end
234
235 defp push_to_socket(topic, %Participation{} = participation) do
236 rendered = StreamerView.render("conversation.json", participation)
237
238 Registry.dispatch(@registry, topic, fn list ->
239 Enum.each(list, fn {pid, _} ->
240 send(pid, {:text, rendered})
241 end)
242 end)
243 end
244
245 defp push_to_socket(topic, %Activity{
246 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
247 }) do
248 rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
249
250 Registry.dispatch(@registry, topic, fn list ->
251 Enum.each(list, fn {pid, _} ->
252 send(pid, {:text, rendered})
253 end)
254 end)
255 end
256
257 defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
258
259 defp push_to_socket(topic, item) do
260 anon_render = StreamerView.render("update.json", item)
261
262 Registry.dispatch(@registry, topic, fn list ->
263 Enum.each(list, fn {pid, auth?} ->
264 if auth? do
265 send(pid, {:render_with_user, StreamerView, "update.json", item})
266 else
267 send(pid, {:text, anon_render})
268 end
269 end)
270 end)
271 end
272
273 defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
274
275 defp thread_containment(activity, user) do
276 if Config.get([:instance, :skip_thread_containment]) do
277 true
278 else
279 ActivityPub.contain_activity(activity, user)
280 end
281 end
282
283 # In test environement, only return true if the registry is started.
284 # In benchmark environment, returns false.
285 # In any other environment, always returns true.
286 cond do
287 @mix_env == :test ->
288 def should_env_send? do
289 case Process.whereis(@registry) do
290 nil ->
291 false
292
293 pid ->
294 Process.alive?(pid)
295 end
296 end
297
298 @mix_env == :benchmark ->
299 def should_env_send?, do: false
300
301 true ->
302 def should_env_send?, do: true
303 end
304 end