updated ScheduledActivity
authorMaksim Pechnikov <parallel588@gmail.com>
Tue, 3 Dec 2019 18:30:10 +0000 (21:30 +0300)
committerMaksim Pechnikov <parallel588@gmail.com>
Wed, 4 Dec 2019 06:12:17 +0000 (09:12 +0300)
config/config.exs
lib/pleroma/scheduled_activity.ex
lib/pleroma/web/mastodon_api/controllers/scheduled_activity_controller.ex
lib/pleroma/web/mastodon_api/controllers/status_controller.ex
lib/pleroma/workers/scheduled_activity_worker.ex [moved from lib/pleroma/workers/cron/scheduled_activity_worker.ex with 59% similarity]
test/scheduled_activity_test.exs
test/support/helpers.ex
test/web/mastodon_api/controllers/scheduled_activity_controller_test.exs
test/workers/cron/scheduled_activity_worker_test.exs [deleted file]

index e7e17669e40814a4e51fcb9932aee1906bcce63a..259529f9707492861c637c3a7a7324589cec55fd 100644 (file)
@@ -496,7 +496,6 @@ config :pleroma, Oban,
   crontab: [
     {"0 0 * * *", Pleroma.Workers.Cron.ClearOauthTokenWorker},
     {"0 * * * *", Pleroma.Workers.Cron.StatsWorker},
-    {"* * * * *", Pleroma.Workers.Cron.ScheduledActivityWorker},
     {"* * * * *", Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker},
     {"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker}
   ]
index fea2cf3ffab5d7e081c60107198c106dfdc3b596..96fa6a987c9a4a64752fc7019f6f76869868e76c 100644 (file)
@@ -5,11 +5,13 @@
 defmodule Pleroma.ScheduledActivity do
   use Ecto.Schema
 
+  alias Ecto.Multi
   alias Pleroma.Config
   alias Pleroma.Repo
   alias Pleroma.ScheduledActivity
   alias Pleroma.User
   alias Pleroma.Web.CommonAPI.Utils
+  alias Pleroma.Workers.ScheduledActivityWorker
 
   import Ecto.Query
   import Ecto.Changeset
@@ -105,14 +107,29 @@ defmodule Pleroma.ScheduledActivity do
   end
 
   def new(%User{} = user, attrs) do
-    %ScheduledActivity{user_id: user.id}
-    |> changeset(attrs)
+    changeset(%ScheduledActivity{user_id: user.id}, attrs)
   end
 
+  @doc """
+  Creates ScheduledActivity and add to queue to perform at scheduled_at date
+  """
+  @spec create(User.t(), map()) :: {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
   def create(%User{} = user, attrs) do
-    user
-    |> new(attrs)
-    |> Repo.insert()
+    Multi.new()
+    |> Multi.insert(:scheduled_activity, new(user, attrs))
+    |> Multi.run(:scheduled_activity_job, fn _repo, %{scheduled_activity: activity} ->
+      %{activity_id: activity.id}
+      |> ScheduledActivityWorker.new(scheduled_at: activity.scheduled_at)
+      |> Oban.insert()
+    end)
+    |> Repo.transaction()
+    |> case do
+      {:ok, %{scheduled_activity: scheduled_activity}} ->
+        {:ok, scheduled_activity}
+
+      {:error, _, changeset, _} ->
+        {:error, changeset}
+    end
   end
 
   def get(%User{} = user, scheduled_activity_id) do
@@ -122,15 +139,35 @@ defmodule Pleroma.ScheduledActivity do
     |> Repo.one()
   end
 
-  def update(%ScheduledActivity{} = scheduled_activity, attrs) do
-    scheduled_activity
-    |> update_changeset(attrs)
-    |> Repo.update()
+  @spec update(ScheduledActivity.t(), map()) ::
+          {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
+  def update(%ScheduledActivity{id: id} = scheduled_activity, attrs) do
+    with {:error, %Ecto.Changeset{valid?: true} = changeset} <-
+           {:error, update_changeset(scheduled_activity, attrs)} do
+      Multi.new()
+      |> Multi.update(:scheduled_activity, changeset)
+      |> Multi.update_all(:scheduled_job, job_query(id),
+        set: [scheduled_at: changeset.changes[:scheduled_at]]
+      )
+      |> Repo.transaction()
+      |> case do
+        {:ok, %{scheduled_activity: scheduled_activity}} ->
+          {:ok, scheduled_activity}
+
+        {:error, _, changeset, _} ->
+          {:error, changeset}
+      end
+    end
+  end
+
+  def delete_job(%ScheduledActivity{id: id} = _scheduled_activity) do
+    id
+    |> job_query
+    |> Repo.delete_all()
   end
 
   def delete(%ScheduledActivity{} = scheduled_activity) do
-    scheduled_activity
-    |> Repo.delete()
+    Repo.delete(scheduled_activity)
   end
 
   def delete(id) when is_binary(id) or is_integer(id) do
@@ -158,4 +195,11 @@ defmodule Pleroma.ScheduledActivity do
     |> where([sa], sa.scheduled_at < ^naive_datetime)
     |> Repo.all()
   end
+
+  def job_query(scheduled_activity_id) do
+    from(j in Oban.Job,
+      where: j.queue == "scheduled_activities",
+      where: fragment("args ->> 'activity_id' = ?::text", ^to_string(scheduled_activity_id))
+    )
+  end
 end
index ff9276541614f675491ae644e32678aabd63f627..4f9a8bdbec7237cee3328699909de8b88c721492 100644 (file)
@@ -45,7 +45,8 @@ defmodule Pleroma.Web.MastodonAPI.ScheduledActivityController do
 
   @doc "DELETE /api/v1/scheduled_statuses/:id"
   def delete(%{assigns: %{scheduled_activity: scheduled_activity}} = conn, _params) do
-    with {:ok, scheduled_activity} <- ScheduledActivity.delete(scheduled_activity) do
+    with {:ok, scheduled_activity} <- ScheduledActivity.delete(scheduled_activity),
+         _ <- ScheduledActivity.delete_job(scheduled_activity) do
       render(conn, "show.json", scheduled_activity: scheduled_activity)
     end
   end
index 74b223cf4efcfa1771c999c364bf80bcc808aae0..d70749dfa119c892b2578035d1096379503258ba 100644 (file)
@@ -124,15 +124,18 @@ defmodule Pleroma.Web.MastodonAPI.StatusController do
       ) do
     params = Map.put(params, "in_reply_to_status_id", params["in_reply_to_id"])
 
-    if ScheduledActivity.far_enough?(scheduled_at) do
-      with {:ok, scheduled_activity} <-
-             ScheduledActivity.create(user, %{"params" => params, "scheduled_at" => scheduled_at}) do
-        conn
-        |> put_view(ScheduledActivityView)
-        |> render("show.json", scheduled_activity: scheduled_activity)
-      end
+    with {:far_enough, true} <- {:far_enough, ScheduledActivity.far_enough?(scheduled_at)},
+         attrs <- %{"params" => params, "scheduled_at" => scheduled_at},
+         {:ok, scheduled_activity} <- ScheduledActivity.create(user, attrs) do
+      conn
+      |> put_view(ScheduledActivityView)
+      |> render("show.json", scheduled_activity: scheduled_activity)
     else
-      create(conn, Map.drop(params, ["scheduled_at"]))
+      {:far_enough, _} ->
+        create(conn, Map.drop(params, ["scheduled_at"]))
+
+      error ->
+        error
     end
   end
 
similarity index 59%
rename from lib/pleroma/workers/cron/scheduled_activity_worker.ex
rename to lib/pleroma/workers/scheduled_activity_worker.ex
index 407ab687a5e4dd7a102da07b5ecfe5a727efc284..5109d7f759aa77bd33a61fcc2088ba6905678d8d 100644 (file)
@@ -2,12 +2,13 @@
 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
 # SPDX-License-Identifier: AGPL-3.0-only
 
-defmodule Pleroma.Workers.Cron.ScheduledActivityWorker do
+defmodule Pleroma.Workers.ScheduledActivityWorker do
   @moduledoc """
-  The worker to post scheduled actvities.
+  The worker to post scheduled activity.
   """
 
-  use Oban.Worker, queue: "scheduled_activities"
+  use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
+
   alias Pleroma.Config
   alias Pleroma.ScheduledActivity
   alias Pleroma.User
@@ -15,18 +16,20 @@ defmodule Pleroma.Workers.Cron.ScheduledActivityWorker do
 
   require Logger
 
-  @schedule_interval :timer.minutes(1)
-
   @impl Oban.Worker
-  def perform(_opts, _job) do
+  def perform(%{"activity_id" => activity_id}, _job) do
     if Config.get([ScheduledActivity, :enabled]) do
-      @schedule_interval
-      |> ScheduledActivity.due_activities()
-      |> Enum.each(&post_activity/1)
+      case Pleroma.Repo.get(ScheduledActivity, activity_id) do
+        %ScheduledActivity{} = scheduled_activity ->
+          post_activity(scheduled_activity)
+
+        _ ->
+          Logger.error("#{__MODULE__} Couldn't find scheduled activity: #{activity_id}")
+      end
     end
   end
 
-  def post_activity(scheduled_activity) do
+  defp post_activity(%ScheduledActivity{} = scheduled_activity) do
     try do
       {:ok, scheduled_activity} = ScheduledActivity.delete(scheduled_activity)
       %User{} = user = User.get_cached_by_id(scheduled_activity.user_id)
index d3d05745f4c218703d53a6fd297a6312ef7e1310..663cfdd344a00fe7ef779df73b8aaad348633639 100644 (file)
@@ -26,6 +26,7 @@ defmodule Pleroma.ScheduledActivityTest do
       attrs = %{params: %{}, scheduled_at: today}
       {:ok, _} = ScheduledActivity.create(user, attrs)
       {:ok, _} = ScheduledActivity.create(user, attrs)
+
       {:error, changeset} = ScheduledActivity.create(user, attrs)
       assert changeset.errors == [scheduled_at: {"daily limit exceeded", []}]
     end
@@ -83,7 +84,10 @@ defmodule Pleroma.ScheduledActivityTest do
         params: %{status: "hi"}
       )
 
-    Pleroma.Workers.Cron.ScheduledActivityWorker.perform(:opts, :pid)
+    Pleroma.Workers.ScheduledActivityWorker.perform(
+      %{"activity_id" => scheduled_activity.id},
+      :pid
+    )
 
     refute Repo.get(ScheduledActivity, scheduled_activity.id)
     activity = Repo.all(Pleroma.Activity) |> Enum.find(&(&1.actor == user.ap_id))
index ce39dd9d8fad52ccaa405d36a10388068e3a8a45..ec556a916cb96302002fcfca8a234072c2026c4a 100644 (file)
@@ -53,6 +53,12 @@ defmodule Pleroma.Tests.Helpers do
           clear_config_all: 2
         ]
 
