Merge develop
[akkoma] / lib / pleroma / web / streamer / worker.ex
1 defmodule Pleroma.Web.Streamer.Worker do
2 use GenServer
3
4 require Logger
5
6 alias Pleroma.Activity
7 alias Pleroma.Config
8 alias Pleroma.Conversation.Participation
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.User
12 alias Pleroma.Web.ActivityPub.ActivityPub
13 alias Pleroma.Web.ActivityPub.Visibility
14 alias Pleroma.Web.CommonAPI
15 alias Pleroma.Web.Streamer.State
16 alias Pleroma.Web.Streamer.StreamerSocket
17 alias Pleroma.Web.StreamerView
18
@doc """
Starts a streamer worker linked to the calling process.

The argument is ignored. The worker is started without a registered name and
with an empty map as its initial state.
"""
def start_link(_opts) do
  initial_state = %{}
  GenServer.start_link(__MODULE__, initial_state)
end
22
# GenServer callback: the start argument is adopted unchanged as the worker state.
def init(state), do: {:ok, state}
26
@doc """
Synchronously asks the worker `pid` to stream `items` to `topics`.

Either `topics` or `items` may be a list, in which case the worker streams once
per element. The call blocks until the worker has handled the request and
returns the worker's reply, which is its current state.
"""
def stream(pid, topics, items) do
  request = {:stream, topics, items}
  GenServer.call(pid, request)
end
30
# GenServer callback for `{:stream, topics, items}` requests.
#
# Clause order matters:
#   1. a list of topics  -> the item is streamed once per topic (if the item is
#      itself a list it is passed through as a single item);
#   2. a list of items   -> every item is streamed to the single topic;
#   3. anything else     -> one item is streamed to one topic.
#
# Every clause leaves the state untouched and replies with it.
def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
  Enum.each(topics, &do_stream(%{topic: &1, item: item}))
  {:reply, state, state}
end

def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
  Enum.each(items, &do_stream(%{topic: topic, item: &1}))
  {:reply, state, state}
end

def handle_call({:stream, topic, item}, _from, state) do
  do_stream(%{topic: topic, item: item})
  {:reply, state, state}
end
52
# Streams a direct message: the item is pushed to the "direct:<user id>" topic
# of every recipient returned by `User.get_recipients_from_activity/1`.
defp do_stream(%{topic: "direct", item: item}) do
  recipients = User.get_recipients_from_activity(item)

  # Read the socket registry once instead of once per recipient.
  # NOTE(review): assumes the value returned by State.get_sockets/0 can be
  # reused for the whole loop - confirm against State.
  sockets = State.get_sockets()

  Enum.each(recipients, fn %{id: id} ->
    user_topic = "direct:#{id}"
    Logger.debug("Trying to push direct message to #{user_topic}\n\n")
    push_to_socket(sockets, user_topic, item)
  end)
end
63
# Streams a conversation participation to the "direct:<user id>" topic of the
# user the participation belongs to (`participation.user_id`).
defp do_stream(%{topic: "participation", item: participation}) do
  user_topic = "direct:#{participation.user_id}"
  Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")

  sockets = State.get_sockets()
  push_to_socket(sockets, user_topic, participation)
end
70
# Streams an item to the "list:<list id>" topic of every list returned by
# `Pleroma.List.get_lists_from_activity/1`.
defp do_stream(%{topic: "list", item: item}) do
  # Looked up once; both the public and the non-public path start from it.
  lists = Pleroma.List.get_lists_from_activity(item)

  # filter the recipient list if the activity is not public, see #270.
  recipient_lists =
    case Visibility.is_public?(item) do
      true ->
        lists

      _ ->
        # Keep only lists whose owner is allowed to see the activity.
        Enum.filter(lists, fn list ->
          owner = User.get_cached_by_id(list.user_id)

          Visibility.visible_for_user?(item, owner)
        end)
    end

  # Read the socket registry once instead of once per list.
  # NOTE(review): assumes the value returned by State.get_sockets/0 can be
  # reused for the whole loop - confirm against State.
  sockets = State.get_sockets()

  Enum.each(recipient_lists, fn %{id: id} ->
    list_topic = "list:#{id}"
    Logger.debug("Trying to push message to #{list_topic}\n\n")
    push_to_socket(sockets, list_topic, item)
  end)
end
96
# Streams a notification to the sockets registered under
# "<topic>:<notification.user_id>" for the "user" and "user:notification" topics.
#
# For each socket the user is re-read via `User.get_cached_by_ap_id/1`; the
# notification is sent only if that yields a user and `should_send?/2` allows it.
defp do_stream(%{topic: topic, item: %Notification{} = item})
     when topic in ["user", "user:notification"] do
  subscribers = Map.get(State.get_sockets(), "#{topic}:#{item.user_id}", [])

  Enum.each(subscribers, fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
    case User.get_cached_by_ap_id(socket_user.ap_id) do
      %User{} = user ->
        if should_send?(user, item) do
          payload = StreamerView.render("notification.json", socket_user, item)
          send(transport_pid, {:text, payload})
        end

      _not_a_user ->
        nil
    end
  end)
