X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fhttp%2Fadapter_helper%2Fgun.ex;h=82c7fd65482f4e62a521809287cd28e8ef93ac2e;hb=e8d88a72cf6633444f9056807f6e048c66cf952d;hp=862e851c028cc50fba466e636ad608d20c44594c;hpb=d9f8941dac983d89709645831b41e02adc454740;p=akkoma
diff --git a/lib/pleroma/http/adapter_helper/gun.ex b/lib/pleroma/http/adapter_helper/gun.ex
index 862e851c0..82c7fd654 100644
--- a/lib/pleroma/http/adapter_helper/gun.ex
+++ b/lib/pleroma/http/adapter_helper/gun.ex
@@ -1,109 +1,82 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors
+# Copyright © 2017-2021 Pleroma Authors
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.HTTP.AdapterHelper.Gun do
@behaviour Pleroma.HTTP.AdapterHelper
+ alias Pleroma.Config
alias Pleroma.HTTP.AdapterHelper
- alias Pleroma.Pool.Connections
require Logger
@defaults [
- connect_timeout: 5_000,
- domain_lookup_timeout: 5_000,
- tls_handshake_timeout: 5_000,
retry: 1,
- retry_timeout: 1000,
- await_up_timeout: 5_000
+ retry_timeout: 1_000
]
+ @type pool() :: :federation | :upload | :media | :default
+
@spec options(keyword(), URI.t()) :: keyword()
- def options(connection_opts \\ [], %URI{} = uri) do
- formatted_proxy =
- Pleroma.Config.get([:http, :proxy_url], nil)
+ def options(incoming_opts \\ [], %URI{} = uri) do
+ proxy =
+ [:http, :proxy_url]
+ |> Config.get()
|> AdapterHelper.format_proxy()
- config_opts = Pleroma.Config.get([:http, :adapter], [])
+ config_opts = Config.get([:http, :adapter], [])
@defaults
|> Keyword.merge(config_opts)
|> add_scheme_opts(uri)
- |> AdapterHelper.maybe_add_proxy(formatted_proxy)
- |> maybe_get_conn(uri, connection_opts)
+ |> AdapterHelper.maybe_add_proxy(proxy)
+ |> Keyword.merge(incoming_opts)
+ |> put_timeout()
end
- @spec after_request(keyword()) :: :ok
- def after_request(opts) do
- if opts[:conn] && opts[:body_as] != :chunks do
- Connections.checkout(opts[:conn], self(), :gun_connections)
- end
+ defp add_scheme_opts(opts, %{scheme: "http"}), do: opts
- :ok
+ defp add_scheme_opts(opts, %{scheme: "https"}) do
+ Keyword.put(opts, :certificates_verification, true)
end
- defp add_scheme_opts(opts, %URI{scheme: "http"}), do: opts
-
- defp add_scheme_opts(opts, %URI{scheme: "https", host: host}) do
- adapter_opts = [
- certificates_verification: true,
- transport: :tls,
- tls_opts: [
- verify: :verify_peer,
- cacertfile: CAStore.file_path(),
- depth: 20,
- reuse_sessions: false,
- verify_fun: {&:ssl_verify_hostname.verify_fun/3, [check_hostname: format_host(host)]},
- log_level: :warning
- ]
- ]
-
- Keyword.merge(opts, adapter_opts)
+ defp put_timeout(opts) do
+ {recv_timeout, opts} = Keyword.pop(opts, :recv_timeout, pool_timeout(opts[:pool]))
+ # this is the timeout to receive a message from Gun
+ # `:timeout` key is used in Tesla
+ Keyword.put(opts, :timeout, recv_timeout)
end
- defp maybe_get_conn(adapter_opts, uri, connection_opts) do
- {receive_conn?, opts} =
- adapter_opts
- |> Keyword.merge(connection_opts)
- |> Keyword.pop(:receive_conn, true)
-
- if Connections.alive?(:gun_connections) and receive_conn? do
- try_to_get_conn(uri, opts)
- else
- opts
- end
- end
-
- defp try_to_get_conn(uri, opts) do
- case Connections.checkin(uri, :gun_connections) do
- nil ->
- Logger.debug(
- "Gun connections pool checkin was not successful. Trying to open conn for next request."
- )
+ @spec pool_timeout(pool()) :: non_neg_integer()
+ def pool_timeout(pool) do
+ default = Config.get([:pools, :default, :recv_timeout], 5_000)
- Task.start(fn -> Pleroma.Gun.Conn.open(uri, :gun_connections, opts) end)
- opts
-
- conn when is_pid(conn) ->
- Logger.debug("received conn #{inspect(conn)} #{Connections.compose_uri_log(uri)}")
-
- opts
- |> Keyword.put(:conn, conn)
- |> Keyword.put(:close_conn, false)
- end
+ Config.get([:pools, pool, :recv_timeout], default)
end
- @spec format_host(String.t()) :: charlist()
- def format_host(host) do
- host_charlist = to_charlist(host)
+ @prefix Pleroma.Gun.ConnectionPool
+ def limiter_setup do
+ wait = Config.get([:connections_pool, :connection_acquisition_wait])
+ retries = Config.get([:connections_pool, :connection_acquisition_retries])
+
+ :pools
+ |> Config.get([])
+ |> Enum.each(fn {name, opts} ->
+ max_running = Keyword.get(opts, :size, 50)
+ max_waiting = Keyword.get(opts, :max_waiting, 10)
+
+ result =
+ ConcurrentLimiter.new(:"#{@prefix}.#{name}", max_running, max_waiting,
+ wait: wait,
+ max_retries: retries
+ )
- case :inet.parse_address(host_charlist) do
- {:error, :einval} ->
- :idna.encode(host_charlist)
+ case result do
+ :ok -> :ok
+ {:error, :existing} -> :ok
+ end
+ end)
- {:ok, _ip} ->
- host_charlist
- end
+ :ok
end
end