[#1149] Refactored Oban workers API (introduced `enqueue/3`).
[akkoma] / lib / pleroma / scheduled_activity.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.ScheduledActivity do
6 use Ecto.Schema
7
8 alias Pleroma.Config
9 alias Pleroma.Repo
10 alias Pleroma.ScheduledActivity
11 alias Pleroma.User
12 alias Pleroma.Web.CommonAPI.Utils
13
14 import Ecto.Query
15 import Ecto.Changeset
16
17 @min_offset :timer.minutes(5)
18
19 schema "scheduled_activities" do
20 belongs_to(:user, User, type: Pleroma.FlakeId)
21 field(:scheduled_at, :naive_datetime)
22 field(:params, :map)
23
24 timestamps()
25 end
26
27 def changeset(%ScheduledActivity{} = scheduled_activity, attrs) do
28 scheduled_activity
29 |> cast(attrs, [:scheduled_at, :params])
30 |> validate_required([:scheduled_at, :params])
31 |> validate_scheduled_at()
32 |> with_media_attachments()
33 end
34
35 defp with_media_attachments(
36 %{changes: %{params: %{"media_ids" => media_ids} = params}} = changeset
37 )
38 when is_list(media_ids) do
39 media_attachments = Utils.attachments_from_ids(%{"media_ids" => media_ids})
40
41 params =
42 params
43 |> Map.put("media_attachments", media_attachments)
44 |> Map.put("media_ids", media_ids)
45
46 put_change(changeset, :params, params)
47 end
48
49 defp with_media_attachments(changeset), do: changeset
50
51 def update_changeset(%ScheduledActivity{} = scheduled_activity, attrs) do
52 scheduled_activity
53 |> cast(attrs, [:scheduled_at])
54 |> validate_required([:scheduled_at])
55 |> validate_scheduled_at()
56 end
57
58 def validate_scheduled_at(changeset) do
59 validate_change(changeset, :scheduled_at, fn _, scheduled_at ->
60 cond do
61 not far_enough?(scheduled_at) ->
62 [scheduled_at: "must be at least 5 minutes from now"]
63
64 exceeds_daily_user_limit?(changeset.data.user_id, scheduled_at) ->
65 [scheduled_at: "daily limit exceeded"]
66
67 exceeds_total_user_limit?(changeset.data.user_id) ->
68 [scheduled_at: "total limit exceeded"]
69
70 true ->
71 []
72 end
73 end)
74 end
75
76 def exceeds_daily_user_limit?(user_id, scheduled_at) do
77 ScheduledActivity
78 |> where(user_id: ^user_id)
79 |> where([sa], type(sa.scheduled_at, :date) == type(^scheduled_at, :date))
80 |> select([sa], count(sa.id))
81 |> Repo.one()
82 |> Kernel.>=(Config.get([ScheduledActivity, :daily_user_limit]))
83 end
84
85 def exceeds_total_user_limit?(user_id) do
86 ScheduledActivity
87 |> where(user_id: ^user_id)
88 |> select([sa], count(sa.id))
89 |> Repo.one()
90 |> Kernel.>=(Config.get([ScheduledActivity, :total_user_limit]))
91 end
92
93 def far_enough?(scheduled_at) when is_binary(scheduled_at) do
94 with {:ok, scheduled_at} <- Ecto.Type.cast(:naive_datetime, scheduled_at) do
95 far_enough?(scheduled_at)
96 else
97 _ -> false
98 end
99 end
100
101 def far_enough?(scheduled_at) do
102 now = NaiveDateTime.utc_now()
103 diff = NaiveDateTime.diff(scheduled_at, now, :millisecond)
104 diff > @min_offset
105 end
106
107 def new(%User{} = user, attrs) do
108 %ScheduledActivity{user_id: user.id}
109 |> changeset(attrs)
110 end
111
112 def create(%User{} = user, attrs) do
113 user
114 |> new(attrs)
115 |> Repo.insert()
116 end
117
118 def get(%User{} = user, scheduled_activity_id) do
119 ScheduledActivity
120 |> where(user_id: ^user.id)
121 |> where(id: ^scheduled_activity_id)
122 |> Repo.one()
123 end
124
125 def update(%ScheduledActivity{} = scheduled_activity, attrs) do
126 scheduled_activity
127 |> update_changeset(attrs)
128 |> Repo.update()
129 end
130
131 def delete(%ScheduledActivity{} = scheduled_activity) do
132 scheduled_activity
133 |> Repo.delete()
134 end
135
136 def delete(id) when is_binary(id) or is_integer(id) do
137 ScheduledActivity
138 |> where(id: ^id)
139 |> select([sa], sa)
140 |> Repo.delete_all()
141 |> case do
142 {1, [scheduled_activity]} -> {:ok, scheduled_activity}
143 _ -> :error
144 end
145 end
146
147 def for_user_query(%User{} = user) do
148 ScheduledActivity
149 |> where(user_id: ^user.id)
150 end
151
152 def due_activities(offset \\ 0) do
153 naive_datetime =
154 NaiveDateTime.utc_now()
155 |> NaiveDateTime.add(offset, :millisecond)
156
157 ScheduledActivity
158 |> where([sa], sa.scheduled_at < ^naive_datetime)
159 |> Repo.all()
160 end
161 end