end
defp http_children(Tesla.Adapter.Gun, _) do
- [{Registry, keys: :unique, name: Pleroma.Gun.ConnectionPool}]
+ Pleroma.Gun.ConnectionPool.children()
end
defp http_children(_, _), do: []
defmodule Pleroma.Gun.ConnectionPool do
@registry __MODULE__
+ alias Pleroma.Gun.ConnectionPool.WorkerSupervisor
+
+ def children do
+ [
+ {Registry, keys: :unique, name: @registry},
+ Pleroma.Gun.ConnectionPool.WorkerSupervisor
+ ]
+ end
+
def get_conn(uri, opts) do
key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
{:ok, gun_pid}
[] ->
- case enforce_pool_limits() do
- :ok ->
- # :gun.set_owner fails in :connected state for whatevever reason,
- # so we open the connection in the process directly and send it's pid back
- # We trust gun to handle timeouts by itself
- case GenServer.start(Pleroma.Gun.ConnectionPool.Worker, [uri, key, opts, self()],
- timeout: :infinity
- ) do
- {:ok, _worker_pid} ->
- receive do
- {:conn_pid, pid} -> {:ok, pid}
- end
-
- {:error, {:error, {:already_registered, worker_pid}}} ->
- get_gun_pid_from_worker(worker_pid)
-
- err ->
- err
+ # :gun.set_owner fails in :connected state for whatevever reason,
+ # so we open the connection in the process directly and send it's pid back
+ # We trust gun to handle timeouts by itself
+ case WorkerSupervisor.start_worker([uri, key, opts, self()]) do
+ {:ok, _worker_pid} ->
+ receive do
+ {:conn_pid, pid} -> {:ok, pid}
end
- :error ->
- {:error, :pool_full}
- end
- end
- end
-
- @enforcer_key "enforcer"
- defp enforce_pool_limits() do
- max_connections = Pleroma.Config.get([:connections_pool, :max_connections])
-
- if Registry.count(@registry) >= max_connections do
- case Registry.lookup(@registry, @enforcer_key) do
- [] ->
- pid =
- spawn(fn ->
- {:ok, _pid} = Registry.register(@registry, @enforcer_key, nil)
-
- reclaim_max =
- [:connections_pool, :reclaim_multiplier]
- |> Pleroma.Config.get()
- |> Kernel.*(max_connections)
- |> round
- |> max(1)
-
- unused_conns =
- Registry.select(
- @registry,
- [
- {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}],
- [{{:"$1", :"$3", :"$4"}}]}
- ]
- )
+ {:error, {:error, {:already_registered, worker_pid}}} ->
+ get_gun_pid_from_worker(worker_pid)
- case unused_conns do
- [] ->
- exit(:pool_full)
-
- unused_conns ->
- unused_conns
- |> Enum.sort(fn {_pid1, crf1, last_reference1},
- {_pid2, crf2, last_reference2} ->
- crf1 <= crf2 and last_reference1 <= last_reference2
- end)
- |> Enum.take(reclaim_max)
- |> Enum.each(fn {pid, _, _} -> GenServer.call(pid, :idle_close) end)
- end
- end)
-
- wait_for_enforcer_finish(pid)
-
- [{pid, _}] ->
- wait_for_enforcer_finish(pid)
- end
- else
- :ok
- end
- end
-
- defp wait_for_enforcer_finish(pid) do
- ref = Process.monitor(pid)
-
- receive do
- {:DOWN, ^ref, :process, ^pid, :pool_full} ->
- :error
-
- {:DOWN, ^ref, :process, ^pid, :normal} ->
- :ok
+ err ->
+ err
+ end
end
end
defmodule Pleroma.Gun.ConnectionPool.Worker do
alias Pleroma.Gun
- use GenServer
+ use GenServer, restart: :temporary
@registry Pleroma.Gun.ConnectionPool
+ def start_link(opts) do
+ GenServer.start_link(__MODULE__, opts)
+ end
+
@impl true
def init([uri, key, opts, client_pid]) do
time = :os.system_time(:second)
{:stop, {:error, down_message}, state}
end
- @impl true
- def handle_call(:idle_close, _, %{key: key} = state) do
- Registry.unregister(@registry, key)
- {:stop, :normal, state}
- end
-
# LRFU policy: https://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.55.1478
defp crf(time_delta, prev_crf) do
1 + :math.pow(0.5, time_delta / 100) * prev_crf
--- /dev/null
+defmodule Pleroma.Gun.ConnectionPool.WorkerSupervisor do
+ @doc "Supervisor for pool workers. Does not do anything except enforce max connection limit"
+
+ use DynamicSupervisor
+
+ def start_link(opts) do
+ DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)
+ end
+
+ def init(_opts) do
+ DynamicSupervisor.init(
+ strategy: :one_for_one,
+ max_children: Pleroma.Config.get([:connections_pool, :max_connections])
+ )
+ end
+
+ def start_worker(opts) do
+ case DynamicSupervisor.start_child(__MODULE__, {Pleroma.Gun.ConnectionPool.Worker, opts}) do
+ {:error, :max_children} ->
+ case free_pool() do
+ :ok -> start_worker(opts)
+ :error -> {:error, :pool_full}
+ end
+
+ res ->
+ res
+ end
+ end
+
+ @registry Pleroma.Gun.ConnectionPool
+ @enforcer_key "enforcer"
+ defp free_pool do
+ case Registry.lookup(@registry, @enforcer_key) do
+ [] ->
+ pid =
+ spawn(fn ->
+ {:ok, _pid} = Registry.register(@registry, @enforcer_key, nil)
+
+ max_connections = Pleroma.Config.get([:connections_pool, :max_connections])
+
+ reclaim_max =
+ [:connections_pool, :reclaim_multiplier]
+ |> Pleroma.Config.get()
+ |> Kernel.*(max_connections)
+ |> round
+ |> max(1)
+
+ unused_conns =
+ Registry.select(
+ @registry,
+ [
+ {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}],
+ [{{:"$1", :"$3", :"$4"}}]}
+ ]
+ )
+
+ case unused_conns do
+ [] ->
+ exit(:no_unused_conns)
+
+ unused_conns ->
+ unused_conns
+ |> Enum.sort(fn {_pid1, crf1, last_reference1}, {_pid2, crf2, last_reference2} ->
+ crf1 <= crf2 and last_reference1 <= last_reference2
+ end)
+ |> Enum.take(reclaim_max)
+ |> Enum.each(fn {pid, _, _} ->
+ DynamicSupervisor.terminate_child(__MODULE__, pid)
+ end)
+ end
+ end)
+
+ wait_for_enforcer_finish(pid)
+
+ [{pid, _}] ->
+ wait_for_enforcer_finish(pid)
+ end
+ end
+
+ defp wait_for_enforcer_finish(pid) do
+ ref = Process.monitor(pid)
+
+ receive do
+ {:DOWN, ^ref, :process, ^pid, :no_unused_conns} ->
+ :error
+
+ {:DOWN, ^ref, :process, ^pid, :normal} ->
+ :ok
+ end
+ end
+end