X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fpool%2Fconnections.ex;h=4d4ba913c72fe4d9a2f1ca67de2f943716317e54;hb=7f692343c80ddf353712490edfbcdb14866f5685;hp=f1fab2a24a96208122c198e2fcc561d5f273f2e7;hpb=d9c5ae7c09c7cbf3f4f66e01b7ed69a3d6388916;p=akkoma diff --git a/lib/pleroma/pool/connections.ex b/lib/pleroma/pool/connections.ex index f1fab2a24..4d4ba913c 100644 --- a/lib/pleroma/pool/connections.ex +++ b/lib/pleroma/pool/connections.ex @@ -6,6 +6,7 @@ defmodule Pleroma.Pool.Connections do use GenServer alias Pleroma.Config + alias Pleroma.Gun require Logger @@ -19,8 +20,6 @@ defmodule Pleroma.Pool.Connections do defstruct conns: %{}, opts: [] - alias Pleroma.Gun - @spec start_link({atom(), keyword()}) :: {:ok, pid()} def start_link({name, opts}) do GenServer.start_link(__MODULE__, opts, name: name) @@ -88,18 +87,11 @@ defmodule Pleroma.Pool.Connections do @impl true def handle_cast({:checkout, conn_pid, pid}, state) do - Logger.debug("checkout #{inspect(conn_pid)}") - state = with true <- Process.alive?(conn_pid), {key, conn} <- find_conn(state.conns, conn_pid), used_by <- List.keydelete(conn.used_by, pid, 0) do - conn_state = - if used_by == [] do - :idle - else - conn.conn_state - end + conn_state = if used_by == [], do: :idle, else: conn.conn_state put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by}) else @@ -124,26 +116,23 @@ defmodule Pleroma.Pool.Connections do @impl true def handle_call({:checkin, uri}, from, state) do key = "#{uri.scheme}:#{uri.host}:#{uri.port}" - Logger.debug("checkin #{key}") case state.conns[key] do - %{conn: conn, gun_state: :up} = current_conn -> - Logger.debug("reusing conn #{key}") - + %{conn: pid, gun_state: :up} = conn -> time = :os.system_time(:second) - last_reference = time - current_conn.last_reference - current_crf = crf(last_reference, 100, current_conn.crf) + last_reference = time - conn.last_reference + crf = crf(last_reference, 100, conn.crf) state = put_in(state.conns[key], %{ - current_conn + conn | last_reference: time, - crf: current_crf, + crf: crf, conn_state: :active, - used_by: [from | current_conn.used_by] + used_by: [from | conn.used_by] }) - {:reply, conn, state} + {:reply, pid, state} %{gun_state: :down} -> {:reply, nil, state} @@ -165,50 +154,48 @@ defmodule Pleroma.Pool.Connections do def handle_call(:unused_conns, _from, state) do unused_conns = state.conns - |> Enum.filter(fn {_k, v} -> - v.conn_state == :idle and v.used_by == [] - end) - |> Enum.sort(fn {_x_k, x}, {_y_k, y} -> - x.crf <= y.crf and x.last_reference <= y.last_reference - end) + |> Enum.filter(&filter_conns/1) + |> Enum.sort(&sort_conns/2) {:reply, unused_conns, state} end + defp filter_conns({_, %{conn_state: :idle, used_by: []}}), do: true + defp filter_conns(_), do: false + + defp sort_conns({_, c1}, {_, c2}) do + c1.crf <= c2.crf and c1.last_reference <= c2.last_reference + end + @impl true def handle_info({:gun_up, conn_pid, _protocol}, state) do + %{origin_host: host, origin_scheme: scheme, origin_port: port} = Gun.info(conn_pid) + + host = + case :inet.ntoa(host) do + {:error, :einval} -> host + ip -> ip + end + + key = "#{scheme}:#{host}:#{port}" + state = - with conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid), - {key, conn} <- find_conn(state.conns, conn_pid, conn_key), + with {key, conn} <- find_conn(state.conns, conn_pid, key), {true, key} <- {Process.alive?(conn_pid), key} do - time = :os.system_time(:second) - last_reference = time - conn.last_reference - current_crf = crf(last_reference, 100, conn.crf) - put_in(state.conns[key], %{ conn | gun_state: :up, - last_reference: time, - crf: current_crf, conn_state: :active, retries: 0 }) else - :error_gun_info -> - Logger.debug(":gun.info caused error") - state - {false, key} -> - Logger.debug(":gun_up message for closed conn #{inspect(conn_pid)}") - put_in( state.conns, Map.delete(state.conns, key) ) nil -> - Logger.debug(":gun_up message for conn which is not found in state") - :ok = Gun.close(conn_pid) state @@ -225,7 +212,6 @@ defmodule Pleroma.Pool.Connections do with {key, conn} <- find_conn(state.conns, conn_pid), {true, key} <- {Process.alive?(conn_pid), key} do if conn.retries == retries do - Logger.debug("closing conn if retries is eq #{inspect(conn_pid)}") :ok = Gun.close(conn.conn) put_in( @@ -241,18 +227,13 @@ defmodule Pleroma.Pool.Connections do end else {false, key} -> - # gun can send gun_down for closed conn, maybe connection is not closed yet - Logger.debug(":gun_down message for closed conn #{inspect(conn_pid)}") - put_in( state.conns, Map.delete(state.conns, key) ) nil -> - Logger.debug(":gun_down message for conn which is not found in state") - - :ok = Gun.close(conn_pid) + Logger.debug(":gun_down for conn which isn't found in state") state end @@ -276,7 +257,7 @@ defmodule Pleroma.Pool.Connections do ) else nil -> - Logger.debug(":DOWN message for conn which is not found in state") + Logger.debug(":DOWN for conn which isn't found in state") state end @@ -284,23 +265,6 @@ defmodule Pleroma.Pool.Connections do {:noreply, state} end - defp compose_key_gun_info(pid) do - try do - # sometimes :gun.info can raise MatchError, which lead to pool terminate - %{origin_host: origin_host, origin_scheme: scheme, origin_port: port} = Gun.info(pid) - - host = - case :inet.ntoa(origin_host) do - {:error, :einval} -> origin_host - ip -> ip - end - - "#{scheme}:#{host}:#{port}" - rescue - _ -> :error_gun_info - end - end - defp find_conn(conns, conn_pid) do Enum.find(conns, fn {_key, conn} -> conn.conn == conn_pid @@ -316,8 +280,4 @@ defmodule Pleroma.Pool.Connections do def crf(current, steps, crf) do 1 + :math.pow(0.5, current / steps) * crf end - - def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do - "#{scheme}://#{host}#{path}" - end end