projects
/
akkoma
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge remote-tracking branch 'pleroma/develop' into object-tombstone-visibility
[akkoma]
/
lib
/
pleroma
/
gun
/
connection_pool
/
worker.ex
diff --git
a/lib/pleroma/gun/connection_pool/worker.ex
b/lib/pleroma/gun/connection_pool/worker.ex
index fec9d0efa9daa0323a02e26159c491e00313424d..a3fa75386946f198d4eb3158c65aa6c5b1daa277 100644
(file)
--- a/
lib/pleroma/gun/connection_pool/worker.ex
+++ b/
lib/pleroma/gun/connection_pool/worker.ex
@@
-1,11
+1,15
@@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
defmodule Pleroma.Gun.ConnectionPool.Worker do
alias Pleroma.Gun
use GenServer, restart: :temporary
defmodule Pleroma.Gun.ConnectionPool.Worker do
alias Pleroma.Gun
use GenServer, restart: :temporary
-
@registry
Pleroma.Gun.ConnectionPool
+
defp registry, do:
Pleroma.Gun.ConnectionPool
def start_link([key | _] = opts) do
def start_link([key | _] = opts) do
- GenServer.start_link(__MODULE__, opts, name: {:via, Registry, {
@registry
, key}})
+ GenServer.start_link(__MODULE__, opts, name: {:via, Registry, {
registry()
, key}})
end
@impl true
end
@impl true
@@
-15,20
+19,24
@@
defmodule Pleroma.Gun.ConnectionPool.Worker do
@impl true
def handle_continue({:connect, [key, uri, opts, client_pid]}, _) do
@impl true
def handle_continue({:connect, [key, uri, opts, client_pid]}, _) do
- with {:ok, conn_pid} <- Gun.Conn.open(uri, opts),
+ with {:ok, conn_pid
, protocol
} <- Gun.Conn.open(uri, opts),
Process.link(conn_pid) do
time = :erlang.monotonic_time(:millisecond)
{_, _} =
Process.link(conn_pid) do
time = :erlang.monotonic_time(:millisecond)
{_, _} =
- Registry.update_value(
@registry
, key, fn _ ->
+ Registry.update_value(
registry()
, key, fn _ ->
{conn_pid, [client_pid], 1, time}
end)
send(client_pid, {:conn_pid, conn_pid})
{:noreply,
{conn_pid, [client_pid], 1, time}
end)
send(client_pid, {:conn_pid, conn_pid})
{:noreply,
- %{key: key, timer: nil, client_monitors: %{client_pid => Process.monitor(client_pid)}},
- :hibernate}
+ %{
+ key: key,
+ timer: nil,
+ client_monitors: %{client_pid => Process.monitor(client_pid)},
+ protocol: protocol
+ }, :hibernate}
else
err ->
{:stop, {:shutdown, err}, nil}
else
err ->
{:stop, {:shutdown, err}, nil}
@@
-53,14
+61,20
@@
defmodule Pleroma.Gun.ConnectionPool.Worker do
end
@impl true
end
@impl true
- def handle_call(:add_client, {client_pid, _}, %{key: key} = state) do
+ def handle_call(:add_client, {client_pid, _}, %{key: key
, protocol: protocol
} = state) do
time = :erlang.monotonic_time(:millisecond)
time = :erlang.monotonic_time(:millisecond)
- {{conn_pid,
_
, _, _}, _} =
- Registry.update_value(
@registry
, key, fn {conn_pid, used_by, crf, last_reference} ->
+ {{conn_pid,
used_by
, _, _}, _} =
+ Registry.update_value(
registry()
, key, fn {conn_pid, used_by, crf, last_reference} ->
{conn_pid, [client_pid | used_by], crf(time - last_reference, crf), time}
end)
{conn_pid, [client_pid | used_by], crf(time - last_reference, crf), time}
end)
+ :telemetry.execute(
+ [:pleroma, :connection_pool, :client, :add],
+ %{client_pid: client_pid, clients: used_by},
+ %{key: state.key, protocol: protocol}
+ )
+
state =
if state.timer != nil do
Process.cancel_timer(state[:timer])
state =
if state.timer != nil do
Process.cancel_timer(state[:timer])
@@
-78,12
+92,13
@@
defmodule Pleroma.Gun.ConnectionPool.Worker do
@impl true
def handle_call(:remove_client, {client_pid, _}, %{key: key} = state) do
{{_conn_pid, used_by, _crf, _last_reference}, _} =
@impl true
def handle_call(:remove_client, {client_pid, _}, %{key: key} = state) do
{{_conn_pid, used_by, _crf, _last_reference}, _} =
- Registry.update_value(
@registry
, key, fn {conn_pid, used_by, crf, last_reference} ->
+ Registry.update_value(
registry()
, key, fn {conn_pid, used_by, crf, last_reference} ->
{conn_pid, List.delete(used_by, client_pid), crf, last_reference}
end)
{ref, state} = pop_in(state.client_monitors[client_pid])
{conn_pid, List.delete(used_by, client_pid), crf, last_reference}
end)
{ref, state} = pop_in(state.client_monitors[client_pid])
- Process.demonitor(ref)
+
+ Process.demonitor(ref, [:flush])
timer =
if used_by == [] do
timer =
if used_by == [] do
@@
-103,22
+118,27
@@
defmodule Pleroma.Gun.ConnectionPool.Worker do
{:stop, :normal, state}
end
{:stop, :normal, state}
end
+ @impl true
+ def handle_info({:gun_up, _pid, _protocol}, state) do
+ {:noreply, state, :hibernate}
+ end
+
# Gracefully shutdown if the connection got closed without any streams left
@impl true
def handle_info({:gun_down, _pid, _protocol, _reason, []}, state) do
{:stop, :normal, state}
end
# Gracefully shutdown if the connection got closed without any streams left
@impl true
def handle_info({:gun_down, _pid, _protocol, _reason, []}, state) do
{:stop, :normal, state}
end
- # Otherwise,
shutdown with an error
+ # Otherwise,
wait for retry
@impl true
@impl true
- def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams}
= down_message
, state) do
- {:
stop, {:error, down_message}, st
ate}
+ def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams}, state) do
+ {:
noreply, state, :hibern
ate}
end
@impl true
def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
:telemetry.execute(
end
@impl true
def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
:telemetry.execute(
- [:pleroma, :connection_pool, :client
_death
],
+ [:pleroma, :connection_pool, :client
, :dead
],
%{client_pid: pid, reason: reason},
%{key: state.key}
)
%{client_pid: pid, reason: reason},
%{key: state.key}
)