d20927580af1f63c2890c25f37c19b69fc19859b
[akkoma] / lib / pleroma / pool / connections.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Pool.Connections do
6 use GenServer
7
8 alias Pleroma.Config
9
10 require Logger
11
12 @type domain :: String.t()
13 @type conn :: Pleroma.Gun.Conn.t()
14
15 @type t :: %__MODULE__{
16 conns: %{domain() => conn()},
17 opts: keyword()
18 }
19
20 defstruct conns: %{}, opts: []
21
22 alias Pleroma.Gun.API
23 alias Pleroma.Gun.Conn
24
@doc """
Starts a connection pool process registered under `name`.

`opts` becomes the pool options kept in the server state (for example
`:max_connections`, which is read when a new connection is requested).
"""
# `GenServer.start_link/3` can also return `:ignore` or `{:error, reason}`
# (e.g. `{:error, {:already_started, pid}}`), so the spec must not promise `{:ok, pid}`.
@spec start_link({atom(), keyword()}) :: GenServer.on_start()
def start_link({name, opts}) do
  GenServer.start_link(__MODULE__, opts, name: name)
end
29
# Builds the initial pool state: no tracked connections, `opts` stored as given.
@impl true
def init(opts) do
  initial_state = %__MODULE__{conns: %{}, opts: opts}
  {:ok, initial_state}
end
32
@doc """
Asks the pool registered as `name` for a connection to `url`.

Accepts either a URL string or an already parsed `URI`. Returns whatever the
pool replies with: the connection pid when one is up for that destination,
otherwise `nil`. The call uses the timeout configured under
`[:connections_pool, :receive_connection_timeout]` (250 ms when unset).
"""
@spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
def checkin(url, name)

def checkin(url, name) when is_binary(url) do
  url
  |> URI.parse()
  |> checkin(name)
end

def checkin(%URI{} = uri, name) do
  receive_timeout = Config.get([:connections_pool, :receive_connection_timeout], 250)
  GenServer.call(name, {:checkin, uri}, receive_timeout)
end
46
@doc """
Asynchronously asks the pool `name` to open a connection to `url`.

`opts` is converted to a map. `:retry`, `:retry_timeout` and
`:await_up_timeout` are filled in from the `:connections_pool` config (falling
back to 0, 100 and 5000) only when the caller did not supply those keys.
"""
@spec open_conn(String.t() | URI.t(), atom(), keyword()) :: :ok
def open_conn(url, name, opts \\ [])

def open_conn(url, name, opts) when is_binary(url) do
  url
  |> URI.parse()
  |> open_conn(name, opts)
end

def open_conn(%URI{} = uri, name, opts) do
  pool_opts = Config.get([:connections_pool], [])

  defaults = %{
    retry: pool_opts[:retry] || 0,
    retry_timeout: pool_opts[:retry_timeout] || 100,
    await_up_timeout: pool_opts[:await_up_timeout] || 5_000
  }

  # Caller-supplied keys win over the defaults, even when their value is nil.
  merged_opts = Map.merge(defaults, Map.new(opts))

  GenServer.cast(name, {:open_conn, %{opts: merged_opts, uri: uri}})
end
63
@doc """
Returns `true` when a process is registered under `name` and is still alive.
"""
@spec alive?(atom()) :: boolean()
def alive?(name) do
  case Process.whereis(name) do
    nil -> false
    pid -> Process.alive?(pid)
  end
end
69
@doc """
Returns the current state of the pool registered under `name`.
"""
@spec get_state(atom()) :: t()
def get_state(name), do: GenServer.call(name, :state)
74
@doc """
Tells the pool `name` that process `pid` no longer uses connection `conn`.

The notification is sent as a cast, so the function always returns `:ok`.
"""
@spec checkout(pid(), pid(), atom()) :: :ok
def checkout(conn, pid, name) do
  message = {:checkout, conn, pid}
  GenServer.cast(name, message)
