X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Fstreamer.ex;h=f009fbd9e1853e01001209207e6cd83270ca1a89;hb=1f863f0a36ebb0648c3d39ecb7ea9e3d01deab60;hp=7d4a1304a9b381e3f25bfec72acd38398adcc070;hpb=c9afb350e7a38aa915418c6b98cedd863ca0405b;p=akkoma diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex index 7d4a1304a..f009fbd9e 100644 --- a/lib/pleroma/web/streamer.ex +++ b/lib/pleroma/web/streamer.ex @@ -1,12 +1,11 @@ # Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors +# Copyright © 2017-2021 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Streamer do require Logger alias Pleroma.Activity - alias Pleroma.Chat.MessageReference alias Pleroma.Config alias Pleroma.Conversation.Participation alias Pleroma.Notification @@ -18,6 +17,7 @@ defmodule Pleroma.Web.Streamer do alias Pleroma.Web.OAuth.Token alias Pleroma.Web.Plugs.OAuthScopesPlug alias Pleroma.Web.StreamerView + require Pleroma.Constants @mix_env Mix.env() @registry Pleroma.Web.StreamerRegistry @@ -25,7 +25,7 @@ defmodule Pleroma.Web.Streamer do def registry, do: @registry @public_streams ["public", "public:local", "public:media", "public:local:media"] - @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"] + @user_streams ["user", "user:notification", "direct"] @doc "Expands and authorizes a stream, and registers the process for streaming." @spec get_topic_and_add_socket( @@ -37,7 +37,7 @@ defmodule Pleroma.Web.Streamer do {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized} def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do - add_socket(topic, user) + add_socket(topic, oauth_token) end end @@ -115,15 +115,20 @@ defmodule Pleroma.Web.Streamer do {:error, :unauthorized} end + # mastodon multi-topic WS + def get_topic(nil, _user, _oauth_token, _params) do + {:ok, :multi} + end + def get_topic(_stream, _user, _oauth_token, _params) do {:error, :bad_topic} end @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic." - def add_socket(topic, user) do + def add_socket(topic, oauth_token) do if should_env_send?() do - auth? = if user, do: true - Registry.register(@registry, topic, auth?) + oauth_token_id = if oauth_token, do: oauth_token.id, else: false + Registry.register(@registry, topic, oauth_token_id) end {:ok, topic} @@ -151,7 +156,7 @@ defmodule Pleroma.Web.Streamer do recipients = MapSet.new(item.recipients) domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks) - with parent <- Object.normalize(item) || item, + with parent <- Object.normalize(item, fetch: false) || item, true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)), true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids, true <- @@ -187,8 +192,8 @@ defmodule Pleroma.Web.Streamer do end defp do_stream("follow_relationship", item) do - text = StreamerView.render("follow_relationships_update.json", item) user_topic = "user:#{item.follower.id}" + text = StreamerView.render("follow_relationships_update.json", item, user_topic) Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n") @@ -236,20 +241,7 @@ defmodule Pleroma.Web.Streamer do when topic in ["user", "user:notification"] do Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list -> Enum.each(list, fn {pid, _auth} -> - send(pid, {:render_with_user, StreamerView, "notification.json", item}) - end) - end) - end - - defp do_stream(topic, {user, %MessageReference{} = cm_ref}) - when topic in ["user", "user:pleroma_chat"] do - topic = "#{topic}:#{user.id}" - - text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) - - Registry.dispatch(@registry, topic, fn list -> - Enum.each(list, fn {pid, _auth} -> - send(pid, {:text, text}) + send(pid, {:render_with_user, StreamerView, "notification.json", item, topic}) end) end) end @@ -261,7 +253,17 @@ defmodule Pleroma.Web.Streamer do User.get_recipients_from_activity(item) |> Enum.map(fn %{id: id} -> "user:#{id}" end) - Enum.each(recipient_topics, fn topic -> + hashtag_recipients = + if Pleroma.Constants.as_public() in item.recipients do + Pleroma.Hashtag.get_recipients_for_activity(item) + |> Enum.map(fn id -> "user:#{id}" end) + else + [] + end + + all_recipients = Enum.uniq(recipient_topics ++ hashtag_recipients) + + Enum.each(all_recipients, fn topic -> push_to_socket(topic, item) end) end @@ -273,7 +275,7 @@ defmodule Pleroma.Web.Streamer do end defp push_to_socket(topic, %Participation{} = participation) do - rendered = StreamerView.render("conversation.json", participation) + rendered = StreamerView.render("conversation.json", participation, topic) Registry.dispatch(@registry, topic, fn list -> Enum.each(list, fn {pid, _} -> @@ -296,13 +298,34 @@ defmodule Pleroma.Web.Streamer do defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop + defp push_to_socket(topic, %Activity{data: %{"type" => "Update"}} = item) do + create_activity = + Pleroma.Activity.get_create_by_object_ap_id(item.object.data["id"]) + |> Map.put(:object, item.object) + + anon_render = StreamerView.render("status_update.json", create_activity, topic) + + Registry.dispatch(@registry, topic, fn list -> + Enum.each(list, fn {pid, auth?} -> + if auth? do + send( + pid, + {:render_with_user, StreamerView, "status_update.json", create_activity, topic} + ) + else + send(pid, {:text, anon_render}) + end + end) + end) + end + defp push_to_socket(topic, item) do - anon_render = StreamerView.render("update.json", item) + anon_render = StreamerView.render("update.json", item, topic) Registry.dispatch(@registry, topic, fn list -> Enum.each(list, fn {pid, auth?} -> if auth? do - send(pid, {:render_with_user, StreamerView, "update.json", item}) + send(pid, {:render_with_user, StreamerView, "update.json", item, topic}) else send(pid, {:text, anon_render}) end @@ -320,6 +343,22 @@ defmodule Pleroma.Web.Streamer do end end + def close_streams_by_oauth_token(oauth_token) do + if should_env_send?() do + Registry.select( + @registry, + [ + { + {:"$1", :"$2", :"$3"}, + [{:==, :"$3", oauth_token.id}], + [:"$2"] + } + ] + ) + |> Enum.each(fn pid -> send(pid, :close) end) + end + end + # In test environement, only return true if the registry is started. # In benchmark environment, returns false. # In any other environment, always returns true.