require Logger
alias Pleroma.Activity
+ alias Pleroma.Chat.MessageReference
alias Pleroma.Config
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
def registry, do: @registry
@public_streams ["public", "public:local", "public:media", "public:local:media"]
- @user_streams ["user", "user:notification", "direct"]
+ @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
@doc "Expands and authorizes a stream, and registers the process for streaming."
@spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) ::
if should_env_send?(), do: Registry.unregister(@registry, topic)
end
- def stream(topics, item) when is_list(topics) do
+ def stream(topics, items) do
if should_env_send?() do
- Enum.each(topics, fn t ->
- spawn(fn -> do_stream(t, item) end)
+ List.wrap(topics)
+ |> Enum.each(fn topic ->
+ List.wrap(items)
+ |> Enum.each(fn item ->
+ spawn(fn -> do_stream(topic, item) end)
+ end)
end)
end
:ok
end
- def stream(topic, items) when is_list(items) do
- if should_env_send?() do
- Enum.each(items, fn i ->
- spawn(fn -> do_stream(topic, i) end)
- end)
-
- :ok
- end
- end
-
- def stream(topic, item) do
- if should_env_send?() do
- spawn(fn -> do_stream(topic, item) end)
- end
-
- :ok
- end
+ def filtered_by_user?(user, item, streamed_type \\ :activity)
- def filtered_by_user?(%User{} = user, %Activity{} = item) do
+ def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
%{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
true <-
Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
+ true <-
+ !(streamed_type == :activity && item.data["type"] == "Announce" &&
+ parent.data["actor"] == user.ap_id),
true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
true <- MapSet.disjoint?(recipients, recipient_blocks),
%{host: item_host} <- URI.parse(item.actor),
end
end
- def filtered_by_user?(%User{} = user, %Notification{activity: activity}) do
- filtered_by_user?(user, activity)
+ def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do
+ filtered_by_user?(user, activity, :notification)
end
defp do_stream("direct", item) do
end)
end
+ defp do_stream(topic, {user, %MessageReference{} = cm_ref})
+ when topic in ["user", "user:pleroma_chat"] do
+ topic = "#{topic}:#{user.id}"
+
+ text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
+
+ Registry.dispatch(@registry, topic, fn list ->
+ Enum.each(list, fn {pid, _auth} ->
+ send(pid, {:text, text})
+ end)
+ end)
+ end
+
defp do_stream("user", item) do
Logger.debug("Trying to push to users")