Fix race in enforcer/reclaimer start
authorhref <href@random.sh>
Wed, 8 Jul 2020 11:22:42 +0000 (13:22 +0200)
committerrinpatch <rinpatch@sdf.org>
Wed, 15 Jul 2020 12:26:35 +0000 (15:26 +0300)
lib/pleroma/gun/connection_pool/reclaimer.ex [new file with mode: 0644]
lib/pleroma/gun/connection_pool/worker_supervisor.ex

diff --git a/lib/pleroma/gun/connection_pool/reclaimer.ex b/lib/pleroma/gun/connection_pool/reclaimer.ex
new file mode 100644 (file)
index 0000000..1793ac3
--- /dev/null
@@ -0,0 +1,85 @@
+defmodule Pleroma.Gun.ConnectionPool.Reclaimer do
+  use GenServer, restart: :temporary
+
+  @registry Pleroma.Gun.ConnectionPool
+
+  def start_monitor() do
+    pid =
+      case :gen_server.start(__MODULE__, [], name: {:via, Registry, {@registry, "reclaimer"}}) do
+        {:ok, pid} ->
+          pid
+
+        {:error, {:already_registered, pid}} ->
+          pid
+      end
+
+    {pid, Process.monitor(pid)}
+  end
+
+  @impl true
+  def init(_) do
+    {:ok, nil, {:continue, :reclaim}}
+  end
+
+  @impl true
+  def handle_continue(:reclaim, _) do
+    max_connections = Pleroma.Config.get([:connections_pool, :max_connections])
+
+    reclaim_max =
+      [:connections_pool, :reclaim_multiplier]
+      |> Pleroma.Config.get()
+      |> Kernel.*(max_connections)
+      |> round
+      |> max(1)
+
+    :telemetry.execute([:pleroma, :connection_pool, :reclaim, :start], %{}, %{
+      max_connections: max_connections,
+      reclaim_max: reclaim_max
+    })
+
+    # :ets.fun2ms(
+    # fn {_, {worker_pid, {_, used_by, crf, last_reference}}} when used_by == [] ->
+    #   {worker_pid, crf, last_reference} end)
+    unused_conns =
+      Registry.select(
+        @registry,
+        [
+          {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}], [{{:"$1", :"$3", :"$4"}}]}
+        ]
+      )
+
+    case unused_conns do
+      [] ->
+        :telemetry.execute(
+          [:pleroma, :connection_pool, :reclaim, :stop],
+          %{reclaimed_count: 0},
+          %{
+            max_connections: max_connections
+          }
+        )
+
+        {:stop, :no_unused_conns, nil}
+
+      unused_conns ->
+        reclaimed =
+          unused_conns
+          |> Enum.sort(fn {_pid1, crf1, last_reference1}, {_pid2, crf2, last_reference2} ->
+            crf1 <= crf2 and last_reference1 <= last_reference2
+          end)
+          |> Enum.take(reclaim_max)
+
+        reclaimed
+        |> Enum.each(fn {pid, _, _} ->
+          DynamicSupervisor.terminate_child(Pleroma.Gun.ConnectionPool.WorkerSupervisor, pid)
+        end)
+
+        :telemetry.execute(
+          [:pleroma, :connection_pool, :reclaim, :stop],
+          %{reclaimed_count: Enum.count(reclaimed)},
+          %{max_connections: max_connections}
+        )
+
+        {:stop, :normal, nil}
+    end
+  end
+end
index 5cb8d488a99fce63736d8aff1e2e493e2b849684..39615c9568a8d20094fd9797199aa1e1797bcfa6 100644 (file)
@@ -29,89 +29,16 @@ defmodule Pleroma.Gun.ConnectionPool.WorkerSupervisor do
     end
   end
 
-  @registry Pleroma.Gun.ConnectionPool
-  @enforcer_key "enforcer"
   defp free_pool do
-    case Registry.lookup(@registry, @enforcer_key) do
-      [] ->
-        pid =
-          spawn(fn ->
-            {:ok, _pid} = Registry.register(@registry, @enforcer_key, nil)
-            max_connections = Pleroma.Config.get([:connections_pool, :max_connections])
-
-            reclaim_max =
-              [:connections_pool, :reclaim_multiplier]
-              |> Pleroma.Config.get()
-              |> Kernel.*(max_connections)
-              |> round
-              |> max(1)
-
-            :telemetry.execute([:pleroma, :connection_pool, :reclaim, :start], %{}, %{
-              max_connections: max_connections,
-              reclaim_max: reclaim_max
-            })
-
-            # :ets.fun2ms(
-            # fn {_, {worker_pid, {_, used_by, crf, last_reference}}} when used_by == [] ->
-            #   {worker_pid, crf, last_reference} end)
-            unused_conns =
-              Registry.select(
-                @registry,
-                [
-                  {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}],
-                   [{{:"$1", :"$3", :"$4"}}]}
-                ]
-              )
-
-            case unused_conns do
-              [] ->
-                :telemetry.execute(
-                  [:pleroma, :connection_pool, :reclaim, :stop],
-                  %{reclaimed_count: 0},
-                  %{
-                    max_connections: max_connections
-                  }
-                )
-
-                exit(:no_unused_conns)
-
-              unused_conns ->
-                reclaimed =
-                  unused_conns
-                  |> Enum.sort(fn {_pid1, crf1, last_reference1},
-                                  {_pid2, crf2, last_reference2} ->
-                    crf1 <= crf2 and last_reference1 <= last_reference2
-                  end)
-                  |> Enum.take(reclaim_max)
-
-                reclaimed
-                |> Enum.each(fn {pid, _, _} ->
-                  DynamicSupervisor.terminate_child(__MODULE__, pid)
-                end)
-
-                :telemetry.execute(
-                  [:pleroma, :connection_pool, :reclaim, :stop],
-                  %{reclaimed_count: Enum.count(reclaimed)},
-                  %{max_connections: max_connections}
-                )
-            end
-          end)
-
-        wait_for_enforcer_finish(pid)
-
-      [{pid, _}] ->
-        wait_for_enforcer_finish(pid)
-    end
+    wait_for_reclaimer_finish(Pleroma.Gun.ConnectionPool.Reclaimer.start_monitor())
   end
 
-  defp wait_for_enforcer_finish(pid) do
-    ref = Process.monitor(pid)
-
+  defp wait_for_reclaimer_finish({pid, mon}) do
     receive do
-      {:DOWN, ^ref, :process, ^pid, :no_unused_conns} ->
+      {:DOWN, ^mon, :process, ^pid, :no_unused_conns} ->
         :error
 
-      {:DOWN, ^ref, :process, ^pid, :normal} ->
+      {:DOWN, ^mon, :process, ^pid, :normal} ->
         :ok
     end
   end