config :pleroma, :pools,
federation: [
size: 50,
- max_waiting: 10
+ max_waiting: 10,
+ timeout: 10_000
],
media: [
size: 50,
- max_waiting: 10
+ max_waiting: 10,
+ timeout: 10_000
],
upload: [
size: 25,
- max_waiting: 5
+ max_waiting: 5,
+ timeout: 15_000
],
default: [
size: 10,
- max_waiting: 2
+ max_waiting: 2,
+ timeout: 5_000
]
config :pleroma, :hackney_pools,
url = String.replace(frontend_info["build_url"], "${ref}", frontend_info["ref"])
with {:ok, %{status: 200, body: zip_body}} <-
- Pleroma.HTTP.get(url, [], timeout: 120_000, recv_timeout: 120_000) do
+ Pleroma.HTTP.get(url, [],
+ adapter: [pool: :media, timeout: 120_000, recv_timeout: 120_000]
+ ) do
unzip(zip_body, dest)
else
e -> {:error, e}
end)
{ref, state} = pop_in(state.client_monitors[client_pid])
- Process.demonitor(ref)
-
- timer =
- if used_by == [] do
- max_idle = Pleroma.Config.get([:connections_pool, :max_idle_time], 30_000)
- Process.send_after(self(), :idle_close, max_idle)
+ # DOWN message can receive right after `remove_client` call and cause worker to terminate
+ state =
+ if is_nil(ref) do
+ state
else
- nil
+ Process.demonitor(ref)
+
+ timer =
+ if used_by == [] do
+ max_idle = Pleroma.Config.get([:connections_pool, :max_idle_time], 30_000)
+ Process.send_after(self(), :idle_close, max_idle)
+ else
+ nil
+ end
+
+ %{state | timer: timer}
end
- {:reply, :ok, %{state | timer: timer}, :hibernate}
+ {:reply, :ok, state, :hibernate}
end
@impl true
{:stop, :normal, state}
end
+ @impl true
+ def handle_info({:gun_up, _pid, _protocol}, state) do
+ {:noreply, state, :hibernate}
+ end
+
# Gracefully shutdown if the connection got closed without any streams left
@impl true
def handle_info({:gun_down, _pid, _protocol, _reason, []}, state) do
{:stop, :normal, state}
end
- # Otherwise, shutdown with an error
+ # Otherwise, wait for retry
@impl true
- def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams} = down_message, state) do
- {:stop, {:error, down_message}, state}
+ def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams}, state) do
+ {:noreply, state, :hibernate}
end
@impl true
@type proxy_type() :: :socks4 | :socks5
@type host() :: charlist() | :inet.ip_address()
- alias Pleroma.Config
alias Pleroma.HTTP.AdapterHelper
require Logger
@spec options(URI.t(), keyword()) :: keyword()
def options(%URI{} = uri, opts \\ []) do
@defaults
- |> put_timeout()
|> Keyword.merge(opts)
|> adapter_helper().options(uri)
end
- # For Hackney, this is the time a connection can stay idle in the pool.
- # For Gun, this is the timeout to receive a message from Gun.
- defp put_timeout(opts) do
- {config_key, default} =
- if adapter() == Tesla.Adapter.Gun do
- {:pools, Config.get([:pools, :default, :timeout], 5_000)}
- else
- {:hackney_pools, 10_000}
- end
-
- timeout = Config.get([config_key, opts[:pool], :timeout], default)
-
- Keyword.merge(opts, timeout: timeout)
- end
-
+ @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()}
def get_conn(uri, opts), do: adapter_helper().get_conn(uri, opts)
+
defp adapter, do: Application.get_env(:tesla, :adapter)
defp adapter_helper do
defmodule Pleroma.HTTP.AdapterHelper.Gun do
@behaviour Pleroma.HTTP.AdapterHelper
+ alias Pleroma.Config
alias Pleroma.Gun.ConnectionPool
alias Pleroma.HTTP.AdapterHelper
connect_timeout: 5_000,
domain_lookup_timeout: 5_000,
tls_handshake_timeout: 5_000,
- retry: 0,
+ retry: 1,
retry_timeout: 1000,
await_up_timeout: 5_000
]
+ @type pool() :: :federation | :upload | :media | :default
+
@spec options(keyword(), URI.t()) :: keyword()
def options(incoming_opts \\ [], %URI{} = uri) do
proxy =
- Pleroma.Config.get([:http, :proxy_url])
+ [:http, :proxy_url]
+ |> Config.get()
|> AdapterHelper.format_proxy()
- config_opts = Pleroma.Config.get([:http, :adapter], [])
+ config_opts = Config.get([:http, :adapter], [])
@defaults
|> Keyword.merge(config_opts)
|> add_scheme_opts(uri)
|> AdapterHelper.maybe_add_proxy(proxy)
|> Keyword.merge(incoming_opts)
+ |> put_timeout()
end
defp add_scheme_opts(opts, %{scheme: "http"}), do: opts
defp add_scheme_opts(opts, %{scheme: "https"}) do
- opts
- |> Keyword.put(:certificates_verification, true)
+ Keyword.put(opts, :certificates_verification, true)
+ end
+
+ defp put_timeout(opts) do
+ # this is the timeout to receive a message from Gun
+ Keyword.put_new(opts, :timeout, pool_timeout(opts[:pool]))
+ end
+
+ @spec pool_timeout(pool()) :: non_neg_integer()
+ def pool_timeout(pool) do
+ default = Config.get([:pools, :default, :timeout], 5_000)
+
+ Config.get([:pools, pool, :timeout], default)
end
@spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()}
@prefix Pleroma.Gun.ConnectionPool
def limiter_setup do
- wait = Pleroma.Config.get([:connections_pool, :connection_acquisition_wait])
- retries = Pleroma.Config.get([:connections_pool, :connection_acquisition_retries])
+ wait = Config.get([:connections_pool, :connection_acquisition_wait])
+ retries = Config.get([:connections_pool, :connection_acquisition_retries])
:pools
- |> Pleroma.Config.get([])
+ |> Config.get([])
|> Enum.each(fn {name, opts} ->
max_running = Keyword.get(opts, :size, 50)
max_waiting = Keyword.get(opts, :max_waiting, 10)
case result do
:ok -> :ok
{:error, :existing} -> :ok
- e -> raise e
end
end)
@impl true
def request(method, url, body \\ "", headers \\ [], http_opts \\ []) do
+ http_opts = Keyword.put_new(http_opts, :adapter, pool: :upload)
+
case HTTP.request(method, url, body, headers, http_opts) do
{:ok, env} ->
{:ok, %{status_code: env.status, headers: env.headers, body: env.body}}
@impl true
def get(url, headers, options) do
+ options = Keyword.put_new(options, :adapter, pool: :default)
+
with {:ok, %Tesla.Env{} = env} <- HTTP.get(url, headers, options) do
{:ok, {env.status, env.headers, env.body}}
end
@impl true
def head(url, headers, options) do
+ options = Keyword.put_new(options, :adapter, pool: :default)
+
with {:ok, %Tesla.Env{} = env} <- HTTP.head(url, headers, options) do
{:ok, {env.status, env.headers}}
end
defp scrape_favicon(%URI{} = instance_uri) do
try do
with {:ok, %Tesla.Env{body: html}} <-
- Pleroma.HTTP.get(to_string(instance_uri), [{:Accept, "text/html"}]),
+ Pleroma.HTTP.get(to_string(instance_uri), [{"accept", "text/html"}],
+ adapter: [pool: :media]
+ ),
favicon_rel <-
html
|> Floki.parse_document!()
date: date
})
- [{"signature", signature}]
+ {"signature", signature}
end
defp sign_fetch(headers, id, date) do
if Pleroma.Config.get([:activitypub, :sign_object_fetches]) do
- headers ++ make_signature(id, date)
+ [make_signature(id, date) | headers]
else
headers
end
defp maybe_date_fetch(headers, date) do
if Pleroma.Config.get([:activitypub, :sign_object_fetches]) do
- headers ++ [{"date", date}]
+ [{"date", date} | headers]
else
headers
end
op =
if streaming do
- upload.tempfile
- |> ExAws.S3.Upload.stream_file()
- |> ExAws.S3.upload(bucket, s3_name, [
- {:acl, :public_read},
- {:content_type, upload.content_type}
- ])
+ op =
+ upload.tempfile
+ |> ExAws.S3.Upload.stream_file()
+ |> ExAws.S3.upload(bucket, s3_name, [
+ {:acl, :public_read},
+ {:content_type, upload.content_type}
+ ])
+
+ if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do
+ # set s3 upload timeout to respect :upload pool timeout
+ # timeout should be slightly larger, so s3 can retry upload on fail
+ timeout = Pleroma.HTTP.AdapterHelper.Gun.pool_timeout(:upload) + 1_000
+ opts = Keyword.put(op.opts, :timeout, timeout)
+ Map.put(op, :opts, opts)
+ else
+ op
+ end
else
{:ok, file_data} = File.read(upload.tempfile)
@rich_media_options
end
- Pleroma.HTTP.get(url, headers, options)
+ Pleroma.HTTP.get(url, headers, adapter: options)
end
end
def find_lrdd_template(domain) do
with {:ok, %{status: status, body: body}} when status in 200..299 <-
- HTTP.get("http://#{domain}/.well-known/host-meta", []) do
+ HTTP.get("http://#{domain}/.well-known/host-meta") do
get_template_from_xml(body)
else
_ ->
with {:ok, %{body: body, status: status}} when status in 200..299 <-
- HTTP.get("https://#{domain}/.well-known/host-meta", []) do
+ HTTP.get("https://#{domain}/.well-known/host-meta") do
get_template_from_xml(body)
else
e -> {:error, "Can't find LRDD template: #{inspect(e)}"}
{:ok, %Tesla.Env{status: 200, body: File.read!("test/fixtures/relay/relay.json")}}
end
- def get("http://localhost:4001/", _, "", Accept: "text/html") do
+ def get("http://localhost:4001/", _, "", [{"accept", "text/html"}]) do
{:ok, %Tesla.Env{status: 200, body: File.read!("test/fixtures/tesla_mock/7369654.html")}}
end
- def get("https://osada.macgirvin.com/", _, "", Accept: "text/html") do
+ def get("https://osada.macgirvin.com/", _, "", [{"accept", "text/html"}]) do
{:ok,
%Tesla.Env{
status: 200,