Apply suggestion to lib/pleroma/pool/connections.ex
[akkoma] / lib / pleroma / pool / connections.ex
index 1ed16d1c1e2b722fc4d920e083cebc111d09860d..c4c5fd66c20785d23bb0ed66e7e2b8ce2e6a1ecf 100644 (file)
@@ -5,6 +5,8 @@
 defmodule Pleroma.Pool.Connections do
   use GenServer
 
+  alias Pleroma.Config
+
   require Logger
 
   @type domain :: String.t()
@@ -18,7 +20,6 @@ defmodule Pleroma.Pool.Connections do
   defstruct conns: %{}, opts: []
 
   alias Pleroma.Gun.API
-  alias Pleroma.Gun.Conn
 
   @spec start_link({atom(), keyword()}) :: {:ok, pid()}
   def start_link({name, opts}) do
@@ -33,7 +34,7 @@ defmodule Pleroma.Pool.Connections do
   def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name)
 
   def checkin(%URI{} = uri, name) do
-    timeout = Pleroma.Config.get([:connections_pool, :receive_connection_timeout], 250)
+    timeout = Config.get([:connections_pool, :receive_connection_timeout], 250)
 
     GenServer.call(
       name,
@@ -42,24 +43,6 @@ defmodule Pleroma.Pool.Connections do
     )
   end
 
-  @spec open_conn(String.t() | URI.t(), atom(), keyword()) :: :ok
-  def open_conn(url, name, opts \\ [])
-  def open_conn(url, name, opts) when is_binary(url), do: open_conn(URI.parse(url), name, opts)
-
-  def open_conn(%URI{} = uri, name, opts) do
-    pool_opts = Pleroma.Config.get([:connections_pool], [])
-
-    opts =
-      opts
-      |> Enum.into(%{})
-      |> Map.put_new(:receive, false)
-      |> Map.put_new(:retry, pool_opts[:retry] || 5)
-      |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 100)
-      |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)
-
-    GenServer.cast(name, {:open_conn, %{opts: opts, uri: uri}})
-  end
-
   @spec alive?(atom()) :: boolean()
   def alive?(name) do
     pid = Process.whereis(name)
@@ -71,23 +54,37 @@ defmodule Pleroma.Pool.Connections do
     GenServer.call(name, :state)
   end
 
+  @spec count(atom()) :: pos_integer()
+  def count(name) do
+    GenServer.call(name, :count)
+  end
+
+  @spec get_unused_conns(atom()) :: [{domain(), conn()}]
+  def get_unused_conns(name) do
+    GenServer.call(name, :unused_conns)
+  end
+
   @spec checkout(pid(), pid(), atom()) :: :ok
   def checkout(conn, pid, name) do
     GenServer.cast(name, {:checkout, conn, pid})
   end
 
-  @impl true
-  def handle_cast({:open_conn, %{opts: opts, uri: uri}}, state) do
-    Logger.debug("opening new #{compose_uri(uri)}")
-    max_connections = state.opts[:max_connections]
+  @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok
+  def add_conn(name, key, conn) do
+    GenServer.cast(name, {:add_conn, key, conn})
+  end
 
-    key = compose_key(uri)
+  @spec remove_conn(atom(), String.t()) :: :ok
+  def remove_conn(name, key) do
+    GenServer.cast(name, {:remove_conn, key})
+  end
 
-    if Enum.count(state.conns) < max_connections do
-      open_conn(key, uri, state, opts)
-    else
-      try_to_open_conn(key, uri, state, opts)
-    end
+  @impl true
+  def handle_cast({:add_conn, key, conn}, state) do
+    state = put_in(state.conns[key], conn)
+
+    Process.monitor(conn.conn)
+    {:noreply, state}
   end
 
   @impl true
@@ -108,25 +105,31 @@ defmodule Pleroma.Pool.Connections do
         put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
       else
         false ->
-          Logger.warn("checkout for closed conn #{inspect(conn_pid)}")
+          Logger.debug("checkout for closed conn #{inspect(conn_pid)}")
           state
 
         nil ->
