# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer do
require Logger
alias Pleroma.Activity
- alias Pleroma.Chat.MessageReference
alias Pleroma.Config
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
def registry, do: @registry
@public_streams ["public", "public:local", "public:media", "public:local:media"]
- @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
+ @user_streams ["user", "user:notification", "direct"]
@doc "Expands and authorizes a stream, and registers the process for streaming."
@spec get_topic_and_add_socket(
recipients = MapSet.new(item.recipients)
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
- with parent <- Object.normalize(item) || item,
+ with parent <- Object.normalize(item, fetch: false) || item,
true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
true <-
end)
end
- defp do_stream("relationships:update", item) do
- text = StreamerView.render("relationships_update.json", item)
+ defp do_stream("follow_relationship", item) do
+ text = StreamerView.render("follow_relationships_update.json", item)
+ user_topic = "user:#{item.follower.id}"
- [item.follower, item.following]
- |> Enum.map(fn %{id: id} -> "user:#{id}" end)
- |> Enum.each(fn user_topic ->
- Logger.debug("Trying to push relationships:update to #{user_topic}\n\n")
+ Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")
- Registry.dispatch(@registry, user_topic, fn list ->
- Enum.each(list, fn {pid, _auth} ->
- send(pid, {:text, text})
- end)
+ Registry.dispatch(@registry, user_topic, fn list ->
+ Enum.each(list, fn {pid, _auth} ->
+ send(pid, {:text, text})
end)
end)
end
end)
end
- defp do_stream(topic, {user, %MessageReference{} = cm_ref})
- when topic in ["user", "user:pleroma_chat"] do
- topic = "#{topic}:#{user.id}"
-
- text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
-
- Registry.dispatch(@registry, topic, fn list ->
- Enum.each(list, fn {pid, _auth} ->
- send(pid, {:text, text})
- end)
- end)
- end
-
defp do_stream("user", item) do
Logger.debug("Trying to push to users")