Merge remote-tracking branch 'upstream/develop' into restrict-domain
[akkoma] / lib / pleroma / gun / connection_pool / worker_supervisor.ex
index 4b5d10d2a20396c17f60c912e23cabdd571d1b24..4c23bcbd92b67cde82578ab5b1ef35f37d473f4d 100644 (file)
@@ -1,3 +1,7 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
 defmodule Pleroma.Gun.ConnectionPool.WorkerSupervisor do
   @moduledoc "Supervisor for pool workers. Does not do anything except enforce max connection limit"
 
@@ -14,16 +18,14 @@ defmodule Pleroma.Gun.ConnectionPool.WorkerSupervisor do
     )
   end
 
-  def start_worker(opts) do
+  def start_worker(opts, retry \\ false) do
     case DynamicSupervisor.start_child(__MODULE__, {Pleroma.Gun.ConnectionPool.Worker, opts}) do
       {:error, :max_children} ->
-        case free_pool() do
-          :ok ->
-            start_worker(opts)
-
-          :error ->
-            :telemetry.execute([:pleroma, :connection_pool, :provision_failure], %{opts: opts})
-            {:error, :pool_full}
+        if retry or free_pool() == :error do
+          :telemetry.execute([:pleroma, :connection_pool, :provision_failure], %{opts: opts})
+          {:error, :pool_full}
+        else
+          start_worker(opts, true)
         end
 
       res ->
@@ -31,89 +33,16 @@ defmodule Pleroma.Gun.ConnectionPool.WorkerSupervisor do
     end
   end
 
-  @registry Pleroma.Gun.ConnectionPool
-  @enforcer_key "enforcer"
   defp free_pool do
-    case Registry.lookup(@registry, @enforcer_key) do
-      [] ->
-        pid =
-          spawn(fn ->
-            {:ok, _pid} = Registry.register(@registry, @enforcer_key, nil)
-            max_connections = Pleroma.Config.get([:connections_pool, :max_connections])
-
-            reclaim_max =
-              [:connections_pool, :reclaim_multiplier]
-              |> Pleroma.Config.get()
-              |> Kernel.*(max_connections)
-              |> round
-              |> max(1)
-
-            :telemetry.execute([:pleroma, :connection_pool, :reclaim, :start], %{}, %{
-              max_connections: max_connections,
-              reclaim_max: reclaim_max
-            })
-
-            # :ets.fun2ms(
-            # fn {_, {worker_pid, {_, used_by, crf, last_reference}}} when used_by == [] ->
-            #   {worker_pid, crf, last_reference} end)
-            unused_conns =
-              Registry.select(
-                @registry,
-                [
-                  {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}],
-                   [{{:"$1", :"$3", :"$4"}}]}
-                ]
-              )
-
-            case unused_conns do
-              [] ->
-                :telemetry.execute(
-                  [:pleroma, :connection_pool, :reclaim, :stop],
-                  %{reclaimed_count: 0},
-                  %{
-                    max_connections: max_connections
-                  }
-                )
-
-                exit(:no_unused_conns)
-
-              unused_conns ->
-                reclaimed =
-                  unused_conns
-                  |> Enum.sort(fn {_pid1, crf1, last_reference1},
-                                  {_pid2, crf2, last_reference2} ->
-                    crf1 <= crf2 and last_reference1 <= last_reference2
-                  end)
-                  |> Enum.take(reclaim_max)
-
-                reclaimed
-                |> Enum.each(fn {pid, _, _} ->
-                  DynamicSupervisor.terminate_child(__MODULE__, pid)
-                end)
-
-                :telemetry.execute(
-                  [:pleroma, :connection_pool, :reclaim, :stop],
-                  %{reclaimed_count: Enum.count(reclaimed)},
-                  %{max_connections: max_connections}
-                )
-            end
-          end)
-
-        wait_for_enforcer_finish(pid)
-
-      [{pid, _}] ->
-        wait_for_enforcer_finish(pid)
-    end
+    wait_for_reclaimer_finish(Pleroma.Gun.ConnectionPool.Reclaimer.start_monitor())
   end
 
-  defp wait_for_enforcer_finish(pid) do
-    ref = Process.monitor(pid)
-
+  defp wait_for_reclaimer_finish({pid, mon}) do
     receive do
-      {:DOWN, ^ref, :process, ^pid, :no_unused_conns} ->
+      {:DOWN, ^mon, :process, ^pid, :no_unused_conns} ->
         :error
 
-      {:DOWN, ^ref, :process, ^pid, :normal} ->
+      {:DOWN, ^mon, :process, ^pid, :normal} ->
         :ok
     end
   end