1e5700b6a9e4aef307e6a7668b2683fc24bd67ca
[akkoma] / lib / pleroma / web / streamer / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.Web.Streamer do
  @moduledoc """
  Entry point for the streaming API.

  Resolves a requested stream name into a concrete topic (authorizing it
  against the given user), registers and unregisters sockets per topic through
  `Pleroma.Web.Streamer.State`, and hands items to
  `Pleroma.Web.Streamer.Worker.stream/3` through the `:streamer_worker`
  poolboy pool.
  """

  alias Pleroma.User
  alias Pleroma.Web.Streamer.State
  alias Pleroma.Web.Streamer.Worker

  # Timeout (in milliseconds) handed to `:poolboy.transaction/3` in `stream/2`.
  @timeout 60_000

  # Captured at compile time so that `Mix` is not needed at runtime
  # (it may not be loaded there, e.g. in a release).
  @mix_env Mix.env()

  @public_streams ["public", "public:local", "public:media", "public:local:media"]
  @user_streams ["user", "user:notification", "direct"]

  @doc """
  Expands and authorizes a stream, and registers the socket for streaming.

  The user is taken from `socket.assigns.user` when present, otherwise the
  request is treated as unauthenticated. The socket is only registered when
  `get_topic/3` succeeds; any error tuple is returned unchanged.
  """
  @spec get_topic_and_add_socket(stream :: String.t(), State.t(), map() | nil) ::
          {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
  def get_topic_and_add_socket(stream, socket, params \\ %{}) do
    with {:ok, topic} <- get_topic(stream, socket_user(socket), params) do
      add_socket(topic, socket)
      {:ok, topic}
    end
  end

  @doc """
  Expands and authorizes a stream.

  Returns:

    * `{:ok, topic}` - the stream is known and the user may subscribe to it.
      Public streams map to themselves, `"hashtag"` becomes `"hashtag:<tag>"`,
      user streams become `"<stream>:<user id>"` and `"list"` becomes
      `"list:<id>"`.
    * `{:error, :unauthorized}` - a user or list stream was requested without
      a `%Pleroma.User{}`.
    * `{:error, :bad_topic}` - the stream is unknown, required params are
      missing or malformed, or the list could not be fetched for the user.
  """
  @spec get_topic(stream :: String.t(), User.t() | nil, map() | nil) ::
          {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
  def get_topic(stream, user, params \\ %{})

  # Allow all public streams.
  def get_topic(stream, _, _) when stream in @public_streams do
    {:ok, stream}
  end

  # Allow all hashtag streams. The guard keeps a non-binary "tag" param
  # (e.g. a list or map coming from request params) from raising in `<>`;
  # such requests fall through to the final `:bad_topic` clause instead.
  def get_topic("hashtag", _, %{"tag" => tag}) when is_binary(tag) do
    {:ok, "hashtag:" <> tag}
  end

  # Expand user streams.
  def get_topic(stream, %User{} = user, _) when stream in @user_streams do
    {:ok, stream <> ":" <> to_string(user.id)}
  end

  def get_topic(stream, _, _) when stream in @user_streams do
    {:error, :unauthorized}
  end

  # List streams: only allowed when `Pleroma.List.get/2` returns a truthy
  # value for this id and user.
  def get_topic("list", %User{} = user, %{"list" => id}) do
    if Pleroma.List.get(id, user) do
      {:ok, "list:" <> to_string(id)}
    else
      {:error, :bad_topic}
    end
  end

  def get_topic("list", _, _) do
    {:error, :unauthorized}
  end

  def get_topic(_, _, _) do
    {:error, :bad_topic}
  end

  @doc "Registers `socket` under `topic`; delegates to `State.add_socket/2`."
  def add_socket(topic, socket) do
    State.add_socket(topic, socket)
  end

  @doc "Unregisters `socket` from `topic`; delegates to `State.remove_socket/2`."
  def remove_socket(topic, socket) do
    State.remove_socket(topic, socket)
  end

  @doc "Returns the currently registered sockets; delegates to `State.get_sockets/0`."
  def get_sockets do
    State.get_sockets()
  end

  @doc """
  Asynchronously streams `items` to `topics` through the `:streamer_worker` pool.

  Returns the `Task` started for the pool transaction, or `nil` when streaming
  is disabled for the current build environment (see `handle_should_send/1`).
  """
  @spec stream(topics :: term(), items :: term()) :: Task.t() | nil
  def stream(topics, items) do
    if should_send?() do
      # NOTE(review): the task is never awaited here. `Task.async/1` links the
      # task to the caller and sends its reply to the caller's mailbox, so a
      # failing transaction can take the caller down - confirm this is
      # intended before switching to a fire-and-forget primitive, since that
      # would change the return value.
      Task.async(fn ->
        :poolboy.transaction(
          :streamer_worker,
          &Worker.stream(&1, topics, items),
          @timeout
        )
      end)
    end
  end

  @doc "Returns the supervisor module of the streamer."
  @spec supervisor() :: module()
  def supervisor, do: Pleroma.Web.Streamer.Supervisor

  # Extracts the user assigned to the socket, or `nil` when there is none.
  defp socket_user(%{assigns: %{user: user}}), do: user
  defp socket_user(_), do: nil

  defp should_send? do
    handle_should_send(@mix_env)
  end

  # In the test environment only send when the worker pool is registered and
  # alive; the `is_pid/1` guard also covers `Process.whereis/1` returning
  # `nil` (not registered) or a port.
  defp handle_should_send(:test) do
    case Process.whereis(:streamer_worker) do
      pid when is_pid(pid) -> Process.alive?(pid)
      _ -> false
    end
  end

  # Never stream in the benchmark environment.
  defp handle_should_send(:benchmark), do: false

  defp handle_should_send(_), do: true
end