HTTP: radically simplify pool checkin/checkout
authorrinpatch <rinpatch@sdf.org>
Thu, 3 Sep 2020 20:15:22 +0000 (23:15 +0300)
committerrinpatch <rinpatch@sdf.org>
Thu, 3 Sep 2020 20:44:13 +0000 (23:44 +0300)
Use a custom tesla middleware instead of adapter helper function +
custom redirect middleware.

This will also fix "Client died before releasing the connection"
messages when the request pool is overloaded. Since the checkout is
now done after passing ConcurrentLimiter.

This is technically less efficient, since the connection needs to be
checked in/out every time the middleware is left or entered respectively.
But I don't think the nanoseconds we might lose on redirects
to the same host are worth the complexity.

lib/pleroma/http/adapter_helper.ex
lib/pleroma/http/adapter_helper/gun.ex
lib/pleroma/http/adapter_helper/hackney.ex
lib/pleroma/http/http.ex
lib/pleroma/tesla/middleware/connection_pool.ex [new file with mode: 0644]
lib/pleroma/tesla/middleware/follow_redirects.ex [deleted file]

index 0728cbaa2c69a42182d5a048ac0028caaf285575..d722973236603ce8880b788c2c661d06568f598b 100644 (file)
@@ -19,7 +19,6 @@ defmodule Pleroma.HTTP.AdapterHelper do
           | {Connection.proxy_type(), Connection.host(), pos_integer()}
 
   @callback options(keyword(), URI.t()) :: keyword()
-  @callback get_conn(URI.t(), keyword()) :: {:ok, term()} | {:error, term()}
 
   @spec format_proxy(String.t() | tuple() | nil) :: proxy() | nil
   def format_proxy(nil), do: nil
@@ -47,9 +46,6 @@ defmodule Pleroma.HTTP.AdapterHelper do
     |> adapter_helper().options(uri)
   end
 
-  @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()}
-  def get_conn(uri, opts), do: adapter_helper().get_conn(uri, opts)
-
   defp adapter, do: Application.get_env(:tesla, :adapter)
 
   defp adapter_helper do
index 02e20f2d1aa408860ffd8239a70516957464d160..4a967d8f2150d4a3330eeeed5e34c7823a1e0814 100644 (file)
@@ -6,7 +6,6 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do
   @behaviour Pleroma.HTTP.AdapterHelper
 
   alias Pleroma.Config
-  alias Pleroma.Gun.ConnectionPool
   alias Pleroma.HTTP.AdapterHelper
 
   require Logger
@@ -57,14 +56,6 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do
     Config.get([:pools, pool, :timeout], default)
   end
 
-  @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()}
-  def get_conn(uri, opts) do
-    case ConnectionPool.get_conn(uri, opts) do
-      {:ok, conn_pid} -> {:ok, Keyword.merge(opts, conn: conn_pid, close_conn: false)}
-      err -> err
-    end
-  end
-
   @prefix Pleroma.Gun.ConnectionPool
   def limiter_setup do
     wait = Config.get([:connections_pool, :connection_acquisition_wait])
index cd569422b6b481e7d38320ca06e6705992656ee3..f47a671ad01b1188f35ae87e59941e6527624d1c 100644 (file)
@@ -23,7 +23,4 @@ defmodule Pleroma.HTTP.AdapterHelper.Hackney do
   end
 
   defp add_scheme_opts(opts, _), do: opts
-
-  @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()}
-  def get_conn(_uri, opts), do: {:ok, opts}
 end
index b37b3fa8927c252a9b357e51d5635ca20f5d2376..7bc73f4a0ec8c4f0d9744a88582e8e97f67b8685 100644 (file)
@@ -62,28 +62,21 @@ defmodule Pleroma.HTTP do
     uri = URI.parse(url)
     adapter_opts = AdapterHelper.options(uri, options[:adapter] || [])
 
-    case AdapterHelper.get_conn(uri, adapter_opts) do
-      {:ok, adapter_opts} ->
-        options = put_in(options[:adapter], adapter_opts)
-        params = options[:params] || []
-        request = build_request(method, headers, options, url, body, params)
-
-        adapter = Application.get_env(:tesla, :adapter)
-
-        client = Tesla.client(adapter_middlewares(adapter), adapter)
-
-        maybe_limit(
-          fn ->
-            request(client, request)
-          end,
-          adapter,
-          adapter_opts
-        )
-
-      # Connection release is handled in a custom FollowRedirects middleware
-      err ->
-        err
-    end
+    options = put_in(options[:adapter], adapter_opts)
+    params = options[:params] || []
+    request = build_request(method, headers, options, url, body, params)
+
+    adapter = Application.get_env(:tesla, :adapter)
+
+    client = Tesla.client(adapter_middlewares(adapter), adapter)
+
+    maybe_limit(
+      fn ->
+        request(client, request)
+      end,
+      adapter,
+      adapter_opts
+    )
   end
 
   @spec request(Client.t(), keyword()) :: {:ok, Env.t()} | {:error, any()}
@@ -110,7 +103,7 @@ defmodule Pleroma.HTTP do
   end
 
   defp adapter_middlewares(Tesla.Adapter.Gun) do
