Merge branch 'develop' into refactor/gun-pool-registry
authorMark Felder <feld@FreeBSD.org>
Wed, 15 Jul 2020 18:34:27 +0000 (13:34 -0500)
committerMark Felder <feld@FreeBSD.org>
Wed, 15 Jul 2020 18:34:27 +0000 (13:34 -0500)
29 files changed:
config/config.exs
config/description.exs
docs/configuration/cheatsheet.md
lib/pleroma/application.ex
lib/pleroma/gun/api.ex
lib/pleroma/gun/conn.ex
lib/pleroma/gun/connection_pool.ex [new file with mode: 0644]
lib/pleroma/gun/connection_pool/reclaimer.ex [new file with mode: 0644]
lib/pleroma/gun/connection_pool/worker.ex [new file with mode: 0644]
lib/pleroma/gun/connection_pool/worker_supervisor.ex [new file with mode: 0644]
lib/pleroma/http/adapter_helper.ex
lib/pleroma/http/adapter_helper/default.ex [new file with mode: 0644]
lib/pleroma/http/adapter_helper/gun.ex
lib/pleroma/http/adapter_helper/hackney.ex
lib/pleroma/http/connection.ex [deleted file]
lib/pleroma/http/http.ex
lib/pleroma/pool/connections.ex [deleted file]
lib/pleroma/pool/pool.ex [deleted file]
lib/pleroma/pool/request.ex [deleted file]
lib/pleroma/pool/supervisor.ex [deleted file]
lib/pleroma/reverse_proxy/client/tesla.ex
lib/pleroma/telemetry/logger.ex [new file with mode: 0644]
lib/pleroma/tesla/middleware/follow_redirects.ex [new file with mode: 0644]
mix.exs
mix.lock
test/gun/conneciton_pool_test.exs [new file with mode: 0644]
test/http/adapter_helper/gun_test.exs
test/http/connection_test.exs [deleted file]
test/pool/connections_test.exs [deleted file]

index daeefdca36517e7e59fe130e8972355102562f1e..2d3f35e70253cc3cf654ad4e792a8fecbb0f07c8 100644 (file)
@@ -172,7 +172,7 @@ config :mime, :types, %{
   "application/ld+json" => ["activity+json"]
 }
 
-config :tesla, adapter: Tesla.Adapter.Hackney
+config :tesla, adapter: Tesla.Adapter.Gun
 
 # Configures http settings, upstream proxy etc.
 config :pleroma, :http,
@@ -648,32 +648,30 @@ config :pleroma, Pleroma.Repo,
   prepare: :unnamed
 
 config :pleroma, :connections_pool,
-  checkin_timeout: 250,
+  reclaim_multiplier: 0.1,
+  connection_acquisition_wait: 250,
+  connection_acquisition_retries: 5,
   max_connections: 250,
-  retry: 1,
-  retry_timeout: 1000,
+  max_idle_time: 30_000,
+  retry0,
   await_up_timeout: 5_000
 
 config :pleroma, :pools,
   federation: [
     size: 50,
-    max_overflow: 10,
-    timeout: 150_000
+    max_waiting: 10
   ],
   media: [
     size: 50,
-    max_overflow: 10,
-    timeout: 150_000
+    max_waiting: 10
   ],
   upload: [
     size: 25,
-    max_overflow: 5,
-    timeout: 300_000
+    max_waiting: 5
   ],
   default: [
     size: 10,
-    max_overflow: 2,
-    timeout: 10_000
+    max_waiting: 2
   ]
 
 config :pleroma, :hackney_pools,
index afc4dcd79b79301cfbe113468d081dd63f1cd4a7..f1c6773f1962b224fe336ce20ae0c7436d50410d 100644 (file)
@@ -3161,36 +3161,37 @@ config :pleroma, :config_description, [
     description: "Advanced settings for `gun` connections pool",
     children: [
       %{
-        key: :checkin_timeout,
+        key: :connection_acquisition_wait,
         type: :integer,
-        description: "Timeout to checkin connection from pool. Default: 250ms.",
-        suggestions: [250]
-      },
-      %{
-        key: :max_connections,
-        type: :integer,
-        description: "Maximum number of connections in the pool. Default: 250 connections.",
+        description:
+          "Timeout to acquire a connection from pool.The total max time is this value multiplied by the number of retries. Default: 250ms.",
         suggestions: [250]
       },
       %{
-        key: :retry,
+        key: :connection_acquisition_retries,
         type: :integer,
         description:
-          "Number of retries, while `gun` will try to reconnect if connection goes down. Default: 1.",
-        suggestions: [1]
+          "Number of attempts to acquire the connection from the pool if it is overloaded. Default: 5",
+        suggestions: [5]
       },
       %{
-        key: :retry_timeout,
+        key: :max_connections,
         type: :integer,
-        description:
-          "Time between retries when `gun` will try to reconnect in milliseconds. Default: 1000ms.",
-        suggestions: [1000]
+        description: "Maximum number of connections in the pool. Default: 250 connections.",
+        suggestions: [250]
       },
       %{
         key: :await_up_timeout,
         type: :integer,
         description: "Timeout while `gun` will wait until connection is up. Default: 5000ms.",
         suggestions: [5000]
+      },
+      %{
+        key: :reclaim_multiplier,
+        type: :integer,
+        description:
+          "Multiplier for the number of idle connection to be reclaimed if the pool is full. For example if the pool maxes out at 250 connections and this setting is set to 0.3, the pool will reclaim at most 75 idle connections if it's overloaded. Default: 0.1",
+        suggestions: [0.1]
       }
     ]
   },
@@ -3199,108 +3200,29 @@ config :pleroma, :config_description, [
     key: :pools,
     type: :group,
     description: "Advanced settings for `gun` workers pools",
-    children: [
-      %{
-        key: :federation,
-        type: :keyword,
-        description: "Settings for federation pool.",
-        children: [
-          %{
-            key: :size,
-            type: :integer,
-            description: "Number workers in the pool.",
-            suggestions: [50]
-          },
-          %{
-            key: :max_overflow,
-            type: :integer,
-            description: "Number of additional workers if pool is under load.",
-            suggestions: [10]
-          },
-          %{
-            key: :timeout,
-            type: :integer,
-            description: "Timeout while `gun` will wait for response.",
-            suggestions: [150_000]
-          }
-        ]
-      },
-      %{
-        key: :media,
-        type: :keyword,
-        description: "Settings for media pool.",
-        children: [
-          %{
-            key: :size,
-            type: :integer,
-            description: "Number workers in the pool.",
-            suggestions: [50]
-          },
-          %{
-            key: :max_overflow,
-            type: :integer,
-            description: "Number of additional workers if pool is under load.",
-            suggestions: [10]
-          },
-          %{
-            key: :timeout,
-            type: :integer,
-            description: "Timeout while `gun` will wait for response.",
-            suggestions: [150_000]
-          }
-        ]
-      },
-      %{
-        key: :upload,
-        type: :keyword,
-        description: "Settings for upload pool.",
-        children: [
-          %{
-            key: :size,
-            type: :integer,
-            description: "Number workers in the pool.",
-            suggestions: [25]
-          },
-          %{
-            key: :max_overflow,
-            type: :integer,
-            description: "Number of additional workers if pool is under load.",
-            suggestions: [5]
-          },
-          %{
-            key: :timeout,
-            type: :integer,
-            description: "Timeout while `gun` will wait for response.",
-            suggestions: [300_000]
-          }
-        ]
-      },
-      %{
-        key: :default,
-        type: :keyword,
-        description: "Settings for default pool.",
-        children: [
-          %{
-            key: :size,
-            type: :integer,
-            description: "Number workers in the pool.",
-            suggestions: [10]
-          },
-          %{
-            key: :max_overflow,
-            type: :integer,
-            description: "Number of additional workers if pool is under load.",
-            suggestions: [2]
-          },
-          %{
-            key: :timeout,
-            type: :integer,
-            description: "Timeout while `gun` will wait for response.",
-            suggestions: [10_000]
-          }
-        ]
-      }
-    ]
+    children:
+      Enum.map([:federation, :media, :upload, :default], fn pool_name ->
+        %{
+          key: pool_name,
+          type: :keyword,
+          description: "Settings for #{pool_name} pool.",
+          children: [
+            %{
+              key: :size,
+              type: :integer,
+              description: "Maximum number of concurrent requests in the pool.",
+              suggestions: [50]
+            },
+            %{
+              key: :max_waiting,
+              type: :integer,
+              description:
+                "Maximum number of requests waiting for other requests to finish. After this number is reached, the pool will start returning errrors when a new request is made",
+              suggestions: [10]
+            }
+          ]
+        }
+      end)
   },
   %{
     group: :pleroma,
index ba62a721e1acdc07fbf9cd39668693b4ddf90506..6c1babba32251c38e7dfaa5ad67895ff0988603c 100644 (file)
@@ -448,36 +448,32 @@ For each pool, the options are:
 
 *For `gun` adapter*
 
-Advanced settings for connections pool. Pool with opened connections. These connections can be reused in worker pools.
+Settings for HTTP connection pool.
 
-For big instances it's recommended to increase `config :pleroma, :connections_pool, max_connections: 500` up to 500-1000.
-It will increase memory usage, but federation would work faster.
-
-* `:checkin_timeout` - timeout to checkin connection from pool. Default: 250ms.
-* `:max_connections` - maximum number of connections in the pool. Default: 250 connections.
-* `:retry` - number of retries, while `gun` will try to reconnect if connection goes down. Default: 1.
-* `:retry_timeout` - time between retries when `gun` will try to reconnect in milliseconds. Default: 1000ms.
-* `:await_up_timeout` - timeout while `gun` will wait until connection is up. Default: 5000ms.
+* `:connection_acquisition_wait` - Timeout to acquire a connection from pool.The total max time is this value multiplied by the number of retries.
+* `connection_acquisition_retries` - Number of attempts to acquire the connection from the pool if it is overloaded. Each attempt is timed `:connection_acquisition_wait` apart.
+* `:max_connections` - Maximum number of connections in the pool.
+* `:await_up_timeout` - Timeout to connect to the host.
+* `:reclaim_multiplier` - Multiplied by `:max_connections` this will be the maximum number of idle connections that will be reclaimed in case the pool is overloaded.
 
 ### :pools
 
 *For `gun` adapter*
 
-Advanced settings for workers pools.
+Settings for request pools. These pools are limited on top of `:connections_pool`.
 
 There are four pools used:
 
-* `:federation` for the federation jobs.
-  You may want this pool max_connections to be at least equal to the number of federator jobs + retry queue jobs.
-* `:media` for rich media, media proxy
-* `:upload` for uploaded media (if using a remote uploader and `proxy_remote: true`)
-* `:default` for other requests
+* `:federation` for the federation jobs. You may want this pool's max_connections to be at least equal to the number of federator jobs + retry queue jobs.
+* `:media` - for rich media, media proxy.
+* `:upload` - for proxying media when a remote uploader is used and `proxy_remote: true`.
+* `:default` - for other requests.
 
 For each pool, the options are:
 
-* `:size` - how much workers the pool can hold
+* `:size` - limit to how much requests can be concurrently executed.
 * `:timeout` - timeout while `gun` will wait for response
-* `:max_overflow` - additional workers if pool is under load
+* `:max_waiting` - limit to how much requests can be waiting for others to finish, after this is reached, subsequent requests will be dropped.
 
 ## Captcha
 
index 3282c6882104c9e14fafe004fa0a675b34959ddd..0ffb55358f26d49e68a06d8b67fff010f59f6a2c 100644 (file)
@@ -39,6 +39,7 @@ defmodule Pleroma.Application do
     # every time the application is restarted, so we disable module
     # conflicts at runtime
     Code.compiler_options(ignore_module_conflict: true)
+    Pleroma.Telemetry.Logger.attach()
     Config.Holder.save_default()
     Pleroma.HTML.compile_scrubbers()
     Config.DeprecationWarnings.warn()
@@ -223,9 +224,7 @@ defmodule Pleroma.Application do
 
   # start hackney and gun pools in tests
   defp http_children(_, :test) do
-    hackney_options = Config.get([:hackney_pools, :federation])
-    hackney_pool = :hackney_pool.child_spec(:federation, hackney_options)
-    [hackney_pool, Pleroma.Pool.Supervisor]
+    http_children(Tesla.Adapter.Hackney, nil) ++ http_children(Tesla.Adapter.Gun, nil)
   end
 
   defp http_children(Tesla.Adapter.Hackney, _) do
@@ -244,7 +243,10 @@ defmodule Pleroma.Application do
     end
   end
 
-  defp http_children(Tesla.Adapter.Gun, _), do: [Pleroma.Pool.Supervisor]
+  defp http_children(Tesla.Adapter.Gun, _) do
+    Pleroma.Gun.ConnectionPool.children() ++
+      [{Task, &Pleroma.HTTP.AdapterHelper.Gun.limiter_setup/0}]
+  end
 
   defp http_children(_, _), do: []
 end
index f51cd7db8a83cb995f6c609deac3e48456fca5f8..09be743926bac9de1b7627236c07676d4119f094 100644 (file)
@@ -19,7 +19,8 @@ defmodule Pleroma.Gun.API do
     :tls_opts,
     :tcp_opts,
     :socks_opts,
-    :ws_opts
+    :ws_opts,
+    :supervise
   ]
 
   @impl Gun
index cd25a2e746d1889f9b263daff6a3200917177780..a3f75a4bbfb3248e361389ae6419318b9abfa799 100644 (file)
@@ -3,85 +3,33 @@
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Gun.Conn do
-  @moduledoc """
-  Struct for gun connection data
-  """
   alias Pleroma.Gun
