Add Pleroma.JobQueueMonitor
authorEgor Kislitsyn <egor@kislitsyn.com>
Thu, 26 Sep 2019 11:49:57 +0000 (18:49 +0700)
committerEgor Kislitsyn <egor@kislitsyn.com>
Thu, 26 Sep 2019 11:49:57 +0000 (18:49 +0700)
lib/pleroma/application.ex
lib/pleroma/healthcheck.ex
lib/pleroma/job_queue_monitor.ex [new file with mode: 0644]
lib/pleroma/workers/worker_helper.ex
test/healthcheck_test.exs

index 7aec2c5456915f1cc816b166973257e8ef98c0e6..3e21d440368f9c6d565c3f71b55f0ee7270d9f93 100644 (file)
@@ -42,6 +42,7 @@ defmodule Pleroma.Application do
         hackney_pool_children() ++
         [
           Pleroma.Stats,
+          Pleroma.JobQueueMonitor,
           {Oban, Pleroma.Config.get(Oban)}
         ] ++
         task_children(@env) ++
index 977b78c268d8ab59f2af297069a6eb44966725bc..fc212981523178777fc7eb91bce2a293bd0bd8a4 100644 (file)
@@ -14,6 +14,7 @@ defmodule Pleroma.Healthcheck do
             active: 0,
             idle: 0,
             memory_used: 0,
+            job_queue_stats: nil,
             healthy: true
 
   @type t :: %__MODULE__{
@@ -21,6 +22,7 @@ defmodule Pleroma.Healthcheck do
           active: non_neg_integer(),
           idle: non_neg_integer(),
           memory_used: number(),
+          job_queue_stats: map(),
           healthy: boolean()
         }
 
@@ -30,6 +32,7 @@ defmodule Pleroma.Healthcheck do
       memory_used: Float.round(:erlang.memory(:total) / 1024 / 1024, 2)
     }
     |> assign_db_info()
+    |> assign_job_queue_stats()
     |> check_health()
   end
 
@@ -55,6 +58,11 @@ defmodule Pleroma.Healthcheck do
     Map.merge(healthcheck, db_info)
   end
 
+  defp assign_job_queue_stats(healthcheck) do
+    stats = Pleroma.JobQueueMonitor.stats()
+    Map.put(healthcheck, :job_queue_stats, stats)
+  end
+
   @spec check_health(Healthcheck.t()) :: Healthcheck.t()
   def check_health(%{pool_size: pool_size, active: active} = check)
       when active >= pool_size do
diff --git a/lib/pleroma/job_queue_monitor.ex b/lib/pleroma/job_queue_monitor.ex
new file mode 100644 (file)
index 0000000..685ba2e
--- /dev/null
@@ -0,0 +1,115 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.JobQueueMonitor do
+  use GenServer
+
+  @initial_state %{workers: %{}, queues: %{}, processed_jobs: 0, enqueued: 0}
+  @queue %{processed_jobs: 0, success: 0, failure: 0, enqueued: 0}
+  @operation %{processed_jobs: 0, success: 0, failure: 0, enqueued: 0}
+
+  def start_link(_) do
+    GenServer.start_link(__MODULE__, @initial_state, name: __MODULE__)
+  end
+
+  @impl true
+  def init(state) do
+    :telemetry.attach("oban-monitor-failure", [:oban, :failure], &handle_event/4, nil)
+    :telemetry.attach("oban-monitor-success", [:oban, :success], &handle_event/4, nil)
+
+    {:ok, state}
+  end
+
+  def stats do
+    GenServer.call(__MODULE__, :stats)
+  end
+
+  def enqueue({:ok, job}) do
+    meta = Map.take(job, [:args, :queue, :worker])
+    GenServer.cast(__MODULE__, {:process_enqueue, meta})
+
+    {:ok, job}
+  end
+
+  def enqueue(result), do: result
+
+  def handle_event([:oban, status], %{duration: duration}, meta, _) do
+    GenServer.cast(__MODULE__, {:process_event, status, duration, meta})
+  end
+
+  @impl true
+  def handle_call(:stats, _from, state) do
+    {:reply, state, state}
+  end
+
+  def handle_cast({:process_enqueue, meta}, state) do
+    state =
+      state
+      |> Map.update!(:workers, fn workers ->
+        workers
+        |> Map.put_new(meta.worker, %{})
+        |> Map.update!(meta.worker, &update_worker(&1, :enqueue, meta))
+      end)
+      |> Map.update!(:queues, fn workers ->
+        workers
+        |> Map.put_new(meta.queue, @queue)
+        |> Map.update!(meta.queue, fn queue -> Map.update!(queue, :enqueued, &(&1 + 1)) end)
+      end)
+      |> Map.update!(:enqueued, &(&1 + 1))
+
+    {:noreply, state}
+  end
+
+  @impl true
+  def handle_cast({:process_event, status, duration, meta}, state) do
+    state =
+      state
+      |> Map.update!(:workers, fn workers ->
+        workers
+        |> Map.put_new(meta.worker, %{})
+        |> Map.update!(meta.worker, &update_worker(&1, status, meta, duration))
+      end)
+      |> Map.update!(:queues, fn workers ->
+        workers
+        |> Map.put_new(meta.queue, @queue)
+        |> Map.update!(meta.queue, &update_queue(&1, status, meta, duration))
+      end)
+      |> Map.update!(:processed_jobs, &(&1 + 1))
+      |> decr_enqueued()
+
+    {:noreply, state}
+  end
+
+  defp update_worker(worker, status, meta, duration \\ 0) do
+    worker
+    |> Map.put_new(meta.args["op"], @operation)
+    |> Map.update!(meta.args["op"], &update_op(&1, status, meta, duration))
+  end
+
+  defp update_op(op, :enqueue, _meta, _duration) do
+    op
+    |> Map.update!(:enqueued, &(&1 + 1))
+  end
+
+  defp update_op(op, status, _meta, _duration) do
+    op
+    |> Map.update!(:processed_jobs, &(&1 + 1))
+    |> Map.update!(status, &(&1 + 1))
+    |> decr_enqueued()
+  end
+
+  defp update_queue(queue, status, _meta, _duration) do
+    queue
+    |> Map.update!(:processed_jobs, &(&1 + 1))
+    |> Map.update!(status, &(&1 + 1))
+    |> decr_enqueued()
+  end
+
+  defp decr_enqueued(map) do
+    Map.update!(map, :enqueued, fn
+      0 -> 0
+      enqueued -> enqueued - 1
+    end)
+  end
+end
index 358efa14a2c170073eeb81612c92628f211b1fbb..a43ce8bc07df80659b96e253b9c49f7b802f14a9 100644 (file)
@@ -40,6 +40,7 @@ defmodule Pleroma.Workers.WorkerHelper do
         unquote(caller_module)
         |> apply(:new, [params, worker_args])
         |> Pleroma.Repo.insert()
+        |> Pleroma.JobQueueMonitor.enqueue()
       end
     end
   end
index 6bb8d5b7fff026a91155b596f784181093d44304..66d5026ff417223964f9e90120cbb2e8e2549e8b 100644 (file)
@@ -9,7 +9,14 @@ defmodule Pleroma.HealthcheckTest do
   test "system_info/0" do
     result = Healthcheck.system_info() |> Map.from_struct()
 
-    assert Map.keys(result) == [:active, :healthy, :idle, :memory_used, :pool_size]
+    assert Map.keys(result) == [
+             :active,
+             :healthy,
+             :idle,
+             :job_queue_stats,
+             :memory_used,
+             :pool_size
+           ]
   end
 
   describe "check_health/1" do