streamer: worker: check for lack of intersectionality between a user's blocklist...
[akkoma] / lib / pleroma / web / streamer / worker.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer.Worker do
6 use GenServer
7
8 require Logger
9
10 alias Pleroma.Activity
11 alias Pleroma.Config
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Notification
14 alias Pleroma.Object
15 alias Pleroma.User
16 alias Pleroma.Web.ActivityPub.ActivityPub
17 alias Pleroma.Web.ActivityPub.Visibility
18 alias Pleroma.Web.CommonAPI
19 alias Pleroma.Web.Streamer.State
20 alias Pleroma.Web.Streamer.StreamerSocket
21 alias Pleroma.Web.StreamerView
22
@doc """
Starts a worker process linked to the caller.

The argument is ignored; the worker is always initialised with an empty map.
"""
def start_link(_args), do: GenServer.start_link(__MODULE__, %{})
26
# GenServer callback: the start argument becomes the initial state, unchanged.
def init(init_arg), do: {:ok, init_arg}
30
@doc """
Synchronously asks the worker `pid` to stream `items` to `topics`.

Sends a `{:stream, topics, items}` call to the worker and returns its reply.
Either `topics` or `items` may be a list or a single value; the worker's
`handle_call/3` clauses decide how to fan the request out.
"""
def stream(pid, topics, items) do
  request = {:stream, topics, items}
  GenServer.call(pid, request)
end
34
# Handles `{:stream, topics_or_topic, items_or_item}` requests.
#
# Clause order matters: a list of topics is checked first (the item is then
# streamed as-is to every topic), then a list of items for a single topic,
# and finally a single topic with a single item. Every clause replies with
# the unchanged worker state.
def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
  Enum.each(topics, &do_stream(%{topic: &1, item: item}))

  {:reply, state, state}
end

def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
  Enum.each(items, &do_stream(%{topic: topic, item: &1}))

  {:reply, state, state}
end

def handle_call({:stream, topic, item}, _from, state) do
  do_stream(%{topic: topic, item: item})

  {:reply, state, state}
end
56
# "direct" topic: pushes the item to the "direct:<id>" topic of every
# recipient returned by User.get_recipients_from_activity/1.
defp do_stream(%{topic: "direct", item: item}) do
  item
  |> User.get_recipients_from_activity()
  |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
  |> Enum.each(fn user_topic ->
    Logger.debug("Trying to push direct message to #{user_topic}\n\n")

    State.get_sockets()
    |> push_to_socket(user_topic, item)
  end)
end
67
# "participation" topic: pushes the conversation participation to the
# "direct:<user_id>" topic of the participation's user.
defp do_stream(%{topic: "participation", item: participation}) do
  user_topic = "direct:#{participation.user_id}"

  Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")

  State.get_sockets()
  |> push_to_socket(user_topic, participation)
end
74
# "list" topic: pushes the item to the "list:<id>" topic of every list
# returned by Pleroma.List.get_lists_from_activity/1.
defp do_stream(%{topic: "list", item: item}) do
  recipient_lists =
    if Visibility.is_public?(item) == true do
      Pleroma.List.get_lists_from_activity(item)
    else
      # filter the recipient list if the activity is not public, see #270:
      # only keep lists whose owner is allowed to see the activity.
      item
      |> Pleroma.List.get_lists_from_activity()
      |> Enum.filter(fn list ->
        Visibility.visible_for_user?(item, User.get_cached_by_id(list.user_id))
      end)
    end

  recipient_lists
  |> Enum.map(fn %{id: id} -> "list:#{id}" end)
  |> Enum.each(fn list_topic ->
    Logger.debug("Trying to push message to #{list_topic}\n\n")

    State.get_sockets()
    |> push_to_socket(list_topic, item)
  end)
end
100
# Notifications on the "user" / "user:notification" topics: renders
# "notification.json" to every socket registered under
# "<topic>:<notification.user_id>", provided the socket's user can still be
# loaded from the cache and should_send?/2 allows the notification.
defp do_stream(%{topic: topic, item: %Notification{} = item})
     when topic in ["user", "user:notification"] do
  sockets = Map.get(State.get_sockets(), "#{topic}:#{item.user_id}", [])

  Enum.each(sockets, fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
    case User.get_cached_by_ap_id(socket_user.ap_id) do
      %User{} = user ->
        if should_send?(user, item) == true do
          send(
            transport_pid,
            {:text, StreamerView.render("notification.json", socket_user, item)}
          )
        end

      _not_a_user ->
        nil
    end
  end)
end
112
# "user" topic (non-notification items): pushes the item to the "user:<id>"
# topic of every recipient returned by User.get_recipients_from_activity/1.
defp do_stream(%{topic: "user", item: item}) do
  Logger.debug("Trying to push to users")

  item
  |> User.get_recipients_from_activity()
  |> Enum.map(fn %{id: id} -> "user:#{id}" end)
  |> Enum.each(fn user_topic ->
    State.get_sockets()
    |> push_to_socket(user_topic, item)
  end)
end
124
# Catch-all: any other topic is pushed verbatim to the sockets registered
# under that exact topic name.
defp do_stream(%{topic: topic, item: item}) do
  Logger.debug("Trying to push to #{topic}")
  Logger.debug("Pushing item to #{topic}")

  State.get_sockets()
  |> push_to_socket(topic, item)
