Connection pool: Fix race conditions in limit enforcement
authorrinpatch <rinpatch@sdf.org>
Wed, 6 May 2020 20:14:24 +0000 (23:14 +0300)
committerrinpatch <rinpatch@sdf.org>
Wed, 15 Jul 2020 12:17:27 +0000 (15:17 +0300)
Fixes race conditions in limit enforcement by putting worker processes
in a DynamicSupervisor

lib/pleroma/application.ex
lib/pleroma/gun/connection_pool.ex
lib/pleroma/gun/connection_pool/worker.ex
lib/pleroma/gun/connection_pool/worker_supervisor.ex [new file with mode: 0644]

index be14c1f9fb07cc8d2b62ebe2c04091561ec9644d..cfdaf1770f516e59e6d35b0cc7252a4d393fdb29 100644 (file)
@@ -243,7 +243,7 @@ defmodule Pleroma.Application do
   end
 
   defp http_children(Tesla.Adapter.Gun, _) do
-    [{Registry, keys: :unique, name: Pleroma.Gun.ConnectionPool}]
+    Pleroma.Gun.ConnectionPool.children()
   end
 
   defp http_children(_, _), do: []
index 0daf1da44bfed3dd2ec5bf6f12bf8dc0b9253cdd..545bfaf7fea486564d9d035914525c7575463e50 100644 (file)
@@ -1,6 +1,15 @@
 defmodule Pleroma.Gun.ConnectionPool do
   @registry __MODULE__
 
+  alias Pleroma.Gun.ConnectionPool.WorkerSupervisor
+
+  def children do
+    [
+      {Registry, keys: :unique, name: @registry},
+      Pleroma.Gun.ConnectionPool.WorkerSupervisor
+    ]
+  end
+
   def get_conn(uri, opts) do
     key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
 
@@ -14,93 +23,21 @@ defmodule Pleroma.Gun.ConnectionPool do
         {:ok, gun_pid}
 
       [] ->
-        case enforce_pool_limits() do
-          :ok ->
-            # :gun.set_owner fails in :connected state for whatevever reason,
-            # so we open the connection in the process directly and send it's pid back
-            # We trust gun to handle timeouts by itself
-            case GenServer.start(Pleroma.Gun.ConnectionPool.Worker, [uri, key, opts, self()],
-                   timeout: :infinity
-                 ) do
-              {:ok, _worker_pid} ->
-                receive do
-                  {:conn_pid, pid} -> {:ok, pid}
-                end
-
-              {:error, {:error, {:already_registered, worker_pid}}} ->
-                get_gun_pid_from_worker(worker_pid)
-
-              err ->
-                err
+        # :gun.set_owner fails in :connected state for whatevever reason,
+        # so we open the connection in the process directly and send it's pid back
+        # We trust gun to handle timeouts by itself
+        case WorkerSupervisor.start_worker([uri, key, opts, self()]) do
+          {:ok, _worker_pid} ->
+            receive do
+              {:conn_pid, pid} -> {:ok, pid}
             end
 
-          :error ->
-            {:error, :pool_full}
-        end
-    end
-  end
-
-  @enforcer_key "enforcer"
-  defp enforce_pool_limits() do
-    max_connections = Pleroma.Config.get([:connections_pool, :max_connections])
-
-    if Registry.count(@registry) >= max_connections do
-      case Registry.lookup(@registry, @enforcer_key) do
-        [] ->
-          pid =
-            spawn(fn ->
-              {:ok, _pid} = Registry.register(@registry, @enforcer_key, nil)
-
-              reclaim_max =
-                [:connections_pool, :reclaim_multiplier]
-                |> Pleroma.Config.get()
-                |> Kernel.*(max_connections)
-                |> round
-                |> max(1)
-
-              unused_conns =
-                Registry.select(
-                  @registry,
-                  [
-                    {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}],
-                     [{{:"$1", :"$3", :"$4"}}]}
-                  ]
-                )
+          {:error, {:error, {:already_registered, worker_pid}}} ->
+            get_gun_pid_from_worker(worker_pid)
 
-              case unused_conns do
-                [] ->
-                  exit(:pool_full)
-
-                unused_conns ->
-                  unused_conns
-                  |> Enum.sort(fn {_pid1, crf1, last_reference1},
-                                  {_pid2, crf2, last_reference2} ->
-                    crf1 <= crf2 and last_reference1 <= last_reference2
-                  end)
-                  |> Enum.take(reclaim_max)
-                  |> Enum.each(fn {pid, _, _} -> GenServer.call(pid, :idle_close) end)
-              end
-            end)
-
-          wait_for_enforcer_finish(pid)
-
-        [{pid, _}] ->
-          wait_for_enforcer_finish(pid)
-      end
-    else
-      :ok
-    end
-  end
-
-  defp wait_for_enforcer_finish(pid) do
-    ref = Process.monitor(pid)
-
-    receive do
-      {:DOWN, ^ref, :process, ^pid, :pool_full} ->
-        :error
-
-      {:DOWN, ^ref, :process, ^pid, :normal} ->
-        :ok
+          err ->
+            err
+        end
     end
   end
 
