open conn in separate task
authorAlexander Strizhakov <alex.strizhakov@gmail.com>
Mon, 24 Feb 2020 16:56:27 +0000 (19:56 +0300)
committerAlexander Strizhakov <alex.strizhakov@gmail.com>
Mon, 24 Feb 2020 16:56:27 +0000 (19:56 +0300)
12 files changed:
lib/mix/tasks/pleroma/benchmark.ex
lib/pleroma/gun/api.ex
lib/pleroma/gun/api/mock.ex
lib/pleroma/gun/conn.ex
lib/pleroma/gun/gun.ex
lib/pleroma/http/adapter/gun.ex
lib/pleroma/pool/connections.ex
restarter/lib/pleroma.ex
test/gun/gun_test.exs
test/http/adapter/gun_test.exs
test/http/connection_test.exs
test/pool/connections_test.exs

index 01e079136ae1d1eb690ee9bc6ed481fc7f58d8bc..7a743028933422e6708f5b4b7a1db5b91c49aa3c 100644 (file)
@@ -79,7 +79,7 @@ defmodule Mix.Tasks.Pleroma.Benchmark do
     start_pleroma()
 
     :ok =
-      Pleroma.Pool.Connections.open_conn(
+      Pleroma.Gun.Conn.open(
         "https://httpbin.org/stream-bytes/1500",
         :gun_connections
       )
index a0c3c5415a58ed2b16651e12fee582f42eb13b7f..f79c9f4434c69823509bf8d79122090c8da4493b 100644 (file)
@@ -6,9 +6,10 @@ defmodule Pleroma.Gun.API do
   @callback open(charlist(), pos_integer(), map()) :: {:ok, pid()}
   @callback info(pid()) :: map()
   @callback close(pid()) :: :ok
-  @callback await_up(pid) :: {:ok, atom()} | {:error, atom()}
+  @callback await_up(pid, pos_integer()) :: {:ok, atom()} | {:error, atom()}
   @callback connect(pid(), map()) :: reference()
   @callback await(pid(), reference()) :: {:response, :fin, 200, []}
+  @callback set_owner(pid(), pid()) :: :ok
 
   def open(host, port, opts), do: api().open(host, port, opts)
 
@@ -16,11 +17,13 @@ defmodule Pleroma.Gun.API do
 
   def close(pid), do: api().close(pid)
 
-  def await_up(pid), do: api().await_up(pid)
+  def await_up(pid, timeout \\ 5_000), do: api().await_up(pid, timeout)
 
   def connect(pid, opts), do: api().connect(pid, opts)
 
   def await(pid, ref), do: api().await(pid, ref)
 
+  def set_owner(pid, owner), do: api().set_owner(pid, owner)
+
   defp api, do: Pleroma.Config.get([Pleroma.Gun.API], Pleroma.Gun)
 end
index 0134b016ea836c7d7cfc0915a7815ccb76ea13a0..6d24b0e69afbd1fb4b83d0b2177a8a778e77476e 100644 (file)
@@ -118,7 +118,10 @@ defmodule Pleroma.Gun.API.Mock do
   end
 
   @impl API
-  def await_up(_pid), do: {:ok, :http}
+  def await_up(_pid, _timeout), do: {:ok, :http}
+
+  @impl API
+  def set_owner(_pid, _owner), do: :ok
 
   @impl API
   def connect(pid, %{host: _, port: 80}) do
index 2474829d6561b9f5fb3af6d0602a1c45e46094dc..ddb9f30b024153fbe91875020baeb5abba1a5396 100644 (file)
@@ -6,6 +6,11 @@ defmodule Pleroma.Gun.Conn do
   @moduledoc """
   Struct for gun connection data
   """
+  alias Pleroma.Gun.API
+  alias Pleroma.Pool.Connections
+
+  require Logger
+
   @type gun_state :: :up | :down
   @type conn_state :: :active | :idle
 
@@ -26,4 +31,145 @@ defmodule Pleroma.Gun.Conn do
             last_reference: 0,
             crf: 1,
             retries: 0
+
+  @spec open(String.t() | URI.t(), atom(), keyword()) :: :ok | nil
+  def open(url, name, opts \\ [])
+  def open(url, name, opts) when is_binary(url), do: open(URI.parse(url), name, opts)
+
+  def open(%URI{} = uri, name, opts) do
+    pool_opts = Pleroma.Config.get([:connections_pool], [])
+
+    opts =
+      opts
+      |> Enum.into(%{})
+      |> Map.put_new(:retry, pool_opts[:retry] || 0)
+      |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 100)
+      |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)
+
+    key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
+
+    Logger.debug("opening new connection #{Connections.compose_uri_log(uri)}")
+
+    conn_pid =
+      if Connections.count(name) < opts[:max_connection] do
+        do_open(uri, opts)
+      else
+        try_do_open(name, uri, opts)
+      end
+
+    if is_pid(conn_pid) do
+      conn = %Pleroma.Gun.Conn{
+        conn: conn_pid,
+        gun_state: :up,
+        conn_state: :active,
+        last_reference: :os.system_time(:second)
+      }
+
+      :ok = API.set_owner(conn_pid, Process.whereis(name))
+      Connections.add_conn(name, key, conn)
+    end
+  end
+
+  defp do_open(uri, %{proxy: {proxy_host, proxy_port}} = opts) do
+    connect_opts =
+      uri
+      |> destination_opts()
+      |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
+
+    with open_opts <- Map.delete(opts, :tls_opts),
+         {:ok, conn} <- API.open(proxy_host, proxy_port, open_opts),
+         {:ok, _} <- API.await_up(conn, opts[:await_up_timeout]),
+         stream <- API.connect(conn, connect_opts),
+         {:response, :fin, 200, _} <- API.await(conn, stream) do
+      conn
+    else
+      error ->
+        Logger.warn(
+          "Received error on opening connection with http proxy #{
+            Connections.compose_uri_log(uri)
+          } #{inspect(error)}"
+        )
+
+        nil
+    end
+  end
+
+  defp do_open(uri, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
+    version =
+      proxy_type
+      |> to_string()
+      |> String.last()
+      |> case do
+        "4" -> 4
+        _ -> 5
+      end
+
+    socks_opts =
+      uri
+      |> destination_opts()
+      |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
+      |> Map.put(:version, version)
+
+    opts =
+      opts
+      |> Map.put(:protocols, [:socks])
+      |> Map.put(:socks_opts, socks_opts)
+
+    with {:ok, conn} <- API.open(proxy_host, proxy_port, opts),
+         {:ok, _} <- API.await_up(conn, opts[:await_up_timeout]) do
+      conn
+    else
+      error ->
+        Logger.warn(
+          "Received error on opening connection with socks proxy #{
+            Connections.compose_uri_log(uri)
+          } #{inspect(error)}"
+        )
+
+        nil
+    end
+  end
+
+  defp do_open(%URI{host: host, port: port} = uri, opts) do
+    {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
+
+    with {:ok, conn} <- API.open(host, port, opts),
+         {:ok, _} <- API.await_up(conn, opts[:await_up_timeout]) do
+      conn
+    else
+      error ->
+        Logger.warn(
+          "Received error on opening connection #{Connections.compose_uri_log(uri)} #{
+            inspect(error)
+          }"
+        )
+
+        nil
+    end
+  end
+
+  defp destination_opts(%URI{host: host, port: port}) do
+    {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
+    %{host: host, port: port}
+  end
+
+  defp add_http2_opts(opts, "https", tls_opts) do
+    Map.merge(opts, %{protocols: [:http2], transport: :tls, tls_opts: tls_opts})
+  end
+
+  defp add_http2_opts(opts, _, _), do: opts
+
+  defp try_do_open(name, uri, opts) do
+    Logger.debug("try to open conn #{Connections.compose_uri_log(uri)}")
+
+    with [{close_key, least_used} | _conns] <-
+           Connections.get_unused_conns(name),
+         :ok <- Pleroma.Gun.API.close(least_used.conn) do
+      Connections.remove_conn(name, close_key)
+
+      do_open(uri, opts)
+    else
+      [] -> nil
+    end
+  end
 end
index 4a1bbc95f6f9156dd4e39c52a5c6d22da59d24a3..da82983b1eed79db5c31f163956b4365e9e4571a 100644 (file)
@@ -32,7 +32,7 @@ defmodule Pleroma.Gun do
   defdelegate close(pid), to: :gun
 
   @impl API
-  defdelegate await_up(pid), to: :gun
+  defdelegate await_up(pid, timeout \\ 5_000), to: :gun
 
   @impl API
   defdelegate connect(pid, opts), to: :gun
@@ -42,4 +42,7 @@ defmodule Pleroma.Gun do
 
   @spec flush(pid() | reference()) :: :ok
   defdelegate flush(pid), to: :gun
+
+  @impl API
+  defdelegate set_owner(pid, owner), to: :gun
 end
index 7b7e38d8c39be9d25051b805c951323a211fac2f..908d71898e6ed7abd93052f9e1a06f8e454e7871 100644 (file)
@@ -12,7 +12,7 @@ defmodule Pleroma.HTTP.Adapter.Gun do
   alias Pleroma.Pool.Connections
 
   @defaults [
-    connect_timeout: 20_000,
+    connect_timeout: 5_000,
     domain_lookup_timeout: 5_000,
     tls_handshake_timeout: 5_000,
     retry: 0,
@@ -94,13 +94,11 @@ defmodule Pleroma.HTTP.Adapter.Gun do
             "Gun connections pool checkin was not successful. Trying to open conn for next request."
           )
 
-          :ok = Connections.open_conn(uri, :gun_connections, opts)
+          Task.start(fn -> Pleroma.Gun.Conn.open(uri, :gun_connections, opts) end)
           opts
 
         conn when is_pid(conn) ->
-          Logger.debug(
-            "received conn #{inspect(conn)} #{uri.scheme}://#{Connections.compose_uri(uri)}"
-          )
+          Logger.debug("received conn #{inspect(conn)} #{Connections.compose_uri_log(uri)}")
 
           opts
           |> Keyword.put(:conn, conn)
@@ -109,13 +107,14 @@ defmodule Pleroma.HTTP.Adapter.Gun do
     rescue
       error ->
         Logger.warn(
-          "Gun connections pool checkin caused error #{uri.scheme}://#{
-            Connections.compose_uri(uri)
-          } #{inspect(error)}"
+          "Gun connections pool checkin caused error #{Connections.compose_uri_log(uri)} #{
+            inspect(error)
+          }"
         )
 
         opts
     catch
