Connection pool: Add client death tracking
authorrinpatch <rinpatch@sdf.org>
Fri, 8 May 2020 15:18:59 +0000 (18:18 +0300)
committerrinpatch <rinpatch@sdf.org>
Wed, 15 Jul 2020 12:26:35 +0000 (15:26 +0300)
While running this in production I noticed a number of ghost
processes with all their clients dead before they released the connection,
so let's track them to log it and remove them from clients

lib/pleroma/gun/connection_pool/worker.ex
lib/pleroma/telemetry/logger.ex

index 0a94f16a2db9d5e19a447bcd90b16dbda4054c88..8467325f3858a77d0094395548eb3cea0b6ad0ab 100644 (file)
@@ -20,7 +20,10 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do
         end)
 
       send(client_pid, {:conn_pid, conn_pid})
-      {:ok, %{key: key, timer: nil}, :hibernate}
+
+      {:ok,
+       %{key: key, timer: nil, client_monitors: %{client_pid => Process.monitor(client_pid)}},
+       :hibernate}
     else
       err -> {:stop, err}
     end
@@ -45,6 +48,9 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do
         state
       end
 
+    ref = Process.monitor(client_pid)
+
+    state = put_in(state.client_monitors[client_pid], ref)
     {:noreply, state, :hibernate}
   end
 
@@ -55,6 +61,9 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do
         {conn_pid, List.delete(used_by, client_pid), crf, last_reference}
       end)
 
+    {ref, state} = pop_in(state.client_monitors[client_pid])
+    Process.demonitor(ref)
+
     timer =
       if used_by == [] do
         max_idle = Pleroma.Config.get([:connections_pool, :max_idle_time], 30_000)
@@ -85,6 +94,26 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do
     {:stop, {:error, down_message}, state}
   end
 
+  @impl true
+  def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
+    # Sometimes the client is dead before we demonitor it in :remove_client, so the message
+    # arrives anyway
+
+    case state.client_monitors[pid] do
+      nil ->
+        {:noreply, state, :hibernate}
+
+      _ref ->
+        :telemetry.execute(
+          [:pleroma, :connection_pool, :client_death],
+          %{client_pid: pid, reason: reason},
+          %{key: state.key}
+        )
+
+        handle_cast({:remove_client, pid}, state)
+    end
+  end
+
   # LRFU policy: https://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.55.1478
   defp crf(time_delta, prev_crf) do
     1 + :math.pow(0.5, time_delta / 100) * prev_crf
index d76dd37b5ca3e83ea33ab6560f143ab8d462ac29..4cacae02f9bd010cea475f03dd50b6b2fc4d5ecc 100644 (file)
@@ -6,7 +6,8 @@ defmodule Pleroma.Telemetry.Logger do
   @events [
     [:pleroma, :connection_pool, :reclaim, :start],
     [:pleroma, :connection_pool, :reclaim, :stop],
-    [:pleroma, :connection_pool, :provision_failure]
+    [:pleroma, :connection_pool, :provision_failure],
+    [:pleroma, :connection_pool, :client_death]
   ]
   def attach do
     :telemetry.attach_many("pleroma-logger", @events, &handle_event/4, [])
@@ -59,4 +60,17 @@ defmodule Pleroma.Telemetry.Logger do
       "Connection pool had to refuse opening a connection to #{key} due to connection limit exhaustion"
     end)
   end
+
+  def handle_event(
+        [:pleroma, :connection_pool, :client_death],
+        %{client_pid: client_pid, reason: reason},
+        %{key: key},
+        _
+      ) do
+    Logger.warn(fn ->
+      "Pool worker for #{key}: Client #{inspect(client_pid)} died before releasing the connection with #{
+        inspect(reason)
+      }"
+    end)
+  end
 end