end
79
# Handles a request to open a connection for `uri`.
#
# * If the destination already has a connection that is marked `:up` and whose
#   process is alive, nothing is opened. Previously a duplicate request (e.g.
#   several callers racing after `checkin/2` returned nil) opened a second
#   connection and overwrote the entry, orphaning the first connection and
#   discarding its `used_by` list.
# * Otherwise a connection is opened directly while the pool is below
#   `:max_connections`, or after evicting an unused one when it is full.
@impl true
def handle_cast({:open_conn, %{opts: opts, uri: uri}}, state) do
  Logger.debug("opening new #{compose_uri(uri)}")
  max_connections = state.opts[:max_connections]

  key = compose_key(uri)

  already_up? =
    case state.conns[key] do
      %{gun_state: :up, conn: conn_pid} when is_pid(conn_pid) -> Process.alive?(conn_pid)
      _ -> false
    end

  cond do
    already_up? ->
      Logger.debug("conn for #{compose_uri(uri)} is already up, skipping open")
      {:noreply, state}

    # map_size/1 is O(1); when :max_connections is unset (nil) this comparison is
    # always true under Erlang term ordering, which matches the previous behaviour.
    map_size(state.conns) < max_connections ->
      open_conn(key, uri, state, opts)

    true ->
      try_to_open_conn(key, uri, state, opts)
  end
