1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.Streamer do
6 alias Pleroma.Web.Streamer.State
7 alias Pleroma.Web.Streamer.Worker
13 @public_streams ["public", "public:local", "public:media", "public:local:media"]
14 @user_streams ["user", "user:notification", "direct"]
16 @doc "Expands and authorizes a stream, and registers the process for streaming."
17 @spec get_topic_and_add_socket(stream :: String.t(), State.t(), Map.t() | nil) ::
18 {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
19 def get_topic_and_add_socket(stream, socket, params \\ %{}) do
22 %{assigns: %{user: user}} -> user
26 case get_topic(stream, user, params) do
28 add_socket(topic, socket)
36 @doc "Expand and authorizes a stream"
37 @spec get_topic(stream :: String.t(), User.t() | nil, Map.t()) ::
38 {:ok, topic :: String.t()} | {:error, :bad_topic}
39 def get_topic(stream, user, params \\ %{})
41 # Allow all public steams.
42 def get_topic(stream, _, _) when stream in @public_streams do
46 # Allow all hashtags streams.
47 def get_topic("hashtag", _, %{"tag" => tag}) do
48 {:ok, "hashtag:" <> tag}
51 # Expand user streams.
52 def get_topic(stream, %User{} = user, _) when stream in @user_streams do
53 {:ok, stream <> ":" <> to_string(user.id)}
56 def get_topic(stream, _, _) when stream in @user_streams do
57 {:error, :unauthorized}
61 def get_topic("list", %User{} = user, %{"list" => id}) do
62 if Pleroma.List.get(id, user) do
63 {:ok, "list:" <> to_string(id)}
69 def get_topic("list", _, _) do
70 {:error, :unauthorized}
73 def get_topic(_, _, _) do
77 def add_socket(topic, socket) do
78 State.add_socket(topic, socket)
81 def remove_socket(topic, socket) do
82 State.remove_socket(topic, socket)
89 def stream(topics, items) do
94 &Worker.stream(&1, topics, items),
101 def supervisor, do: Pleroma.Web.Streamer.Supervisor
104 handle_should_send(@mix_env)
107 defp handle_should_send(:test) do
108 case Process.whereis(:streamer_worker) do
117 defp handle_should_send(:benchmark), do: false
119 defp handle_should_send(_), do: true