1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.Streamer.Worker do
10 alias Pleroma.Activity
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Notification
16 alias Pleroma.Web.ActivityPub.ActivityPub
17 alias Pleroma.Web.ActivityPub.Visibility
18 alias Pleroma.Web.CommonAPI
19 alias Pleroma.Web.Streamer.State
20 alias Pleroma.Web.Streamer.StreamerSocket
21 alias Pleroma.Web.StreamerView
24 GenServer.start_link(__MODULE__, %{}, [])
31 def stream(pid, topics, items) do
32 GenServer.call(pid, {:stream, topics, items})
35 def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
36 Enum.each(topics, fn t ->
37 do_stream(%{topic: t, item: item})
40 {:reply, state, state}
43 def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
44 Enum.each(items, fn i ->
45 do_stream(%{topic: topic, item: i})
48 {:reply, state, state}
51 def handle_call({:stream, topic, item}, _from, state) do
52 do_stream(%{topic: topic, item: item})
54 {:reply, state, state}
57 defp do_stream(%{topic: "direct", item: item}) do
59 User.get_recipients_from_activity(item)
60 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
62 Enum.each(recipient_topics, fn user_topic ->
63 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
64 push_to_socket(State.get_sockets(), user_topic, item)
68 defp do_stream(%{topic: "participation", item: participation}) do
69 user_topic = "direct:#{participation.user_id}"
70 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
72 push_to_socket(State.get_sockets(), user_topic, participation)
75 defp do_stream(%{topic: "list", item: item}) do
76 # filter the recipient list if the activity is not public, see #270.
78 case Visibility.is_public?(item) do
80 Pleroma.List.get_lists_from_activity(item)
83 Pleroma.List.get_lists_from_activity(item)
84 |> Enum.filter(fn list ->
85 owner = User.get_cached_by_id(list.user_id)
87 Visibility.visible_for_user?(item, owner)
93 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
95 Enum.each(recipient_topics, fn list_topic ->
96 Logger.debug("Trying to push message to #{list_topic}\n\n")
97 push_to_socket(State.get_sockets(), list_topic, item)
101 defp do_stream(%{topic: topic, item: %Notification{} = item})
102 when topic in ["user", "user:notification"] do
104 |> Map.get("#{topic}:#{item.user_id}", [])
105 |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
106 with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
107 true <- should_send?(user, item) do
108 send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
113 defp do_stream(%{topic: "user", item: item}) do
114 Logger.debug("Trying to push to users")
117 User.get_recipients_from_activity(item)
118 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
120 Enum.each(recipient_topics, fn topic ->
121 push_to_socket(State.get_sockets(), topic, item)
125 defp do_stream(%{topic: topic, item: item}) do
126 Logger.debug("Trying to push to #{topic}")
127 Logger.debug("Pushing item to #{topic}")
128 push_to_socket(State.get_sockets(), topic, item)
131 defp should_send?(%User{} = user, %Activity{} = item) do
132 {:ok, [blocks, mutes]} = User.related_ap_ids(user, [:blocked_users, :muted_users])
133 reblog_mutes = user.muted_reblogs || []
134 recipient_blocks = MapSet.new(blocks ++ mutes)
135 recipients = MapSet.new(item.recipients)
136 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
138 with parent <- Object.normalize(item) || item,
139 true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
140 true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
141 true <- MapSet.disjoint?(recipients, recipient_blocks),
142 %{host: item_host} <- URI.parse(item.actor),
143 %{host: parent_host} <- URI.parse(parent.data["actor"]),
144 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
145 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
146 true <- thread_containment(item, user),
147 false <- CommonAPI.thread_muted?(user, item) do
154 defp should_send?(%User{} = user, %Notification{activity: activity}) do
155 should_send?(user, activity)
158 def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
159 Enum.each(topics[topic] || [], fn %StreamerSocket{
160 transport_pid: transport_pid,
163 # Get the current user so we have up-to-date blocks etc.
165 user = User.get_cached_by_ap_id(socket_user.ap_id)
167 if should_send?(user, item) do
168 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
171 send(transport_pid, {:text, StreamerView.render("update.json", item)})
176 def push_to_socket(topics, topic, %Participation{} = participation) do
177 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
178 send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
182 def push_to_socket(topics, topic, %Activity{
183 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
185 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
188 {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
193 def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
195 def push_to_socket(topics, topic, item) do
196 Enum.each(topics[topic] || [], fn %StreamerSocket{
197 transport_pid: transport_pid,
200 # Get the current user so we have up-to-date blocks etc.
202 user = User.get_cached_by_ap_id(socket_user.ap_id)
204 if should_send?(user, item) do
205 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
208 send(transport_pid, {:text, StreamerView.render("update.json", item)})
213 @spec thread_containment(Activity.t(), User.t()) :: boolean()
214 defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
216 defp thread_containment(activity, user) do
217 if Config.get([:instance, :skip_thread_containment]) do
220 ActivityPub.contain_activity(activity, user)