Underscore unused variable
[akkoma] / lib / pleroma / scheduled_activity.ex
index fea2cf3ffab5d7e081c60107198c106dfdc3b596..2b156341fcd5e2cc147f6cfd9dce4799c7a70323 100644 (file)
@@ -1,19 +1,23 @@
 # Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.ScheduledActivity do
   use Ecto.Schema
 
+  alias Ecto.Multi
   alias Pleroma.Config
   alias Pleroma.Repo
   alias Pleroma.ScheduledActivity
   alias Pleroma.User
   alias Pleroma.Web.CommonAPI.Utils
+  alias Pleroma.Workers.ScheduledActivityWorker
 
   import Ecto.Query
   import Ecto.Changeset
 
+  @type t :: %__MODULE__{}
+
   @min_offset :timer.minutes(5)
 
   schema "scheduled_activities" do
@@ -36,7 +40,7 @@ defmodule Pleroma.ScheduledActivity do
          %{changes: %{params: %{"media_ids" => media_ids} = params}} = changeset
        )
        when is_list(media_ids) do
-    media_attachments = Utils.attachments_from_ids(%{"media_ids" => media_ids})
+    media_attachments = Utils.attachments_from_ids(%{media_ids: media_ids})
 
     params =
       params
@@ -105,16 +109,32 @@ defmodule Pleroma.ScheduledActivity do
   end
 
   def new(%User{} = user, attrs) do
-    %ScheduledActivity{user_id: user.id}
-    |> changeset(attrs)
+    changeset(%ScheduledActivity{user_id: user.id}, attrs)
   end
 
+  @doc """
+  Creates ScheduledActivity and add to queue to perform at scheduled_at date
+  """
+  @spec create(User.t(), map()) :: {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
   def create(%User{} = user, attrs) do
-    user
-    |> new(attrs)
-    |> Repo.insert()
+    Multi.new()
+    |> Multi.insert(:scheduled_activity, new(user, attrs))
+    |> maybe_add_jobs(Config.get([ScheduledActivity, :enabled]))
+    |> Repo.transaction()
+    |> transaction_response
+  end
+
+  defp maybe_add_jobs(multi, true) do
+    multi
+    |> Multi.run(:scheduled_activity_job, fn _repo, %{scheduled_activity: activity} ->
+      %{activity_id: activity.id}
+      |> ScheduledActivityWorker.new(scheduled_at: activity.scheduled_at)
+      |> Oban.insert()
+    end)
   end
 
+  defp maybe_add_jobs(multi, _), do: multi
+
   def get(%User{} = user, scheduled_activity_id) do
     ScheduledActivity
     |> where(user_id: ^user.id)
@@ -122,25 +142,43 @@ defmodule Pleroma.ScheduledActivity do
     |> Repo.one()
   end
 
-  def update(%ScheduledActivity{} = scheduled_activity, attrs) do
-    scheduled_activity
-    |> update_changeset(attrs)
-    |> Repo.update()
+  @spec update(ScheduledActivity.t(), map()) ::
+          {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
+  def update(%ScheduledActivity{id: id} = scheduled_activity, attrs) do
+    with {:error, %Ecto.Changeset{valid?: true} = changeset} <-
+           {:error, update_changeset(scheduled_activity, attrs)} do
+      Multi.new()
+      |> Multi.update(:scheduled_activity, changeset)
+      |> Multi.update_all(:scheduled_job, job_query(id),
+        set: [scheduled_at: get_field(changeset, :scheduled_at)]
+      )
+      |> Repo.transaction()
+      |> transaction_response
+    end
   end
 
-  def delete(%ScheduledActivity{} = scheduled_activity) do
-    scheduled_activity
-    |> Repo.delete()
+  @doc "Deletes a ScheduledActivity and linked jobs."
+  @spec delete(ScheduledActivity.t() | binary() | integer) ::
+          {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
+  def delete(%ScheduledActivity{id: id} = scheduled_activity) do
+    Multi.new()
+    |> Multi.delete(:scheduled_activity, scheduled_activity, stale_error_field: :id)
+    |> Multi.delete_all(:jobs, job_query(id))
+    |> Repo.transaction()
+    |> transaction_response
   end
 
   def delete(id) when is_binary(id) or is_integer(id) do
-    ScheduledActivity
-    |> where(id: ^id)
-    |> select([sa], sa)
-    |> Repo.delete_all()
-    |> case do
-      {1, [scheduled_activity]} -> {:ok, scheduled_activity}
-      _ -> :error
+    delete(%__MODULE__{id: id})
+  end
+
+  defp transaction_response(result) do
+    case result do
+      {:ok, %{scheduled_activity: scheduled_activity}} ->
+        {:ok, scheduled_activity}
+
+      {:error, _, changeset, _} ->
+        {:error, changeset}
     end
   end
 
@@ -158,4 +196,11 @@ defmodule Pleroma.ScheduledActivity do
     |> where([sa], sa.scheduled_at < ^naive_datetime)
     |> Repo.all()
   end
+
+  def job_query(scheduled_activity_id) do
+    from(j in Oban.Job,
+      where: j.queue == "scheduled_activities",
+      where: fragment("args ->> 'activity_id' = ?::text", ^to_string(scheduled_activity_id))
+    )
+  end
 end