Refactor gun pooling and simplify adapter option insertion
authorrinpatch <rinpatch@sdf.org>
Tue, 5 May 2020 22:51:10 +0000 (01:51 +0300)
committerrinpatch <rinpatch@sdf.org>
Wed, 15 Jul 2020 12:17:27 +0000 (15:17 +0300)
This patch refactors gun pooling to use Elixir process registry and
simplifies adapter option insertion.

Having the pool use process registry instead of a GenServer has a number of advantages:
- Simpler code: the initial implementation adds about half the lines of code it deletes
- Concurrency: unlike a GenServer, ETS-based registry can handle multiple checkout/checkin
requests at the same time
- Precise and easy idle connection clousure: current proposal for closing idle connections in
the GenServer-based pool needs to filter through all connections once a minute and compare their
last active time with closing time. With Elixir process registry this can be done
by just using `Process.send_after`/`Process.cancel_timer` in the worker process.
- Lower memory footprint: In my tests `gun-memory-leak` branch uses about 290mb on peak load (250 connections)
and 235mb on idle (5-10 connections). Registry-based pool uses 210mb on idle and 240mb on peak load

16 files changed:
config/config.exs
lib/pleroma/application.ex
lib/pleroma/gun/conn.ex
lib/pleroma/gun/connection_pool.ex [new file with mode: 0644]
lib/pleroma/gun/connection_pool/worker.ex [new file with mode: 0644]
lib/pleroma/http/adapter_helper.ex
lib/pleroma/http/adapter_helper/default.ex [new file with mode: 0644]
lib/pleroma/http/adapter_helper/gun.ex
lib/pleroma/http/adapter_helper/hackney.ex
lib/pleroma/http/connection.ex [deleted file]
lib/pleroma/http/http.ex
lib/pleroma/pool/connections.ex [deleted file]
lib/pleroma/pool/pool.ex [deleted file]
lib/pleroma/pool/request.ex [deleted file]
lib/pleroma/pool/supervisor.ex [deleted file]
lib/pleroma/reverse_proxy/client/tesla.ex

index 6fc84efc2abae11b90bc31620b1f93b93435f185..577ccc1983fc21c83a7d48e9e783be69fdb92d20 100644 (file)
@@ -647,8 +647,10 @@ config :pleroma, Pleroma.Repo,
   prepare: :unnamed
 
 config :pleroma, :connections_pool,
+  reclaim_multiplier: 0.1,
   checkin_timeout: 250,
   max_connections: 250,
+  max_idle_time: 30_000,
   retry: 1,
   retry_timeout: 1000,
   await_up_timeout: 5_000
index 3282c6882104c9e14fafe004fa0a675b34959ddd..be14c1f9fb07cc8d2b62ebe2c04091561ec9644d 100644 (file)
@@ -223,9 +223,7 @@ defmodule Pleroma.Application do
 
   # start hackney and gun pools in tests
   defp http_children(_, :test) do
-    hackney_options = Config.get([:hackney_pools, :federation])
-    hackney_pool = :hackney_pool.child_spec(:federation, hackney_options)
-    [hackney_pool, Pleroma.Pool.Supervisor]
+    http_children(Tesla.Adapter.Hackney, nil) ++ http_children(Tesla.Adapter.Gun, nil)
   end
 
   defp http_children(Tesla.Adapter.Hackney, _) do
@@ -244,7 +242,9 @@ defmodule Pleroma.Application do
     end
   end
 
-  defp http_children(Tesla.Adapter.Gun, _), do: [Pleroma.Pool.Supervisor]
+  defp http_children(Tesla.Adapter.Gun, _) do
+    [{Registry, keys: :unique, name: Pleroma.Gun.ConnectionPool}]
+  end
 
   defp http_children(_, _), do: []
 end
index cd25a2e746d1889f9b263daff6a3200917177780..77f78c7fff17294266c5d54a63a9b01bbc78cb00 100644 (file)
@@ -3,40 +3,11 @@
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Gun.Conn do
-  @moduledoc """
-  Struct for gun connection data
-  """
   alias Pleroma.Gun
-  alias Pleroma.Pool.Connections
 
   require Logger
 
-  @type gun_state :: :up | :down
-  @type conn_state :: :active | :idle
-
-  @type t :: %__MODULE__{
-          conn: pid(),
-          gun_state: gun_state(),
-          conn_state: conn_state(),
-          used_by: [pid()],
-          last_reference: pos_integer(),
-          crf: float(),
-          retries: pos_integer()
-        }
-
-  defstruct conn: nil,
-            gun_state: :open,
-            conn_state: :init,
-            used_by: [],
-            last_reference: 0,
-            crf: 1,
-            retries: 0
-
-  @spec open(String.t() | URI.t(), atom(), keyword()) :: :ok | nil
-  def open(url, name, opts \\ [])
-  def open(url, name, opts) when is_binary(url), do: open(URI.parse(url), name, opts)
-
-  def open(%URI{} = uri, name, opts) do
+  def open(%URI{} = uri, opts) do
     pool_opts = Pleroma.Config.get([:connections_pool], [])
 
     opts =
@@ -45,30 +16,10 @@ defmodule Pleroma.Gun.Conn do
       |> Map.put_new(:retry, pool_opts[:retry] || 1)
       |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 1000)
       |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)
+      |> Map.put_new(:supervise, false)
       |> maybe_add_tls_opts(uri)
 
