more info in Connections.checkin timeout errors
[akkoma] / lib / pleroma / http / adapter / gun.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.HTTP.Adapter.Gun do
  @moduledoc """
  Gun flavour of the `Pleroma.HTTP.Adapter` behaviour.

  Builds the keyword list of options used for a request and, when the
  `:gun_connections` pool process is alive, tries to attach an already opened
  connection from that pool to the options.
  """

  @behaviour Pleroma.HTTP.Adapter

  alias Pleroma.HTTP.Adapter
  alias Pleroma.Pool.Connections

  require Logger

  # Baseline options; values from the `[:http, :adapter]` config and the
  # per-request `connection_opts` are merged on top of these.
  @defaults [
    connect_timeout: 20_000,
    domain_lookup_timeout: 5_000,
    tls_handshake_timeout: 5_000,
    retry: 0,
    await_up_timeout: 5_000
  ]

  @doc """
  Builds the adapter options for a request to `uri`.

  The result is assembled from the module defaults, the `[:http, :adapter]`
  config, an `:original` `"host:port"` entry, TLS options for `https` URIs, the
  proxy configured under `[:http, :proxy_url]` and finally `connection_opts`.

  Unless `connection_opts` contains `receive_conn: false`, and provided the
  `:gun_connections` pool is alive, a pooled connection is requested; when one
  is returned it is stored under `:conn` together with `close_conn: false`.
  The `:receive_conn` key itself is always removed from the returned options.
  """
  @spec options(keyword(), URI.t()) :: keyword()
  def options(connection_opts \\ [], %URI{} = uri) do
    proxy = Pleroma.Config.get([:http, :proxy_url], nil)

    @defaults
    |> Keyword.merge(Pleroma.Config.get([:http, :adapter], []))
    |> add_original(uri)
    |> add_scheme_opts(uri)
    |> Adapter.maybe_add_proxy(Adapter.format_proxy(proxy))
    |> maybe_get_conn(uri, connection_opts)
  end

  @doc """
  Hook to be run once a request has finished.

  When `opts` carries a `:conn` and `:body_as` is anything other than
  `:chunks`, `Connections.checkout/3` is called for that connection and the
  calling process. Always returns `:ok`.
  """
  @spec after_request(keyword()) :: :ok
  def after_request(opts) do
    with conn when not is_nil(conn) <- opts[:conn],
         body_as when body_as != :chunks <- opts[:body_as] do
      Connections.checkout(conn, self(), :gun_connections)
    end

    :ok
  end

  # Stores the "host:port" pair of the requested URI under `:original`.
  defp add_original(opts, %URI{host: host, port: port}) do
    formatted_host = Adapter.domain_or_fallback(host)

    Keyword.put(opts, :original, "#{formatted_host}:#{port}")
  end

  # Plain HTTP needs no extra options.
  defp add_scheme_opts(opts, %URI{scheme: "http"}), do: opts

  # HTTPS: enable peer verification against the CAStore bundle and check the
  # certificate hostname against the requested host.
  defp add_scheme_opts(opts, %URI{scheme: "https", host: host, port: port}) do
    adapter_opts = [
      certificates_verification: true,
      tls_opts: [
        verify: :verify_peer,
        cacertfile: CAStore.file_path(),
        depth: 20,
        reuse_sessions: false,
        verify_fun:
          {&:ssl_verify_hostname.verify_fun/3, [check_hostname: Adapter.domain_or_fallback(host)]}
      ]
    ]

    # NOTE(review): presumably the TLS transport is only picked automatically
    # for port 443, hence the explicit `transport: :tls` elsewhere - confirm.
    adapter_opts =
      if port != 443 do
        Keyword.put(adapter_opts, :transport, :tls)
      else
        adapter_opts
      end

    Keyword.merge(opts, adapter_opts)
  end

  # Merges the per-request options in, strips the `:receive_conn` flag and, if
  # the flag is truthy (default `true`) and the pool is alive, asks the pool
  # for a connection.
  defp maybe_get_conn(adapter_opts, uri, connection_opts) do
    {receive_conn?, opts} =
      adapter_opts
      |> Keyword.merge(connection_opts)
      |> Keyword.pop(:receive_conn, true)

    if Connections.alive?(:gun_connections) and receive_conn? do
      try_to_get_conn(uri, opts)
    else
      opts
    end
  end

  # Best-effort lookup of a pooled connection. Every failure (raised error,
  # call timeout, any other exit) is logged and the options are returned
  # without a `:conn`.
  defp try_to_get_conn(uri, opts) do
    case Connections.checkin(uri, :gun_connections) do
      nil ->
        Logger.debug(
          "Gun connections pool checkin was not successful. Trying to open conn for next request."
        )

        :ok = Connections.open_conn(uri, :gun_connections, opts)
        opts

      conn when is_pid(conn) ->
        Logger.debug("received conn #{inspect(conn)} #{format_uri(uri)}")

        opts
        |> Keyword.put(:conn, conn)
        |> Keyword.put(:close_conn, false)
    end
  rescue
    error ->
      Logger.warn(
        "Gun connections pool checkin caused error #{format_uri(uri)} #{inspect(error)}"
      )

      opts
  catch
    :exit, {:timeout, {_, operation, [_, {method, _}, _]}} ->
      Logger.warn(
        "Gun connections pool checkin with timeout error for #{operation} #{method} " <>
          "#{format_uri(uri)}. Messages length: #{pool_message_queue_len()}"
      )

      opts

    :exit, error ->
      Logger.warn("Gun pool checkin exited with error #{format_uri(uri)} #{inspect(error)}")

      opts
  end

  # Renders the URI the same way in every log message.
  defp format_uri(uri), do: "#{uri.scheme}://#{Connections.compose_uri(uri)}"

  # Message queue length of the pool process, or `:unknown` when the process
  # is not registered or is no longer alive. `Process.info/2` returns a
  # `{:message_queue_len, n}` tuple, which cannot be interpolated directly, and
  # raises when given `nil` instead of a pid.
  defp pool_message_queue_len do
    with pid when is_pid(pid) <- Process.whereis(:gun_connections),
         {:message_queue_len, len} <- Process.info(pid, :message_queue_len) do
      len
    else
      _ -> :unknown
    end
  end
end