1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Pool.Connections do
10 @type domain :: String.t()
11 @type conn :: Pleroma.Gun.Conn.t()
13 @type t :: %__MODULE__{
14 conns: %{domain() => conn()},
18 defstruct conns: %{}, opts: []
21 alias Pleroma.Gun.Conn
@doc """
Starts a connection pool process registered under `name`.

`opts` is passed to `init/1` and stored as the pool options in the initial state.

Returns whatever `GenServer.start_link/3` returns: `{:ok, pid}` on success, or
`:ignore` / `{:error, reason}` (for example `{:error, {:already_started, pid}}`
when `name` is already registered).
"""
@spec start_link({atom(), keyword()}) :: GenServer.on_start()
def start_link({name, opts}) do
  GenServer.start_link(__MODULE__, opts, name: name)
end
# GenServer callback: the pool starts with no connections and keeps the
# options it was started with.
def init(opts) do
  initial_state = %__MODULE__{conns: %{}, opts: opts}
  {:ok, initial_state}
end
31 @spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
32 def checkin(url, name)
# A binary URL is parsed into a `URI` struct and handed to the `%URI{}` clause.
def checkin(url, name) when is_binary(url) do
  url
  |> URI.parse()
  |> checkin(name)
end
35 def checkin(%URI{} = uri, name) do
36 timeout = Pleroma.Config.get([:connections_pool, :receive_connection_timeout], 250)
45 @spec open_conn(String.t() | URI.t(), atom(), keyword()) :: :ok
46 def open_conn(url, name, opts \\ [])
# A binary URL is parsed into a `URI` struct and handed to the `%URI{}` clause.
def open_conn(url, name, opts) when is_binary(url) do
  url
  |> URI.parse()
  |> open_conn(name, opts)
end
49 def open_conn(%URI{} = uri, name, opts) do
50 pool_opts = Pleroma.Config.get([:connections_pool], [])
55 |> Map.put_new(:receive, false)
56 |> Map.put_new(:retry, pool_opts[:retry] || 5)
57 |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 100)
58 |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)
60 GenServer.cast(name, {:open_conn, %{opts: opts, uri: uri}})
@doc """
Returns `true` when a process is registered under `name` and is alive,
`false` otherwise (including when nothing is registered under `name`).
"""
@spec alive?(atom()) :: boolean()
def alive?(name) do
  case Process.whereis(name) do
    nil -> false
    pid -> Process.alive?(pid)
  end