-    key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
-
-    max_connections = pool_opts[:max_connections] || 250
-
-    conn_pid =
-      if Connections.count(name) < max_connections do
-        do_open(uri, opts)
-      else
-        close_least_used_and_do_open(name, uri, opts)
-      end
-
-    if is_pid(conn_pid) do
-      conn = %Pleroma.Gun.Conn{
-        conn: conn_pid,
-        gun_state: :up,
-        conn_state: :active,
-        last_reference: :os.system_time(:second)
-      }
-
-      :ok = Gun.set_owner(conn_pid, Process.whereis(name))
-      Connections.add_conn(name, key, conn)
-    end
+    do_open(uri, opts)
   end
 
   defp maybe_add_tls_opts(opts, %URI{scheme: "http"}), do: opts
@@ -81,7 +32,7 @@ defmodule Pleroma.Gun.Conn do
       reuse_sessions: false,
       verify_fun:
         {&:ssl_verify_hostname.verify_fun/3,
-         [check_hostname: Pleroma.HTTP.Connection.format_host(host)]}
+         [check_hostname: Pleroma.HTTP.AdapterHelper.format_host(host)]}
     ]
 
     tls_opts =
@@ -105,7 +56,7 @@ defmodule Pleroma.Gun.Conn do
          {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]),
          stream <- Gun.connect(conn, connect_opts),
          {:response, :fin, 200, _} <- Gun.await(conn, stream) do
