Remove vapidPublicKey from Nodeinfo
[akkoma] / lib / pleroma / web / streamer / worker.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer.Worker do
6 use GenServer
7
8 require Logger
9
10 alias Pleroma.Activity
11 alias Pleroma.Config
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Notification
14 alias Pleroma.Object
15 alias Pleroma.User
16 alias Pleroma.Web.ActivityPub.ActivityPub
17 alias Pleroma.Web.ActivityPub.Visibility
18 alias Pleroma.Web.CommonAPI
19 alias Pleroma.Web.Streamer.State
20 alias Pleroma.Web.Streamer.StreamerSocket
21 alias Pleroma.Web.StreamerView
22
23 def start_link(_) do
24 GenServer.start_link(__MODULE__, %{}, [])
25 end
26
27 def init(init_arg) do
28 {:ok, init_arg}
29 end
30
31 def stream(pid, topics, items) do
32 GenServer.call(pid, {:stream, topics, items})
33 end
34
35 def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
36 Enum.each(topics, fn t ->
37 do_stream(%{topic: t, item: item})
38 end)
39
40 {:reply, state, state}
41 end
42
43 def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
44 Enum.each(items, fn i ->
45 do_stream(%{topic: topic, item: i})
46 end)
47
48 {:reply, state, state}
49 end
50
51 def handle_call({:stream, topic, item}, _from, state) do
52 do_stream(%{topic: topic, item: item})
53
54 {:reply, state, state}
55 end
56
57 defp do_stream(%{topic: "direct", item: item}) do
58 recipient_topics =
59 User.get_recipients_from_activity(item)
60 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
61
62 Enum.each(recipient_topics, fn user_topic ->
63 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
64 push_to_socket(State.get_sockets(), user_topic, item)
65 end)
66 end
67
68 defp do_stream(%{topic: "participation", item: participation}) do
69 user_topic = "direct:#{participation.user_id}"
70 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
71
72 push_to_socket(State.get_sockets(), user_topic, participation)
73 end
74
75 defp do_stream(%{topic: "list", item: item}) do
76 # filter the recipient list if the activity is not public, see #270.
77 recipient_lists =
78 case Visibility.is_public?(item) do
79 true ->
80 Pleroma.List.get_lists_from_activity(item)
81
82 _ ->
83 Pleroma.List.get_lists_from_activity(item)
84 |> Enum.filter(fn list ->
85 owner = User.get_cached_by_id(list.user_id)
86
87 Visibility.visible_for_user?(item, owner)
88 end)
89 end
90
91 recipient_topics =
92 recipient_lists
93 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
94
95 Enum.each(recipient_topics, fn list_topic ->
96 Logger.debug("Trying to push message to #{list_topic}\n\n")
97 push_to_socket(State.get_sockets(), list_topic, item)
98 end)
99 end
100
101 defp do_stream(%{topic: topic, item: %Notification{} = item})
102 when topic in ["user", "user:notification"] do
103 State.get_sockets()
104 |> Map.get("#{topic}:#{item.user_id}", [])
105 |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
106 with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
107 true <- should_send?(user, item) do
108 send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
109 end
110 end)
111 end
112
113 defp do_stream(%{topic: "user", item: item}) do
114 Logger.debug("Trying to push to users")
115
116 recipient_topics =
117 User.get_recipients_from_activity(item)
118 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
119
120 Enum.each(recipient_topics, fn topic ->
121 push_to_socket(State.get_sockets(), topic, item)
122 end)
123 end
124
125 defp do_stream(%{topic: topic, item: item}) do
126 Logger.debug("Trying to push to #{topic}")
127 Logger.debug("Pushing item to #{topic}")
128 push_to_socket(State.get_sockets(), topic, item)
129 end
130
131 defp should_send?(%User{} = user, %Activity{} = item) do
132 %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
133 User.outgoing_relations_ap_ids(user, [:block, :mute, :reblog_mute])
134
135 recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
136 recipients = MapSet.new(item.recipients)
137 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
138
139 with parent <- Object.normalize(item) || item,
140 true <-
141 Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
142 true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
143 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
144 true <- MapSet.disjoint?(recipients, recipient_blocks),
145 %{host: item_host} <- URI.parse(item.actor),
146 %{host: parent_host} <- URI.parse(parent.data["actor"]),
147 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
148 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
149 true <- thread_containment(item, user),
150 false <- CommonAPI.thread_muted?(user, item) do
151 true
152 else
153 _ -> false
154 end
155 end
156
157 defp should_send?(%User{} = user, %Notification{activity: activity}) do
158 should_send?(user, activity)
159 end
160
161 def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
162 Enum.each(topics[topic] || [], fn %StreamerSocket{
163 transport_pid: transport_pid,
164 user: socket_user
165 } ->
166 # Get the current user so we have up-to-date blocks etc.
167 if socket_user do
168 user = User.get_cached_by_ap_id(socket_user.ap_id)
169
170 if should_send?(user, item) do
171 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
172 end
173 else
174 send(transport_pid, {:text, StreamerView.render("update.json", item)})
175 end
176 end)
177 end
178
179 def push_to_socket(topics, topic, %Participation{} = participation) do
180 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
181 send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
182 end)
183 end
184
185 def push_to_socket(topics, topic, %Activity{
186 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
187 }) do
188 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
189 send(
190 transport_pid,
191 {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
192 )
193 end)
194 end
195
196 def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
197
198 def push_to_socket(topics, topic, item) do
199 Enum.each(topics[topic] || [], fn %StreamerSocket{
200 transport_pid: transport_pid,
201 user: socket_user
202 } ->
203 # Get the current user so we have up-to-date blocks etc.
204 if socket_user do
205 user = User.get_cached_by_ap_id(socket_user.ap_id)
206
207 if should_send?(user, item) do
208 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
209 end
210 else
211 send(transport_pid, {:text, StreamerView.render("update.json", item)})
212 end
213 end)
214 end
215
216 @spec thread_containment(Activity.t(), User.t()) :: boolean()
217 defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
218
219 defp thread_containment(activity, user) do
220 if Config.get([:instance, :skip_thread_containment]) do
221 true
222 else
223 ActivityPub.contain_activity(activity, user)
224 end
225 end
226 end