X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Fstreamer%2Fstreamer.ex;h=d1d2c9b9c5f85bfa53bd48168a5796a8523c80b1;hb=0883a706dc376fdfb7de9df1366803e87c8e7c98;hp=5ad4aa9367a8c588f6b3c1f4440f48375967b4c6;hpb=b078e0567dbecc768f88d991a2565141eb9e8c50;p=akkoma diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex index 5ad4aa936..d1d2c9b9c 100644 --- a/lib/pleroma/web/streamer/streamer.ex +++ b/lib/pleroma/web/streamer/streamer.ex @@ -6,6 +6,7 @@ defmodule Pleroma.Web.Streamer do require Logger alias Pleroma.Activity + alias Pleroma.Chat.MessageReference alias Pleroma.Config alias Pleroma.Conversation.Participation alias Pleroma.Notification @@ -21,41 +22,83 @@ defmodule Pleroma.Web.Streamer do def registry, do: @registry - def add_socket(topic, %User{} = user) do - if should_env_send?(), do: Registry.register(@registry, user_topic(topic, user), true) + @public_streams ["public", "public:local", "public:media", "public:local:media"] + @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"] + + @doc "Expands and authorizes a stream, and registers the process for streaming." + @spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) :: + {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized} + def get_topic_and_add_socket(stream, user, params \\ %{}) do + case get_topic(stream, user, params) do + {:ok, topic} -> add_socket(topic, user) + error -> error + end end - def add_socket(topic, _) do - if should_env_send?(), do: Registry.register(@registry, topic, false) + @doc "Expand and authorizes a stream" + @spec get_topic(stream :: String.t(), User.t() | nil, Map.t()) :: + {:ok, topic :: String.t()} | {:error, :bad_topic} + def get_topic(stream, user, params \\ %{}) + + # Allow all public steams. + def get_topic(stream, _, _) when stream in @public_streams do + {:ok, stream} end - def remove_socket(topic) do - if should_env_send?(), do: Registry.unregister(@registry, topic) + # Allow all hashtags streams. + def get_topic("hashtag", _, %{"tag" => tag}) do + {:ok, "hashtag:" <> tag} end - def stream(topics, item) when is_list(topics) do - if should_env_send?() do - Enum.each(topics, fn t -> - spawn(fn -> do_stream(t, item) end) - end) + # Expand user streams. + def get_topic(stream, %User{} = user, _) when stream in @user_streams do + {:ok, stream <> ":" <> to_string(user.id)} + end + + def get_topic(stream, _, _) when stream in @user_streams do + {:error, :unauthorized} + end + + # List streams. + def get_topic("list", %User{} = user, %{"list" => id}) do + if Pleroma.List.get(id, user) do + {:ok, "list:" <> to_string(id)} + else + {:error, :bad_topic} end + end - :ok + def get_topic("list", _, _) do + {:error, :unauthorized} end - def stream(topic, items) when is_list(items) do - if should_env_send?() do - Enum.each(items, fn i -> - spawn(fn -> do_stream(topic, i) end) - end) + def get_topic(_, _, _) do + {:error, :bad_topic} + end - :ok + @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic." + def add_socket(topic, user) do + if should_env_send?() do + auth? = if user, do: true + Registry.register(@registry, topic, auth?) end + + {:ok, topic} + end + + def remove_socket(topic) do + if should_env_send?(), do: Registry.unregister(@registry, topic) end - def stream(topic, item) do + def stream(topics, items) do if should_env_send?() do - spawn(fn -> do_stream(topic, item) end) + List.wrap(topics) + |> Enum.each(fn topic -> + List.wrap(items) + |> Enum.each(fn item -> + spawn(fn -> do_stream(topic, item) end) + end) + end) end :ok @@ -80,7 +123,7 @@ defmodule Pleroma.Web.Streamer do false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), true <- thread_containment(item, user), - false <- CommonAPI.thread_muted?(user, item) do + false <- CommonAPI.thread_muted?(user, parent) do false else _ -> true @@ -144,6 +187,19 @@ defmodule Pleroma.Web.Streamer do end) end + defp do_stream(topic, {user, %MessageReference{} = cm_ref}) + when topic in ["user", "user:pleroma_chat"] do + topic = "#{topic}:#{user.id}" + + text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) + + Registry.dispatch(@registry, topic, fn list -> + Enum.each(list, fn {pid, _auth} -> + send(pid, {:text, text}) + end) + end) + end + defp do_stream("user", item) do Logger.debug("Trying to push to users") @@ -231,13 +287,4 @@ defmodule Pleroma.Web.Streamer do true -> def should_env_send?, do: true end - - defp user_topic(topic, user) - when topic in ~w[user user:notification direct] do - "#{topic}:#{user.id}" - end - - defp user_topic(topic, _) do - topic - end end