X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Fstreamer.ex;h=fba5d1c0232ef29ab5d6ef632e3a966694c024b7;hb=95e4018c1a17bd96331cdeb19d1c62a599061351;hp=0b6cc89e906b4f2d8858fb68261b0d288062e686;hpb=35ba48494f5129d3a0010b045ff36d98e7e7984f;p=akkoma
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
index 0b6cc89e9..fba5d1c02 100644
--- a/lib/pleroma/web/streamer.ex
+++ b/lib/pleroma/web/streamer.ex
@@ -1,12 +1,11 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors
+# Copyright © 2017-2021 Pleroma Authors
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer do
require Logger
alias Pleroma.Activity
- alias Pleroma.Chat.MessageReference
alias Pleroma.Config
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
@@ -25,7 +24,7 @@ defmodule Pleroma.Web.Streamer do
def registry, do: @registry
@public_streams ["public", "public:local", "public:media", "public:local:media"]
- @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
+ @user_streams ["user", "user:notification", "direct"]
@doc "Expands and authorizes a stream, and registers the process for streaming."
@spec get_topic_and_add_socket(
@@ -37,7 +36,7 @@ defmodule Pleroma.Web.Streamer do
{:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
- add_socket(topic, user)
+ add_socket(topic, oauth_token)
end
end
@@ -115,15 +114,20 @@ defmodule Pleroma.Web.Streamer do
{:error, :unauthorized}
end
+ # mastodon multi-topic WS
+ def get_topic(nil, _user, _oauth_token, _params) do
+ {:ok, :multi}
+ end
+
def get_topic(_stream, _user, _oauth_token, _params) do
{:error, :bad_topic}
end
@doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
- def add_socket(topic, user) do
+ def add_socket(topic, oauth_token) do
if should_env_send?() do
- auth? = if user, do: true
- Registry.register(@registry, topic, auth?)
+ oauth_token_id = if oauth_token, do: oauth_token.id, else: false
+ Registry.register(@registry, topic, oauth_token_id)
end
{:ok, topic}
@@ -151,7 +155,7 @@ defmodule Pleroma.Web.Streamer do
recipients = MapSet.new(item.recipients)
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
- with parent <- Object.normalize(item) || item,
+ with parent <- Object.normalize(item, fetch: false) || item,
true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
true <-
@@ -186,18 +190,15 @@ defmodule Pleroma.Web.Streamer do
end)
end
- defp do_stream("relationships:update", item) do
- text = StreamerView.render("relationships_update.json", item)
+ defp do_stream("follow_relationship", item) do
+ user_topic = "user:#{item.follower.id}"
+ text = StreamerView.render("follow_relationships_update.json", item, user_topic)
- [item.follower, item.following]
- |> Enum.map(fn %{id: id} -> "user:#{id}" end)
- |> Enum.each(fn user_topic ->
- Logger.debug("Trying to push relationships:update to #{user_topic}\n\n")
+ Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")
- Registry.dispatch(@registry, user_topic, fn list ->
- Enum.each(list, fn {pid, _auth} ->
- send(pid, {:text, text})
- end)
+ Registry.dispatch(@registry, user_topic, fn list ->
+ Enum.each(list, fn {pid, _auth} ->
+ send(pid, {:text, text})
end)
end)
end
@@ -239,20 +240,7 @@ defmodule Pleroma.Web.Streamer do
when topic in ["user", "user:notification"] do
Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
Enum.each(list, fn {pid, _auth} ->
- send(pid, {:render_with_user, StreamerView, "notification.json", item})
- end)
- end)
- end
-
- defp do_stream(topic, {user, %MessageReference{} = cm_ref})
- when topic in ["user", "user:pleroma_chat"] do
- topic = "#{topic}:#{user.id}"
-
- text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
-
- Registry.dispatch(@registry, topic, fn list ->
- Enum.each(list, fn {pid, _auth} ->
- send(pid, {:text, text})
+ send(pid, {:render_with_user, StreamerView, "notification.json", item, topic})
end)
end)
end
@@ -276,7 +264,7 @@ defmodule Pleroma.Web.Streamer do
end
defp push_to_socket(topic, %Participation{} = participation) do
- rendered = StreamerView.render("conversation.json", participation)
+ rendered = StreamerView.render("conversation.json", participation, topic)
Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, _} ->
@@ -300,12 +288,12 @@ defmodule Pleroma.Web.Streamer do
defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
defp push_to_socket(topic, item) do
- anon_render = StreamerView.render("update.json", item)
+ anon_render = StreamerView.render("update.json", item, topic)
Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, auth?} ->
if auth? do
- send(pid, {:render_with_user, StreamerView, "update.json", item})
+ send(pid, {:render_with_user, StreamerView, "update.json", item, topic})
else
send(pid, {:text, anon_render})
end
@@ -323,6 +311,22 @@ defmodule Pleroma.Web.Streamer do
end
end
+ def close_streams_by_oauth_token(oauth_token) do
+ if should_env_send?() do
+ Registry.select(
+ @registry,
+ [
+ {
+ {:"$1", :"$2", :"$3"},
+ [{:==, :"$3", oauth_token.id}],
+ [:"$2"]
+ }
+ ]
+ )
+ |> Enum.each(fn pid -> send(pid, :close) end)
+ end
+ end
+
# In test environement, only return true if the registry is started.
# In benchmark environment, returns false.
# In any other environment, always returns true.