Replace Pleroma.Jobs with `pleroma_job_queue`
authorEgor <egor@kislitsyn.com>
Fri, 29 Mar 2019 12:46:05 +0000 (12:46 +0000)
committerlambda <lain@soykaf.club>
Fri, 29 Mar 2019 12:46:05 +0000 (12:46 +0000)
config/config.exs
config/test.exs
docs/config.md
lib/pleroma/application.ex
lib/pleroma/emails/mailer.ex
lib/pleroma/jobs.ex [deleted file]
lib/pleroma/web/federator/federator.ex
mix.exs
mix.lock
test/jobs_test.exs [deleted file]
test/support/jobs_worker_mock.ex [deleted file]

index bd8922b772355b5b0cf21ac651bd6486b1042d17..0df38d75ac8fe83606f9afde0dc7c585c63b707c 100644 (file)
@@ -348,10 +348,10 @@ config :pleroma, Pleroma.Web.Federator.RetryQueue,
   initial_timeout: 30,
   max_retries: 5
 
-config :pleroma, Pleroma.Jobs,
-  federator_incoming: [max_jobs: 50],
-  federator_outgoing: [max_jobs: 50],
-  mailer: [max_jobs: 10]
+config :pleroma_job_queue, :queues,
+  federator_incoming: 50,
+  federator_outgoing: 50,
+  mailer: 10
 
 config :pleroma, :fetch_initial_posts,
   enabled: false,
index 3691e5bd1070af80c04b8bcf260adf47074b9d57..6a7b9067eebcd0fa6e42a952adc44623fad8ffb4 100644 (file)
@@ -48,7 +48,7 @@ config :web_push_encryption, :vapid_details,
 
 config :web_push_encryption, :http_client, Pleroma.Web.WebPushHttpClientMock
 
-config :pleroma, Pleroma.Jobs, testing: [max_jobs: 2]
+config :pleroma_job_queue, disabled: true
 
 try do
   import_config "test.secret.exs"