+      # TODO: here must be no timeouts
       :exit, {:timeout, {_, operation, [_, {method, _}, _]}} ->
         {:message_queue_len, messages_len} =
           :gun_connections
@@ -124,15 +123,15 @@ defmodule Pleroma.HTTP.Adapter.Gun do
 
         Logger.warn(
           "Gun connections pool checkin with timeout error for #{operation} #{method} #{
-            uri.scheme
-          }://#{Connections.compose_uri(uri)}. Messages length: #{messages_len}"
+            Connections.compose_uri_log(uri)
+          }. Messages length: #{messages_len}"
         )
 
         opts
 
       :exit, error ->
         Logger.warn(
-          "Gun pool checkin exited with error #{uri.scheme}://#{Connections.compose_uri(uri)} #{
+          "Gun pool checkin exited with error #{Connections.compose_uri_log(uri)} #{
             inspect(error)
           }"
         )
index d20927580af1f63c2890c25f37c19b69fc19859b..a444f822ff97ab0f8357271e5d70309302fc87bc 100644 (file)
@@ -20,7 +20,6 @@ defmodule Pleroma.Pool.Connections do
   defstruct conns: %{}, opts: []
 
   alias Pleroma.Gun.API
-  alias Pleroma.Gun.Conn
 
   @spec start_link({atom(), keyword()}) :: {:ok, pid()}
   def start_link({name, opts}) do
