Connection Pool: register workers using :via
authorrinpatch <rinpatch@sdf.org>
Wed, 15 Jul 2020 12:24:47 +0000 (15:24 +0300)
committerrinpatch <rinpatch@sdf.org>
Wed, 15 Jul 2020 12:24:47 +0000 (15:24 +0300)
lib/pleroma/gun/connection_pool.ex
lib/pleroma/gun/connection_pool/worker.ex
lib/pleroma/gun/connection_pool/worker_supervisor.ex

index 545bfaf7fea486564d9d035914525c7575463e50..e951872fea422da0df3b16338e69926aa3ed0d8f 100644 (file)
@@ -15,7 +15,7 @@ defmodule Pleroma.Gun.ConnectionPool do
 
     case Registry.lookup(@registry, key) do
       # The key has already been registered, but connection is not up yet
-      [{worker_pid, {nil, _used_by, _crf, _last_reference}}] ->
+      [{worker_pid, nil}] ->
         get_gun_pid_from_worker(worker_pid)
 
       [{worker_pid, {gun_pid, _used_by, _crf, _last_reference}}] ->
@@ -26,13 +26,13 @@ defmodule Pleroma.Gun.ConnectionPool do
         # :gun.set_owner fails in :connected state for whatevever reason,
         # so we open the connection in the process directly and send it's pid back
         # We trust gun to handle timeouts by itself
-        case WorkerSupervisor.start_worker([uri, key, opts, self()]) do
+        case WorkerSupervisor.start_worker([key, uri, opts, self()]) do
           {:ok, _worker_pid} ->
             receive do
               {:conn_pid, pid} -> {:ok, pid}
             end
 
-          {:error, {:error, {:already_registered, worker_pid}}} ->
+          {:error, {:already_started, worker_pid}} ->
             get_gun_pid_from_worker(worker_pid)
 
           err ->
@@ -56,6 +56,8 @@ defmodule Pleroma.Gun.ConnectionPool do
   end
 
   def release_conn(conn_pid) do
+    # :ets.fun2ms(fn {_, {worker_pid, {gun_pid, _, _, _}}} when gun_pid == conn_pid ->
+    #    worker_pid end)
     query_result =
       Registry.select(@registry, [
         {{:_, :"$1", {:"$2", :_, :_, :_}}, [{:==, :"$2", conn_pid}], [:"$1"]}
index 25fafc64c3c4de58f9db4f28f2874292f1ab27fb..0a94f16a2db9d5e19a447bcd90b16dbda4054c88 100644 (file)
@@ -4,20 +4,19 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do
 
   @registry Pleroma.Gun.ConnectionPool
 
-  def start_link(opts) do
-    GenServer.start_link(__MODULE__, opts)
+  def start_link([key | _] = opts) do
+    GenServer.start_link(__MODULE__, opts, name: {:via, Registry, {@registry, key}})
   end
 
   @impl true
-  def init([uri, key, opts, client_pid]) do
-    time = :os.system_time(:second)
-    # Register before opening connection to prevent race conditions
-    with {:ok, _owner} <- Registry.register(@registry, key, {nil, [client_pid], 1, time}),
-         {:ok, conn_pid} <- Gun.Conn.open(uri, opts),
+  def init([key, uri, opts, client_pid]) do
+    with {:ok, conn_pid} <- Gun.Conn.open(uri, opts),
          Process.link(conn_pid) do
+      time = :os.system_time(:second)
+
       {_, _} =
-        Registry.update_value(@registry, key, fn {_, used_by, crf, last_reference} ->
-          {conn_pid, used_by, crf, last_reference}
+        Registry.update_value(@registry, key, fn _ ->
+          {conn_pid, [client_pid], 1, time}
         end)
 
       send(client_pid, {:conn_pid, conn_pid})
index 5b546bd874c58456e542f44747b877c27c8224f2..d090c034e66d177b17f7d96beec5aabc4ff9c3a5 100644 (file)
@@ -1,5 +1,5 @@
 defmodule Pleroma.Gun.ConnectionPool.WorkerSupervisor do
-  @doc "Supervisor for pool workers. Does not do anything except enforce max connection limit"
+  @moduledoc "Supervisor for pool workers. Does not do anything except enforce max connection limit"
 
   use DynamicSupervisor
 
@@ -35,7 +35,6 @@ defmodule Pleroma.Gun.ConnectionPool.WorkerSupervisor do
         pid =
           spawn(fn ->
             {:ok, _pid} = Registry.register(@registry, @enforcer_key, nil)
-
             max_connections = Pleroma.Config.get([:connections_pool, :max_connections])
 
             reclaim_max =