User: Add raw_bio, storing unformatted bio
[akkoma] / lib / pleroma / web / streamer / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6 require Logger
7
8 alias Pleroma.Activity
9 alias Pleroma.Config
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.User
14 alias Pleroma.Web.ActivityPub.ActivityPub
15 alias Pleroma.Web.ActivityPub.Visibility
16 alias Pleroma.Web.CommonAPI
17 alias Pleroma.Web.StreamerView
18
19 @mix_env Mix.env()
20 @registry Pleroma.Web.StreamerRegistry
21
22 def registry, do: @registry
23
24 @public_streams ["public", "public:local", "public:media", "public:local:media"]
25 @user_streams ["user", "user:notification", "direct"]
26
27 @doc "Expands and authorizes a stream, and registers the process for streaming."
28 @spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) ::
29 {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
30 def get_topic_and_add_socket(stream, user, params \\ %{}) do
31 case get_topic(stream, user, params) do
32 {:ok, topic} -> add_socket(topic, user)
33 error -> error
34 end
35 end
36
37 @doc "Expand and authorizes a stream"
38 @spec get_topic(stream :: String.t(), User.t() | nil, Map.t()) ::
39 {:ok, topic :: String.t()} | {:error, :bad_topic}
40 def get_topic(stream, user, params \\ %{})
41
42 # Allow all public steams.
43 def get_topic(stream, _, _) when stream in @public_streams do
44 {:ok, stream}
45 end
46
47 # Allow all hashtags streams.
48 def get_topic("hashtag", _, %{"tag" => tag}) do
49 {:ok, "hashtag:" <> tag}
50 end
51
52 # Expand user streams.
53 def get_topic(stream, %User{} = user, _) when stream in @user_streams do
54 {:ok, stream <> ":" <> to_string(user.id)}
55 end
56
57 def get_topic(stream, _, _) when stream in @user_streams do
58 {:error, :unauthorized}
59 end
60
61 # List streams.
62 def get_topic("list", %User{} = user, %{"list" => id}) do
63 if Pleroma.List.get(id, user) do
64 {:ok, "list:" <> to_string(id)}
65 else
66 {:error, :bad_topic}
67 end
68 end
69
70 def get_topic("list", _, _) do
71 {:error, :unauthorized}
72 end
73
74 def get_topic(_, _, _) do
75 {:error, :bad_topic}
76 end
77
78 @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
79 def add_socket(topic, user) do
80 if should_env_send?() do
81 auth? = if user, do: true
82 Registry.register(@registry, topic, auth?)
83 end
84
85 {:ok, topic}
86 end
87
88 def remove_socket(topic) do
89 if should_env_send?(), do: Registry.unregister(@registry, topic)
90 end
91
92 def stream(topics, item) when is_list(topics) do
93 if should_env_send?() do
94 Enum.each(topics, fn t ->
95 spawn(fn -> do_stream(t, item) end)
96 end)
97 end
98
99 :ok
100 end
101
102 def stream(topic, items) when is_list(items) do
103 if should_env_send?() do
104 Enum.each(items, fn i ->
105 spawn(fn -> do_stream(topic, i) end)
106 end)
107
108 :ok
109 end
110 end
111
112 def stream(topic, item) do
113 if should_env_send?() do
114 spawn(fn -> do_stream(topic, item) end)
115 end
116
117 :ok
118 end
119
120 def filtered_by_user?(%User{} = user, %Activity{} = item) do
121 %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
122 User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
123
124 recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
125 recipients = MapSet.new(item.recipients)
126 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
127
128 with parent <- Object.normalize(item) || item,
129 true <-
130 Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
131 true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
132 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
133 true <- MapSet.disjoint?(recipients, recipient_blocks),
134 %{host: item_host} <- URI.parse(item.actor),
135 %{host: parent_host} <- URI.parse(parent.data["actor"]),
136 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
137 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
138 true <- thread_containment(item, user),
139 false <- CommonAPI.thread_muted?(user, parent) do
140 false
141 else
142 _ -> true
143 end
144 end
145
146 def filtered_by_user?(%User{} = user, %Notification{activity: activity}) do
147 filtered_by_user?(user, activity)
148 end
149
150 defp do_stream("direct", item) do
151 recipient_topics =
152 User.get_recipients_from_activity(item)
153 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
154
155 Enum.each(recipient_topics, fn user_topic ->
156 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
157 push_to_socket(user_topic, item)
158 end)
159 end
160
161 defp do_stream("participation", participation) do
162 user_topic = "direct:#{participation.user_id}"
163 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
164
165 push_to_socket(user_topic, participation)
166 end
167
168 defp do_stream("list", item) do
169 # filter the recipient list if the activity is not public, see #270.
170 recipient_lists =
171 case Visibility.is_public?(item) do
172 true ->
173 Pleroma.List.get_lists_from_activity(item)
174
175 _ ->
176 Pleroma.List.get_lists_from_activity(item)
177 |> Enum.filter(fn list ->
178 owner = User.get_cached_by_id(list.user_id)
179
180 Visibility.visible_for_user?(item, owner)
181 end)
182 end
183
184 recipient_topics =
185 recipient_lists
186 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
187
188 Enum.each(recipient_topics, fn list_topic ->
189 Logger.debug("Trying to push message to #{list_topic}\n\n")
190 push_to_socket(list_topic, item)
191 end)
192 end
193
194 defp do_stream(topic, %Notification{} = item)
195 when topic in ["user", "user:notification"] do
196 Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
197 Enum.each(list, fn {pid, _auth} ->
198 send(pid, {:render_with_user, StreamerView, "notification.json", item})
199 end)
200 end)
201 end
202
203 defp do_stream("user", item) do
204 Logger.debug("Trying to push to users")
205
206 recipient_topics =
207 User.get_recipients_from_activity(item)
208 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
209
210 Enum.each(recipient_topics, fn topic ->
211 push_to_socket(topic, item)
212 end)
213 end
214
215 defp do_stream(topic, item) do
216 Logger.debug("Trying to push to #{topic}")
217 Logger.debug("Pushing item to #{topic}")
218 push_to_socket(topic, item)
219 end
220
221 defp push_to_socket(topic, %Participation{} = participation) do
222 rendered = StreamerView.render("conversation.json", participation)
223
224 Registry.dispatch(@registry, topic, fn list ->
225 Enum.each(list, fn {pid, _} ->
226 send(pid, {:text, rendered})
227 end)
228 end)
229 end
230
231 defp push_to_socket(topic, %Activity{
232 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
233 }) do
234 rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
235
236 Registry.dispatch(@registry, topic, fn list ->
237 Enum.each(list, fn {pid, _} ->
238 send(pid, {:text, rendered})
239 end)
240 end)
241 end
242
243 defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
244
245 defp push_to_socket(topic, item) do
246 anon_render = StreamerView.render("update.json", item)
247
248 Registry.dispatch(@registry, topic, fn list ->
249 Enum.each(list, fn {pid, auth?} ->
250 if auth? do
251 send(pid, {:render_with_user, StreamerView, "update.json", item})
252 else
253 send(pid, {:text, anon_render})
254 end
255 end)
256 end)
257 end
258
259 defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
260
261 defp thread_containment(activity, user) do
262 if Config.get([:instance, :skip_thread_containment]) do
263 true
264 else
265 ActivityPub.contain_activity(activity, user)
266 end
267 end
268
269 # In test environement, only return true if the registry is started.
270 # In benchmark environment, returns false.
271 # In any other environment, always returns true.
272 cond do
273 @mix_env == :test ->
274 def should_env_send? do
275 case Process.whereis(@registry) do
276 nil ->
277 false
278
279 pid ->
280 Process.alive?(pid)
281 end
282 end
283
284 @mix_env == :benchmark ->
285 def should_env_send?, do: false
286
287 true ->
288 def should_env_send?, do: true
289 end
290 end