alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.CommonAPI
- alias Pleroma.Workers.ActivityExpirationWorker
require Logger
use GenServer
import Ecto.Query
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
@schedule_interval :timer.minutes(1)
def start_link(_) do
def handle_info(:perform, state) do
ActivityExpiration.due_expirations(@schedule_interval)
|> Enum.each(fn expiration ->
- %{
- "op" => "activity_expiration",
- "activity_expiration_id" => expiration.id
- }
- |> ActivityExpirationWorker.new(worker_args(:activity_expiration))
- |> Repo.insert()
+ Pleroma.Workers.ActivityExpirationWorker.enqueue(
+ "activity_expiration",
+ %{"activity_expiration_id" => expiration.id}
+ )
end)
schedule_next()
defmodule Pleroma.DigestEmailWorker do
alias Pleroma.Repo
- alias Pleroma.Workers.MailerWorker
+ alias Pleroma.Workers.DigestEmailsWorker
import Ecto.Query
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
def perform do
config = Pleroma.Config.get([:email_notifications, :digest])
negative_interval = -Map.fetch!(config, :interval)
where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"),
select: u
)
- |> Pleroma.Repo.all()
+ |> Repo.all()
|> Enum.each(fn user ->
- %{"op" => "digest_email", "user_id" => user.id}
- |> MailerWorker.new([queue: "digest_emails"] ++ worker_args(:digest_emails))
- |> Repo.insert()
+ DigestEmailsWorker.enqueue("digest_email", %{"user_id" => user.id})
end)
end
The module contains functions to deliver email using Swoosh.Mailer.
"""
- alias Pleroma.Repo
alias Pleroma.Workers.MailerWorker
alias Swoosh.DeliveryError
@spec enabled?() :: boolean()
def enabled?, do: Pleroma.Config.get([__MODULE__, :enabled])
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
@doc "add email to queue"
def deliver_async(email, config \\ []) do
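+ # Oban stores job arguments as JSON, so the email struct is serialized to a
+ # Base64-encoded Erlang term before being enqueued.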
encoded_email =
email
|> :erlang.term_to_binary()
|> Base.encode64()
- %{"op" => "email", "encoded_email" => encoded_email, "config" => config}
- |> MailerWorker.new(worker_args(:mailer))
- |> Repo.insert()
+ MailerWorker.enqueue("email", %{"encoded_email" => encoded_email, "config" => config})
end
@doc "callback to perform send email from queue"
"""
alias Pleroma.Config
- alias Pleroma.Repo
alias Pleroma.ScheduledActivity
alias Pleroma.User
alias Pleroma.Web.CommonAPI
@schedule_interval :timer.minutes(1)
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
def start_link(_) do
GenServer.start_link(__MODULE__, nil)
end
def handle_info(:perform, state) do
ScheduledActivity.due_activities(@schedule_interval)
|> Enum.each(fn scheduled_activity ->
- %{"op" => "execute", "activity_id" => scheduled_activity.id}
- |> Pleroma.Workers.ScheduledActivityWorker.new(worker_args(:scheduled_activities))
- |> Repo.insert()
+ Pleroma.Workers.ScheduledActivityWorker.enqueue(
+ "execute",
+ %{"activity_id" => scheduled_activity.id}
+ )
end)
schedule_next()
@strict_local_nickname_regex ~r/^[a-zA-Z\d]+$/
@extended_local_nickname_regex ~r/^[a-zA-Z\d_-]+$/
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
schema "users" do
field(:bio, :string)
field(:email, :string)
@doc "Fetch some posts when the user has just been federated with"
def fetch_initial_posts(user) do
- %{"op" => "fetch_initial_posts", "user_id" => user.id}
- |> BackgroundWorker.new(worker_args(:background))
- |> Repo.insert()
+ BackgroundWorker.enqueue("fetch_initial_posts", %{"user_id" => user.id})
end
@spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t()
end
def deactivate_async(user, status \\ true) do
- %{"op" => "deactivate_user", "user_id" => user.id, "status" => status}
- |> BackgroundWorker.new(worker_args(:background))
- |> Repo.insert()
+ BackgroundWorker.enqueue("deactivate_user", %{"user_id" => user.id, "status" => status})
end
def deactivate(%User{} = user, status \\ true) do
end
def delete(%User{} = user) do
- %{"op" => "delete_user", "user_id" => user.id}
- |> BackgroundWorker.new(worker_args(:background))
- |> Repo.insert()
+ BackgroundWorker.enqueue("delete_user", %{"user_id" => user.id})
end
@spec perform(atom(), User.t()) :: {:ok, User.t()}
end
def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do
- %{
- "op" => "blocks_import",
+ BackgroundWorker.enqueue("blocks_import", %{
"blocker_id" => blocker.id,
"blocked_identifiers" => blocked_identifiers
- }
- |> BackgroundWorker.new(worker_args(:background))
- |> Repo.insert()
+ })
end
def follow_import(%User{} = follower, followed_identifiers)
when is_list(followed_identifiers) do
- %{
- "op" => "follow_import",
+ BackgroundWorker.enqueue("follow_import", %{
"follower_id" => follower.id,
"followed_identifiers" => followed_identifiers
- }
- |> BackgroundWorker.new(worker_args(:background))
- |> Repo.insert()
+ })
end
def delete_user_activities(%User{ap_id: ap_id} = user) do
require Logger
require Pleroma.Constants
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
activity
end
- %{"op" => "fetch_data_for_activity", "activity_id" => activity.id}
- |> BackgroundWorker.new(worker_args(:background))
- |> Repo.insert()
+ BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
Notification.create_notifications(activity)
@behaviour Pleroma.Web.ActivityPub.MRF
alias Pleroma.HTTP
- alias Pleroma.Repo
alias Pleroma.Web.MediaProxy
alias Pleroma.Workers.BackgroundWorker
recv_timeout: 10_000
]
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
def perform(:prefetch, url) do
Logger.info("Prefetching #{inspect(url)}")
url
|> Enum.each(fn
%{"href" => href} ->
- %{"op" => "media_proxy_prefetch", "url" => href}
- |> BackgroundWorker.new(worker_args(:background))
- |> Repo.insert()
+ BackgroundWorker.enqueue("media_proxy_prefetch", %{"url" => href})
x ->
Logger.debug("Unhandled attachment URL object #{inspect(x)}")
%{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message
)
when is_list(attachments) and length(attachments) > 0 do
- %{"op" => "media_proxy_preload", "message" => message}
- |> BackgroundWorker.new(worker_args(:background))
- |> Repo.insert()
+ BackgroundWorker.enqueue("media_proxy_preload", %{"message" => message})
{:ok, message}
end
require Logger
require Pleroma.Constants
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
@doc """
Modifies an incoming AP object (mastodon format) to our internal format.
"""
already_ap <- User.ap_enabled?(user),
{:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do
unless already_ap do
- %{"op" => "user_upgrade", "user_id" => user.id}
- |> TransmogrifierWorker.new(worker_args(:transmogrifier))
- |> Repo.insert()
+ TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id})
end
{:ok, user}
require Logger
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
def init do
# To do: consider removing this call in favor of scheduled execution (`quantum`-based)
refresh_subscriptions(schedule_in: 60)
# Client API
def incoming_doc(doc) do
- %{"op" => "incoming_doc", "body" => doc}
- |> ReceiverWorker.new(worker_args(:federator_incoming))
- |> Pleroma.Repo.insert()
+ ReceiverWorker.enqueue("incoming_doc", %{"body" => doc})
end
def incoming_ap_doc(params) do
- %{"op" => "incoming_ap_doc", "params" => params}
- |> ReceiverWorker.new(worker_args(:federator_incoming))
- |> Pleroma.Repo.insert()
+ ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params})
end
def publish(%{id: "pleroma:fakeid"} = activity) do
end
def publish(activity) do
- %{"op" => "publish", "activity_id" => activity.id}
- |> PublisherWorker.new(worker_args(:federator_outgoing))
- |> Pleroma.Repo.insert()
+ PublisherWorker.enqueue("publish", %{"activity_id" => activity.id})
end
def verify_websub(websub) do
- %{"op" => "verify_websub", "websub_id" => websub.id}
- |> SubscriberWorker.new(worker_args(:federator_outgoing))
- |> Pleroma.Repo.insert()
+ SubscriberWorker.enqueue("verify_websub", %{"websub_id" => websub.id})
end
def request_subscription(websub) do
- %{"op" => "request_subscription", "websub_id" => websub.id}
- |> SubscriberWorker.new(worker_args(:federator_outgoing))
- |> Pleroma.Repo.insert()
+ SubscriberWorker.enqueue("request_subscription", %{"websub_id" => websub.id})
end
def refresh_subscriptions(worker_args \\ []) do
- %{"op" => "refresh_subscriptions"}
- |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing))
- |> Pleroma.Repo.insert()
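+ # The optional third argument of `enqueue/3` forwards extra Oban job options to `new/2`;
+ # here the caller-supplied `worker_args` (e.g. `schedule_in: 60` from `init/0`) plus `max_attempts: 1`.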
+ SubscriberWorker.enqueue("refresh_subscriptions", %{}, worker_args ++ [max_attempts: 1])
end
# Job Worker Callbacks
"""
@spec enqueue_one(module(), Map.t()) :: :ok
def enqueue_one(module, %{} = params) do
- worker_args = Pleroma.Workers.WorkerHelper.worker_args(:federator_outgoing)
-
- %{"op" => "publish_one", "module" => to_string(module), "params" => params}
- |> PublisherWorker.new(worker_args)
- |> Pleroma.Repo.insert()
+ PublisherWorker.enqueue(
+ "publish_one",
+ %{"module" => to_string(module), "params" => params}
+ )
end
@doc """
@one_day
)
- alias Pleroma.Repo
alias Pleroma.Web.OAuth.Token
alias Pleroma.Workers.BackgroundWorker
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
def start_link(_), do: GenServer.start_link(__MODULE__, %{})
def init(_) do
@doc false
def handle_info(:perform, state) do
- %{"op" => "clean_expired_tokens"}
- |> BackgroundWorker.new(worker_args(:background))
- |> Repo.insert()
+ BackgroundWorker.enqueue("clean_expired_tokens", %{})
Process.send_after(self(), :perform, @interval)
{:noreply, state}
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Push do
- alias Pleroma.Repo
alias Pleroma.Workers.WebPusherWorker
require Logger
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
def init do
unless enabled() do
Logger.warn("""
end
def send(notification) do
- %{"op" => "web_push", "notification_id" => notification.id}
- |> WebPusherWorker.new(worker_args(:web_push))
- |> Repo.insert()
+ WebPusherWorker.enqueue("web_push", %{"notification_id" => notification.id})
end
end
queue: "activity_expiration",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "activity_expiration"
+
@impl Oban.Worker
def perform(
%{
queue: "background",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "background"
+
@impl Oban.Worker
def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do
user = User.get_cached_by_id(user_id)
--- /dev/null
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.DigestEmailsWorker do
+ alias Pleroma.User
+
+ # Note: `max_attempts` is intended to be overridden in the `new/2` call
+ use Oban.Worker,
+ queue: "digest_emails",
+ max_attempts: 1
+
+ use Pleroma.Workers.WorkerHelper, queue: "digest_emails"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
+ user_id
+ |> User.get_cached_by_id()
+ |> Pleroma.DigestEmailWorker.perform()
+ end
+end
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.MailerWorker do
- alias Pleroma.User
-
# Note: `max_attempts` is intended to be overridden in the `new/2` call
use Oban.Worker,
queue: "mailer",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "mailer"
+
@impl Oban.Worker
def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do
encoded_email
|> :erlang.binary_to_term()
|> Pleroma.Emails.Mailer.deliver(config)
end
-
- def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
- user_id
- |> User.get_cached_by_id()
- |> Pleroma.DigestEmailWorker.perform()
- end
end
queue: "federator_outgoing",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
+
def backoff(attempt) when is_integer(attempt) do
Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)
end
queue: "federator_incoming",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
+
@impl Oban.Worker
def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do
Federator.perform(:incoming_doc, doc)
queue: "scheduled_activities",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
+
@impl Oban.Worker
def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do
Pleroma.ScheduledActivityWorker.perform(:execute, activity_id)
queue: "federator_outgoing",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
+
@impl Oban.Worker
def perform(%{"op" => "refresh_subscriptions"}, _job) do
Federator.perform(:refresh_subscriptions)
queue: "transmogrifier",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "transmogrifier"
+
@impl Oban.Worker
def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do
user = User.get_cached_by_id(user_id)
queue: "web_push",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "web_push"
+
@impl Oban.Worker
def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do
notification = Repo.get(Notification, notification_id)
defmodule Pleroma.Workers.WorkerHelper do
alias Pleroma.Config
+ alias Pleroma.Workers.WorkerHelper
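+ # Per-queue retry limits are read from the application config; the assumed shape is, e.g.:
+ #
+ #     config :pleroma, :workers,
+ #       retries: [federator_incoming: 5, federator_outgoing: 5]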
def worker_args(queue) do
case Config.get([:workers, :retries, queue]) do
trunc(backoff)
end
+
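+ # Injects an `enqueue(op, params, worker_args \\ [])` helper into the worker:
+ # it merges the "op" name into the params map, appends the per-queue options
+ # from `worker_args/1`, builds the job via the worker's `new/2`, and inserts
+ # it with `Pleroma.Repo.insert/1`.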
+ defmacro __using__(opts) do
+ caller_module = __CALLER__.module
+ queue = Keyword.fetch!(opts, :queue)
+
+ quote do
+ def enqueue(op, params, worker_args \\ []) do
+ params = Map.merge(%{"op" => op}, params)
+ queue_atom = String.to_atom(unquote(queue))
+ worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)
+
+ unquote(caller_module)
+ |> apply(:new, [params, worker_args])
+ |> Pleroma.Repo.insert()
+ end
+ end
+ end
end
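
A minimal usage sketch of the new helper, for reference (the module and op names below are illustrative, not part of this changeset): a worker declares its queue once through `use Pleroma.Workers.WorkerHelper`, and callers enqueue jobs via the generated `enqueue/3`.

defmodule Pleroma.Workers.ExampleWorker do
  use Oban.Worker,
    queue: "background",
    max_attempts: 1

  use Pleroma.Workers.WorkerHelper, queue: "background"

  @impl Oban.Worker
  def perform(%{"op" => "example_op", "id" => id}, _job) do
    # hypothetical work keyed by the enqueued id
    {:ok, id}
  end
end

# Enqueue a job; retry settings for the "background" queue come from config,
# and extra Oban options can be passed as the third argument.
Pleroma.Workers.ExampleWorker.enqueue("example_op", %{"id" => 1})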