Spelling
[akkoma] / lib / pleroma / pool / connections.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Pool.Connections do
6 use GenServer
7
8 require Logger
9
10 @type domain :: String.t()
11 @type conn :: Pleroma.Gun.Conn.t()
12
13 @type t :: %__MODULE__{
14 conns: %{domain() => conn()},
15 opts: keyword()
16 }
17
18 defstruct conns: %{}, opts: []
19
20 alias Pleroma.Gun.API
21 alias Pleroma.Gun.Conn
22
23 @spec start_link({atom(), keyword()}) :: {:ok, pid()}
24 def start_link({name, opts}) do
25 GenServer.start_link(__MODULE__, opts, name: name)
26 end
27
28 @impl true
29 def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}
30
31 @spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
32 def checkin(url, name)
33 def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name)
34
35 def checkin(%URI{} = uri, name) do
36 timeout = Pleroma.Config.get([:connections_pool, :receive_connection_timeout], 250)
37
38 GenServer.call(
39 name,
40 {:checkin, uri},
41 timeout
42 )
43 end
44
45 @spec open_conn(String.t() | URI.t(), atom(), keyword()) :: :ok
46 def open_conn(url, name, opts \\ [])
47 def open_conn(url, name, opts) when is_binary(url), do: open_conn(URI.parse(url), name, opts)
48
49 def open_conn(%URI{} = uri, name, opts) do
50 pool_opts = Pleroma.Config.get([:connections_pool], [])
51
52 opts =
53 opts
54 |> Enum.into(%{})
55 |> Map.put_new(:receive, false)
56 |> Map.put_new(:retry, pool_opts[:retry] || 5)
57 |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 100)
58 |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)
59
60 GenServer.cast(name, {:open_conn, %{opts: opts, uri: uri}})
61 end
62
63 @spec alive?(atom()) :: boolean()
64 def alive?(name) do
65 pid = Process.whereis(name)
66 if pid, do: Process.alive?(pid), else: false
67 end
68
69 @spec get_state(atom()) :: t()
70 def get_state(name) do
71 GenServer.call(name, :state)
72 end
73
74 @spec checkout(pid(), pid(), atom()) :: :ok
75 def checkout(conn, pid, name) do
76 GenServer.cast(name, {:checkout, conn, pid})
77 end
78
79 @impl true
80 def handle_cast({:open_conn, %{opts: opts, uri: uri}}, state) do
81 Logger.debug("opening new #{compose_uri(uri)}")
82 max_connections = state.opts[:max_connections]
83
84 key = compose_key(uri)
85
86 if Enum.count(state.conns) < max_connections do
87 open_conn(key, uri, state, opts)
88 else
89 try_to_open_conn(key, uri, state, opts)
90 end
91 end
92
93 @impl true
94 def handle_cast({:checkout, conn_pid, pid}, state) do
95 Logger.debug("checkout #{inspect(conn_pid)}")
96
97 state =
98 with true <- Process.alive?(conn_pid),
99 {key, conn} <- find_conn(state.conns, conn_pid),
100 used_by <- List.keydelete(conn.used_by, pid, 0) do
101 conn_state =
102 if used_by == [] do
103 :idle
104 else
105 conn.conn_state
106 end
107
108 put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
109 else
110 false ->
111 Logger.warn("checkout for closed conn #{inspect(conn_pid)}")
112 state
113
114 nil ->
115 Logger.info("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
116 state
117 end
118
119 {:noreply, state}
120 end
121
122 @impl true
123 def handle_call({:checkin, uri}, from, state) do
124 Logger.debug("checkin #{compose_uri(uri)}")
125 key = compose_key(uri)
126
127 case state.conns[key] do
128 %{conn: conn, gun_state: gun_state} = current_conn when gun_state == :up ->
129 Logger.debug("reusing conn #{compose_uri(uri)}")
130
131 with time <- :os.system_time(:second),
132 last_reference <- time - current_conn.last_reference,
133 current_crf <- crf(last_reference, 100, current_conn.crf),
134 state <-
135 put_in(state.conns[key], %{
136 current_conn
137 | last_reference: time,
138 crf: current_crf,
139 conn_state: :active,
140 used_by: [from | current_conn.used_by]
141 }) do
142 {:reply, conn, state}
143 end
144
145 %{gun_state: gun_state} when gun_state == :down ->
146 {:reply, nil, state}
147
148 nil ->
149 {:reply, nil, state}
150 end
151 end
152
153 @impl true
154 def handle_call(:state, _from, state), do: {:reply, state, state}
155
156 @impl true
157 def handle_info({:gun_up, conn_pid, _protocol}, state) do
158 state =
159 with true <- Process.alive?(conn_pid),
160 conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid),
161 {key, conn} <- find_conn(state.conns, conn_pid, conn_key),
162 time <- :os.system_time(:second),
163 last_reference <- time - conn.last_reference,
164 current_crf <- crf(last_reference, 100, conn.crf) do
165 put_in(state.conns[key], %{
166 conn
167 | gun_state: :up,
168 last_reference: time,
169 crf: current_crf,
170 conn_state: :active,
171 retries: 0
172 })
173 else
174 :error_gun_info ->
175 Logger.warn(":gun.info caused error")
176 state
177
178 false ->
179 Logger.warn(":gun_up message for closed conn #{inspect(conn_pid)}")
180 state
181
182 nil ->
183 Logger.warn(
184 ":gun_up message for alive conn #{inspect(conn_pid)}, but deleted from state"
185 )
186
187 :ok = API.close(conn_pid)
188
189 state
190 end
191
192 {:noreply, state}
193 end
194
195 @impl true
196 def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
197 # we can't get info on this pid, because pid is dead
198 state =
199 with true <- Process.alive?(conn_pid),
200 {key, conn} <- find_conn(state.conns, conn_pid) do
201 if conn.retries == 5 do
202 Logger.debug("closing conn if retries is eq 5 #{inspect(conn_pid)}")
203 :ok = API.close(conn.conn)
204
205 put_in(
206 state.conns,
207 Map.delete(state.conns, key)
208 )
209 else
210 put_in(state.conns[key], %{
211 conn
212 | gun_state: :down,
213 retries: conn.retries + 1
214 })
215 end
216 else
217 false ->
218 # gun can send gun_down for closed conn, maybe connection is not closed yet
219 Logger.warn(":gun_down message for closed conn #{inspect(conn_pid)}")
220 state
221
222 nil ->
223 Logger.warn(
224 ":gun_down message for alive conn #{inspect(conn_pid)}, but deleted from state"
225 )
226
227 :ok = API.close(conn_pid)
228
229 state
230 end
231
232 {:noreply, state}
233 end
234
235 defp compose_key(%URI{scheme: scheme, host: host, port: port}), do: "#{scheme}:#{host}:#{port}"
236
237 defp compose_key_gun_info(pid) do
238 try do
239 # sometimes :gun.info can raise MatchError, which lead to pool terminate
240 %{origin_host: origin_host, origin_scheme: scheme, origin_port: port} = API.info(pid)
241
242 host =
243 case :inet.ntoa(origin_host) do
244 {:error, :einval} -> origin_host
245 ip -> ip
246 end
247
248 "#{scheme}:#{host}:#{port}"
249 rescue
250 _ -> :error_gun_info
251 end
252 end
253
254 defp find_conn(conns, conn_pid) do
255 Enum.find(conns, fn {_key, conn} ->
256 conn.conn == conn_pid
257 end)
258 end
259
260 defp find_conn(conns, conn_pid, conn_key) do
261 Enum.find(conns, fn {key, conn} ->
262 key == conn_key and conn.conn == conn_pid
263 end)
264 end
265
266 defp open_conn(key, uri, state, %{proxy: {proxy_host, proxy_port}} = opts) do
267 connect_opts =
268 uri
269 |> destination_opts()
270 |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
271
272 with open_opts <- Map.delete(opts, :tls_opts),
273 {:ok, conn} <- API.open(proxy_host, proxy_port, open_opts),
274 {:ok, _} <- API.await_up(conn),
275 stream <- API.connect(conn, connect_opts),
276 {:response, :fin, 200, _} <- API.await(conn, stream),
277 state <-
278 put_in(state.conns[key], %Conn{
279 conn: conn,
280 gun_state: :up,
281 conn_state: :active,
282 last_reference: :os.system_time(:second)
283 }) do
284 {:noreply, state}
285 else
286 error ->
287 Logger.warn(
288 "Received error on opening connection with http proxy #{uri.scheme}://#{
289 compose_uri(uri)
290 }: #{inspect(error)}"
291 )
292
293 {:noreply, state}
294 end
295 end
296
297 defp open_conn(key, uri, state, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
298 version =
299 proxy_type
300 |> to_string()
301 |> String.last()
302 |> case do
303 "4" -> 4
304 _ -> 5
305 end
306
307 socks_opts =
308 uri
309 |> destination_opts()
310 |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
311 |> Map.put(:version, version)
312
313 opts =
314 opts
315 |> Map.put(:protocols, [:socks])
316 |> Map.put(:socks_opts, socks_opts)
317
318 with {:ok, conn} <- API.open(proxy_host, proxy_port, opts),
319 {:ok, _} <- API.await_up(conn),
320 state <-
321 put_in(state.conns[key], %Conn{
322 conn: conn,
323 gun_state: :up,
324 conn_state: :active,
325 last_reference: :os.system_time(:second)
326 }) do
327 {:noreply, state}
328 else
329 error ->
330 Logger.warn(
331 "Received error on opening connection with socks proxy #{uri.scheme}://#{
332 compose_uri(uri)
333 }: #{inspect(error)}"
334 )
335
336 {:noreply, state}
337 end
338 end
339
340 defp open_conn(key, %URI{host: host, port: port} = uri, state, opts) do
341 Logger.debug("opening conn #{compose_uri(uri)}")
342 {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
343
344 with {:ok, conn} <- API.open(host, port, opts),
345 {:ok, _} <- API.await_up(conn),
346 state <-
347 put_in(state.conns[key], %Conn{
348 conn: conn,
349 gun_state: :up,
350 conn_state: :active,
351 last_reference: :os.system_time(:second)
352 }) do
353 Logger.debug("new conn opened #{compose_uri(uri)}")
354 Logger.debug("replying to the call #{compose_uri(uri)}")
355 {:noreply, state}
356 else
357 error ->
358 Logger.warn(
359 "Received error on opening connection #{uri.scheme}://#{compose_uri(uri)}: #{
360 inspect(error)
361 }"
362 )
363
364 {:noreply, state}
365 end
366 end
367
368 defp destination_opts(%URI{host: host, port: port}) do
369 {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
370 %{host: host, port: port}
371 end
372
373 defp add_http2_opts(opts, "https", tls_opts) do
374 Map.merge(opts, %{protocols: [:http2], transport: :tls, tls_opts: tls_opts})
375 end
376
377 defp add_http2_opts(opts, _, _), do: opts
378
379 @spec get_unused_conns(map()) :: [{domain(), conn()}]
380 def get_unused_conns(conns) do
381 conns
382 |> Enum.filter(fn {_k, v} ->
383 v.conn_state == :idle and v.used_by == []
384 end)
385 |> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
386 x.crf <= y.crf and x.last_reference <= y.last_reference
387 end)
388 end
389
390 defp try_to_open_conn(key, uri, state, opts) do
391 Logger.debug("try to open conn #{compose_uri(uri)}")
392
393 with [{close_key, least_used} | _conns] <- get_unused_conns(state.conns),
394 :ok <- API.close(least_used.conn),
395 state <-
396 put_in(
397 state.conns,
398 Map.delete(state.conns, close_key)
399 ) do
400 Logger.debug(
401 "least used conn found and closed #{inspect(least_used.conn)} #{compose_uri(uri)}"
402 )
403
404 open_conn(key, uri, state, opts)
405 else
406 [] -> {:noreply, state}
407 end
408 end
409
410 def crf(current, steps, crf) do
411 1 + :math.pow(0.5, current / steps) * crf
412 end
413
414 def compose_uri(%URI{} = uri), do: "#{uri.host}#{uri.path}"
415 end