clean up
authorAlexander Strizhakov <alex.strizhakov@gmail.com>
Thu, 12 Mar 2020 15:28:54 +0000 (18:28 +0300)
committerAlexander Strizhakov <alex.strizhakov@gmail.com>
Thu, 12 Mar 2020 15:29:07 +0000 (18:29 +0300)
lib/pleroma/application.ex
lib/pleroma/config/transfer_task.ex
lib/pleroma/gun/conn.ex
lib/pleroma/http/adapter_helper.ex
lib/pleroma/http/adapter_helper/gun.ex
lib/pleroma/http/connection.ex
lib/pleroma/http/http.ex
lib/pleroma/pool/connections.ex
test/http/adapter_helper/gun_test.exs
test/pool/connections_test.exs

index c8a0617a5a318044e7d18c37c1255464bf0e078d..55b5be488179a7aced69363a5dd4875ccdc188d5 100644 (file)
@@ -42,7 +42,9 @@ defmodule Pleroma.Application do
     setup_instrumenters()
     load_custom_modules()
 
-    if adapter() == Tesla.Adapter.Gun do
+    adapter = Application.get_env(:tesla, :adapter)
+
+    if adapter == Tesla.Adapter.Gun do
       if version = Pleroma.OTPVersion.version() do
         [major, minor] =
           version
@@ -74,7 +76,7 @@ defmodule Pleroma.Application do
         Pleroma.Plugs.RateLimiter.Supervisor
       ] ++
         cachex_children() ++
-        http_pools_children(Config.get(:env)) ++
+        http_children(adapter, @env) ++
         [
           Pleroma.Stats,
           Pleroma.JobQueueMonitor,
@@ -206,15 +208,13 @@ defmodule Pleroma.Application do
   end
 
   # start hackney and gun pools in tests
-  defp http_pools_children(:test) do
+  defp http_children(_, :test) do
     hackney_options = Config.get([:hackney_pools, :federation])
     hackney_pool = :hackney_pool.child_spec(:federation, hackney_options)
     [hackney_pool, Pleroma.Pool.Supervisor]
   end
 
-  defp http_pools_children(_), do: http_pools(adapter())
-
-  defp http_pools(Tesla.Adapter.Hackney) do
+  defp http_children(Tesla.Adapter.Hackney, _) do
     pools = [:federation, :media]
 
     pools =
@@ -230,9 +230,7 @@ defmodule Pleroma.Application do
     end
   end
 
-  defp http_pools(Tesla.Adapter.Gun), do: [Pleroma.Pool.Supervisor]
-
-  defp http_pools(_), do: []
+  defp http_children(Tesla.Adapter.Gun, _), do: [Pleroma.Pool.Supervisor]
 
-  defp adapter, do: Application.get_env(:tesla, :adapter)
+  defp http_children(_, _), do: []
 end
index 4a4c022f06750b3f4f8724c37e2adc09075c8fa1..b6d80adb7fdc27c55687b944827f416ad04ee31a 100644 (file)
@@ -5,6 +5,7 @@
 defmodule Pleroma.Config.TransferTask do
   use Task
 
+  alias Pleroma.Config
   alias Pleroma.ConfigDB
   alias Pleroma.Repo
 
@@ -36,36 +37,31 @@ defmodule Pleroma.Config.TransferTask do
 
   def start_link(_) do
     load_and_update_env()
-    if Pleroma.Config.get(:env) == :test, do: Ecto.Adapters.SQL.Sandbox.checkin(Repo)
+    if Config.get(:env) == :test, do: Ecto.Adapters.SQL.Sandbox.checkin(Repo)
     :ignore
   end
 
-  @spec load_and_update_env([ConfigDB.t()]) :: :ok | false
-  def load_and_update_env(deleted \\ [], restart_pleroma? \\ true) do
-    with {_, true} <- {:configurable, Pleroma.Config.get(:configurable_from_database)} do
+  @spec load_and_update_env([ConfigDB.t()], boolean()) :: :ok
+  def load_and_update_env(deleted_settings \\ [], restart_pleroma? \\ true) do
+    with {_, true} <- {:configurable, Config.get(:configurable_from_database)} do
       # We need to restart applications for loaded settings take effect
-      in_db = Repo.all(ConfigDB)
-
-      with_deleted = in_db ++ deleted
 
       # TODO: some problem with prometheus after restart!
-      reject = [nil, :prometheus]
-
-      reject_for_restart =
+      reject_restart =
         if restart_pleroma? do
-          reject
+          [nil, :prometheus]
         else
-          [:pleroma | reject]
+          [:pleroma, nil, :prometheus]
         end
 
       started_applications = Application.started_applications()
 
-      with_deleted
-      |> Enum.map(&merge_and_update(&1))
+      (Repo.all(ConfigDB) ++ deleted_settings)
+      |> Enum.map(&merge_and_update/1)
       |> Enum.uniq()
-      |> Enum.reject(&(&1 in reject_for_restart))
+      |> Enum.reject(&(&1 in reject_restart))
       |> maybe_set_pleroma_last()
-      |> Enum.each(&restart(started_applications, &1, Pleroma.Config.get(:env)))
+      |> Enum.each(&restart(started_applications, &1, Config.get(:env)))
 
       :ok
     else
@@ -108,18 +104,14 @@ defmodule Pleroma.Config.TransferTask do
       key = ConfigDB.from_string(setting.key)
       group = ConfigDB.from_string(setting.group)
 
-      default = Pleroma.Config.Holder.config(group, key)
+      default = Config.Holder.config(group, key)
       value = ConfigDB.from_binary(setting.value)
 
       merged_value =
-        if Ecto.get_meta(setting, :state) == :deleted do
-          default
-        else
-          if can_be_merged?(default, value) do
-            ConfigDB.merge_group(group, key, default, value)
-          else
-            value
-          end
+        cond do
+          Ecto.get_meta(setting, :state) == :deleted -> default
+          can_be_merged?(default, value) -> ConfigDB.merge_group(group, key, default, value)
+          true -> value
         end
 
       :ok = update_env(group, key, merged_value)
index 57a847c30c714470998f22d002ae251c062d70a6..20823a7658daa7c926887eda807966737ce20a04 100644 (file)
@@ -49,8 +49,6 @@ defmodule Pleroma.Gun.Conn do
 
     key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
 
-    Logger.debug("opening new connection #{Connections.compose_uri_log(uri)}")
-
     conn_pid =
       if Connections.count(name) < opts[:max_connection] do
         do_open(uri, opts)
@@ -109,9 +107,9 @@ defmodule Pleroma.Gun.Conn do
     else
       error ->
         Logger.warn(
-          "Received error on opening connection with http proxy #{
-            Connections.compose_uri_log(uri)
-          } #{inspect(error)}"
+          "Opening proxied connection to #{compose_uri_log(uri)} failed with error #{
+            inspect(error)
+          }"
         )
 
         error
