1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.Streamer.Worker do
10 alias Pleroma.Activity
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Notification
16 alias Pleroma.Web.ActivityPub.ActivityPub
17 alias Pleroma.Web.ActivityPub.Visibility
18 alias Pleroma.Web.CommonAPI
19 alias Pleroma.Web.Streamer.State
20 alias Pleroma.Web.Streamer.StreamerSocket
21 alias Pleroma.Web.StreamerView
# NOTE(review): the enclosing `def` line and its `end` are not present in this extract;
# this call presumably forms the body of the worker's `start_link` — confirm against the full file.
# Starts a GenServer for this module with an empty map as the init argument and no options.
24 GenServer.start_link(__MODULE__, %{}, [])
@doc """
Synchronously asks the worker `pid` to stream `items` to `topics`.

The request is forwarded as `GenServer.call(pid, {:stream, topics, items})`, so the caller
blocks until the worker replies or the default `GenServer.call/2` timeout expires. Either
argument may be a single value or a list; the `handle_call/3` clauses decide how a list is
fanned out. The worker's reply is returned as-is.
"""
@spec stream(GenServer.server(), term(), term()) :: term()
def stream(pid, topics, items) do
  GenServer.call(pid, {:stream, topics, items})
end
# Fan-out over topics: when `topics` is a list, the same `item` is streamed once per topic.
# This clause is declared first, so a list of topics takes precedence over a list of items
# (if both are lists, the whole item list is handed to each topic as a single item).
# The state is never modified; it is returned both as the reply and as the new state.
def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
  Enum.each(topics, fn t ->
    do_stream(%{topic: t, item: item})
  end)

  {:reply, state, state}
end
# Fan-out over items: a single (non-list) `topic` with a list of `items` streams every item
# to that topic, one at a time and in list order.
# The state is never modified; it is returned both as the reply and as the new state.
def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
  Enum.each(items, fn i ->
    do_stream(%{topic: topic, item: i})
  end)

  {:reply, state, state}
end
# Base case: one topic and one item (neither is a list), streamed directly.
# The state is never modified; it is returned both as the reply and as the new state.
def handle_call({:stream, topic, item}, _from, state) do
  do_stream(%{topic: topic, item: item})

  {:reply, state, state}
end
# Streams an item on the "direct" topic: every recipient returned by
# `User.get_recipients_from_activity/1` gets the item pushed to its own
# "direct:<user id>" topic.
defp do_stream(%{topic: "direct", item: item}) do
  recipient_topics =
    item
    |> User.get_recipients_from_activity()
    |> Enum.map(fn %{id: id} -> "direct:#{id}" end)

  Enum.each(recipient_topics, fn user_topic ->
    Logger.debug("Trying to push direct message to #{user_topic}\n\n")
    # The socket registry is re-read for every topic, as in the original code.
    push_to_socket(State.get_sockets(), user_topic, item)
  end)
end
# Streams a conversation participation to the "direct:<user id>" topic of the user the
# participation belongs to (`participation.user_id`).
defp do_stream(%{topic: "participation", item: participation}) do
  user_topic = "direct:#{participation.user_id}"
  Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")

  push_to_socket(State.get_sockets(), user_topic, participation)
