add a job queue
[akkoma] / lib / pleroma / jobs.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Jobs do
6 @moduledoc """
7 A basic job queue
8 """
9 use GenServer
10
11 require Logger
12
13 def init(args) do
14 {:ok, args}
15 end
16
17 def start_link do
18 queues =
19 Pleroma.Config.get(Pleroma.Jobs)
20 |> Enum.map(fn {name, _} -> create_queue(name) end)
21 |> Enum.into(%{})
22
23 state = %{
24 queues: queues,
25 refs: %{}
26 }
27
28 GenServer.start_link(__MODULE__, state, name: __MODULE__)
29 end
30
31 def create_queue(name) do
32 {name, {:sets.new(), []}}
33 end
34
35 @doc """
36 Enqueues a job.
37
38 Returns `:ok`.
39
40 ## Arguments
41
42 - `queue_name` - a queue name(must be specified in the config).
43 - `mod` - a worker module, must have `perform` function.
44 - `args` - a list of arguments for the `perform` function of the worker module.
45 - `priority` - a job priority (`0` by default).
46
47 ## Examples
48
49 Enqueue `Module.perform/0` with `priority=1`:
50
51 iex> Pleroma.Jobs.enqueue(:example_queue, Module, [])
52 :ok
53
54 Enqueue `Module.perform(:job_name)` with `priority=5`:
55
56 iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:job_name], 5)
57 :ok
58
59 Enqueue `Module.perform(:another_job, data)` with `priority=1`:
60
61 iex> data = "foobar"
62 iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:another_job, data])
63 :ok
64
65 Enqueue `Module.perform(:foobar_job, :foo, :bar, 42)` with `priority=1`:
66
67 iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:foobar_job, :foo, :bar, 42])
68 :ok
69
70 """
71
72 def enqueue(queue_name, mod, args, priority \\ 1)
73
74 if Mix.env() == :test do
75 def enqueue(_queue_name, mod, args, _priority) do
76 apply(mod, :perform, args)
77 end
78 else
79 @spec enqueue(atom(), atom(), [any()], integer()) :: :ok
80 def enqueue(queue_name, mod, args, priority \\ 1) do
81 GenServer.cast(__MODULE__, {:enqueue, queue_name, mod, args, priority})
82 end
83 end
84
85 def handle_cast({:enqueue, queue_name, mod, args, priority}, state) do
86 {running_jobs, queue} = state[:queues][queue_name]
87
88 queue = enqueue_sorted(queue, {mod, args}, priority)
89
90 state =
91 state
92 |> update_queue(queue_name, {running_jobs, queue})
93 |> maybe_start_job(queue_name, running_jobs, queue)
94
95 {:noreply, state}
96 end
97
98 def handle_cast(m, state) do
99 IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}")
100 {:noreply, state}
101 end
102
103 def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
104 queue_name = state.refs[ref]
105
106 {running_jobs, queue} = state[:queues][queue_name]
107
108 running_jobs = :sets.del_element(ref, running_jobs)
109
110 state = state |> remove_ref(ref) |> maybe_start_job(queue_name, running_jobs, queue)
111
112 {:noreply, state}
113 end
114
115 def maybe_start_job(state, queue_name, running_jobs, queue) do
116 if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, queue_name, :max_jobs]) &&
117 queue != [] do
118 {{mod, args}, queue} = queue_pop(queue)
119 {:ok, pid} = Task.start(fn -> apply(mod, :perform, args) end)
120 mref = Process.monitor(pid)
121
122 state
123 |> add_ref(queue_name, mref)
124 |> update_queue(queue_name, {:sets.add_element(mref, running_jobs), queue})
125 else
126 update_queue(state, queue_name, {running_jobs, queue})
127 end
128 end
129
130 def enqueue_sorted(queue, element, priority) do
131 [%{item: element, priority: priority} | queue]
132 |> Enum.sort_by(fn %{priority: priority} -> priority end)
133 end
134
135 def queue_pop([%{item: element} | queue]) do
136 {element, queue}
137 end
138
139 defp add_ref(state, queue_name, ref) do
140 refs = Map.put(state[:refs], ref, queue_name)
141 Map.put(state, :refs, refs)
142 end
143
144 defp remove_ref(state, ref) do
145 refs = Map.delete(state[:refs], ref)
146 Map.put(state, :refs, refs)
147 end
148
149 defp update_queue(state, queue_name, data) do
150 queues = Map.put(state[:queues], queue_name, data)
151 Map.put(state, :queues, queues)
152 end
153 end