-  alias Pleroma.Pool.Connections
 
   require Logger
 
-  @type gun_state :: :up | :down
-  @type conn_state :: :active | :idle
-
-  @type t :: %__MODULE__{
-          conn: pid(),
-          gun_state: gun_state(),
-          conn_state: conn_state(),
-          used_by: [pid()],
-          last_reference: pos_integer(),
-          crf: float(),
-          retries: pos_integer()
-        }
-
-  defstruct conn: nil,
-            gun_state: :open,
-            conn_state: :init,
-            used_by: [],
-            last_reference: 0,
-            crf: 1,
-            retries: 0
-
-  @spec open(String.t() | URI.t(), atom(), keyword()) :: :ok | nil
-  def open(url, name, opts \\ [])
-  def open(url, name, opts) when is_binary(url), do: open(URI.parse(url), name, opts)
-
-  def open(%URI{} = uri, name, opts) do
+  def open(%URI{} = uri, opts) do
     pool_opts = Pleroma.Config.get([:connections_pool], [])
 
     opts =
       opts
       |> Enum.into(%{})
-      |> Map.put_new(:retry, pool_opts[:retry] || 1)
-      |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 1000)
       |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)
+      |> Map.put_new(:supervise, false)
       |> maybe_add_tls_opts(uri)
 
-    key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
-
-    max_connections = pool_opts[:max_connections] || 250
-
-    conn_pid =
-      if Connections.count(name) < max_connections do
-        do_open(uri, opts)
-      else
-        close_least_used_and_do_open(name, uri, opts)
-      end
-
-    if is_pid(conn_pid) do
-      conn = %Pleroma.Gun.Conn{
-        conn: conn_pid,
-        gun_state: :up,
-        conn_state: :active,
-        last_reference: :os.system_time(:second)
-      }
-
-      :ok = Gun.set_owner(conn_pid, Process.whereis(name))
-      Connections.add_conn(name, key, conn)
-    end
+    do_open(uri, opts)
   end
 
   defp maybe_add_tls_opts(opts, %URI{scheme: "http"}), do: opts
 
-  defp maybe_add_tls_opts(opts, %URI{scheme: "https", host: host}) do
+  defp maybe_add_tls_opts(opts, %URI{scheme: "https"}) do
     tls_opts = [
       verify: :verify_peer,
       cacertfile: CAStore.file_path(),
       depth: 20,
       reuse_sessions: false,
-      verify_fun:
-        {&:ssl_verify_hostname.verify_fun/3,
-         [check_hostname: Pleroma.HTTP.Connection.format_host(host)]}
+      log_level: :warning,
+      customize_hostname_check: [match_fun: :public_key.pkix_verify_hostname_match_fun(:https)]
     ]
 
     tls_opts =
@@ -105,7 +53,7 @@ defmodule Pleroma.Gun.Conn do
          {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]),
          stream <- Gun.connect(conn, connect_opts),
          {:response, :fin, 200, _} <- Gun.await(conn, stream) do
-      conn
+      {:ok, conn}
     else
       error ->
         Logger.warn(
@@ -141,7 +89,7 @@ defmodule Pleroma.Gun.Conn do
 
     with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts),
          {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do
-      conn
+      {:ok, conn}
     else
       error ->
         Logger.warn(
@@ -155,11 +103,11 @@ defmodule Pleroma.Gun.Conn do
   end
 
   defp do_open(%URI{host: host, port: port} = uri, opts) do
-    host = Pleroma.HTTP.Connection.parse_host(host)
+    host = Pleroma.HTTP.AdapterHelper.parse_host(host)
 
     with {:ok, conn} <- Gun.open(host, port, opts),
          {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do
-      conn
+      {:ok, conn}
     else
       error ->
         Logger.warn(
@@ -171,7 +119,7 @@ defmodule Pleroma.Gun.Conn do
   end
 
   defp destination_opts(%URI{host: host, port: port}) do
-    host = Pleroma.HTTP.Connection.parse_host(host)
+    host = Pleroma.HTTP.AdapterHelper.parse_host(host)
     %{host: host, port: port}
   end
 
@@ -181,17 +129,6 @@ defmodule Pleroma.Gun.Conn do
 
   defp add_http2_opts(opts, _, _), do: opts
 
-  defp close_least_used_and_do_open(name, uri, opts) do
-    with [{key, conn} | _conns] <- Connections.get_unused_conns(name),
-         :ok <- Gun.close(conn.conn) do
-      Connections.remove_conn(name, key)
-
-      do_open(uri, opts)
-    else
-      [] -> {:error, :pool_overflowed}
-    end
-  end
-
   def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do
     "#{scheme}://#{host}#{path}"
   end
diff --git a/lib/pleroma/gun/connection_pool.ex b/lib/pleroma/gun/connection_pool.ex
new file mode 100644 (file)
index 0000000..8b41a66
--- /dev/null
@@ -0,0 +1,79 @@
+defmodule Pleroma.Gun.ConnectionPool do
+  @registry __MODULE__
+
+  alias Pleroma.Gun.ConnectionPool.WorkerSupervisor
+
+  def children do
+    [
+      {Registry, keys: :unique, name: @registry},
+      Pleroma.Gun.ConnectionPool.WorkerSupervisor
+    ]
+  end
+
+  def get_conn(uri, opts) do
+    key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
+
+    case Registry.lookup(@registry, key) do
+      # The key has already been registered, but connection is not up yet
+      [{worker_pid, nil}] ->
+        get_gun_pid_from_worker(worker_pid, true)
+
+      [{worker_pid, {gun_pid, _used_by, _crf, _last_reference}}] ->
+        GenServer.cast(worker_pid, {:add_client, self(), false})
+        {:ok, gun_pid}
+
+      [] ->
+        # :gun.set_owner fails in :connected state for whatevever reason,
+        # so we open the connection in the process directly and send it's pid back
+        # We trust gun to handle timeouts by itself
+        case WorkerSupervisor.start_worker([key, uri, opts, self()]) do
+          {:ok, worker_pid} ->
+            get_gun_pid_from_worker(worker_pid, false)
+
+          {:error, {:already_started, worker_pid}} ->
+            get_gun_pid_from_worker(worker_pid, true)
+
+          err ->
+            err
+        end
+    end
+  end
+
+  defp get_gun_pid_from_worker(worker_pid, register) do
+    # GenServer.call will block the process for timeout length if
+    # the server crashes on startup (which will happen if gun fails to connect)
+    # so instead we use cast + monitor
+
+    ref = Process.monitor(worker_pid)
+    if register, do: GenServer.cast(worker_pid, {:add_client, self(), true})
+
+    receive do
+      {:conn_pid, pid} ->
+        Process.demonitor(ref)
+        {:ok, pid}
+
+      {:DOWN, ^ref, :process, ^worker_pid, reason} ->
+        case reason do
+          {:shutdown, error} -> error
+          _ -> {:error, reason}
+        end
+    end
+  end
+
+  def release_conn(conn_pid) do
+    # :ets.fun2ms(fn {_, {worker_pid, {gun_pid, _, _, _}}} when gun_pid == conn_pid ->
+    #    worker_pid end)
+    query_result =
+      Registry.select(@registry, [
+        {{:_, :"$1", {:"$2", :_, :_, :_}}, [{:==, :"$2", conn_pid}], [:"$1"]}
+      ])
+
+    case query_result do
+      [worker_pid] ->
+        GenServer.cast(worker_pid, {:remove_client, self()})
+
+      [] ->
+        :ok
+    end
+  end
+end
diff --git a/lib/pleroma/gun/connection_pool/reclaimer.ex b/lib/pleroma/gun/connection_pool/reclaimer.ex
new file mode 100644 (file)
index 0000000..cea8008
--- /dev/null
@@ -0,0 +1,85 @@
+defmodule Pleroma.Gun.ConnectionPool.Reclaimer do
+  use GenServer, restart: :temporary
+
+  @registry Pleroma.Gun.ConnectionPool
+
+  def start_monitor do
+    pid =
+      case :gen_server.start(__MODULE__, [], name: {:via, Registry, {@registry, "reclaimer"}}) do
+        {:ok, pid} ->
+          pid
+
+        {:error, {:already_registered, pid}} ->
+          pid
+      end
+
+    {pid, Process.monitor(pid)}
+  end
+
+  @impl true
+  def init(_) do
+    {:ok, nil, {:continue, :reclaim}}
+  end
+
+  @impl true
+  def handle_continue(:reclaim, _) do
+    max_connections = Pleroma.Config.get([:connections_pool, :max_connections])
+
+    reclaim_max =
+      [:connections_pool, :reclaim_multiplier]
+      |> Pleroma.Config.get()
+      |> Kernel.*(max_connections)
+      |> round
+      |> max(1)
+
+    :telemetry.execute([:pleroma, :connection_pool, :reclaim, :start], %{}, %{
+      max_connections: max_connections,
+      reclaim_max: reclaim_max
+    })
+
+    # :ets.fun2ms(
+    # fn {_, {worker_pid, {_, used_by, crf, last_reference}}} when used_by == [] ->
+    #   {worker_pid, crf, last_reference} end)
+    unused_conns =
+      Registry.select(
+        @registry,
+        [
+          {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}], [{{:"$1", :"$3", :"$4"}}]}
+        ]
+      )
+
+    case unused_conns do
+      [] ->
+        :telemetry.execute(
+          [:pleroma, :connection_pool, :reclaim, :stop],
+          %{reclaimed_count: 0},
+          %{
+            max_connections: max_connections
+          }
+        )
+
+        {:stop, :no_unused_conns, nil}
+
+      unused_conns ->
+        reclaimed =
+          unused_conns
+          |> Enum.sort(fn {_pid1, crf1, last_reference1}, {_pid2, crf2, last_reference2} ->
+            crf1 <= crf2 and last_reference1 <= last_reference2
+          end)
+          |> Enum.take(reclaim_max)
+
+        reclaimed
+        |> Enum.each(fn {pid, _, _} ->
+          DynamicSupervisor.terminate_child(Pleroma.Gun.ConnectionPool.WorkerSupervisor, pid)
+        end)
+
+        :telemetry.execute(
+          [:pleroma, :connection_pool, :reclaim, :stop],
+          %{reclaimed_count: Enum.count(reclaimed)},
+          %{max_connections: max_connections}
+        )
+
+        {:stop, :normal, nil}
+    end
+  end
+end
diff --git a/lib/pleroma/gun/connection_pool/worker.ex b/lib/pleroma/gun/connection_pool/worker.ex
new file mode 100644 (file)
index 0000000..f33447c
--- /dev/null
@@ -0,0 +1,127 @@
+defmodule Pleroma.Gun.ConnectionPool.Worker do
+  alias Pleroma.Gun
+  use GenServer, restart: :temporary
+
+  @registry Pleroma.Gun.ConnectionPool
+
+  def start_link([key | _] = opts) do
+    GenServer.start_link(__MODULE__, opts, name: {:via, Registry, {@registry, key}})
+  end
+
+  @impl true
+  def init([_key, _uri, _opts, _client_pid] = opts) do
+    {:ok, nil, {:continue, {:connect, opts}}}
+  end
+
+  @impl true
+  def handle_continue({:connect, [key, uri, opts, client_pid]}, _) do
+    with {:ok, conn_pid} <- Gun.Conn.open(uri, opts),
+         Process.link(conn_pid) do
+      time = :erlang.monotonic_time(:millisecond)
+
+      {_, _} =
+        Registry.update_value(@registry, key, fn _ ->
+          {conn_pid, [client_pid], 1, time}
+        end)
+
+      send(client_pid, {:conn_pid, conn_pid})
+
+      {:noreply,
+       %{key: key, timer: nil, client_monitors: %{client_pid => Process.monitor(client_pid)}},
+       :hibernate}
+    else
+      err ->
+        {:stop, {:shutdown, err}, nil}
+    end
+  end
+
+  @impl true
+  def handle_cast({:add_client, client_pid, send_pid_back}, %{key: key} = state) do
+    time = :erlang.monotonic_time(:millisecond)
+
+    {{conn_pid, _, _, _}, _} =
+      Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
+        {conn_pid, [client_pid | used_by], crf(time - last_reference, crf), time}
+      end)
+
+    if send_pid_back, do: send(client_pid, {:conn_pid, conn_pid})
+
+    state =
+      if state.timer != nil do
+        Process.cancel_timer(state[:timer])
+        %{state | timer: nil}
+      else
+        state
+      end
+
+    ref = Process.monitor(client_pid)
+
+    state = put_in(state.client_monitors[client_pid], ref)
+    {:noreply, state, :hibernate}
+  end
+
+  @impl true
+  def handle_cast({:remove_client, client_pid}, %{key: key} = state) do
+    {{_conn_pid, used_by, _crf, _last_reference}, _} =
+      Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
+        {conn_pid, List.delete(used_by, client_pid), crf, last_reference}
+      end)
+
+    {ref, state} = pop_in(state.client_monitors[client_pid])
+    Process.demonitor(ref)
+
+    timer =
+      if used_by == [] do
+        max_idle = Pleroma.Config.get([:connections_pool, :max_idle_time], 30_000)
+        Process.send_after(self(), :idle_close, max_idle)
+      else
+        nil
+      end
+
+    {:noreply, %{state | timer: timer}, :hibernate}
+  end
+
+  @impl true
+  def handle_info(:idle_close, state) do
+    # Gun monitors the owner process, and will close the connection automatically
+    # when it's terminated
+    {:stop, :normal, state}
+  end
+
+  # Gracefully shutdown if the connection got closed without any streams left
+  @impl true
+  def handle_info({:gun_down, _pid, _protocol, _reason, []}, state) do
+    {:stop, :normal, state}
+  end
+
+  # Otherwise, shutdown with an error
+  @impl true
+  def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams} = down_message, state) do
+    {:stop, {:error, down_message}, state}
+  end
+
+  @impl true
+  def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
+    # Sometimes the client is dead before we demonitor it in :remove_client, so the message
+    # arrives anyway
+
+    case state.client_monitors[pid] do
+      nil ->
+        {:noreply, state, :hibernate}
+
+      _ref ->
+        :telemetry.execute(
+          [:pleroma, :connection_pool, :client_death],
+          %{client_pid: pid, reason: reason},
+          %{key: state.key}
+        )
+
+        handle_cast({:remove_client, pid}, state)
+    end
+  end
+
+  # LRFU policy: https://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.55.1478
+  defp crf(time_delta, prev_crf) do
+    1 + :math.pow(0.5, 0.0001 * time_delta) * prev_crf
+  end
+end
diff --git a/lib/pleroma/gun/connection_pool/worker_supervisor.ex b/lib/pleroma/gun/connection_pool/worker_supervisor.ex
new file mode 100644 (file)
index 0000000..39615c9
--- /dev/null
@@ -0,0 +1,45 @@
+defmodule Pleroma.Gun.ConnectionPool.WorkerSupervisor do
+  @moduledoc "Supervisor for pool workers. Does not do anything except enforce max connection limit"
+
+  use DynamicSupervisor
+
+  def start_link(opts) do
+    DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)
+  end
+
+  def init(_opts) do
+    DynamicSupervisor.init(
+      strategy: :one_for_one,
+      max_children: Pleroma.Config.get([:connections_pool, :max_connections])
+    )
+  end
+
+  def start_worker(opts, retry \\ false) do
+    case DynamicSupervisor.start_child(__MODULE__, {Pleroma.Gun.ConnectionPool.Worker, opts}) do
+      {:error, :max_children} ->
+        if retry or free_pool() == :error do
+          :telemetry.execute([:pleroma, :connection_pool, :provision_failure], %{opts: opts})
+          {:error, :pool_full}
+        else
+          start_worker(opts, true)
+        end
+
+      res ->
+        res
+    end
+  end
+
+  defp free_pool do
+    wait_for_reclaimer_finish(Pleroma.Gun.ConnectionPool.Reclaimer.start_monitor())
+  end
+
+  defp wait_for_reclaimer_finish({pid, mon}) do
+    receive do
+      {:DOWN, ^mon, :process, ^pid, :no_unused_conns} ->
+        :error
+
+      {:DOWN, ^mon, :process, ^pid, :normal} ->
+        :ok
+    end
+  end
+end
index 510722ff94ff6884d7afbb695b23a15b846809c2..9ec3836b057122243b174201eecc08d9db92afb8 100644 (file)
@@ -3,32 +3,30 @@
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.HTTP.AdapterHelper do
-  alias Pleroma.HTTP.Connection
+  @moduledoc """
+  Configure Tesla.Client with default and customized adapter options.
+  """
+  @defaults [pool: :federation]
+
+  @type proxy_type() :: :socks4 | :socks5
+  @type host() :: charlist() | :inet.ip_address()
+
+  alias Pleroma.Config
+  alias Pleroma.HTTP.AdapterHelper
+  require Logger
 
   @type proxy ::
           {Connection.host(), pos_integer()}
           | {Connection.proxy_type(), Connection.host(), pos_integer()}
 
   @callback options(keyword(), URI.t()) :: keyword()
