1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.Streamer.Worker do
10 alias Pleroma.Activity
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Notification
16 alias Pleroma.Web.ActivityPub.ActivityPub
17 alias Pleroma.Web.ActivityPub.Visibility
18 alias Pleroma.Web.CommonAPI
19 alias Pleroma.Web.Streamer.State
20 alias Pleroma.Web.Streamer.StreamerSocket
21 alias Pleroma.Web.StreamerView
# Starts a GenServer that uses this module as its callback module, with an
# empty map as the init argument and an empty options list (so the process is
# not registered under a name by this call).
# NOTE(review): the enclosing `def` line is not visible in this listing —
# presumably `start_link/1`; confirm.
24 GenServer.start_link(__MODULE__, %{}, [])
# Client API: synchronously asks the worker process `pid` to stream `items`
# to `topics` by issuing a `GenServer.call/2` with `{:stream, topics, items}`.
# Either argument may be a single value or a list; the `handle_call/3`
# clauses below dispatch on that. The value returned is whatever the matching
# `handle_call/3` clause replies with (the visible clauses reply with the
# server state).
31 def stream(pid, topics, items) do
32 GenServer.call(pid, {:stream, topics, items})
# Handles `{:stream, topics, item}` when `topics` is a list: the same `item`
# is passed to `do_stream/1` once per topic. Because this clause comes first,
# it also wins when both arguments are lists; in that case the whole list is
# handed to `do_stream/1` as a single item for each topic.
# Replies with the unchanged state and keeps it as the new state.
35 def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
36 Enum.each(topics, fn t ->
37 do_stream(%{topic: t, item: item})
40 {:reply, state, state}
# Handles `{:stream, topic, items}` when `items` is a list (and `topic` is
# not a list, since the clause above would have matched otherwise): each item
# is passed to `do_stream/1` individually for the single topic.
# Replies with the unchanged state and keeps it as the new state.
43 def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
44 Enum.each(items, fn i ->
45 do_stream(%{topic: topic, item: i})
48 {:reply, state, state}
# Fallback for `{:stream, topic, item}` with a single topic and a single
# item: forwards the pair to `do_stream/1` once.
# Replies with the unchanged state and keeps it as the new state.
51 def handle_call({:stream, topic, item}, _from, state) do
52 do_stream(%{topic: topic, item: item})
54 {:reply, state, state}
# Streams an item on the "direct" topic.
# The recipients returned by `User.get_recipients_from_activity/1` are mapped
# to per-user topic names of the form "direct:<user id>", and the item is
# pushed to the sockets registered under each of those topics.
# NOTE(review): the line binding `recipient_topics` is not visible in this
# listing — presumably it is bound to the pipeline below; confirm.
57 defp do_stream(%{topic: "direct", item: item}) do
59 User.get_recipients_from_activity(item)
60 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
62 Enum.each(recipient_topics, fn user_topic ->
63 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
# `State.get_sockets/0` is called again for every recipient topic.
64 push_to_socket(State.get_sockets(), user_topic, item)
# Streams a conversation participation.
# The target topic is built from the participation's `user_id` and uses the
# "direct:" prefix (not "participation:"), i.e. the same per-user topic name
# the "direct" clause builds from recipient ids.
68 defp do_stream(%{topic: "participation", item: participation}) do
69 user_topic = "direct:#{participation.user_id}"
70 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
72 push_to_socket(State.get_sockets(), user_topic, participation)
# Streams an item on the "list" topic.
# Lists are looked up with `Pleroma.List.get_lists_from_activity/1`. One
# branch of the `case` uses them as-is; the other keeps only the lists whose
# owner (fetched by `list.user_id`) passes `Visibility.visible_for_user?/2`
# for the item. The surviving lists are mapped to "list:<list id>" topics and
# the item is pushed to each.
# NOTE(review): the `case` patterns and the lines binding the intermediate
# results (including `recipient_topics`) are not visible in this listing —
# presumably the unfiltered branch is the public one, as the comment below
# suggests; confirm.
75 defp do_stream(%{topic: "list", item: item}) do
76 # filter the recipient list if the activity is not public, see #270.
78 case Visibility.is_public?(item) do
80 Pleroma.List.get_lists_from_activity(item)
83 Pleroma.List.get_lists_from_activity(item)
84 |> Enum.filter(fn list ->
85 owner = User.get_cached_by_id(list.user_id)
87 Visibility.visible_for_user?(item, owner)
93 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
95 Enum.each(recipient_topics, fn list_topic ->
96 Logger.debug("Trying to push message to #{list_topic}\n\n")
97 push_to_socket(State.get_sockets(), list_topic, item)
# Streams a `%Notification{}` on the "user" or "user:notification" topic.
# Sockets are looked up under the key "<topic>:<notification user_id>"
# (defaulting to an empty list). For each socket the user is re-fetched by
# AP id and the notification is sent only if that lookup yields a `%User{}`
# and `should_send?/2` returns true.
# NOTE(review): the head of the pipeline feeding `Map.get/3` is not visible
# in this listing — presumably `State.get_sockets()`; confirm.
101 defp do_stream(%{topic: topic, item: %Notification{} = item})
102 when topic in ["user", "user:notification"] do
104 |> Map.get("#{topic}:#{item.user_id}", [])
105 |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
106 with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
107 true <- should_send?(user, item) do
# The payload is rendered with `socket_user` (the struct stored on the
# socket), while the filter above used the freshly fetched `user`.
108 send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
# Streams a non-notification item on the "user" topic (notifications are
# caught by the clause above). Recipients from
# `User.get_recipients_from_activity/1` are mapped to "user:<user id>" topics
# and the item is pushed to each.
# NOTE(review): the line binding `recipient_topics` is not visible in this
# listing — presumably it is bound to the pipeline below; confirm.
113 defp do_stream(%{topic: "user", item: item}) do
114 Logger.debug("Trying to push to users")
117 User.get_recipients_from_activity(item)
118 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
120 Enum.each(recipient_topics, fn topic ->
121 push_to_socket(State.get_sockets(), topic, item)
# Catch-all clause: pushes the item to the sockets registered under `topic`
# exactly as given, with no recipient expansion.
125 defp do_stream(%{topic: topic, item: item}) do
# NOTE(review): the two debug lines below log nearly the same thing for the
# same event; one may be redundant.
126 Logger.debug("Trying to push to #{topic}")
127 Logger.debug("Pushing item to #{topic}")
128 push_to_socket(State.get_sockets(), topic, item)
# Decides whether an activity should be delivered to `user`.
# The `with` chain requires, in order:
#   * `Object.normalize(item)` yields a non-nil parent object;
#   * the activity's actor is in none of the user's blocks, mutes or
#     muted reblogs;
#   * the parent object's actor is in neither blocks nor mutes;
#   * none of the activity's recipients is blocked or muted;
#   * neither the activity actor's host nor the parent actor's host matches
#     the user's domain blocks;
#   * `thread_containment/2` returns true;
#   * `CommonAPI.thread_muted?/2` returns false.
# NOTE(review): the body and any `else` of the `with` are not visible in this
# listing — presumably it yields true when every step matches and false
# otherwise; confirm.
131 defp should_send?(%User{} = user, %Activity{} = item) do
# Each list falls back to [] when the corresponding field is nil or false.
132 blocks = user.info.blocks || []
133 mutes = user.info.mutes || []
134 reblog_mutes = user.info.muted_reblogs || []
135 recipient_blocks = MapSet.new(blocks ++ mutes)
136 recipients = MapSet.new(item.recipients)
137 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
139 with parent when not is_nil(parent) <- Object.normalize(item),
140 true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
141 true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
142 true <- MapSet.disjoint?(recipients, recipient_blocks),
143 %{host: item_host} <- URI.parse(item.actor),
144 %{host: parent_host} <- URI.parse(parent.data["actor"]),
145 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
146 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
147 true <- thread_containment(item, user),
148 false <- CommonAPI.thread_muted?(user, item) do
# Notification variant: delegates to the activity clause using the
# notification's `activity` field.
# NOTE(review): if `activity` is not an `%Activity{}` (e.g. not loaded), no
# visible clause of `should_send?/2` would match — confirm callers always
# supply it.
155 defp should_send?(%User{} = user, %Notification{activity: activity}) do
156 should_send?(user, activity)
# Pushes an "Announce" activity to every socket registered under `topic` in
# the `topics` map (no-op when the topic has no entry).
# For a socket with a user, the user is re-fetched by AP id and the rendered
# "update.json" is sent only when `should_send?/2` passes. A second `send`
# renders "update.json" without a user.
# NOTE(review): the rest of the socket pattern (the binding of `socket_user`)
# and the condition selecting the user-less `send` are not visible in this
# listing — presumably that branch handles sockets with no user; confirm.
159 def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
160 Enum.each(topics[topic] || [], fn %StreamerSocket{
161 transport_pid: transport_pid,
164 # Get the current user so we have up-to-date blocks etc.
166 user = User.get_cached_by_ap_id(socket_user.ap_id)
# NOTE(review): `should_send?/2` only has clauses for `%User{}`; if the cache
# lookup can return nil this call would raise — confirm.
168 if should_send?(user, item) do
169 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
172 send(transport_pid, {:text, StreamerView.render("update.json", item)})
# Pushes a conversation participation to every socket registered under
# `topic` (no-op when the topic has no entry). The payload is the rendered
# "conversation.json" view; no per-user filtering is applied in this clause.
177 def push_to_socket(topics, topic, %Participation{} = participation) do
178 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
179 send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
# Handles a "Delete" activity that carries a "deleted_activity_id": for every
# socket registered under `topic`, builds a `{:text, json}` message whose
# JSON body is `%{event: "delete", payload: <id as string>}`.
# NOTE(review): the line that actually sends the tuple to `transport_pid` is
# not visible in this listing — presumably a `send/2` call; confirm.
183 def push_to_socket(topics, topic, %Activity{
184 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
186 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
189 {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
# "Delete" activities that did not match the clause above (i.e. without a
# "deleted_activity_id" key) are ignored: nothing is sent and `:noop` is
# returned.
194 def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
# Catch-all clause: pushes any other item to every socket registered under
# `topic` (no-op when the topic has no entry).
# For a socket with a user, the user is re-fetched by AP id and the rendered
# "update.json" is sent only when the item's actor is in neither the user's
# blocks nor mutes and `thread_containment/2` passes. A second `send` renders
# "update.json" without a user.
# NOTE(review): the rest of the socket pattern (the binding of `socket_user`)
# and the condition selecting the user-less `send` are not visible in this
# listing — presumably that branch handles sockets with no user; confirm.
196 def push_to_socket(topics, topic, item) do
197 Enum.each(topics[topic] || [], fn %StreamerSocket{
198 transport_pid: transport_pid,
201 # Get the current user so we have up-to-date blocks etc.
203 user = User.get_cached_by_ap_id(socket_user.ap_id)
# NOTE(review): `user` is dereferenced directly here, whereas the
# Notification clause of `do_stream/1` first matches `%User{}`; if the cache
# lookup can return nil these lines would raise — confirm.
204 blocks = user.info.blocks || []
205 mutes = user.info.mutes || []
# This is a lighter filter than `should_send?/2`: only the item's own actor
# and thread containment are checked here.
207 with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
208 true <- thread_containment(item, user) do
209 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
212 send(transport_pid, {:text, StreamerView.render("update.json", item)})
# Thread-containment check used by the delivery filters above.
# Returns true immediately when the user's own `skip_thread_containment`
# setting is true. Otherwise the instance-level
# `[:instance, :skip_thread_containment]` config is consulted, and
# `ActivityPub.contain_activity/2` is called in one of the branches.
# NOTE(review): the branch values of the `if` are not visible in this
# listing — presumably it yields true when the instance setting is on and
# falls back to `contain_activity/2` otherwise; confirm.
217 @spec thread_containment(Activity.t(), User.t()) :: boolean()
218 defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
220 defp thread_containment(activity, user) do
221 if Config.get([:instance, :skip_thread_containment]) do
224 ActivityPub.contain_activity(activity, user)