-      conn
+      {:ok, conn}
     else
       error ->
         Logger.warn(
@@ -141,7 +92,7 @@ defmodule Pleroma.Gun.Conn do
 
     with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts),
          {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do
-      conn
+      {:ok, conn}
     else
       error ->
         Logger.warn(
@@ -155,11 +106,11 @@ defmodule Pleroma.Gun.Conn do
   end
 
   defp do_open(%URI{host: host, port: port} = uri, opts) do
-    host = Pleroma.HTTP.Connection.parse_host(host)
+    host = Pleroma.HTTP.AdapterHelper.parse_host(host)
 
     with {:ok, conn} <- Gun.open(host, port, opts),
          {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do
-      conn
+      {:ok, conn}
     else
       error ->
         Logger.warn(
@@ -171,7 +122,7 @@ defmodule Pleroma.Gun.Conn do
   end
 
   defp destination_opts(%URI{host: host, port: port}) do
-    host = Pleroma.HTTP.Connection.parse_host(host)
+    host = Pleroma.HTTP.AdapterHelper.parse_host(host)
     %{host: host, port: port}
   end
 
@@ -181,17 +132,6 @@ defmodule Pleroma.Gun.Conn do
 
   defp add_http2_opts(opts, _, _), do: opts
 
-  defp close_least_used_and_do_open(name, uri, opts) do
-    with [{key, conn} | _conns] <- Connections.get_unused_conns(name),
-         :ok <- Gun.close(conn.conn) do
-      Connections.remove_conn(name, key)
-
-      do_open(uri, opts)
-    else
-      [] -> {:error, :pool_overflowed}
-    end
-  end
-
   def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do
     "#{scheme}://#{host}#{path}"
   end
diff --git a/lib/pleroma/gun/connection_pool.ex b/lib/pleroma/gun/connection_pool.ex
new file mode 100644 (file)
index 0000000..e6abee6
--- /dev/null
@@ -0,0 +1,129 @@
+defmodule Pleroma.Gun.ConnectionPool do
+  @registry __MODULE__
+
+  def get_conn(uri, opts) do
+    case enforce_pool_limits() do
+      :ok ->
+        key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
+
+        case Registry.lookup(@registry, key) do
+          # The key has already been registered, but connection is not up yet
+          [{worker_pid, {nil, _used_by, _crf, _last_reference}}] ->
+            get_gun_pid_from_worker(worker_pid)
+
+          [{worker_pid, {gun_pid, _used_by, _crf, _last_reference}}] ->
+            GenServer.cast(worker_pid, {:add_client, self(), false})
+            {:ok, gun_pid}
+
+          [] ->
+            # :gun.set_owner fails in :connected state for whatevever reason,
+            # so we open the connection in the process directly and send it's pid back
+            # We trust gun to handle timeouts by itself
+            case GenServer.start(Pleroma.Gun.ConnectionPool.Worker, [uri, key, opts, self()],
+                   timeout: :infinity
+                 ) do
+              {:ok, _worker_pid} ->
+                receive do
+                  {:conn_pid, pid} -> {:ok, pid}
+                end
+
+              {:error, {:error, {:already_registered, worker_pid}}} ->
+                get_gun_pid_from_worker(worker_pid)
+
+              err ->
+                err
+            end
+        end
+
+      :error ->
+        {:error, :pool_full}
+    end
+  end
+
+  @enforcer_key "enforcer"
+  defp enforce_pool_limits() do
+    max_connections = Pleroma.Config.get([:connections_pool, :max_connections])
+
+    if Registry.count(@registry) >= max_connections do
+      case Registry.lookup(@registry, @enforcer_key) do
+        [] ->
+          pid =
+            spawn(fn ->
+              {:ok, _pid} = Registry.register(@registry, @enforcer_key, nil)
+
+              reclaim_max =
+                [:connections_pool, :reclaim_multiplier]
+                |> Pleroma.Config.get()
+                |> Kernel.*(max_connections)
+                |> round
+                |> max(1)
+
+              unused_conns =
+                Registry.select(
+                  @registry,
+                  [
+                    {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}],
+                     [{{:"$1", :"$3", :"$4"}}]}
+                  ]
+                )
+
+              case unused_conns do
+                [] ->
+                  exit(:pool_full)
+
+                unused_conns ->
+                  unused_conns
+                  |> Enum.sort(fn {_pid1, crf1, last_reference1},
+                                  {_pid2, crf2, last_reference2} ->
+                    crf1 <= crf2 and last_reference1 <= last_reference2
+                  end)
+                  |> Enum.take(reclaim_max)
+                  |> Enum.each(fn {pid, _, _} -> GenServer.call(pid, :idle_close) end)
+              end
+            end)
+
+          wait_for_enforcer_finish(pid)
+
+        [{pid, _}] ->
+          wait_for_enforcer_finish(pid)
+      end
+    else
+      :ok
+    end
+  end
+
+  defp wait_for_enforcer_finish(pid) do
+    ref = Process.monitor(pid)
+
+    receive do
+      {:DOWN, ^ref, :process, ^pid, :pool_full} ->
+        :error
+
+      {:DOWN, ^ref, :process, ^pid, :normal} ->
+        :ok
+    end
+  end
+
+  defp get_gun_pid_from_worker(worker_pid) do
+    # GenServer.call will block the process for timeout length if
+    # the server crashes on startup (which will happen if gun fails to connect)
+    # so instead we use cast + monitor
+
+    ref = Process.monitor(worker_pid)
+    GenServer.cast(worker_pid, {:add_client, self(), true})
+
+    receive do
+      {:conn_pid, pid} -> {:ok, pid}
+      {:DOWN, ^ref, :process, ^worker_pid, reason} -> reason
+    end
+  end
+
+  def release_conn(conn_pid) do
+    [worker_pid] =
+      Registry.select(@registry, [
+        {{:_, :"$1", {:"$2", :_, :_, :_}}, [{:==, :"$2", conn_pid}], [:"$1"]}
+      ])
+
+    GenServer.cast(worker_pid, {:remove_client, self()})
+  end
+end
diff --git a/lib/pleroma/gun/connection_pool/worker.ex b/lib/pleroma/gun/connection_pool/worker.ex
new file mode 100644 (file)
index 0000000..ebde4bb
--- /dev/null
@@ -0,0 +1,95 @@
+defmodule Pleroma.Gun.ConnectionPool.Worker do
+  alias Pleroma.Gun
+  use GenServer
+
+  @registry Pleroma.Gun.ConnectionPool
+
+  @impl true
+  def init([uri, key, opts, client_pid]) do
+    time = :os.system_time(:second)
+    # Register before opening connection to prevent race conditions
+    with {:ok, _owner} <- Registry.register(@registry, key, {nil, [client_pid], 1, time}),
+         {:ok, conn_pid} <- Gun.Conn.open(uri, opts),
+         Process.link(conn_pid) do
+      {_, _} =
+        Registry.update_value(@registry, key, fn {_, used_by, crf, last_reference} ->
+          {conn_pid, used_by, crf, last_reference}
+        end)
+
+      send(client_pid, {:conn_pid, conn_pid})
+      {:ok, %{key: key, timer: nil}, :hibernate}
+    else
+      err -> {:stop, err}
+    end
+  end
+
+  @impl true
+  def handle_cast({:add_client, client_pid, send_pid_back}, %{key: key} = state) do
+    time = :os.system_time(:second)
+
+    {{conn_pid, _, _, _}, _} =
+      Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
+        {conn_pid, [client_pid | used_by], crf(time - last_reference, crf), time}
+      end)
+
+    if send_pid_back, do: send(client_pid, {:conn_pid, conn_pid})
+
+    state =
+      if state.timer != nil do
+        Process.cancel_timer(state[:timer])
+        %{state | timer: nil}
+      else
+        state
+      end
+
+    {:noreply, state, :hibernate}
+  end
+
+  @impl true
+  def handle_cast({:remove_client, client_pid}, %{key: key} = state) do
+    {{_conn_pid, used_by, _crf, _last_reference}, _} =
+      Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
+        {conn_pid, List.delete(used_by, client_pid), crf, last_reference}
+      end)
+
+    timer =
+      if used_by == [] do
+        max_idle = Pleroma.Config.get([:connections_pool, :max_idle_time], 30_000)
+        Process.send_after(self(), :idle_close, max_idle)
+      else
+        nil
+      end
+
+    {:noreply, %{state | timer: timer}, :hibernate}
+  end
+
+  @impl true
+  def handle_info(:idle_close, state) do
+    # Gun monitors the owner process, and will close the connection automatically
+    # when it's terminated
+    {:stop, :normal, state}
+  end
+
+  # Gracefully shutdown if the connection got closed without any streams left
+  @impl true
+  def handle_info({:gun_down, _pid, _protocol, _reason, []}, state) do
+    {:stop, :normal, state}
+  end
+
+  # Otherwise, shutdown with an error
+  @impl true
+  def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams} = down_message, state) do
+    {:stop, {:error, down_message}, state}
+  end
+
+  @impl true
+  def handle_call(:idle_close, _, %{key: key} = state) do
+    Registry.unregister(@registry, key)
+    {:stop, :normal, state}
+  end
+
+  # LRFU policy: https://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.55.1478
+  defp crf(time_delta, prev_crf) do
+    1 + :math.pow(0.5, time_delta / 100) * prev_crf
+  end
+end
index 510722ff94ff6884d7afbb695b23a15b846809c2..0532ea31d527d10e37e9a2f34aef45d587bfb1a5 100644 (file)
@@ -3,7 +3,21 @@
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.HTTP.AdapterHelper do
-  alias Pleroma.HTTP.Connection
+  @moduledoc """
+  Configure Tesla.Client with default and customized adapter options.
+  """
+  @defaults [pool: :federation]
+
+  @type ip_address :: ipv4_address() | ipv6_address()
+  @type ipv4_address :: {0..255, 0..255, 0..255, 0..255}
+  @type ipv6_address ::
+          {0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535}
+  @type proxy_type() :: :socks4 | :socks5
+  @type host() :: charlist() | ip_address()
+
+  alias Pleroma.Config
+  alias Pleroma.HTTP.AdapterHelper
+  require Logger
 
   @type proxy ::
           {Connection.host(), pos_integer()}
