X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Fstreamer.ex;h=d5b1d067895446a0aaa6604ebb2ef08577fcd417;hb=11ec9daa5b742f8a1b408497321392e144f45019;hp=1fb8ac1c5b66e8082fb88d009c4aa69732c96399;hpb=d44850d7a5d1303c1c9d185fc5eb6642823b3ec4;p=akkoma diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex index 1fb8ac1c5..d5b1d0678 100644 --- a/lib/pleroma/web/streamer.ex +++ b/lib/pleroma/web/streamer.ex @@ -1,12 +1,11 @@ # Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors +# Copyright © 2017-2021 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Streamer do require Logger alias Pleroma.Activity - alias Pleroma.Chat.MessageReference alias Pleroma.Config alias Pleroma.Conversation.Participation alias Pleroma.Notification @@ -25,7 +24,7 @@ defmodule Pleroma.Web.Streamer do def registry, do: @registry @public_streams ["public", "public:local", "public:media", "public:local:media"] - @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"] + @user_streams ["user", "user:notification", "direct"] @doc "Expands and authorizes a stream, and registers the process for streaming." @spec get_topic_and_add_socket( @@ -115,6 +114,11 @@ defmodule Pleroma.Web.Streamer do {:error, :unauthorized} end + # mastodon multi-topic WS + def get_topic(nil, _user, _oauth_token, _params) do + {:ok, :multi} + end + def get_topic(_stream, _user, _oauth_token, _params) do {:error, :bad_topic} end @@ -187,8 +191,8 @@ defmodule Pleroma.Web.Streamer do end defp do_stream("follow_relationship", item) do - text = StreamerView.render("follow_relationships_update.json", item) user_topic = "user:#{item.follower.id}" + text = StreamerView.render("follow_relationships_update.json", item, user_topic) Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n") @@ -236,20 +240,7 @@ defmodule Pleroma.Web.Streamer do when topic in ["user", "user:notification"] do Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list -> Enum.each(list, fn {pid, _auth} -> - send(pid, {:render_with_user, StreamerView, "notification.json", item}) - end) - end) - end - - defp do_stream(topic, {user, %MessageReference{} = cm_ref}) - when topic in ["user", "user:pleroma_chat"] do - topic = "#{topic}:#{user.id}" - - text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) - - Registry.dispatch(@registry, topic, fn list -> - Enum.each(list, fn {pid, _auth} -> - send(pid, {:text, text}) + send(pid, {:render_with_user, StreamerView, "notification.json", item, topic}) end) end) end @@ -273,7 +264,7 @@ defmodule Pleroma.Web.Streamer do end defp push_to_socket(topic, %Participation{} = participation) do - rendered = StreamerView.render("conversation.json", participation) + rendered = StreamerView.render("conversation.json", participation, topic) Registry.dispatch(@registry, topic, fn list -> Enum.each(list, fn {pid, _} -> @@ -297,12 +288,12 @@ defmodule Pleroma.Web.Streamer do defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop defp push_to_socket(topic, item) do - anon_render = StreamerView.render("update.json", item) + anon_render = StreamerView.render("update.json", item, topic) Registry.dispatch(@registry, topic, fn list -> Enum.each(list, fn {pid, auth?} -> if auth? do - send(pid, {:render_with_user, StreamerView, "update.json", item}) + send(pid, {:render_with_user, StreamerView, "update.json", item, topic}) else send(pid, {:text, anon_render}) end