1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.Streamer.Worker do
10 alias Pleroma.Activity
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Notification
16 alias Pleroma.Web.ActivityPub.ActivityPub
17 alias Pleroma.Web.ActivityPub.Visibility
18 alias Pleroma.Web.CommonAPI
19 alias Pleroma.Web.Streamer.State
20 alias Pleroma.Web.Streamer.StreamerSocket
21 alias Pleroma.Web.StreamerView
24 GenServer.start_link(__MODULE__, %{}, [])
31 def stream(pid, topics, items) do
32 GenServer.call(pid, {:stream, topics, items})
35 def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
36 Enum.each(topics, fn t ->
37 do_stream(%{topic: t, item: item})
40 {:reply, state, state}
43 def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
44 Enum.each(items, fn i ->
45 do_stream(%{topic: topic, item: i})
48 {:reply, state, state}
51 def handle_call({:stream, topic, item}, _from, state) do
52 do_stream(%{topic: topic, item: item})
54 {:reply, state, state}
57 defp do_stream(%{topic: "direct", item: item}) do
59 User.get_recipients_from_activity(item)
60 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
62 Enum.each(recipient_topics, fn user_topic ->
63 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
64 push_to_socket(State.get_sockets(), user_topic, item)
68 defp do_stream(%{topic: "participation", item: participation}) do
69 user_topic = "direct:#{participation.user_id}"
70 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
72 push_to_socket(State.get_sockets(), user_topic, participation)
75 defp do_stream(%{topic: "list", item: item}) do
76 # filter the recipient list if the activity is not public, see #270.
78 case Visibility.is_public?(item) do
80 Pleroma.List.get_lists_from_activity(item)
83 Pleroma.List.get_lists_from_activity(item)
84 |> Enum.filter(fn list ->
85 owner = User.get_cached_by_id(list.user_id)
87 Visibility.visible_for_user?(item, owner)
93 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
95 Enum.each(recipient_topics, fn list_topic ->
96 Logger.debug("Trying to push message to #{list_topic}\n\n")
97 push_to_socket(State.get_sockets(), list_topic, item)
101 defp do_stream(%{topic: topic, item: %Notification{} = item})
102 when topic in ["user", "user:notification"] do
104 |> Map.get("#{topic}:#{item.user_id}", [])
105 |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
106 with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
107 true <- should_send?(user, item) do
108 send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
113 defp do_stream(%{topic: "user", item: item}) do
114 Logger.debug("Trying to push to users")
117 User.get_recipients_from_activity(item)
118 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
120 Enum.each(recipient_topics, fn topic ->
121 push_to_socket(State.get_sockets(), topic, item)
125 defp do_stream(%{topic: topic, item: item}) do
126 Logger.debug("Trying to push to #{topic}")
127 Logger.debug("Pushing item to #{topic}")
128 push_to_socket(State.get_sockets(), topic, item)
131 defp should_send?(%User{} = user, %Activity{} = item) do
132 %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
133 User.outgoing_relations_ap_ids(user, [:block, :mute, :reblog_mute])
135 recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
136 recipients = MapSet.new(item.recipients)
137 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
139 with parent <- Object.normalize(item) || item,
141 Enum.all?([blocked_ap_ids, muted_ap_ids, reblog_muted_ap_ids], &(item.actor not in &1)),
142 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
143 true <- MapSet.disjoint?(recipients, recipient_blocks),
144 %{host: item_host} <- URI.parse(item.actor),
145 %{host: parent_host} <- URI.parse(parent.data["actor"]),
146 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
147 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
148 true <- thread_containment(item, user),
149 false <- CommonAPI.thread_muted?(user, item) do
156 defp should_send?(%User{} = user, %Notification{activity: activity}) do
157 should_send?(user, activity)
160 def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
161 Enum.each(topics[topic] || [], fn %StreamerSocket{
162 transport_pid: transport_pid,
165 # Get the current user so we have up-to-date blocks etc.
167 user = User.get_cached_by_ap_id(socket_user.ap_id)
169 if should_send?(user, item) do
170 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
173 send(transport_pid, {:text, StreamerView.render("update.json", item)})
178 def push_to_socket(topics, topic, %Participation{} = participation) do
179 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
180 send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
184 def push_to_socket(topics, topic, %Activity{
185 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
187 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
190 {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
195 def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
197 def push_to_socket(topics, topic, item) do
198 Enum.each(topics[topic] || [], fn %StreamerSocket{
199 transport_pid: transport_pid,
202 # Get the current user so we have up-to-date blocks etc.
204 user = User.get_cached_by_ap_id(socket_user.ap_id)
206 if should_send?(user, item) do
207 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
210 send(transport_pid, {:text, StreamerView.render("update.json", item)})
215 @spec thread_containment(Activity.t(), User.t()) :: boolean()
216 defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
218 defp thread_containment(activity, user) do
219 if Config.get([:instance, :skip_thread_containment]) do
222 ActivityPub.contain_activity(activity, user)