1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.Streamer.Worker do
use GenServer

require Logger

alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Streamer.State
alias Pleroma.Web.Streamer.StreamerSocket
alias Pleroma.Web.StreamerView
# Starts a worker process running this module, with an empty map as the
# initial state and no start options.
#
# NOTE(review): only the `GenServer.start_link/3` call was present in the
# source; the function head was missing and has been reconstructed. Confirm
# the expected arity against whatever supervisor/pool starts this worker.
def start_link(_args) do
  GenServer.start_link(__MODULE__, %{}, [])
end
@doc """
Synchronously asks the worker `pid` to stream `items` to `topics`.

The request is sent as `{:stream, topics, items}`; the `handle_call/3`
clauses accept either a list of topics, a list of items, or a single
topic/item pair. The return value is whatever the worker replies with
(the `handle_call/3` clauses in this module reply with the worker state).
"""
def stream(pid, topics, items) do
  GenServer.call(pid, {:stream, topics, items})
end
# Stream request carrying a list of topics: the same `item` is streamed to
# each topic in turn. This clause is tried first, so when `topics` is a list
# the third element is passed through as-is (even if it is itself a list).
# Replies with the unchanged state.
def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
  Enum.each(topics, fn t ->
    do_stream(%{topic: t, item: item})
  end)

  {:reply, state, state}
end
# Stream request carrying a single topic and a list of items: each item is
# streamed to `topic` in order. Replies with the unchanged state.
def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
  Enum.each(items, fn i ->
    do_stream(%{topic: topic, item: i})
  end)

  {:reply, state, state}
end
# Stream request for one topic and one item. Replies with the unchanged state.
def handle_call({:stream, topic, item}, _from, state) do
  do_stream(%{topic: topic, item: item})

  {:reply, state, state}
end
# "direct" topic: pushes `item` to the "direct:<id>" topic of every recipient
# returned by `User.get_recipients_from_activity/1`.
defp do_stream(%{topic: "direct", item: item}) do
  recipient_topics =
    item
    |> User.get_recipients_from_activity()
    |> Enum.map(fn %{id: id} -> "direct:#{id}" end)

  Enum.each(recipient_topics, fn user_topic ->
    Logger.debug("Trying to push direct message to #{user_topic}\n\n")
    push_to_socket(State.get_sockets(), user_topic, item)
  end)
end
# "participation" topic: pushes the conversation participation to the
# "direct:<user_id>" topic of the user it belongs to.
defp do_stream(%{topic: "participation", item: participation}) do
  user_topic = "direct:#{participation.user_id}"
  Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")

  push_to_socket(State.get_sockets(), user_topic, participation)
end
# "list" topic: pushes `item` to the "list:<id>" topic of every list returned
# by `Pleroma.List.get_lists_from_activity/1`.
#
# NOTE(review): the `case` arms and the `recipient_lists` binding were missing
# from the source and are reconstructed from the surviving lines.
defp do_stream(%{topic: "list", item: item}) do
  # filter the recipient list if the activity is not public, see #270.
  recipient_lists =
    case Visibility.is_public?(item) do
      true ->
        Pleroma.List.get_lists_from_activity(item)

      _ ->
        item
        |> Pleroma.List.get_lists_from_activity()
        |> Enum.filter(fn list ->
          # Only keep lists whose owner is allowed to see the activity.
          owner = User.get_cached_by_id(list.user_id)

          Visibility.visible_for_user?(item, owner)
        end)
    end

  recipient_topics = Enum.map(recipient_lists, fn %{id: id} -> "list:#{id}" end)

  Enum.each(recipient_topics, fn list_topic ->
    Logger.debug("Trying to push message to #{list_topic}\n\n")
    push_to_socket(State.get_sockets(), list_topic, item)
  end)
end
# Notifications on the "user" / "user:notification" topics: looks up the
# sockets registered under "<topic>:<user_id>" and sends the rendered
# notification to each one whose (freshly fetched) user passes
# `should_send?/2`. Sockets whose user cannot be found are skipped.
defp do_stream(%{topic: topic, item: %Notification{} = item})
     when topic in ["user", "user:notification"] do
  State.get_sockets()
  |> Map.get("#{topic}:#{item.user_id}", [])
  |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
    with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
         true <- should_send?(user, item) do
      send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
    end
  end)
