config :pleroma_job_queue, :queues,
federator_incoming: 50,
federator_outgoing: 50,
- mailer: 10
+ mailer: 10,
+ scheduled_activities: 10
config :pleroma, :fetch_initial_posts,
enabled: false,
pages: 5
-config :pleroma, Pleroma.ScheduledActivity,
- daily_user_limit: 25,
- total_user_limit: 100
-
config :auto_linker,
opts: [
scheme: true,
config :prometheus, Pleroma.Web.Endpoint.MetricsExporter, path: "/api/pleroma/app_metrics"
+config :pleroma, Pleroma.ScheduledActivity,
+ daily_user_limit: 25,
+ total_user_limit: 300,
+ enabled: true
+
# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs"
config :pleroma_job_queue, disabled: true
+config :pleroma, Pleroma.ScheduledActivity,
+ daily_user_limit: 2,
+ total_user_limit: 3,
+ enabled: false
+
try do
import_config "test.secret.exs"
rescue
* `federator_outgoing` - Outgoing federation
* `federator_incoming` - Incoming federation
* `mailer` - Email sender, see [`Pleroma.Mailer`](#pleroma-mailer)
+* `scheduled_activities` - Scheduled activities, see [`Pleroma.ScheduledActivities`](#pleromascheduledactivity)
Example:
* `Pleroma.Web.Auth.PleromaAuthenticator`: default database authenticator
* `Pleroma.Web.Auth.LDAPAuthenticator`: LDAP authentication
-## Pleroma.ScheduledActivity
+## Pleroma.ScheduledActivity
-* `daily_user_limit`: the number of scheduled activities a user is allowed to create in a single day
-* `total_user_limit`: the number of scheduled activities a user is allowed to create in total
+* `daily_user_limit`: the number of scheduled activities a user is allowed to create in a single day (Default: `25`)
+* `total_user_limit`: the number of scheduled activities a user is allowed to create in total (Default: `300`)
+* `enabled`: whether scheduled activities are sent to the job queue to be executed
],
id: :cachex_idem
),
- worker(Pleroma.FlakeId, [])
+ worker(Pleroma.FlakeId, []),
+ worker(Pleroma.ScheduledActivityWorker, [])
] ++
hackney_pool_children() ++
[
_ -> {:error, "Not found"}
end
end
-
- def enforce_user_objects(user, object_ids) do
- Object
- |> where([o], fragment("?->>'actor' = ?", o.data, ^user.ap_id))
- |> where([o], o.id in ^object_ids)
- |> select([o], o.id)
- |> Repo.all()
- end
end
use Ecto.Schema
alias Pleroma.Config
- alias Pleroma.Object
alias Pleroma.Repo
alias Pleroma.ScheduledActivity
alias Pleroma.User
%{changes: %{params: %{"media_ids" => media_ids} = params}} = changeset
)
when is_list(media_ids) do
- user = User.get_cached_by_id(changeset.data.user_id)
- media_ids = Object.enforce_user_objects(user, media_ids) |> Enum.map(&to_string(&1))
media_attachments = Utils.attachments_from_ids(%{"media_ids" => media_ids})
params =
def exceeds_daily_user_limit?(user_id, scheduled_at) do
ScheduledActivity
|> where(user_id: ^user_id)
- |> where([s], type(s.scheduled_at, :date) == type(^scheduled_at, :date))
- |> select([u], count(u.id))
+ |> where([sa], type(sa.scheduled_at, :date) == type(^scheduled_at, :date))
+ |> select([sa], count(sa.id))
|> Repo.one()
|> Kernel.>=(Config.get([ScheduledActivity, :daily_user_limit]))
end
def exceeds_total_user_limit?(user_id) do
ScheduledActivity
|> where(user_id: ^user_id)
- |> select([u], count(u.id))
+ |> select([sa], count(sa.id))
|> Repo.one()
|> Kernel.>=(Config.get([ScheduledActivity, :total_user_limit]))
end
|> Repo.one()
end
- def update(scheduled_activity, attrs) do
+ def update(%ScheduledActivity{} = scheduled_activity, attrs) do
scheduled_activity
|> update_changeset(attrs)
|> Repo.update()
end
- def delete(scheduled_activity) do
+ def delete(%ScheduledActivity{} = scheduled_activity) do
scheduled_activity
|> Repo.delete()
end
+ def delete(id) when is_binary(id) or is_integer(id) do
+ ScheduledActivity
+ |> where(id: ^id)
+ |> select([sa], sa)
+ |> Repo.delete_all()
+ |> case do
+ {1, [scheduled_activity]} -> {:ok, scheduled_activity}
+ _ -> :error
+ end
+ end
+
def for_user_query(%User{} = user) do
ScheduledActivity
|> where(user_id: ^user.id)
end
+
+ def due_activities(offset \\ 0) do
+ naive_datetime =
+ NaiveDateTime.utc_now()
+ |> NaiveDateTime.add(offset, :millisecond)
+
+ ScheduledActivity
+ |> where([sa], sa.scheduled_at < ^naive_datetime)
+ |> Repo.all()
+ end
end
--- /dev/null
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.ScheduledActivityWorker do
+ @moduledoc """
+ Sends scheduled activities to the job queue.
+ """
+
+ alias Pleroma.Config
+ alias Pleroma.ScheduledActivity
+ alias Pleroma.User
+ alias Pleroma.Web.CommonAPI
+ use GenServer
+ require Logger
+
+ @schedule_interval :timer.minutes(1)
+
+ def start_link do
+ GenServer.start_link(__MODULE__, nil)
+ end
+
+ def init(_) do
+ if Config.get([ScheduledActivity, :enabled]) do
+ schedule_next()
+ {:ok, nil}
+ else
+ :ignore
+ end
+ end
+
+ def perform(:execute, scheduled_activity_id) do
+ try do
+ {:ok, scheduled_activity} = ScheduledActivity.delete(scheduled_activity_id)
+ %User{} = user = User.get_cached_by_id(scheduled_activity.user_id)
+ {:ok, _result} = CommonAPI.post(user, scheduled_activity.params)
+ rescue
+ error ->
+ Logger.error(
+ "#{__MODULE__} Couldn't create a status from the scheduled activity: #{inspect(error)}"
+ )
+ end
+ end
+
+ def handle_info(:perform, state) do
+ ScheduledActivity.due_activities(@schedule_interval)
+ |> Enum.each(fn scheduled_activity ->
+ PleromaJobQueue.enqueue(:scheduled_activities, __MODULE__, [:execute, scheduled_activity.id])
+ end)
+
+ schedule_next()
+ {:noreply, state}
+ end
+
+ defp schedule_next do
+ Process.send_after(self(), :perform, @schedule_interval)
+ end
+end
defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
use Pleroma.Web, :controller
+ alias Ecto.Changeset
alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.Filter
scheduled_at = params["scheduled_at"]
if scheduled_at && ScheduledActivity.far_enough?(scheduled_at) do
- {:ok, scheduled_activity} =
- Cachex.fetch!(:idempotency_cache, idempotency_key, fn _ ->
- ScheduledActivity.create(user, %{"params" => params, "scheduled_at" => scheduled_at})
- end)
-
- conn
- |> put_view(ScheduledActivityView)
- |> render("show.json", %{scheduled_activity: scheduled_activity})
+ with {:ok, scheduled_activity} <-
+ ScheduledActivity.create(user, %{"params" => params, "scheduled_at" => scheduled_at}) do
+ conn
+ |> put_view(ScheduledActivityView)
+ |> render("show.json", %{scheduled_activity: scheduled_activity})
+ end
else
params = Map.drop(params, ["scheduled_at"])
# fallback action
#
+ def errors(conn, {:error, %Changeset{} = changeset}) do
+ error_message =
+ changeset
+ |> Changeset.traverse_errors(fn {message, _opt} -> message end)
+ |> Enum.map_join(", ", fn {_k, v} -> v end)
+
+ conn
+ |> put_status(422)
+ |> json(%{error: error_message})
+ end
+
def errors(conn, {:error, :not_found}) do
conn
|> put_status(404)
def render("show.json", %{scheduled_activity: %ScheduledActivity{} = scheduled_activity}) do
%{
- id: scheduled_activity.id |> to_string,
- scheduled_at: scheduled_activity.scheduled_at |> CommonAPI.Utils.to_masto_date(),
+ id: to_string(scheduled_activity.id),
+ scheduled_at: CommonAPI.Utils.to_masto_date(scheduled_activity.scheduled_at),
params: status_params(scheduled_activity.params)
}
|> with_media_attachments(scheduled_activity)
end
defp with_media_attachments(data, %{params: %{"media_attachments" => media_attachments}}) do
- attachments = render_many(media_attachments, StatusView, "attachment.json", as: :attachment)
- Map.put(data, :media_attachments, attachments)
+ try do
+ attachments = render_many(media_attachments, StatusView, "attachment.json", as: :attachment)
+ Map.put(data, :media_attachments, attachments)
+ rescue
+ _ -> data
+ end
end
defp with_media_attachments(data, _), do: data
defmodule Pleroma.ScheduledActivityTest do
use Pleroma.DataCase
- alias Pleroma.Config
alias Pleroma.DataCase
alias Pleroma.ScheduledActivity
- alias Pleroma.Web.ActivityPub.ActivityPub
import Pleroma.Factory
setup context do
- Config.put([ScheduledActivity, :daily_user_limit], 2)
- Config.put([ScheduledActivity, :total_user_limit], 3)
DataCase.ensure_local_uploader(context)
end
tomorrow =
NaiveDateTime.utc_now()
- |> NaiveDateTime.add(:timer.hours(24), :millisecond)
+ |> NaiveDateTime.add(:timer.hours(36), :millisecond)
|> NaiveDateTime.to_iso8601()
{:ok, _} = ScheduledActivity.create(user, %{params: %{}, scheduled_at: today})
{:error, changeset} = ScheduledActivity.create(user, attrs)
assert changeset.errors == [scheduled_at: {"must be at least 5 minutes from now", []}]
end
-
- test "excludes attachments belonging to another user" do
- user = insert(:user)
- another_user = insert(:user)
-
- scheduled_at =
- NaiveDateTime.utc_now()
- |> NaiveDateTime.add(:timer.minutes(10), :millisecond)
- |> NaiveDateTime.to_iso8601()
-
- file = %Plug.Upload{
- content_type: "image/jpg",
- path: Path.absname("test/fixtures/image.jpg"),
- filename: "an_image.jpg"
- }
-
- {:ok, user_upload} = ActivityPub.upload(file, actor: user.ap_id)
- {:ok, another_user_upload} = ActivityPub.upload(file, actor: another_user.ap_id)
-
- media_ids = [user_upload.id, another_user_upload.id]
- attrs = %{params: %{"media_ids" => media_ids}, scheduled_at: scheduled_at}
- {:ok, scheduled_activity} = ScheduledActivity.create(user, attrs)
- assert to_string(user_upload.id) in scheduled_activity.params["media_ids"]
- refute to_string(another_user_upload.id) in scheduled_activity.params["media_ids"]
- end
end
end
--- /dev/null
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.ScheduledActivityWorkerTest do
+ use Pleroma.DataCase
+ alias Pleroma.ScheduledActivity
+ import Pleroma.Factory
+
+ test "creates a status from the scheduled activity" do
+ user = insert(:user)
+ scheduled_activity = insert(:scheduled_activity, user: user, params: %{status: "hi"})
+ Pleroma.ScheduledActivityWorker.perform(:execute, scheduled_activity.id)
+
+ refute Repo.get(ScheduledActivity, scheduled_activity.id)
+ activity = Repo.all(Pleroma.Activity) |> Enum.find(&(&1.actor == user.ap_id))
+ assert activity.data["object"]["content"] == "hi"
+ end
+end
assert [] == Repo.all(ScheduledActivity)
end
+ test "returns error when daily user limit is exceeded", %{conn: conn} do
+ user = insert(:user)
+
+ today =
+ NaiveDateTime.utc_now()
+ |> NaiveDateTime.add(:timer.minutes(6), :millisecond)
+ |> NaiveDateTime.to_iso8601()
+
+ attrs = %{params: %{}, scheduled_at: today}
+ {:ok, _} = ScheduledActivity.create(user, attrs)
+ {:ok, _} = ScheduledActivity.create(user, attrs)
+
+ conn =
+ conn
+ |> assign(:user, user)
+ |> post("/api/v1/statuses", %{"status" => "scheduled", "scheduled_at" => today})
+
+ assert %{"error" => "daily limit exceeded"} == json_response(conn, 422)
+ end
+
+ test "returns error when total user limit is exceeded", %{conn: conn} do
+ user = insert(:user)
+
+ today =
+ NaiveDateTime.utc_now()
+ |> NaiveDateTime.add(:timer.minutes(6), :millisecond)
+ |> NaiveDateTime.to_iso8601()
+
+ tomorrow =
+ NaiveDateTime.utc_now()
+ |> NaiveDateTime.add(:timer.hours(36), :millisecond)
+ |> NaiveDateTime.to_iso8601()
+
+ attrs = %{params: %{}, scheduled_at: today}
+ {:ok, _} = ScheduledActivity.create(user, attrs)
+ {:ok, _} = ScheduledActivity.create(user, attrs)
+ {:ok, _} = ScheduledActivity.create(user, %{params: %{}, scheduled_at: tomorrow})
+
+ conn =
+ conn
+ |> assign(:user, user)
+ |> post("/api/v1/statuses", %{"status" => "scheduled", "scheduled_at" => tomorrow})
+
+ assert %{"error" => "total limit exceeded"} == json_response(conn, 422)
+ end
+
test "shows scheduled activities", %{conn: conn} do
user = insert(:user)
scheduled_activity_id1 = insert(:scheduled_activity, user: user).id |> to_string()
|> Enum.map(&StatusView.render("attachment.json", %{attachment: &1})),
params: %{
in_reply_to_id: to_string(activity.id),
- media_ids: [to_string(upload.id)],
+ media_ids: [upload.id],
poll: nil,
scheduled_at: nil,
sensitive: true,