[#1149] Upgraded `oban` from 0.6.0 to 0.7.1.
[akkoma] / lib / pleroma / scheduled_activity_worker.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.ScheduledActivityWorker do
6 @moduledoc """
7 Sends scheduled activities to the job queue.
8 """
9
10 alias Pleroma.Config
11 alias Pleroma.Repo
12 alias Pleroma.ScheduledActivity
13 alias Pleroma.User
14 alias Pleroma.Web.CommonAPI
15
16 use GenServer
17 require Logger
18
19 @schedule_interval :timer.minutes(1)
20
21 defdelegate worker_args(queue), to: Pleroma.Workers.Helper
22
23 def start_link(_) do
24 GenServer.start_link(__MODULE__, nil)
25 end
26
27 def init(_) do
28 if Config.get([ScheduledActivity, :enabled]) do
29 schedule_next()
30 {:ok, nil}
31 else
32 :ignore
33 end
34 end
35
36 def perform(:execute, scheduled_activity_id) do
37 try do
38 {:ok, scheduled_activity} = ScheduledActivity.delete(scheduled_activity_id)
39 %User{} = user = User.get_cached_by_id(scheduled_activity.user_id)
40 {:ok, _result} = CommonAPI.post(user, scheduled_activity.params)
41 rescue
42 error ->
43 Logger.error(
44 "#{__MODULE__} Couldn't create a status from the scheduled activity: #{inspect(error)}"
45 )
46 end
47 end
48
49 def handle_info(:perform, state) do
50 ScheduledActivity.due_activities(@schedule_interval)
51 |> Enum.each(fn scheduled_activity ->
52 %{"op" => "execute", "activity_id" => scheduled_activity.id}
53 |> Pleroma.Workers.ScheduledActivityWorker.new(worker_args(:scheduled_activities))
54 |> Repo.insert()
55 end)
56
57 schedule_next()
58 {:noreply, state}
59 end
60
61 defp schedule_next do
62 Process.send_after(self(), :perform, @schedule_interval)
63 end
64 end