X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Fstreamer.ex;h=fc3bbb13029d5b31612abe432735f82abf6160ac;hb=939b3bfe43b8985c93d2dfa15ef600facd8db730;hp=d618dfe54a93b74bf6fd9bb81b7151893b591500;hpb=b738f709532ff18845f5d8cc3888d0bd67f750ab;p=akkoma diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex index d618dfe54..fc3bbb130 100644 --- a/lib/pleroma/web/streamer.ex +++ b/lib/pleroma/web/streamer.ex @@ -1,5 +1,5 @@ # Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors +# Copyright © 2017-2021 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Streamer do @@ -36,9 +36,8 @@ defmodule Pleroma.Web.Streamer do ) :: {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized} def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do - case get_topic(stream, user, oauth_token, params) do - {:ok, topic} -> add_socket(topic, user) - error -> error + with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do + add_socket(topic, user) end end @@ -57,14 +56,23 @@ defmodule Pleroma.Web.Streamer do {:ok, "hashtag:" <> tag} end + # Allow remote instance streams. + def get_topic("public:remote", _user, _oauth_token, %{"instance" => instance} = _params) do + {:ok, "public:remote:" <> instance} + end + + def get_topic("public:remote:media", _user, _oauth_token, %{"instance" => instance} = _params) do + {:ok, "public:remote:media:" <> instance} + end + # Expand user streams. def get_topic( stream, %User{id: user_id} = user, - %Token{user_id: token_user_id} = oauth_token, + %Token{user_id: user_id} = oauth_token, _params ) - when stream in @user_streams and user_id == token_user_id do + when stream in @user_streams do # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope) required_scopes = if stream == "user:notification" do @@ -88,10 +96,9 @@ defmodule Pleroma.Web.Streamer do def get_topic( "list", %User{id: user_id} = user, - %Token{user_id: token_user_id} = oauth_token, + %Token{user_id: user_id} = oauth_token, %{"list" => id} - ) - when user_id == token_user_id do + ) do cond do OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] -> {:error, :unauthorized} @@ -128,16 +135,10 @@ defmodule Pleroma.Web.Streamer do def stream(topics, items) do if should_env_send?() do - List.wrap(topics) - |> Enum.each(fn topic -> - List.wrap(items) - |> Enum.each(fn item -> - spawn(fn -> do_stream(topic, item) end) - end) - end) + for topic <- List.wrap(topics), item <- List.wrap(items) do + spawn(fn -> do_stream(topic, item) end) + end end - - :ok end def filtered_by_user?(user, item, streamed_type \\ :activity) @@ -150,9 +151,8 @@ defmodule Pleroma.Web.Streamer do recipients = MapSet.new(item.recipients) domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks) - with parent <- Object.normalize(item) || item, - true <- - Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)), + with parent <- Object.normalize(item, fetch: false) || item, + true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)), true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids, true <- !(streamed_type == :activity && item.data["type"] == "Announce" && @@ -186,6 +186,19 @@ defmodule Pleroma.Web.Streamer do end) end + defp do_stream("follow_relationship", item) do + text = StreamerView.render("follow_relationships_update.json", item) + user_topic = "user:#{item.follower.id}" + + Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n") + + Registry.dispatch(@registry, user_topic, fn list -> + Enum.each(list, fn {pid, _auth} -> + send(pid, {:text, text}) + end) + end) + end + defp do_stream("participation", participation) do user_topic = "direct:#{participation.user_id}" Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")