end
108
# Streams a non-notification item on the "user" topic: the item is pushed to
# the "user:<user id>" topic of every recipient returned by
# `User.get_recipients_from_activity/1`.
defp do_stream(%{topic: "user", item: item}) do
  Logger.debug("Trying to push to users")

  recipients = User.get_recipients_from_activity(item)

  # Read the socket registry once instead of once per recipient.
  # NOTE(review): assumes the value returned by State.get_sockets/0 can be
  # reused for the whole loop - confirm against State.
  sockets = State.get_sockets()

  Enum.each(recipients, fn %{id: id} ->
    push_to_socket(sockets, "user:#{id}", item)
  end)
end
120
# Catch-all: any topic not handled by an earlier clause is pushed as-is to the
# sockets registered under that exact topic name.
defp do_stream(%{topic: topic, item: item}) do
  Logger.debug("Trying to push to #{topic}")
  Logger.debug("Pushing item to #{topic}")

  sockets = State.get_sockets()
  push_to_socket(sockets, topic, item)
end
126
# Decides whether `item` may be delivered to `user`. Returns `true` only when
# every check below passes, in this order (the first failure yields `false`):
#
#   * `Object.normalize/1` returns a non-nil parent object;
#   * the activity's actor is in none of the user's blocks, mutes or muted reblogs;
#   * the parent object's actor is in none of the user's blocks or mutes;
#   * neither actor's host matches the user's domain blocks;
#   * `thread_containment/2` returns `true`;
#   * `CommonAPI.thread_muted?/2` returns `false`.
defp should_send?(%User{} = user, %Activity{} = item) do
  alias Pleroma.Web.ActivityPub.MRF

  info = user.info
  blocked = info.blocks || []
  muted = info.mutes || []
  reblog_muted = info.muted_reblogs || []
  blocked_domains = MRF.subdomains_regex(info.domain_blocks)

  with parent when parent != nil <- Object.normalize(item),
       false <- Enum.any?([blocked, muted, reblog_muted], &(item.actor in &1)),
       false <- Enum.any?([blocked, muted], &(parent.data["actor"] in &1)),
       %{host: item_host} <- URI.parse(item.actor),
       %{host: parent_host} <- URI.parse(parent.data["actor"]),
       false <- MRF.subdomain_match?(blocked_domains, item_host),
       false <- MRF.subdomain_match?(blocked_domains, parent_host),
       true <- thread_containment(item, user),
       false <- CommonAPI.thread_muted?(user, item) do
    true
  else
    _ -> false
  end
end

# A notification is judged by the activity it carries.
defp should_send?(%User{} = user, %Notification{activity: activity}),
  do: should_send?(user, activity)
151
@doc """
Sends `item` to every socket registered under `topic` in the `topics` map.

`topics` maps topic names to lists of `StreamerSocket` structs; a missing topic
is treated as having no subscribers. What is sent depends on the item
(announce, conversation participation, delete, or anything else).
"""
# Announce activities: sockets without a user receive the plain rendering;
# sockets with a user receive it only if `should_send?/2` approves.
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
  sockets = topics[topic] || []

  Enum.each(sockets, fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
    if socket_user do
      # Get the current user so we have up-to-date blocks etc.
      # If the user can no longer be loaded, skip this socket instead of
      # crashing the worker in should_send?/2.
      with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
           true <- should_send?(user, item) do
        send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
      end
    else
      send(transport_pid, {:text, StreamerView.render("update.json", item)})
    end
  end)
end
169
# Conversation participations: every socket registered under `topic` receives
# the "conversation.json" rendering, with no per-user filtering.
def push_to_socket(topics, topic, %Participation{} = participation) do
  subscribers = topics[topic] || []

  Enum.each(subscribers, fn %StreamerSocket{transport_pid: pid} ->
    rendered = StreamerView.render("conversation.json", participation)
    send(pid, {:text, rendered})
  end)
end
175
# Delete activities that carry a "deleted_activity_id": every socket registered
# under `topic` receives a JSON "delete" event whose payload is that id as a
# string.
def push_to_socket(topics, topic, %Activity{
      data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
    }) do
  subscribers = topics[topic] || []

  Enum.each(subscribers, fn %StreamerSocket{transport_pid: pid} ->
    event = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
    send(pid, {:text, event})
  end)
end

# Delete activities without a "deleted_activity_id" are not streamed.
def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}) do
  :noop
end
188
# Catch-all for every other item: sockets without a user receive the plain
# "update.json" rendering. Sockets with a user receive it only if the item's
# actor is neither blocked nor muted by that user and thread containment allows it.
def push_to_socket(topics, topic, item) do
  sockets = topics[topic] || []

  Enum.each(sockets, fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
    if socket_user do
      # Get the current user so we have up-to-date blocks etc.
      # If the user can no longer be loaded, skip this socket instead of
      # crashing the worker on `user.info`.
      with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
           blocks = user.info.blocks || [],
           mutes = user.info.mutes || [],
           true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
           true <- thread_containment(item, user) do
        send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
      end
    else
      send(transport_pid, {:text, StreamerView.render("update.json", item)})
    end
  end)
end
209
# Thread containment check for `activity` as seen by `user`.
#
# The check is bypassed (result `true`) when the user has
# `skip_thread_containment: true` in their info, or when the instance setting
# `[:instance, :skip_thread_containment]` is truthy. Otherwise the decision is
# delegated to `ActivityPub.contain_activity/2`.
@spec thread_containment(Activity.t(), User.t()) :: boolean()
defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true

defp thread_containment(activity, user) do
  case Config.get([:instance, :skip_thread_containment]) do
    disabled when disabled in [nil, false] ->
      ActivityPub.contain_activity(activity, user)

    _skip ->
      true
  end
end
220 end