end
93
# Handles a client releasing a connection.
#
# Removes `pid` from the connection's `used_by` list; when nobody is left the
# connection is marked `:idle`. Nothing changes when the connection process is
# dead or is not tracked in the state.
@impl true
def handle_cast({:checkout, conn_pid, pid}, state) do
  Logger.debug("checkout #{inspect(conn_pid)}")

  new_state =
    if Process.alive?(conn_pid) do
      case find_conn(state.conns, conn_pid) do
        {key, entry} ->
          # `used_by` holds GenServer `from` tuples, so match on their first element.
          remaining = List.keydelete(entry.used_by, pid, 0)
          next_conn_state = if remaining == [], do: :idle, else: entry.conn_state

          put_in(state.conns[key], %{entry | conn_state: next_conn_state, used_by: remaining})

        nil ->
          Logger.debug("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
          state
      end
    else
      Logger.debug("checkout for closed conn #{inspect(conn_pid)}")
      state
    end

  {:noreply, new_state}
end
122
# Handles a client asking for a connection to `uri`.
#
# Replies with the connection pid when the tracked connection is `:up`, and
# records the caller in `used_by`, refreshes `last_reference` and recomputes the
# CRF score. Replies with nil when the connection is `:down` or not tracked.
@impl true
def handle_call({:checkin, uri}, from, state) do
  Logger.debug("checkin #{compose_uri(uri)}")
  key = compose_key(uri)

  case state.conns[key] do
    %{conn: conn_pid, gun_state: :up} = entry ->
      Logger.debug("reusing conn #{compose_uri(uri)}")

      now = :os.system_time(:second)
      since_last_reference = now - entry.last_reference

      refreshed = %{
        entry
        | last_reference: now,
          crf: crf(since_last_reference, 100, entry.crf),
          conn_state: :active,
          used_by: [from | entry.used_by]
      }

      {:reply, conn_pid, put_in(state.conns[key], refreshed)}

    %{gun_state: :down} ->
      {:reply, nil, state}

    nil ->
      {:reply, nil, state}
  end
end
153
# Replies with the whole pool state, leaving it untouched.
@impl true
def handle_call(:state, _from, state) do
  {:reply, state, state}
end
156
# Handles gun's `:gun_up` notification.
#
# When the connection process is alive and is tracked under the key derived
# from its gun info, the entry is marked `:up`/`:active`, its retry counter is
# reset and its CRF score is refreshed. A live connection that is not tracked
# is closed; a dead one, or one whose info cannot be read, is only logged.
@impl true
def handle_info({:gun_up, conn_pid, _protocol}, state) do
  new_state =
    with true <- Process.alive?(conn_pid),
         gun_key when is_binary(gun_key) <- compose_key_gun_info(conn_pid),
         {key, entry} <- find_conn(state.conns, conn_pid, gun_key) do
      now = :os.system_time(:second)
      since_last_reference = now - entry.last_reference
      refreshed_crf = crf(since_last_reference, 100, entry.crf)

      refreshed = %{
        entry
        | gun_state: :up,
          last_reference: now,
          crf: refreshed_crf,
          conn_state: :active,
          retries: 0
      }

      put_in(state.conns[key], refreshed)
    else
      false ->
        Logger.debug(":gun_up message for closed conn #{inspect(conn_pid)}")
        state

      :error_gun_info ->
        Logger.debug(":gun.info caused error")
        state

      nil ->
        Logger.debug(
          ":gun_up message for alive conn #{inspect(conn_pid)}, but deleted from state"
        )

        :ok = API.close(conn_pid)

        state
    end

  {:noreply, new_state}
end
195
# Handles gun's `:gun_down` notification.
#
# The connection is looked up by pid only (not by a key derived from gun's
# info), because the gun process may already be gone when this message arrives.
#
# * alive and tracked: once the retry budget from
#   `[:connections_pool, :retry]` is used up the connection is closed and
#   forgotten, otherwise it is marked `:down` and its retry counter is bumped;
# * alive but not tracked: the connection is closed;
# * dead but still tracked: the stale entry is dropped so that `checkin` can
#   never hand out a dead pid;
# * dead and not tracked: only logged.
@impl true
def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
  max_retries = Config.get([:connections_pool, :retry], 0)

  state =
    case {Process.alive?(conn_pid), find_conn(state.conns, conn_pid)} do
      {true, {key, %{retries: retries} = conn}} ->
        # `>=` rather than `==`: if the counter ever exceeds the configured limit
        # (e.g. the limit was lowered at runtime) the connection must still be closed.
        if retries >= max_retries do
          Logger.debug("closing conn if retries is eq #{inspect(conn_pid)}")
          :ok = API.close(conn.conn)

          %{state | conns: Map.delete(state.conns, key)}
        else
          put_in(state.conns[key], %{conn | gun_state: :down, retries: retries + 1})
        end

      {true, nil} ->
        Logger.debug(
          ":gun_down message for alive conn #{inspect(conn_pid)}, but deleted from state"
        )

        :ok = API.close(conn_pid)

        state

      {false, {key, _conn}} ->
        # The process is dead, so there is nothing left to close; just forget it.
        Logger.debug(":gun_down message for closed conn #{inspect(conn_pid)}")
        %{state | conns: Map.delete(state.conns, key)}

      {false, nil} ->
        # gun can send gun_down for a connection that was already closed and removed
        Logger.debug(":gun_down message for closed conn #{inspect(conn_pid)}")
        state
    end

  {:noreply, state}
end
236
# Builds the pool key "scheme:host:port" for a destination URI.
defp compose_key(%URI{} = uri) do
  "#{uri.scheme}:#{uri.host}:#{uri.port}"
end
238
# Builds the pool key "scheme:host:port" from the info reported for the
# connection process `pid`.
#
# Returns `:error_gun_info` instead of crashing when the info cannot be read.
# Both failure modes are covered, because either would terminate the pool:
#   * exceptions (the info call has been seen to raise MatchError);
#   * exits (e.g. the process went away between the caller's liveness check and
#     this call) - these are NOT caught by `rescue`, hence the `catch` clause.
defp compose_key_gun_info(pid) do
  %{origin_host: origin_host, origin_scheme: scheme, origin_port: port} = API.info(pid)

  # An IP tuple is rendered in textual form; anything else (a host name) is used as is.
  host =
    case :inet.ntoa(origin_host) do
      {:error, :einval} -> origin_host
      ip -> ip
    end

  "#{scheme}:#{host}:#{port}"
rescue
  _ -> :error_gun_info
catch
  :exit, _reason -> :error_gun_info
end
255
# Returns the first `{key, conn}` entry whose connection pid is `conn_pid`,
# or nil when no tracked connection uses that pid.
defp find_conn(conns, conn_pid) do
  Enum.find(conns, fn entry ->
    match?({_key, %{conn: ^conn_pid}}, entry)
  end)
end
261
# Returns the `{key, conn}` entry stored under `conn_key` whose connection pid
# is `conn_pid`, or nil when key and pid do not both match.
defp find_conn(conns, conn_pid, conn_key) do
  Enum.find(conns, fn {key, %{conn: pid}} ->
    pid == conn_pid and key == conn_key
  end)
end
267
# Opens a connection for `uri` and, on success, tracks it in the state under
# `key`. Always returns `{:noreply, state}` so it can end a `handle_cast/2`.
#
# In every clause, a connection that was opened but could not be fully
# established (await-up failure, proxy refusing the tunnel) is closed before
# returning, so a failed attempt does not leave an untracked connection behind.

# Through an HTTP proxy (`proxy: {host, port}`): connect to the proxy, then ask
# it to tunnel to the destination and require a 200 response.
defp open_conn(key, uri, state, %{proxy: {proxy_host, proxy_port}} = opts) do
  connect_opts =
    uri
    |> destination_opts()
    |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))

  # :tls_opts belongs to the tunnelled destination, not to the proxy connection.
  open_opts = Map.delete(opts, :tls_opts)

  case API.open(proxy_host, proxy_port, open_opts) do
    {:ok, conn} ->
      with {:ok, _} <- API.await_up(conn),
           stream = API.connect(conn, connect_opts),
           {:response, :fin, 200, _} <- API.await(conn, stream) do
        {:noreply, track_opened_conn(state, key, conn)}
      else
        error -> abort_open(conn, uri, state, " with http proxy", error)
      end

    error ->
      open_failed(uri, state, " with http proxy", error)
  end