end
# "user" topic (non-notification items): pushes `item` to the "user:<id>"
# topic of every recipient returned by `User.get_recipients_from_activity/1`.
defp do_stream(%{topic: "user", item: item}) do
  Logger.debug("Trying to push to users")

  recipient_topics =
    item
    |> User.get_recipients_from_activity()
    |> Enum.map(fn %{id: id} -> "user:#{id}" end)

  Enum.each(recipient_topics, fn topic ->
    push_to_socket(State.get_sockets(), topic, item)
  end)
end
# Fallback for every other topic: pushes `item` to the sockets registered
# under `topic` itself.
defp do_stream(%{topic: topic, item: item}) do
  Logger.debug("Trying to push to #{topic}")
  Logger.debug("Pushing item to #{topic}")
  push_to_socket(State.get_sockets(), topic, item)
end
# Decides whether activity `item` may be delivered to `user`.
#
# Returns `true` only when every check in the `with` chain passes:
#   * the activity's actor is neither blocked nor muted by the user;
#   * an "Announce" does not come from an actor whose reblogs are muted;
#   * the actor of the normalized object (falling back to the activity itself
#     when normalization yields nothing) is neither blocked nor muted;
#   * none of the activity's recipients is blocked or muted;
#   * neither actor's host matches the user's domain blocks;
#   * thread containment allows it and the thread is not muted.
# Any failing check yields `false`.
#
# NOTE(review): the `true`/`else -> false` tail and one `true <-` were missing
# from the source and are reconstructed; without the `else`, a failed clause
# would leak its non-matching value out of this predicate.
defp should_send?(%User{} = user, %Activity{} = item) do
  %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
    User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])

  recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
  recipients = MapSet.new(item.recipients)
  domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)

  with parent <- Object.normalize(item) || item,
       true <-
         Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
       true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
       true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
       true <- MapSet.disjoint?(recipients, recipient_blocks),
       %{host: item_host} <- URI.parse(item.actor),
       %{host: parent_host} <- URI.parse(parent.data["actor"]),
       false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
       false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
       true <- thread_containment(item, user),
       false <- CommonAPI.thread_muted?(user, item) do
    true
  else
    _ -> false
  end
end
# A notification is deliverable exactly when its underlying activity is.
defp should_send?(%User{} = user, %Notification{activity: activity}) do
  should_send?(user, activity)
end
# Conversation participation: sends the rendered "conversation.json" payload
# as a `{:text, payload}` message to the transport process of every socket
# registered under `topic` in the `topics` map. A topic with no entry is a
# no-op.
def push_to_socket(topics, topic, %Participation{} = participation) do
  Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
    send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
  end)
end
# "Delete" activity that carries the id of the deleted activity: sends a JSON
# encoded `delete` event, with the id stringified as payload, to every socket
# registered under `topic`.
def push_to_socket(topics, topic, %Activity{
      data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
    }) do
  Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
    send(
      transport_pid,
      {:text, Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})}
    )
  end)
end
# "Delete" activity without a "deleted_activity_id": nothing is pushed.
# Must stay below the clause that matches deletes carrying an id and above
# the generic clause, since clause order decides which one handles a delete.
def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
# Generic push: sends the rendered "update.json" payload for `item` to every
# socket registered under `topic` in the `topics` map.
#
# Sockets with a user get the item rendered for that user, and only if
# `should_send?/2` allows it; sockets without a user get the item rendered
# without one.
#
# NOTE(review): the `user: socket_user` binding and the `if socket_user`/`else`
# structure were missing from the source and are reconstructed from the
# surviving lines.
def push_to_socket(topics, topic, item) do
  Enum.each(topics[topic] || [], fn %StreamerSocket{
                                       transport_pid: transport_pid,
                                       user: socket_user
                                     } ->
    if socket_user do
      # Get the current user so we have up-to-date blocks etc.
      # The lookup may return nil (e.g. the user is gone); matching on %User{}
      # skips such sockets instead of crashing in should_send?/2, mirroring the
      # notification clause of do_stream/1.
      with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
           true <- should_send?(user, item) do
        send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
      end
    else
      send(transport_pid, {:text, StreamerView.render("update.json", item)})
    end
  end)
end
# Thread containment check for delivering `activity` to `user`.
#
# Containment is skipped (returns `true`) when the user has
# `skip_thread_containment` set, or when the instance-wide
# `[:instance, :skip_thread_containment]` setting is truthy; otherwise the
# decision is delegated to `ActivityPub.contain_activity/2`.
@spec thread_containment(Activity.t(), User.t()) :: boolean()
defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true

defp thread_containment(activity, user) do
  if Config.get([:instance, :skip_thread_containment]) do
    true
  else
    ActivityPub.contain_activity(activity, user)
  end
end