+      def to_datetime(naive_datetime) do
+        naive_datetime
+        |> DateTime.from_naive!("Etc/UTC")
+        |> DateTime.truncate(:second)
+      end
+
       def collect_ids(collection) do
         collection
         |> Enum.map(& &1.id)
index ae5fee2bcd3b38b8b86ef52b7f30c6385f6e4d6a..5f3a376be7a522808d26e3a879dbffd67ee223e2 100644 (file)
@@ -9,6 +9,7 @@ defmodule Pleroma.Web.MastodonAPI.ScheduledActivityControllerTest do
   alias Pleroma.ScheduledActivity
 
   import Pleroma.Factory
+  import Ecto.Query
 
   test "shows scheduled activities", %{conn: conn} do
     user = insert(:user)
@@ -68,7 +69,30 @@ defmodule Pleroma.Web.MastodonAPI.ScheduledActivityControllerTest do
 
   test "updates a scheduled activity", %{conn: conn} do
     user = insert(:user)
-    scheduled_activity = insert(:scheduled_activity, user: user)
+
+    scheduled_at =
+      NaiveDateTime.add(
+        NaiveDateTime.utc_now(),
+        :timer.minutes(60),
+        :millisecond
+      )
+
+    {:ok, scheduled_activity} =
+      ScheduledActivity.create(
+        user,
+        %{
+          scheduled_at: scheduled_at,
+          params: build(:note).data
+        }
+      )
+
+    scheduled_activity_job =
+      Repo.one(from(j in Oban.Job, where: j.queue == "scheduled_activities"))
+
+    assert scheduled_activity_job.args == %{"activity_id" => scheduled_activity.id}
+
+    assert DateTime.truncate(scheduled_activity_job.scheduled_at, :second) ==
+             to_datetime(scheduled_at)
 
     new_scheduled_at =
       NaiveDateTime.add(NaiveDateTime.utc_now(), :timer.minutes(120), :millisecond)