@@ -44,23 +43,6 @@ defmodule Pleroma.Pool.Connections do
     )
   end
 
-  @spec open_conn(String.t() | URI.t(), atom(), keyword()) :: :ok
-  def open_conn(url, name, opts \\ [])
-  def open_conn(url, name, opts) when is_binary(url), do: open_conn(URI.parse(url), name, opts)
-
-  def open_conn(%URI{} = uri, name, opts) do
-    pool_opts = Config.get([:connections_pool], [])
-
-    opts =
-      opts
-      |> Enum.into(%{})
-      |> Map.put_new(:retry, pool_opts[:retry] || 0)
-      |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 100)
-      |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)
-
-    GenServer.cast(name, {:open_conn, %{opts: opts, uri: uri}})
-  end
-
   @spec alive?(atom()) :: boolean()
   def alive?(name) do
     pid = Process.whereis(name)
@@ -72,23 +54,37 @@ defmodule Pleroma.Pool.Connections do
     GenServer.call(name, :state)
   end
 
+  @spec count(atom()) :: pos_integer()
+  def count(name) do
+    GenServer.call(name, :count)
+  end
+
+  @spec get_unused_conns(atom()) :: [{domain(), conn()}]
+  def get_unused_conns(name) do
+    GenServer.call(name, :unused_conns)
+  end
+
   @spec checkout(pid(), pid(), atom()) :: :ok
   def checkout(conn, pid, name) do
     GenServer.cast(name, {:checkout, conn, pid})
   end
 
-  @impl true
-  def handle_cast({:open_conn, %{opts: opts, uri: uri}}, state) do
-    Logger.debug("opening new #{compose_uri(uri)}")
-    max_connections = state.opts[:max_connections]
+  @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok
+  def add_conn(name, key, conn) do
+    GenServer.cast(name, {:add_conn, key, conn})
+  end
 
-    key = compose_key(uri)
+  @spec remove_conn(atom(), String.t()) :: :ok
+  def remove_conn(name, key) do
+    GenServer.cast(name, {:remove_conn, key})
+  end
 
-    if Enum.count(state.conns) < max_connections do
-      open_conn(key, uri, state, opts)
-    else
-      try_to_open_conn(key, uri, state, opts)
-    end
+  @impl true
+  def handle_cast({:add_conn, key, conn}, state) do
+    state = put_in(state.conns[key], conn)
+
+    Process.monitor(conn.conn)
+    {:noreply, state}
   end
 
   @impl true
@@ -120,14 +116,20 @@ defmodule Pleroma.Pool.Connections do
     {:noreply, state}
   end
 
+  @impl true
+  def handle_cast({:remove_conn, key}, state) do
+    state = put_in(state.conns, Map.delete(state.conns, key))
+    {:noreply, state}
+  end
+
   @impl true
   def handle_call({:checkin, uri}, from, state) do
-    Logger.debug("checkin #{compose_uri(uri)}")
-    key = compose_key(uri)
+    key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
+    Logger.debug("checkin #{key}")
 
     case state.conns[key] do
       %{conn: conn, gun_state: gun_state} = current_conn when gun_state == :up ->
-        Logger.debug("reusing conn #{compose_uri(uri)}")
+        Logger.debug("reusing conn #{key}")
 
         with time <- :os.system_time(:second),
              last_reference <- time - current_conn.last_reference,
@@ -154,12 +156,31 @@ defmodule Pleroma.Pool.Connections do
   @impl true
   def handle_call(:state, _from, state), do: {:reply, state, state}
 
+  @impl true
+  def handle_call(:count, _from, state) do
+    {:reply, Enum.count(state.conns), state}
+  end
+
+  @impl true
+  def handle_call(:unused_conns, _from, state) do
+    unused_conns =
+      state.conns
+      |> Enum.filter(fn {_k, v} ->
+        v.conn_state == :idle and v.used_by == []
+      end)
+      |> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
+        x.crf <= y.crf and x.last_reference <= y.last_reference
+      end)
+
+    {:reply, unused_conns, state}
+  end
+
   @impl true
   def handle_info({:gun_up, conn_pid, _protocol}, state) do
     state =
-      with true <- Process.alive?(conn_pid),
-           conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid),
+      with conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid),
            {key, conn} <- find_conn(state.conns, conn_pid, conn_key),
+           {true, key} <- {Process.alive?(conn_pid), key},
            time <- :os.system_time(:second),
            last_reference <- time - conn.last_reference,
            current_crf <- crf(last_reference, 100, conn.crf) do
@@ -176,15 +197,17 @@ defmodule Pleroma.Pool.Connections do
           Logger.debug(":gun.info caused error")
           state
 
-        false ->
+        {false, key} ->
           Logger.debug(":gun_up message for closed conn #{inspect(conn_pid)}")