-    [Pleroma.HTTP.Middleware.FollowRedirects]
+    [Tesla.Middleware.FollowRedirects, Pleroma.Tesla.Middleware.ConnectionPool]
   end
 
   defp adapter_middlewares(_), do: []
diff --git a/lib/pleroma/tesla/middleware/connection_pool.ex b/lib/pleroma/tesla/middleware/connection_pool.ex
new file mode 100644 (file)
index 0000000..a435ab4
--- /dev/null
@@ -0,0 +1,35 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Tesla.Middleware.ConnectionPool do
+  @moduledoc """
+  Middleware to get/release connections from `Pleroma.Gun.ConnectionPool`
+  """
+
+  @behaviour Tesla.Middleware
+
+  alias Pleroma.Gun.ConnectionPool
+
+  @impl Tesla.Middleware
+  def call(%Tesla.Env{url: url, opts: opts} = env, next, _) do
+    uri = URI.parse(url)
+
+    case ConnectionPool.get_conn(uri, opts[:adapter]) do
+      {:ok, conn_pid} ->
+        adapter_opts = Keyword.merge(opts[:adapter], conn: conn_pid, close_conn: false)
+        opts = Keyword.put(opts, :adapter, adapter_opts)
+        env = %{env | opts: opts}
+        res = Tesla.run(env, next)
+
+        unless opts[:adapter][:body_as] == :chunks do
+          ConnectionPool.release_conn(conn_pid)
+        end
+
+        res
+
+      err ->
+        err
+    end
+  end
+end
diff --git a/lib/pleroma/tesla/middleware/follow_redirects.ex b/lib/pleroma/tesla/middleware/follow_redirects.ex
deleted file mode 100644 (file)
index 5a70322..0000000
+++ /dev/null
@@ -1,110 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2015-2020 Tymon Tobolski <https://github.com/teamon/tesla/blob/master/lib/tesla/middleware/follow_redirects.ex>
-# Copyright © 2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.HTTP.Middleware.FollowRedirects do
-  @moduledoc """
-  Pool-aware version of https://github.com/teamon/tesla/blob/master/lib/tesla/middleware/follow_redirects.ex
-
-  Follow 3xx redirects
-  ## Options
-  - `:max_redirects` - limit number of redirects (default: `5`)
-  """
-
-  alias Pleroma.Gun.ConnectionPool
-
-  @behaviour Tesla.Middleware
-
-  @max_redirects 5
-  @redirect_statuses [301, 302, 303, 307, 308]
-
-  @impl Tesla.Middleware
-  def call(env, next, opts \\ []) do
-    max = Keyword.get(opts, :max_redirects, @max_redirects)
-
-    redirect(env, next, max)
-  end
-
-  defp redirect(env, next, left) do
-    opts = env.opts[:adapter]
-
-    case Tesla.run(env, next) do
-      {:ok, %{status: status} = res} when status in @redirect_statuses and left > 0 ->
-        release_conn(opts)
-
-        case Tesla.get_header(res, "location") do
-          nil ->
-            {:ok, res}
-
-          location ->
-            location = parse_location(location, res)
-
-            case get_conn(location, opts) do
-              {:ok, opts} ->
-                %{env | opts: Keyword.put(env.opts, :adapter, opts)}
-                |> new_request(res.status, location)
-                |> redirect(next, left - 1)
-
-              e ->
-                e
-            end
-        end
-
-      {:ok, %{status: status}} when status in @redirect_statuses ->
-        release_conn(opts)
-        {:error, {__MODULE__, :too_many_redirects}}
-
-      {:error, _} = e ->
-        release_conn(opts)
-        e
-
-      other ->
-        unless opts[:body_as] == :chunks do
-          release_conn(opts)
-        end
-
-        other
-    end
-  end
-
-  defp get_conn(location, opts) do
-    uri = URI.parse(location)
-
-    case ConnectionPool.get_conn(uri, opts) do
-      {:ok, conn} ->
-        {:ok, Keyword.merge(opts, conn: conn)}
-
-      e ->
-        e
-    end
-  end
-
-  defp release_conn(opts) do
-    ConnectionPool.release_conn(opts[:conn])
-  end
-
-  # The 303 (See Other) redirect was added in HTTP/1.1 to indicate that the originally
-  # requested resource is not available, however a related resource (or another redirect)
-  # available via GET is available at the specified location.
-  # https://tools.ietf.org/html/rfc7231#section-6.4.4
-  defp new_request(env, 303, location), do: %{env | url: location, method: :get, query: []}
-
-  # The 307 (Temporary Redirect) status code indicates that the target
-  # resource resides temporarily under a different URI and the user agent
-  # MUST NOT change the request method (...)
-  # https://tools.ietf.org/html/rfc7231#section-6.4.7
-  defp new_request(env, 307, location), do: %{env | url: location}
-
-  defp new_request(env, _, location), do: %{env | url: location, query: []}
-
-  defp parse_location("https://" <> _rest = location, _env), do: location
-  defp parse_location("http://" <> _rest = location, _env), do: location
-
-  defp parse_location(location, env) do
-    env.url
-    |> URI.parse()
-    |> URI.merge(location)
-    |> URI.to_string()
-  end
-end