-          Logger.info("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
+          Logger.debug("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
           state
       end
 
     {:noreply, state}
   end
 
+  @impl true
+  def handle_cast({:remove_conn, key}, state) do
+    state = put_in(state.conns, Map.delete(state.conns, key))
+    {:noreply, state}
+  end
+
   @impl true
   def handle_call({:checkin, uri}, from, state) do
-    Logger.debug("checkin #{compose_uri(uri)}")
-    key = compose_key(uri)
+    key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
+    Logger.debug("checkin #{key}")
 
     case state.conns[key] do
-      %{conn: conn, gun_state: gun_state} = current_conn when gun_state == :up ->
-        Logger.debug("reusing conn #{compose_uri(uri)}")
+      %{conn: conn, gun_state: :up} = current_conn ->
+        Logger.debug("reusing conn #{key}")
 
         with time <- :os.system_time(:second),
              last_reference <- time - current_conn.last_reference,
@@ -142,7 +145,7 @@ defmodule Pleroma.Pool.Connections do
           {:reply, conn, state}
         end
 
-      %{gun_state: gun_state} when gun_state == :down ->
+      %{gun_state: :down} ->
         {:reply, nil, state}
 
       nil ->
@@ -153,12 +156,31 @@ defmodule Pleroma.Pool.Connections do
   @impl true
   def handle_call(:state, _from, state), do: {:reply, state, state}
 
+  @impl true
+  def handle_call(:count, _from, state) do
+    {:reply, Enum.count(state.conns), state}
+  end
+
+  @impl true
+  def handle_call(:unused_conns, _from, state) do
+    unused_conns =
+      state.conns
+      |> Enum.filter(fn {_k, v} ->
+        v.conn_state == :idle and v.used_by == []
+      end)
+      |> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
+        x.crf <= y.crf and x.last_reference <= y.last_reference
+      end)
+
+    {:reply, unused_conns, state}
+  end
+
   @impl true
   def handle_info({:gun_up, conn_pid, _protocol}, state) do
     state =
-      with true <- Process.alive?(conn_pid),
-           conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid),
+      with conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid),
            {key, conn} <- find_conn(state.conns, conn_pid, conn_key),
+           {true, key} <- {Process.alive?(conn_pid), key},
            time <- :os.system_time(:second),
            last_reference <- time - conn.last_reference,
            current_crf <- crf(last_reference, 100, conn.crf) do
@@ -172,18 +194,20 @@ defmodule Pleroma.Pool.Connections do
         })
       else
         :error_gun_info ->
-          Logger.warn(":gun.info caused error")
+          Logger.debug(":gun.info caused error")
           state
 
-        false ->
-          Logger.warn(":gun_up message for closed conn #{inspect(conn_pid)}")
-          state
+        {false, key} ->
+          Logger.debug(":gun_up message for closed conn #{inspect(conn_pid)}")
 
-        nil ->
-          Logger.warn(
-            ":gun_up message for alive conn #{inspect(conn_pid)}, but deleted from state"
+          put_in(
+            state.conns,
+            Map.delete(state.conns, key)
           )
 
+        nil ->
+          Logger.debug(":gun_up message for conn which is not found in state")
+
           :ok = API.close(conn_pid)
 
           state
@@ -194,12 +218,13 @@ defmodule Pleroma.Pool.Connections do
 
   @impl true
   def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
+    retries = Config.get([:connections_pool, :retry], 0)
     # we can't get info on this pid, because pid is dead
     state =
-      with true <- Process.alive?(conn_pid),
-           {key, conn} <- find_conn(state.conns, conn_pid) do
-        if conn.retries == 5 do
-          Logger.debug("closing conn if retries is eq 5 #{inspect(conn_pid)}")
+      with {key, conn} <- find_conn(state.conns, conn_pid),
+           {true, key} <- {Process.alive?(conn_pid), key} do
+        if conn.retries == retries do
+          Logger.debug("closing conn if retries is eq  #{inspect(conn_pid)}")
           :ok = API.close(conn.conn)
 
           put_in(
@@ -214,16 +239,18 @@ defmodule Pleroma.Pool.Connections do
           })
         end
       else
-        false ->
+        {false, key} ->
           # gun can send gun_down for closed conn, maybe connection is not closed yet
-          Logger.warn(":gun_down message for closed conn #{inspect(conn_pid)}")
-          state
+          Logger.debug(":gun_down message for closed conn #{inspect(conn_pid)}")
 
-        nil ->
-          Logger.warn(
-            ":gun_down message for alive conn #{inspect(conn_pid)}, but deleted from state"
+          put_in(
+            state.conns,
+            Map.delete(state.conns, key)
           )
 
+        nil ->
+          Logger.debug(":gun_down message for conn which is not found in state")
+
           :ok = API.close(conn_pid)
 
           state
@@ -232,7 +259,29 @@ defmodule Pleroma.Pool.Connections do
     {:noreply, state}
   end
 
-  defp compose_key(%URI{scheme: scheme, host: host, port: port}), do: "#{scheme}:#{host}:#{port}"
+  @impl true
+  def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do
+    Logger.debug("received DOWM message for #{inspect(conn_pid)} reason -> #{inspect(reason)}")
+
+    state =
+      with {key, conn} <- find_conn(state.conns, conn_pid) do
+        Enum.each(conn.used_by, fn {pid, _ref} ->
+          Process.exit(pid, reason)
+        end)
+
+        put_in(
+          state.conns,
+          Map.delete(state.conns, key)
+        )
+      else
+        nil ->
+          Logger.debug(":DOWN message for conn which is not found in state")
+
+          state
+      end
+
+    {:noreply, state}
+  end
 
   defp compose_key_gun_info(pid) do
     try do
@@ -263,153 +312,11 @@ defmodule Pleroma.Pool.Connections do
     end)
   end
 