-          state
 
-        nil ->
-          Logger.debug(
-            ":gun_up message for alive conn #{inspect(conn_pid)}, but deleted from state"
+          put_in(
+            state.conns,
+            Map.delete(state.conns, key)
           )
 
+        nil ->
+          Logger.debug(":gun_up message for conn which is not found in state")
+
           :ok = API.close(conn_pid)
 
           state
@@ -198,8 +221,8 @@ defmodule Pleroma.Pool.Connections do
     retries = Config.get([:connections_pool, :retry], 0)
     # we can't get info on this pid, because pid is dead
     state =
-      with true <- Process.alive?(conn_pid),
-           {key, conn} <- find_conn(state.conns, conn_pid) do
+      with {key, conn} <- find_conn(state.conns, conn_pid),
+           {true, key} <- {Process.alive?(conn_pid), key} do
         if conn.retries == retries do
           Logger.debug("closing conn if retries is eq  #{inspect(conn_pid)}")
           :ok = API.close(conn.conn)
@@ -216,16 +239,18 @@ defmodule Pleroma.Pool.Connections do
           })
         end
       else
-        false ->
+        {false, key} ->
           # gun can send gun_down for closed conn, maybe connection is not closed yet
           Logger.debug(":gun_down message for closed conn #{inspect(conn_pid)}")
-          state
 
-        nil ->
-          Logger.debug(
-            ":gun_down message for alive conn #{inspect(conn_pid)}, but deleted from state"
+          put_in(
+            state.conns,
+            Map.delete(state.conns, key)
           )
 
+        nil ->
+          Logger.debug(":gun_down message for conn which is not found in state")
+
           :ok = API.close(conn_pid)
 
           state
@@ -234,7 +259,29 @@ defmodule Pleroma.Pool.Connections do
     {:noreply, state}
   end
 
-  defp compose_key(%URI{scheme: scheme, host: host, port: port}), do: "#{scheme}:#{host}:#{port}"
+  @impl true
+  def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do
+    Logger.debug("received DOWM message for #{inspect(conn_pid)} reason -> #{inspect(reason)}")
+
+    state =
+      with {key, conn} <- find_conn(state.conns, conn_pid) do
+        Enum.each(conn.used_by, fn {pid, _ref} ->
+          Process.exit(pid, reason)
+        end)
+
+        put_in(
+          state.conns,
+          Map.delete(state.conns, key)
+        )
+      else
+        nil ->
+          Logger.debug(":DOWN message for conn which is not found in state")
+
+          state
+      end
+
+    {:noreply, state}
+  end
 
   defp compose_key_gun_info(pid) do
     try do
@@ -265,153 +312,11 @@ defmodule Pleroma.Pool.Connections do
     end)
   end
 
-  defp open_conn(key, uri, state, %{proxy: {proxy_host, proxy_port}} = opts) do
-    connect_opts =
-      uri
-      |> destination_opts()
-      |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
-
-    with open_opts <- Map.delete(opts, :tls_opts),
-         {:ok, conn} <- API.open(proxy_host, proxy_port, open_opts),
-         {:ok, _} <- API.await_up(conn),
-         stream <- API.connect(conn, connect_opts),
-         {:response, :fin, 200, _} <- API.await(conn, stream),
-         state <-
-           put_in(state.conns[key], %Conn{
-             conn: conn,
-             gun_state: :up,
-             conn_state: :active,
-             last_reference: :os.system_time(:second)
-           }) do
-      {:noreply, state}
-    else
-      error ->
-        Logger.warn(
-          "Received error on opening connection with http proxy #{uri.scheme}://#{
-            compose_uri(uri)
-          }: #{inspect(error)}"
-        )
-
-        {:noreply, state}
-    end
-  end
-
-  defp open_conn(key, uri, state, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
-    version =
-      proxy_type
-      |> to_string()
-      |> String.last()
-      |> case do
-        "4" -> 4
-        _ -> 5
-      end
-
-    socks_opts =
-      uri
-      |> destination_opts()
-      |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
-      |> Map.put(:version, version)
-
-    opts =
-      opts
-      |> Map.put(:protocols, [:socks])
-      |> Map.put(:socks_opts, socks_opts)
-
-    with {:ok, conn} <- API.open(proxy_host, proxy_port, opts),
-         {:ok, _} <- API.await_up(conn),
-         state <-
-           put_in(state.conns[key], %Conn{
-             conn: conn,
-             gun_state: :up,
-             conn_state: :active,
-             last_reference: :os.system_time(:second)
-           }) do
-      {:noreply, state}
-    else
-      error ->
-        Logger.warn(
-          "Received error on opening connection with socks proxy #{uri.scheme}://#{
-            compose_uri(uri)
-          }: #{inspect(error)}"
-        )
-
-        {:noreply, state}
-    end
-  end
-
-  defp open_conn(key, %URI{host: host, port: port} = uri, state, opts) do
-    Logger.debug("opening conn #{compose_uri(uri)}")
-    {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
-
-    with {:ok, conn} <- API.open(host, port, opts),
-         {:ok, _} <- API.await_up(conn),
-         state <-
-           put_in(state.conns[key], %Conn{
-             conn: conn,
-             gun_state: :up,
-             conn_state: :active,
-             last_reference: :os.system_time(:second)
-           }) do
-      Logger.debug("new conn opened #{compose_uri(uri)}")
-      Logger.debug("replying to the call #{compose_uri(uri)}")
-      {:noreply, state}
-    else
-      error ->
-        Logger.warn(
-          "Received error on opening connection #{uri.scheme}://#{compose_uri(uri)}: #{
-            inspect(error)
-          }"
-        )
-
-        {:noreply, state}
-    end
-  end
-
-  defp destination_opts(%URI{host: host, port: port}) do
-    {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
-    %{host: host, port: port}
-  end
-
-  defp add_http2_opts(opts, "https", tls_opts) do
-    Map.merge(opts, %{protocols: [:http2], transport: :tls, tls_opts: tls_opts})
-  end
-
-  defp add_http2_opts(opts, _, _), do: opts
-
-  @spec get_unused_conns(map()) :: [{domain(), conn()}]
-  def get_unused_conns(conns) do
-    conns
-    |> Enum.filter(fn {_k, v} ->
-      v.conn_state == :idle and v.used_by == []
-    end)
-    |> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
-      x.crf <= y.crf and x.last_reference <= y.last_reference
-    end)
-  end
-
-  defp try_to_open_conn(key, uri, state, opts) do
-    Logger.debug("try to open conn #{compose_uri(uri)}")
-
-    with [{close_key, least_used} | _conns] <- get_unused_conns(state.conns),
-         :ok <- API.close(least_used.conn),
-         state <-
-           put_in(
-             state.conns,
-             Map.delete(state.conns, close_key)
-           ) do
-      Logger.debug(
-        "least used conn found and closed #{inspect(least_used.conn)} #{compose_uri(uri)}"
-      )
-
-      open_conn(key, uri, state, opts)
-    else
-      [] -> {:noreply, state}
-    end
-  end
-
   def crf(current, steps, crf) do
     1 + :math.pow(0.5, current / steps) * crf
   end
 