@@ -145,9 +143,9 @@ defmodule Pleroma.Gun.Conn do
     else
       error ->
         Logger.warn(
-          "Received error on opening connection with socks proxy #{
-            Connections.compose_uri_log(uri)
-          } #{inspect(error)}"
+          "Opening socks proxied connection to #{compose_uri_log(uri)} failed with error #{
+            inspect(error)
+          }"
         )
 
         error
@@ -163,9 +161,7 @@ defmodule Pleroma.Gun.Conn do
     else
       error ->
         Logger.warn(
-          "Received error on opening connection #{Connections.compose_uri_log(uri)} #{
-            inspect(error)
-          }"
+          "Opening connection to #{compose_uri_log(uri)} failed with error #{inspect(error)}"
         )
 
         error
@@ -184,16 +180,17 @@ defmodule Pleroma.Gun.Conn do
   defp add_http2_opts(opts, _, _), do: opts
 
   defp close_least_used_and_do_open(name, uri, opts) do
-    Logger.debug("try to open conn #{Connections.compose_uri_log(uri)}")
-
-    with [{close_key, least_used} | _conns] <-
-           Connections.get_unused_conns(name),
-         :ok <- Gun.close(least_used.conn) do
-      Connections.remove_conn(name, close_key)
+    with [{key, conn} | _conns] <- Connections.get_unused_conns(name),
+         :ok <- Gun.close(conn.conn) do
+      Connections.remove_conn(name, key)
 
       do_open(uri, opts)
     else
       [] -> {:error, :pool_overflowed}
     end
   end
+
+  def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do
+    "#{scheme}://#{host}#{path}"
+  end
 end
index 2c13666ecebff86375d4555a37d7fe13a9ac628f..510722ff94ff6884d7afbb695b23a15b846809c2 100644 (file)
@@ -7,7 +7,7 @@ defmodule Pleroma.HTTP.AdapterHelper do
 
   @type proxy ::
           {Connection.host(), pos_integer()}
