d73bec3605e8cae262d1922f620e4b87ea5fe74a
[akkoma] / lib / pleroma / gun / conn.ex
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.Gun.Conn do
  @moduledoc """
  Struct for gun connection data.

  Besides the struct itself, this module opens gun connections (directly,
  through an HTTP proxy or through a SOCKS proxy) via `Pleroma.Gun.API` and
  registers them in a `Pleroma.Pool.Connections` pool.
  """
  alias Pleroma.Gun.API
  alias Pleroma.Pool.Connections

  require Logger

  # `:open` and `:init` are the values a freshly built struct carries (see the
  # `defstruct` defaults below), so they belong to the state types as well.
  @type gun_state :: :open | :up | :down
  @type conn_state :: :init | :active | :idle

  @type t :: %__MODULE__{
          conn: pid() | nil,
          gun_state: gun_state(),
          conn_state: conn_state(),
          used_by: [pid()],
          last_reference: non_neg_integer(),
          crf: number(),
          retries: non_neg_integer()
        }

  defstruct conn: nil,
            gun_state: :open,
            conn_state: :init,
            used_by: [],
            last_reference: 0,
            crf: 1,
            retries: 0

  @doc """
  Opens a gun connection for `url` and adds it to the pool registered as `name`.

  `url` may be a string (parsed with `URI.parse/1`) or a `URI` struct. `opts`
  is turned into a map; `:retry`, `:retry_timeout` and `:await_up_timeout` are
  filled in from the `:connections_pool` config (falling back to `1`, `1000`
  and `5_000`) unless the caller already supplied them.

  While the pool holds fewer than `opts[:max_connection]` connections a new
  one is opened straight away; otherwise the first unused connection is
  closed first to make room.

  Returns the result of `Pleroma.Pool.Connections.add_conn/3` when a
  connection was opened and `nil` when it was not (the reason is logged).
  Raises `MatchError` if handing ownership of the connection to the pool
  process does not return `:ok`.
  """
  @spec open(String.t() | URI.t(), atom(), keyword()) :: :ok | nil
  def open(url, name, opts \\ [])
  def open(url, name, opts) when is_binary(url), do: open(URI.parse(url), name, opts)

  def open(%URI{} = uri, name, opts) do
    pool_opts = Pleroma.Config.get([:connections_pool], [])

    opts =
      opts
      |> Enum.into(%{})
      |> Map.put_new(:retry, pool_opts[:retry] || 1)
      |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 1000)
      |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)

    key = "#{uri.scheme}:#{uri.host}:#{uri.port}"

    Logger.debug("opening new connection #{Connections.compose_uri_log(uri)}")

    # NOTE(review): when `:max_connection` is missing from `opts` this compares
    # against `nil`; in Erlang term order every number sorts before an atom, so
    # the limit is then never reached — confirm callers always pass it.
    opened =
      if Connections.count(name) < opts[:max_connection] do
        do_open(uri, opts)
      else
        close_least_used_and_do_open(name, uri, opts)
      end

    case opened do
      conn_pid when is_pid(conn_pid) ->
        conn = %__MODULE__{
          conn: conn_pid,
          gun_state: :up,
          conn_state: :active,
          last_reference: :os.system_time(:second)
        }

        # The pool process becomes the owner of the connection before the
        # connection is stored in the pool.
        :ok = API.set_owner(conn_pid, Process.whereis(name))
        Connections.add_conn(name, key, conn)

      _not_opened ->
        nil
    end
  end

  # HTTP proxy: connect to the proxy, then issue a CONNECT-style request for
  # the destination and expect a final 200 response on that stream.
  defp do_open(uri, %{proxy: {proxy_host, proxy_port}} = opts) do
    connect_opts =
      uri
      |> destination_opts()
      |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))

    # `:tls_opts` is forwarded through `connect_opts` (for https destinations)
    # and removed from the options used to open the connection to the proxy.
    open_opts = Map.delete(opts, :tls_opts)

    with {:ok, conn} <- open_and_await_up(proxy_host, proxy_port, open_opts),
         stream = API.connect(conn, connect_opts),
         {:response, :fin, 200, _headers} <- API.await(conn, stream) do
      conn
    else
      error -> log_open_error(" with http proxy", uri, error)
    end
  end

  # SOCKS proxy: the destination is described in `:socks_opts` and the
  # connection is opened against the proxy with the `:socks` protocol.
  defp do_open(uri, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
    version = socks_version(proxy_type)

    socks_opts =
      uri
      |> destination_opts()
      |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
      |> Map.put(:version, version)

    opts =
      opts
      |> Map.put(:protocols, [:socks])
      |> Map.put(:socks_opts, socks_opts)

    case open_and_await_up(proxy_host, proxy_port, opts) do
      {:ok, conn} -> conn
      error -> log_open_error(" with socks proxy", uri, error)
    end
  end

  # No proxy configured: connect straight to the destination host.
  defp do_open(%URI{host: host, port: port} = uri, opts) do
    host = Pleroma.HTTP.Connection.parse_host(host)

    case open_and_await_up(host, port, opts) do
      {:ok, conn} -> conn
      error -> log_open_error("", uri, error)
    end
  end

  # Opens a connection and waits (up to `opts[:await_up_timeout]`) for it to
  # come up. Returns `{:ok, conn}` on success, otherwise the first value that
  # did not match, unchanged.
  #
  # NOTE(review): when `await_up` fails, the connection returned by
  # `API.open/3` is not closed here — confirm it is cleaned up elsewhere.
  defp open_and_await_up(host, port, opts) do
    with {:ok, conn} <- API.open(host, port, opts),
         {:ok, _protocol} <- API.await_up(conn, opts[:await_up_timeout]) do
      {:ok, conn}
    end
  end

  # Logs a failed open attempt and hands the failure value back so callers
  # can return it as-is. `via` is the " with ... proxy" infix or "".
  defp log_open_error(via, uri, error) do
    Logger.warn(
      "Received error on opening connection#{via} #{Connections.compose_uri_log(uri)} #{inspect(error)}"
    )

    error
  end

  # Version 4 only when the proxy type's textual form ends in "4" (e.g.
  # `:socks4`); everything else is treated as version 5.
  defp socks_version(proxy_type) do
    case proxy_type |> to_string() |> String.last() do
      "4" -> 4
      _ -> 5
    end
  end

  # Host/port of the final destination, with the host run through
  # `Pleroma.HTTP.Connection.parse_host/1`.
  defp destination_opts(%URI{host: host, port: port}) do
    host = Pleroma.HTTP.Connection.parse_host(host)
    %{host: host, port: port}
  end

  # For https destinations, request HTTP/2 over TLS with the given TLS
  # options; any other scheme leaves the options untouched.
  defp add_http2_opts(opts, "https", tls_opts) do
    Map.merge(opts, %{protocols: [:http2], transport: :tls, tls_opts: tls_opts})
  end

  defp add_http2_opts(opts, _, _), do: opts

  # Makes room in a full pool: closes the first unused connection, removes it
  # from the pool and then opens the requested one. Returns the new connection
  # pid, `{:error, :pool_overflowed}` when no connection is unused, or the
  # unexpected value when closing fails.
  defp close_least_used_and_do_open(name, uri, opts) do
    Logger.debug("try to open conn #{Connections.compose_uri_log(uri)}")

    with [{close_key, least_used} | _conns] <- Connections.get_unused_conns(name),
         :ok <- API.close(least_used.conn) do
      Connections.remove_conn(name, close_key)

      do_open(uri, opts)
    else
      [] ->
        {:error, :pool_overflowed}

      # Without this clause a non-`:ok` result from `API.close/1` would raise
      # `WithClauseError`; returning it lets `open/3` give up with `nil`.
      error ->
        Logger.warn(
          "Received error on closing least used connection #{
            Connections.compose_uri_log(uri)
          } #{inspect(error)}"
        )

        error
    end
  end
end