Refactor gun pooling and simplify adapter option insertion
[akkoma] / lib / pleroma / gun / connection_pool.ex
1 defmodule Pleroma.Gun.ConnectionPool do
2 @registry __MODULE__
3
4 def get_conn(uri, opts) do
5 case enforce_pool_limits() do
6 :ok ->
7 key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
8
9 case Registry.lookup(@registry, key) do
10 # The key has already been registered, but connection is not up yet
11 [{worker_pid, {nil, _used_by, _crf, _last_reference}}] ->
12 get_gun_pid_from_worker(worker_pid)
13
14 [{worker_pid, {gun_pid, _used_by, _crf, _last_reference}}] ->
15 GenServer.cast(worker_pid, {:add_client, self(), false})
16 {:ok, gun_pid}
17
18 [] ->
19 # :gun.set_owner fails in :connected state for whatevever reason,
20 # so we open the connection in the process directly and send it's pid back
21 # We trust gun to handle timeouts by itself
22 case GenServer.start(Pleroma.Gun.ConnectionPool.Worker, [uri, key, opts, self()],
23 timeout: :infinity
24 ) do
25 {:ok, _worker_pid} ->
26 receive do
27 {:conn_pid, pid} -> {:ok, pid}
28 end
29
30 {:error, {:error, {:already_registered, worker_pid}}} ->
31 get_gun_pid_from_worker(worker_pid)
32
33 err ->
34 err
35 end
36 end
37
38 :error ->
39 {:error, :pool_full}
40 end
41 end
42
43 @enforcer_key "enforcer"
44 defp enforce_pool_limits() do
45 max_connections = Pleroma.Config.get([:connections_pool, :max_connections])
46
47 if Registry.count(@registry) >= max_connections do
48 case Registry.lookup(@registry, @enforcer_key) do
49 [] ->
50 pid =
51 spawn(fn ->
52 {:ok, _pid} = Registry.register(@registry, @enforcer_key, nil)
53
54 reclaim_max =
55 [:connections_pool, :reclaim_multiplier]
56 |> Pleroma.Config.get()
57 |> Kernel.*(max_connections)
58 |> round
59 |> max(1)
60
61 unused_conns =
62 Registry.select(
63 @registry,
64 [
65 {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}],
66 [{{:"$1", :"$3", :"$4"}}]}
67 ]
68 )
69
70 case unused_conns do
71 [] ->
72 exit(:pool_full)
73
74 unused_conns ->
75 unused_conns
76 |> Enum.sort(fn {_pid1, crf1, last_reference1},
77 {_pid2, crf2, last_reference2} ->
78 crf1 <= crf2 and last_reference1 <= last_reference2
79 end)
80 |> Enum.take(reclaim_max)
81 |> Enum.each(fn {pid, _, _} -> GenServer.call(pid, :idle_close) end)
82 end
83 end)
84
85 wait_for_enforcer_finish(pid)
86
87 [{pid, _}] ->
88 wait_for_enforcer_finish(pid)
89 end
90 else
91 :ok
92 end
93 end
94
95 defp wait_for_enforcer_finish(pid) do
96 ref = Process.monitor(pid)
97
98 receive do
99 {:DOWN, ^ref, :process, ^pid, :pool_full} ->
100 :error
101
102 {:DOWN, ^ref, :process, ^pid, :normal} ->
103 :ok
104 end
105 end
106
107 defp get_gun_pid_from_worker(worker_pid) do
108 # GenServer.call will block the process for timeout length if
109 # the server crashes on startup (which will happen if gun fails to connect)
110 # so instead we use cast + monitor
111
112 ref = Process.monitor(worker_pid)
113 GenServer.cast(worker_pid, {:add_client, self(), true})
114
115 receive do
116 {:conn_pid, pid} -> {:ok, pid}
117 {:DOWN, ^ref, :process, ^worker_pid, reason} -> reason
118 end
119 end
120
121 def release_conn(conn_pid) do
122 [worker_pid] =
123 Registry.select(@registry, [
124 {{:_, :"$1", {:"$2", :_, :_, :_}}, [{:==, :"$2", conn_pid}], [:"$1"]}
125 ])
126
127 GenServer.cast(worker_pid, {:remove_client, self()})
128 end
129 end