end
@doc """
Returns the current state of the pool registered under `name`.

Performs a synchronous `:state` call to the pool process.
"""
@spec get_state(atom()) :: t()
def get_state(name), do: GenServer.call(name, :state)
@doc """
Tells the pool `name` that process `pid` is done with connection `conn`.

The request is sent as an asynchronous `{:checkout, conn, pid}` cast, so this
function returns `:ok` without waiting for the pool to process it.
"""
@spec checkout(pid(), pid(), atom()) :: :ok
def checkout(conn, pid, name), do: GenServer.cast(name, {:checkout, conn, pid})
80 def handle_cast({:open_conn, %{opts: opts, uri: uri}}, state) do
81 Logger.debug("opening new #{compose_uri(uri)}")
82 max_connections = state.opts[:max_connections]
84 key = compose_key(uri)
86 if Enum.count(state.conns) < max_connections do
87 open_conn(key, uri, state, opts)
89 try_to_open_conn(key, uri, state, opts)
94 def handle_cast({:checkout, conn_pid, pid}, state) do
95 Logger.debug("checkout #{inspect(conn_pid)}")
98 with true <- Process.alive?(conn_pid),
99 {key, conn} <- find_conn(state.conns, conn_pid),
100 used_by <- List.keydelete(conn.used_by, pid, 0) do
108 put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
111 Logger.warn("checkout for closed conn #{inspect(conn_pid)}")
115 Logger.info("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
123 def handle_call({:checkin, uri}, from, state) do
124 Logger.debug("checkin #{compose_uri(uri)}")
125 key = compose_key(uri)
127 case state.conns[key] do
128 %{conn: conn, gun_state: gun_state} = current_conn when gun_state == :up ->
129 Logger.debug("reusing conn #{compose_uri(uri)}")
131 with time <- :os.system_time(:second),
132 last_reference <- time - current_conn.last_reference,
133 current_crf <- crf(last_reference, 100, current_conn.crf),
135 put_in(state.conns[key], %{
137 | last_reference: time,
140 used_by: [from | current_conn.used_by]
142 {:reply, conn, state}
145 %{gun_state: gun_state} when gun_state == :down ->
# Replies with the whole pool state and leaves it unchanged.
def handle_call(:state, _from, state) do
  {:reply, state, state}
end
157 def handle_info({:gun_up, conn_pid, _protocol}, state) do
159 with true <- Process.alive?(conn_pid),
160 conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid),
161 {key, conn} <- find_conn(state.conns, conn_pid, conn_key),
162 time <- :os.system_time(:second),
163 last_reference <- time - conn.last_reference,
164 current_crf <- crf(last_reference, 100, conn.crf) do
165 put_in(state.conns[key], %{
168 last_reference: time,
175 Logger.warn(":gun.info caused error")
179 Logger.warn(":gun_up message for closed conn #{inspect(conn_pid)}")
184 ":gun_up message for alive conn #{inspect(conn_pid)}, but deleted from state"
187 :ok = API.close(conn_pid)
196 def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
197 # we can't get info on this pid, because pid is dead
199 with true <- Process.alive?(conn_pid),
200 {key, conn} <- find_conn(state.conns, conn_pid) do
201 if conn.retries == 5 do
202 Logger.debug("closing conn if retries is eq 5 #{inspect(conn_pid)}")
203 :ok = API.close(conn.conn)
207 Map.delete(state.conns, key)
210 put_in(state.conns[key], %{
213 retries: conn.retries + 1
218 # gun can send gun_down for closed conn, maybe connection is not closed yet
219 Logger.warn(":gun_down message for closed conn #{inspect(conn_pid)}")
224 ":gun_down message for alive conn #{inspect(conn_pid)}, but deleted from state"
227 :ok = API.close(conn_pid)
# Builds the key a connection is stored under: "scheme:host:port".
defp compose_key(%URI{scheme: scheme, host: host, port: port}) do
  Enum.join([scheme, host, port], ":")
end
237 defp compose_key_gun_info(pid) do
239 # sometimes :gun.info can raise MatchError, which lead to pool terminate
240 %{origin_host: origin_host, origin_scheme: scheme, origin_port: port} = API.info(pid)
243 case :inet.ntoa(origin_host) do
244 {:error, :einval} -> origin_host
248 "#{scheme}:#{host}:#{port}"
# Returns the first `{key, conn}` entry whose `conn` field equals `conn_pid`,
# or `nil` when no entry matches.
defp find_conn(conns, conn_pid) do
  owns_pid? = fn {_key, entry} -> entry.conn == conn_pid end
  Enum.find(conns, owns_pid?)
end
# Returns the `{key, conn}` entry stored under `conn_key` whose `conn` field
# equals `conn_pid`, or `nil` when there is no such entry.
defp find_conn(conns, conn_pid, conn_key) do
  matches? = fn {key, entry} -> key == conn_key and entry.conn == conn_pid end
  Enum.find(conns, matches?)
end
266 defp open_conn(key, uri, state, %{proxy: {proxy_host, proxy_port}} = opts) do
269 |> destination_opts()
270 |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
272 with open_opts <- Map.delete(opts, :tls_opts),
273 {:ok, conn} <- API.open(proxy_host, proxy_port, open_opts),
274 {:ok, _} <- API.await_up(conn),
275 stream <- API.connect(conn, connect_opts),
276 {:response, :fin, 200, _} <- API.await(conn, stream),
278 put_in(state.conns[key], %Conn{
282 last_reference: :os.system_time(:second)
288 "Received error on opening connection with http proxy #{uri.scheme}://#{
290 }: #{inspect(error)}"
297 defp open_conn(key, uri, state, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
309 |> destination_opts()
310 |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
311 |> Map.put(:version, version)
315 |> Map.put(:protocols, [:socks])
316 |> Map.put(:socks_opts, socks_opts)
318 with {:ok, conn} <- API.open(proxy_host, proxy_port, opts),
319 {:ok, _} <- API.await_up(conn),
321 put_in(state.conns[key], %Conn{
325 last_reference: :os.system_time(:second)
331 "Received error on opening connection with socks proxy #{uri.scheme}://#{
333 }: #{inspect(error)}"
340 defp open_conn(key, %URI{host: host, port: port} = uri, state, opts) do
341 Logger.debug("opening conn #{compose_uri(uri)}")
342 {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
344 with {:ok, conn} <- API.open(host, port, opts),
345 {:ok, _} <- API.await_up(conn),
347 put_in(state.conns[key], %Conn{
351 last_reference: :os.system_time(:second)
353 Logger.debug("new conn opened #{compose_uri(uri)}")
354 Logger.debug("replying to the call #{compose_uri(uri)}")
359 "Received error on opening connection #{uri.scheme}://#{compose_uri(uri)}: #{
# Builds the `%{host: ..., port: ...}` map describing the destination of `uri`.
# The host is the second element of the tuple returned by
# `Pleroma.HTTP.Adapter.domain_or_ip/1`; the port is taken from the URI as is.
defp destination_opts(%URI{} = uri) do
  {_type, resolved_host} = Pleroma.HTTP.Adapter.domain_or_ip(uri.host)
  %{host: resolved_host, port: uri.port}
end
# For "https" destinations, sets the HTTP/2 protocol, the TLS transport and the
# given TLS options on `opts`. Any other scheme leaves `opts` untouched.
defp add_http2_opts(opts, "https", tls_opts) do
  opts
  |> Map.put(:protocols, [:http2])
  |> Map.put(:transport, :tls)
  |> Map.put(:tls_opts, tls_opts)
end

defp add_http2_opts(opts, _scheme, _tls_opts), do: opts
@doc """
Returns the connections that are idle and not used by any process, ordered
from the best candidate for closing to the worst.

A connection is kept when its `conn_state` is `:idle` and its `used_by` list is
empty. The result is sorted ascending by `crf`, with ties broken by ascending
`last_reference`, so the head of the list is the entry with the lowest score
(and, among equal scores, the oldest reference).

The ordering is a proper total order. Comparing with
`x.crf <= y.crf and x.last_reference <= y.last_reference` is only a partial
order: for entries where one field is lower and the other higher it reports
"greater" in both directions, which makes the sort result depend on the input
order and can put a higher-`crf` connection first.
"""
@spec get_unused_conns(map()) :: [{domain(), conn()}]
def get_unused_conns(conns) do
  conns
  |> Enum.filter(fn {_key, conn} -> conn.conn_state == :idle and conn.used_by == [] end)
  |> Enum.sort_by(fn {_key, conn} -> {conn.crf, conn.last_reference} end)
end
390 defp try_to_open_conn(key, uri, state, opts) do
391 Logger.debug("try to open conn #{compose_uri(uri)}")
393 with [{close_key, least_used} | _conns] <- get_unused_conns(state.conns),
394 :ok <- API.close(least_used.conn),
398 Map.delete(state.conns, close_key)
401 "least used conn found and closed #{inspect(least_used.conn)} #{compose_uri(uri)}"
404 open_conn(key, uri, state, opts)
406 [] -> {:noreply, state}
@doc """
Computes an updated score from the previous score `crf`.

The previous score is multiplied by `0.5` raised to `current / steps` and `1`
is added to the result, so the previous score is halved for every `steps`
units of `current`.
"""
def crf(current, steps, crf) do
  decay = :math.pow(0.5, current / steps)
  decay * crf + 1
end
@doc """
Returns the host of `uri` immediately followed by its path.

The scheme, port and query are not included; a `nil` host or path contributes
an empty string.
"""
def compose_uri(%URI{host: host, path: path}), do: "#{host}#{path}"