1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.Streamer do
10 alias Pleroma.Conversation.Participation
11 alias Pleroma.Notification
14 alias Pleroma.Web.ActivityPub.ActivityPub
15 alias Pleroma.Web.ActivityPub.Visibility
16 alias Pleroma.Web.CommonAPI
17 alias Pleroma.Web.MastodonAPI.NotificationView
19 @keepalive_interval :timer.seconds(30)
22 GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
25 def add_socket(topic, socket) do
26 GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
29 def remove_socket(topic, socket) do
30 GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
33 def stream(topic, item) do
34 GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
38 Process.send_after(self(), %{action: :ping}, @keepalive_interval)
43 def handle_info(%{action: :ping}, topics) do
47 |> Enum.each(fn socket ->
48 Logger.debug("Sending keepalive ping")
49 send(socket.transport_pid, {:text, ""})
52 Process.send_after(self(), %{action: :ping}, @keepalive_interval)
57 def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
59 User.get_recipients_from_activity(item)
60 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
62 Enum.each(recipient_topics || [], fn user_topic ->
63 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
64 push_to_socket(topics, user_topic, item)
70 def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do
71 user_topic = "direct:#{participation.user_id}"
72 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
74 push_to_socket(topics, user_topic, participation)
79 def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
80 # filter the recipient list if the activity is not public, see #270.
82 case Visibility.is_public?(item) do
84 Pleroma.List.get_lists_from_activity(item)
87 Pleroma.List.get_lists_from_activity(item)
88 |> Enum.filter(fn list ->
89 owner = User.get_cached_by_id(list.user_id)
91 Visibility.visible_for_user?(item, owner)
97 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
99 Enum.each(recipient_topics || [], fn list_topic ->
100 Logger.debug("Trying to push message to #{list_topic}\n\n")
101 push_to_socket(topics, list_topic, item)
108 %{action: :stream, topic: topic, item: %Notification{} = item},
111 when topic in ["user", "user:notification"] do
113 |> Map.get("#{topic}:#{item.user_id}", [])
114 |> Enum.each(fn socket ->
115 with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id),
116 true <- should_send?(user, item) do
118 socket.transport_pid,
119 {:text, represent_notification(socket.assigns[:user], item)}
127 def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
128 Logger.debug("Trying to push to users")
131 User.get_recipients_from_activity(item)
132 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
134 Enum.each(recipient_topics, fn topic ->
135 push_to_socket(topics, topic, item)
141 def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
142 Logger.debug("Trying to push to #{topic}")
143 Logger.debug("Pushing item to #{topic}")
144 push_to_socket(topics, topic, item)
148 def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
149 topic = internal_topic(topic, socket)
150 sockets_for_topic = sockets[topic] || []
151 sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
152 sockets = Map.put(sockets, topic, sockets_for_topic)
153 Logger.debug("Got new conn for #{topic}")
157 def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
158 topic = internal_topic(topic, socket)
159 sockets_for_topic = sockets[topic] || []
160 sockets_for_topic = List.delete(sockets_for_topic, socket)
161 sockets = Map.put(sockets, topic, sockets_for_topic)
162 Logger.debug("Removed conn for #{topic}")
166 def handle_cast(m, state) do
167 Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
171 defp represent_update(%Activity{} = activity, %User{} = user) do
175 Pleroma.Web.MastodonAPI.StatusView.render(
185 defp represent_update(%Activity{} = activity) do
189 Pleroma.Web.MastodonAPI.StatusView.render(
198 def represent_conversation(%Participation{} = participation) do
200 event: "conversation",
202 Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
203 participation: participation,
204 for: participation.user
211 @spec represent_notification(User.t(), Notification.t()) :: binary()
212 defp represent_notification(%User{} = user, %Notification{} = notify) do
214 event: "notification",
216 NotificationView.render(
218 %{notification: notify, for: user}
225 defp should_send?(%User{} = user, %Activity{} = item) do
226 blocks = user.info.blocks || []
227 mutes = user.info.mutes || []
228 reblog_mutes = user.info.muted_reblogs || []
229 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
231 with parent when not is_nil(parent) <- Object.normalize(item),
232 true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
233 true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
234 %{host: item_host} <- URI.parse(item.actor),
235 %{host: parent_host} <- URI.parse(parent.data["actor"]),
236 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
237 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
238 true <- thread_containment(item, user),
239 false <- CommonAPI.thread_muted?(user, item) do
246 defp should_send?(%User{} = user, %Notification{activity: activity}) do
247 should_send?(user, activity)
250 def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
251 Enum.each(topics[topic] || [], fn socket ->
252 # Get the current user so we have up-to-date blocks etc.
253 if socket.assigns[:user] do
254 user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
256 if should_send?(user, item) do
257 send(socket.transport_pid, {:text, represent_update(item, user)})
260 send(socket.transport_pid, {:text, represent_update(item)})
265 def push_to_socket(topics, topic, %Participation{} = participation) do
266 Enum.each(topics[topic] || [], fn socket ->
267 send(socket.transport_pid, {:text, represent_conversation(participation)})
271 def push_to_socket(topics, topic, %Activity{
272 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
274 Enum.each(topics[topic] || [], fn socket ->
276 socket.transport_pid,
277 {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
282 def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
284 def push_to_socket(topics, topic, item) do
285 Enum.each(topics[topic] || [], fn socket ->
286 # Get the current user so we have up-to-date blocks etc.
287 if socket.assigns[:user] do
288 user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
289 blocks = user.info.blocks || []
290 mutes = user.info.mutes || []
292 with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
293 true <- thread_containment(item, user) do
294 send(socket.transport_pid, {:text, represent_update(item, user)})
297 send(socket.transport_pid, {:text, represent_update(item)})
302 defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do
303 "#{topic}:#{socket.assigns[:user].id}"
306 defp internal_topic(topic, _), do: topic
308 @spec thread_containment(Activity.t(), User.t()) :: boolean()
309 defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
311 defp thread_containment(activity, user) do
312 if Config.get([:instance, :skip_thread_containment]) do
315 ActivityPub.contain_activity(activity, user)