removing retry option and changing some logger messages levels
[akkoma] / lib / pleroma / pool / connections.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Pool.Connections do
6 use GenServer
7
8 require Logger
9
10 @type domain :: String.t()
11 @type conn :: Pleroma.Gun.Conn.t()
12
13 @type t :: %__MODULE__{
14 conns: %{domain() => conn()},
15 opts: keyword()
16 }
17
18 defstruct conns: %{}, opts: []
19
20 alias Pleroma.Gun.API
21 alias Pleroma.Gun.Conn
22
23 @spec start_link({atom(), keyword()}) :: {:ok, pid()}
24 def start_link({name, opts}) do
25 GenServer.start_link(__MODULE__, opts, name: name)
26 end
27
28 @impl true
29 def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}
30
31 @spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
32 def checkin(url, name)
33 def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name)
34
35 def checkin(%URI{} = uri, name) do
36 timeout = Pleroma.Config.get([:connections_pool, :receive_connection_timeout], 250)
37
38 GenServer.call(
39 name,
40 {:checkin, uri},
41 timeout
42 )
43 end
44
45 @spec open_conn(String.t() | URI.t(), atom(), keyword()) :: :ok
46 def open_conn(url, name, opts \\ [])
47 def open_conn(url, name, opts) when is_binary(url), do: open_conn(URI.parse(url), name, opts)
48
49 def open_conn(%URI{} = uri, name, opts) do
50 pool_opts = Pleroma.Config.get([:connections_pool], [])
51
52 opts =
53 opts
54 |> Enum.into(%{})
55 |> Map.put_new(:retry, pool_opts[:retry] || 0)
56 |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 100)
57 |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)
58
59 GenServer.cast(name, {:open_conn, %{opts: opts, uri: uri}})
60 end
61
62 @spec alive?(atom()) :: boolean()
63 def alive?(name) do
64 pid = Process.whereis(name)
65 if pid, do: Process.alive?(pid), else: false
66 end
67
68 @spec get_state(atom()) :: t()
69 def get_state(name) do
70 GenServer.call(name, :state)
71 end
72
73 @spec checkout(pid(), pid(), atom()) :: :ok
74 def checkout(conn, pid, name) do
75 GenServer.cast(name, {:checkout, conn, pid})
76 end
77
78 @impl true
79 def handle_cast({:open_conn, %{opts: opts, uri: uri}}, state) do
80 Logger.debug("opening new #{compose_uri(uri)}")
81 max_connections = state.opts[:max_connections]
82
83 key = compose_key(uri)
84
85 if Enum.count(state.conns) < max_connections do
86 open_conn(key, uri, state, opts)
87 else
88 try_to_open_conn(key, uri, state, opts)
89 end
90 end
91
92 @impl true
93 def handle_cast({:checkout, conn_pid, pid}, state) do
94 Logger.debug("checkout #{inspect(conn_pid)}")
95
96 state =
97 with true <- Process.alive?(conn_pid),
98 {key, conn} <- find_conn(state.conns, conn_pid),
99 used_by <- List.keydelete(conn.used_by, pid, 0) do
100 conn_state =
101 if used_by == [] do
102 :idle
103 else
104 conn.conn_state
105 end
106
107 put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
108 else
109 false ->
110 Logger.debug("checkout for closed conn #{inspect(conn_pid)}")
111 state
112
113 nil ->
114 Logger.debug("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
115 state
116 end
117
118 {:noreply, state}
119 end
120
121 @impl true
122 def handle_call({:checkin, uri}, from, state) do
123 Logger.debug("checkin #{compose_uri(uri)}")
124 key = compose_key(uri)
125
126 case state.conns[key] do
127 %{conn: conn, gun_state: gun_state} = current_conn when gun_state == :up ->
128 Logger.debug("reusing conn #{compose_uri(uri)}")
129
130 with time <- :os.system_time(:second),
131 last_reference <- time - current_conn.last_reference,
132 current_crf <- crf(last_reference, 100, current_conn.crf),
133 state <-
134 put_in(state.conns[key], %{
135 current_conn
136 | last_reference: time,
137 crf: current_crf,
138 conn_state: :active,
139 used_by: [from | current_conn.used_by]
140 }) do
141 {:reply, conn, state}
142 end
143
144 %{gun_state: gun_state} when gun_state == :down ->
145 {:reply, nil, state}
146
147 nil ->
148 {:reply, nil, state}
149 end
150 end
151
152 @impl true
153 def handle_call(:state, _from, state), do: {:reply, state, state}
154
155 @impl true
156 def handle_info({:gun_up, conn_pid, _protocol}, state) do
157 state =
158 with true <- Process.alive?(conn_pid),
159 conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid),
160 {key, conn} <- find_conn(state.conns, conn_pid, conn_key),
161 time <- :os.system_time(:second),
162 last_reference <- time - conn.last_reference,
163 current_crf <- crf(last_reference, 100, conn.crf) do
164 put_in(state.conns[key], %{
165 conn
166 | gun_state: :up,
167 last_reference: time,
168 crf: current_crf,
169 conn_state: :active,
170 retries: 0
171 })
172 else
173 :error_gun_info ->
174 Logger.debug(":gun.info caused error")
175 state
176
177 false ->
178 Logger.debug(":gun_up message for closed conn #{inspect(conn_pid)}")
179 state
180
181 nil ->
182 Logger.debug(
183 ":gun_up message for alive conn #{inspect(conn_pid)}, but deleted from state"
184 )
185
186 :ok = API.close(conn_pid)
187
188 state
189 end
190
191 {:noreply, state}
192 end
193
194 @impl true
195 def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
196 # we can't get info on this pid, because pid is dead
197 state =
198 with true <- Process.alive?(conn_pid),
199 {key, conn} <- find_conn(state.conns, conn_pid) do
200 if conn.retries == 5 do
201 Logger.debug("closing conn if retries is eq 5 #{inspect(conn_pid)}")
202 :ok = API.close(conn.conn)
203
204 put_in(
205 state.conns,
206 Map.delete(state.conns, key)
207 )
208 else
209 put_in(state.conns[key], %{
210 conn
211 | gun_state: :down,
212 retries: conn.retries + 1
213 })
214 end
215 else
216 false ->
217 # gun can send gun_down for closed conn, maybe connection is not closed yet
218 Logger.debug(":gun_down message for closed conn #{inspect(conn_pid)}")
219 state
220
221 nil ->
222 Logger.debug(
223 ":gun_down message for alive conn #{inspect(conn_pid)}, but deleted from state"
224 )
225
226 :ok = API.close(conn_pid)
227
228 state
229 end
230
231 {:noreply, state}
232 end
233
234 defp compose_key(%URI{scheme: scheme, host: host, port: port}), do: "#{scheme}:#{host}:#{port}"
235
236 defp compose_key_gun_info(pid) do
237 try do
238 # sometimes :gun.info can raise MatchError, which lead to pool terminate
239 %{origin_host: origin_host, origin_scheme: scheme, origin_port: port} = API.info(pid)
240
241 host =
242 case :inet.ntoa(origin_host) do
243 {:error, :einval} -> origin_host
244 ip -> ip
245 end
246
247 "#{scheme}:#{host}:#{port}"
248 rescue
249 _ -> :error_gun_info
250 end
251 end
252
253 defp find_conn(conns, conn_pid) do
254 Enum.find(conns, fn {_key, conn} ->
255 conn.conn == conn_pid
256 end)
257 end
258
259 defp find_conn(conns, conn_pid, conn_key) do
260 Enum.find(conns, fn {key, conn} ->
261 key == conn_key and conn.conn == conn_pid
262 end)
263 end
264
265 defp open_conn(key, uri, state, %{proxy: {proxy_host, proxy_port}} = opts) do
266 connect_opts =
267 uri
268 |> destination_opts()
269 |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
270
271 with open_opts <- Map.delete(opts, :tls_opts),
272 {:ok, conn} <- API.open(proxy_host, proxy_port, open_opts),
273 {:ok, _} <- API.await_up(conn),
274 stream <- API.connect(conn, connect_opts),
275 {:response, :fin, 200, _} <- API.await(conn, stream),
276 state <-
277 put_in(state.conns[key], %Conn{
278 conn: conn,
279 gun_state: :up,
280 conn_state: :active,
281 last_reference: :os.system_time(:second)
282 }) do
283 {:noreply, state}
284 else
285 error ->
286 Logger.warn(
287 "Received error on opening connection with http proxy #{uri.scheme}://#{
288 compose_uri(uri)
289 }: #{inspect(error)}"
290 )
291
292 {:noreply, state}
293 end
294 end
295
296 defp open_conn(key, uri, state, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
297 version =
298 proxy_type
299 |> to_string()
300 |> String.last()
301 |> case do
302 "4" -> 4
303 _ -> 5
304 end
305
306 socks_opts =
307 uri
308 |> destination_opts()
309 |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
310 |> Map.put(:version, version)
311
312 opts =
313 opts
314 |> Map.put(:protocols, [:socks])
315 |> Map.put(:socks_opts, socks_opts)
316
317 with {:ok, conn} <- API.open(proxy_host, proxy_port, opts),
318 {:ok, _} <- API.await_up(conn),
319 state <-
320 put_in(state.conns[key], %Conn{
321 conn: conn,
322 gun_state: :up,
323 conn_state: :active,
324 last_reference: :os.system_time(:second)
325 }) do
326 {:noreply, state}
327 else
328 error ->
329 Logger.warn(
330 "Received error on opening connection with socks proxy #{uri.scheme}://#{
331 compose_uri(uri)
332 }: #{inspect(error)}"
333 )
334
335 {:noreply, state}
336 end
337 end
338
339 defp open_conn(key, %URI{host: host, port: port} = uri, state, opts) do
340 Logger.debug("opening conn #{compose_uri(uri)}")
341 {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
342
343 with {:ok, conn} <- API.open(host, port, opts),
344 {:ok, _} <- API.await_up(conn),
345 state <-
346 put_in(state.conns[key], %Conn{
347 conn: conn,
348 gun_state: :up,
349 conn_state: :active,
350 last_reference: :os.system_time(:second)
351 }) do
352 Logger.debug("new conn opened #{compose_uri(uri)}")
353 Logger.debug("replying to the call #{compose_uri(uri)}")
354 {:noreply, state}
355 else
356 error ->
357 Logger.warn(
358 "Received error on opening connection #{uri.scheme}://#{compose_uri(uri)}: #{
359 inspect(error)
360 }"
361 )
362
363 {:noreply, state}
364 end
365 end
366
367 defp destination_opts(%URI{host: host, port: port}) do
368 {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
369 %{host: host, port: port}
370 end
371
372 defp add_http2_opts(opts, "https", tls_opts) do
373 Map.merge(opts, %{protocols: [:http2], transport: :tls, tls_opts: tls_opts})
374 end
375
376 defp add_http2_opts(opts, _, _), do: opts
377
378 @spec get_unused_conns(map()) :: [{domain(), conn()}]
379 def get_unused_conns(conns) do
380 conns
381 |> Enum.filter(fn {_k, v} ->
382 v.conn_state == :idle and v.used_by == []
383 end)
384 |> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
385 x.crf <= y.crf and x.last_reference <= y.last_reference
386 end)
387 end
388
389 defp try_to_open_conn(key, uri, state, opts) do
390 Logger.debug("try to open conn #{compose_uri(uri)}")
391
392 with [{close_key, least_used} | _conns] <- get_unused_conns(state.conns),
393 :ok <- API.close(least_used.conn),
394 state <-
395 put_in(
396 state.conns,
397 Map.delete(state.conns, close_key)
398 ) do
399 Logger.debug(
400 "least used conn found and closed #{inspect(least_used.conn)} #{compose_uri(uri)}"
401 )
402
403 open_conn(key, uri, state, opts)
404 else
405 [] -> {:noreply, state}
406 end
407 end
408
409 def crf(current, steps, crf) do
410 1 + :math.pow(0.5, current / steps) * crf
411 end
412
413 def compose_uri(%URI{} = uri), do: "#{uri.host}#{uri.path}"
414 end