hackney_pool_children() ++
[
Pleroma.Web.Federator.RetryQueue,
- Pleroma.Stats,
- %{
- id: :web_push_init,
- start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},
- restart: :temporary
- },
- %{
- id: :federator_init,
- start: {Task, :start_link, [&Pleroma.Web.Federator.init/0]},
- restart: :temporary
- },
- %{
- id: :internal_fetch_init,
- start: {Task, :start_link, [&Pleroma.Web.ActivityPub.InternalFetchActor.init/0]},
- restart: :temporary
- }
+ Pleroma.Stats
] ++
+ task_children(@env) ++
oauth_cleanup_child(oauth_cleanup_enabled?()) ++
streamer_child(@env) ++
chat_child(@env, chat_enabled?()) ++
end
end
- defp after_supervisor_start do
- with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest],
- true <- digest_config[:active] do
- PleromaJobQueue.schedule(
- digest_config[:schedule],
- :digest_emails,
- Pleroma.DigestEmailWorker
- )
- end
+ defp task_children(:test) do
+ [
+ %{
+ id: :web_push_init,
+ start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},
+ restart: :temporary
+ },
+ %{
+ id: :federator_init,
+ start: {Task, :start_link, [&Pleroma.Web.Federator.init/0]},
+ restart: :temporary
+ }
+ ]
+ end
- :ok
+ defp task_children(_) do
+ [
+ %{
+ id: :web_push_init,
+ start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},
+ restart: :temporary
+ },
+ %{
+ id: :federator_init,
+ start: {Task, :start_link, [&Pleroma.Web.Federator.init/0]},
+ restart: :temporary
+ },
+ %{
+ id: :internal_fetch_init,
+ start: {Task, :start_link, [&Pleroma.Web.ActivityPub.InternalFetchActor.init/0]},
+ restart: :temporary
+ }
+ ]
end
end
alias Pleroma.Web.Streamer.StreamerSocket
+ @env Mix.env()
+
def start_link(_) do
GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__)
end
def add_socket(topic, socket) do
- GenServer.call(__MODULE__, {:add, socket, topic})
+ GenServer.call(__MODULE__, {:add, topic, socket})
end
def remove_socket(topic, socket) do
- GenServer.call(__MODULE__, {:remove, socket, topic})
+ do_remove_socket(@env, topic, socket)
end
def get_sockets do
{:reply, state, state}
end
- def handle_call({:add, socket, topic}, _from, %{sockets: sockets} = state) do
+ def handle_call({:add, topic, socket}, _from, %{sockets: sockets} = state) do
internal_topic = internal_topic(topic, socket)
stream_socket = StreamerSocket.from_socket(socket)
{:reply, state, state}
end
- def handle_call({:remove, socket, topic}, _from, %{sockets: sockets} = state) do
+ def handle_call({:remove, topic, socket}, _from, %{sockets: sockets} = state) do
internal_topic = internal_topic(topic, socket)
stream_socket = StreamerSocket.from_socket(socket)
{:reply, state, state}
end
+ defp do_remove_socket(:test, _, _) do
+ :ok
+ end
+
+ defp do_remove_socket(_env, topic, socket) do
+ GenServer.call(__MODULE__, {:remove, topic, socket})
+ end
+
defp internal_topic(topic, socket)
when topic in ~w[user user:notification direct] do
"#{topic}:#{socket.assigns[:user].id}"
|> Map.put(:path, "/api/v1/streaming")
|> URI.to_string()
+ setup_all do
+ start_supervised(Pleroma.Web.Streamer.supervisor())
+ :ok
+ end
+
def start_socket(qs \\ nil, headers \\ []) do
path =
case qs do
end
test "refuses invalid requests" do
- assert {:error, {400, _}} = start_socket()
- assert {:error, {404, _}} = start_socket("?stream=ncjdk")
+ capture_log(fn ->
+ assert {:error, {400, _}} = start_socket()
+ assert {:error, {404, _}} = start_socket("?stream=ncjdk")
+ Process.sleep(30)
+ end)
end
test "requires authentication and a valid token for protected streams" do
- assert {:error, {403, _}} = start_socket("?stream=user&access_token=aaaaaaaaaaaa")
- assert {:error, {403, _}} = start_socket("?stream=user")
+ capture_log(fn ->
+ assert {:error, {403, _}} = start_socket("?stream=user&access_token=aaaaaaaaaaaa")
+ assert {:error, {403, _}} = start_socket("?stream=user")
+ Process.sleep(30)
+ end)
end
- @tag needs_streamer: true
test "allows public streams without authentication" do
assert {:ok, _} = start_socket("?stream=public")
assert {:ok, _} = start_socket("?stream=public:local")
assert {:ok, _} = start_socket("?stream=hashtag&tag=lain")
end
- @tag needs_streamer: true
test "receives well formatted events" do
user = insert(:user)
{:ok, _} = start_socket("?stream=public")
assert {:ok, _} = start_socket("?stream=user&access_token=#{state.token.token}")
end
- @tag needs_streamer: true
test "accepts the 'user' stream", %{token: token} = _state do
assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
- assert {:error, {403, "Forbidden"}} = start_socket("?stream=user")
+
+ assert capture_log(fn ->
+ assert {:error, {403, "Forbidden"}} = start_socket("?stream=user")
+ Process.sleep(30)
+ end) =~ ":badarg"
end
- @tag needs_streamer: true
test "accepts the 'user:notification' stream", %{token: token} = _state do
assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
- assert {:error, {403, "Forbidden"}} = start_socket("?stream=user:notification")
+
+ assert capture_log(fn ->
+ assert {:error, {403, "Forbidden"}} = start_socket("?stream=user:notification")
+ Process.sleep(30)
+ end) =~ ":badarg"
end
- @tag needs_streamer: true
test "accepts valid token on Sec-WebSocket-Protocol header", %{token: token} do
assert {:ok, _} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", token.token}])
- assert {:error, {403, "Forbidden"}} =
- start_socket("?stream=user", [{"Sec-WebSocket-Protocol", "I am a friend"}])
+ assert capture_log(fn ->
+ assert {:error, {403, "Forbidden"}} =
+ start_socket("?stream=user", [{"Sec-WebSocket-Protocol", "I am a friend"}])
+
+ Process.sleep(30)
+ end) =~ ":badarg"
end
end
end