index c1246ee25760ef833ff2f4e5457ee41f69ab552c..c206358ab89271a78d21963096b84a955572d062 100644 (file)
@@ -253,25 +253,20 @@ You can then do
 curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerandomtoken"
 ```
 
-## Pleroma.Jobs
+## :pleroma_job_queue
 
-A list of job queues and their settings.
-
-Job queue settings:
-
-* `max_jobs`: The maximum amount of parallel jobs running at the same time.
+[Pleroma Job Queue][https://git.pleroma.social/pleroma/pleroma_job_queue] configuration: a list of queues with maximum concurrent jobs.
 
 Example:
 
-```exs
-config :pleroma, Pleroma.Jobs,
-  federator_incoming: [max_jobs: 50],
-  federator_outgoing: [max_jobs: 50]
+```elixir
+config :pleroma_job_queue, :queues,
+  federator_incoming: 50,
+  federator_outgoing: 50
 ```
 
 This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`.
 
-
 ## Pleroma.Web.Federator.RetryQueue
 
 * `enabled`: If set to `true`, failed federation jobs will be retried
index cc81e180565eb84ea0399692b5c48b5a4762c196..782d1d58997709635bb51099e8a6e6c982e869b4 100644 (file)
@@ -110,7 +110,6 @@ defmodule Pleroma.Application do
           worker(Pleroma.Web.Federator.RetryQueue, []),
           worker(Pleroma.Stats, []),
           worker(Pleroma.Web.Push, []),
-          worker(Pleroma.Jobs, []),
           worker(Task, [&Pleroma.Web.Federator.init/0], restart: :temporary)
         ] ++
         streamer_child() ++
index f7e3aa78b3ccc0f7b219dfbdce46d1019c5e482f..b384e6fecaff5fc0f5a050a551774342cc072c0a 100644 (file)
@@ -6,7 +6,7 @@ defmodule Pleroma.Mailer do
   use Swoosh.Mailer, otp_app: :pleroma
 
   def deliver_async(email, config \\ []) do
-    Pleroma.Jobs.enqueue(:mailer, __MODULE__, [:deliver_async, email, config])
+    PleromaJobQueue.enqueue(:mailer, __MODULE__, [:deliver_async, email, config])
   end
 
   def perform(:deliver_async, email, config), do: deliver(email, config)
diff --git a/lib/pleroma/jobs.ex b/lib/pleroma/jobs.ex
deleted file mode 100644 (file)
index 24b7e5e..0000000
+++ /dev/null
@@ -1,152 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Jobs do
-  @moduledoc """
-  A basic job queue
-  """
-  use GenServer
-
-  require Logger
-
-  def init(args) do
-    {:ok, args}
-  end
-
-  def start_link do
-    queues =
-      Pleroma.Config.get(Pleroma.Jobs)
-      |> Enum.map(fn {name, _} -> create_queue(name) end)
-      |> Enum.into(%{})
-
-    state = %{
-      queues: queues,
-      refs: %{}
-    }
-
-    GenServer.start_link(__MODULE__, state, name: __MODULE__)
-  end
-
-  def create_queue(name) do
-    {name, {:sets.new(), []}}
-  end
-
-  @doc """
-  Enqueues a job.
-
-  Returns `:ok`.
-
-  ## Arguments
-
-  - `queue_name` - a queue name(must be specified in the config).
-  - `mod` - a worker module (must have `perform` function).
-  - `args` - a list of arguments for the `perform` function of the worker module.
-  - `priority` - a job priority (`0` by default).
-
-  ## Examples
-
-  Enqueue `Module.perform/0` with `priority=1`:
-
-      iex> Pleroma.Jobs.enqueue(:example_queue, Module, [])
-      :ok
-
-  Enqueue `Module.perform(:job_name)` with `priority=5`:
-
-      iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:job_name], 5)
-      :ok
-
-  Enqueue `Module.perform(:another_job, data)` with `priority=1`:
-
-      iex> data = "foobar"
-      iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:another_job, data])
-      :ok
-
-  Enqueue `Module.perform(:foobar_job, :foo, :bar, 42)` with `priority=1`:
-
-      iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:foobar_job, :foo, :bar, 42])
-      :ok
-
-  """
-
-  def enqueue(queue_name, mod, args, priority \\ 1)
-
-  if Mix.env() == :test do
-    def enqueue(_queue_name, mod, args, _priority) do
-      apply(mod, :perform, args)
-    end
-  else
-    @spec enqueue(atom(), atom(), [any()], integer()) :: :ok
-    def enqueue(queue_name, mod, args, priority) do
-      GenServer.cast(__MODULE__, {:enqueue, queue_name, mod, args, priority})
-    end
-  end
-
-  def handle_cast({:enqueue, queue_name, mod, args, priority}, state) do
-    {running_jobs, queue} = state[:queues][queue_name]
-
-    queue = enqueue_sorted(queue, {mod, args}, priority)
-
-    state =
-      state
-      |> update_queue(queue_name, {running_jobs, queue})
-      |> maybe_start_job(queue_name, running_jobs, queue)
-
-    {:noreply, state}
-  end
-
-  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
-    queue_name = state.refs[ref]
-
-    {running_jobs, queue} = state[:queues][queue_name]
-
-    running_jobs = :sets.del_element(ref, running_jobs)
-
-    state =
-      state
-      |> remove_ref(ref)
-      |> update_queue(queue_name, {running_jobs, queue})
-      |> maybe_start_job(queue_name, running_jobs, queue)
-
-    {:noreply, state}
-  end
-
-  def maybe_start_job(state, queue_name, running_jobs, queue) do
-    if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, queue_name, :max_jobs]) &&
-         queue != [] do
-      {{mod, args}, queue} = queue_pop(queue)
-      {:ok, pid} = Task.start(fn -> apply(mod, :perform, args) end)
-      mref = Process.monitor(pid)
-
-      state
-      |> add_ref(queue_name, mref)
-      |> update_queue(queue_name, {:sets.add_element(mref, running_jobs), queue})
-    else
-      state
-    end
-  end
-
-  def enqueue_sorted(queue, element, priority) do
-    [%{item: element, priority: priority} | queue]
-    |> Enum.sort_by(fn %{priority: priority} -> priority end)
-  end
-
-  def queue_pop([%{item: element} | queue]) do
-    {element, queue}
-  end
-
-  defp add_ref(state, queue_name, ref) do
-    refs = Map.put(state[:refs], ref, queue_name)
-    Map.put(state, :refs, refs)
-  end
-
-  defp remove_ref(state, ref) do
-    refs = Map.delete(state[:refs], ref)
-    Map.put(state, :refs, refs)
-  end
-
-  defp update_queue(state, queue_name, data) do
-    queues = Map.put(state[:queues], queue_name, data)
-    Map.put(state, :queues, queues)
-  end
-end
index 5e690ddb84446e08bb64c99e95aa2660bbaf9740..c47328e13a4e096182ffc97ae1688f7c559eb099 100644 (file)
@@ -4,7 +4,6 @@
 
 defmodule Pleroma.Web.Federator do
   alias Pleroma.Activity
-  alias Pleroma.Jobs
   alias Pleroma.User
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.ActivityPub.Relay
@@ -31,39 +30,39 @@ defmodule Pleroma.Web.Federator do
   # Client API
 
   def incoming_doc(doc) do
-    Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
+    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
   end
 
   def incoming_ap_doc(params) do
-    Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
+    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
   end
 
   def publish(activity, priority \\ 1) do
-    Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
+    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
   end
 
   def publish_single_ap(params) do
-    Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_ap, params])
+    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_ap, params])
   end
 
   def publish_single_websub(websub) do
-    Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_websub, websub])
+    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_websub, websub])
   end
 
   def verify_websub(websub) do
-    Jobs.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
+    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
   end
 
   def request_subscription(sub) do
-    Jobs.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
+    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
   end
 
   def refresh_subscriptions do
-    Jobs.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
+    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
   end
 
   def publish_single_salmon(params) do
-    Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_salmon, params])
+    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_salmon, params])
   end
 
   # Job Worker Callbacks
diff --git a/mix.exs b/mix.exs
index 6614304645f11fe65e3b1baa66ead185d51c7baa..333f21a914d71e24c5c30687aeb258e1324323ab 100644 (file)
--- a/mix.exs
+++ b/mix.exs
@@ -93,7 +93,8 @@ defmodule Pleroma.Mixfile do
       {:timex, "~> 3.5"},
       {:auto_linker,
        git: "https://git.pleroma.social/pleroma/auto_linker.git",
-       ref: "94193ca5f97c1f9fdf3d1469653e2d46fac34bcd"}
+       ref: "94193ca5f97c1f9fdf3d1469653e2d46fac34bcd"},
+      {:pleroma_job_queue, "~> 0.2.0"}
     ]
   end
 
index 05eaa1d69eb42b93faa9df54260b558de22a32d4..f401258e9de7bd253b154e76e1fd6d25115ec5c6 100644 (file)
--- a/mix.lock
+++ b/mix.lock
@@ -50,6 +50,7 @@
   "phoenix_ecto": {:hex, :phoenix_ecto, "4.0.0", "c43117a136e7399ea04ecaac73f8f23ee0ffe3e07acfcb8062fe5f4c9f0f6531", [:mix], [{:ecto, "~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.9", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
   "phoenix_html": {:hex, :phoenix_html, "2.13.1", "fa8f034b5328e2dfa0e4131b5569379003f34bc1fafdaa84985b0b9d2f12e68b", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
   "phoenix_pubsub": {:hex, :phoenix_pubsub, "1.1.1", "6668d787e602981f24f17a5fbb69cc98f8ab085114ebfac6cc36e10a90c8e93c", [:mix], [], "hexpm"},
+  "pleroma_job_queue": {:hex, :pleroma_job_queue, "0.2.0", "879e660aa1cebe8dc6f0aaaa6aa48b4875e89cd961d4a585fd128e0773b31a18", [:mix], [], "hexpm"},
   "plug": {:hex, :plug, "1.7.2", "d7b7db7fbd755e8283b6c0a50be71ec0a3d67d9213d74422d9372effc8e87fd1", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}], "hexpm"},
   "plug_cowboy": {:hex, :plug_cowboy, "2.0.1", "d798f8ee5acc86b7d42dbe4450b8b0dadf665ce588236eb0a751a132417a980e", [:mix], [{:cowboy, "~> 2.5", [hex: :cowboy, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
   "plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"},
diff --git a/test/jobs_test.exs b/test/jobs_test.exs
deleted file mode 100644 (file)
index d55c86c..0000000
+++ /dev/null
@@ -1,83 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.JobsTest do
-  use ExUnit.Case, async: true
-
-  alias Jobs.WorkerMock
-  alias Pleroma.Jobs
-
-  setup do
-    state = %{
-      queues: Enum.into([Jobs.create_queue(:testing)], %{}),
-      refs: %{}
-    }
-
-    [state: state]
-  end
-
-  test "creates queue" do
-    queue = Jobs.create_queue(:foobar)
-
-    assert {:foobar, set} = queue
-    assert :set == elem(set, 0) |> elem(0)
-  end
-
-  test "enqueues an element according to priority" do
-    queue = [%{item: 1, priority: 2}]
-
-    new_queue = Jobs.enqueue_sorted(queue, 2, 1)
-    assert new_queue == [%{item: 2, priority: 1}, %{item: 1, priority: 2}]
-
-    new_queue = Jobs.enqueue_sorted(queue, 2, 3)
-    assert new_queue == [%{item: 1, priority: 2}, %{item: 2, priority: 3}]
-  end
-
-  test "pop first item" do
-    queue = [%{item: 2, priority: 1}, %{item: 1, priority: 2}]
-
-    assert {2, [%{item: 1, priority: 2}]} = Jobs.queue_pop(queue)
-  end
-
-  test "enqueue a job", %{state: state} do
-    assert {:noreply, new_state} =
-             Jobs.handle_cast({:enqueue, :testing, WorkerMock, [:test_job, :foo, :bar], 3}, state)
-
-    assert %{queues: %{testing: {running_jobs, []}}, refs: _} = new_state
-    assert :sets.size(running_jobs) == 1
-    assert [ref] = :sets.to_list(running_jobs)
-    assert %{refs: %{^ref => :testing}} = new_state
-  end
-
-  test "max jobs setting", %{state: state} do
-    max_jobs = Pleroma.Config.get([Jobs, :testing, :max_jobs])
-
-    {:noreply, state} =
-      Enum.reduce(1..(max_jobs + 1), {:noreply, state}, fn _, {:noreply, state} ->
-        Jobs.handle_cast({:enqueue, :testing, WorkerMock, [:test_job, :foo, :bar], 3}, state)
-      end)
-
-    assert %{
-             queues: %{
-               testing:
-                 {running_jobs, [%{item: {WorkerMock, [:test_job, :foo, :bar]}, priority: 3}]}
-             }
-           } = state
-
-    assert :sets.size(running_jobs) == max_jobs
-  end
-
-  test "remove job after it finished", %{state: state} do
-    {:noreply, new_state} =
-      Jobs.handle_cast({:enqueue, :testing, WorkerMock, [:test_job, :foo, :bar], 3}, state)
-
-    %{queues: %{testing: {running_jobs, []}}} = new_state
-    [ref] = :sets.to_list(running_jobs)
-
-    assert {:noreply, %{queues: %{testing: {running_jobs, []}}, refs: %{}}} =
-             Jobs.handle_info({:DOWN, ref, :process, nil, nil}, new_state)
-
-    assert :sets.size(running_jobs) == 0
-  end
-end
diff --git a/test/support/jobs_worker_mock.ex b/test/support/jobs_worker_mock.ex
deleted file mode 100644 (file)
index 0fb976d..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Jobs.WorkerMock do
-  require Logger
-
-  def perform(:test_job, arg, arg2) do
-    Logger.debug({:perform, :test_job, arg, arg2})
-  end
-
-  def perform(:test_job, payload) do
-    Logger.debug({:perform, :test_job, payload})
-  end
-
-  def test_job(payload) do
-    Pleroma.Jobs.enqueue(:testing, __MODULE__, [:test_job, payload])
-  end
-end