Merge remote-tracking branch 'pleroma/develop' into object-tombstone-visibility
[akkoma] / lib / pleroma / gun / connection_pool / worker.ex
index fec9d0efa9daa0323a02e26159c491e00313424d..a3fa75386946f198d4eb3158c65aa6c5b1daa277 100644 (file)
@@ -1,11 +1,15 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
 defmodule Pleroma.Gun.ConnectionPool.Worker do
   alias Pleroma.Gun
   use GenServer, restart: :temporary
 
-  @registry Pleroma.Gun.ConnectionPool
+  defp registry, do: Pleroma.Gun.ConnectionPool
 
   def start_link([key | _] = opts) do
-    GenServer.start_link(__MODULE__, opts, name: {:via, Registry, {@registry, key}})
+    GenServer.start_link(__MODULE__, opts, name: {:via, Registry, {registry(), key}})
   end
 
   @impl true
@@ -15,20 +19,24 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do
 
   @impl true
   def handle_continue({:connect, [key, uri, opts, client_pid]}, _) do
-    with {:ok, conn_pid} <- Gun.Conn.open(uri, opts),
+    with {:ok, conn_pid, protocol} <- Gun.Conn.open(uri, opts),
          Process.link(conn_pid) do
       time = :erlang.monotonic_time(:millisecond)
 
       {_, _} =
-        Registry.update_value(@registry, key, fn _ ->
+        Registry.update_value(registry(), key, fn _ ->
           {conn_pid, [client_pid], 1, time}
         end)
 
       send(client_pid, {:conn_pid, conn_pid})
 
       {:noreply,
-       %{key: key, timer: nil, client_monitors: %{client_pid => Process.monitor(client_pid)}},
-       :hibernate}
+       %{
+         key: key,
+         timer: nil,
+         client_monitors: %{client_pid => Process.monitor(client_pid)},
+         protocol: protocol
+       }, :hibernate}
     else
       err ->
         {:stop, {:shutdown, err}, nil}
@@ -53,14 +61,20 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do
   end
 
   @impl true
-  def handle_call(:add_client, {client_pid, _}, %{key: key} = state) do
+  def handle_call(:add_client, {client_pid, _}, %{key: key, protocol: protocol} = state) do
     time = :erlang.monotonic_time(:millisecond)
 
-    {{conn_pid, _, _, _}, _} =
-      Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
+    {{conn_pid, used_by, _, _}, _} =
+      Registry.update_value(registry(), key, fn {conn_pid, used_by, crf, last_reference} ->
         {conn_pid, [client_pid | used_by], crf(time - last_reference, crf), time}
       end)
 
+    :telemetry.execute(
+      [:pleroma, :connection_pool, :client, :add],
+      %{client_pid: client_pid, clients: used_by},
+      %{key: state.key, protocol: protocol}
+    )
+
     state =
       if state.timer != nil do
         Process.cancel_timer(state[:timer])
@@ -78,12 +92,13 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do
   @impl true
   def handle_call(:remove_client, {client_pid, _}, %{key: key} = state) do
     {{_conn_pid, used_by, _crf, _last_reference}, _} =
-      Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
+      Registry.update_value(registry(), key, fn {conn_pid, used_by, crf, last_reference} ->
         {conn_pid, List.delete(used_by, client_pid), crf, last_reference}
       end)
 
     {ref, state} = pop_in(state.client_monitors[client_pid])
-    Process.demonitor(ref)
+
+    Process.demonitor(ref, [:flush])
 
     timer =
       if used_by == [] do
@@ -103,22 +118,27 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do
     {:stop, :normal, state}
   end
 
+  @impl true
+  def handle_info({:gun_up, _pid, _protocol}, state) do
+    {:noreply, state, :hibernate}
+  end
+
   # Gracefully shutdown if the connection got closed without any streams left
   @impl true
   def handle_info({:gun_down, _pid, _protocol, _reason, []}, state) do
     {:stop, :normal, state}
   end
 
-  # Otherwise, shutdown with an error
+  # Otherwise, wait for retry
   @impl true
-  def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams} = down_message, state) do
-    {:stop, {:error, down_message}, state}
+  def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams}, state) do
+    {:noreply, state, :hibernate}
   end
 
   @impl true
   def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
     :telemetry.execute(
-      [:pleroma, :connection_pool, :client_death],
+      [:pleroma, :connection_pool, :client, :dead],
       %{client_pid: pid, reason: reason},
       %{key: state.key}
     )