-          | {Connection.proxy_type(), pos_integer()}
+          | {Connection.proxy_type(), Connection.host(), pos_integer()}
 
   @callback options(keyword(), URI.t()) :: keyword()
   @callback after_request(keyword()) :: :ok
index 55c2b192aecf451010a29f7802ab555715543428..f14b95c19f5d64ddce8964f68d0abf7329b53109 100644 (file)
@@ -20,8 +20,8 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do
   ]
 
   @spec options(keyword(), URI.t()) :: keyword()
-  def options(connection_opts \\ [], %URI{} = uri) do
-    formatted_proxy =
+  def options(incoming_opts \\ [], %URI{} = uri) do
+    proxy =
       Pleroma.Config.get([:http, :proxy_url], nil)
       |> AdapterHelper.format_proxy()
 
@@ -30,8 +30,8 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do
     @defaults
     |> Keyword.merge(config_opts)
     |> add_scheme_opts(uri)
-    |> AdapterHelper.maybe_add_proxy(formatted_proxy)
-    |> maybe_get_conn(uri, connection_opts)
+    |> AdapterHelper.maybe_add_proxy(proxy)
+    |> maybe_get_conn(uri, incoming_opts)
   end
 
   @spec after_request(keyword()) :: :ok
@@ -43,44 +43,35 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do
     :ok
   end
 
-  defp add_scheme_opts(opts, %URI{scheme: "http"}), do: opts
+  defp add_scheme_opts(opts, %{scheme: "http"}), do: opts
 
-  defp add_scheme_opts(opts, %URI{scheme: "https"}) do
+  defp add_scheme_opts(opts, %{scheme: "https"}) do
     opts
     |> Keyword.put(:certificates_verification, true)
-    |> Keyword.put(:transport, :tls)
     |> Keyword.put(:tls_opts, log_level: :warning)
   end
 
-  defp maybe_get_conn(adapter_opts, uri, connection_opts) do
+  defp maybe_get_conn(adapter_opts, uri, incoming_opts) do
     {receive_conn?, opts} =
       adapter_opts
-      |> Keyword.merge(connection_opts)
+      |> Keyword.merge(incoming_opts)
       |> Keyword.pop(:receive_conn, true)
 
     if Connections.alive?(:gun_connections) and receive_conn? do
-      try_to_get_conn(uri, opts)
+      checkin_conn(uri, opts)
     else
       opts
     end
   end
 
-  defp try_to_get_conn(uri, opts) do
+  defp checkin_conn(uri, opts) do
     case Connections.checkin(uri, :gun_connections) do
       nil ->
-        Logger.debug(
-          "Gun connections pool checkin was not successful. Trying to open conn for next request."
-        )
-
-        Task.start(fn -> Pleroma.Gun.Conn.open(uri, :gun_connections, opts) end)
+        Task.start(Pleroma.Gun.Conn, :open, [uri, :gun_connections, opts])
         opts
 
       conn when is_pid(conn) ->
-        Logger.debug("received conn #{inspect(conn)} #{Connections.compose_uri_log(uri)}")
-
-        opts
-        |> Keyword.put(:conn, conn)
-        |> Keyword.put(:close_conn, false)
+        Keyword.merge(opts, conn: conn, close_conn: false)
     end
   end
 end
index 0fc88f708da48ec138ad49b4f88bcdf9ee2552b3..76de3fcfe009e5cb0f7815299503186f6278b522 100644 (file)
@@ -71,15 +71,15 @@ defmodule Pleroma.HTTP.Connection do
       {:ok, parse_host(host), port}
     else
       {_, _} ->
-        Logger.warn("parsing port in proxy fail #{inspect(proxy)}")
+        Logger.warn("Parsing port failed #{inspect(proxy)}")
         {:error, :invalid_proxy_port}
 
       :error ->
-        Logger.warn("parsing port in proxy fail #{inspect(proxy)}")
+        Logger.warn("Parsing port failed #{inspect(proxy)}")
         {:error, :invalid_proxy_port}
 
       _ ->
-        Logger.warn("parsing proxy fail #{inspect(proxy)}")
+        Logger.warn("Parsing proxy failed #{inspect(proxy)}")
         {:error, :invalid_proxy}
     end
   end
@@ -89,7 +89,7 @@ defmodule Pleroma.HTTP.Connection do
       {:ok, type, parse_host(host), port}
     else
       _ ->
-        Logger.warn("parsing proxy fail #{inspect(proxy)}")
+        Logger.warn("Parsing proxy failed #{inspect(proxy)}")
         {:error, :invalid_proxy}
     end
   end