@@ -82,6 +106,10 @@ defmodule Pleroma.Web.MastodonAPI.ScheduledActivityControllerTest do
 
     assert %{"scheduled_at" => expected_scheduled_at} = json_response(res_conn, 200)
     assert expected_scheduled_at == Pleroma.Web.CommonAPI.Utils.to_masto_date(new_scheduled_at)
+    scheduled_activity_job = refresh_record(scheduled_activity_job)
+
+    assert DateTime.truncate(scheduled_activity_job.scheduled_at, :second) ==
+             to_datetime(new_scheduled_at)
 
     res_conn =
       conn
@@ -93,7 +121,25 @@ defmodule Pleroma.Web.MastodonAPI.ScheduledActivityControllerTest do
 
   test "deletes a scheduled activity", %{conn: conn} do
     user = insert(:user)
-    scheduled_activity = insert(:scheduled_activity, user: user)
+
+    {:ok, scheduled_activity} =
+      ScheduledActivity.create(
+        user,
+        %{
+          scheduled_at:
+            NaiveDateTime.add(
+              NaiveDateTime.utc_now(),
+              :timer.minutes(60),
+              :millisecond
+            ),
+          params: build(:note).data
+        }
+      )
+
+    scheduled_activity_job =
+      Repo.one(from(j in Oban.Job, where: j.queue == "scheduled_activities"))
+
+    assert scheduled_activity_job.args == %{"activity_id" => scheduled_activity.id}
 
     res_conn =
       conn