-  defp open_conn(key, uri, state, %{proxy: {proxy_host, proxy_port}} = opts) do
-    connect_opts =
-      uri
-      |> destination_opts()
-      |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
-
-    with open_opts <- Map.delete(opts, :tls_opts),
-         {:ok, conn} <- API.open(proxy_host, proxy_port, open_opts),
-         {:ok, _} <- API.await_up(conn),
-         stream <- API.connect(conn, connect_opts),
-         {:response, :fin, 200, _} <- API.await(conn, stream),
-         state <-
-           put_in(state.conns[key], %Conn{
-             conn: conn,
-             gun_state: :up,
-             conn_state: :active,
-             last_reference: :os.system_time(:second)
-           }) do
-      {:noreply, state}
-    else
-      error ->
-        Logger.warn(
-          "Received error on opening connection with http proxy #{uri.scheme}://#{
-            compose_uri(uri)
-          }: #{inspect(error)}"
-        )
-
-        {:noreply, state}
-    end
-  end
-
-  defp open_conn(key, uri, state, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
-    version =
-      proxy_type
-      |> to_string()
-      |> String.last()
-      |> case do
-        "4" -> 4
-        _ -> 5
-      end
-
-    socks_opts =
-      uri
-      |> destination_opts()
-      |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
-      |> Map.put(:version, version)
-
-    opts =
-      opts
-      |> Map.put(:protocols, [:socks])
-      |> Map.put(:socks_opts, socks_opts)
-
-    with {:ok, conn} <- API.open(proxy_host, proxy_port, opts),
-         {:ok, _} <- API.await_up(conn),
-         state <-
-           put_in(state.conns[key], %Conn{
-             conn: conn,
-             gun_state: :up,
-             conn_state: :active,
-             last_reference: :os.system_time(:second)
-           }) do
-      {:noreply, state}
-    else
-      error ->
-        Logger.warn(
-          "Received error on opening connection with socks proxy #{uri.scheme}://#{
-            compose_uri(uri)
-          }: #{inspect(error)}"
-        )
-
-        {:noreply, state}
-    end
-  end
-
-  defp open_conn(key, %URI{host: host, port: port} = uri, state, opts) do
-    Logger.debug("opening conn #{compose_uri(uri)}")
-    {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
-
-    with {:ok, conn} <- API.open(host, port, opts),
-         {:ok, _} <- API.await_up(conn),
-         state <-
-           put_in(state.conns[key], %Conn{
-             conn: conn,
-             gun_state: :up,
-             conn_state: :active,
-             last_reference: :os.system_time(:second)
-           }) do
-      Logger.debug("new conn opened #{compose_uri(uri)}")
-      Logger.debug("replying to the call #{compose_uri(uri)}")
-      {:noreply, state}
-    else
-      error ->
-        Logger.warn(
-          "Received error on opening connection #{uri.scheme}://#{compose_uri(uri)}: #{
-            inspect(error)
-          }"
-        )
-
-        {:noreply, state}
-    end
-  end
-
-  defp destination_opts(%URI{host: host, port: port}) do
-    {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
-    %{host: host, port: port}
-  end
-
-  defp add_http2_opts(opts, "https", tls_opts) do
-    Map.merge(opts, %{protocols: [:http2], transport: :tls, tls_opts: tls_opts})
-  end
-
-  defp add_http2_opts(opts, _, _), do: opts
-
-  @spec get_unused_conns(map()) :: [{domain(), conn()}]
-  def get_unused_conns(conns) do
-    conns
-    |> Enum.filter(fn {_k, v} ->
-      v.conn_state == :idle and v.used_by == []
-    end)
-    |> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
-      x.crf <= y.crf and x.last_reference <= y.last_reference
-    end)
-  end
-
-  defp try_to_open_conn(key, uri, state, opts) do
-    Logger.debug("try to open conn #{compose_uri(uri)}")
-
-    with [{close_key, least_used} | _conns] <- get_unused_conns(state.conns),
-         :ok <- API.close(least_used.conn),
-         state <-
-           put_in(
-             state.conns,
-             Map.delete(state.conns, close_key)
-           ) do
-      Logger.debug(
-        "least used conn found and closed #{inspect(least_used.conn)} #{compose_uri(uri)}"
-      )
-
-      open_conn(key, uri, state, opts)
-    else
-      [] -> {:noreply, state}
-    end
-  end
-
   def crf(current, steps, crf) do
     1 + :math.pow(0.5, current / steps) * crf
   end
 
-  def compose_uri(%URI{} = uri), do: "#{uri.host}#{uri.path}"
+  def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do
+    "#{scheme}://#{host}#{path}"
+  end
 end