Merge branch 'develop' into gun
[akkoma] / lib / pleroma / gun / conn.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Gun.Conn do
6 @moduledoc """
7 Struct for gun connection data
8 """
9 alias Pleroma.Gun
10 alias Pleroma.Pool.Connections
11
12 require Logger
13
14 @type gun_state :: :up | :down
15 @type conn_state :: :active | :idle
16
17 @type t :: %__MODULE__{
18 conn: pid(),
19 gun_state: gun_state(),
20 conn_state: conn_state(),
21 used_by: [pid()],
22 last_reference: pos_integer(),
23 crf: float(),
24 retries: pos_integer()
25 }
26
27 defstruct conn: nil,
28 gun_state: :open,
29 conn_state: :init,
30 used_by: [],
31 last_reference: 0,
32 crf: 1,
33 retries: 0
34
35 @spec open(String.t() | URI.t(), atom(), keyword()) :: :ok | nil
36 def open(url, name, opts \\ [])
37 def open(url, name, opts) when is_binary(url), do: open(URI.parse(url), name, opts)
38
39 def open(%URI{} = uri, name, opts) do
40 pool_opts = Pleroma.Config.get([:connections_pool], [])
41
42 opts =
43 opts
44 |> Enum.into(%{})
45 |> Map.put_new(:retry, pool_opts[:retry] || 1)
46 |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 1000)
47 |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)
48 |> maybe_add_tls_opts(uri)
49
50 key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
51
52 conn_pid =
53 if Connections.count(name) < opts[:max_connection] do
54 do_open(uri, opts)
55 else
56 close_least_used_and_do_open(name, uri, opts)
57 end
58
59 if is_pid(conn_pid) do
60 conn = %Pleroma.Gun.Conn{
61 conn: conn_pid,
62 gun_state: :up,
63 conn_state: :active,
64 last_reference: :os.system_time(:second)
65 }
66
67 :ok = Gun.set_owner(conn_pid, Process.whereis(name))
68 Connections.add_conn(name, key, conn)
69 end
70 end
71
72 defp maybe_add_tls_opts(opts, %URI{scheme: "http"}), do: opts
73
74 defp maybe_add_tls_opts(opts, %URI{scheme: "https", host: host}) do
75 tls_opts = [
76 verify: :verify_peer,
77 cacertfile: CAStore.file_path(),
78 depth: 20,
79 reuse_sessions: false,
80 verify_fun:
81 {&:ssl_verify_hostname.verify_fun/3,
82 [check_hostname: Pleroma.HTTP.Connection.format_host(host)]}
83 ]
84
85 tls_opts =
86 if Keyword.keyword?(opts[:tls_opts]) do
87 Keyword.merge(tls_opts, opts[:tls_opts])
88 else
89 tls_opts
90 end
91
92 Map.put(opts, :tls_opts, tls_opts)
93 end
94
95 defp do_open(uri, %{proxy: {proxy_host, proxy_port}} = opts) do
96 connect_opts =
97 uri
98 |> destination_opts()
99 |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
100
101 with open_opts <- Map.delete(opts, :tls_opts),
102 {:ok, conn} <- Gun.open(proxy_host, proxy_port, open_opts),
103 {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]),
104 stream <- Gun.connect(conn, connect_opts),
105 {:response, :fin, 200, _} <- Gun.await(conn, stream) do
106 conn
107 else
108 error ->
109 Logger.warn(
110 "Opening proxied connection to #{compose_uri_log(uri)} failed with error #{
111 inspect(error)
112 }"
113 )
114
115 error
116 end
117 end
118
119 defp do_open(uri, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
120 version =
121 proxy_type
122 |> to_string()
123 |> String.last()
124 |> case do
125 "4" -> 4
126 _ -> 5
127 end
128
129 socks_opts =
130 uri
131 |> destination_opts()
132 |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
133 |> Map.put(:version, version)
134
135 opts =
136 opts
137 |> Map.put(:protocols, [:socks])
138 |> Map.put(:socks_opts, socks_opts)
139
140 with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts),
141 {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do
142 conn
143 else
144 error ->
145 Logger.warn(
146 "Opening socks proxied connection to #{compose_uri_log(uri)} failed with error #{
147 inspect(error)
148 }"
149 )
150
151 error
152 end
153 end
154
155 defp do_open(%URI{host: host, port: port} = uri, opts) do
156 host = Pleroma.HTTP.Connection.parse_host(host)
157
158 with {:ok, conn} <- Gun.open(host, port, opts),
159 {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do
160 conn
161 else
162 error ->
163 Logger.warn(
164 "Opening connection to #{compose_uri_log(uri)} failed with error #{inspect(error)}"
165 )
166
167 error
168 end
169 end
170
171 defp destination_opts(%URI{host: host, port: port}) do
172 host = Pleroma.HTTP.Connection.parse_host(host)
173 %{host: host, port: port}
174 end
175
176 defp add_http2_opts(opts, "https", tls_opts) do
177 Map.merge(opts, %{protocols: [:http2], transport: :tls, tls_opts: tls_opts})
178 end
179
180 defp add_http2_opts(opts, _, _), do: opts
181
182 defp close_least_used_and_do_open(name, uri, opts) do
183 with [{key, conn} | _conns] <- Connections.get_unused_conns(name),
184 :ok <- Gun.close(conn.conn) do
185 Connections.remove_conn(name, key)
186
187 do_open(uri, opts)
188 else
189 [] -> {:error, :pool_overflowed}
190 end
191 end
192
193 def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do
194 "#{scheme}://#{host}#{path}"
195 end
196 end