Merge branch 'fix/user-list-task' into 'develop'
[akkoma] / lib / pleroma / web / streamer / worker.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer.Worker do
6 use GenServer
7
8 require Logger
9
10 alias Pleroma.Activity
11 alias Pleroma.Config
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Notification
14 alias Pleroma.Object
15 alias Pleroma.User
16 alias Pleroma.Web.ActivityPub.ActivityPub
17 alias Pleroma.Web.ActivityPub.Visibility
18 alias Pleroma.Web.CommonAPI
19 alias Pleroma.Web.Streamer.State
20 alias Pleroma.Web.Streamer.StreamerSocket
21 alias Pleroma.Web.StreamerView
22
23 def start_link(_) do
24 GenServer.start_link(__MODULE__, %{}, [])
25 end
26
27 def init(init_arg) do
28 {:ok, init_arg}
29 end
30
31 def stream(pid, topics, items) do
32 GenServer.call(pid, {:stream, topics, items})
33 end
34
35 def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
36 Enum.each(topics, fn t ->
37 do_stream(%{topic: t, item: item})
38 end)
39
40 {:reply, state, state}
41 end
42
43 def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
44 Enum.each(items, fn i ->
45 do_stream(%{topic: topic, item: i})
46 end)
47
48 {:reply, state, state}
49 end
50
51 def handle_call({:stream, topic, item}, _from, state) do
52 do_stream(%{topic: topic, item: item})
53
54 {:reply, state, state}
55 end
56
57 defp do_stream(%{topic: "direct", item: item}) do
58 recipient_topics =
59 User.get_recipients_from_activity(item)
60 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
61
62 Enum.each(recipient_topics, fn user_topic ->
63 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
64 push_to_socket(State.get_sockets(), user_topic, item)
65 end)
66 end
67
68 defp do_stream(%{topic: "participation", item: participation}) do
69 user_topic = "direct:#{participation.user_id}"
70 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
71
72 push_to_socket(State.get_sockets(), user_topic, participation)
73 end
74
75 defp do_stream(%{topic: "list", item: item}) do
76 # filter the recipient list if the activity is not public, see #270.
77 recipient_lists =
78 case Visibility.is_public?(item) do
79 true ->
80 Pleroma.List.get_lists_from_activity(item)
81
82 _ ->
83 Pleroma.List.get_lists_from_activity(item)
84 |> Enum.filter(fn list ->
85 owner = User.get_cached_by_id(list.user_id)
86
87 Visibility.visible_for_user?(item, owner)
88 end)
89 end
90
91 recipient_topics =
92 recipient_lists
93 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
94
95 Enum.each(recipient_topics, fn list_topic ->
96 Logger.debug("Trying to push message to #{list_topic}\n\n")
97 push_to_socket(State.get_sockets(), list_topic, item)
98 end)
99 end
100
101 defp do_stream(%{topic: topic, item: %Notification{} = item})
102 when topic in ["user", "user:notification"] do
103 State.get_sockets()
104 |> Map.get("#{topic}:#{item.user_id}", [])
105 |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
106 with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
107 true <- should_send?(user, item) do
108 send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
109 end
110 end)
111 end
112
113 defp do_stream(%{topic: "user", item: item}) do
114 Logger.debug("Trying to push to users")
115
116 recipient_topics =
117 User.get_recipients_from_activity(item)
118 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
119
120 Enum.each(recipient_topics, fn topic ->
121 push_to_socket(State.get_sockets(), topic, item)
122 end)
123 end
124
125 defp do_stream(%{topic: topic, item: item}) do
126 Logger.debug("Trying to push to #{topic}")
127 Logger.debug("Pushing item to #{topic}")
128 push_to_socket(State.get_sockets(), topic, item)
129 end
130
131 defp should_send?(%User{} = user, %Activity{} = item) do
132 %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
133 User.outgoing_relations_ap_ids(user, [:block, :mute, :reblog_mute])
134
135 recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
136 recipients = MapSet.new(item.recipients)
137 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
138
139 with parent <- Object.normalize(item) || item,
140 true <-
141 Enum.all?([blocked_ap_ids, muted_ap_ids, reblog_muted_ap_ids], &(item.actor not in &1)),
142 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
143 true <- MapSet.disjoint?(recipients, recipient_blocks),
144 %{host: item_host} <- URI.parse(item.actor),
145 %{host: parent_host} <- URI.parse(parent.data["actor"]),
146 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
147 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
148 true <- thread_containment(item, user),
149 false <- CommonAPI.thread_muted?(user, item) do
150 true
151 else
152 _ -> false
153 end
154 end
155
156 defp should_send?(%User{} = user, %Notification{activity: activity}) do
157 should_send?(user, activity)
158 end
159
160 def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
161 Enum.each(topics[topic] || [], fn %StreamerSocket{
162 transport_pid: transport_pid,
163 user: socket_user
164 } ->
165 # Get the current user so we have up-to-date blocks etc.
166 if socket_user do
167 user = User.get_cached_by_ap_id(socket_user.ap_id)
168
169 if should_send?(user, item) do
170 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
171 end
172 else
173 send(transport_pid, {:text, StreamerView.render("update.json", item)})
174 end
175 end)
176 end
177
178 def push_to_socket(topics, topic, %Participation{} = participation) do
179 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
180 send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
181 end)
182 end
183
184 def push_to_socket(topics, topic, %Activity{
185 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
186 }) do
187 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
188 send(
189 transport_pid,
190 {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
191 )
192 end)
193 end
194
195 def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
196
197 def push_to_socket(topics, topic, item) do
198 Enum.each(topics[topic] || [], fn %StreamerSocket{
199 transport_pid: transport_pid,
200 user: socket_user
201 } ->
202 # Get the current user so we have up-to-date blocks etc.
203 if socket_user do
204 user = User.get_cached_by_ap_id(socket_user.ap_id)
205
206 if should_send?(user, item) do
207 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
208 end
209 else
210 send(transport_pid, {:text, StreamerView.render("update.json", item)})
211 end
212 end)
213 end
214
215 @spec thread_containment(Activity.t(), User.t()) :: boolean()
216 defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
217
218 defp thread_containment(activity, user) do
219 if Config.get([:instance, :skip_thread_containment]) do
220 true
221 else
222 ActivityPub.contain_activity(activity, user)
223 end
224 end
225 end