bbb7483e5e87d9262a63b89444f9170b7073f788
[akkoma] / lib / pleroma / web / streamer / worker.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer.Worker do
6 use GenServer
7
8 require Logger
9
10 alias Pleroma.Activity
11 alias Pleroma.Config
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Notification
14 alias Pleroma.Object
15 alias Pleroma.User
16 alias Pleroma.Web.ActivityPub.ActivityPub
17 alias Pleroma.Web.ActivityPub.Visibility
18 alias Pleroma.Web.CommonAPI
19 alias Pleroma.Web.Streamer.State
20 alias Pleroma.Web.Streamer.StreamerSocket
21 alias Pleroma.Web.StreamerView
22
23 def start_link(_) do
24 GenServer.start_link(__MODULE__, %{}, [])
25 end
26
27 def init(init_arg) do
28 {:ok, init_arg}
29 end
30
31 def stream(pid, topics, items) do
32 GenServer.call(pid, {:stream, topics, items})
33 end
34
35 def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
36 Enum.each(topics, fn t ->
37 do_stream(%{topic: t, item: item})
38 end)
39
40 {:reply, state, state}
41 end
42
43 def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
44 Enum.each(items, fn i ->
45 do_stream(%{topic: topic, item: i})
46 end)
47
48 {:reply, state, state}
49 end
50
51 def handle_call({:stream, topic, item}, _from, state) do
52 do_stream(%{topic: topic, item: item})
53
54 {:reply, state, state}
55 end
56
57 defp do_stream(%{topic: "direct", item: item}) do
58 recipient_topics =
59 User.get_recipients_from_activity(item)
60 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
61
62 Enum.each(recipient_topics, fn user_topic ->
63 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
64 push_to_socket(State.get_sockets(), user_topic, item)
65 end)
66 end
67
68 defp do_stream(%{topic: "participation", item: participation}) do
69 user_topic = "direct:#{participation.user_id}"
70 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
71
72 push_to_socket(State.get_sockets(), user_topic, participation)
73 end
74
75 defp do_stream(%{topic: "list", item: item}) do
76 # filter the recipient list if the activity is not public, see #270.
77 recipient_lists =
78 case Visibility.is_public?(item) do
79 true ->
80 Pleroma.List.get_lists_from_activity(item)
81
82 _ ->
83 Pleroma.List.get_lists_from_activity(item)
84 |> Enum.filter(fn list ->
85 owner = User.get_cached_by_id(list.user_id)
86
87 Visibility.visible_for_user?(item, owner)
88 end)
89 end
90
91 recipient_topics =
92 recipient_lists
93 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
94
95 Enum.each(recipient_topics, fn list_topic ->
96 Logger.debug("Trying to push message to #{list_topic}\n\n")
97 push_to_socket(State.get_sockets(), list_topic, item)
98 end)
99 end
100
101 defp do_stream(%{topic: topic, item: %Notification{} = item})
102 when topic in ["user", "user:notification"] do
103 State.get_sockets()
104 |> Map.get("#{topic}:#{item.user_id}", [])
105 |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
106 with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
107 true <- should_send?(user, item) do
108 send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
109 end
110 end)
111 end
112
113 defp do_stream(%{topic: "user", item: item}) do
114 Logger.debug("Trying to push to users")
115
116 recipient_topics =
117 User.get_recipients_from_activity(item)
118 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
119
120 Enum.each(recipient_topics, fn topic ->
121 push_to_socket(State.get_sockets(), topic, item)
122 end)
123 end
124
125 defp do_stream(%{topic: topic, item: item}) do
126 Logger.debug("Trying to push to #{topic}")
127 Logger.debug("Pushing item to #{topic}")
128 push_to_socket(State.get_sockets(), topic, item)
129 end
130
131 defp should_send?(%User{} = user, %Activity{} = item) do
132 blocks = user.info.blocks || []
133 mutes = user.info.mutes || []
134 reblog_mutes = user.info.muted_reblogs || []
135 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
136
137 with parent when not is_nil(parent) <- Object.normalize(item),
138 true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
139 true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
140 %{host: item_host} <- URI.parse(item.actor),
141 %{host: parent_host} <- URI.parse(parent.data["actor"]),
142 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
143 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
144 true <- thread_containment(item, user),
145 false <- CommonAPI.thread_muted?(user, item) do
146 true
147 else
148 _ -> false
149 end
150 end
151
152 defp should_send?(%User{} = user, %Notification{activity: activity}) do
153 should_send?(user, activity)
154 end
155
156 def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
157 Enum.each(topics[topic] || [], fn %StreamerSocket{
158 transport_pid: transport_pid,
159 user: socket_user
160 } ->
161 # Get the current user so we have up-to-date blocks etc.
162 if socket_user do
163 user = User.get_cached_by_ap_id(socket_user.ap_id)
164
165 if should_send?(user, item) do
166 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
167 end
168 else
169 send(transport_pid, {:text, StreamerView.render("update.json", item)})
170 end
171 end)
172 end
173
174 def push_to_socket(topics, topic, %Participation{} = participation) do
175 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
176 send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
177 end)
178 end
179
180 def push_to_socket(topics, topic, %Activity{
181 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
182 }) do
183 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
184 send(
185 transport_pid,
186 {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
187 )
188 end)
189 end
190
191 def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
192
193 def push_to_socket(topics, topic, item) do
194 Enum.each(topics[topic] || [], fn %StreamerSocket{
195 transport_pid: transport_pid,
196 user: socket_user
197 } ->
198 # Get the current user so we have up-to-date blocks etc.
199 if socket_user do
200 user = User.get_cached_by_ap_id(socket_user.ap_id)
201 blocks = user.info.blocks || []
202 mutes = user.info.mutes || []
203
204 with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
205 true <- thread_containment(item, user) do
206 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
207 end
208 else
209 send(transport_pid, {:text, StreamerView.render("update.json", item)})
210 end
211 end)
212 end
213
214 @spec thread_containment(Activity.t(), User.t()) :: boolean()
215 defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
216
217 defp thread_containment(activity, user) do
218 if Config.get([:instance, :skip_thread_containment]) do
219 true
220 else
221 ActivityPub.contain_activity(activity, user)
222 end
223 end
224 end