setup_instrumenters()
load_custom_modules()
- if adapter() == Tesla.Adapter.Gun do
+ adapter = Application.get_env(:tesla, :adapter)
+
+ if adapter == Tesla.Adapter.Gun do
if version = Pleroma.OTPVersion.version() do
[major, minor] =
version
Pleroma.Plugs.RateLimiter.Supervisor
] ++
cachex_children() ++
- http_pools_children(Config.get(:env)) ++
+ http_children(adapter, @env) ++
[
Pleroma.Stats,
Pleroma.JobQueueMonitor,
end
# start hackney and gun pools in tests
- defp http_pools_children(:test) do
+ defp http_children(_, :test) do
hackney_options = Config.get([:hackney_pools, :federation])
hackney_pool = :hackney_pool.child_spec(:federation, hackney_options)
[hackney_pool, Pleroma.Pool.Supervisor]
end
- defp http_pools_children(_), do: http_pools(adapter())
-
- defp http_pools(Tesla.Adapter.Hackney) do
+ defp http_children(Tesla.Adapter.Hackney, _) do
pools = [:federation, :media]
pools =
end
end
- defp http_pools(Tesla.Adapter.Gun), do: [Pleroma.Pool.Supervisor]
-
- defp http_pools(_), do: []
+ defp http_children(Tesla.Adapter.Gun, _), do: [Pleroma.Pool.Supervisor]
- defp adapter, do: Application.get_env(:tesla, :adapter)
+ defp http_children(_, _), do: []
end
defmodule Pleroma.Config.TransferTask do
use Task
+ alias Pleroma.Config
alias Pleroma.ConfigDB
alias Pleroma.Repo
def start_link(_) do
load_and_update_env()
- if Pleroma.Config.get(:env) == :test, do: Ecto.Adapters.SQL.Sandbox.checkin(Repo)
+ if Config.get(:env) == :test, do: Ecto.Adapters.SQL.Sandbox.checkin(Repo)
:ignore
end
- @spec load_and_update_env([ConfigDB.t()]) :: :ok | false
- def load_and_update_env(deleted \\ [], restart_pleroma? \\ true) do
- with {_, true} <- {:configurable, Pleroma.Config.get(:configurable_from_database)} do
+ @spec load_and_update_env([ConfigDB.t()], boolean()) :: :ok
+ def load_and_update_env(deleted_settings \\ [], restart_pleroma? \\ true) do
+ with {_, true} <- {:configurable, Config.get(:configurable_from_database)} do
# We need to restart applications for loaded settings take effect
- in_db = Repo.all(ConfigDB)
-
- with_deleted = in_db ++ deleted
# TODO: some problem with prometheus after restart!
- reject = [nil, :prometheus]
-
- reject_for_restart =
+ reject_restart =
if restart_pleroma? do
- reject
+ [nil, :prometheus]
else
- [:pleroma | reject]
+ [:pleroma, nil, :prometheus]
end
started_applications = Application.started_applications()
- with_deleted
- |> Enum.map(&merge_and_update(&1))
+ (Repo.all(ConfigDB) ++ deleted_settings)
+ |> Enum.map(&merge_and_update/1)
|> Enum.uniq()
- |> Enum.reject(&(&1 in reject_for_restart))
+ |> Enum.reject(&(&1 in reject_restart))
|> maybe_set_pleroma_last()
- |> Enum.each(&restart(started_applications, &1, Pleroma.Config.get(:env)))
+ |> Enum.each(&restart(started_applications, &1, Config.get(:env)))
:ok
else
key = ConfigDB.from_string(setting.key)
group = ConfigDB.from_string(setting.group)
- default = Pleroma.Config.Holder.config(group, key)
+ default = Config.Holder.config(group, key)
value = ConfigDB.from_binary(setting.value)
merged_value =
- if Ecto.get_meta(setting, :state) == :deleted do
- default
- else
- if can_be_merged?(default, value) do
- ConfigDB.merge_group(group, key, default, value)
- else
- value
- end
+ cond do
+ Ecto.get_meta(setting, :state) == :deleted -> default
+ can_be_merged?(default, value) -> ConfigDB.merge_group(group, key, default, value)
+ true -> value
end
:ok = update_env(group, key, merged_value)
key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
- Logger.debug("opening new connection #{Connections.compose_uri_log(uri)}")
-
conn_pid =
if Connections.count(name) < opts[:max_connection] do
do_open(uri, opts)
else
error ->
Logger.warn(
- "Received error on opening connection with http proxy #{
- Connections.compose_uri_log(uri)
- } #{inspect(error)}"
+ "Opening proxied connection to #{compose_uri_log(uri)} failed with error #{
+ inspect(error)
+ }"
)
error
else
error ->
Logger.warn(
- "Received error on opening connection with socks proxy #{
- Connections.compose_uri_log(uri)
- } #{inspect(error)}"
+ "Opening socks proxied connection to #{compose_uri_log(uri)} failed with error #{
+ inspect(error)
+ }"
)
error
else
error ->
Logger.warn(
- "Received error on opening connection #{Connections.compose_uri_log(uri)} #{
- inspect(error)
- }"
+ "Opening connection to #{compose_uri_log(uri)} failed with error #{inspect(error)}"
)
error
defp add_http2_opts(opts, _, _), do: opts
defp close_least_used_and_do_open(name, uri, opts) do
- Logger.debug("try to open conn #{Connections.compose_uri_log(uri)}")
-
- with [{close_key, least_used} | _conns] <-
- Connections.get_unused_conns(name),
- :ok <- Gun.close(least_used.conn) do
- Connections.remove_conn(name, close_key)
+ with [{key, conn} | _conns] <- Connections.get_unused_conns(name),
+ :ok <- Gun.close(conn.conn) do
+ Connections.remove_conn(name, key)
do_open(uri, opts)
else
[] -> {:error, :pool_overflowed}
end
end
+
+ def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do
+ "#{scheme}://#{host}#{path}"
+ end
end
@type proxy ::
{Connection.host(), pos_integer()}
- | {Connection.proxy_type(), pos_integer()}
+ | {Connection.proxy_type(), Connection.host(), pos_integer()}
@callback options(keyword(), URI.t()) :: keyword()
@callback after_request(keyword()) :: :ok
]
@spec options(keyword(), URI.t()) :: keyword()
- def options(connection_opts \\ [], %URI{} = uri) do
- formatted_proxy =
+ def options(incoming_opts \\ [], %URI{} = uri) do
+ proxy =
Pleroma.Config.get([:http, :proxy_url], nil)
|> AdapterHelper.format_proxy()
@defaults
|> Keyword.merge(config_opts)
|> add_scheme_opts(uri)
- |> AdapterHelper.maybe_add_proxy(formatted_proxy)
- |> maybe_get_conn(uri, connection_opts)
+ |> AdapterHelper.maybe_add_proxy(proxy)
+ |> maybe_get_conn(uri, incoming_opts)
end
@spec after_request(keyword()) :: :ok
:ok
end
- defp add_scheme_opts(opts, %URI{scheme: "http"}), do: opts
+ defp add_scheme_opts(opts, %{scheme: "http"}), do: opts
- defp add_scheme_opts(opts, %URI{scheme: "https"}) do
+ defp add_scheme_opts(opts, %{scheme: "https"}) do
opts
|> Keyword.put(:certificates_verification, true)
- |> Keyword.put(:transport, :tls)
|> Keyword.put(:tls_opts, log_level: :warning)
end
- defp maybe_get_conn(adapter_opts, uri, connection_opts) do
+ defp maybe_get_conn(adapter_opts, uri, incoming_opts) do
{receive_conn?, opts} =
adapter_opts
- |> Keyword.merge(connection_opts)
+ |> Keyword.merge(incoming_opts)
|> Keyword.pop(:receive_conn, true)
if Connections.alive?(:gun_connections) and receive_conn? do
- try_to_get_conn(uri, opts)
+ checkin_conn(uri, opts)
else
opts
end
end
- defp try_to_get_conn(uri, opts) do
+ defp checkin_conn(uri, opts) do
case Connections.checkin(uri, :gun_connections) do
nil ->
- Logger.debug(
- "Gun connections pool checkin was not successful. Trying to open conn for next request."
- )
-
- Task.start(fn -> Pleroma.Gun.Conn.open(uri, :gun_connections, opts) end)
+ Task.start(Pleroma.Gun.Conn, :open, [uri, :gun_connections, opts])
opts
conn when is_pid(conn) ->
- Logger.debug("received conn #{inspect(conn)} #{Connections.compose_uri_log(uri)}")
-
- opts
- |> Keyword.put(:conn, conn)
- |> Keyword.put(:close_conn, false)
+ Keyword.merge(opts, conn: conn, close_conn: false)
end
end
end
{:ok, parse_host(host), port}
else
{_, _} ->
- Logger.warn("parsing port in proxy fail #{inspect(proxy)}")
+ Logger.warn("Parsing port failed #{inspect(proxy)}")
{:error, :invalid_proxy_port}
:error ->
- Logger.warn("parsing port in proxy fail #{inspect(proxy)}")
+ Logger.warn("Parsing port failed #{inspect(proxy)}")
{:error, :invalid_proxy_port}
_ ->
- Logger.warn("parsing proxy fail #{inspect(proxy)}")
+ Logger.warn("Parsing proxy failed #{inspect(proxy)}")
{:error, :invalid_proxy}
end
end
{:ok, type, parse_host(host), port}
else
_ ->
- Logger.warn("parsing proxy fail #{inspect(proxy)}")
+ Logger.warn("Parsing proxy failed #{inspect(proxy)}")
{:error, :invalid_proxy}
end
end
{:ok, Env.t()} | {:error, any()}
def request(method, url, body, headers, options) when is_binary(url) do
uri = URI.parse(url)
- received_adapter_opts = Keyword.get(options, :adapter, [])
- adapter_opts = Connection.options(uri, received_adapter_opts)
+ adapter_opts = Connection.options(uri, options[:adapter] || [])
options = put_in(options[:adapter], adapter_opts)
- params = Keyword.get(options, :params, [])
+ params = options[:params] || []
request = build_request(method, headers, options, url, body, params)
adapter = Application.get_env(:tesla, :adapter)
@impl true
def handle_cast({:checkout, conn_pid, pid}, state) do
- Logger.debug("checkout #{inspect(conn_pid)}")
-
state =
with true <- Process.alive?(conn_pid),
{key, conn} <- find_conn(state.conns, conn_pid),
used_by <- List.keydelete(conn.used_by, pid, 0) do
- conn_state =
- if used_by == [] do
- :idle
- else
- conn.conn_state
- end
+ conn_state = if used_by == [], do: :idle, else: conn.conn_state
put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
else
@impl true
def handle_call({:checkin, uri}, from, state) do
key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
- Logger.debug("checkin #{key}")
case state.conns[key] do
- %{conn: conn, gun_state: :up} = current_conn ->
- Logger.debug("reusing conn #{key}")
-
+ %{conn: pid, gun_state: :up} = conn ->
time = :os.system_time(:second)
- last_reference = time - current_conn.last_reference
- current_crf = crf(last_reference, 100, current_conn.crf)
+ last_reference = time - conn.last_reference
+ crf = crf(last_reference, 100, conn.crf)
state =
put_in(state.conns[key], %{
- current_conn
+ conn
| last_reference: time,
- crf: current_crf,
+ crf: crf,
conn_state: :active,
- used_by: [from | current_conn.used_by]
+ used_by: [from | conn.used_by]
})
- {:reply, conn, state}
+ {:reply, pid, state}
%{gun_state: :down} ->
{:reply, nil, state}
def handle_call(:unused_conns, _from, state) do
unused_conns =
state.conns
- |> Enum.filter(fn {_k, v} ->
- v.conn_state == :idle and v.used_by == []
- end)
- |> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
- x.crf <= y.crf and x.last_reference <= y.last_reference
- end)
+ |> Enum.filter(&filter_conns/1)
+ |> Enum.sort(&sort_conns/2)
{:reply, unused_conns, state}
end
+ defp filter_conns({_, %{conn_state: :idle, used_by: []}}), do: true
+ defp filter_conns(_), do: false
+
+ defp sort_conns({_, c1}, {_, c2}) do
+ c1.crf <= c2.crf and c1.last_reference <= c2.last_reference
+ end
+
@impl true
def handle_info({:gun_up, conn_pid, _protocol}, state) do
+ %{origin_host: host, origin_scheme: scheme, origin_port: port} = Gun.info(conn_pid)
+
+ host =
+ case :inet.ntoa(host) do
+ {:error, :einval} -> host
+ ip -> ip
+ end
+
+ key = "#{scheme}:#{host}:#{port}"
+
state =
- with conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid),
- {key, conn} <- find_conn(state.conns, conn_pid, conn_key),
+ with {_key, conn} <- find_conn(state.conns, conn_pid, key),
{true, key} <- {Process.alive?(conn_pid), key} do
- time = :os.system_time(:second)
- last_reference = time - conn.last_reference
- current_crf = crf(last_reference, 100, conn.crf)
-
put_in(state.conns[key], %{
conn
| gun_state: :up,
- last_reference: time,
- crf: current_crf,
conn_state: :active,
retries: 0
})
else
- :error_gun_info ->
- Logger.debug(":gun.info caused error")
- state
-
{false, key} ->
- Logger.debug(":gun_up message for closed conn #{inspect(conn_pid)}")
-
put_in(
state.conns,
Map.delete(state.conns, key)
)
nil ->
- Logger.debug(":gun_up message for conn which is not found in state")
-
:ok = Gun.close(conn_pid)
state
with {key, conn} <- find_conn(state.conns, conn_pid),
{true, key} <- {Process.alive?(conn_pid), key} do
if conn.retries == retries do
- Logger.debug("closing conn if retries is eq #{inspect(conn_pid)}")
:ok = Gun.close(conn.conn)
put_in(
end
else
{false, key} ->
- # gun can send gun_down for closed conn, maybe connection is not closed yet
- Logger.debug(":gun_down message for closed conn #{inspect(conn_pid)}")
-
put_in(
state.conns,
Map.delete(state.conns, key)
)
nil ->
- Logger.debug(":gun_down message for conn which is not found in state")
-
- :ok = Gun.close(conn_pid)
+ Logger.debug(":gun_down for conn which isn't found in state")
state
end
)
else
nil ->
- Logger.debug(":DOWN message for conn which is not found in state")
+ Logger.debug(":DOWN for conn which isn't found in state")
state
end
{:noreply, state}
end
- defp compose_key_gun_info(pid) do
- %{origin_host: origin_host, origin_scheme: scheme, origin_port: port} = Gun.info(pid)
-
- host =
- case :inet.ntoa(origin_host) do
- {:error, :einval} -> origin_host
- ip -> ip
- end
-
- "#{scheme}:#{host}:#{port}"
- end
-
defp find_conn(conns, conn_pid) do
Enum.find(conns, fn {_key, conn} ->
conn.conn == conn_pid
def crf(current, steps, crf) do
1 + :math.pow(0.5, current / steps) * crf
end
-
- def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do
- "#{scheme}://#{host}#{path}"
- end
end
use ExUnit.Case, async: true
use Pleroma.Tests.Helpers
- import ExUnit.CaptureLog
import Mox
alias Pleroma.Config
opts = Gun.options([receive_conn: false], uri)
assert opts[:certificates_verification]
- assert opts[:transport] == :tls
end
test "get conn on next request" do
on_exit(fn -> Logger.configure(level: level) end)
uri = URI.parse("http://some-domain2.com")
- assert capture_log(fn ->
- opts = Gun.options(uri)
+ opts = Gun.options(uri)
- assert opts[:conn] == nil
- assert opts[:close_conn] == nil
- end) =~
- "Gun connections pool checkin was not successful. Trying to open conn for next request."
+ assert opts[:conn] == nil
+ assert opts[:close_conn] == nil
+ Process.sleep(50)
opts = Gun.options(uri)
assert is_pid(opts[:conn])
refute Conn.open(url, name)
refute Connections.checkin(url, name)
end) =~
- "Received error on opening connection http://gun-not-up.com {:error, :timeout}"
+ "Opening connection to http://gun-not-up.com failed with error {:error, :timeout}"
end
test "process gun_down message and then gun_up", %{name: name} do