1 defmodule Pleroma.Gun.ConnectionPool do
4 def get_conn(uri, opts) do
5 case enforce_pool_limits() do
7 key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
9 case Registry.lookup(@registry, key) do
10 # The key has already been registered, but connection is not up yet
11 [{worker_pid, {nil, _used_by, _crf, _last_reference}}] ->
12 get_gun_pid_from_worker(worker_pid)
14 [{worker_pid, {gun_pid, _used_by, _crf, _last_reference}}] ->
15 GenServer.cast(worker_pid, {:add_client, self(), false})
19 # :gun.set_owner fails in :connected state for whatevever reason,
20 # so we open the connection in the process directly and send it's pid back
21 # We trust gun to handle timeouts by itself
22 case GenServer.start(Pleroma.Gun.ConnectionPool.Worker, [uri, key, opts, self()],
27 {:conn_pid, pid} -> {:ok, pid}
30 {:error, {:error, {:already_registered, worker_pid}}} ->
31 get_gun_pid_from_worker(worker_pid)
43 @enforcer_key "enforcer"
44 defp enforce_pool_limits() do
45 max_connections = Pleroma.Config.get([:connections_pool, :max_connections])
47 if Registry.count(@registry) >= max_connections do
48 case Registry.lookup(@registry, @enforcer_key) do
52 {:ok, _pid} = Registry.register(@registry, @enforcer_key, nil)
55 [:connections_pool, :reclaim_multiplier]
56 |> Pleroma.Config.get()
57 |> Kernel.*(max_connections)
65 {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}],
66 [{{:"$1", :"$3", :"$4"}}]}
76 |> Enum.sort(fn {_pid1, crf1, last_reference1},
77 {_pid2, crf2, last_reference2} ->
78 crf1 <= crf2 and last_reference1 <= last_reference2
80 |> Enum.take(reclaim_max)
81 |> Enum.each(fn {pid, _, _} -> GenServer.call(pid, :idle_close) end)
85 wait_for_enforcer_finish(pid)
88 wait_for_enforcer_finish(pid)
95 defp wait_for_enforcer_finish(pid) do
96 ref = Process.monitor(pid)
99 {:DOWN, ^ref, :process, ^pid, :pool_full} ->
102 {:DOWN, ^ref, :process, ^pid, :normal} ->
107 defp get_gun_pid_from_worker(worker_pid) do
108 # GenServer.call will block the process for timeout length if
109 # the server crashes on startup (which will happen if gun fails to connect)
110 # so instead we use cast + monitor
112 ref = Process.monitor(worker_pid)
113 GenServer.cast(worker_pid, {:add_client, self(), true})
116 {:conn_pid, pid} -> {:ok, pid}
117 {:DOWN, ^ref, :process, ^worker_pid, reason} -> reason
121 def release_conn(conn_pid) do
123 Registry.select(@registry, [
124 {{:_, :"$1", {:"$2", :_, :_, :_}}, [{:==, :"$2", conn_pid}], [:"$1"]}
127 GenServer.cast(worker_pid, {:remove_client, self()})