index 466a94adc3af4195ff0c88b638f59f1ef04fea42..583b564842fe79477f0205ef758ab2b5c094351f 100644 (file)
@@ -56,10 +56,9 @@ defmodule Pleroma.HTTP do
           {:ok, Env.t()} | {:error, any()}
   def request(method, url, body, headers, options) when is_binary(url) do
     uri = URI.parse(url)
-    received_adapter_opts = Keyword.get(options, :adapter, [])
-    adapter_opts = Connection.options(uri, received_adapter_opts)
+    adapter_opts = Connection.options(uri, options[:adapter] || [])
     options = put_in(options[:adapter], adapter_opts)
-    params = Keyword.get(options, :params, [])
+    params = options[:params] || []
     request = build_request(method, headers, options, url, body, params)
 
     adapter = Application.get_env(:tesla, :adapter)
index 7529e9240f33ca4ea03ca374c5679c4d55555661..772833509ad8f419681145e096569c43f70eeff7 100644 (file)
@@ -87,18 +87,11 @@ defmodule Pleroma.Pool.Connections do
 
   @impl true
   def handle_cast({:checkout, conn_pid, pid}, state) do
-    Logger.debug("checkout #{inspect(conn_pid)}")
-
     state =
       with true <- Process.alive?(conn_pid),
            {key, conn} <- find_conn(state.conns, conn_pid),
            used_by <- List.keydelete(conn.used_by, pid, 0) do
-        conn_state =
-          if used_by == [] do
-            :idle
-          else
-            conn.conn_state
-          end
+        conn_state = if used_by == [], do: :idle, else: conn.conn_state
 
         put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
       else
@@ -123,26 +116,23 @@ defmodule Pleroma.Pool.Connections do
   @impl true
   def handle_call({:checkin, uri}, from, state) do
     key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
-    Logger.debug("checkin #{key}")
 
     case state.conns[key] do
-      %{conn: conn, gun_state: :up} = current_conn ->
-        Logger.debug("reusing conn #{key}")
-
+      %{conn: pid, gun_state: :up} = conn ->
         time = :os.system_time(:second)
-        last_reference = time - current_conn.last_reference
-        current_crf = crf(last_reference, 100, current_conn.crf)
+        last_reference = time - conn.last_reference
+        crf = crf(last_reference, 100, conn.crf)
 
         state =
           put_in(state.conns[key], %{
-            current_conn
+            conn
             | last_reference: time,
-              crf: current_crf,
+              crf: crf,
               conn_state: :active,
-              used_by: [from | current_conn.used_by]
+              used_by: [from | conn.used_by]
           })
 
-        {:reply, conn, state}
+        {:reply, pid, state}
 
       %{gun_state: :down} ->
         {:reply, nil, state}
@@ -164,50 +154,48 @@ defmodule Pleroma.Pool.Connections do
   def handle_call(:unused_conns, _from, state) do
     unused_conns =
       state.conns
-      |> Enum.filter(fn {_k, v} ->
-        v.conn_state == :idle and v.used_by == []
-      end)
-      |> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
-        x.crf <= y.crf and x.last_reference <= y.last_reference
-      end)
+      |> Enum.filter(&filter_conns/1)
+      |> Enum.sort(&sort_conns/2)
 
     {:reply, unused_conns, state}
   end
 
+  defp filter_conns({_, %{conn_state: :idle, used_by: []}}), do: true
+  defp filter_conns(_), do: false
+
+  defp sort_conns({_, c1}, {_, c2}) do
+    c1.crf <= c2.crf and c1.last_reference <= c2.last_reference
+  end
+
   @impl true
   def handle_info({:gun_up, conn_pid, _protocol}, state) do
+    %{origin_host: host, origin_scheme: scheme, origin_port: port} = Gun.info(conn_pid)
+
+    host =
+      case :inet.ntoa(host) do
+        {:error, :einval} -> host
+        ip -> ip
+      end
+
+    key = "#{scheme}:#{host}:#{port}"
+
     state =
-      with conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid),
-           {key, conn} <- find_conn(state.conns, conn_pid, conn_key),
+      with {_key, conn} <- find_conn(state.conns, conn_pid, key),
            {true, key} <- {Process.alive?(conn_pid), key} do
-        time = :os.system_time(:second)
-        last_reference = time - conn.last_reference
-        current_crf = crf(last_reference, 100, conn.crf)
-
         put_in(state.conns[key], %{
           conn
           | gun_state: :up,
-            last_reference: time,
-            crf: current_crf,
             conn_state: :active,
             retries: 0
         })
       else