-  def compose_uri(%URI{} = uri), do: "#{uri.host}#{uri.path}"
+  def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do
+    "#{scheme}://#{host}#{path}"
+  end
 end
index d7817909d8cbdc586651d8b168bfdea75a3386c2..4ade890f9c0d79214c347233b3abed6f4d1e4737 100644 (file)
@@ -44,7 +44,7 @@ defmodule Restarter.Pleroma do
   end
 
   def handle_cast({:restart, :test, _}, state) do
-    Logger.warn("pleroma restarted")
+    Logger.warn("pleroma manually restarted")
     {:noreply, Map.put(state, :need_reboot?, false)}
   end
 
@@ -57,7 +57,7 @@ defmodule Restarter.Pleroma do
   def handle_cast({:after_boot, _}, %{after_boot: true} = state), do: {:noreply, state}
 
   def handle_cast({:after_boot, :test}, state) do
-    Logger.warn("pleroma restarted")
+    Logger.warn("pleroma restarted after boot")
     {:noreply, Map.put(state, :after_boot, true)}
   end
 
index 7f185617ce38766eeb25084b319b9a716594f735..9f3e0f93817924039af899198eb5c932d049f470 100644 (file)
@@ -19,6 +19,12 @@ defmodule Pleroma.GunTest do
     assert json = receive_response(conn, ref)
 
     assert %{"args" => %{"a" => "b", "c" => "d"}} = Jason.decode!(json)
+
+    {:ok, pid} = Task.start(fn -> Process.sleep(50) end)
+
+    :ok = :gun.set_owner(conn, pid)
+
+    assert :gun.info(conn).owner == pid
   end
 
   defp receive_response(conn, ref, acc \\ "") do
index ef1b4a8821c0ea9ab62543528d7e0b38dcea70e3..a8dcbae04e03ed7ec3857890a67e5ca182b04a74 100644 (file)
@@ -7,6 +7,7 @@ defmodule Pleroma.HTTP.Adapter.GunTest do
   use Pleroma.Tests.Helpers
   import ExUnit.CaptureLog
   alias Pleroma.Config
+  alias Pleroma.Gun.Conn
   alias Pleroma.HTTP.Adapter.Gun
   alias Pleroma.Pool.Connections
 
@@ -72,7 +73,7 @@ defmodule Pleroma.HTTP.Adapter.GunTest do
 
     test "receive conn by default" do
       uri = URI.parse("http://another-domain.com")
-      :ok = Connections.open_conn(uri, :gun_connections)
+      :ok = Conn.open(uri, :gun_connections)
 
       received_opts = Gun.options(uri)
       assert received_opts[:close_conn] == false
@@ -81,7 +82,7 @@ defmodule Pleroma.HTTP.Adapter.GunTest do
 
     test "don't receive conn if receive_conn is false" do
       uri = URI.parse("http://another-domain2.com")
-      :ok = Connections.open_conn(uri, :gun_connections)
+      :ok = Conn.open(uri, :gun_connections)
 
       opts = [receive_conn: false]
       received_opts = Gun.options(opts, uri)
@@ -118,7 +119,7 @@ defmodule Pleroma.HTTP.Adapter.GunTest do
     test "default ssl adapter opts with connection" do
       uri = URI.parse("https://some-domain.com")
 
-      :ok = Connections.open_conn(uri, :gun_connections)
+      :ok = Conn.open(uri, :gun_connections)
 
       opts = Gun.options(uri)
 
@@ -167,7 +168,7 @@ defmodule Pleroma.HTTP.Adapter.GunTest do
   describe "after_request/1" do
     test "body_as not chunks" do
       uri = URI.parse("http://some-domain.com")
-      :ok = Connections.open_conn(uri, :gun_connections)
+      :ok = Conn.open(uri, :gun_connections)
       opts = Gun.options(uri)
       :ok = Gun.after_request(opts)
       conn = opts[:conn]
@@ -185,7 +186,7 @@ defmodule Pleroma.HTTP.Adapter.GunTest do
 
     test "body_as chunks" do
       uri = URI.parse("http://some-domain.com")