@@ -101,7 +147,8 @@ defmodule Pleroma.Web.MastodonAPI.ScheduledActivityControllerTest do
       |> delete("/api/v1/scheduled_statuses/#{scheduled_activity.id}")
 
     assert %{} = json_response(res_conn, 200)
-    assert nil == Repo.get(ScheduledActivity, scheduled_activity.id)
+    refute Repo.get(ScheduledActivity, scheduled_activity.id)
+    refute Repo.get(Oban.Job, scheduled_activity_job.id)
 
     res_conn =
       conn
diff --git a/test/workers/cron/scheduled_activity_worker_test.exs b/test/workers/cron/scheduled_activity_worker_test.exs
deleted file mode 100644 (file)
index 6f17d6f..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Workers.Cron.ScheduledActivityWorkerTest do
-  use Pleroma.DataCase
-  alias Pleroma.ScheduledActivity
-  import Pleroma.Factory
-
-  clear_config([ScheduledActivity, :enabled])
-
-  test "creates a status from the scheduled activity" do
-    Pleroma.Config.put([ScheduledActivity, :enabled], true)
-    user = insert(:user)
-
-    naive_datetime =
-      NaiveDateTime.add(
-        NaiveDateTime.utc_now(),
-        -:timer.minutes(2),
-        :millisecond
-      )
-
-    scheduled_activity =
-      insert(
-        :scheduled_activity,
-        scheduled_at: naive_datetime,
-        user: user,
-        params: %{status: "hi"}
-      )
-
-    Pleroma.Workers.Cron.ScheduledActivityWorker.perform(:opts, :pid)
-
-    refute Repo.get(ScheduledActivity, scheduled_activity.id)
-    activity = Repo.all(Pleroma.Activity) |> Enum.find(&(&1.actor == user.ap_id))
-    assert Pleroma.Object.normalize(activity).data["content"] == "hi"
-  end
-end