ebde4bbf6f2bdacf44a21e9b310d6bb75c073662
[akkoma] / lib / pleroma / gun / connection_pool / worker.ex
defmodule Pleroma.Gun.ConnectionPool.Worker do
  @moduledoc """
  GenServer that opens a single Gun connection and tracks its usage.

  The worker registers itself in the `Pleroma.Gun.ConnectionPool` registry under
  `key`. The registry value is a `{conn_pid, used_by, crf, last_reference}` tuple:

    * `conn_pid` - pid of the Gun connection (`nil` until the connection is open)
    * `used_by` - list of client pids currently using the connection
    * `crf` - LRFU "combined recency and frequency" score, see `crf/2` below
    * `last_reference` - `:os.system_time(:second)` of the last checkout

  Once the last client is removed, the worker stops itself after
  `[:connections_pool, :max_idle_time]` milliseconds (default `30_000`) unless a
  new client is added in the meantime.
  """

  use GenServer

  alias Pleroma.Gun

  @registry Pleroma.Gun.ConnectionPool

  # Registers the worker, opens the connection and sends `{:conn_pid, pid}` to the
  # first client. Any failing step stops the worker with that step's result.
  @impl true
  def init([uri, key, opts, client_pid]) do
    time = :os.system_time(:second)
    # Register before opening connection to prevent race conditions
    with {:ok, _owner} <- Registry.register(@registry, key, {nil, [client_pid], 1, time}),
         {:ok, conn_pid} <- Gun.Conn.open(uri, opts),
         Process.link(conn_pid) do
      {_, _} =
        Registry.update_value(@registry, key, fn {_, used_by, crf, last_reference} ->
          {conn_pid, used_by, crf, last_reference}
        end)

      send(client_pid, {:conn_pid, conn_pid})
      {:ok, %{key: key, timer: nil}, :hibernate}
    else
      err -> {:stop, err}
    end
  end

  # Adds `client_pid` to the users of the connection, refreshes the LRFU score and
  # the last-reference time, and optionally sends `{:conn_pid, pid}` back.
  @impl true
  def handle_cast({:add_client, client_pid, send_pid_back}, %{key: key} = state) do
    time = :os.system_time(:second)

    {{conn_pid, _, _, _}, _} =
      Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
        {conn_pid, [client_pid | used_by], crf(time - last_reference, crf), time}
      end)

    if send_pid_back, do: send(client_pid, {:conn_pid, conn_pid})

    # The connection is in use again, so a pending idle shutdown must not happen.
    cancel_idle_timer(state.timer)

    {:noreply, %{state | timer: nil}, :hibernate}
  end

  # Removes `client_pid` from the users of the connection. When nobody uses the
  # connection anymore, an idle shutdown is scheduled.
  @impl true
  def handle_cast({:remove_client, client_pid}, %{key: key} = state) do
    {{_conn_pid, used_by, _crf, _last_reference}, _} =
      Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
        {conn_pid, List.delete(used_by, client_pid), crf, last_reference}
      end)

    # Never keep more than one idle timer alive: a timer that is overwritten in
    # the state without being cancelled could not be cancelled by a later
    # `:add_client` and would stop a worker that is in use.
    cancel_idle_timer(state.timer)

    timer =
      if used_by == [] do
        max_idle = Pleroma.Config.get([:connections_pool, :max_idle_time], 30_000)
        Process.send_after(self(), :idle_close, max_idle)
      else
        nil
      end

    {:noreply, %{state | timer: timer}, :hibernate}
  end

  @impl true
  def handle_info(:idle_close, state) do
    # Gun monitors the owner process, and will close the connection automatically
    # when it's terminated
    {:stop, :normal, state}
  end

  # Gracefully shutdown if the connection got closed without any streams left
  @impl true
  def handle_info({:gun_down, _pid, _protocol, _reason, []}, state) do
    {:stop, :normal, state}
  end

  # Otherwise, shutdown with an error
  @impl true
  def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams} = down_message, state) do
    {:stop, {:error, down_message}, state}
  end

  # Synchronous shutdown request: unregisters the key first and then stops.
  # NOTE(review): no reply is sent before stopping, so the caller's
  # `GenServer.call/2` exits when this process terminates - confirm that callers
  # expect this.
  @impl true
  def handle_call(:idle_close, _, %{key: key} = state) do
    Registry.unregister(@registry, key)
    {:stop, :normal, state}
  end

  # Cancels a pending idle timer, if any. When the timer has already fired,
  # `Process.cancel_timer/1` returns `false` and the `:idle_close` message may
  # already be in the mailbox, so it is dropped here as well.
  defp cancel_idle_timer(nil), do: :ok

  defp cancel_idle_timer(timer) do
    if Process.cancel_timer(timer) == false do
      receive do
        :idle_close -> :ok
      after
        0 -> :ok
      end
    end

    :ok
  end

  # LRFU policy: https://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.55.1478
  # The previous score is weighted by 0.5^(time_delta / 100), i.e. it halves for
  # every 100 units of `time_delta`, and 1 is added for the new reference.
  defp crf(time_delta, prev_crf) do
    1 + :math.pow(0.5, time_delta / 100) * prev_crf
  end
end