1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
# GenServer-based streaming worker: accepts `{:stream, topic(s), item(s)}` calls and
# sends rendered `{:text, payload}` messages to the transport pids of the sockets
# registered for the matching topic.
5 defmodule Pleroma.Web.Streamer.Worker do
10 alias Pleroma.Activity
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Notification
16 alias Pleroma.Web.ActivityPub.ActivityPub
17 alias Pleroma.Web.ActivityPub.Visibility
18 alias Pleroma.Web.CommonAPI
19 alias Pleroma.Web.Streamer.State
20 alias Pleroma.Web.Streamer.StreamerSocket
21 alias Pleroma.Web.StreamerView
# Starts this module as a GenServer with an empty map as the init argument and no
# options (so the process is not registered under a name here).
# NOTE(review): presumably the body of start_link — the enclosing `def` is not visible.
24 GenServer.start_link(__MODULE__, %{}, [])
# Client API: asks the worker process `pid` to stream `items` to `topics` by issuing
# a `{:stream, topics, items}` GenServer call. Either argument may be a single value
# or a list; the handle_call clauses decide how to fan out.
31 def stream(pid, topics, items) do
32 GenServer.call(pid, {:stream, topics, items})
# Stream request where `topics` is a list: the same `item` is streamed once per topic.
# This clause is tried first, so when BOTH arguments are lists the whole items list is
# passed to do_stream as a single item for each topic.
35 def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
36 Enum.each(topics, fn t ->
37 do_stream(%{topic: t, item: item})
# The state is not modified; it is also used as the reply value.
40 {:reply, state, state}
# Stream request with a single (non-list) topic and a list of items: each item is
# streamed individually to that topic.
43 def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
44 Enum.each(items, fn i ->
45 do_stream(%{topic: topic, item: i})
# The state is not modified; it is also used as the reply value.
48 {:reply, state, state}
# Fallback stream request: one topic, one item (neither argument is a list).
51 def handle_call({:stream, topic, item}, _from, state) do
52 do_stream(%{topic: topic, item: item})
# The state is not modified; it is also used as the reply value.
54 {:reply, state, state}
# "direct" topic: fans the item out to per-user topics of the form "direct:<user id>",
# one for each recipient returned by User.get_recipients_from_activity/1.
57 defp do_stream(%{topic: "direct", item: item}) do
59 User.get_recipients_from_activity(item)
60 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
# NOTE(review): recipient_topics is presumably bound to the pipeline result above —
# the binding itself is not visible here; confirm.
# The socket registry is re-read via State.get_sockets() for every recipient topic.
62 Enum.each(recipient_topics, fn user_topic ->
63 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
64 push_to_socket(State.get_sockets(), user_topic, item)
# "participation" topic: a conversation participation is pushed to the direct-message
# topic ("direct:<user id>") of the user the participation belongs to.
68 defp do_stream(%{topic: "participation", item: participation}) do
69 user_topic = "direct:#{participation.user_id}"
70 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
72 push_to_socket(State.get_sockets(), user_topic, participation)
# "list" topic: pushes the item to one "list:<list id>" topic per list returned by
# Pleroma.List.get_lists_from_activity/1.
75 defp do_stream(%{topic: "list", item: item}) do
76 # filter the recipient list if the activity is not public, see #270.
# Both branches start from the same set of lists; only one of them filters it.
# NOTE(review): the branch heads are not visible — per the comment above, the
# unfiltered branch is presumably the public case; confirm.
78 case Visibility.is_public?(item) do
80 Pleroma.List.get_lists_from_activity(item)
83 Pleroma.List.get_lists_from_activity(item)
84 |> Enum.filter(fn list ->
# A list is kept only when its owner is allowed to see the item.
85 owner = User.get_cached_by_id(list.user_id)
87 Visibility.visible_for_user?(item, owner)
93 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
# NOTE(review): recipient_topics is presumably the mapped result above — binding not
# visible here; confirm.
95 Enum.each(recipient_topics, fn list_topic ->
96 Logger.debug("Trying to push message to #{list_topic}\n\n")
97 push_to_socket(State.get_sockets(), list_topic, item)
# Notification items on the "user" or "user:notification" topics: delivered only to
# the sockets registered under "<topic>:<id of the notified user>".
101 defp do_stream(%{topic: topic, item: %Notification{} = item})
102 when topic in ["user", "user:notification"] do
# NOTE(review): the head of this pipe is not visible — presumably the socket map
# from State.get_sockets(); confirm. A missing key yields an empty list.
104 |> Map.get("#{topic}:#{item.user_id}", [])
105 |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
# The user is looked up again by ap_id and that fresh record is what should_send?/2
# checks; note that the render call below is given socket_user, not the fresh record.
# If the lookup does not return a %User{} or should_send?/2 is not true, nothing is sent.
106 with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
107 true <- should_send?(user, item) do
108 send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
# "user" topic for items that did not match the Notification clause above: fans the
# item out to "user:<user id>" for each recipient returned by
# User.get_recipients_from_activity/1.
113 defp do_stream(%{topic: "user", item: item}) do
114 Logger.debug("Trying to push to users")
117 User.get_recipients_from_activity(item)
118 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
# NOTE(review): recipient_topics is presumably the mapped result above — binding not
# visible here; confirm.
120 Enum.each(recipient_topics, fn topic ->
121 push_to_socket(State.get_sockets(), topic, item)
# Catch-all: any other topic is used verbatim — the item is pushed to the sockets
# registered under exactly `topic`, with no per-recipient fan-out.
125 defp do_stream(%{topic: topic, item: item}) do
# Two debug lines are logged for the same push.
126 Logger.debug("Trying to push to #{topic}")
127 Logger.debug("Pushing item to #{topic}")
128 push_to_socket(State.get_sockets(), topic, item)
# Decides whether `item` (an activity) may be delivered to `user`, based on the user's
# blocks, mutes, reblog mutes, domain blocks, thread containment and thread mutes.
# NOTE(review): the `do` body and any `else` of the `with` are not visible here —
# presumably true when every step passes and false otherwise; confirm.
131 defp should_send?(%User{} = user, %Activity{} = item) do
# nil lists in user.info are treated as empty.
132 blocks = user.info.blocks || []
133 mutes = user.info.mutes || []
134 reblog_mutes = user.info.muted_reblogs || []
135 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
# The object referenced by the activity must resolve (non-nil).
137 with parent when not is_nil(parent) <- Object.normalize(item),
# The activity's actor must not be blocked, muted, or reblog-muted.
138 true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
# The object's actor must not be blocked or muted (reblog mutes are not applied to it).
139 true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
# Neither actor's host may match one of the user's domain blocks.
140 %{host: item_host} <- URI.parse(item.actor),
141 %{host: parent_host} <- URI.parse(parent.data["actor"]),
142 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
143 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
# Thread containment must pass and the user must not have muted the thread.
144 true <- thread_containment(item, user),
145 false <- CommonAPI.thread_muted?(user, item) do
# Notification variant: delegates to the activity clause using the notification's
# `activity` field.
152 defp should_send?(%User{} = user, %Notification{activity: activity}) do
153 should_send?(user, activity)
# Pushes an Announce activity to every socket registered under `topic` in the
# `topics` map (a missing topic is treated as having no sockets).
156 def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
157 Enum.each(topics[topic] || [], fn %StreamerSocket{
158 transport_pid: transport_pid,
161 # Get the current user so we have up-to-date blocks etc.
163 user = User.get_cached_by_ap_id(socket_user.ap_id)
# Announces go through the full should_send?/2 check before being rendered for the user.
165 if should_send?(user, item) do
166 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
# NOTE(review): the condition guarding this user-less render is not visible here —
# presumably the branch for sockets without an associated user; confirm.
169 send(transport_pid, {:text, StreamerView.render("update.json", item)})
# Pushes a conversation participation, rendered with "conversation.json", to every
# socket registered under `topic`. No block/mute filtering is applied in this clause.
174 def push_to_socket(topics, topic, %Participation{} = participation) do
175 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
176 send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
# Delete activity that carries a "deleted_activity_id": every socket registered under
# `topic` receives a JSON-encoded "delete" event whose payload is that id as a string.
180 def push_to_socket(topics, topic, %Activity{
181 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
183 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
186 {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
# Delete activity that did not match the clause above (no "deleted_activity_id" in
# its data): nothing is sent and :noop is returned.
191 def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
# Generic clause for every other item: pushes it, rendered with "update.json", to
# each socket registered under `topic` (a missing topic means no sockets).
193 def push_to_socket(topics, topic, item) do
194 Enum.each(topics[topic] || [], fn %StreamerSocket{
195 transport_pid: transport_pid,
198 # Get the current user so we have up-to-date blocks etc.
200 user = User.get_cached_by_ap_id(socket_user.ap_id)
# nil lists in user.info are treated as empty.
201 blocks = user.info.blocks || []
202 mutes = user.info.mutes || []
# Only the actor's block/mute status and thread containment are checked here; the
# domain-block, reblog-mute and thread-mute checks of should_send?/2 are not applied.
204 with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
205 true <- thread_containment(item, user) do
206 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
# NOTE(review): the condition guarding this user-less render is not visible here —
# presumably the branch for sockets without an associated user; confirm.
209 send(transport_pid, {:text, StreamerView.render("update.json", item)})
# Thread-containment check for delivering `activity` to `user`.
214 @spec thread_containment(Activity.t(), User.t()) :: boolean()
# Users who opted out of thread containment always pass.
215 defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
217 defp thread_containment(activity, user) do
# Instance-wide setting is consulted next.
# NOTE(review): the branch taken when the setting is truthy is not visible —
# presumably it returns true; confirm.
218 if Config.get([:instance, :skip_thread_containment]) do
# Otherwise the decision is delegated to ActivityPub.contain_activity/2.
221 ActivityPub.contain_activity(activity, user)