Merge branch 'develop' into issue/1411
[akkoma] / lib / pleroma / web / streamer / worker.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer.Worker do
6 use GenServer
7
8 require Logger
9
10 alias Pleroma.Activity
11 alias Pleroma.Config
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Notification
14 alias Pleroma.Object
15 alias Pleroma.User
16 alias Pleroma.Web.ActivityPub.ActivityPub
17 alias Pleroma.Web.ActivityPub.Visibility
18 alias Pleroma.Web.CommonAPI
19 alias Pleroma.Web.Streamer.State
20 alias Pleroma.Web.Streamer.StreamerSocket
21 alias Pleroma.Web.StreamerView
22
23 def start_link(_) do
24 GenServer.start_link(__MODULE__, %{}, [])
25 end
26
27 def init(init_arg) do
28 {:ok, init_arg}
29 end
30
31 def stream(pid, topics, items) do
32 GenServer.call(pid, {:stream, topics, items})
33 end
34
35 def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
36 Enum.each(topics, fn t ->
37 do_stream(%{topic: t, item: item})
38 end)
39
40 {:reply, state, state}
41 end
42
43 def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
44 Enum.each(items, fn i ->
45 do_stream(%{topic: topic, item: i})
46 end)
47
48 {:reply, state, state}
49 end
50
51 def handle_call({:stream, topic, item}, _from, state) do
52 do_stream(%{topic: topic, item: item})
53
54 {:reply, state, state}
55 end
56
57 defp do_stream(%{topic: "direct", item: item}) do
58 recipient_topics =
59 User.get_recipients_from_activity(item)
60 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
61
62 Enum.each(recipient_topics, fn user_topic ->
63 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
64 push_to_socket(State.get_sockets(), user_topic, item)
65 end)
66 end
67
68 defp do_stream(%{topic: "participation", item: participation}) do
69 user_topic = "direct:#{participation.user_id}"
70 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
71
72 push_to_socket(State.get_sockets(), user_topic, participation)
73 end
74
75 defp do_stream(%{topic: "list", item: item}) do
76 # filter the recipient list if the activity is not public, see #270.
77 recipient_lists =
78 case Visibility.is_public?(item) do
79 true ->
80 Pleroma.List.get_lists_from_activity(item)
81
82 _ ->
83 Pleroma.List.get_lists_from_activity(item)
84 |> Enum.filter(fn list ->
85 owner = User.get_cached_by_id(list.user_id)
86
87 Visibility.visible_for_user?(item, owner)
88 end)
89 end
90
91 recipient_topics =
92 recipient_lists
93 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
94
95 Enum.each(recipient_topics, fn list_topic ->
96 Logger.debug("Trying to push message to #{list_topic}\n\n")
97 push_to_socket(State.get_sockets(), list_topic, item)
98 end)
99 end
100
101 defp do_stream(%{topic: topic, item: %Notification{} = item})
102 when topic in ["user", "user:notification"] do
103 State.get_sockets()
104 |> Map.get("#{topic}:#{item.user_id}", [])
105 |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
106 with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
107 true <- should_send?(user, item) do
108 send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
109 end
110 end)
111 end
112
113 defp do_stream(%{topic: "user", item: item}) do
114 Logger.debug("Trying to push to users")
115
116 recipient_topics =
117 User.get_recipients_from_activity(item)
118 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
119
120 Enum.each(recipient_topics, fn topic ->
121 push_to_socket(State.get_sockets(), topic, item)
122 end)
123 end
124
125 defp do_stream(%{topic: topic, item: item}) do
126 Logger.debug("Trying to push to #{topic}")
127 Logger.debug("Pushing item to #{topic}")
128 push_to_socket(State.get_sockets(), topic, item)
129 end
130
131 defp should_send?(%User{} = user, %Activity{} = item) do
132 blocks = user.blocks || []
133 mutes = user.mutes || []
134 reblog_mutes = user.muted_reblogs || []
135 recipient_blocks = MapSet.new(blocks ++ mutes)
136 recipients = MapSet.new(item.recipients)
137 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
138
139 with parent <- Object.normalize(item) || item,
140 true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
141 true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
142 true <- MapSet.disjoint?(recipients, recipient_blocks),
143 %{host: item_host} <- URI.parse(item.actor),
144 %{host: parent_host} <- URI.parse(parent.data["actor"]),
145 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
146 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
147 true <- thread_containment(item, user),
148 false <- CommonAPI.thread_muted?(user, item) do
149 true
150 else
151 _ -> false
152 end
153 end
154
155 defp should_send?(%User{} = user, %Notification{activity: activity}) do
156 should_send?(user, activity)
157 end
158
159 def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
160 Enum.each(topics[topic] || [], fn %StreamerSocket{
161 transport_pid: transport_pid,
162 user: socket_user
163 } ->
164 # Get the current user so we have up-to-date blocks etc.
165 if socket_user do
166 user = User.get_cached_by_ap_id(socket_user.ap_id)
167
168 if should_send?(user, item) do
169 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
170 end
171 else
172 send(transport_pid, {:text, StreamerView.render("update.json", item)})
173 end
174 end)
175 end
176
177 def push_to_socket(topics, topic, %Participation{} = participation) do
178 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
179 send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
180 end)
181 end
182
183 def push_to_socket(topics, topic, %Activity{
184 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
185 }) do
186 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
187 send(
188 transport_pid,
189 {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
190 )
191 end)
192 end
193
194 def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
195
196 def push_to_socket(topics, topic, item) do
197 Enum.each(topics[topic] || [], fn %StreamerSocket{
198 transport_pid: transport_pid,
199 user: socket_user
200 } ->
201 # Get the current user so we have up-to-date blocks etc.
202 if socket_user do
203 user = User.get_cached_by_ap_id(socket_user.ap_id)
204
205 if should_send?(user, item) do
206 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
207 end
208 else
209 send(transport_pid, {:text, StreamerView.render("update.json", item)})
210 end
211 end)
212 end
213
214 @spec thread_containment(Activity.t(), User.t()) :: boolean()
215 defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
216
217 defp thread_containment(activity, user) do
218 if Config.get([:instance, :skip_thread_containment]) do
219 true
220 else
221 ActivityPub.contain_activity(activity, user)
222 end
223 end
224 end