end

# Through a SOCKS proxy (`proxy: {type, host, port}`): the version is 4 when
# the proxy type ends in "4", otherwise 5.
defp open_conn(key, uri, state, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
  version =
    case proxy_type |> to_string() |> String.last() do
      "4" -> 4
      _ -> 5
    end

  socks_opts =
    uri
    |> destination_opts()
    |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
    |> Map.put(:version, version)

  opts =
    opts
    |> Map.put(:protocols, [:socks])
    |> Map.put(:socks_opts, socks_opts)

  case API.open(proxy_host, proxy_port, opts) do
    {:ok, conn} ->
      case API.await_up(conn) do
        {:ok, _} -> {:noreply, track_opened_conn(state, key, conn)}
        error -> abort_open(conn, uri, state, " with socks proxy", error)
      end

    error ->
      open_failed(uri, state, " with socks proxy", error)
  end
end

# Directly to the destination host.
defp open_conn(key, %URI{host: host, port: port} = uri, state, opts) do
  Logger.debug("opening conn #{compose_uri(uri)}")
  {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)

  case API.open(host, port, opts) do
    {:ok, conn} ->
      case API.await_up(conn) do
        {:ok, _} ->
          state = track_opened_conn(state, key, conn)
          Logger.debug("new conn opened #{compose_uri(uri)}")
          Logger.debug("replying to the call #{compose_uri(uri)}")
          {:noreply, state}

        error ->
          abort_open(conn, uri, state, "", error)
      end

    error ->
      open_failed(uri, state, "", error)
  end
end

# Stores a freshly established connection under `key` as up and active.
defp track_opened_conn(state, key, conn) do
  put_in(state.conns[key], %Conn{
    conn: conn,
    gun_state: :up,
    conn_state: :active,
    last_reference: :os.system_time(:second)
  })
end

# Closes a connection that was opened but not established, then reports the failure.
defp abort_open(conn, uri, state, via, error) do
  # The close result is deliberately ignored: the attempt is abandoned either way.
  _ = API.close(conn)
  open_failed(uri, state, via, error)
end

# Logs a failed open attempt (`via` names the proxy kind, "" for none); state is unchanged.
defp open_failed(uri, state, via, error) do
  Logger.warn(
    "Received error on opening connection#{via} #{uri.scheme}://#{compose_uri(uri)}: " <>
      inspect(error)
  )

  {:noreply, state}
end
369
# Builds the `%{host: ..., port: ...}` map describing the destination of `uri`,
# with the host converted by `Pleroma.HTTP.Adapter.domain_or_ip/1`.
defp destination_opts(%URI{} = uri) do
  {_type, converted_host} = Pleroma.HTTP.Adapter.domain_or_ip(uri.host)
  %{host: converted_host, port: uri.port}
end
374
# For an "https" destination adds HTTP/2 over TLS settings (with the given
# `tls_opts`) to `opts`; for any other scheme `opts` is returned unchanged.
defp add_http2_opts(opts, scheme, tls_opts) do
  case scheme do
    "https" ->
      opts
      |> Map.put(:protocols, [:http2])
      |> Map.put(:transport, :tls)
      |> Map.put(:tls_opts, tls_opts)

    _other ->
      opts
  end
end
380
@doc """
Returns the connections nobody is using, least valuable first.

A connection counts as unused when it is `:idle` and its `used_by` list is
empty. The result is ordered by ascending `crf`, with ties broken by ascending
`last_reference`, so the head of the list is the best candidate to close.
"""
@spec get_unused_conns(map()) :: [{domain(), conn()}]
def get_unused_conns(conns) do
  # Sorting on the {crf, last_reference} tuple gives a proper total order. The former
  # comparator (`crf <= and last_reference <=`) left pairs such as {1, 10} / {2, 5}
  # incomparable, so the connection with the lowest crf was not guaranteed to be first.
  conns
  |> Enum.filter(fn {_key, conn} ->
    conn.conn_state == :idle and conn.used_by == []
  end)
  |> Enum.sort_by(fn {_key, conn} -> {conn.crf, conn.last_reference} end)
end
391
# Used when the pool is full: evicts the least valuable unused connection (the
# head of `get_unused_conns/1`) and then opens the requested one in its place.
# When every connection is in use the request is dropped and the state is
# returned unchanged.
defp try_to_open_conn(key, uri, state, opts) do
  Logger.debug("try to open conn #{compose_uri(uri)}")

  case get_unused_conns(state.conns) do
    [{close_key, least_used} | _rest] ->
      # The result of close is ignored on purpose: the entry is evicted regardless.
      # Previously anything other than :ok (presumably possible when the connection
      # process is already gone - confirm against the API module) fell through to an
      # `else` that only handled [], raising WithClauseError and crashing the pool.
      _ = API.close(least_used.conn)

      state = %{state | conns: Map.delete(state.conns, close_key)}

      Logger.debug(
        "least used conn found and closed #{inspect(least_used.conn)} #{compose_uri(uri)}"
      )

      open_conn(key, uri, state, opts)

    [] ->
      {:noreply, state}
  end
end
411
@doc """
Computes an updated CRF score: `1 + 0.5 ** (current / steps) * crf`.

The previous score `crf` is halved for every `steps` units contained in
`current`, and 1 is added for the new reference. Callers in this module pass
the number of seconds since the last reference as `current` and 100 as `steps`.
"""
def crf(current, steps, crf) do
  decay = :math.pow(0.5, current / steps)
  1 + decay * crf
end
415
@doc """
Returns the host immediately followed by the path of `uri`; used in log messages.
"""
def compose_uri(%URI{host: host, path: path}) do
  "#{host}#{path}"
end
417 end