-      :ok = Connections.open_conn(uri, :gun_connections)
+      :ok = Conn.open(uri, :gun_connections)
       opts = Gun.options([body_as: :chunks], uri)
       :ok = Gun.after_request(opts)
       conn = opts[:conn]
@@ -205,7 +206,7 @@ defmodule Pleroma.HTTP.Adapter.GunTest do
     test "with no connection" do
       uri = URI.parse("http://uniq-domain.com")
 
-      :ok = Connections.open_conn(uri, :gun_connections)
+      :ok = Conn.open(uri, :gun_connections)
 
       opts = Gun.options([body_as: :chunks], uri)
       conn = opts[:conn]
@@ -227,7 +228,7 @@ defmodule Pleroma.HTTP.Adapter.GunTest do
 
     test "with ipv4" do
       uri = URI.parse("http://127.0.0.1")
-      :ok = Connections.open_conn(uri, :gun_connections)
+      :ok = Conn.open(uri, :gun_connections)
       opts = Gun.options(uri)
       send(:gun_connections, {:gun_up, opts[:conn], :http})
       :ok = Gun.after_request(opts)
@@ -246,7 +247,7 @@ defmodule Pleroma.HTTP.Adapter.GunTest do
 
     test "with ipv6" do
       uri = URI.parse("http://[2a03:2880:f10c:83:face:b00c:0:25de]")
-      :ok = Connections.open_conn(uri, :gun_connections)
+      :ok = Conn.open(uri, :gun_connections)
       opts = Gun.options(uri)
       send(:gun_connections, {:gun_up, opts[:conn], :http})
       :ok = Gun.after_request(opts)
index c1ff0cc21aaff52f74b09aa12d1fc8d3e9bb3f3d..53ccbc9cd6171a89e6ecf7fae5ec11cb1a83735f 100644 (file)
@@ -124,7 +124,7 @@ defmodule Pleroma.HTTP.ConnectionTest do
       uri = URI.parse("https://some-domain.com")
 
       pid = Process.whereis(:federation)
-      :ok = Pleroma.Pool.Connections.open_conn(uri, :gun_connections, genserver_pid: pid)
+      :ok = Pleroma.Gun.Conn.open(uri, :gun_connections, genserver_pid: pid)
 
       opts = Connection.options(uri)
 
index d0d711c55da607aec1bc41a1c622ae64afe726ee..f766e3b5f9299cf513a3e94150905a9cfac69617 100644 (file)
@@ -45,7 +45,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
     url = "http://some-domain.com"
     key = "http:some-domain.com:80"
     refute Connections.checkin(url, name)
-    :ok = Connections.open_conn(url, name)
+    :ok = Conn.open(url, name)
 
     conn = Connections.checkin(url, name)
     assert is_pid(conn)
@@ -110,7 +110,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
     url = "http://ですsome-domain.com"
     refute Connections.checkin(url, name)
 
-    :ok = Connections.open_conn(url, name)
+    :ok = Conn.open(url, name)
 
     conn = Connections.checkin(url, name)
     assert is_pid(conn)
@@ -139,7 +139,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
 
     refute Connections.checkin(url, name)
 
-    :ok = Connections.open_conn(url, name)
+    :ok = Conn.open(url, name)
 
     conn = Connections.checkin(url, name)
     assert is_pid(conn)
@@ -182,7 +182,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
 
     refute Connections.checkin(url, name)
 
-    :ok = Connections.open_conn(url, name)
+    :ok = Conn.open(url, name)
 
     conn = Connections.checkin(url, name)
     assert is_pid(conn)
@@ -209,7 +209,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
   test "up and down ipv4", %{name: name} do
     self = self()
     url = "http://127.0.0.1"
-    :ok = Connections.open_conn(url, name)
+    :ok = Conn.open(url, name)
     conn = Connections.checkin(url, name)
     send(name, {:gun_down, conn, nil, nil, nil})
     send(name, {:gun_up, conn, nil})
@@ -229,7 +229,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
   test "up and down ipv6", %{name: name} do
     self = self()
     url = "http://[2a03:2880:f10c:83:face:b00c:0:25de]"
-    :ok = Connections.open_conn(url, name)
+    :ok = Conn.open(url, name)
     conn = Connections.checkin(url, name)
     send(name, {:gun_down, conn, nil, nil, nil})
     send(name, {:gun_up, conn, nil})
@@ -253,13 +253,13 @@ defmodule Pleroma.Pool.ConnectionsTest do
     https_key = "https:some-domain.com:443"
 
     refute Connections.checkin(http_url, name)
-    :ok = Connections.open_conn(http_url, name)
+    :ok = Conn.open(http_url, name)
     conn = Connections.checkin(http_url, name)
     assert is_pid(conn)
     assert Process.alive?(conn)
 
     refute Connections.checkin(https_url, name)
-    :ok = Connections.open_conn(https_url, name)
+    :ok = Conn.open(https_url, name)
     https_conn = Connections.checkin(https_url, name)
 
     refute conn == https_conn
@@ -288,17 +288,17 @@ defmodule Pleroma.Pool.ConnectionsTest do
     url = "http://gun-not-up.com"
 
     assert capture_log(fn ->
-             :ok = Connections.open_conn(url, name)
+             refute Conn.open(url, name)
              refute Connections.checkin(url, name)
            end) =~
-             "Received error on opening connection http://gun-not-up.com: {:error, :timeout}"
+             "Received error on opening connection http://gun-not-up.com {:error, :timeout}"
   end
 
   test "process gun_down message and then gun_up", %{name: name} do
     self = self()
     url = "http://gun-down-and-up.com"
     key = "http:gun-down-and-up.com:80"
