updated ScheduledActivity
[akkoma] / lib / pleroma / scheduled_activity.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.ScheduledActivity do
6 use Ecto.Schema
7
8 alias Ecto.Multi
9 alias Pleroma.Config
10 alias Pleroma.Repo
11 alias Pleroma.ScheduledActivity
12 alias Pleroma.User
13 alias Pleroma.Web.CommonAPI.Utils
14 alias Pleroma.Workers.ScheduledActivityWorker
15
16 import Ecto.Query
17 import Ecto.Changeset
18
# Minimum lead time, in milliseconds, that `far_enough?/1` requires between now
# and a requested `scheduled_at`.
@min_offset :timer.minutes(5)

# Referenced as `ScheduledActivity.t()` by the specs of `create/2` and `update/2`.
@type t :: %__MODULE__{}

schema "scheduled_activities" do
  belongs_to(:user, User, type: FlakeId.Ecto.CompatType)
  field(:scheduled_at, :naive_datetime)
  field(:params, :map)

  timestamps()
end
28
@doc """
Builds the changeset used when creating a scheduled activity.

Casts and requires `:scheduled_at` and `:params`, validates the requested time
with `validate_scheduled_at/1`, and finally expands any `"media_ids"` list found
in the changed params into `"media_attachments"`.
"""
def changeset(%ScheduledActivity{} = scheduled_activity, attrs) do
  permitted = [:scheduled_at, :params]

  scheduled_activity
  |> cast(attrs, permitted)
  |> validate_required(permitted)
  |> validate_scheduled_at()
  |> with_media_attachments()
end
36
# When the changed params carry a list under "media_ids", resolves it through
# `Utils.attachments_from_ids/1` and stores the result under "media_attachments"
# (the "media_ids" entry itself is kept as given).
defp with_media_attachments(
       %{changes: %{params: %{"media_ids" => ids} = params}} = changeset
     )
     when is_list(ids) do
  attachments = Utils.attachments_from_ids(%{"media_ids" => ids})

  enriched_params =
    Map.merge(params, %{"media_attachments" => attachments, "media_ids" => ids})

  put_change(changeset, :params, enriched_params)
end

# No params change, or no list of media ids in it: nothing to expand.
defp with_media_attachments(changeset), do: changeset
52
@doc """
Builds the changeset used when rescheduling an existing activity.

Only `:scheduled_at` is cast; it is required and validated with
`validate_scheduled_at/1`.
"""
def update_changeset(%ScheduledActivity{} = scheduled_activity, attrs) do
  rescheduled = cast(scheduled_activity, attrs, [:scheduled_at])

  rescheduled
  |> validate_required([:scheduled_at])
  |> validate_scheduled_at()
end
59
@doc """
Validates a changed `:scheduled_at` value.

The checks run in order and the first failing one produces the error:

  1. the time must pass `far_enough?/1`;
  2. the owner must not hit `exceeds_daily_user_limit?/2` for that date;
  3. the owner must not hit `exceeds_total_user_limit?/1`.

The owner is taken from `changeset.data.user_id`. Nothing is checked when
`:scheduled_at` is not among the changes.
"""
# NOTE(review): the limit queries count every row of the user, including the
# record being rescheduled when this runs for an update - confirm that is intended.
def validate_scheduled_at(changeset) do
  validate_change(changeset, :scheduled_at, fn _field, scheduled_at ->
    scheduled_at_errors(changeset.data.user_id, scheduled_at)
  end)
end

# Returns the error keyword list for the first violated rule, or [] when valid.
# The ordering keeps the database-backed limit checks from running for a time
# that is already rejected as too close.
defp scheduled_at_errors(user_id, scheduled_at) do
  cond do
    not far_enough?(scheduled_at) ->
      [scheduled_at: "must be at least 5 minutes from now"]

    exceeds_daily_user_limit?(user_id, scheduled_at) ->
      [scheduled_at: "daily limit exceeded"]

    exceeds_total_user_limit?(user_id) ->
      [scheduled_at: "total limit exceeded"]

    true ->
      []
  end
end
77
@doc """
Returns `true` when the user already has at least as many scheduled activities
on the calendar date of `scheduled_at` as the configured
`[ScheduledActivity, :daily_user_limit]` value.
"""
def exceeds_daily_user_limit?(user_id, scheduled_at) do
  same_day_query =
    from(sa in ScheduledActivity,
      where: [user_id: ^user_id],
      where: type(sa.scheduled_at, :date) == type(^scheduled_at, :date),
      select: count(sa.id)
    )

  same_day_count = Repo.one(same_day_query)

  same_day_count >= Config.get([ScheduledActivity, :daily_user_limit])
end
86
@doc """
Returns `true` when the user's total number of scheduled activities has reached
the configured `[ScheduledActivity, :total_user_limit]` value.
"""
def exceeds_total_user_limit?(user_id) do
  total_query =
    from(sa in ScheduledActivity,
      where: [user_id: ^user_id],
      select: count(sa.id)
    )

  total_count = Repo.one(total_query)

  total_count >= Config.get([ScheduledActivity, :total_user_limit])
end
94
@doc """
Tells whether `scheduled_at` lies more than `@min_offset` milliseconds after the
current UTC time.

A binary is first cast to a naive datetime; a binary that cannot be cast yields
`false`.
"""
def far_enough?(scheduled_at) when is_binary(scheduled_at) do
  case Ecto.Type.cast(:naive_datetime, scheduled_at) do
    {:ok, parsed} -> far_enough?(parsed)
    _ -> false
  end
end