-  @callback after_request(keyword()) :: :ok
-
-  @spec options(keyword(), URI.t()) :: keyword()
-  def options(opts, _uri) do
-    proxy = Pleroma.Config.get([:http, :proxy_url], nil)
-    maybe_add_proxy(opts, format_proxy(proxy))
-  end
-
-  @spec maybe_get_conn(URI.t(), keyword()) :: keyword()
-  def maybe_get_conn(_uri, opts), do: opts
-
-  @spec after_request(keyword()) :: :ok
-  def after_request(_opts), do: :ok
+  @callback get_conn(URI.t(), keyword()) :: {:ok, term()} | {:error, term()}
 
   @spec format_proxy(String.t() | tuple() | nil) :: proxy() | nil
   def format_proxy(nil), do: nil
 
   def format_proxy(proxy_url) do
-    case Connection.parse_proxy(proxy_url) do
+    case parse_proxy(proxy_url) do
       {:ok, host, port} -> {host, port}
       {:ok, type, host, port} -> {type, host, port}
       _ -> nil
@@ -38,4 +36,105 @@ defmodule Pleroma.HTTP.AdapterHelper do
   @spec maybe_add_proxy(keyword(), proxy() | nil) :: keyword()
   def maybe_add_proxy(opts, nil), do: opts
   def maybe_add_proxy(opts, proxy), do: Keyword.put_new(opts, :proxy, proxy)
+
+  @doc """
+  Merge default connection & adapter options with received ones.
+  """
+
+  @spec options(URI.t(), keyword()) :: keyword()
+  def options(%URI{} = uri, opts \\ []) do
+    @defaults
+    |> put_timeout()
+    |> Keyword.merge(opts)
+    |> adapter_helper().options(uri)
+  end
+
+  # For Hackney, this is the time a connection can stay idle in the pool.
+  # For Gun, this is the timeout to receive a message from Gun.
+  defp put_timeout(opts) do
+    {config_key, default} =
+      if adapter() == Tesla.Adapter.Gun do
+        {:pools, Config.get([:pools, :default, :timeout], 5_000)}
+      else
+        {:hackney_pools, 10_000}
+      end
+
+    timeout = Config.get([config_key, opts[:pool], :timeout], default)
+
+    Keyword.merge(opts, timeout: timeout)
+  end
+
+  def get_conn(uri, opts), do: adapter_helper().get_conn(uri, opts)
+  defp adapter, do: Application.get_env(:tesla, :adapter)
+
+  defp adapter_helper do
+    case adapter() do
+      Tesla.Adapter.Gun -> AdapterHelper.Gun
+      Tesla.Adapter.Hackney -> AdapterHelper.Hackney
+      _ -> AdapterHelper.Default
+    end
+  end
+
+  @spec parse_proxy(String.t() | tuple() | nil) ::
+          {:ok, host(), pos_integer()}
+          | {:ok, proxy_type(), host(), pos_integer()}
+          | {:error, atom()}
+          | nil
+
+  def parse_proxy(nil), do: nil
+
+  def parse_proxy(proxy) when is_binary(proxy) do
+    with [host, port] <- String.split(proxy, ":"),
+         {port, ""} <- Integer.parse(port) do
+      {:ok, parse_host(host), port}
+    else
+      {_, _} ->
+        Logger.warn("Parsing port failed #{inspect(proxy)}")
+        {:error, :invalid_proxy_port}
+
+      :error ->
+        Logger.warn("Parsing port failed #{inspect(proxy)}")
+        {:error, :invalid_proxy_port}
+
+      _ ->
+        Logger.warn("Parsing proxy failed #{inspect(proxy)}")
+        {:error, :invalid_proxy}
+    end
+  end
+
+  def parse_proxy(proxy) when is_tuple(proxy) do
+    with {type, host, port} <- proxy do
+      {:ok, type, parse_host(host), port}
+    else
+      _ ->
+        Logger.warn("Parsing proxy failed #{inspect(proxy)}")
+        {:error, :invalid_proxy}
+    end
+  end
+
+  @spec parse_host(String.t() | atom() | charlist()) :: charlist() | :inet.ip_address()
+  def parse_host(host) when is_list(host), do: host
+  def parse_host(host) when is_atom(host), do: to_charlist(host)
+
+  def parse_host(host) when is_binary(host) do
+    host = to_charlist(host)
+
+    case :inet.parse_address(host) do
+      {:error, :einval} -> host
+      {:ok, ip} -> ip
+    end
+  end
+
+  @spec format_host(String.t()) :: charlist()
+  def format_host(host) do
+    host_charlist = to_charlist(host)
+
+    case :inet.parse_address(host_charlist) do
+      {:error, :einval} ->
+        :idna.encode(host_charlist)
+
+      {:ok, _ip} ->
+        host_charlist
+    end
+  end
 end
diff --git a/lib/pleroma/http/adapter_helper/default.ex b/lib/pleroma/http/adapter_helper/default.ex
new file mode 100644 (file)
index 0000000..e134413
--- /dev/null
@@ -0,0 +1,14 @@
+defmodule Pleroma.HTTP.AdapterHelper.Default do
+  alias Pleroma.HTTP.AdapterHelper
+
+  @behaviour Pleroma.HTTP.AdapterHelper
+
+  @spec options(keyword(), URI.t()) :: keyword()
+  def options(opts, _uri) do
+    proxy = Pleroma.Config.get([:http, :proxy_url], nil)
+    AdapterHelper.maybe_add_proxy(opts, AdapterHelper.format_proxy(proxy))
+  end
+
+  @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()}
+  def get_conn(_uri, opts), do: {:ok, opts}
+end
index ead7cdc6bba7297f1b6ceaa1faf4a6687e48c979..b4ff8306c3adeabd505b28fa6440c3e91c0fd984 100644 (file)
@@ -5,8 +5,8 @@
 defmodule Pleroma.HTTP.AdapterHelper.Gun do
   @behaviour Pleroma.HTTP.AdapterHelper
 
+  alias Pleroma.Gun.ConnectionPool
   alias Pleroma.HTTP.AdapterHelper
-  alias Pleroma.Pool.Connections
 
   require Logger
 
@@ -14,7 +14,7 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do
     connect_timeout: 5_000,
     domain_lookup_timeout: 5_000,
     tls_handshake_timeout: 5_000,
-    retry: 1,
+    retry: 0,
     retry_timeout: 1000,
     await_up_timeout: 5_000
   ]
@@ -31,16 +31,7 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do
     |> Keyword.merge(config_opts)
     |> add_scheme_opts(uri)
     |> AdapterHelper.maybe_add_proxy(proxy)
-    |> maybe_get_conn(uri, incoming_opts)
-  end
-
-  @spec after_request(keyword()) :: :ok
-  def after_request(opts) do
-    if opts[:conn] && opts[:body_as] != :chunks do
-      Connections.checkout(opts[:conn], self(), :gun_connections)
-    end
-
-    :ok
+    |> Keyword.merge(incoming_opts)
   end
 
   defp add_scheme_opts(opts, %{scheme: "http"}), do: opts
@@ -48,30 +39,40 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do
   defp add_scheme_opts(opts, %{scheme: "https"}) do
     opts
     |> Keyword.put(:certificates_verification, true)
-    |> Keyword.put(:tls_opts, log_level: :warning)
   end
 
-  defp maybe_get_conn(adapter_opts, uri, incoming_opts) do
-    {receive_conn?, opts} =
-      adapter_opts
-      |> Keyword.merge(incoming_opts)
-      |> Keyword.pop(:receive_conn, true)
-
-    if Connections.alive?(:gun_connections) and receive_conn? do
-      checkin_conn(uri, opts)
-    else
-      opts
+  @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()}
+  def get_conn(uri, opts) do
+    case ConnectionPool.get_conn(uri, opts) do
+      {:ok, conn_pid} -> {:ok, Keyword.merge(opts, conn: conn_pid, close_conn: false)}
+      err -> err
     end
   end
 
-  defp checkin_conn(uri, opts) do
-    case Connections.checkin(uri, :gun_connections) do
-      nil ->
-        Task.start(Pleroma.Gun.Conn, :open, [uri, :gun_connections, opts])
-        opts
+  @prefix Pleroma.Gun.ConnectionPool
+  def limiter_setup do
+    wait = Pleroma.Config.get([:connections_pool, :connection_acquisition_wait])
+    retries = Pleroma.Config.get([:connections_pool, :connection_acquisition_retries])
 
-      conn when is_pid(conn) ->
-        Keyword.merge(opts, conn: conn, close_conn: false)
-    end
+    :pools
+    |> Pleroma.Config.get([])
+    |> Enum.each(fn {name, opts} ->
+      max_running = Keyword.get(opts, :size, 50)
+      max_waiting = Keyword.get(opts, :max_waiting, 10)
+
+      result =
+        ConcurrentLimiter.new(:"#{@prefix}.#{name}", max_running, max_waiting,
+          wait: wait,
+          max_retries: retries
+        )
+
+      case result do
+        :ok -> :ok
+        {:error, :existing} -> :ok
+        e -> raise e
+      end
+    end)
+
+    :ok
   end
 end
index 3972a03a948717eb436e13666bc2caf1dff689d3..cd569422b6b481e7d38320ca06e6705992656ee3 100644 (file)
@@ -24,5 +24,6 @@ defmodule Pleroma.HTTP.AdapterHelper.Hackney do
 
   defp add_scheme_opts(opts, _), do: opts
 
-  def after_request(_), do: :ok
+  @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()}
+  def get_conn(_uri, opts), do: {:ok, opts}
 end
