X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fpool%2Fconnections.ex;h=acafe1beabe7c698810172683dd6ad530ba00047;hb=2069ec5006b9142b784dc6ab8b190838481dfe5b;hp=1ed16d1c1e2b722fc4d920e083cebc111d09860d;hpb=514c899275a32e6ef63305f9424c50344d41b12e;p=akkoma
diff --git a/lib/pleroma/pool/connections.ex b/lib/pleroma/pool/connections.ex
index 1ed16d1c1..acafe1bea 100644
--- a/lib/pleroma/pool/connections.ex
+++ b/lib/pleroma/pool/connections.ex
@@ -1,10 +1,13 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors
+# Copyright © 2017-2020 Pleroma Authors
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Pool.Connections do
use GenServer
+ alias Pleroma.Config
+ alias Pleroma.Gun
+
require Logger
@type domain :: String.t()
@@ -17,9 +20,6 @@ defmodule Pleroma.Pool.Connections do
defstruct conns: %{}, opts: []
- alias Pleroma.Gun.API
- alias Pleroma.Gun.Conn
-
@spec start_link({atom(), keyword()}) :: {:ok, pid()}
def start_link({name, opts}) do
GenServer.start_link(__MODULE__, opts, name: name)
@@ -33,37 +33,18 @@ defmodule Pleroma.Pool.Connections do
def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name)
def checkin(%URI{} = uri, name) do
- timeout = Pleroma.Config.get([:connections_pool, :receive_connection_timeout], 250)
-
- GenServer.call(
- name,
- {:checkin, uri},
- timeout
- )
- end
-
- @spec open_conn(String.t() | URI.t(), atom(), keyword()) :: :ok
- def open_conn(url, name, opts \\ [])
- def open_conn(url, name, opts) when is_binary(url), do: open_conn(URI.parse(url), name, opts)
-
- def open_conn(%URI{} = uri, name, opts) do
- pool_opts = Pleroma.Config.get([:connections_pool], [])
-
- opts =
- opts
- |> Enum.into(%{})
- |> Map.put_new(:receive, false)
- |> Map.put_new(:retry, pool_opts[:retry] || 5)
- |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 100)
- |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)
+ timeout = Config.get([:connections_pool, :checkin_timeout], 250)
- GenServer.cast(name, {:open_conn, %{opts: opts, uri: uri}})
+ GenServer.call(name, {:checkin, uri}, timeout)
end
@spec alive?(atom()) :: boolean()
def alive?(name) do
- pid = Process.whereis(name)
- if pid, do: Process.alive?(pid), else: false
+ if pid = Process.whereis(name) do
+ Process.alive?(pid)
+ else
+ false
+ end
end
@spec get_state(atom()) :: t()
@@ -71,78 +52,89 @@ defmodule Pleroma.Pool.Connections do
GenServer.call(name, :state)
end
+ @spec count(atom()) :: pos_integer()
+ def count(name) do
+ GenServer.call(name, :count)
+ end
+
+ @spec get_unused_conns(atom()) :: [{domain(), conn()}]
+ def get_unused_conns(name) do
+ GenServer.call(name, :unused_conns)
+ end
+
@spec checkout(pid(), pid(), atom()) :: :ok
def checkout(conn, pid, name) do
GenServer.cast(name, {:checkout, conn, pid})
end
- @impl true
- def handle_cast({:open_conn, %{opts: opts, uri: uri}}, state) do
- Logger.debug("opening new #{compose_uri(uri)}")
- max_connections = state.opts[:max_connections]
+ @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok
+ def add_conn(name, key, conn) do
+ GenServer.cast(name, {:add_conn, key, conn})
+ end
- key = compose_key(uri)
+ @spec remove_conn(atom(), String.t()) :: :ok
+ def remove_conn(name, key) do
+ GenServer.cast(name, {:remove_conn, key})
+ end
- if Enum.count(state.conns) < max_connections do
- open_conn(key, uri, state, opts)
- else
- try_to_open_conn(key, uri, state, opts)
- end
+ @impl true
+ def handle_cast({:add_conn, key, conn}, state) do
+ state = put_in(state.conns[key], conn)
+
+ Process.monitor(conn.conn)
+ {:noreply, state}
end
@impl true
def handle_cast({:checkout, conn_pid, pid}, state) do
- Logger.debug("checkout #{inspect(conn_pid)}")
-
state =
with true <- Process.alive?(conn_pid),
{key, conn} <- find_conn(state.conns, conn_pid),
used_by <- List.keydelete(conn.used_by, pid, 0) do
- conn_state =
- if used_by == [] do
- :idle
- else
- conn.conn_state
- end
+ conn_state = if used_by == [], do: :idle, else: conn.conn_state
put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
else
false ->
- Logger.warn("checkout for closed conn #{inspect(conn_pid)}")
+ Logger.debug("checkout for closed conn #{inspect(conn_pid)}")
state
nil ->
- Logger.info("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
+ Logger.debug("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
state
end
{:noreply, state}
end
+ @impl true
+ def handle_cast({:remove_conn, key}, state) do
+ state = put_in(state.conns, Map.delete(state.conns, key))
+ {:noreply, state}
+ end
+
@impl true
def handle_call({:checkin, uri}, from, state) do
- Logger.debug("checkin #{compose_uri(uri)}")
- key = compose_key(uri)
+ key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
case state.conns[key] do
- %{conn: conn, gun_state: gun_state} = current_conn when gun_state == :up ->
- Logger.debug("reusing conn #{compose_uri(uri)}")
-
- with time <- :os.system_time(:second),
- last_reference <- time - current_conn.last_reference,
- current_crf <- crf(last_reference, 100, current_conn.crf),
- state <-
- put_in(state.conns[key], %{
- current_conn
- | last_reference: time,
- crf: current_crf,
- conn_state: :active,
- used_by: [from | current_conn.used_by]
- }) do
- {:reply, conn, state}
- end
+ %{conn: pid, gun_state: :up} = conn ->
+ time = :os.system_time(:second)
+ last_reference = time - conn.last_reference
+ crf = crf(last_reference, 100, conn.crf)
- %{gun_state: gun_state} when gun_state == :down ->
+ state =
+ put_in(state.conns[key], %{
+ conn
+ | last_reference: time,
+ crf: crf,
+ conn_state: :active,
+ used_by: [from | conn.used_by]
+ })
+
+ {:reply, pid, state}
+
+ %{gun_state: :down} ->
{:reply, nil, state}
nil ->
@@ -153,38 +145,58 @@ defmodule Pleroma.Pool.Connections do
@impl true
def handle_call(:state, _from, state), do: {:reply, state, state}
+ @impl true
+ def handle_call(:count, _from, state) do
+ {:reply, Enum.count(state.conns), state}
+ end
+
+ @impl true
+ def handle_call(:unused_conns, _from, state) do
+ unused_conns =
+ state.conns
+ |> Enum.filter(&filter_conns/1)
+ |> Enum.sort(&sort_conns/2)
+
+ {:reply, unused_conns, state}
+ end
+
+ defp filter_conns({_, %{conn_state: :idle, used_by: []}}), do: true
+ defp filter_conns(_), do: false
+
+ defp sort_conns({_, c1}, {_, c2}) do
+ c1.crf <= c2.crf and c1.last_reference <= c2.last_reference
+ end
+
@impl true
def handle_info({:gun_up, conn_pid, _protocol}, state) do
+ %{origin_host: host, origin_scheme: scheme, origin_port: port} = Gun.info(conn_pid)
+
+ host =
+ case :inet.ntoa(host) do
+ {:error, :einval} -> host
+ ip -> ip
+ end
+
+ key = "#{scheme}:#{host}:#{port}"
+
state =
- with true <- Process.alive?(conn_pid),
- conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid),
- {key, conn} <- find_conn(state.conns, conn_pid, conn_key),
- time <- :os.system_time(:second),
- last_reference <- time - conn.last_reference,
- current_crf <- crf(last_reference, 100, conn.crf) do
+ with {key, conn} <- find_conn(state.conns, conn_pid, key),
+ {true, key} <- {Process.alive?(conn_pid), key} do
put_in(state.conns[key], %{
conn
| gun_state: :up,
- last_reference: time,
- crf: current_crf,
conn_state: :active,
retries: 0
})
else
- :error_gun_info ->
- Logger.warn(":gun.info caused error")
- state
-
- false ->
- Logger.warn(":gun_up message for closed conn #{inspect(conn_pid)}")
- state
-
- nil ->
- Logger.warn(
- ":gun_up message for alive conn #{inspect(conn_pid)}, but deleted from state"
+ {false, key} ->
+ put_in(
+ state.conns,
+ Map.delete(state.conns, key)
)
- :ok = API.close(conn_pid)
+ nil ->
+ :ok = Gun.close(conn_pid)
state
end
@@ -194,13 +206,13 @@ defmodule Pleroma.Pool.Connections do
@impl true
def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
+ retries = Config.get([:connections_pool, :retry], 1)
# we can't get info on this pid, because pid is dead
state =
- with true <- Process.alive?(conn_pid),
- {key, conn} <- find_conn(state.conns, conn_pid) do
- if conn.retries == 5 do
- Logger.debug("closing conn if retries is eq 5 #{inspect(conn_pid)}")
- :ok = API.close(conn.conn)
+ with {key, conn} <- find_conn(state.conns, conn_pid),
+ {true, key} <- {Process.alive?(conn_pid), key} do
+ if conn.retries == retries do
+ :ok = Gun.close(conn.conn)
put_in(
state.conns,
@@ -214,17 +226,14 @@ defmodule Pleroma.Pool.Connections do
})
end
else
- false ->
- # gun can send gun_down for closed conn, maybe connection is not closed yet
- Logger.warn(":gun_down message for closed conn #{inspect(conn_pid)}")
- state
-
- nil ->
- Logger.warn(
- ":gun_down message for alive conn #{inspect(conn_pid)}, but deleted from state"
+ {false, key} ->
+ put_in(
+ state.conns,
+ Map.delete(state.conns, key)
)
- :ok = API.close(conn_pid)
+ nil ->
+ Logger.debug(":gun_down for conn which isn't found in state")
state
end
@@ -232,23 +241,28 @@ defmodule Pleroma.Pool.Connections do
{:noreply, state}
end
- defp compose_key(%URI{scheme: scheme, host: host, port: port}), do: "#{scheme}:#{host}:#{port}"
+ @impl true
+ def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do
+ Logger.debug("received DOWN message for #{inspect(conn_pid)} reason -> #{inspect(reason)}")
- defp compose_key_gun_info(pid) do
- try do
- # sometimes :gun.info can raise MatchError, which lead to pool terminate
- %{origin_host: origin_host, origin_scheme: scheme, origin_port: port} = API.info(pid)
+ state =
+ with {key, conn} <- find_conn(state.conns, conn_pid) do
+ Enum.each(conn.used_by, fn {pid, _ref} ->
+ Process.exit(pid, reason)
+ end)
+
+ put_in(
+ state.conns,
+ Map.delete(state.conns, key)
+ )
+ else
+ nil ->
+ Logger.debug(":DOWN for conn which isn't found in state")
- host =
- case :inet.ntoa(origin_host) do
- {:error, :einval} -> origin_host
- ip -> ip
- end
+ state
+ end
- "#{scheme}:#{host}:#{port}"
- rescue
- _ -> :error_gun_info
- end
+ {:noreply, state}
end
defp find_conn(conns, conn_pid) do
@@ -263,153 +277,7 @@ defmodule Pleroma.Pool.Connections do
end)
end
- defp open_conn(key, uri, state, %{proxy: {proxy_host, proxy_port}} = opts) do
- connect_opts =
- uri
- |> destination_opts()
- |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
-
- with open_opts <- Map.delete(opts, :tls_opts),
- {:ok, conn} <- API.open(proxy_host, proxy_port, open_opts),
- {:ok, _} <- API.await_up(conn),
- stream <- API.connect(conn, connect_opts),
- {:response, :fin, 200, _} <- API.await(conn, stream),
- state <-
- put_in(state.conns[key], %Conn{
- conn: conn,
- gun_state: :up,
- conn_state: :active,
- last_reference: :os.system_time(:second)
- }) do
- {:noreply, state}
- else
- error ->
- Logger.warn(
- "Received error on opening connection with http proxy #{uri.scheme}://#{
- compose_uri(uri)
- }: #{inspect(error)}"
- )
-
- {:noreply, state}
- end
- end
-
- defp open_conn(key, uri, state, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
- version =
- proxy_type
- |> to_string()
- |> String.last()
- |> case do
- "4" -> 4
- _ -> 5
- end
-
- socks_opts =
- uri
- |> destination_opts()
- |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
- |> Map.put(:version, version)
-
- opts =
- opts
- |> Map.put(:protocols, [:socks])
- |> Map.put(:socks_opts, socks_opts)
-
- with {:ok, conn} <- API.open(proxy_host, proxy_port, opts),
- {:ok, _} <- API.await_up(conn),
- state <-
- put_in(state.conns[key], %Conn{
- conn: conn,
- gun_state: :up,
- conn_state: :active,
- last_reference: :os.system_time(:second)
- }) do
- {:noreply, state}
- else
- error ->
- Logger.warn(
- "Received error on opening connection with socks proxy #{uri.scheme}://#{
- compose_uri(uri)
- }: #{inspect(error)}"
- )
-
- {:noreply, state}
- end
- end
-
- defp open_conn(key, %URI{host: host, port: port} = uri, state, opts) do
- Logger.debug("opening conn #{compose_uri(uri)}")
- {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
-
- with {:ok, conn} <- API.open(host, port, opts),
- {:ok, _} <- API.await_up(conn),
- state <-
- put_in(state.conns[key], %Conn{
- conn: conn,
- gun_state: :up,
- conn_state: :active,
- last_reference: :os.system_time(:second)
- }) do
- Logger.debug("new conn opened #{compose_uri(uri)}")
- Logger.debug("replying to the call #{compose_uri(uri)}")
- {:noreply, state}
- else
- error ->
- Logger.warn(
- "Received error on opening connection #{uri.scheme}://#{compose_uri(uri)}: #{
- inspect(error)
- }"
- )
-
- {:noreply, state}
- end
- end
-
- defp destination_opts(%URI{host: host, port: port}) do
- {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
- %{host: host, port: port}
- end
-
- defp add_http2_opts(opts, "https", tls_opts) do
- Map.merge(opts, %{protocols: [:http2], transport: :tls, tls_opts: tls_opts})
- end
-
- defp add_http2_opts(opts, _, _), do: opts
-
- @spec get_unused_conns(map()) :: [{domain(), conn()}]
- def get_unused_conns(conns) do
- conns
- |> Enum.filter(fn {_k, v} ->
- v.conn_state == :idle and v.used_by == []
- end)
- |> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
- x.crf <= y.crf and x.last_reference <= y.last_reference
- end)
- end
-
- defp try_to_open_conn(key, uri, state, opts) do
- Logger.debug("try to open conn #{compose_uri(uri)}")
-
- with [{close_key, least_used} | _conns] <- get_unused_conns(state.conns),
- :ok <- API.close(least_used.conn),
- state <-
- put_in(
- state.conns,
- Map.delete(state.conns, close_key)
- ) do
- Logger.debug(
- "least used conn found and closed #{inspect(least_used.conn)} #{compose_uri(uri)}"
- )
-
- open_conn(key, uri, state, opts)
- else
- [] -> {:noreply, state}
- end
- end
-
def crf(current, steps, crf) do
1 + :math.pow(0.5, current / steps) * crf
end
-
- def compose_uri(%URI{} = uri), do: "#{uri.host}#{uri.path}"
end