Connection pool: implement logging and telemetry events
[akkoma] / lib / pleroma / gun / connection_pool / worker_supervisor.ex
defmodule Pleroma.Gun.ConnectionPool.WorkerSupervisor do
  @moduledoc """
  Supervisor for pool workers.

  Does not do anything except enforce the max connection limit: when the limit is
  reached, a short-lived "enforcer" process terminates unused workers so that a new
  one can be started in their place.
  """

  use DynamicSupervisor

  # Registry in which the enforcer registers itself and from which unused workers
  # are selected.
  @registry Pleroma.Gun.ConnectionPool
  @enforcer_key "enforcer"

  @doc "Starts the supervisor, registered locally under this module's name."
  def start_link(opts) do
    DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    DynamicSupervisor.init(
      strategy: :one_for_one,
      max_children: Pleroma.Config.get([:connections_pool, :max_connections])
    )
  end

  @doc """
  Starts a pool worker with `opts` under this supervisor.

  If the supervisor is already at `max_children`, an attempt is made to free the
  pool by terminating unused workers, after which the start is retried. When nothing
  could be reclaimed, a `[:pleroma, :connection_pool, :provision_failure]` telemetry
  event is emitted and `{:error, :pool_full}` is returned. Any other result of
  `DynamicSupervisor.start_child/2` is returned unchanged.
  """
  @spec start_worker(term()) :: DynamicSupervisor.on_start_child() | {:error, :pool_full}
  def start_worker(opts) do
    case DynamicSupervisor.start_child(__MODULE__, {Pleroma.Gun.ConnectionPool.Worker, opts}) do
      {:error, :max_children} ->
        case free_pool() do
          :ok ->
            start_worker(opts)

          :error ->
            # NOTE(review): `opts` is sent as the measurements map; kept as is because
            # handlers attached elsewhere may match on this shape.
            :telemetry.execute([:pleroma, :connection_pool, :provision_failure], %{opts: opts})
            {:error, :pool_full}
        end

      res ->
        res
    end
  end

  # Runs (or joins an already running) enforcer and blocks until it terminates.
  # Returns `:ok` when a retry is worthwhile, `:error` otherwise.
  defp free_pool do
    case Registry.lookup(@registry, @enforcer_key) do
      [] ->
        # spawn_monitor/1 sets up the monitor atomically with the spawn, so the real
        # exit reason is always delivered, even if the enforcer finishes immediately.
        {pid, ref} = spawn_monitor(&run_enforcer/0)
        await_enforcer(pid, ref)

      [{pid, _}] ->
        wait_for_enforcer_finish(pid)
    end
  end

  # Body of the enforcer process. Its exit reason is the result seen by waiters:
  # `:normal` after reclaiming, `:no_unused_conns` when there was nothing to reclaim.
  defp run_enforcer do
    case Registry.register(@registry, @enforcer_key, nil) do
      {:ok, _owner} ->
        reclaim_unused_connections()

      {:error, {:already_registered, other}} ->
        # Returned by a unique-key registry when a concurrent caller also saw no
        # enforcer and registered first. Mirror that enforcer's outcome rather than
        # crashing on a failed match.
        case wait_for_enforcer_finish(other) do
          :ok -> :ok
          :error -> exit(:no_unused_conns)
        end
    end
  end

  # Terminates up to `reclaim_max` unused workers, lowest CRF first, and reports the
  # operation through the `[:pleroma, :connection_pool, :reclaim, _]` telemetry events.
  defp reclaim_unused_connections do
    max_connections = Pleroma.Config.get([:connections_pool, :max_connections])

    reclaim_max =
      [:connections_pool, :reclaim_multiplier]
      |> Pleroma.Config.get()
      |> Kernel.*(max_connections)
      |> round()
      |> max(1)

    :telemetry.execute([:pleroma, :connection_pool, :reclaim, :start], %{}, %{
      max_connections: max_connections,
      reclaim_max: reclaim_max
    })

    # Matches registry entries `{key, pid, value}` whose value is a 4-tuple
    # `{_, used_by, crf, last_reference}` with `used_by == []`, and returns
    # `{pid, crf, last_reference}`. The enforcer's own entry (value `nil`) is not a
    # 4-tuple and therefore never selected.
    unused_conns =
      Registry.select(
        @registry,
        [
          {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}],
           [{{:"$1", :"$3", :"$4"}}]}
        ]
      )

    case unused_conns do
      [] ->
        emit_reclaim_stop(0, max_connections)
        exit(:no_unused_conns)

      unused_conns ->
        reclaimed =
          unused_conns
          # Lexicographic order on {crf, last_reference}: a well-defined total order,
          # unlike comparing both fields with `and`, which is inconsistent whenever
          # the two fields disagree.
          |> Enum.sort_by(fn {_pid, crf, last_reference} -> {crf, last_reference} end)
          |> Enum.take(reclaim_max)

        Enum.each(reclaimed, fn {pid, _crf, _last_reference} ->
          DynamicSupervisor.terminate_child(__MODULE__, pid)
        end)

        emit_reclaim_stop(length(reclaimed), max_connections)
    end
  end

  # Emits the reclaim `:stop` event with the number of workers selected for termination.
  defp emit_reclaim_stop(reclaimed_count, max_connections) do
    :telemetry.execute(
      [:pleroma, :connection_pool, :reclaim, :stop],
      %{reclaimed_count: reclaimed_count},
      %{max_connections: max_connections}
    )
  end

  # Monitors an enforcer that was started elsewhere and waits for it to terminate.
  defp wait_for_enforcer_finish(pid) do
    await_enforcer(pid, Process.monitor(pid))
  end

  # Translates the enforcer's exit reason into `:ok` (retry) or `:error` (give up).
  defp await_enforcer(pid, ref) do
    receive do
      {:DOWN, ^ref, :process, ^pid, :normal} ->
        :ok

      {:DOWN, ^ref, :process, ^pid, :no_unused_conns} ->
        :error

      {:DOWN, ^ref, :process, ^pid, :noproc} ->
        # The enforcer had already terminated when the monitor was set up, so its
        # outcome is unknown; let the caller retry and re-evaluate the pool.
        :ok

      {:DOWN, ^ref, :process, ^pid, _abnormal} ->
        # The enforcer crashed. Report failure instead of blocking forever.
        :error
    end
  end
end