diff --git a/lib/pleroma/http/connection.ex b/lib/pleroma/http/connection.ex
deleted file mode 100644 (file)
index ebacf79..0000000
+++ /dev/null
@@ -1,124 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.HTTP.Connection do
-  @moduledoc """
-  Configure Tesla.Client with default and customized adapter options.
-  """
-
-  alias Pleroma.Config
-  alias Pleroma.HTTP.AdapterHelper
-
-  require Logger
-
-  @defaults [pool: :federation]
-
-  @type ip_address :: ipv4_address() | ipv6_address()
-  @type ipv4_address :: {0..255, 0..255, 0..255, 0..255}
-  @type ipv6_address ::
-          {0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535}
-  @type proxy_type() :: :socks4 | :socks5
-  @type host() :: charlist() | ip_address()
-
-  @doc """
-  Merge default connection & adapter options with received ones.
-  """
-
-  @spec options(URI.t(), keyword()) :: keyword()
-  def options(%URI{} = uri, opts \\ []) do
-    @defaults
-    |> pool_timeout()
-    |> Keyword.merge(opts)
-    |> adapter_helper().options(uri)
-  end
-
-  defp pool_timeout(opts) do
-    {config_key, default} =
-      if adapter() == Tesla.Adapter.Gun do
-        {:pools, Config.get([:pools, :default, :timeout])}
-      else
-        {:hackney_pools, 10_000}
-      end
-
-    timeout = Config.get([config_key, opts[:pool], :timeout], default)
-
-    Keyword.merge(opts, timeout: timeout)
-  end
-
-  @spec after_request(keyword()) :: :ok
-  def after_request(opts), do: adapter_helper().after_request(opts)
-
-  defp adapter, do: Application.get_env(:tesla, :adapter)
-
-  defp adapter_helper do
-    case adapter() do
-      Tesla.Adapter.Gun -> AdapterHelper.Gun
-      Tesla.Adapter.Hackney -> AdapterHelper.Hackney
-      _ -> AdapterHelper
-    end
-  end
-
-  @spec parse_proxy(String.t() | tuple() | nil) ::
-          {:ok, host(), pos_integer()}
-          | {:ok, proxy_type(), host(), pos_integer()}
-          | {:error, atom()}
-          | nil
-
-  def parse_proxy(nil), do: nil
-
-  def parse_proxy(proxy) when is_binary(proxy) do
-    with [host, port] <- String.split(proxy, ":"),
-         {port, ""} <- Integer.parse(port) do
-      {:ok, parse_host(host), port}
-    else
-      {_, _} ->
-        Logger.warn("Parsing port failed #{inspect(proxy)}")
-        {:error, :invalid_proxy_port}
-
-      :error ->
-        Logger.warn("Parsing port failed #{inspect(proxy)}")
-        {:error, :invalid_proxy_port}
-
-      _ ->
-        Logger.warn("Parsing proxy failed #{inspect(proxy)}")
-        {:error, :invalid_proxy}
-    end
-  end
-
-  def parse_proxy(proxy) when is_tuple(proxy) do
-    with {type, host, port} <- proxy do
-      {:ok, type, parse_host(host), port}
-    else
-      _ ->
-        Logger.warn("Parsing proxy failed #{inspect(proxy)}")
-        {:error, :invalid_proxy}
-    end
-  end
-
-  @spec parse_host(String.t() | atom() | charlist()) :: charlist() | ip_address()
-  def parse_host(host) when is_list(host), do: host
-  def parse_host(host) when is_atom(host), do: to_charlist(host)
-
-  def parse_host(host) when is_binary(host) do
-    host = to_charlist(host)
-
-    case :inet.parse_address(host) do
-      {:error, :einval} -> host
-      {:ok, ip} -> ip
-    end
-  end
-
-  @spec format_host(String.t()) :: charlist()
-  def format_host(host) do
-    host_charlist = to_charlist(host)
-
-    case :inet.parse_address(host_charlist) do
-      {:error, :einval} ->
-        :idna.encode(host_charlist)
-
-      {:ok, _ip} ->
-        host_charlist
-    end
-  end
-end
index 66ca7536766918a3ffa6734fe301aa3857cfcb6e..6128bc4cf94d5ac863a72d9dbe5922b9259eba29 100644 (file)
@@ -7,7 +7,7 @@ defmodule Pleroma.HTTP do
     Wrapper for `Tesla.request/2`.
   """
 
-  alias Pleroma.HTTP.Connection
+  alias Pleroma.HTTP.AdapterHelper
   alias Pleroma.HTTP.Request
   alias Pleroma.HTTP.RequestBuilder, as: Builder
   alias Tesla.Client
@@ -60,49 +60,29 @@ defmodule Pleroma.HTTP do
           {:ok, Env.t()} | {:error, any()}
   def request(method, url, body, headers, options) when is_binary(url) do
     uri = URI.parse(url)
-    adapter_opts = Connection.options(uri, options[:adapter] || [])
-    options = put_in(options[:adapter], adapter_opts)
-    params = options[:params] || []
-    request = build_request(method, headers, options, url, body, params)
-
-    adapter = Application.get_env(:tesla, :adapter)
-    client = Tesla.client([Tesla.Middleware.FollowRedirects], adapter)
-
-    pid = Process.whereis(adapter_opts[:pool])
-
-    pool_alive? =
-      if adapter == Tesla.Adapter.Gun && pid do
-        Process.alive?(pid)
-      else
-        false
-      end
-
-    request_opts =
-      adapter_opts
-      |> Enum.into(%{})
-      |> Map.put(:env, Pleroma.Config.get([:env]))
-      |> Map.put(:pool_alive?, pool_alive?)
-
-    response = request(client, request, request_opts)
-
-    Connection.after_request(adapter_opts)
-
-    response
-  end
-
-  @spec request(Client.t(), keyword(), map()) :: {:ok, Env.t()} | {:error, any()}
-  def request(%Client{} = client, request, %{env: :test}), do: request(client, request)
-
-  def request(%Client{} = client, request, %{body_as: :chunks}), do: request(client, request)
-
-  def request(%Client{} = client, request, %{pool_alive?: false}), do: request(client, request)
-
-  def request(%Client{} = client, request, %{pool: pool, timeout: timeout}) do
-    :poolboy.transaction(
-      pool,
-      &Pleroma.Pool.Request.execute(&1, client, request, timeout),
-      timeout
-    )
+    adapter_opts = AdapterHelper.options(uri, options[:adapter] || [])
+
+    case AdapterHelper.get_conn(uri, adapter_opts) do
+      {:ok, adapter_opts} ->
+        options = put_in(options[:adapter], adapter_opts)
+        params = options[:params] || []
+        request = build_request(method, headers, options, url, body, params)
+
+        adapter = Application.get_env(:tesla, :adapter)
+        client = Tesla.client([Pleroma.HTTP.Middleware.FollowRedirects], adapter)
+
+        maybe_limit(
+          fn ->
+            request(client, request)
+          end,
+          adapter,
+          adapter_opts
+        )
+
+      # Connection release is handled in a custom FollowRedirects middleware
+      err ->
+        err
+    end
   end
 
   @spec request(Client.t(), keyword()) :: {:ok, Env.t()} | {:error, any()}
@@ -118,4 +98,13 @@ defmodule Pleroma.HTTP do
     |> Builder.add_param(:query, :query, params)
     |> Builder.convert_to_keyword()
   end
+
+  @prefix Pleroma.Gun.ConnectionPool
+  defp maybe_limit(fun, Tesla.Adapter.Gun, opts) do
+    ConcurrentLimiter.limit(:"#{@prefix}.#{opts[:pool] || :default}", fun)
+  end
+
+  defp maybe_limit(fun, _, _) do
+    fun.()
+  end
 end
diff --git a/lib/pleroma/pool/connections.ex b/lib/pleroma/pool/connections.ex
deleted file mode 100644 (file)
index acafe1b..0000000
+++ /dev/null
@@ -1,283 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Pool.Connections do
-  use GenServer
-
-  alias Pleroma.Config
-  alias Pleroma.Gun
-
-  require Logger
-
-  @type domain :: String.t()
-  @type conn :: Pleroma.Gun.Conn.t()
-
-  @type t :: %__MODULE__{
-          conns: %{domain() => conn()},
-          opts: keyword()
-        }
-
-  defstruct conns: %{}, opts: []
-
-  @spec start_link({atom(), keyword()}) :: {:ok, pid()}
-  def start_link({name, opts}) do
-    GenServer.start_link(__MODULE__, opts, name: name)
-  end
-
-  @impl true
-  def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}
-
-  @spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
-  def checkin(url, name)
-  def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name)
-
-  def checkin(%URI{} = uri, name) do
-    timeout = Config.get([:connections_pool, :checkin_timeout], 250)
-
-    GenServer.call(name, {:checkin, uri}, timeout)
-  end
-
-  @spec alive?(atom()) :: boolean()
-  def alive?(name) do
-    if pid = Process.whereis(name) do
-      Process.alive?(pid)
-    else
-      false
-    end
-  end
-
-  @spec get_state(atom()) :: t()
-  def get_state(name) do
-    GenServer.call(name, :state)
-  end
-
-  @spec count(atom()) :: pos_integer()
-  def count(name) do
-    GenServer.call(name, :count)
-  end
-
-  @spec get_unused_conns(atom()) :: [{domain(), conn()}]
-  def get_unused_conns(name) do
-    GenServer.call(name, :unused_conns)
-  end
-
-  @spec checkout(pid(), pid(), atom()) :: :ok
-  def checkout(conn, pid, name) do
-    GenServer.cast(name, {:checkout, conn, pid})
-  end
-
-  @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok
-  def add_conn(name, key, conn) do
-    GenServer.cast(name, {:add_conn, key, conn})
-  end
-
-  @spec remove_conn(atom(), String.t()) :: :ok
-  def remove_conn(name, key) do
-    GenServer.cast(name, {:remove_conn, key})
-  end
-
-  @impl true
-  def handle_cast({:add_conn, key, conn}, state) do
-    state = put_in(state.conns[key], conn)
-
-    Process.monitor(conn.conn)
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_cast({:checkout, conn_pid, pid}, state) do
-    state =
-      with true <- Process.alive?(conn_pid),
-           {key, conn} <- find_conn(state.conns, conn_pid),
-           used_by <- List.keydelete(conn.used_by, pid, 0) do
-        conn_state = if used_by == [], do: :idle, else: conn.conn_state
-
-        put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
-      else
-        false ->
-          Logger.debug("checkout for closed conn #{inspect(conn_pid)}")
-          state
-
-        nil ->
-          Logger.debug("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
-          state
-      end
-
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_cast({:remove_conn, key}, state) do
-    state = put_in(state.conns, Map.delete(state.conns, key))
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_call({:checkin, uri}, from, state) do
-    key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
-
-    case state.conns[key] do
-      %{conn: pid, gun_state: :up} = conn ->
-        time = :os.system_time(:second)
-        last_reference = time - conn.last_reference
-        crf = crf(last_reference, 100, conn.crf)
-
-        state =
-          put_in(state.conns[key], %{
-            conn
-            | last_reference: time,
-              crf: crf,
-              conn_state: :active,
-              used_by: [from | conn.used_by]
-          })
-
-        {:reply, pid, state}
-
-      %{gun_state: :down} ->
-        {:reply, nil, state}
-
-      nil ->
-        {:reply, nil, state}
-    end
-  end
-
-  @impl true
-  def handle_call(:state, _from, state), do: {:reply, state, state}
-
-  @impl true
-  def handle_call(:count, _from, state) do
-    {:reply, Enum.count(state.conns), state}
-  end
-
-  @impl true
-  def handle_call(:unused_conns, _from, state) do
-    unused_conns =
-      state.conns
-      |> Enum.filter(&filter_conns/1)
-      |> Enum.sort(&sort_conns/2)
-
-    {:reply, unused_conns, state}
-  end
-
-  defp filter_conns({_, %{conn_state: :idle, used_by: []}}), do: true
-  defp filter_conns(_), do: false
-
-  defp sort_conns({_, c1}, {_, c2}) do
-    c1.crf <= c2.crf and c1.last_reference <= c2.last_reference
-  end
-
-  @impl true
-  def handle_info({:gun_up, conn_pid, _protocol}, state) do
-    %{origin_host: host, origin_scheme: scheme, origin_port: port} = Gun.info(conn_pid)
-
-    host =
-      case :inet.ntoa(host) do
-        {:error, :einval} -> host
-        ip -> ip
-      end
-
-    key = "#{scheme}:#{host}:#{port}"
-
-    state =
-      with {key, conn} <- find_conn(state.conns, conn_pid, key),
-           {true, key} <- {Process.alive?(conn_pid), key} do
-        put_in(state.conns[key], %{
-          conn
-          | gun_state: :up,
-            conn_state: :active,
-            retries: 0
-        })
-      else
-        {false, key} ->
-          put_in(
-            state.conns,
-            Map.delete(state.conns, key)
-          )
-
-        nil ->
-          :ok = Gun.close(conn_pid)
-
-          state
-      end
-
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
-    retries = Config.get([:connections_pool, :retry], 1)
-    # we can't get info on this pid, because pid is dead
-    state =
-      with {key, conn} <- find_conn(state.conns, conn_pid),
-           {true, key} <- {Process.alive?(conn_pid), key} do
-        if conn.retries == retries do
-          :ok = Gun.close(conn.conn)
-
-          put_in(
-            state.conns,
-            Map.delete(state.conns, key)
-          )
-        else
-          put_in(state.conns[key], %{
-            conn
-            | gun_state: :down,
-              retries: conn.retries + 1
-          })
-        end
-      else
-        {false, key} ->
-          put_in(
-            state.conns,
-            Map.delete(state.conns, key)
-          )
-
-        nil ->
-          Logger.debug(":gun_down for conn which isn't found in state")
-
-          state
-      end
-
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do
-    Logger.debug("received DOWN message for #{inspect(conn_pid)} reason -> #{inspect(reason)}")
-
-    state =
-      with {key, conn} <- find_conn(state.conns, conn_pid) do
-        Enum.each(conn.used_by, fn {pid, _ref} ->
-          Process.exit(pid, reason)
-        end)
-
-        put_in(
-          state.conns,
-          Map.delete(state.conns, key)
-        )
-      else
-        nil ->
-          Logger.debug(":DOWN for conn which isn't found in state")
-
-          state
-      end
-
-    {:noreply, state}
-  end
-
-  defp find_conn(conns, conn_pid) do
-    Enum.find(conns, fn {_key, conn} ->
-      conn.conn == conn_pid
-    end)
-  end
-
-  defp find_conn(conns, conn_pid, conn_key) do
-    Enum.find(conns, fn {key, conn} ->
-      key == conn_key and conn.conn == conn_pid
-    end)
-  end
-
-  def crf(current, steps, crf) do
-    1 + :math.pow(0.5, current / steps) * crf
-  end
-end
diff --git a/lib/pleroma/pool/pool.ex b/lib/pleroma/pool/pool.ex
deleted file mode 100644 (file)
index 21a6fbb..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Pool do
-  def child_spec(opts) do
-    poolboy_opts =
-      opts
-      |> Keyword.put(:worker_module, Pleroma.Pool.Request)
-      |> Keyword.put(:name, {:local, opts[:name]})
-      |> Keyword.put(:size, opts[:size])
-      |> Keyword.put(:max_overflow, opts[:max_overflow])
-
-    %{
-      id: opts[:id] || {__MODULE__, make_ref()},
-      start: {:poolboy, :start_link, [poolboy_opts, [name: opts[:name]]]},
-      restart: :permanent,
-      shutdown: 5000,
-      type: :worker
-    }
-  end
-end
diff --git a/lib/pleroma/pool/request.ex b/lib/pleroma/pool/request.ex
deleted file mode 100644 (file)
index 3fb930d..0000000
+++ /dev/null
@@ -1,65 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Pool.Request do
-  use GenServer
-
-  require Logger
-
-  def start_link(args) do
-    GenServer.start_link(__MODULE__, args)
-  end
-
-  @impl true
-  def init(_), do: {:ok, []}
-
-  @spec execute(pid() | atom(), Tesla.Client.t(), keyword(), pos_integer()) ::
-          {:ok, Tesla.Env.t()} | {:error, any()}
-  def execute(pid, client, request, timeout) do
-    GenServer.call(pid, {:execute, client, request}, timeout)
-  end
-
-  @impl true
-  def handle_call({:execute, client, request}, _from, state) do
-    response = Pleroma.HTTP.request(client, request)
-
-    {:reply, response, state}
-  end
-
-  @impl true
-  def handle_info({:gun_data, _conn, _stream, _, _}, state) do
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_info({:gun_up, _conn, _protocol}, state) do
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_info({:gun_down, _conn, _protocol, _reason, _killed}, state) do
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_info({:gun_error, _conn, _stream, _error}, state) do
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_info({:gun_push, _conn, _stream, _new_stream, _method, _uri, _headers}, state) do
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_info({:gun_response, _conn, _stream, _, _status, _headers}, state) do
-    {:noreply, state}
-  end
-
-  @impl true
-  def handle_info(msg, state) do
-    Logger.warn("Received unexpected message #{inspect(__MODULE__)} #{inspect(msg)}")
-    {:noreply, state}
-  end
-end
diff --git a/lib/pleroma/pool/supervisor.ex b/lib/pleroma/pool/supervisor.ex
deleted file mode 100644 (file)
index faf646c..0000000
+++ /dev/null
@@ -1,42 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Pool.Supervisor do
-  use Supervisor
-
-  alias Pleroma.Config
-  alias Pleroma.Pool
-
-  def start_link(args) do
-    Supervisor.start_link(__MODULE__, args, name: __MODULE__)
-  end
-
-  def init(_) do
-    conns_child = %{
-      id: Pool.Connections,
-      start:
-        {Pool.Connections, :start_link, [{:gun_connections, Config.get([:connections_pool])}]}
-    }
-
-    Supervisor.init([conns_child | pools()], strategy: :one_for_one)
-  end
-
-  defp pools do
-    pools = Config.get(:pools)
-
-    pools =
-      if Config.get([Pleroma.Upload, :proxy_remote]) == false do
-        Keyword.delete(pools, :upload)
-      else
-        pools
-      end
-
-    for {pool_name, pool_opts} <- pools do
-      pool_opts
-      |> Keyword.put(:id, {Pool, pool_name})
-      |> Keyword.put(:name, pool_name)
-      |> Pool.child_spec()
-    end
-  end
-end
index e81ea8bde135be2045ce88b7ad292a7f6780a231..65785445d2a5021d7ee4d57062b64315c653bc18 100644 (file)
@@ -48,7 +48,7 @@ defmodule Pleroma.ReverseProxy.Client.Tesla do
     # if there were redirects we need to checkout old conn
     conn = opts[:old_conn] || opts[:conn]
 
-    if conn, do: :ok = Pleroma.Pool.Connections.checkout(conn, self(), :gun_connections)
+    if conn, do: :ok = Pleroma.Gun.ConnectionPool.release_conn(conn)
 
     :done
   end
diff --git a/lib/pleroma/telemetry/logger.ex b/lib/pleroma/telemetry/logger.ex
new file mode 100644 (file)
index 0000000..4cacae0
--- /dev/null
@@ -0,0 +1,76 @@
+defmodule Pleroma.Telemetry.Logger do
+  @moduledoc "Transforms Pleroma telemetry events to logs"
+
+  require Logger
+
+  @events [
+    [:pleroma, :connection_pool, :reclaim, :start],
+    [:pleroma, :connection_pool, :reclaim, :stop],
+    [:pleroma, :connection_pool, :provision_failure],
+    [:pleroma, :connection_pool, :client_death]
+  ]
+  def attach do
+    :telemetry.attach_many("pleroma-logger", @events, &handle_event/4, [])
+  end
+
+  # Passing anonymous functions instead of strings to logger is intentional,
+  # that way strings won't be concatenated if the message is going to be thrown
+  # out anyway due to higher log level configured
+
+  def handle_event(
+        [:pleroma, :connection_pool, :reclaim, :start],
+        _,
+        %{max_connections: max_connections, reclaim_max: reclaim_max},
+        _
+      ) do
+    Logger.debug(fn ->
+      "Connection pool is exhausted (reached #{max_connections} connections). Starting idle connection cleanup to reclaim as much as #{
+        reclaim_max
+      } connections"
+    end)
+  end
+
+  def handle_event(
+        [:pleroma, :connection_pool, :reclaim, :stop],
+        %{reclaimed_count: 0},
+        _,
+        _
+      ) do
+    Logger.error(fn ->
+      "Connection pool failed to reclaim any connections due to all of them being in use. It will have to drop requests for opening connections to new hosts"
+    end)
+  end
+
+  def handle_event(
+        [:pleroma, :connection_pool, :reclaim, :stop],
+        %{reclaimed_count: reclaimed_count},
+        _,
+        _
+      ) do
+    Logger.debug(fn -> "Connection pool cleaned up #{reclaimed_count} idle connections" end)
+  end
+
+  def handle_event(
+        [:pleroma, :connection_pool, :provision_failure],
+        %{opts: [key | _]},
+        _,
+        _
+      ) do
+    Logger.error(fn ->
+      "Connection pool had to refuse opening a connection to #{key} due to connection limit exhaustion"
+    end)
+  end
+
+  def handle_event(
+        [:pleroma, :connection_pool, :client_death],
+        %{client_pid: client_pid, reason: reason},
+        %{key: key},
+        _
+      ) do
+    Logger.warn(fn ->
+      "Pool worker for #{key}: Client #{inspect(client_pid)} died before releasing the connection with #{
+        inspect(reason)
+      }"
+    end)
+  end
+end
diff --git a/lib/pleroma/tesla/middleware/follow_redirects.ex b/lib/pleroma/tesla/middleware/follow_redirects.ex
new file mode 100644 (file)
index 0000000..5a70322
--- /dev/null
@@ -0,0 +1,110 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2015-2020 Tymon Tobolski <https://github.com/teamon/tesla/blob/master/lib/tesla/middleware/follow_redirects.ex>
+# Copyright © 2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.HTTP.Middleware.FollowRedirects do
+  @moduledoc """
+  Pool-aware version of https://github.com/teamon/tesla/blob/master/lib/tesla/middleware/follow_redirects.ex
+
+  Follow 3xx redirects
+  ## Options
+  - `:max_redirects` - limit number of redirects (default: `5`)
+  """
+
+  alias Pleroma.Gun.ConnectionPool
+
+  @behaviour Tesla.Middleware
+
+  @max_redirects 5
+  @redirect_statuses [301, 302, 303, 307, 308]
+
+  @impl Tesla.Middleware
+  def call(env, next, opts \\ []) do
+    max = Keyword.get(opts, :max_redirects, @max_redirects)
+
+    redirect(env, next, max)
+  end
+
+  defp redirect(env, next, left) do
+    opts = env.opts[:adapter]
+
+    case Tesla.run(env, next) do
+      {:ok, %{status: status} = res} when status in @redirect_statuses and left > 0 ->
+        release_conn(opts)
+
+        case Tesla.get_header(res, "location") do
+          nil ->
+            {:ok, res}
+
+          location ->
+            location = parse_location(location, res)
+
+            case get_conn(location, opts) do
+              {:ok, opts} ->
+                %{env | opts: Keyword.put(env.opts, :adapter, opts)}
+                |> new_request(res.status, location)
+                |> redirect(next, left - 1)
+
+              e ->
+                e
+            end
+        end
+
+      {:ok, %{status: status}} when status in @redirect_statuses ->
+        release_conn(opts)
+        {:error, {__MODULE__, :too_many_redirects}}
+
+      {:error, _} = e ->
+        release_conn(opts)
+        e
+
+      other ->
+        unless opts[:body_as] == :chunks do
+          release_conn(opts)
+        end
+
+        other
+    end
+  end
+
+  defp get_conn(location, opts) do
+    uri = URI.parse(location)
+
+    case ConnectionPool.get_conn(uri, opts) do
+      {:ok, conn} ->
+        {:ok, Keyword.merge(opts, conn: conn)}
+
+      e ->
+        e
+    end
+  end
+
+  defp release_conn(opts) do
+    ConnectionPool.release_conn(opts[:conn])
+  end
+
+  # The 303 (See Other) redirect was added in HTTP/1.1 to indicate that the originally
+  # requested resource is not available, however a related resource (or another redirect)
+  # available via GET is available at the specified location.
+  # https://tools.ietf.org/html/rfc7231#section-6.4.4
+  defp new_request(env, 303, location), do: %{env | url: location, method: :get, query: []}
+
+  # The 307 (Temporary Redirect) status code indicates that the target
+  # resource resides temporarily under a different URI and the user agent
+  # MUST NOT change the request method (...)
+  # https://tools.ietf.org/html/rfc7231#section-6.4.7
+  defp new_request(env, 307, location), do: %{env | url: location}
+
+  defp new_request(env, _, location), do: %{env | url: location, query: []}
+
+  defp parse_location("https://" <> _rest = location, _env), do: location
+  defp parse_location("http://" <> _rest = location, _env), do: location
+
+  defp parse_location(location, env) do
+    env.url
+    |> URI.parse()
+    |> URI.merge(location)
+    |> URI.to_string()
+  end
+end
diff --git a/mix.exs b/mix.exs
index 741f917e68fd7b7e4b81ee365d9b166457d77b10..52b4cf26840b460fb387ec2670f0a02c7dbfad0c 100644 (file)
--- a/mix.exs
+++ b/mix.exs
@@ -135,13 +135,11 @@ defmodule Pleroma.Mixfile do
       {:poison, "~> 3.0", override: true},
       # {:tesla, "~> 1.3", override: true},
       {:tesla,
-       git: "https://git.pleroma.social/pleroma/elixir-libraries/tesla.git",
-       ref: "61b7503cef33f00834f78ddfafe0d5d9dec2270b",
-       override: true},
+       github: "teamon/tesla", ref: "af3707078b10793f6a534938e56b963aff82fe3c", override: true},
       {:castore, "~> 0.1"},
       {:cowlib, "~> 2.8", override: true},
       {:gun,
-       github: "ninenines/gun", ref: "e1a69b36b180a574c0ac314ced9613fdd52312cc", override: true},
+       github: "ninenines/gun", ref: "921c47146b2d9567eac7e9a4d2ccc60fffd4f327", override: true},
       {:jason, "~> 1.0"},
       {:mogrify, "~> 0.6.1"},
       {:ex_aws, "~> 2.1"},
@@ -191,6 +189,9 @@ defmodule Pleroma.Mixfile do
       {:plug_static_index_html, "~> 1.0.0"},
       {:excoveralls, "~> 0.12.1", only: :test},
       {:flake_id, "~> 0.1.0"},
+      {:concurrent_limiter,
+       git: "https://git.pleroma.social/pleroma/elixir-libraries/concurrent_limiter",
+       ref: "8eee96c6ba39b9286ec44c51c52d9f2758951365"},
       {:remote_ip,
        git: "https://git.pleroma.social/pleroma/remote_ip.git",
        ref: "b647d0deecaa3acb140854fe4bda5b7e1dc6d1c8"},
index f801f9e0c470007abb3f9211b63ab12060b67e8f..8dd37a40fc5deb77a483ba0d98b5d2def8929e92 100644 (file)
--- a/mix.lock
+++ b/mix.lock
@@ -15,6 +15,7 @@
   "certifi": {:hex, :certifi, "2.5.2", "b7cfeae9d2ed395695dd8201c57a2d019c0c43ecaf8b8bcb9320b40d6662f340", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "3b3b5f36493004ac3455966991eaf6e768ce9884693d9968055aeeeb1e575040"},
   "combine": {:hex, :combine, "0.10.0", "eff8224eeb56498a2af13011d142c5e7997a80c8f5b97c499f84c841032e429f", [:mix], [], "hexpm", "1b1dbc1790073076580d0d1d64e42eae2366583e7aecd455d1215b0d16f2451b"},
   "comeonin": {:hex, :comeonin, "5.3.1", "7fe612b739c78c9c1a75186ef2d322ce4d25032d119823269d0aa1e2f1e20025", [:mix], [], "hexpm", "d6222483060c17f0977fad1b7401ef0c5863c985a64352755f366aee3799c245"},
+  "concurrent_limiter": {:git, "https://git.pleroma.social/pleroma/elixir-libraries/concurrent_limiter", "8eee96c6ba39b9286ec44c51c52d9f2758951365", [ref: "8eee96c6ba39b9286ec44c51c52d9f2758951365"]},
   "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm", "4a0850c9be22a43af9920a71ab17c051f5f7d45c209e40269a1938832510e4d9"},
   "cors_plug": {:hex, :cors_plug, "1.5.2", "72df63c87e4f94112f458ce9d25800900cc88608c1078f0e4faddf20933eda6e", [:mix], [{:plug, "~> 1.3 or ~> 1.4 or ~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "9af027d20dc12dd0c4345a6b87247e0c62965871feea0bfecf9764648b02cc69"},
   "cowboy": {:hex, :cowboy, "2.7.0", "91ed100138a764355f43316b1d23d7ff6bdb0de4ea618cb5d8677c93a7a2f115", [:rebar3], [{:cowlib, "~> 2.8.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "04fd8c6a39edc6aaa9c26123009200fc61f92a3a94f3178c527b70b767c6e605"},
@@ -49,7 +50,7 @@
   "gen_stage": {:hex, :gen_stage, "0.14.3", "d0c66f1c87faa301c1a85a809a3ee9097a4264b2edf7644bf5c123237ef732bf", [:mix], [], "hexpm"},
   "gen_state_machine": {:hex, :gen_state_machine, "2.0.5", "9ac15ec6e66acac994cc442dcc2c6f9796cf380ec4b08267223014be1c728a95", [:mix], [], "hexpm"},
   "gettext": {:hex, :gettext, "0.17.4", "f13088e1ec10ce01665cf25f5ff779e7df3f2dc71b37084976cf89d1aa124d5c", [:mix], [], "hexpm", "3c75b5ea8288e2ee7ea503ff9e30dfe4d07ad3c054576a6e60040e79a801e14d"},
-  "gun": {:git, "https://github.com/ninenines/gun.git", "e1a69b36b180a574c0ac314ced9613fdd52312cc", [ref: "e1a69b36b180a574c0ac314ced9613fdd52312cc"]},
+  "gun": {:git, "https://github.com/ninenines/gun.git", "921c47146b2d9567eac7e9a4d2ccc60fffd4f327", [ref: "921c47146b2d9567eac7e9a4d2ccc60fffd4f327"]},
   "hackney": {:hex, :hackney, "1.16.0", "5096ac8e823e3a441477b2d187e30dd3fff1a82991a806b2003845ce72ce2d84", [:rebar3], [{:certifi, "2.5.2", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.1", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.0", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.6", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "3bf0bebbd5d3092a3543b783bf065165fa5d3ad4b899b836810e513064134e18"},
   "html_entities": {:hex, :html_entities, "0.5.1", "1c9715058b42c35a2ab65edc5b36d0ea66dd083767bef6e3edb57870ef556549", [:mix], [], "hexpm", "30efab070904eb897ff05cd52fa61c1025d7f8ef3a9ca250bc4e6513d16c32de"},
   "html_sanitize_ex": {:hex, :html_sanitize_ex, "1.3.0", "f005ad692b717691203f940c686208aa3d8ffd9dd4bb3699240096a51fa9564e", [:mix], [{:mochiweb, "~> 2.15", [hex: :mochiweb, repo: "hexpm", optional: false]}], "hexpm"},
   "swoosh": {:git, "https://github.com/swoosh/swoosh", "c96e0ca8a00d8f211ec1f042a4626b09f249caa5", [ref: "c96e0ca8a00d8f211ec1f042a4626b09f249caa5"]},
   "syslog": {:hex, :syslog, "1.1.0", "6419a232bea84f07b56dc575225007ffe34d9fdc91abe6f1b2f254fd71d8efc2", [:rebar3], [], "hexpm", "4c6a41373c7e20587be33ef841d3de6f3beba08519809329ecc4d27b15b659e1"},
   "telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"},
-  "tesla": {:git, "https://git.pleroma.social/pleroma/elixir-libraries/tesla.git", "61b7503cef33f00834f78ddfafe0d5d9dec2270b", [ref: "61b7503cef33f00834f78ddfafe0d5d9dec2270b"]},
+  "tesla": {:git, "https://github.com/teamon/tesla.git", "af3707078b10793f6a534938e56b963aff82fe3c", [ref: "af3707078b10793f6a534938e56b963aff82fe3c"]},
   "timex": {:hex, :timex, "3.6.1", "efdf56d0e67a6b956cc57774353b0329c8ab7726766a11547e529357ffdc1d56", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5 or ~> 1.0.0", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm", "f354efb2400dd7a80fd9eb6c8419068c4f632da4ac47f3d8822d6e33f08bc852"},
   "trailing_format_plug": {:hex, :trailing_format_plug, "0.0.7", "64b877f912cf7273bed03379936df39894149e35137ac9509117e59866e10e45", [:mix], [{:plug, "> 0.12.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "bd4fde4c15f3e993a999e019d64347489b91b7a9096af68b2bdadd192afa693f"},
   "tzdata": {:hex, :tzdata, "1.0.3", "73470ad29dde46e350c60a66e6b360d3b99d2d18b74c4c349dbebbc27a09a3eb", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "a6e1ee7003c4d04ecbd21dd3ec690d4c6662db5d3bbdd7262d53cdf5e7c746c1"},
diff --git a/test/gun/conneciton_pool_test.exs b/test/gun/conneciton_pool_test.exs
new file mode 100644 (file)
index 0000000..aea908f
--- /dev/null
@@ -0,0 +1,101 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Gun.ConnectionPoolTest do
+  use Pleroma.DataCase
+
+  import Mox
+  import ExUnit.CaptureLog
+  alias Pleroma.Config
+  alias Pleroma.Gun.ConnectionPool
+
+  defp gun_mock(_) do
+    Pleroma.GunMock
+    |> stub(:open, fn _, _, _ -> Task.start_link(fn -> Process.sleep(100) end) end)
+    |> stub(:await_up, fn _, _ -> {:ok, :http} end)
+    |> stub(:set_owner, fn _, _ -> :ok end)
+
+    :ok
+  end
+
+  setup :set_mox_from_context
+  setup :gun_mock
+
+  test "gives the same connection to 2 concurrent requests" do
+    Enum.map(
+      [
+        "http://www.korean-books.com.kp/KBMbooks/en/periodic/pictorial/20200530163914.pdf",
+        "http://www.korean-books.com.kp/KBMbooks/en/periodic/pictorial/20200528183427.pdf"
+      ],
+      fn uri ->
+        uri = URI.parse(uri)
+        task_parent = self()
+
+        Task.start_link(fn ->
+          {:ok, conn} = ConnectionPool.get_conn(uri, [])
+          ConnectionPool.release_conn(conn)
+          send(task_parent, conn)
+        end)
+      end
+    )
+
+    [pid, pid] =
+      for _ <- 1..2 do
+        receive do
+          pid -> pid
+        end
+      end
+  end
+
+  test "connection limit is respected with concurrent requests" do
+    clear_config([:connections_pool, :max_connections]) do
+      Config.put([:connections_pool, :max_connections], 1)
+      # The supervisor needs a reboot to apply the new config setting
+      Process.exit(Process.whereis(Pleroma.Gun.ConnectionPool.WorkerSupervisor), :kill)
+
+      on_exit(fn ->
+        Process.exit(Process.whereis(Pleroma.Gun.ConnectionPool.WorkerSupervisor), :kill)
+      end)
+    end
+
+    capture_log(fn ->
+      Enum.map(
+        [
+          "https://ninenines.eu/",
+          "https://youtu.be/PFGwMiDJKNY"
+        ],
+        fn uri ->
+          uri = URI.parse(uri)
+          task_parent = self()
+
+          Task.start_link(fn ->
+            result = ConnectionPool.get_conn(uri, [])
+            # Sleep so that we don't end up with a situation,
+            # where request from the second process gets processed
+            # only after the first process already released the connection
+            Process.sleep(50)
+
+            case result do
+              {:ok, pid} ->
+                ConnectionPool.release_conn(pid)
+
+              _ ->
+                nil
+            end
+
+            send(task_parent, result)
+          end)
+        end
+      )
+
+      [{:error, :pool_full}, {:ok, _pid}] =
+        for _ <- 1..2 do
+          receive do
+            result -> result
+          end
+        end
+        |> Enum.sort()
+    end)
+  end
+end
index 2e961826ede9fb72fdbb05be211799b70c77d79b..80589c73d5f452cdccb64d4b164afc23055464f7 100644 (file)
@@ -9,24 +9,10 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do
   import Mox
 
   alias Pleroma.Config
-  alias Pleroma.Gun.Conn
   alias Pleroma.HTTP.AdapterHelper.Gun
-  alias Pleroma.Pool.Connections
 
   setup :verify_on_exit!
 
-  defp gun_mock(_) do
-    gun_mock()
-    :ok
-  end
-
-  defp gun_mock do
-    Pleroma.GunMock
-    |> stub(:open, fn _, _, _ -> Task.start_link(fn -> Process.sleep(1000) end) end)
-    |> stub(:await_up, fn _, _ -> {:ok, :http} end)
-    |> stub(:set_owner, fn _, _ -> :ok end)
-  end
-
   describe "options/1" do
     setup do: clear_config([:http, :adapter], a: 1, b: 2)
 
@@ -35,7 +21,6 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do
 
       opts = Gun.options([receive_conn: false], uri)
       assert opts[:certificates_verification]
-      assert opts[:tls_opts][:log_level] == :warning
     end
 
     test "https ipv4 with default port" do
@@ -43,7 +28,6 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do
 
       opts = Gun.options([receive_conn: false], uri)
       assert opts[:certificates_verification]
-      assert opts[:tls_opts][:log_level] == :warning
     end
 
     test "https ipv6 with default port" do
@@ -51,7 +35,6 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do
 
       opts = Gun.options([receive_conn: false], uri)
       assert opts[:certificates_verification]
-      assert opts[:tls_opts][:log_level] == :warning
     end
 
     test "https url with non standart port" do
@@ -62,46 +45,12 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do
       assert opts[:certificates_verification]
     end
 
-    test "get conn on next request" do
-      gun_mock()
-      level = Application.get_env(:logger, :level)
-      Logger.configure(level: :debug)
-      on_exit(fn -> Logger.configure(level: level) end)
-      uri = URI.parse("http://some-domain2.com")
-
-      opts = Gun.options(uri)
-
-      assert opts[:conn] == nil
-      assert opts[:close_conn] == nil
-
-      Process.sleep(50)
-      opts = Gun.options(uri)
-
-      assert is_pid(opts[:conn])
-      assert opts[:close_conn] == false
-    end
-
     test "merges with defaul http adapter config" do
       defaults = Gun.options([receive_conn: false], URI.parse("https://example.com"))
       assert Keyword.has_key?(defaults, :a)
       assert Keyword.has_key?(defaults, :b)
     end
 
-    test "default ssl adapter opts with connection" do
-      gun_mock()
-      uri = URI.parse("https://some-domain.com")
-
-      :ok = Conn.open(uri, :gun_connections)
-
-      opts = Gun.options(uri)
-
-      assert opts[:certificates_verification]
-      refute opts[:tls_opts] == []
-
-      assert opts[:close_conn] == false
-      assert is_pid(opts[:conn])
-    end
-
     test "parses string proxy host & port" do
       proxy = Config.get([:http, :proxy_url])
       Config.put([:http, :proxy_url], "localhost:8123")
@@ -132,127 +81,4 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do
       assert opts[:proxy] == {'example.com', 4321}
     end
   end
-
-  describe "options/1 with receive_conn parameter" do
-    setup :gun_mock
-
-    test "receive conn by default" do
-      uri = URI.parse("http://another-domain.com")
-      :ok = Conn.open(uri, :gun_connections)
-
-      received_opts = Gun.options(uri)
-      assert received_opts[:close_conn] == false
-      assert is_pid(received_opts[:conn])
-    end
-
-    test "don't receive conn if receive_conn is false" do
-      uri = URI.parse("http://another-domain.com")
-      :ok = Conn.open(uri, :gun_connections)
-
-      opts = [receive_conn: false]
-      received_opts = Gun.options(opts, uri)
-      assert received_opts[:close_conn] == nil
-      assert received_opts[:conn] == nil
-    end
-  end
-
-  describe "after_request/1" do
-    setup :gun_mock
-
-    test "body_as not chunks" do
-      uri = URI.parse("http://some-domain.com")
-      :ok = Conn.open(uri, :gun_connections)
-      opts = Gun.options(uri)
-      :ok = Gun.after_request(opts)
-      conn = opts[:conn]
-
-      assert %Connections{
-               conns: %{
-                 "http:some-domain.com:80" => %Pleroma.Gun.Conn{
-                   conn: ^conn,
-                   conn_state: :idle,
-                   used_by: []
-                 }
-               }
-             } = Connections.get_state(:gun_connections)
-    end
-
-    test "body_as chunks" do
-      uri = URI.parse("http://some-domain.com")
-      :ok = Conn.open(uri, :gun_connections)
-      opts = Gun.options([body_as: :chunks], uri)
-      :ok = Gun.after_request(opts)
-      conn = opts[:conn]
-      self = self()
-
-      assert %Connections{
-               conns: %{
-                 "http:some-domain.com:80" => %Pleroma.Gun.Conn{
-                   conn: ^conn,
-                   conn_state: :active,
-                   used_by: [{^self, _}]
-                 }
-               }
-             } = Connections.get_state(:gun_connections)
-    end
-
-    test "with no connection" do
-      uri = URI.parse("http://uniq-domain.com")
-
-      :ok = Conn.open(uri, :gun_connections)
-
-      opts = Gun.options([body_as: :chunks], uri)
-      conn = opts[:conn]
-      opts = Keyword.delete(opts, :conn)
-      self = self()
-
-      :ok = Gun.after_request(opts)
-
-      assert %Connections{
-               conns: %{
-                 "http:uniq-domain.com:80" => %Pleroma.Gun.Conn{
-                   conn: ^conn,
-                   conn_state: :active,
-                   used_by: [{^self, _}]
-                 }
-               }
-             } = Connections.get_state(:gun_connections)
-    end
-
-    test "with ipv4" do
-      uri = URI.parse("http://127.0.0.1")
-      :ok = Conn.open(uri, :gun_connections)
-      opts = Gun.options(uri)
-      :ok = Gun.after_request(opts)
-      conn = opts[:conn]
-
-      assert %Connections{
-               conns: %{
-                 "http:127.0.0.1:80" => %Pleroma.Gun.Conn{
-                   conn: ^conn,
-                   conn_state: :idle,
-                   used_by: []
-                 }
-               }
-             } = Connections.get_state(:gun_connections)
-    end
-
-    test "with ipv6" do
-      uri = URI.parse("http://[2a03:2880:f10c:83:face:b00c:0:25de]")
-      :ok = Conn.open(uri, :gun_connections)
-      opts = Gun.options(uri)
-      :ok = Gun.after_request(opts)
-      conn = opts[:conn]
-
-      assert %Connections{
-               conns: %{
-                 "http:2a03:2880:f10c:83:face:b00c:0:25de:80" => %Pleroma.Gun.Conn{
-                   conn: ^conn,
-                   conn_state: :idle,
-                   used_by: []
-                 }
-               }
-             } = Connections.get_state(:gun_connections)
-    end
-  end
 end
diff --git a/test/http/connection_test.exs b/test/http/connection_test.exs
deleted file mode 100644 (file)
index 7c94a50..0000000
+++ /dev/null
@@ -1,135 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.HTTP.ConnectionTest do
-  use ExUnit.Case
-  use Pleroma.Tests.Helpers
-
-  import ExUnit.CaptureLog
-
-  alias Pleroma.Config
-  alias Pleroma.HTTP.Connection
-
-  describe "parse_host/1" do
-    test "as atom to charlist" do
-      assert Connection.parse_host(:localhost) == 'localhost'
-    end
-
-    test "as string to charlist" do
-      assert Connection.parse_host("localhost.com") == 'localhost.com'
-    end
-
-    test "as string ip to tuple" do
-      assert Connection.parse_host("127.0.0.1") == {127, 0, 0, 1}
-    end
-  end
-
-  describe "parse_proxy/1" do
-    test "ip with port" do
-      assert Connection.parse_proxy("127.0.0.1:8123") == {:ok, {127, 0, 0, 1}, 8123}
-    end
-
-    test "host with port" do
-      assert Connection.parse_proxy("localhost:8123") == {:ok, 'localhost', 8123}
-    end
-
-    test "as tuple" do
-      assert Connection.parse_proxy({:socks4, :localhost, 9050}) ==
-               {:ok, :socks4, 'localhost', 9050}
-    end
-
-    test "as tuple with string host" do
-      assert Connection.parse_proxy({:socks5, "localhost", 9050}) ==
-               {:ok, :socks5, 'localhost', 9050}
-    end
-  end
-
-  describe "parse_proxy/1 errors" do
-    test "ip without port" do
-      capture_log(fn ->
-        assert Connection.parse_proxy("127.0.0.1") == {:error, :invalid_proxy}
-      end) =~ "parsing proxy fail \"127.0.0.1\""
-    end
-
-    test "host without port" do
-      capture_log(fn ->
-        assert Connection.parse_proxy("localhost") == {:error, :invalid_proxy}
-      end) =~ "parsing proxy fail \"localhost\""
-    end
-
-    test "host with bad port" do
-      capture_log(fn ->
-        assert Connection.parse_proxy("localhost:port") == {:error, :invalid_proxy_port}
-      end) =~ "parsing port in proxy fail \"localhost:port\""
-    end
-
-    test "ip with bad port" do
-      capture_log(fn ->
-        assert Connection.parse_proxy("127.0.0.1:15.9") == {:error, :invalid_proxy_port}
-      end) =~ "parsing port in proxy fail \"127.0.0.1:15.9\""
-    end
-
-    test "as tuple without port" do
-      capture_log(fn ->
-        assert Connection.parse_proxy({:socks5, :localhost}) == {:error, :invalid_proxy}
-      end) =~ "parsing proxy fail {:socks5, :localhost}"
-    end
-
-    test "with nil" do
-      assert Connection.parse_proxy(nil) == nil
-    end
-  end
-
-  describe "options/3" do
-    setup do: clear_config([:http, :proxy_url])
-
-    test "without proxy_url in config" do
-      Config.delete([:http, :proxy_url])
-
-      opts = Connection.options(%URI{})
-      refute Keyword.has_key?(opts, :proxy)
-    end
-
-    test "parses string proxy host & port" do
-      Config.put([:http, :proxy_url], "localhost:8123")
-
-      opts = Connection.options(%URI{})
-      assert opts[:proxy] == {'localhost', 8123}
-    end
-
-    test "parses tuple proxy scheme host and port" do
-      Config.put([:http, :proxy_url], {:socks, 'localhost', 1234})
-
-      opts = Connection.options(%URI{})
-      assert opts[:proxy] == {:socks, 'localhost', 1234}
-    end
-
-    test "passed opts have more weight than defaults" do
-      Config.put([:http, :proxy_url], {:socks5, 'localhost', 1234})
-
-      opts = Connection.options(%URI{}, proxy: {'example.com', 4321})
-
-      assert opts[:proxy] == {'example.com', 4321}
-    end
-  end
-
-  describe "format_host/1" do
-    test "with domain" do
-      assert Connection.format_host("example.com") == 'example.com'
-    end
-
-    test "with idna domain" do
-      assert Connection.format_host("ですexample.com") == 'xn--example-183fne.com'
-    end
-
-    test "with ipv4" do
-      assert Connection.format_host("127.0.0.1") == '127.0.0.1'
-    end
-
-    test "with ipv6" do
-      assert Connection.format_host("2a03:2880:f10c:83:face:b00c:0:25de") ==
-               '2a03:2880:f10c:83:face:b00c:0:25de'
-    end
-  end
-end
diff --git a/test/pool/connections_test.exs b/test/pool/connections_test.exs
deleted file mode 100644 (file)
index aeda548..0000000
+++ /dev/null
@@ -1,760 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Pool.ConnectionsTest do
-  use ExUnit.Case, async: true
-  use Pleroma.Tests.Helpers
-
-  import ExUnit.CaptureLog
-  import Mox
-
-  alias Pleroma.Gun.Conn
-  alias Pleroma.GunMock
-  alias Pleroma.Pool.Connections
-
-  setup :verify_on_exit!
-
-  setup_all do
-    name = :test_connections
-    {:ok, pid} = Connections.start_link({name, [checkin_timeout: 150]})
-    {:ok, _} = Registry.start_link(keys: :unique, name: Pleroma.GunMock)
-
-    on_exit(fn ->
-      if Process.alive?(pid), do: GenServer.stop(name)
-    end)
-
-    {:ok, name: name}
-  end
-
-  defp open_mock(num \\ 1) do
-    GunMock
-    |> expect(:open, num, &start_and_register(&1, &2, &3))
-    |> expect(:await_up, num, fn _, _ -> {:ok, :http} end)
-    |> expect(:set_owner, num, fn _, _ -> :ok end)
-  end
-
-  defp connect_mock(mock) do
-    mock
-    |> expect(:connect, &connect(&1, &2))
-    |> expect(:await, &await(&1, &2))
-  end
-
-  defp info_mock(mock), do: expect(mock, :info, &info(&1))
-
-  defp start_and_register('gun-not-up.com', _, _), do: {:error, :timeout}
-
-  defp start_and_register(host, port, _) do
-    {:ok, pid} = Task.start_link(fn -> Process.sleep(1000) end)
-
-    scheme =
-      case port do
-        443 -> "https"
-        _ -> "http"
-      end
-
-    Registry.register(GunMock, pid, %{
-      origin_scheme: scheme,
-      origin_host: host,
-      origin_port: port
-    })
-
-    {:ok, pid}
-  end
-
-  defp info(pid) do
-    [{_, info}] = Registry.lookup(GunMock, pid)
-    info
-  end
-
-  defp connect(pid, _) do
-    ref = make_ref()
-    Registry.register(GunMock, ref, pid)
-    ref
-  end
-
-  defp await(pid, ref) do
-    [{_, ^pid}] = Registry.lookup(GunMock, ref)
-    {:response, :fin, 200, []}
-  end
-
-  defp now, do: :os.system_time(:second)
-
-  describe "alive?/2" do
-    test "is alive", %{name: name} do
-      assert Connections.alive?(name)
-    end
-
-    test "returns false if not started" do
-      refute Connections.alive?(:some_random_name)
-    end
-  end
-
-  test "opens connection and reuse it on next request", %{name: name} do
-    open_mock()
-    url = "http://some-domain.com"
-    key = "http:some-domain.com:80"
-    refute Connections.checkin(url, name)
-    :ok = Conn.open(url, name)
-
-    conn = Connections.checkin(url, name)
-    assert is_pid(conn)
-    assert Process.alive?(conn)
-
-    self = self()
-
-    %Connections{
-      conns: %{
-        ^key => %Conn{
-          conn: ^conn,
-          gun_state: :up,
-          used_by: [{^self, _}],
-          conn_state: :active
-        }
-      }
-    } = Connections.get_state(name)
-
-    reused_conn = Connections.checkin(url, name)
-
-    assert conn == reused_conn
-
-    %Connections{
-      conns: %{
-        ^key => %Conn{
-          conn: ^conn,
-          gun_state: :up,
-          used_by: [{^self, _}, {^self, _}],
-          conn_state: :active
-        }
-      }
-    } = Connections.get_state(name)
-
-    :ok = Connections.checkout(conn, self, name)
-
-    %Connections{
-      conns: %{
-        ^key => %Conn{
-          conn: ^conn,
-          gun_state: :up,
-          used_by: [{^self, _}],
-          conn_state: :active
-        }
-      }
-    } = Connections.get_state(name)
-
-    :ok = Connections.checkout(conn, self, name)
-
-    %Connections{
-      conns: %{
-        ^key => %Conn{
-          conn: ^conn,
-          gun_state: :up,
-          used_by: [],
-          conn_state: :idle
-        }
-      }
-    } = Connections.get_state(name)
-  end
-
-  test "reuse connection for idna domains", %{name: name} do
-    open_mock()
-    url = "http://ですsome-domain.com"
-    refute Connections.checkin(url, name)
-
-    :ok = Conn.open(url, name)
-
-    conn = Connections.checkin(url, name)
-    assert is_pid(conn)
-    assert Process.alive?(conn)
-
-    self = self()
-
-    %Connections{
-      conns: %{
-        "http:ですsome-domain.com:80" => %Conn{
-          conn: ^conn,
-          gun_state: :up,
-          used_by: [{^self, _}],
-          conn_state: :active
-        }
-      }
-    } = Connections.get_state(name)
-
-    reused_conn = Connections.checkin(url, name)
-
-    assert conn == reused_conn
-  end
-
-  test "reuse for ipv4", %{name: name} do
-    open_mock()
-    url = "http://127.0.0.1"
-
-    refute Connections.checkin(url, name)
-
-    :ok = Conn.open(url, name)
-
-    conn = Connections.checkin(url, name)
-    assert is_pid(conn)
-    assert Process.alive?(conn)
-
-    self = self()
-
-    %Connections{
-      conns: %{
-        "http:127.0.0.1:80" => %Conn{
-          conn: ^conn,
-          gun_state: :up,
-          used_by: [{^self, _}],
-          conn_state: :active
-        }
-      }
-    } = Connections.get_state(name)
-
-    reused_conn = Connections.checkin(url, name)
-
-    assert conn == reused_conn
-
-    :ok = Connections.checkout(conn, self, name)
-    :ok = Connections.checkout(reused_conn, self, name)
-
-    %Connections{
-      conns: %{
-        "http:127.0.0.1:80" => %Conn{
-          conn: ^conn,
-          gun_state: :up,
-          used_by: [],
-          conn_state: :idle
-        }
-      }
-    } = Connections.get_state(name)
-  end
-
-  test "reuse for ipv6", %{name: name} do
-    open_mock()
-    url = "http://[2a03:2880:f10c:83:face:b00c:0:25de]"
-
-    refute Connections.checkin(url, name)
-
-    :ok = Conn.open(url, name)
-
-    conn = Connections.checkin(url, name)
-    assert is_pid(conn)
-    assert Process.alive?(conn)
-
-    self = self()
-
-    %Connections{
-      conns: %{
-        "http:2a03:2880:f10c:83:face:b00c:0:25de:80" => %Conn{
-          conn: ^conn,
-          gun_state: :up,
-          used_by: [{^self, _}],
-          conn_state: :active
-        }
-      }
-    } = Connections.get_state(name)
-
-    reused_conn = Connections.checkin(url, name)
-
-    assert conn == reused_conn
-  end
-
-  test "up and down ipv4", %{name: name} do
-    open_mock()
-    |> info_mock()
-    |> allow(self(), name)
-
-    self = self()
-    url = "http://127.0.0.1"
-    :ok = Conn.open(url, name)
-    conn = Connections.checkin(url, name)
-    send(name, {:gun_down, conn, nil, nil, nil})
-    send(name, {:gun_up, conn, nil})
-
-    %Connections{
-      conns: %{
-        "http:127.0.0.1:80" => %Conn{
-          conn: ^conn,
-          gun_state: :up,
-          used_by: [{^self, _}],
-          conn_state: :active
-        }
-      }
-    } = Connections.get_state(name)
-  end
-
-  test "up and down ipv6", %{name: name} do
-    self = self()
-
-    open_mock()
-    |> info_mock()
-    |> allow(self, name)
-
-    url = "http://[2a03:2880:f10c:83:face:b00c:0:25de]"
-    :ok = Conn.open(url, name)
-    conn = Connections.checkin(url, name)
-    send(name, {:gun_down, conn, nil, nil, nil})
-    send(name, {:gun_up, conn, nil})
-
-    %Connections{
-      conns: %{
-        "http:2a03:2880:f10c:83:face:b00c:0:25de:80" => %Conn{
-          conn: ^conn,
-          gun_state: :up,
-          used_by: [{^self, _}],
-          conn_state: :active
-        }
-      }
-    } = Connections.get_state(name)
-  end
-
-  test "reuses connection based on protocol", %{name: name} do
-    open_mock(2)
-    http_url = "http://some-domain.com"
-    http_key = "http:some-domain.com:80"
-    https_url = "https://some-domain.com"
-    https_key = "https:some-domain.com:443"
-
-    refute Connections.checkin(http_url, name)
-    :ok = Conn.open(http_url, name)
-    conn = Connections.checkin(http_url, name)
-    assert is_pid(conn)
-    assert Process.alive?(conn)
-
-    refute Connections.checkin(https_url, name)
-    :ok = Conn.open(https_url, name)
-    https_conn = Connections.checkin(https_url, name)
-
-    refute conn == https_conn
-
-    reused_https = Connections.checkin(https_url, name)
-
-    refute conn == reused_https
-
-    assert reused_https == https_conn
-
-    %Connections{
-      conns: %{
-        ^http_key => %Conn{
-          conn: ^conn,
-          gun_state: :up
-        },
-        ^https_key => %Conn{
-          conn: ^https_conn,
-          gun_state: :up
-        }
-      }
-    } = Connections.get_state(name)
-  end
-
-  test "connection can't get up", %{name: name} do
-    expect(GunMock, :open, &start_and_register(&1, &2, &3))
-    url = "http://gun-not-up.com"
-
-    assert capture_log(fn ->
-             refute Conn.open(url, name)
-             refute Connections.checkin(url, name)
-           end) =~
-             "Opening connection to http://gun-not-up.com failed with error {:error, :timeout}"
-  end
-
-  test "process gun_down message and then gun_up", %{name: name} do
-    self = self()
-
-    open_mock()
-    |> info_mock()
-    |> allow(self, name)
-
-    url = "http://gun-down-and-up.com"
-    key = "http:gun-down-and-up.com:80"
-    :ok = Conn.open(url, name)
-    conn = Connections.checkin(url, name)
-
-    assert is_pid(conn)
-    assert Process.alive?(conn)
-
-    %Connections{
-      conns: %{
-        ^key => %Conn{
-          conn: ^conn,
-          gun_state: :up,
-          used_by: [{^self, _}]
-        }
-      }
-    } = Connections.get_state(name)
-
-    send(name, {:gun_down, conn, :http, nil, nil})
-
-    %Connections{
-      conns: %{
-        ^key => %Conn{
-          conn: ^conn,
-          gun_state: :down,
-          used_by: [{^self, _}]
-        }
-      }
-    } = Connections.get_state(name)
-
-    send(name, {:gun_up, conn, :http})
-
-    conn2 = Connections.checkin(url, name)
-    assert conn == conn2
-
-    assert is_pid(conn2)
-    assert Process.alive?(conn2)
-
-    %Connections{
-      conns: %{
-        ^key => %Conn{
-          conn: _,
-          gun_state: :up,
-          used_by: [{^self, _}, {^self, _}]
-        }
-      }
-    } = Connections.get_state(name)
-  end
-
-  test "async processes get same conn for same domain", %{name: name} do
-    open_mock()
-    url = "http://some-domain.com"
-    :ok = Conn.open(url, name)
-
-    tasks =
-      for _ <- 1..5 do
-        Task.async(fn ->
-          Connections.checkin(url, name)
-        end)
-      end
-
-    tasks_with_results = Task.yield_many(tasks)
-
-    results =
-      Enum.map(tasks_with_results, fn {task, res} ->
-        res || Task.shutdown(task, :brutal_kill)
-      end)
-
-    conns = for {:ok, value} <- results, do: value
-
-    %Connections{
-      conns: %{
-        "http:some-domain.com:80" => %Conn{
-          conn: conn,
-          gun_state: :up
-        }
-      }
-    } = Connections.get_state(name)
-
-    assert Enum.all?(conns, fn res -> res == conn end)
-  end
-
-  test "remove frequently used and idle", %{name: name} do
-    open_mock(3)
-    self = self()
-    http_url = "http://some-domain.com"
-    https_url = "https://some-domain.com"
-    :ok = Conn.open(https_url, name)
-    :ok = Conn.open(http_url, name)
-
-    conn1 = Connections.checkin(https_url, name)
-
-    [conn2 | _conns] =
-      for _ <- 1..4 do
-        Connections.checkin(http_url, name)
-      end
-
-    http_key = "http:some-domain.com:80"
-
-    %Connections{
-      conns: %{
-        ^http_key => %Conn{
-          conn: ^conn2,
-          gun_state: :up,
-          conn_state: :active,
-          used_by: [{^self, _}, {^self, _}, {^self, _}, {^self, _}]
-        },
-        "https:some-domain.com:443" => %Conn{
-          conn: ^conn1,
-          gun_state: :up,
-          conn_state: :active,
-          used_by: [{^self, _}]
-        }
-      }
-    } = Connections.get_state(name)
-
-    :ok = Connections.checkout(conn1, self, name)
-
-    another_url = "http://another-domain.com"
-    :ok = Conn.open(another_url, name)
-    conn = Connections.checkin(another_url, name)
-
-    %Connections{
-      conns: %{
-        "http:another-domain.com:80" => %Conn{
-          conn: ^conn,
-          gun_state: :up
-        },
-        ^http_key => %Conn{
-          conn: _,
-          gun_state: :up
-        }
-      }
-    } = Connections.get_state(name)
-  end
-
-  describe "with proxy" do
-    test "as ip", %{name: name} do
-      open_mock()
-      |> connect_mock()
-
-      url = "http://proxy-string.com"
-      key = "http:proxy-string.com:80"
-      :ok = Conn.open(url, name, proxy: {{127, 0, 0, 1}, 8123})
-
-      conn = Connections.checkin(url, name)
-
-      %Connections{
-        conns: %{
-          ^key => %Conn{
-            conn: ^conn,
-            gun_state: :up
-          }
-        }
-      } = Connections.get_state(name)
-
-      reused_conn = Connections.checkin(url, name)
-
-      assert reused_conn == conn
-    end
-
-    test "as host", %{name: name} do
-      open_mock()
-      |> connect_mock()
-
-      url = "http://proxy-tuple-atom.com"
-      :ok = Conn.open(url, name, proxy: {'localhost', 9050})
-      conn = Connections.checkin(url, name)
-
-      %Connections{
-        conns: %{
-          "http:proxy-tuple-atom.com:80" => %Conn{
-            conn: ^conn,
-            gun_state: :up
-          }
-        }
-      } = Connections.get_state(name)
-
-      reused_conn = Connections.checkin(url, name)
-
-      assert reused_conn == conn
-    end
-
-    test "as ip and ssl", %{name: name} do
-      open_mock()
-      |> connect_mock()
-
-      url = "https://proxy-string.com"
-
-      :ok = Conn.open(url, name, proxy: {{127, 0, 0, 1}, 8123})
-      conn = Connections.checkin(url, name)
-
-      %Connections{
-        conns: %{
-          "https:proxy-string.com:443" => %Conn{
-            conn: ^conn,
-            gun_state: :up
-          }
-        }
-      } = Connections.get_state(name)
-
-      reused_conn = Connections.checkin(url, name)
-
-      assert reused_conn == conn
-    end
-
-    test "as host and ssl", %{name: name} do
-      open_mock()
-      |> connect_mock()
-
-      url = "https://proxy-tuple-atom.com"
-      :ok = Conn.open(url, name, proxy: {'localhost', 9050})
-      conn = Connections.checkin(url, name)
-
-      %Connections{
-        conns: %{
-          "https:proxy-tuple-atom.com:443" => %Conn{
-            conn: ^conn,
-            gun_state: :up
-          }
-        }
-      } = Connections.get_state(name)
-
-      reused_conn = Connections.checkin(url, name)
-
-      assert reused_conn == conn
-    end
-
-    test "with socks type", %{name: name} do
-      open_mock()
-
-      url = "http://proxy-socks.com"
-
-      :ok = Conn.open(url, name, proxy: {:socks5, 'localhost', 1234})
-
-      conn = Connections.checkin(url, name)
-
-      %Connections{
-        conns: %{
-          "http:proxy-socks.com:80" => %Conn{
-            conn: ^conn,
-            gun_state: :up
-          }
-        }
-      } = Connections.get_state(name)
-
-      reused_conn = Connections.checkin(url, name)
-
-      assert reused_conn == conn
-    end
-
-    test "with socks4 type and ssl", %{name: name} do
-      open_mock()
-      url = "https://proxy-socks.com"
-
-      :ok = Conn.open(url, name, proxy: {:socks4, 'localhost', 1234})
-
-      conn = Connections.checkin(url, name)
-
-      %Connections{
-        conns: %{
-          "https:proxy-socks.com:443" => %Conn{
-            conn: ^conn,
-            gun_state: :up
-          }
-        }
-      } = Connections.get_state(name)
-
-      reused_conn = Connections.checkin(url, name)
-
-      assert reused_conn == conn
-    end
-  end
-
-  describe "crf/3" do
-    setup do
-      crf = Connections.crf(1, 10, 1)
-      {:ok, crf: crf}
-    end
-
-    test "more used will have crf higher", %{crf: crf} do
-      # used 3 times
-      crf1 = Connections.crf(1, 10, crf)
-      crf1 = Connections.crf(1, 10, crf1)
-
-      # used 2 times
-      crf2 = Connections.crf(1, 10, crf)
-
-      assert crf1 > crf2
-    end
-
-    test "recently used will have crf higher on equal references", %{crf: crf} do
-      # used 3 sec ago
-      crf1 = Connections.crf(3, 10, crf)
-
-      # used 4 sec ago
-      crf2 = Connections.crf(4, 10, crf)
-
-      assert crf1 > crf2
-    end
-
-    test "equal crf on equal reference and time", %{crf: crf} do
-      # used 2 times
-      crf1 = Connections.crf(1, 10, crf)
-
-      # used 2 times
-      crf2 = Connections.crf(1, 10, crf)
-
-      assert crf1 == crf2
-    end
-
-    test "recently used will have higher crf", %{crf: crf} do
-      crf1 = Connections.crf(2, 10, crf)
-      crf1 = Connections.crf(1, 10, crf1)
-
-      crf2 = Connections.crf(3, 10, crf)
-      crf2 = Connections.crf(4, 10, crf2)
-      assert crf1 > crf2
-    end
-  end
-
-  describe "get_unused_conns/1" do
-    test "crf is equalent, sorting by reference", %{name: name} do
-      Connections.add_conn(name, "1", %Conn{
-        conn_state: :idle,
-        last_reference: now() - 1
-      })
-
-      Connections.add_conn(name, "2", %Conn{
-        conn_state: :idle,
-        last_reference: now()
-      })
-
-      assert [{"1", _unused_conn} | _others] = Connections.get_unused_conns(name)
-    end
-
-    test "reference is equalent, sorting by crf", %{name: name} do
-      Connections.add_conn(name, "1", %Conn{
-        conn_state: :idle,
-        crf: 1.999
-      })
-
-      Connections.add_conn(name, "2", %Conn{
-        conn_state: :idle,
-        crf: 2
-      })
-
-      assert [{"1", _unused_conn} | _others] = Connections.get_unused_conns(name)
-    end
-
-    test "higher crf and lower reference", %{name: name} do
-      Connections.add_conn(name, "1", %Conn{
-        conn_state: :idle,
-        crf: 3,
-        last_reference: now() - 1
-      })
-
-      Connections.add_conn(name, "2", %Conn{
-        conn_state: :idle,
-        crf: 2,
-        last_reference: now()
-      })
-
-      assert [{"2", _unused_conn} | _others] = Connections.get_unused_conns(name)
-    end
-
-    test "lower crf and lower reference", %{name: name} do
-      Connections.add_conn(name, "1", %Conn{
-        conn_state: :idle,
-        crf: 1.99,
-        last_reference: now() - 1
-      })
-
-      Connections.add_conn(name, "2", %Conn{
-        conn_state: :idle,
-        crf: 2,
-        last_reference: now()
-      })
-
-      assert [{"1", _unused_conn} | _others] = Connections.get_unused_conns(name)
-    end
-  end
-
-  test "count/1" do
-    name = :test_count
-    {:ok, _} = Connections.start_link({name, [checkin_timeout: 150]})
-    assert Connections.count(name) == 0
-    Connections.add_conn(name, "1", %Conn{conn: self()})
-    assert Connections.count(name) == 1
-    Connections.remove_conn(name, "1")
-    assert Connections.count(name) == 0
-  end
-end