-    :ok = Connections.open_conn(url, name)
+    :ok = Conn.open(url, name)
     conn = Connections.checkin(url, name)
 
     assert is_pid(conn)
@@ -347,7 +347,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
 
   test "async processes get same conn for same domain", %{name: name} do
     url = "http://some-domain.com"
-    :ok = Connections.open_conn(url, name)
+    :ok = Conn.open(url, name)
 
     tasks =
       for _ <- 1..5 do
@@ -381,8 +381,8 @@ defmodule Pleroma.Pool.ConnectionsTest do
     self = self()
     http_url = "http://some-domain.com"
     https_url = "https://some-domain.com"
-    :ok = Connections.open_conn(https_url, name)
-    :ok = Connections.open_conn(http_url, name)
+    :ok = Conn.open(https_url, name)
+    :ok = Conn.open(http_url, name)
 
     conn1 = Connections.checkin(https_url, name)
 
@@ -413,7 +413,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
     :ok = Connections.checkout(conn1, self, name)
 
     another_url = "http://another-domain.com"
-    :ok = Connections.open_conn(another_url, name)
+    :ok = Conn.open(another_url, name)
     conn = Connections.checkin(another_url, name)
 
     %Connections{
@@ -437,9 +437,19 @@ defmodule Pleroma.Pool.ConnectionsTest do
       Pleroma.Config.put(API, Pleroma.Gun)
     end
 
+    test "opens connection and change owner", %{name: name} do
+      url = "https://httpbin.org"
+      :ok = Conn.open(url, name)
+      conn = Connections.checkin(url, name)
+
+      pid = Process.whereis(name)
+
+      assert :gun.info(conn).owner == pid
+    end
+
     test "opens connection and reuse it on next request", %{name: name} do
       url = "http://httpbin.org"
-      :ok = Connections.open_conn(url, name)
+      :ok = Conn.open(url, name)
       Process.sleep(250)
       conn = Connections.checkin(url, name)
 
@@ -462,7 +472,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
 
     test "opens ssl connection and reuse it on next request", %{name: name} do
       url = "https://httpbin.org"
-      :ok = Connections.open_conn(url, name)
+      :ok = Conn.open(url, name)
       Process.sleep(1_000)
       conn = Connections.checkin(url, name)
 
@@ -488,8 +498,8 @@ defmodule Pleroma.Pool.ConnectionsTest do
       https1 = "https://www.google.com"
       https2 = "https://httpbin.org"
 
-      :ok = Connections.open_conn(https1, name)
-      :ok = Connections.open_conn(https2, name)
+      :ok = Conn.open(https1, name)
+      :ok = Conn.open(https2, name)
       Process.sleep(1_500)
       conn = Connections.checkin(https1, name)
 
@@ -513,7 +523,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
       :ok = Connections.checkout(conn, self, name)
       http = "http://httpbin.org"
       Process.sleep(1_000)
-      :ok = Connections.open_conn(http, name)
+      :ok = Conn.open(http, name)
       conn = Connections.checkin(http, name)
 
       %Connections{
@@ -535,8 +545,8 @@ defmodule Pleroma.Pool.ConnectionsTest do
 
       https1 = "https://www.google.com"
       https2 = "https://httpbin.org"
-      :ok = Connections.open_conn(https1, name)
-      :ok = Connections.open_conn(https2, name)
+      :ok = Conn.open(https1, name)
+      :ok = Conn.open(https2, name)
       Process.sleep(1_500)
 
       Connections.checkin(https1, name)
@@ -563,7 +573,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
       :ok = Connections.checkout(conn, self, name)
 
       http = "http://httpbin.org"
-      :ok = Connections.open_conn(http, name)
+      :ok = Conn.open(http, name)
       Process.sleep(1_000)
 
       conn = Connections.checkin(http, name)
@@ -587,8 +597,8 @@ defmodule Pleroma.Pool.ConnectionsTest do
 
       https1 = "https://www.google.com"
       https2 = "https://httpbin.org"
-      :ok = Connections.open_conn(https1, name)
-      :ok = Connections.open_conn(https2, name)
+      :ok = Conn.open(https1, name)
+      :ok = Conn.open(https2, name)
       Process.sleep(1_000)
       Connections.checkin(https1, name)
       conn1 = Connections.checkin(https1, name)
@@ -639,8 +649,8 @@ defmodule Pleroma.Pool.ConnectionsTest do
       https1 = "https://www.google.com"
       https2 = "https://httpbin.org"
 
-      :ok = Connections.open_conn(https1, name)
-      :ok = Connections.open_conn(https2, name)
+      :ok = Conn.open(https1, name)
+      :ok = Conn.open(https2, name)
       Process.sleep(1_500)
       Connections.checkin(https1, name)
       Connections.checkin(https2, name)
@@ -694,7 +704,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
       } = Connections.get_state(name)
 
       http = "http://httpbin.org"
-      :ok = Connections.open_conn(http, name)
+      :ok = Conn.open(http, name)
       Process.sleep(1_000)
       conn = Connections.checkin(http, name)
 
@@ -725,7 +735,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
     test "as ip", %{name: name} do
       url = "http://proxy-string.com"
       key = "http:proxy-string.com:80"
-      :ok = Connections.open_conn(url, name, proxy: {{127, 0, 0, 1}, 8123})
+      :ok = Conn.open(url, name, proxy: {{127, 0, 0, 1}, 8123})
 
       conn = Connections.checkin(url, name)
 
@@ -745,7 +755,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
 
     test "as host", %{name: name} do
       url = "http://proxy-tuple-atom.com"
-      :ok = Connections.open_conn(url, name, proxy: {'localhost', 9050})
+      :ok = Conn.open(url, name, proxy: {'localhost', 9050})
       conn = Connections.checkin(url, name)
 
       %Connections{
@@ -765,7 +775,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
     test "as ip and ssl", %{name: name} do
       url = "https://proxy-string.com"
 
-      :ok = Connections.open_conn(url, name, proxy: {{127, 0, 0, 1}, 8123})
+      :ok = Conn.open(url, name, proxy: {{127, 0, 0, 1}, 8123})
       conn = Connections.checkin(url, name)
 
       %Connections{
@@ -784,7 +794,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
 
     test "as host and ssl", %{name: name} do
       url = "https://proxy-tuple-atom.com"
-      :ok = Connections.open_conn(url, name, proxy: {'localhost', 9050})
+      :ok = Conn.open(url, name, proxy: {'localhost', 9050})
       conn = Connections.checkin(url, name)
 
       %Connections{
@@ -804,7 +814,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
     test "with socks type", %{name: name} do
       url = "http://proxy-socks.com"
 
-      :ok = Connections.open_conn(url, name, proxy: {:socks5, 'localhost', 1234})
+      :ok = Conn.open(url, name, proxy: {:socks5, 'localhost', 1234})
 
       conn = Connections.checkin(url, name)
 
@@ -825,7 +835,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
     test "with socks4 type and ssl", %{name: name} do
       url = "https://proxy-socks.com"
 
-      :ok = Connections.open_conn(url, name, proxy: {:socks4, 'localhost', 1234})
+      :ok = Conn.open(url, name, proxy: {:socks4, 'localhost', 1234})
 
       conn = Connections.checkin(url, name)
 
@@ -892,71 +902,75 @@ defmodule Pleroma.Pool.ConnectionsTest do
   end
 
   describe "get_unused_conns/1" do
-    test "crf is equalent, sorting by reference" do
-      conns = %{
-        "1" => %Conn{
-          conn_state: :idle,
-          last_reference: now() - 1
-        },
-        "2" => %Conn{
-          conn_state: :idle,
-          last_reference: now()
-        }
-      }
-
-      assert [{"1", _unused_conn} | _others] = Connections.get_unused_conns(conns)
+    test "crf is equalent, sorting by reference", %{name: name} do
+      Connections.add_conn(name, "1", %Conn{
+        conn_state: :idle,
+        last_reference: now() - 1
+      })
+
+      Connections.add_conn(name, "2", %Conn{
+        conn_state: :idle,
+        last_reference: now()
+      })
+
+      assert [{"1", _unused_conn} | _others] = Connections.get_unused_conns(name)
     end
 
-    test "reference is equalent, sorting by crf" do
-      conns = %{
-        "1" => %Conn{
-          conn_state: :idle,
-          crf: 1.999
-        },
-        "2" => %Conn{
-          conn_state: :idle,
-          crf: 2
-        }
-      }
+    test "reference is equalent, sorting by crf", %{name: name} do
+      Connections.add_conn(name, "1", %Conn{
+        conn_state: :idle,
+        crf: 1.999
+      })
+
+      Connections.add_conn(name, "2", %Conn{
+        conn_state: :idle,
+        crf: 2
+      })
 
-      assert [{"1", _unused_conn} | _others] = Connections.get_unused_conns(conns)
+      assert [{"1", _unused_conn} | _others] = Connections.get_unused_conns(name)
     end
 
-    test "higher crf and lower reference" do
-      conns = %{
-        "1" => %Conn{
-          conn_state: :idle,
-          crf: 3,
-          last_reference: now() - 1
-        },
-        "2" => %Conn{
-          conn_state: :idle,
-          crf: 2,
-          last_reference: now()
-        }
-      }
+    test "higher crf and lower reference", %{name: name} do
+      Connections.add_conn(name, "1", %Conn{
+        conn_state: :idle,
+        crf: 3,
+        last_reference: now() - 1
+      })
+
+      Connections.add_conn(name, "2", %Conn{
+        conn_state: :idle,
+        crf: 2,
+        last_reference: now()
+      })
 
-      assert [{"2", _unused_conn} | _others] = Connections.get_unused_conns(conns)
+      assert [{"2", _unused_conn} | _others] = Connections.get_unused_conns(name)
     end
 
-    test "lower crf and lower reference" do
-      conns = %{
-        "1" => %Conn{
-          conn_state: :idle,
-          crf: 1.99,
-          last_reference: now() - 1
-        },
-        "2" => %Conn{
-          conn_state: :idle,
-          crf: 2,
-          last_reference: now()
-        }
-      }
+    test "lower crf and lower reference", %{name: name} do
+      Connections.add_conn(name, "1", %Conn{
+        conn_state: :idle,
+        crf: 1.99,
+        last_reference: now() - 1
+      })
 
-      assert [{"1", _unused_conn} | _others] = Connections.get_unused_conns(conns)
+      Connections.add_conn(name, "2", %Conn{
+        conn_state: :idle,
+        crf: 2,
+        last_reference: now()
+      })
+
+      assert [{"1", _unused_conn} | _others] = Connections.get_unused_conns(name)
     end
   end
 
+  test "count/1", %{name: name} do
+    assert Connections.count(name) == 0
+    Connections.add_conn(name, "1", %Conn{conn: self()})
+    assert Connections.count(name) == 1
+    Connections.remove_conn(name, "1")
+    assert Connections.count(name) == 0
+  end
+
   defp now do
     :os.system_time(:second)
   end