refactoring for gun api modules
[akkoma] / lib / pleroma / pool / connections.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Pool.Connections do
6 use GenServer
7
8 alias Pleroma.Config
9
10 require Logger
11
12 @type domain :: String.t()
13 @type conn :: Pleroma.Gun.Conn.t()
14
15 @type t :: %__MODULE__{
16 conns: %{domain() => conn()},
17 opts: keyword()
18 }
19
20 defstruct conns: %{}, opts: []
21
22 alias Pleroma.Gun
23
24 @spec start_link({atom(), keyword()}) :: {:ok, pid()}
25 def start_link({name, opts}) do
26 GenServer.start_link(__MODULE__, opts, name: name)
27 end
28
29 @impl true
30 def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}
31
32 @spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
33 def checkin(url, name)
34 def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name)
35
36 def checkin(%URI{} = uri, name) do
37 timeout = Config.get([:connections_pool, :checkin_timeout], 250)
38
39 GenServer.call(name, {:checkin, uri}, timeout)
40 end
41
42 @spec alive?(atom()) :: boolean()
43 def alive?(name) do
44 if pid = Process.whereis(name) do
45 Process.alive?(pid)
46 else
47 false
48 end
49 end
50
51 @spec get_state(atom()) :: t()
52 def get_state(name) do
53 GenServer.call(name, :state)
54 end
55
56 @spec count(atom()) :: pos_integer()
57 def count(name) do
58 GenServer.call(name, :count)
59 end
60
61 @spec get_unused_conns(atom()) :: [{domain(), conn()}]
62 def get_unused_conns(name) do
63 GenServer.call(name, :unused_conns)
64 end
65
66 @spec checkout(pid(), pid(), atom()) :: :ok
67 def checkout(conn, pid, name) do
68 GenServer.cast(name, {:checkout, conn, pid})
69 end
70
71 @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok
72 def add_conn(name, key, conn) do
73 GenServer.cast(name, {:add_conn, key, conn})
74 end
75
76 @spec remove_conn(atom(), String.t()) :: :ok
77 def remove_conn(name, key) do
78 GenServer.cast(name, {:remove_conn, key})
79 end
80
81 @impl true
82 def handle_cast({:add_conn, key, conn}, state) do
83 state = put_in(state.conns[key], conn)
84
85 Process.monitor(conn.conn)
86 {:noreply, state}
87 end
88
89 @impl true
90 def handle_cast({:checkout, conn_pid, pid}, state) do
91 Logger.debug("checkout #{inspect(conn_pid)}")
92
93 state =
94 with true <- Process.alive?(conn_pid),
95 {key, conn} <- find_conn(state.conns, conn_pid),
96 used_by <- List.keydelete(conn.used_by, pid, 0) do
97 conn_state =
98 if used_by == [] do
99 :idle
100 else
101 conn.conn_state
102 end
103
104 put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
105 else
106 false ->
107 Logger.debug("checkout for closed conn #{inspect(conn_pid)}")
108 state
109
110 nil ->
111 Logger.debug("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
112 state
113 end
114
115 {:noreply, state}
116 end
117
118 @impl true
119 def handle_cast({:remove_conn, key}, state) do
120 state = put_in(state.conns, Map.delete(state.conns, key))
121 {:noreply, state}
122 end
123
124 @impl true
125 def handle_call({:checkin, uri}, from, state) do
126 key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
127 Logger.debug("checkin #{key}")
128
129 case state.conns[key] do
130 %{conn: conn, gun_state: :up} = current_conn ->
131 Logger.debug("reusing conn #{key}")
132
133 time = :os.system_time(:second)
134 last_reference = time - current_conn.last_reference
135 current_crf = crf(last_reference, 100, current_conn.crf)
136
137 state =
138 put_in(state.conns[key], %{
139 current_conn
140 | last_reference: time,
141 crf: current_crf,
142 conn_state: :active,
143 used_by: [from | current_conn.used_by]
144 })
145
146 {:reply, conn, state}
147
148 %{gun_state: :down} ->
149 {:reply, nil, state}
150
151 nil ->
152 {:reply, nil, state}
153 end
154 end
155
156 @impl true
157 def handle_call(:state, _from, state), do: {:reply, state, state}
158
159 @impl true
160 def handle_call(:count, _from, state) do
161 {:reply, Enum.count(state.conns), state}
162 end
163
164 @impl true
165 def handle_call(:unused_conns, _from, state) do
166 unused_conns =
167 state.conns
168 |> Enum.filter(fn {_k, v} ->
169 v.conn_state == :idle and v.used_by == []
170 end)
171 |> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
172 x.crf <= y.crf and x.last_reference <= y.last_reference
173 end)
174
175 {:reply, unused_conns, state}
176 end
177
178 @impl true
179 def handle_info({:gun_up, conn_pid, _protocol}, state) do
180 state =
181 with conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid),
182 {key, conn} <- find_conn(state.conns, conn_pid, conn_key),
183 {true, key} <- {Process.alive?(conn_pid), key} do
184 time = :os.system_time(:second)
185 last_reference = time - conn.last_reference
186 current_crf = crf(last_reference, 100, conn.crf)
187
188 put_in(state.conns[key], %{
189 conn
190 | gun_state: :up,
191 last_reference: time,
192 crf: current_crf,
193 conn_state: :active,
194 retries: 0
195 })
196 else
197 :error_gun_info ->
198 Logger.debug(":gun.info caused error")
199 state
200
201 {false, key} ->
202 Logger.debug(":gun_up message for closed conn #{inspect(conn_pid)}")
203
204 put_in(
205 state.conns,
206 Map.delete(state.conns, key)
207 )
208
209 nil ->
210 Logger.debug(":gun_up message for conn which is not found in state")
211
212 :ok = Gun.close(conn_pid)
213
214 state
215 end
216
217 {:noreply, state}
218 end
219
220 @impl true
221 def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
222 retries = Config.get([:connections_pool, :retry], 1)
223 # we can't get info on this pid, because pid is dead
224 state =
225 with {key, conn} <- find_conn(state.conns, conn_pid),
226 {true, key} <- {Process.alive?(conn_pid), key} do
227 if conn.retries == retries do
228 Logger.debug("closing conn if retries is eq #{inspect(conn_pid)}")
229 :ok = Gun.close(conn.conn)
230
231 put_in(
232 state.conns,
233 Map.delete(state.conns, key)
234 )
235 else
236 put_in(state.conns[key], %{
237 conn
238 | gun_state: :down,
239 retries: conn.retries + 1
240 })
241 end
242 else
243 {false, key} ->
244 # gun can send gun_down for closed conn, maybe connection is not closed yet
245 Logger.debug(":gun_down message for closed conn #{inspect(conn_pid)}")
246
247 put_in(
248 state.conns,
249 Map.delete(state.conns, key)
250 )
251
252 nil ->
253 Logger.debug(":gun_down message for conn which is not found in state")
254
255 :ok = Gun.close(conn_pid)
256
257 state
258 end
259
260 {:noreply, state}
261 end
262
263 @impl true
264 def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do
265 Logger.debug("received DOWM message for #{inspect(conn_pid)} reason -> #{inspect(reason)}")
266
267 state =
268 with {key, conn} <- find_conn(state.conns, conn_pid) do
269 Enum.each(conn.used_by, fn {pid, _ref} ->
270 Process.exit(pid, reason)
271 end)
272
273 put_in(
274 state.conns,
275 Map.delete(state.conns, key)
276 )
277 else
278 nil ->
279 Logger.debug(":DOWN message for conn which is not found in state")
280
281 state
282 end
283
284 {:noreply, state}
285 end
286
287 defp compose_key_gun_info(pid) do
288 try do
289 # sometimes :gun.info can raise MatchError, which lead to pool terminate
290 %{origin_host: origin_host, origin_scheme: scheme, origin_port: port} = Gun.info(pid)
291
292 host =
293 case :inet.ntoa(origin_host) do
294 {:error, :einval} -> origin_host
295 ip -> ip
296 end
297
298 "#{scheme}:#{host}:#{port}"
299 rescue
300 _ -> :error_gun_info
301 end
302 end
303
304 defp find_conn(conns, conn_pid) do
305 Enum.find(conns, fn {_key, conn} ->
306 conn.conn == conn_pid
307 end)
308 end
309
310 defp find_conn(conns, conn_pid, conn_key) do
311 Enum.find(conns, fn {key, conn} ->
312 key == conn_key and conn.conn == conn_pid
313 end)
314 end
315
316 def crf(current, steps, crf) do
317 1 + :math.pow(0.5, current / steps) * crf
318 end
319
320 def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do
321 "#{scheme}://#{host}#{path}"
322 end
323 end