@@ -11,24 +25,13 @@ defmodule Pleroma.HTTP.AdapterHelper do
 
   @callback options(keyword(), URI.t()) :: keyword()
   @callback after_request(keyword()) :: :ok
-
-  @spec options(keyword(), URI.t()) :: keyword()
-  def options(opts, _uri) do
-    proxy = Pleroma.Config.get([:http, :proxy_url], nil)
-    maybe_add_proxy(opts, format_proxy(proxy))
-  end
-
-  @spec maybe_get_conn(URI.t(), keyword()) :: keyword()
-  def maybe_get_conn(_uri, opts), do: opts
-
-  @spec after_request(keyword()) :: :ok
-  def after_request(_opts), do: :ok
+  @callback get_conn(URI.t(), keyword()) :: {:ok, term()} | {:error, term()}
 
   @spec format_proxy(String.t() | tuple() | nil) :: proxy() | nil
   def format_proxy(nil), do: nil
 
   def format_proxy(proxy_url) do
-    case Connection.parse_proxy(proxy_url) do
+    case parse_proxy(proxy_url) do
       {:ok, host, port} -> {host, port}
       {:ok, type, host, port} -> {type, host, port}
       _ -> nil
@@ -38,4 +41,106 @@ defmodule Pleroma.HTTP.AdapterHelper do
   @spec maybe_add_proxy(keyword(), proxy() | nil) :: keyword()
   def maybe_add_proxy(opts, nil), do: opts
   def maybe_add_proxy(opts, proxy), do: Keyword.put_new(opts, :proxy, proxy)
+
+  @doc """
+  Merge default connection & adapter options with received ones.
+  """
+
+  @spec options(URI.t(), keyword()) :: keyword()
+  def options(%URI{} = uri, opts \\ []) do
+    @defaults
+    |> pool_timeout()
+    |> Keyword.merge(opts)
+    |> adapter_helper().options(uri)
+  end
+
+  defp pool_timeout(opts) do
+    {config_key, default} =
+      if adapter() == Tesla.Adapter.Gun do
+        {:pools, Config.get([:pools, :default, :timeout])}
+      else
+        {:hackney_pools, 10_000}
+      end
+
+    timeout = Config.get([config_key, opts[:pool], :timeout], default)
+
+    Keyword.merge(opts, timeout: timeout)
+  end
+
+  @spec after_request(keyword()) :: :ok
+  def after_request(opts), do: adapter_helper().after_request(opts)
+
+  def get_conn(uri, opts), do: adapter_helper().get_conn(uri, opts)
+  defp adapter, do: Application.get_env(:tesla, :adapter)
+
+  defp adapter_helper do
+    case adapter() do
+      Tesla.Adapter.Gun -> AdapterHelper.Gun
+      Tesla.Adapter.Hackney -> AdapterHelper.Hackney
+      _ -> AdapterHelper.Default
+    end
+  end
+
+  @spec parse_proxy(String.t() | tuple() | nil) ::
+          {:ok, host(), pos_integer()}
+          | {:ok, proxy_type(), host(), pos_integer()}
+          | {:error, atom()}
+          | nil
+
+  def parse_proxy(nil), do: nil
+
+  def parse_proxy(proxy) when is_binary(proxy) do
+    with [host, port] <- String.split(proxy, ":"),
+         {port, ""} <- Integer.parse(port) do
+      {:ok, parse_host(host), port}
+    else
+      {_, _} ->
+        Logger.warn("Parsing port failed #{inspect(proxy)}")
+        {:error, :invalid_proxy_port}
+
+      :error ->
+        Logger.warn("Parsing port failed #{inspect(proxy)}")
+        {:error, :invalid_proxy_port}
+
+      _ ->
+        Logger.warn("Parsing proxy failed #{inspect(proxy)}")
+        {:error, :invalid_proxy}
+    end
+  end
+
+  def parse_proxy(proxy) when is_tuple(proxy) do
+    with {type, host, port} <- proxy do
+      {:ok, type, parse_host(host), port}
+    else
+      _ ->
+        Logger.warn("Parsing proxy failed #{inspect(proxy)}")
+        {:error, :invalid_proxy}
+    end
+  end
+
+  @spec parse_host(String.t() | atom() | charlist()) :: charlist() | ip_address()
+  def parse_host(host) when is_list(host), do: host
+  def parse_host(host) when is_atom(host), do: to_charlist(host)
+
+  def parse_host(host) when is_binary(host) do
+    host = to_charlist(host)
+
+    case :inet.parse_address(host) do
+      {:error, :einval} -> host
+      {:ok, ip} -> ip
+    end
+  end
+
+  @spec format_host(String.t()) :: charlist()
+  def format_host(host) do
+    host_charlist = to_charlist(host)
+
+    case :inet.parse_address(host_charlist) do
+      {:error, :einval} ->
+        :idna.encode(host_charlist)
+
+      {:ok, _ip} ->
+        host_charlist
+    end
+  end
 end