-        :error_gun_info ->
-          Logger.debug(":gun.info caused error")
-          state
-
         {false, key} ->
-          Logger.debug(":gun_up message for closed conn #{inspect(conn_pid)}")
-
           put_in(
             state.conns,
             Map.delete(state.conns, key)
           )
 
         nil ->
-          Logger.debug(":gun_up message for conn which is not found in state")
-
           :ok = Gun.close(conn_pid)
 
           state
@@ -224,7 +212,6 @@ defmodule Pleroma.Pool.Connections do
       with {key, conn} <- find_conn(state.conns, conn_pid),
            {true, key} <- {Process.alive?(conn_pid), key} do
         if conn.retries == retries do
-          Logger.debug("closing conn if retries is eq  #{inspect(conn_pid)}")
           :ok = Gun.close(conn.conn)
 
           put_in(
@@ -240,18 +227,13 @@ defmodule Pleroma.Pool.Connections do
         end
       else
         {false, key} ->
-          # gun can send gun_down for closed conn, maybe connection is not closed yet
-          Logger.debug(":gun_down message for closed conn #{inspect(conn_pid)}")
-
           put_in(
             state.conns,
             Map.delete(state.conns, key)
           )
 
         nil ->
-          Logger.debug(":gun_down message for conn which is not found in state")
-
-          :ok = Gun.close(conn_pid)
+          Logger.debug(":gun_down for conn which isn't found in state")
 
           state
       end
@@ -275,7 +257,7 @@ defmodule Pleroma.Pool.Connections do
         )
       else
         nil ->
-          Logger.debug(":DOWN message for conn which is not found in state")
+          Logger.debug(":DOWN for conn which isn't found in state")
 
           state
       end
@@ -283,18 +265,6 @@ defmodule Pleroma.Pool.Connections do
     {:noreply, state}
   end
 
-  defp compose_key_gun_info(pid) do
-    %{origin_host: origin_host, origin_scheme: scheme, origin_port: port} = Gun.info(pid)
-
-    host =
-      case :inet.ntoa(origin_host) do
-        {:error, :einval} -> origin_host
-        ip -> ip
-      end
-
-    "#{scheme}:#{host}:#{port}"
-  end
-
   defp find_conn(conns, conn_pid) do
     Enum.find(conns, fn {_key, conn} ->
       conn.conn == conn_pid
@@ -310,8 +280,4 @@ defmodule Pleroma.Pool.Connections do
   def crf(current, steps, crf) do
     1 + :math.pow(0.5, current / steps) * crf
   end
-
-  def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do
-    "#{scheme}://#{host}#{path}"
-  end
 end
index 6af8be15d0cdacfc98df2b655c84b912fbaa56ce..18025b98676acd113408cdbf2e29ccb6557b71e3 100644 (file)
@@ -6,7 +6,6 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do
   use ExUnit.Case, async: true
   use Pleroma.Tests.Helpers
 
-  import ExUnit.CaptureLog
   import Mox
 
   alias Pleroma.Config
@@ -63,7 +62,6 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do
       opts = Gun.options([receive_conn: false], uri)
 
       assert opts[:certificates_verification]
-      assert opts[:transport] == :tls
     end
 
     test "get conn on next request" do
@@ -73,14 +71,12 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do
       on_exit(fn -> Logger.configure(level: level) end)
       uri = URI.parse("http://some-domain2.com")
 
-      assert capture_log(fn ->
-               opts = Gun.options(uri)
+      opts = Gun.options(uri)
 
-               assert opts[:conn] == nil
-               assert opts[:close_conn] == nil
-             end) =~
-               "Gun connections pool checkin was not successful. Trying to open conn for next request."
+      assert opts[:conn] == nil
+      assert opts[:close_conn] == nil
 
+      Process.sleep(50)
       opts = Gun.options(uri)
 
       assert is_pid(opts[:conn])
index 06f32b74e9041e89377549204ca7a9a1af5e5427..aeda54875851f171f06505596a7586ce6b02db61 100644 (file)
@@ -355,7 +355,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
              refute Conn.open(url, name)
              refute Connections.checkin(url, name)
            end) =~
-             "Received error on opening connection http://gun-not-up.com {:error, :timeout}"
+             "Opening connection to http://gun-not-up.com failed with error {:error, :timeout}"
   end
 
   test "process gun_down message and then gun_up", %{name: name} do