index ebde4bbf6f2bdacf44a21e9b310d6bb75c073662..25fafc64c3c4de58f9db4f28f2874292f1ab27fb 100644 (file)
@@ -1,9 +1,13 @@
 defmodule Pleroma.Gun.ConnectionPool.Worker do
   alias Pleroma.Gun
-  use GenServer
+  use GenServer, restart: :temporary
 
   @registry Pleroma.Gun.ConnectionPool
 
+  def start_link(opts) do
+    GenServer.start_link(__MODULE__, opts)
+  end
+
   @impl true
   def init([uri, key, opts, client_pid]) do
     time = :os.system_time(:second)
@@ -82,12 +86,6 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do
     {:stop, {:error, down_message}, state}
   end
 
-  @impl true
-  def handle_call(:idle_close, _, %{key: key} = state) do
-    Registry.unregister(@registry, key)
-    {:stop, :normal, state}
-  end
-
   # LRFU policy: https://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.55.1478
   defp crf(time_delta, prev_crf) do
     1 + :math.pow(0.5, time_delta / 100) * prev_crf
diff --git a/lib/pleroma/gun/connection_pool/worker_supervisor.ex b/lib/pleroma/gun/connection_pool/worker_supervisor.ex
new file mode 100644 (file)
index 0000000..5b546bd
--- /dev/null
@@ -0,0 +1,91 @@
+defmodule Pleroma.Gun.ConnectionPool.WorkerSupervisor do
+  @doc "Supervisor for pool workers. Does not do anything except enforce max connection limit"
+
+  use DynamicSupervisor
+
+  def start_link(opts) do
+    DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)
+  end
+
+  def init(_opts) do
+    DynamicSupervisor.init(
+      strategy: :one_for_one,
+      max_children: Pleroma.Config.get([:connections_pool, :max_connections])
+    )
+  end
+
+  def start_worker(opts) do
+    case DynamicSupervisor.start_child(__MODULE__, {Pleroma.Gun.ConnectionPool.Worker, opts}) do
+      {:error, :max_children} ->
+        case free_pool() do
+          :ok -> start_worker(opts)
+          :error -> {:error, :pool_full}
+        end
+
+      res ->
+        res
+    end
+  end
+
+  @registry Pleroma.Gun.ConnectionPool
+  @enforcer_key "enforcer"
+  defp free_pool do
+    case Registry.lookup(@registry, @enforcer_key) do
+      [] ->
+        pid =
+          spawn(fn ->
+            {:ok, _pid} = Registry.register(@registry, @enforcer_key, nil)
+
+            max_connections = Pleroma.Config.get([:connections_pool, :max_connections])
+
+            reclaim_max =
+              [:connections_pool, :reclaim_multiplier]
+              |> Pleroma.Config.get()
+              |> Kernel.*(max_connections)
+              |> round
+              |> max(1)
+
+            unused_conns =
+              Registry.select(
+                @registry,
+                [
+                  {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}],
+                   [{{:"$1", :"$3", :"$4"}}]}
+                ]
+              )
+
+            case unused_conns do
+              [] ->
+                exit(:no_unused_conns)
+
+              unused_conns ->
+                unused_conns
+                |> Enum.sort(fn {_pid1, crf1, last_reference1}, {_pid2, crf2, last_reference2} ->
+                  crf1 <= crf2 and last_reference1 <= last_reference2
+                end)
+                |> Enum.take(reclaim_max)
+                |> Enum.each(fn {pid, _, _} ->
+                  DynamicSupervisor.terminate_child(__MODULE__, pid)
+                end)
+            end
+          end)
+
+        wait_for_enforcer_finish(pid)
+
+      [{pid, _}] ->
+        wait_for_enforcer_finish(pid)
+    end
+  end
+
+  defp wait_for_enforcer_finish(pid) do
+    ref = Process.monitor(pid)
+
+    receive do
+      {:DOWN, ^ref, :process, ^pid, :no_unused_conns} ->
+        :error
+
+      {:DOWN, ^ref, :process, ^pid, :normal} ->
+        :ok
+    end
+  end
+end