45f9c7c5328eabb5cfbcd6dbe4069bd95247af48
[akkoma] / lib / pleroma / web / streamer / worker.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer.Worker do
6 use GenServer
7
8 require Logger
9
10 alias Pleroma.Activity
11 alias Pleroma.Config
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Notification
14 alias Pleroma.Object
15 alias Pleroma.User
16 alias Pleroma.Web.ActivityPub.ActivityPub
17 alias Pleroma.Web.ActivityPub.Visibility
18 alias Pleroma.Web.CommonAPI
19 alias Pleroma.Web.Streamer.State
20 alias Pleroma.Web.Streamer.StreamerSocket
21 alias Pleroma.Web.StreamerView
22
23 def start_link(_) do
24 GenServer.start_link(__MODULE__, %{}, [])
25 end
26
27 def init(init_arg) do
28 {:ok, init_arg}
29 end
30
31 def stream(pid, topics, items) do
32 GenServer.call(pid, {:stream, topics, items})
33 end
34
35 def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
36 Enum.each(topics, fn t ->
37 do_stream(%{topic: t, item: item})
38 end)
39
40 {:reply, state, state}
41 end
42
43 def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
44 Enum.each(items, fn i ->
45 do_stream(%{topic: topic, item: i})
46 end)
47
48 {:reply, state, state}
49 end
50
51 def handle_call({:stream, topic, item}, _from, state) do
52 do_stream(%{topic: topic, item: item})
53
54 {:reply, state, state}
55 end
56
57 defp do_stream(%{topic: "direct", item: item}) do
58 recipient_topics =
59 User.get_recipients_from_activity(item)
60 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
61
62 Enum.each(recipient_topics, fn user_topic ->
63 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
64 push_to_socket(State.get_sockets(), user_topic, item)
65 end)
66 end
67
68 defp do_stream(%{topic: "participation", item: participation}) do
69 user_topic = "direct:#{participation.user_id}"
70 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
71
72 push_to_socket(State.get_sockets(), user_topic, participation)
73 end
74
75 defp do_stream(%{topic: "list", item: item}) do
76 # filter the recipient list if the activity is not public, see #270.
77 recipient_lists =
78 case Visibility.is_public?(item) do
79 true ->
80 Pleroma.List.get_lists_from_activity(item)
81
82 _ ->
83 Pleroma.List.get_lists_from_activity(item)
84 |> Enum.filter(fn list ->
85 owner = User.get_cached_by_id(list.user_id)
86
87 Visibility.visible_for_user?(item, owner)
88 end)
89 end
90
91 recipient_topics =
92 recipient_lists
93 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
94
95 Enum.each(recipient_topics, fn list_topic ->
96 Logger.debug("Trying to push message to #{list_topic}\n\n")
97 push_to_socket(State.get_sockets(), list_topic, item)
98 end)
99 end
100
101 defp do_stream(%{topic: topic, item: %Notification{} = item})
102 when topic in ["user", "user:notification"] do
103 State.get_sockets()
104 |> Map.get("#{topic}:#{item.user_id}", [])
105 |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
106 with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
107 true <- should_send?(user, item) do
108 send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
109 end
110 end)
111 end
112
113 defp do_stream(%{topic: "user", item: item}) do
114 Logger.debug("Trying to push to users")
115
116 recipient_topics =
117 User.get_recipients_from_activity(item)
118 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
119
120 Enum.each(recipient_topics, fn topic ->
121 push_to_socket(State.get_sockets(), topic, item)
122 end)
123 end
124
125 defp do_stream(%{topic: topic, item: item}) do
126 Logger.debug("Trying to push to #{topic}")
127 Logger.debug("Pushing item to #{topic}")
128 push_to_socket(State.get_sockets(), topic, item)
129 end
130
131 defp should_send?(%User{} = user, %Activity{} = item) do
132 {:ok, [blocks, mutes]} = User.related_ap_ids(user, [:blocked_users, :muted_users])
133 reblog_mutes = user.muted_reblogs || []
134 recipient_blocks = MapSet.new(blocks ++ mutes)
135 recipients = MapSet.new(item.recipients)
136 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
137
138 with parent <- Object.normalize(item) || item,
139 true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
140 true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
141 true <- MapSet.disjoint?(recipients, recipient_blocks),
142 %{host: item_host} <- URI.parse(item.actor),
143 %{host: parent_host} <- URI.parse(parent.data["actor"]),
144 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
145 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
146 true <- thread_containment(item, user),
147 false <- CommonAPI.thread_muted?(user, item) do
148 true
149 else
150 _ -> false
151 end
152 end
153
154 defp should_send?(%User{} = user, %Notification{activity: activity}) do
155 should_send?(user, activity)
156 end
157
158 def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
159 Enum.each(topics[topic] || [], fn %StreamerSocket{
160 transport_pid: transport_pid,
161 user: socket_user
162 } ->
163 # Get the current user so we have up-to-date blocks etc.
164 if socket_user do
165 user = User.get_cached_by_ap_id(socket_user.ap_id)
166
167 if should_send?(user, item) do
168 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
169 end
170 else
171 send(transport_pid, {:text, StreamerView.render("update.json", item)})
172 end
173 end)
174 end
175
176 def push_to_socket(topics, topic, %Participation{} = participation) do
177 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
178 send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
179 end)
180 end
181
182 def push_to_socket(topics, topic, %Activity{
183 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
184 }) do
185 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
186 send(
187 transport_pid,
188 {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
189 )
190 end)
191 end
192
193 def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
194
195 def push_to_socket(topics, topic, item) do
196 Enum.each(topics[topic] || [], fn %StreamerSocket{
197 transport_pid: transport_pid,
198 user: socket_user
199 } ->
200 # Get the current user so we have up-to-date blocks etc.
201 if socket_user do
202 user = User.get_cached_by_ap_id(socket_user.ap_id)
203
204 if should_send?(user, item) do
205 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
206 end
207 else
208 send(transport_pid, {:text, StreamerView.render("update.json", item)})
209 end
210 end)
211 end
212
213 @spec thread_containment(Activity.t(), User.t()) :: boolean()
214 defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
215
216 defp thread_containment(activity, user) do
217 if Config.get([:instance, :skip_thread_containment]) do
218 true
219 else
220 ActivityPub.contain_activity(activity, user)
221 end
222 end
223 end