end
130
# Decides whether `item` may be delivered to `user`.
#
# Returns true only when every check below passes; any failing step yields
# false:
#   * the activity has a normalizable object,
#   * the activity's actor is not blocked, muted or reblog-muted by the user,
#   * the object's actor is not blocked or muted,
#   * none of the activity's recipients is blocked or muted,
#   * neither actor's host matches the user's domain blocks,
#   * thread containment allows it and the thread is not muted.
defp should_send?(%User{} = user, %Activity{} = item) do
  alias Pleroma.Web.ActivityPub.MRF

  info = user.info
  blocks = info.blocks || []
  mutes = info.mutes || []
  reblog_mutes = info.muted_reblogs || []
  blocked_or_muted = MapSet.new(blocks ++ mutes)
  recipients = MapSet.new(item.recipients)
  domain_blocks = MRF.subdomains_regex(info.domain_blocks)

  with parent when parent != nil <- Object.normalize(item),
       false <- Enum.any?([blocks, mutes, reblog_mutes], &(item.actor in &1)),
       false <- Enum.any?([blocks, mutes], &(parent.data["actor"] in &1)),
       true <- MapSet.disjoint?(recipients, blocked_or_muted),
       %URI{host: item_host} = URI.parse(item.actor),
       %URI{host: parent_host} = URI.parse(parent.data["actor"]),
       false <- MRF.subdomain_match?(domain_blocks, item_host),
       false <- MRF.subdomain_match?(domain_blocks, parent_host),
       true <- thread_containment(item, user),
       false <- CommonAPI.thread_muted?(user, item) do
    true
  else
    _ -> false
  end
end

# A notification is filtered by the rules of the activity it wraps.
defp should_send?(%User{} = user, %Notification{activity: activity}),
  do: should_send?(user, activity)
158
# Pushes an Announce activity to every socket subscribed to `topic`.
#
# `topics` is the topic => sockets collection; a missing topic means no
# subscribers. Sockets with a user only receive the update when
# should_send?/2 allows it for the freshly loaded user; sockets without a
# user always receive the unauthenticated rendering.
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
  Enum.each(topics[topic] || [], fn %StreamerSocket{
                                      transport_pid: transport_pid,
                                      user: socket_user
                                    } ->
    if socket_user do
      # Get the current user so we have up-to-date blocks etc.
      # The lookup may not return a user; in that case skip this socket
      # instead of crashing the worker on a failed should_send?/2 match.
      with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
           true <- should_send?(user, item) do
        send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
      end
    else
      send(transport_pid, {:text, StreamerView.render("update.json", item)})
    end
  end)
end
176
# Pushes a conversation participation, rendered as "conversation.json", to
# every socket subscribed to `topic`. No per-user filtering is applied.
def push_to_socket(topics, topic, %Participation{} = participation) do
  subscribers = topics[topic] || []

  Enum.each(subscribers, fn %StreamerSocket{transport_pid: transport_pid} ->
    message = {:text, StreamerView.render("conversation.json", participation)}
    send(transport_pid, message)
  end)
end
182
# Delete activities carrying a "deleted_activity_id": every socket subscribed
# to `topic` receives a JSON "delete" event whose payload is that id as a
# string. No per-user filtering is applied.
def push_to_socket(topics, topic, %Activity{
      data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
    }) do
  subscribers = topics[topic] || []

  Enum.each(subscribers, fn %StreamerSocket{transport_pid: transport_pid} ->
    event = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
    send(transport_pid, {:text, event})
  end)
end

# Delete activities without a "deleted_activity_id" are not streamed.
def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}) do
  :noop
end
195
# Catch-all: pushes `item` to every socket subscribed to `topic`.
#
# `topics` is the topic => sockets collection; a missing topic means no
# subscribers. Sockets with a user only receive the update when the item's
# actor is neither blocked nor muted by the freshly loaded user and thread
# containment allows it; sockets without a user always receive the
# unauthenticated rendering.
#
# NOTE(review): unlike the Announce clause this path does not go through
# should_send?/2, so domain blocks, blocked recipients and thread mutes are
# not applied here - confirm that this is intended.
def push_to_socket(topics, topic, item) do
  Enum.each(topics[topic] || [], fn %StreamerSocket{
                                      transport_pid: transport_pid,
                                      user: socket_user
                                    } ->
    if socket_user do
      # Get the current user so we have up-to-date blocks etc.
      # The lookup may not return a user; in that case skip this socket
      # instead of crashing the worker on `nil.info`.
      with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
           blocks = user.info.blocks || [],
           mutes = user.info.mutes || [],
           true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
           true <- thread_containment(item, user) do
        send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
      end
    else
      send(transport_pid, {:text, StreamerView.render("update.json", item)})
    end
  end)
end
216
# Thread containment check for delivering `activity` to `user`.
#
# Always passes when the user has `skip_thread_containment` set, or when the
# instance-wide `[:instance, :skip_thread_containment]` setting is truthy;
# otherwise the decision is delegated to ActivityPub.contain_activity/2.
@spec thread_containment(Activity.t(), User.t()) :: boolean()
defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}) do
  true
end

defp thread_containment(activity, user) do
  case Config.get([:instance, :skip_thread_containment]) do
    disabled when disabled in [nil, false] ->
      ActivityPub.contain_activity(activity, user)

    _enabled ->
      true
  end
end
227 end