5cb8d488a99fce63736d8aff1e2e493e2b849684
[akkoma] / lib / pleroma / gun / connection_pool / worker_supervisor.ex
defmodule Pleroma.Gun.ConnectionPool.WorkerSupervisor do
  @moduledoc """
  Supervisor for pool workers.

  Does not do anything except enforce the max connection limit: when the limit
  is hit, `start_worker/2` runs a reclaim pass that terminates unused workers
  and then retries starting the worker once.
  """

  use DynamicSupervisor

  # Registry that the workers (and the short-lived enforcer process) register in.
  @registry Pleroma.Gun.ConnectionPool
  # Registry key held by the enforcer process while a reclaim pass is running.
  @enforcer_key "enforcer"

  @doc """
  Starts the supervisor and registers it under the module name.
  """
  @spec start_link(term()) :: Supervisor.on_start()
  def start_link(opts) do
    DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # The child limit is read from the `[:connections_pool, :max_connections]`
  # config key once, when the supervisor starts.
  @impl true
  def init(_opts) do
    DynamicSupervisor.init(
      strategy: :one_for_one,
      max_children: Pleroma.Config.get([:connections_pool, :max_connections])
    )
  end

  @doc """
  Starts a pool worker with `opts` under this supervisor.

  Returns the result of `DynamicSupervisor.start_child/2` unchanged, except
  for `{:error, :max_children}`: in that case a reclaim pass is run and, if it
  terminated at least one unused worker, the start is retried once.

  Returns `{:error, :pool_full}` (after emitting a
  `[:pleroma, :connection_pool, :provision_failure]` telemetry event) when the
  pool is still full on the retry, or when nothing could be reclaimed.

  `retry` marks the call as the second attempt; callers normally leave it at
  its default.
  """
  @spec start_worker(term(), boolean()) ::
          DynamicSupervisor.on_start_child() | {:error, :pool_full}
  def start_worker(opts, retry \\ false) do
    case DynamicSupervisor.start_child(__MODULE__, {Pleroma.Gun.ConnectionPool.Worker, opts}) do
      {:error, :max_children} ->
        if retry or free_pool() == :error do
          :telemetry.execute([:pleroma, :connection_pool, :provision_failure], %{opts: opts})
          {:error, :pool_full}
        else
          start_worker(opts, true)
        end

      res ->
        res
    end
  end

  # Runs (or joins) a reclaim pass and blocks until it finishes.
  # Returns `:ok` when the caller should retry, `:error` when nothing was freed.
  defp free_pool do
    case Registry.lookup(@registry, @enforcer_key) do
      [] ->
        # spawn_monitor/1 sets the monitor up atomically with the spawn, so the
        # enforcer cannot finish before we start watching it.
        {pid, ref} = spawn_monitor(&enforce/0)
        await_enforcer_down(pid, ref)

      [{pid, _}] ->
        wait_for_enforcer_finish(pid)
    end
  end

  # Body of the enforcer process. Claims the enforcer key and reclaims unused
  # connections; if another enforcer claimed the key first, waits for that one
  # and exits with the same outcome instead of crashing on a failed match.
  defp enforce do
    case Registry.register(@registry, @enforcer_key, nil) do
      {:ok, _owner} ->
        reclaim()

      {:error, {:already_registered, enforcer}} ->
        if wait_for_enforcer_finish(enforcer) == :error, do: exit(:no_unused_conns)
    end
  end

  # Terminates up to `reclaim_max` unused workers. Exits the calling process
  # with `:no_unused_conns` when there is nothing to reclaim.
  defp reclaim do
    max_connections = Pleroma.Config.get([:connections_pool, :max_connections])

    # At least one connection is always reclaimed, however small the multiplier.
    reclaim_max =
      [:connections_pool, :reclaim_multiplier]
      |> Pleroma.Config.get()
      |> Kernel.*(max_connections)
      |> round()
      |> max(1)

    :telemetry.execute([:pleroma, :connection_pool, :reclaim, :start], %{}, %{
      max_connections: max_connections,
      reclaim_max: reclaim_max
    })

    case unused_connections() do
      [] ->
        emit_reclaim_stop(0, max_connections)
        exit(:no_unused_conns)

      unused ->
        # Lowest crf first, ties broken by lowest last_reference. Sorting on the
        # tuple gives a proper total order, so the selection does not depend on
        # the order in which the registry returned the entries.
        reclaimed =
          unused
          |> Enum.sort_by(fn {_pid, crf, last_reference} -> {crf, last_reference} end)
          |> Enum.take(reclaim_max)

        Enum.each(reclaimed, fn {worker_pid, _crf, _last_reference} ->
          DynamicSupervisor.terminate_child(__MODULE__, worker_pid)
        end)

        emit_reclaim_stop(length(reclaimed), max_connections)
    end
  end

  # Returns `{worker_pid, crf, last_reference}` for every registry entry whose
  # `used_by` list is empty.
  defp unused_connections do
    # :ets.fun2ms(
    #   fn {_, {worker_pid, {_, used_by, crf, last_reference}}} when used_by == [] ->
    #     {worker_pid, crf, last_reference} end)
    Registry.select(
      @registry,
      [
        {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}],
         [{{:"$1", :"$3", :"$4"}}]}
      ]
    )
  end

  # Emits the telemetry event that closes a reclaim pass.
  defp emit_reclaim_stop(reclaimed_count, max_connections) do
    :telemetry.execute(
      [:pleroma, :connection_pool, :reclaim, :stop],
      %{reclaimed_count: reclaimed_count},
      %{max_connections: max_connections}
    )
  end

  # Monitors an already running enforcer and waits for it to terminate.
  defp wait_for_enforcer_finish(pid) do
    await_enforcer_down(pid, Process.monitor(pid))
  end

  # Translates the enforcer's exit reason into `:ok` (retry) or `:error`.
  defp await_enforcer_down(pid, ref) do
    receive do
      {:DOWN, ^ref, :process, ^pid, :no_unused_conns} ->
        :error

      # `:noproc` means the enforcer had already exited before the monitor was
      # set up; its pass is over, so a single retry is the best we can do.
      {:DOWN, ^ref, :process, ^pid, reason} when reason in [:normal, :noproc] ->
        :ok

      # The enforcer crashed. Report failure rather than blocking the caller
      # forever on a message that will never arrive.
      {:DOWN, ^ref, :process, ^pid, _crash_reason} ->
        :error
    end
  end
end