def far_enough?(scheduled_at) do
  NaiveDateTime.diff(scheduled_at, NaiveDateTime.utc_now(), :millisecond) > @min_offset
end
108
@doc """
Returns a creation changeset (see `changeset/2`) for a fresh scheduled activity
owned by the given user.
"""
def new(%User{id: user_id}, attrs) do
  owned_by_user = %ScheduledActivity{user_id: user_id}
  changeset(owned_by_user, attrs)
end
112
@doc """
Creates a scheduled activity for `user` and enqueues the worker job that is due
at the activity's `scheduled_at`.

The insert and the job enqueue are steps of a single `Ecto.Multi` run through
`Repo.transaction/1`. On success the inserted activity is returned; on failure
the failed step's value is returned in an `:error` tuple.
"""
@spec create(User.t(), map()) :: {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
def create(%User{} = user, attrs) do
  multi =
    Multi.new()
    |> Multi.insert(:scheduled_activity, new(user, attrs))
    |> Multi.run(:scheduled_activity_job, &enqueue_job/2)

  case Repo.transaction(multi) do
    {:ok, %{scheduled_activity: scheduled_activity}} -> {:ok, scheduled_activity}
    {:error, _failed_step, changeset, _changes_so_far} -> {:error, changeset}
  end
end

# Multi step: inserts the Oban job for the activity inserted by the previous
# step, scheduled at the activity's own `scheduled_at`.
defp enqueue_job(_repo, %{scheduled_activity: activity}) do
  %{activity_id: activity.id}
  |> ScheduledActivityWorker.new(scheduled_at: activity.scheduled_at)
  |> Oban.insert()
end
134
@doc """
Fetches the scheduled activity with the given id, but only if it belongs to
`user`. Returns the result of `Repo.one/1` for that lookup.
"""
def get(%User{id: user_id}, scheduled_activity_id) do
  owned_activity =
    from(sa in ScheduledActivity,
      where: [user_id: ^user_id],
      where: [id: ^scheduled_activity_id]
    )

  Repo.one(owned_activity)
end
141
@doc """
Reschedules an existing scheduled activity.

Builds `update_changeset/2` from `attrs`. An invalid changeset is returned as
`{:error, changeset}` without touching the database. A valid one is applied in a
single transaction that updates the record and sets `scheduled_at` on the jobs
matched by `job_query/1`.

Returns `{:ok, scheduled_activity}` on success, or `{:error, value}` with the
failed step's value otherwise.
"""
@spec update(ScheduledActivity.t(), map()) ::
        {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
def update(%ScheduledActivity{id: id} = scheduled_activity, attrs) do
  case update_changeset(scheduled_activity, attrs) do
    %Ecto.Changeset{valid?: true} = changeset ->
      # Read the effective value rather than `changeset.changes[:scheduled_at]`:
      # when the submitted time equals the stored one there is no change entry,
      # and the job's `scheduled_at` would otherwise be overwritten with nil.
      scheduled_at = get_field(changeset, :scheduled_at)

      Multi.new()
      |> Multi.update(:scheduled_activity, changeset)
      |> Multi.update_all(:scheduled_job, job_query(id), set: [scheduled_at: scheduled_at])
      |> Repo.transaction()
      |> case do
        {:ok, %{scheduled_activity: updated}} ->
          {:ok, updated}

        {:error, _failed_step, failed_value, _changes_so_far} ->
          {:error, failed_value}
      end

    invalid_changeset ->
      {:error, invalid_changeset}
  end
end
162
@doc """
Deletes the jobs matched by `job_query/1` for the given scheduled activity and
returns the `Repo.delete_all/1` result.
"""
def delete_job(%ScheduledActivity{id: activity_id}) do
  Repo.delete_all(job_query(activity_id))
end
168
@doc """
Deletes a scheduled activity.

Given a `%ScheduledActivity{}` struct, delegates to `Repo.delete/1` and returns
its result. Given a binary or integer id, deletes by id and returns
`{:ok, scheduled_activity}` when exactly one row was removed, `:error` otherwise.
"""
def delete(%ScheduledActivity{} = scheduled_activity), do: Repo.delete(scheduled_activity)

def delete(id) when is_binary(id) or is_integer(id) do
  # Selecting the row makes delete_all hand back the deleted record.
  by_id = from(sa in ScheduledActivity, where: [id: ^id], select: sa)

  case Repo.delete_all(by_id) do
    {1, [deleted]} -> {:ok, deleted}
    _ -> :error
  end
end
183
@doc """
Returns a query (not yet executed) for all scheduled activities owned by `user`.
"""
def for_user_query(%User{id: user_id}) do
  from(sa in ScheduledActivity, where: [user_id: ^user_id])
end
188
@doc """
Returns every scheduled activity whose `scheduled_at` is earlier than the
current UTC time shifted by `offset` milliseconds (default `0`).
"""
def due_activities(offset \\ 0) do
  cutoff = NaiveDateTime.add(NaiveDateTime.utc_now(), offset, :millisecond)

  due = from(sa in ScheduledActivity, where: sa.scheduled_at < ^cutoff)

  Repo.all(due)
end
198
@doc """
Builds a query for the `Oban.Job` rows in the `"scheduled_activities"` queue
whose `activity_id` argument, compared as text, equals the given id.
"""
def job_query(scheduled_activity_id) do
  activity_id = to_string(scheduled_activity_id)

  Oban.Job
  |> where([j], j.queue == "scheduled_activities")
  |> where([j], fragment("args ->> 'activity_id' = ?::text", ^activity_id))
end
205 end