# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.HTTP.AdapterHelper.Gun do
@behaviour Pleroma.HTTP.AdapterHelper
+ alias Pleroma.Config
alias Pleroma.HTTP.AdapterHelper
require Logger
- alias Pleroma.Pool.Connections
-
@defaults [
- connect_timeout: 5_000,
- domain_lookup_timeout: 5_000,
- tls_handshake_timeout: 5_000,
retry: 1,
- retry_timeout: 1000,
- await_up_timeout: 5_000
+ retry_timeout: 1_000
]
+ @type pool() :: :federation | :upload | :media | :default
+
@spec options(keyword(), URI.t()) :: keyword()
- def options(connection_opts \\ [], %URI{} = uri) do
- proxy = Pleroma.Config.get([:http, :proxy_url], nil)
+ def options(incoming_opts \\ [], %URI{} = uri) do
+ proxy =
+ [:http, :proxy_url]
+ |> Config.get()
+ |> AdapterHelper.format_proxy()
+
+ config_opts = Config.get([:http, :adapter], [])
@defaults
- |> Keyword.merge(Pleroma.Config.get([:http, :adapter], []))
- |> add_original(uri)
+ |> Keyword.merge(config_opts)
|> add_scheme_opts(uri)
- |> AdapterHelper.maybe_add_proxy(AdapterHelper.format_proxy(proxy))
- |> maybe_get_conn(uri, connection_opts)
+ |> AdapterHelper.maybe_add_proxy(proxy)
+ |> Keyword.merge(incoming_opts)
+ |> put_timeout()
end
- @spec after_request(keyword()) :: :ok
- def after_request(opts) do
- with conn when not is_nil(conn) <- opts[:conn],
- body_as when body_as != :chunks <- opts[:body_as] do
- Connections.checkout(conn, self(), :gun_connections)
- end
+ defp add_scheme_opts(opts, %{scheme: "http"}), do: opts
- :ok
+ defp add_scheme_opts(opts, %{scheme: "https"}) do
+ Keyword.put(opts, :certificates_verification, true)
end
- defp add_original(opts, %URI{host: host, port: port}) do
- formatted_host = format_host(host)
-
- Keyword.put(opts, :original, "#{formatted_host}:#{port}")
+ defp put_timeout(opts) do
+ {recv_timeout, opts} = Keyword.pop(opts, :recv_timeout, pool_timeout(opts[:pool]))
+ # this is the timeout to receive a message from Gun
+ # `:timeout` key is used in Tesla
+ Keyword.put(opts, :timeout, recv_timeout)
end
- defp add_scheme_opts(opts, %URI{scheme: "http"}), do: opts
-
- defp add_scheme_opts(opts, %URI{scheme: "https", host: host, port: port}) do
- adapter_opts = [
- certificates_verification: true,
- tls_opts: [
- verify: :verify_peer,
- cacertfile: CAStore.file_path(),
- depth: 20,
- reuse_sessions: false,
- verify_fun: {&:ssl_verify_hostname.verify_fun/3, [check_hostname: format_host(host)]},
- log_level: :warning
- ]
- ]
-
- adapter_opts =
- if port != 443 do
- Keyword.put(adapter_opts, :transport, :tls)
- else
- adapter_opts
- end
-
- Keyword.merge(opts, adapter_opts)
- end
+ @spec pool_timeout(pool()) :: non_neg_integer()
+ def pool_timeout(pool) do
+ default = Config.get([:pools, :default, :recv_timeout], 5_000)
- defp maybe_get_conn(adapter_opts, uri, connection_opts) do
- {receive_conn?, opts} =
- adapter_opts
- |> Keyword.merge(connection_opts)
- |> Keyword.pop(:receive_conn, true)
-
- if Connections.alive?(:gun_connections) and receive_conn? do
- try_to_get_conn(uri, opts)
- else
- opts
- end
+ Config.get([:pools, pool, :recv_timeout], default)
end
- defp try_to_get_conn(uri, opts) do
- case Connections.checkin(uri, :gun_connections) do
- nil ->
- Logger.debug(
- "Gun connections pool checkin was not successful. Trying to open conn for next request."
+ @prefix Pleroma.Gun.ConnectionPool
+ def limiter_setup do
+ wait = Config.get([:connections_pool, :connection_acquisition_wait])
+ retries = Config.get([:connections_pool, :connection_acquisition_retries])
+
+ :pools
+ |> Config.get([])
+ |> Enum.each(fn {name, opts} ->
+ max_running = Keyword.get(opts, :size, 50)
+ max_waiting = Keyword.get(opts, :max_waiting, 10)
+
+ result =
+ ConcurrentLimiter.new(:"#{@prefix}.#{name}", max_running, max_waiting,
+ wait: wait,
+ max_retries: retries
)
- Task.start(fn -> Pleroma.Gun.Conn.open(uri, :gun_connections, opts) end)
- opts
-
- conn when is_pid(conn) ->
- Logger.debug("received conn #{inspect(conn)} #{Connections.compose_uri_log(uri)}")
-
- opts
- |> Keyword.put(:conn, conn)
- |> Keyword.put(:close_conn, false)
- end
- end
-
- @spec format_host(String.t()) :: charlist()
- def format_host(host) do
- host_charlist = to_charlist(host)
-
- case :inet.parse_address(host_charlist) do
- {:error, :einval} ->
- :idna.encode(host_charlist)
+ case result do
+ :ok -> :ok
+ {:error, :existing} -> :ok
+ end
+ end)
- {:ok, _ip} ->
- host_charlist
- end
+ :ok
end
end