1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.ScheduledActivity do
11 alias Pleroma.ScheduledActivity
13 alias Pleroma.Web.CommonAPI.Utils
14 alias Pleroma.Workers.ScheduledActivityWorker
19 @min_offset :timer.minutes(5)
21 schema "scheduled_activities" do
22 belongs_to(:user, User, type: FlakeId.Ecto.CompatType)
23 field(:scheduled_at, :naive_datetime)
29 def changeset(%ScheduledActivity{} = scheduled_activity, attrs) do
31 |> cast(attrs, [:scheduled_at, :params])
32 |> validate_required([:scheduled_at, :params])
33 |> validate_scheduled_at()
34 |> with_media_attachments()
37 defp with_media_attachments(
38 %{changes: %{params: %{"media_ids" => media_ids} = params}} = changeset
40 when is_list(media_ids) do
41 media_attachments = Utils.attachments_from_ids(%{"media_ids" => media_ids})
45 |> Map.put("media_attachments", media_attachments)
46 |> Map.put("media_ids", media_ids)
48 put_change(changeset, :params, params)
51 defp with_media_attachments(changeset), do: changeset
53 def update_changeset(%ScheduledActivity{} = scheduled_activity, attrs) do
55 |> cast(attrs, [:scheduled_at])
56 |> validate_required([:scheduled_at])
57 |> validate_scheduled_at()
60 def validate_scheduled_at(changeset) do
61 validate_change(changeset, :scheduled_at, fn _, scheduled_at ->
63 not far_enough?(scheduled_at) ->
64 [scheduled_at: "must be at least 5 minutes from now"]
66 exceeds_daily_user_limit?(changeset.data.user_id, scheduled_at) ->
67 [scheduled_at: "daily limit exceeded"]
69 exceeds_total_user_limit?(changeset.data.user_id) ->
70 [scheduled_at: "total limit exceeded"]
78 def exceeds_daily_user_limit?(user_id, scheduled_at) do
80 |> where(user_id: ^user_id)
81 |> where([sa], type(sa.scheduled_at, :date) == type(^scheduled_at, :date))
82 |> select([sa], count(sa.id))
84 |> Kernel.>=(Config.get([ScheduledActivity, :daily_user_limit]))
87 def exceeds_total_user_limit?(user_id) do
89 |> where(user_id: ^user_id)
90 |> select([sa], count(sa.id))
92 |> Kernel.>=(Config.get([ScheduledActivity, :total_user_limit]))
95 def far_enough?(scheduled_at) when is_binary(scheduled_at) do
96 with {:ok, scheduled_at} <- Ecto.Type.cast(:naive_datetime, scheduled_at) do
97 far_enough?(scheduled_at)
103 def far_enough?(scheduled_at) do
104 now = NaiveDateTime.utc_now()
105 diff = NaiveDateTime.diff(scheduled_at, now, :millisecond)
109 def new(%User{} = user, attrs) do
110 changeset(%ScheduledActivity{user_id: user.id}, attrs)
114 Creates ScheduledActivity and add to queue to perform at scheduled_at date
116 @spec create(User.t(), map()) :: {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
117 def create(%User{} = user, attrs) do
119 |> Multi.insert(:scheduled_activity, new(user, attrs))
120 |> Multi.run(:scheduled_activity_job, fn _repo, %{scheduled_activity: activity} ->
121 %{activity_id: activity.id}
122 |> ScheduledActivityWorker.new(scheduled_at: activity.scheduled_at)
125 |> Repo.transaction()
127 {:ok, %{scheduled_activity: scheduled_activity}} ->
128 {:ok, scheduled_activity}
130 {:error, _, changeset, _} ->
135 def get(%User{} = user, scheduled_activity_id) do
137 |> where(user_id: ^user.id)
138 |> where(id: ^scheduled_activity_id)
142 @spec update(ScheduledActivity.t(), map()) ::
143 {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
144 def update(%ScheduledActivity{id: id} = scheduled_activity, attrs) do
145 with {:error, %Ecto.Changeset{valid?: true} = changeset} <-
146 {:error, update_changeset(scheduled_activity, attrs)} do
148 |> Multi.update(:scheduled_activity, changeset)
149 |> Multi.update_all(:scheduled_job, job_query(id),
150 set: [scheduled_at: changeset.changes[:scheduled_at]]
152 |> Repo.transaction()
154 {:ok, %{scheduled_activity: scheduled_activity}} ->
155 {:ok, scheduled_activity}
157 {:error, _, changeset, _} ->
163 def delete_job(%ScheduledActivity{id: id} = _scheduled_activity) do
169 def delete(%ScheduledActivity{} = scheduled_activity) do
170 Repo.delete(scheduled_activity)
173 def delete(id) when is_binary(id) or is_integer(id) do
179 {1, [scheduled_activity]} -> {:ok, scheduled_activity}
184 def for_user_query(%User{} = user) do
186 |> where(user_id: ^user.id)
189 def due_activities(offset \\ 0) do
191 NaiveDateTime.utc_now()
192 |> NaiveDateTime.add(offset, :millisecond)
195 |> where([sa], sa.scheduled_at < ^naive_datetime)
199 def job_query(scheduled_activity_id) do
201 where: j.queue == "scheduled_activities",
202 where: fragment("args ->> 'activity_id' = ?::text", ^to_string(scheduled_activity_id))