diff --git a/lib/pleroma/http/adapter_helper/default.ex b/lib/pleroma/http/adapter_helper/default.ex
new file mode 100644 (file)
index 0000000..218cfac
--- /dev/null
@@ -0,0 +1,17 @@
+defmodule Pleroma.HTTP.AdapterHelper.Default do
+  alias Pleroma.HTTP.AdapterHelper
+
+  @behaviour Pleroma.HTTP.AdapterHelper
+
+  @spec options(keyword(), URI.t()) :: keyword()
+  def options(opts, _uri) do
+    proxy = Pleroma.Config.get([:http, :proxy_url], nil)
+    AdapterHelper.maybe_add_proxy(opts, AdapterHelper.format_proxy(proxy))
+  end
+
+  @spec after_request(keyword()) :: :ok
+  def after_request(_opts), do: :ok
+
+  @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()}
+  def get_conn(_uri, opts), do: {:ok, opts}
+end
index ead7cdc6bba7297f1b6ceaa1faf4a6687e48c979..6f7cc9784dbdde094ae41121ad0f1b146f721cd3 100644 (file)
@@ -5,8 +5,8 @@
 defmodule Pleroma.HTTP.AdapterHelper.Gun do
   @behaviour Pleroma.HTTP.AdapterHelper
 
+  alias Pleroma.Gun.ConnectionPool
   alias Pleroma.HTTP.AdapterHelper
-  alias Pleroma.Pool.Connections
 
   require Logger
 
@@ -31,13 +31,13 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do
     |> Keyword.merge(config_opts)
     |> add_scheme_opts(uri)
     |> AdapterHelper.maybe_add_proxy(proxy)
-    |> maybe_get_conn(uri, incoming_opts)
+    |> Keyword.merge(incoming_opts)
   end
 
   @spec after_request(keyword()) :: :ok
   def after_request(opts) do
     if opts[:conn] && opts[:body_as] != :chunks do
-      Connections.checkout(opts[:conn], self(), :gun_connections)
+      ConnectionPool.release_conn(opts[:conn])
     end
 
     :ok
@@ -51,27 +51,11 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do
     |> Keyword.put(:tls_opts, log_level: :warning)
   end
 
-  defp maybe_get_conn(adapter_opts, uri, incoming_opts) do
-    {receive_conn?, opts} =
-      adapter_opts
-      |> Keyword.merge(incoming_opts)
-      |> Keyword.pop(:receive_conn, true)
-
-    if Connections.alive?(:gun_connections) and receive_conn? do
-      checkin_conn(uri, opts)
-    else
-      opts
-    end
-  end
-
-  defp checkin_conn(uri, opts) do
-    case Connections.checkin(uri, :gun_connections) do
-      nil ->
-        Task.start(Pleroma.Gun.Conn, :open, [uri, :gun_connections, opts])
-        opts
-
-      conn when is_pid(conn) ->
-        Keyword.merge(opts, conn: conn, close_conn: false)
+  @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()}
+  def get_conn(uri, opts) do
+    case ConnectionPool.get_conn(uri, opts) do
+      {:ok, conn_pid} -> {:ok, Keyword.merge(opts, conn: conn_pid, close_conn: false)}
+      err -> err
     end
   end
 end
index 3972a03a948717eb436e13666bc2caf1dff689d3..42d552740f1b187b4cca55770c6da670d1bb8ff1 100644 (file)
@@ -25,4 +25,7 @@ defmodule Pleroma.HTTP.AdapterHelper.Hackney do
   defp add_scheme_opts(opts, _), do: opts
 
   def after_request(_), do: :ok
+
+  @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()}
+  def get_conn(_uri, opts), do: {:ok, opts}
 end
