1 defmodule Pleroma.Web.Streamer.Worker do
8 alias Pleroma.Conversation.Participation
9 alias Pleroma.Notification
12 alias Pleroma.Web.ActivityPub.ActivityPub
13 alias Pleroma.Web.ActivityPub.Visibility
14 alias Pleroma.Web.CommonAPI
15 alias Pleroma.Web.Streamer.State
16 alias Pleroma.Web.Streamer.StreamerSocket
17 alias Pleroma.Web.StreamerView
20 GenServer.start_link(__MODULE__, %{}, [])
27 def stream(pid, topics, items) do
28 GenServer.call(pid, {:stream, topics, items})
31 def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
32 Enum.each(topics, fn t ->
33 do_stream(%{topic: t, item: item})
36 {:reply, state, state}
39 def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
40 Enum.each(items, fn i ->
41 do_stream(%{topic: topic, item: i})
44 {:reply, state, state}
47 def handle_call({:stream, topic, item}, _from, state) do
48 do_stream(%{topic: topic, item: item})
50 {:reply, state, state}
53 defp do_stream(%{topic: "direct", item: item}) do
55 User.get_recipients_from_activity(item)
56 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
58 Enum.each(recipient_topics, fn user_topic ->
59 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
60 push_to_socket(State.get_sockets(), user_topic, item)
64 defp do_stream(%{topic: "participation", item: participation}) do
65 user_topic = "direct:#{participation.user_id}"
66 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
68 push_to_socket(State.get_sockets(), user_topic, participation)
71 defp do_stream(%{topic: "list", item: item}) do
72 # filter the recipient list if the activity is not public, see #270.
74 case Visibility.is_public?(item) do
76 Pleroma.List.get_lists_from_activity(item)
79 Pleroma.List.get_lists_from_activity(item)
80 |> Enum.filter(fn list ->
81 owner = User.get_cached_by_id(list.user_id)
83 Visibility.visible_for_user?(item, owner)
89 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
91 Enum.each(recipient_topics, fn list_topic ->
92 Logger.debug("Trying to push message to #{list_topic}\n\n")
93 push_to_socket(State.get_sockets(), list_topic, item)
97 defp do_stream(%{topic: topic, item: %Notification{} = item})
98 when topic in ["user", "user:notification"] do
100 |> Map.get("#{topic}:#{item.user_id}", [])
101 |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
102 with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
103 true <- should_send?(user, item) do
104 send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
109 defp do_stream(%{topic: "user", item: item}) do
110 Logger.debug("Trying to push to users")
113 User.get_recipients_from_activity(item)
114 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
116 Enum.each(recipient_topics, fn topic ->
117 push_to_socket(State.get_sockets(), topic, item)
121 defp do_stream(%{topic: topic, item: item}) do
122 Logger.debug("Trying to push to #{topic}")
123 Logger.debug("Pushing item to #{topic}")
124 push_to_socket(State.get_sockets(), topic, item)
127 defp should_send?(%User{} = user, %Activity{} = item) do
128 blocks = user.info.blocks || []
129 mutes = user.info.mutes || []
130 reblog_mutes = user.info.muted_reblogs || []
131 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
133 with parent when not is_nil(parent) <- Object.normalize(item),
134 true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
135 true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
136 %{host: item_host} <- URI.parse(item.actor),
137 %{host: parent_host} <- URI.parse(parent.data["actor"]),
138 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
139 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
140 true <- thread_containment(item, user),
141 false <- CommonAPI.thread_muted?(user, item) do
148 defp should_send?(%User{} = user, %Notification{activity: activity}) do
149 should_send?(user, activity)
152 def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
153 Enum.each(topics[topic] || [], fn %StreamerSocket{
154 transport_pid: transport_pid,
157 # Get the current user so we have up-to-date blocks etc.
159 user = User.get_cached_by_ap_id(socket_user.ap_id)
161 if should_send?(user, item) do
162 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
165 send(transport_pid, {:text, StreamerView.render("update.json", item)})
170 def push_to_socket(topics, topic, %Participation{} = participation) do
171 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
172 send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
176 def push_to_socket(topics, topic, %Activity{
177 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
179 Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
182 {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
187 def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
189 def push_to_socket(topics, topic, item) do
190 Enum.each(topics[topic] || [], fn %StreamerSocket{
191 transport_pid: transport_pid,
194 # Get the current user so we have up-to-date blocks etc.
196 user = User.get_cached_by_ap_id(socket_user.ap_id)
197 blocks = user.info.blocks || []
198 mutes = user.info.mutes || []
200 with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
201 true <- thread_containment(item, user) do
202 send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
205 send(transport_pid, {:text, StreamerView.render("update.json", item)})
210 @spec thread_containment(Activity.t(), User.t()) :: boolean()
211 defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
213 defp thread_containment(activity, user) do
214 if Config.get([:instance, :skip_thread_containment]) do
217 ActivityPub.contain_activity(activity, user)