Set sum types in query
[akkoma] / lib / pleroma / web / streamer / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6 require Logger
7
8 alias Pleroma.Activity
9 alias Pleroma.Config
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.User
14 alias Pleroma.Web.ActivityPub.ActivityPub
15 alias Pleroma.Web.ActivityPub.Visibility
16 alias Pleroma.Web.CommonAPI
17 alias Pleroma.Web.StreamerView
18
19 @mix_env Mix.env()
20 @registry Pleroma.Web.StreamerRegistry
21
22 def registry, do: @registry
23
24 def add_socket(topic, %User{} = user) do
25 if should_env_send?(), do: Registry.register(@registry, user_topic(topic, user), true)
26 end
27
28 def add_socket(topic, _) do
29 if should_env_send?(), do: Registry.register(@registry, topic, false)
30 end
31
32 def remove_socket(topic) do
33 if should_env_send?(), do: Registry.unregister(@registry, topic)
34 end
35
36 def stream(topics, item) when is_list(topics) do
37 if should_env_send?() do
38 Enum.each(topics, fn t ->
39 spawn(fn -> do_stream(t, item) end)
40 end)
41 end
42
43 :ok
44 end
45
46 def stream(topic, items) when is_list(items) do
47 if should_env_send?() do
48 Enum.each(items, fn i ->
49 spawn(fn -> do_stream(topic, i) end)
50 end)
51
52 :ok
53 end
54 end
55
56 def stream(topic, item) do
57 if should_env_send?() do
58 spawn(fn -> do_stream(topic, item) end)
59 end
60
61 :ok
62 end
63
64 def filtered_by_user?(%User{} = user, %Activity{} = item) do
65 %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
66 User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
67
68 recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
69 recipients = MapSet.new(item.recipients)
70 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
71
72 with parent <- Object.normalize(item) || item,
73 true <-
74 Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
75 true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
76 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
77 true <- MapSet.disjoint?(recipients, recipient_blocks),
78 %{host: item_host} <- URI.parse(item.actor),
79 %{host: parent_host} <- URI.parse(parent.data["actor"]),
80 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
81 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
82 true <- thread_containment(item, user),
83 false <- CommonAPI.thread_muted?(user, item) do
84 false
85 else
86 _ -> true
87 end
88 end
89
90 def filtered_by_user?(%User{} = user, %Notification{activity: activity}) do
91 filtered_by_user?(user, activity)
92 end
93
94 defp do_stream("direct", item) do
95 recipient_topics =
96 User.get_recipients_from_activity(item)
97 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
98
99 Enum.each(recipient_topics, fn user_topic ->
100 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
101 push_to_socket(user_topic, item)
102 end)
103 end
104
105 defp do_stream("participation", participation) do
106 user_topic = "direct:#{participation.user_id}"
107 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
108
109 push_to_socket(user_topic, participation)
110 end
111
112 defp do_stream("list", item) do
113 # filter the recipient list if the activity is not public, see #270.
114 recipient_lists =
115 case Visibility.is_public?(item) do
116 true ->
117 Pleroma.List.get_lists_from_activity(item)
118
119 _ ->
120 Pleroma.List.get_lists_from_activity(item)
121 |> Enum.filter(fn list ->
122 owner = User.get_cached_by_id(list.user_id)
123
124 Visibility.visible_for_user?(item, owner)
125 end)
126 end
127
128 recipient_topics =
129 recipient_lists
130 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
131
132 Enum.each(recipient_topics, fn list_topic ->
133 Logger.debug("Trying to push message to #{list_topic}\n\n")
134 push_to_socket(list_topic, item)
135 end)
136 end
137
138 defp do_stream(topic, %Notification{} = item)
139 when topic in ["user", "user:notification"] do
140 Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
141 Enum.each(list, fn {pid, _auth} ->
142 send(pid, {:render_with_user, StreamerView, "notification.json", item})
143 end)
144 end)
145 end
146
147 defp do_stream("user", item) do
148 Logger.debug("Trying to push to users")
149
150 recipient_topics =
151 User.get_recipients_from_activity(item)
152 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
153
154 Enum.each(recipient_topics, fn topic ->
155 push_to_socket(topic, item)
156 end)
157 end
158
159 defp do_stream(topic, item) do
160 Logger.debug("Trying to push to #{topic}")
161 Logger.debug("Pushing item to #{topic}")
162 push_to_socket(topic, item)
163 end
164
165 defp push_to_socket(topic, %Participation{} = participation) do
166 rendered = StreamerView.render("conversation.json", participation)
167
168 Registry.dispatch(@registry, topic, fn list ->
169 Enum.each(list, fn {pid, _} ->
170 send(pid, {:text, rendered})
171 end)
172 end)
173 end
174
175 defp push_to_socket(topic, %Activity{
176 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
177 }) do
178 rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
179
180 Registry.dispatch(@registry, topic, fn list ->
181 Enum.each(list, fn {pid, _} ->
182 send(pid, {:text, rendered})
183 end)
184 end)
185 end
186
187 defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
188
189 defp push_to_socket(topic, item) do
190 anon_render = StreamerView.render("update.json", item)
191
192 Registry.dispatch(@registry, topic, fn list ->
193 Enum.each(list, fn {pid, auth?} ->
194 if auth? do
195 send(pid, {:render_with_user, StreamerView, "update.json", item})
196 else
197 send(pid, {:text, anon_render})
198 end
199 end)
200 end)
201 end
202
203 defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
204
205 defp thread_containment(activity, user) do
206 if Config.get([:instance, :skip_thread_containment]) do
207 true
208 else
209 ActivityPub.contain_activity(activity, user)
210 end
211 end
212
213 # In test environement, only return true if the registry is started.
214 # In benchmark environment, returns false.
215 # In any other environment, always returns true.
216 cond do
217 @mix_env == :test ->
218 def should_env_send? do
219 case Process.whereis(@registry) do
220 nil ->
221 false
222
223 pid ->
224 Process.alive?(pid)
225 end
226 end
227
228 @mix_env == :benchmark ->
229 def should_env_send?, do: false
230
231 true ->
232 def should_env_send?, do: true
233 end
234
235 defp user_topic(topic, user)
236 when topic in ~w[user user:notification direct] do
237 "#{topic}:#{user.id}"
238 end
239
240 defp user_topic(topic, _) do
241 topic
242 end
243 end