diff --git a/lib/pleroma/http/connection.ex b/lib/pleroma/http/connection.ex
deleted file mode 100644 (file)
index ebacf79..0000000
+++ /dev/null
@@ -1,124 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.HTTP.Connection do
-  @moduledoc """
-  Configure Tesla.Client with default and customized adapter options.
-  """
-
-  alias Pleroma.Config
-  alias Pleroma.HTTP.AdapterHelper
-
-  require Logger
-
-  @defaults [pool: :federation]
-
-  @type ip_address :: ipv4_address() | ipv6_address()
-  @type ipv4_address :: {0..255, 0..255, 0..255, 0..255}
-  @type ipv6_address ::
-          {0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535}
-  @type proxy_type() :: :socks4 | :socks5
-  @type host() :: charlist() | ip_address()
-
-  @doc """
-  Merge default connection & adapter options with received ones.
-  """
-
-  @spec options(URI.t(), keyword()) :: keyword()
-  def options(%URI{} = uri, opts \\ []) do
-    @defaults
-    |> pool_timeout()
-    |> Keyword.merge(opts)
-    |> adapter_helper().options(uri)
-  end
-
-  defp pool_timeout(opts) do
-    {config_key, default} =
-      if adapter() == Tesla.Adapter.Gun do
-        {:pools, Config.get([:pools, :default, :timeout])}
-      else
-        {:hackney_pools, 10_000}
-      end
-
-    timeout = Config.get([config_key, opts[:pool], :timeout], default)
-
-    Keyword.merge(opts, timeout: timeout)
-  end
-
-  @spec after_request(keyword()) :: :ok
-  def after_request(opts), do: adapter_helper().after_request(opts)
-
-  defp adapter, do: Application.get_env(:tesla, :adapter)
-
-  defp adapter_helper do
-    case adapter() do
-      Tesla.Adapter.Gun -> AdapterHelper.Gun
-      Tesla.Adapter.Hackney -> AdapterHelper.Hackney
-      _ -> AdapterHelper
-    end
-  end
-
-  @spec parse_proxy(String.t() | tuple() | nil) ::
-          {:ok, host(), pos_integer()}
-          | {:ok, proxy_type(), host(), pos_integer()}
-          | {:error, atom()}
-          | nil
-
-  def parse_proxy(nil), do: nil
-
-  def parse_proxy(proxy) when is_binary(proxy) do
-    with [host, port] <- String.split(proxy, ":"),
-         {port, ""} <- Integer.parse(port) do
-      {:ok, parse_host(host), port}
-    else
-      {_, _} ->
-        Logger.warn("Parsing port failed #{inspect(proxy)}")
-        {:error, :invalid_proxy_port}
-
-      :error ->
-        Logger.warn("Parsing port failed #{inspect(proxy)}")
-        {:error, :invalid_proxy_port}
-
-      _ ->
-        Logger.warn("Parsing proxy failed #{inspect(proxy)}")
-        {:error, :invalid_proxy}
-    end
-  end
-
-  def parse_proxy(proxy) when is_tuple(proxy) do
-    with {type, host, port} <- proxy do
-      {:ok, type, parse_host(host), port}
-    else
-      _ ->
-        Logger.warn("Parsing proxy failed #{inspect(proxy)}")
-        {:error, :invalid_proxy}
-    end
-  end
-
-  @spec parse_host(String.t() | atom() | charlist()) :: charlist() | ip_address()
-  def parse_host(host) when is_list(host), do: host
-  def parse_host(host) when is_atom(host), do: to_charlist(host)
-
-  def parse_host(host) when is_binary(host) do
-    host = to_charlist(host)
-
-    case :inet.parse_address(host) do
-      {:error, :einval} -> host
-      {:ok, ip} -> ip
-    end
-  end
-
-  @spec format_host(String.t()) :: charlist()
-  def format_host(host) do
-    host_charlist = to_charlist(host)
-
-    case :inet.parse_address(host_charlist) do
-      {:error, :einval} ->
-        :idna.encode(host_charlist)
-
-      {:ok, _ip} ->
-        host_charlist
-    end
-  end
-end
index 66ca7536766918a3ffa6734fe301aa3857cfcb6e..8ded76601c19b116e1cbcc6a6ddbffea132a831e 100644 (file)
@@ -7,7 +7,7 @@ defmodule Pleroma.HTTP do
     Wrapper for `Tesla.request/2`.
   """
 
-  alias Pleroma.HTTP.Connection
+  alias Pleroma.HTTP.AdapterHelper
   alias Pleroma.HTTP.Request
   alias Pleroma.HTTP.RequestBuilder, as: Builder
   alias Tesla.Client
@@ -60,49 +60,26 @@ defmodule Pleroma.HTTP do
           {:ok, Env.t()} | {:error, any()}
   def request(method, url, body, headers, options) when is_binary(url) do
     uri = URI.parse(url)
-    adapter_opts = Connection.options(uri, options[:adapter] || [])
-    options = put_in(options[:adapter], adapter_opts)
-    params = options[:params] || []
-    request = build_request(method, headers, options, url, body, params)
+    adapter_opts = AdapterHelper.options(uri, options[:adapter] || [])
 
-    adapter = Application.get_env(:tesla, :adapter)
-    client = Tesla.client([Tesla.Middleware.FollowRedirects], adapter)
+    case AdapterHelper.get_conn(uri, adapter_opts) do
+      {:ok, adapter_opts} ->
+        options = put_in(options[:adapter], adapter_opts)
+        params = options[:params] || []
+        request = build_request(method, headers, options, url, body, params)
 
-    pid = Process.whereis(adapter_opts[:pool])
+        adapter = Application.get_env(:tesla, :adapter)
+        client = Tesla.client([Tesla.Middleware.FollowRedirects], adapter)
 
-    pool_alive? =
-      if adapter == Tesla.Adapter.Gun && pid do
-        Process.alive?(pid)
-      else
-        false
-      end
+        response = request(client, request)
 
-    request_opts =
-      adapter_opts
-      |> Enum.into(%{})
-      |> Map.put(:env, Pleroma.Config.get([:env]))
-      |> Map.put(:pool_alive?, pool_alive?)
+        AdapterHelper.after_request(adapter_opts)
 
-    response = request(client, request, request_opts)
+        response
 
-    Connection.after_request(adapter_opts)
-
-    response
-  end
-
-  @spec request(Client.t(), keyword(), map()) :: {:ok, Env.t()} | {:error, any()}
-  def request(%Client{} = client, request, %{env: :test}), do: request(client, request)
-
-  def request(%Client{} = client, request, %{body_as: :chunks}), do: request(client, request)
-
-  def request(%Client{} = client, request, %{pool_alive?: false}), do: request(client, request)
-
-  def request(%Client{} = client, request, %{pool: pool, timeout: timeout}) do
-    :poolboy.transaction(
-      pool,
-      &Pleroma.Pool.Request.execute(&1, client, request, timeout),
-      timeout
-    )
+      err ->
+        err
+    end
   end
 
   @spec request(Client.t(), keyword()) :: {:ok, Env.t()} | {:error, any()}
diff --git a/lib/pleroma/pool/connections.ex b/lib/pleroma/pool/connections.ex
deleted file mode 100644 (file)
index acafe1b..0000000
+++ /dev/null
@@ -1,283 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Pool.Connections do
-  use GenServer
-
-  alias Pleroma.Config
-  alias Pleroma.Gun
-
-  require Logger
-
-  @type domain :: String.t()
-  @type conn :: Pleroma.Gun.Conn.t()
-
-  @type t :: %__MODULE__{
-          conns: %{domain() => conn()},
-          opts: keyword()
-        }
-
-  defstruct conns: %{}, opts: []
-
-  @spec start_link({atom(), keyword()}) :: {:ok, pid()}
-  def start_link({name, opts}) do
-    GenServer.start_link(__MODULE__, opts, name: name)
-  end
-
-  @impl true
-  def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}
-
-  @spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
-  def checkin(url, name)
-  def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name)
-
-  def checkin(%URI{} = uri, name) do
-    timeout = Config.get([:connections_pool, :checkin_timeout], 250)
-
-    GenServer.call(name, {:checkin, uri}, timeout)
-  end
-
-  @spec alive?(atom()) :: boolean()
-  def alive?(name) do
-    if pid = Process.whereis(name) do
-      Process.alive?(pid)
-    else
-      false
-    end
-  end
-
-  @spec get_state(atom()) :: t()
-  def get_state(name) do
-    GenServer.call(name, :state)
-  end
-
-  @spec count(atom()) :: pos_integer()
-  def count(name) do
-    GenServer.call(name, :count)
-  end
-
-  @spec get_unused_conns(atom()) :: [{domain(), conn()}]
-  def get_unused_conns(name) do
-    GenServer.call(name, :unused_conns)
-  end
-
-  @spec checkout(pid(), pid(), atom()) :: :ok
-  def checkout(conn, pid, name) do
-    GenServer.cast(name, {:checkout, conn, pid})
-  end
-
-  @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok
-  def add_conn(name, key, conn) do
-    GenServer.cast(name, {:add_conn, key, conn})
-  end
-
-  @spec remove_conn(atom(), String.t()) :: :ok
-  def remove_conn(name, key) do
-    GenServer.cast(name, {:remove_conn, key})
-  end
-
-  @impl true
-  def handle_cast({:add_conn, key, conn}, state) do
-    state = put_in(state.conns[key], conn)
-
-    Process.monitor(conn.conn)
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_cast({:checkout, conn_pid, pid}, state) do
-    state =
-      with true <- Process.alive?(conn_pid),
-           {key, conn} <- find_conn(state.conns, conn_pid),
-           used_by <- List.keydelete(conn.used_by, pid, 0) do
-        conn_state = if used_by == [], do: :idle, else: conn.conn_state
-
-        put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
-      else
-        false ->
-          Logger.debug("checkout for closed conn #{inspect(conn_pid)}")
-          state
-
-        nil ->
-          Logger.debug("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
-          state
-      end
-
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_cast({:remove_conn, key}, state) do
-    state = put_in(state.conns, Map.delete(state.conns, key))
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_call({:checkin, uri}, from, state) do
-    key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
-
-    case state.conns[key] do
-      %{conn: pid, gun_state: :up} = conn ->
-        time = :os.system_time(:second)
-        last_reference = time - conn.last_reference
-        crf = crf(last_reference, 100, conn.crf)
-
-        state =
-          put_in(state.conns[key], %{
-            conn
-            | last_reference: time,
-              crf: crf,
-              conn_state: :active,
-              used_by: [from | conn.used_by]
-          })
-
-        {:reply, pid, state}
-
-      %{gun_state: :down} ->
-        {:reply, nil, state}
-
-      nil ->
-        {:reply, nil, state}
-    end
-  end
-
-  @impl true
-  def handle_call(:state, _from, state), do: {:reply, state, state}
-
-  @impl true
-  def handle_call(:count, _from, state) do
-    {:reply, Enum.count(state.conns), state}
-  end
-
-  @impl true
-  def handle_call(:unused_conns, _from, state) do
-    unused_conns =
-      state.conns
-      |> Enum.filter(&filter_conns/1)
-      |> Enum.sort(&sort_conns/2)
-
-    {:reply, unused_conns, state}
-  end
-
-  defp filter_conns({_, %{conn_state: :idle, used_by: []}}), do: true
-  defp filter_conns(_), do: false
-
-  defp sort_conns({_, c1}, {_, c2}) do
-    c1.crf <= c2.crf and c1.last_reference <= c2.last_reference
-  end
-
-  @impl true
-  def handle_info({:gun_up, conn_pid, _protocol}, state) do
-    %{origin_host: host, origin_scheme: scheme, origin_port: port} = Gun.info(conn_pid)
-
-    host =
-      case :inet.ntoa(host) do
-        {:error, :einval} -> host
-        ip -> ip
-      end
-
-    key = "#{scheme}:#{host}:#{port}"
-
-    state =
-      with {key, conn} <- find_conn(state.conns, conn_pid, key),
-           {true, key} <- {Process.alive?(conn_pid), key} do
-        put_in(state.conns[key], %{
-          conn
-          | gun_state: :up,
-            conn_state: :active,
-            retries: 0
-        })
-      else
-        {false, key} ->
-          put_in(
-            state.conns,
-            Map.delete(state.conns, key)
-          )
-
-        nil ->
-          :ok = Gun.close(conn_pid)
-
-          state
-      end
-
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
-    retries = Config.get([:connections_pool, :retry], 1)
-    # we can't get info on this pid, because pid is dead
-    state =
-      with {key, conn} <- find_conn(state.conns, conn_pid),
-           {true, key} <- {Process.alive?(conn_pid), key} do
-        if conn.retries == retries do
-          :ok = Gun.close(conn.conn)
-
-          put_in(
-            state.conns,
-            Map.delete(state.conns, key)
-          )
-        else
-          put_in(state.conns[key], %{
-            conn
-            | gun_state: :down,
-              retries: conn.retries + 1
-          })
-        end
-      else
-        {false, key} ->
-          put_in(
-            state.conns,
-            Map.delete(state.conns, key)
-          )
-
-        nil ->
-          Logger.debug(":gun_down for conn which isn't found in state")
-
-          state
-      end
-
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do
-    Logger.debug("received DOWN message for #{inspect(conn_pid)} reason -> #{inspect(reason)}")
-
-    state =
-      with {key, conn} <- find_conn(state.conns, conn_pid) do
-        Enum.each(conn.used_by, fn {pid, _ref} ->
-          Process.exit(pid, reason)
-        end)
-
-        put_in(
-          state.conns,
-          Map.delete(state.conns, key)
-        )
-      else
-        nil ->
-          Logger.debug(":DOWN for conn which isn't found in state")
-
-          state
-      end
-
-    {:noreply, state}
-  end
-
-  defp find_conn(conns, conn_pid) do
-    Enum.find(conns, fn {_key, conn} ->
-      conn.conn == conn_pid
-    end)
-  end
-
-  defp find_conn(conns, conn_pid, conn_key) do
-    Enum.find(conns, fn {key, conn} ->
-      key == conn_key and conn.conn == conn_pid
-    end)
-  end
-
-  def crf(current, steps, crf) do
-    1 + :math.pow(0.5, current / steps) * crf
-  end
-end
diff --git a/lib/pleroma/pool/pool.ex b/lib/pleroma/pool/pool.ex
deleted file mode 100644 (file)
index 21a6fbb..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Pool do
-  def child_spec(opts) do
-    poolboy_opts =
-      opts
-      |> Keyword.put(:worker_module, Pleroma.Pool.Request)
-      |> Keyword.put(:name, {:local, opts[:name]})
-      |> Keyword.put(:size, opts[:size])
-      |> Keyword.put(:max_overflow, opts[:max_overflow])
-
-    %{
-      id: opts[:id] || {__MODULE__, make_ref()},
-      start: {:poolboy, :start_link, [poolboy_opts, [name: opts[:name]]]},
-      restart: :permanent,
-      shutdown: 5000,
-      type: :worker
-    }
-  end
-end
diff --git a/lib/pleroma/pool/request.ex b/lib/pleroma/pool/request.ex
deleted file mode 100644 (file)
index 3fb930d..0000000
+++ /dev/null
@@ -1,65 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Pool.Request do
-  use GenServer
-
-  require Logger
-
-  def start_link(args) do
-    GenServer.start_link(__MODULE__, args)
-  end
-
-  @impl true
-  def init(_), do: {:ok, []}
-
-  @spec execute(pid() | atom(), Tesla.Client.t(), keyword(), pos_integer()) ::
-          {:ok, Tesla.Env.t()} | {:error, any()}
-  def execute(pid, client, request, timeout) do
-    GenServer.call(pid, {:execute, client, request}, timeout)
-  end
-
-  @impl true
-  def handle_call({:execute, client, request}, _from, state) do
-    response = Pleroma.HTTP.request(client, request)
-
-    {:reply, response, state}
-  end
-
-  @impl true
-  def handle_info({:gun_data, _conn, _stream, _, _}, state) do
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_info({:gun_up, _conn, _protocol}, state) do
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_info({:gun_down, _conn, _protocol, _reason, _killed}, state) do
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_info({:gun_error, _conn, _stream, _error}, state) do
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_info({:gun_push, _conn, _stream, _new_stream, _method, _uri, _headers}, state) do
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_info({:gun_response, _conn, _stream, _, _status, _headers}, state) do
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_info(msg, state) do
-    Logger.warn("Received unexpected message #{inspect(__MODULE__)} #{inspect(msg)}")
-    {:noreply, state}
-  end
-end
diff --git a/lib/pleroma/pool/supervisor.ex b/lib/pleroma/pool/supervisor.ex
deleted file mode 100644 (file)
index faf646c..0000000
+++ /dev/null
@@ -1,42 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Pool.Supervisor do
-  use Supervisor
-
-  alias Pleroma.Config
-  alias Pleroma.Pool
-
-  def start_link(args) do
-    Supervisor.start_link(__MODULE__, args, name: __MODULE__)
-  end
-
-  def init(_) do
-    conns_child = %{
-      id: Pool.Connections,
-      start:
-        {Pool.Connections, :start_link, [{:gun_connections, Config.get([:connections_pool])}]}
-    }
-
-    Supervisor.init([conns_child | pools()], strategy: :one_for_one)
-  end
-
-  defp pools do
-    pools = Config.get(:pools)
-
-    pools =
-      if Config.get([Pleroma.Upload, :proxy_remote]) == false do
-        Keyword.delete(pools, :upload)
-      else
-        pools
-      end
-
-    for {pool_name, pool_opts} <- pools do
-      pool_opts
-      |> Keyword.put(:id, {Pool, pool_name})
-      |> Keyword.put(:name, pool_name)
-      |> Pool.child_spec()
-    end
-  end
-end
index e81ea8bde135be2045ce88b7ad292a7f6780a231..65785445d2a5021d7ee4d57062b64315c653bc18 100644 (file)
@@ -48,7 +48,7 @@ defmodule Pleroma.ReverseProxy.Client.Tesla do
     # if there were redirects we need to checkout old conn
     conn = opts[:old_conn] || opts[:conn]
 
-    if conn, do: :ok = Pleroma.Pool.Connections.checkout(conn, self(), :gun_connections)
+    if conn, do: :ok = Pleroma.Gun.ConnectionPool.release_conn(conn)
 
     :done
   end