end
# Streams `item` to the "list:<list id>" topic of lists obtained from
# `Pleroma.List.get_lists_from_activity/1`.
# NOTE(review): several lines of this clause are missing from this extract (the `case` branch
# heads, the bindings the intermediate results are assigned to, and the closing `end`s) —
# restore them from the full file; the per-line notes below are assumptions to confirm.
75 defp do_stream(%{topic: "list", item: item}) do
76 # filter the recipient list if the activity is not public, see #270.
78 case Visibility.is_public?(item) do
# presumably the public branch: all lists returned for the activity are kept
80 Pleroma.List.get_lists_from_activity(item)
# presumably the non-public branch: only lists whose owner may see the item are kept
83 Pleroma.List.get_lists_from_activity(item)
84 |> Enum.filter(fn list ->
85 owner = User.get_cached_by_id(list.user_id)
87 Visibility.visible_for_user?(item, owner)
# each remaining list is turned into its topic name
93 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
# the item is pushed once per list topic; the socket registry is re-read on every iteration
95 Enum.each(recipient_topics, fn list_topic ->
96 Logger.debug("Trying to push message to #{list_topic}\n\n")
97 push_to_socket(State.get_sockets(), list_topic, item)
# Streams a notification on the "user" or "user:notification" topic to the sockets registered
# under "<topic>:<notification.user_id>" (an empty list when there is no such entry).
# NOTE(review): the expression feeding the `Map.get/3` pipeline and the closing `end`s are
# missing from this extract — presumably the map is the socket registry from
# `State.get_sockets/0`; confirm against the full file.
101 defp do_stream(%{topic: topic, item: %Notification{} = item})
102 when topic in ["user", "user:notification"] do
104 |> Map.get("#{topic}:#{item.user_id}", [])
105 |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
# the socket's user is re-read by AP id; nothing is sent unless a `%User{}` comes back and
# `should_send?/2` returns true for it
106 with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
107 true <- should_send?(user, item) do
# note: the payload is rendered with the user stored on the socket, not the re-read one
108 send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
# Streams a non-notification item on the "user" topic (notifications are matched by the
# clause declared before this one): every recipient returned by
# `User.get_recipients_from_activity/1` gets the item pushed to its "user:<user id>" topic.
defp do_stream(%{topic: "user", item: item}) do
  Logger.debug("Trying to push to users")

  recipient_topics =
    item
    |> User.get_recipients_from_activity()
    |> Enum.map(fn %{id: id} -> "user:#{id}" end)

  Enum.each(recipient_topics, fn topic ->
    # The socket registry is re-read for every topic, as in the original code.
    push_to_socket(State.get_sockets(), topic, item)
  end)
end
# Fallback for every topic without a dedicated clause: the item is pushed to the sockets
# registered under `topic` itself.
defp do_stream(%{topic: topic, item: item}) do
  Logger.debug("Trying to push to #{topic}")
  Logger.debug("Pushing item to #{topic}")
  push_to_socket(State.get_sockets(), topic, item)
end
# Runs the delivery checks for sending `item` to `user`: blocks, mutes, reblog mutes,
# domain blocks, thread containment and thread mutes.
# NOTE(review): lines inside this clause are missing from this extract (whatever precedes the
# first `Enum.all?` check, the `do`/`else` bodies of the `with`, and the closing `end`s), so
# the value actually returned is not visible here — confirm against the full file.
131 defp should_send?(%User{} = user, %Activity{} = item) do
# AP ids the user has blocked, muted, or reblog-muted
132 %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
133 User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
# sets for the recipients-vs-blocked/muted disjointness check further down
135 recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
136 recipients = MapSet.new(item.recipients)
137 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
# `parent` is the result of `Object.normalize/1` when truthy, otherwise the activity itself
139 with parent <- Object.normalize(item) || item,
# is the activity's actor absent from both the block list and the mute list?
141 Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
# an Announce passes only when its actor is not reblog-muted; other types always pass
142 true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
# same block/mute check, applied to the parent's actor
143 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
# none of the activity's recipients may be blocked or muted
144 true <- MapSet.disjoint?(recipients, recipient_blocks),
# hosts of both actors must not match the user's domain blocks
145 %{host: item_host} <- URI.parse(item.actor),
146 %{host: parent_host} <- URI.parse(parent.data["actor"]),
147 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
148 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
# thread containment must pass, and the user must not have muted the thread
149 true <- thread_containment(item, user),
150 false <- CommonAPI.thread_muted?(user, item) do
# A notification is checked through the activity it wraps: the decision is delegated to the
# `should_send?/2` clause matching that activity.
defp should_send?(%User{} = user, %Notification{activity: activity}) do
  should_send?(user, activity)
end
# Pushes an Announce activity to every socket registered under `topic` in `topics`
# (`topics[topic]`, or no sockets at all when the topic has no entry).
# NOTE(review): lines are missing from this extract (the rest of the socket pattern, whatever
# selects between the two `send` calls below, and the closing `end`s) — presumably the
# user-less render is used for sockets that have no user; confirm against the full file.
161 def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
162 Enum.each(topics[topic] || [], fn %StreamerSocket{
163 transport_pid: transport_pid,
166 # Get the current user so we have up-to-date blocks etc.
168 user = User.get_cached_by_ap_id(socket_user.ap_id)
# rendered for that user, and only when `should_send?/2` allows it
170 if should_send?(user, item) do
171 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
# rendered without a user
174 send(transport_pid, {:text, StreamerView.render("update.json", item)})
# Pushes a conversation participation, rendered with the "conversation.json" template, as a
# `{:text, payload}` message to every socket registered under `topic` in `topics`.
# A topic without an entry has no subscribers, so nothing is rendered or sent.
def push_to_socket(topics, topic, %Participation{} = participation) do
  Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
    send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
  end)
end
# Handles a Delete activity that carries a "deleted_activity_id": for every socket registered
# under `topic`, a `{:text, json}` tuple is built whose JSON is
# `{"event": "delete", "payload": "<deleted activity id as a string>"}`.
# NOTE(review): the end of the clause head, the call that wraps the tuple, and the closing
# `end`s are missing from this extract — presumably the tuple is `send/2`-ed to
# `transport_pid`; confirm against the full file.
185 def push_to_socket(topics, topic, %Activity{
186 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
188 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
191 {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
# Catch-all for "Delete" activities that no earlier clause matched: nothing is sent to any
# socket and `:noop` is returned.
def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
# Fallback clause: pushes any other item to every socket registered under `topic` in `topics`
# (`topics[topic]`, or no sockets at all when the topic has no entry).
# NOTE(review): lines are missing from this extract (the rest of the socket pattern, whatever
# selects between the two `send` calls below, and the closing `end`s) — presumably the
# user-less render is used for sockets that have no user; confirm against the full file.
198 def push_to_socket(topics, topic, item) do
199 Enum.each(topics[topic] || [], fn %StreamerSocket{
200 transport_pid: transport_pid,
203 # Get the current user so we have up-to-date blocks etc.
205 user = User.get_cached_by_ap_id(socket_user.ap_id)
# rendered for that user, and only when `should_send?/2` allows it
207 if should_send?(user, item) do
208 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
# rendered without a user
211 send(transport_pid, {:text, StreamerView.render("update.json", item)})
# Tells whether `activity` passes thread containment for `user`.
216 @spec thread_containment(Activity.t(), User.t()) :: boolean()
# Users with `skip_thread_containment: true` always pass.
217 defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
# NOTE(review): the branch bodies of the `if` and the closing `end`s are missing from this
# extract — presumably the check passes outright when the `[:instance, :skip_thread_containment]`
# setting is truthy and is otherwise delegated to `ActivityPub.contain_activity/2`; confirm
# against the full file.
219 defp thread_containment(activity, user) do
220 if Config.get([:instance, :skip_thread_containment]) do
223 ActivityPub.contain_activity(activity, user)