Merge branch 'hotfix/delete-activities' into 'develop'
[akkoma] / lib / pleroma / web / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6 use GenServer
7 require Logger
8 alias Pleroma.Activity
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Repo
12 alias Pleroma.User
13 alias Pleroma.Web.ActivityPub.Visibility
14
15 @keepalive_interval :timer.seconds(30)
16
17 def start_link do
18 GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
19 end
20
21 def add_socket(topic, socket) do
22 GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
23 end
24
25 def remove_socket(topic, socket) do
26 GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
27 end
28
29 def stream(topic, item) do
30 GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
31 end
32
33 def init(args) do
34 spawn(fn ->
35 # 30 seconds
36 Process.sleep(@keepalive_interval)
37 GenServer.cast(__MODULE__, %{action: :ping})
38 end)
39
40 {:ok, args}
41 end
42
43 def handle_cast(%{action: :ping}, topics) do
44 Map.values(topics)
45 |> List.flatten()
46 |> Enum.each(fn socket ->
47 Logger.debug("Sending keepalive ping")
48 send(socket.transport_pid, {:text, ""})
49 end)
50
51 spawn(fn ->
52 # 30 seconds
53 Process.sleep(@keepalive_interval)
54 GenServer.cast(__MODULE__, %{action: :ping})
55 end)
56
57 {:noreply, topics}
58 end
59
60 def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
61 recipient_topics =
62 User.get_recipients_from_activity(item)
63 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
64
65 Enum.each(recipient_topics || [], fn user_topic ->
66 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
67 push_to_socket(topics, user_topic, item)
68 end)
69
70 {:noreply, topics}
71 end
72
73 def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
74 # filter the recipient list if the activity is not public, see #270.
75 recipient_lists =
76 case Visibility.is_public?(item) do
77 true ->
78 Pleroma.List.get_lists_from_activity(item)
79
80 _ ->
81 Pleroma.List.get_lists_from_activity(item)
82 |> Enum.filter(fn list ->
83 owner = Repo.get(User, list.user_id)
84
85 Visibility.visible_for_user?(item, owner)
86 end)
87 end
88
89 recipient_topics =
90 recipient_lists
91 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
92
93 Enum.each(recipient_topics || [], fn list_topic ->
94 Logger.debug("Trying to push message to #{list_topic}\n\n")
95 push_to_socket(topics, list_topic, item)
96 end)
97
98 {:noreply, topics}
99 end
100
101 def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
102 topic = "user:#{item.user_id}"
103
104 Enum.each(topics[topic] || [], fn socket ->
105 json =
106 %{
107 event: "notification",
108 payload:
109 Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification(
110 socket.assigns["user"],
111 item
112 )
113 |> Jason.encode!()
114 }
115 |> Jason.encode!()
116
117 send(socket.transport_pid, {:text, json})
118 end)
119
120 {:noreply, topics}
121 end
122
123 def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
124 Logger.debug("Trying to push to users")
125
126 recipient_topics =
127 User.get_recipients_from_activity(item)
128 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
129
130 Enum.each(recipient_topics, fn topic ->
131 push_to_socket(topics, topic, item)
132 end)
133
134 {:noreply, topics}
135 end
136
137 def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
138 Logger.debug("Trying to push to #{topic}")
139 Logger.debug("Pushing item to #{topic}")
140 push_to_socket(topics, topic, item)
141 {:noreply, topics}
142 end
143
144 def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
145 topic = internal_topic(topic, socket)
146 sockets_for_topic = sockets[topic] || []
147 sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
148 sockets = Map.put(sockets, topic, sockets_for_topic)
149 Logger.debug("Got new conn for #{topic}")
150 {:noreply, sockets}
151 end
152
153 def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
154 topic = internal_topic(topic, socket)
155 sockets_for_topic = sockets[topic] || []
156 sockets_for_topic = List.delete(sockets_for_topic, socket)
157 sockets = Map.put(sockets, topic, sockets_for_topic)
158 Logger.debug("Removed conn for #{topic}")
159 {:noreply, sockets}
160 end
161
162 def handle_cast(m, state) do
163 Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
164 {:noreply, state}
165 end
166
167 defp represent_update(%Activity{} = activity, %User{} = user) do
168 %{
169 event: "update",
170 payload:
171 Pleroma.Web.MastodonAPI.StatusView.render(
172 "status.json",
173 activity: activity,
174 for: user
175 )
176 |> Jason.encode!()
177 }
178 |> Jason.encode!()
179 end
180
181 defp represent_update(%Activity{} = activity) do
182 %{
183 event: "update",
184 payload:
185 Pleroma.Web.MastodonAPI.StatusView.render(
186 "status.json",
187 activity: activity
188 )
189 |> Jason.encode!()
190 }
191 |> Jason.encode!()
192 end
193
194 def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
195 Enum.each(topics[topic] || [], fn socket ->
196 # Get the current user so we have up-to-date blocks etc.
197 if socket.assigns[:user] do
198 user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
199 blocks = user.info.blocks || []
200 mutes = user.info.mutes || []
201
202 parent = Object.normalize(item.data["object"])
203
204 unless is_nil(parent) or item.actor in blocks or item.actor in mutes or
205 parent.data["actor"] in blocks or parent.data["actor"] in mutes do
206 send(socket.transport_pid, {:text, represent_update(item, user)})
207 end
208 else
209 send(socket.transport_pid, {:text, represent_update(item)})
210 end
211 end)
212 end
213
214 def push_to_socket(topics, topic, %Activity{
215 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
216 }) do
217 Enum.each(topics[topic] || [], fn socket ->
218 send(
219 socket.transport_pid,
220 {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
221 )
222 end)
223 end
224
225 def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
226
227 def push_to_socket(topics, topic, item) do
228 Enum.each(topics[topic] || [], fn socket ->
229 # Get the current user so we have up-to-date blocks etc.
230 if socket.assigns[:user] do
231 user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
232 blocks = user.info.blocks || []
233 mutes = user.info.mutes || []
234
235 unless item.actor in blocks or item.actor in mutes do
236 send(socket.transport_pid, {:text, represent_update(item, user)})
237 end
238 else
239 send(socket.transport_pid, {:text, represent_update(item)})
240 end
241 end)
242 end
243
244 defp internal_topic(topic, socket) when topic in ~w[user direct] do
245 "#{topic}:#{socket.assigns[:user].id}"
246 end
247
248 defp internal_topic(topic, _), do: topic
249 end