"web"
]
-config :pleroma, Pleroma.Web.Federator.RetryQueue,
- enabled: false,
- max_jobs: 20,
- initial_timeout: 30,
- max_retries: 5
-
-config :pleroma_job_queue, :queues,
+job_queues = [
federator_incoming: 50,
federator_outgoing: 50,
web_push: 50,
transmogrifier: 20,
scheduled_activities: 10,
background: 5
+]
+
+config :pleroma_job_queue, :queues, job_queues
+
+config :pleroma, Oban,
+ repo: Pleroma.Repo,
+ verbose: false,
+ prune: {:maxage, 60 * 60 * 24 * 7},
+ queues: job_queues
config :pleroma, :fetch_initial_posts,
enabled: false,
config :pleroma_job_queue, disabled: true
+config :pleroma, Oban,
+ queues: false,
+ prune: :disabled
+
config :pleroma, Pleroma.ScheduledActivity,
daily_user_limit: 2,
total_user_limit: 3,
This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`.
-## Pleroma.Web.Federator.RetryQueue
-
-* `enabled`: If set to `true`, failed federation jobs will be retried
-* `max_jobs`: The maximum amount of parallel federation jobs running at the same time.
-* `initial_timeout`: The initial timeout in seconds
-* `max_retries`: The maximum number of times a federation job is retried
-
## Pleroma.Web.Metadata
* `providers`: a list of metadata providers to enable. Providers available:
* Pleroma.Web.Metadata.Providers.OpenGraph
hackney_pool_children() ++
[
%{
- id: Pleroma.Web.Federator.RetryQueue,
- start: {Pleroma.Web.Federator.RetryQueue, :start_link, []}
+ id: Oban,
+ start: {Oban, :start_link, [Application.get_env(:pleroma, Oban)]}
},
%{
id: Pleroma.Web.OAuth.Token.CleanWorker,
end
end
+ def publish_one(%{actor_id: actor_id} = params) do
+ actor = User.get_by_id(actor_id)
+
+ params
+ |> Map.delete(:actor_id)
+ |> Map.put(:actor, actor)
+ |> publish_one()
+ end
+
defp should_federate?(inbox, public) do
if public do
true
Publishes an activity with BCC to all relevant peers.
"""
- def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do
+ def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
+ when is_list(bcc) and bcc != [] do
public = is_public?(activity)
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{
inbox: inbox,
json: json,
- actor: actor,
+ actor_id: actor.id,
id: activity.data["id"],
unreachable_since: unreachable_since
})
%{
inbox: inbox,
json: json,
- actor: actor,
+ actor_id: actor.id,
id: activity.data["id"],
unreachable_since: unreachable_since
}
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Federator.Publisher
- alias Pleroma.Web.Federator.RetryQueue
alias Pleroma.Web.OStatus
alias Pleroma.Web.Websub
end
end
- def perform(
- :publish_single_websub,
- %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
- ) do
- case Websub.publish_one(params) do
- {:ok, _} ->
- :ok
-
- {:error, _} ->
- RetryQueue.enqueue(params, Websub)
- end
- end
-
def perform(type, _) do
Logger.debug(fn -> "Unknown task: #{type}" end)
{:error, "Don't know what to do with this"}
alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.User
- alias Pleroma.Web.Federator.RetryQueue
require Logger
Enqueue publishing a single activity.
"""
@spec enqueue_one(module(), Map.t()) :: :ok
- def enqueue_one(module, %{} = params),
- do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params])
-
- @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
- def perform(:publish_one, module, params) do
- case apply(module, :publish_one, [params]) do
- {:ok, _} ->
- :ok
-
- {:error, _e} ->
- RetryQueue.enqueue(params, module)
- end
- end
-
- def perform(type, _, _) do
- Logger.debug("Unknown task: #{type}")
- {:error, "Don't know what to do with this"}
+ def enqueue_one(module, %{} = params) do
+ %{module: to_string(module), params: params}
+ |> Pleroma.Workers.Publisher.new()
+ |> Pleroma.Repo.insert()
end
@doc """
+++ /dev/null
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Federator.RetryQueue do
- use GenServer
-
- require Logger
-
- def init(args) do
- queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected])
-
- {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}}
- end
-
- def start_link do
- enabled =
- if Pleroma.Config.get(:env) == :test,
- do: true,
- else: Pleroma.Config.get([__MODULE__, :enabled], false)
-
- if enabled do
- Logger.info("Starting retry queue")
-
- linkres =
- GenServer.start_link(
- __MODULE__,
- %{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil},
- name: __MODULE__
- )
-
- maybe_kickoff_timer()
- linkres
- else
- Logger.info("Retry queue disabled")
- :ignore
- end
- end
-
- def enqueue(data, transport, retries \\ 0) do
- GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
- end
-
- def get_stats do
- GenServer.call(__MODULE__, :get_stats)
- end
-
- def reset_stats do
- GenServer.call(__MODULE__, :reset_stats)
- end
-
- def get_retry_params(retries) do
- if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do
- {:drop, "Max retries reached"}
- else
- {:retry, growth_function(retries)}
- end
- end
-
- def get_retry_timer_interval do
- Pleroma.Config.get([:retry_queue, :interval], 1000)
- end
-
- defp ets_count_expires(table, current_time) do
- :ets.select_count(
- table,
- [
- {
- {:"$1", :"$2"},
- [{:"=<", :"$1", {:const, current_time}}],
- [true]
- }
- ]
- )
- end
-
- defp ets_pop_n_expired(table, current_time, desired) do
- {popped, _continuation} =
- :ets.select(
- table,
- [
- {
- {:"$1", :"$2"},
- [{:"=<", :"$1", {:const, current_time}}],
- [:"$_"]
- }
- ],
- desired
- )
-
- popped
- |> Enum.each(fn e ->
- :ets.delete_object(table, e)
- end)
-
- popped
- end
-
- def maybe_start_job(running_jobs, queue_table) do
- # we don't want to hit the ets or the DateTime more times than we have to
- # could optimize slightly further by not using the count, and instead grabbing
- # up to N objects early...
- current_time = DateTime.to_unix(DateTime.utc_now())
- n_running_jobs = :sets.size(running_jobs)
-
- if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do
- n_ready_jobs = ets_count_expires(queue_table, current_time)
-
- if n_ready_jobs > 0 do
- # figure out how many we could start
- available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs
- start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
- else
- running_jobs
- end
- else
- running_jobs
- end
- end
-
- defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do
- running_jobs
- end
-
- defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
- when available_job_slots > 0 do
- candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots)
-
- candidates
- |> List.foldl(running_jobs, fn {_, e}, rj ->
- {:ok, pid} = Task.start(fn -> worker(e) end)
- mref = Process.monitor(pid)
- :sets.add_element(mref, rj)
- end)
- end
-
- def worker({:send, data, transport, retries}) do
- case transport.publish_one(data) do
- {:ok, _} ->
- GenServer.cast(__MODULE__, :inc_delivered)
- :delivered
-
- {:error, _reason} ->
- enqueue(data, transport, retries)
- :retry
- end
- end
-
- def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
- {:reply, %{delivered: delivery_count, dropped: drop_count}, state}
- end
-
- def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
- {:reply, %{delivered: delivery_count, dropped: drop_count},
- %{state | delivered: 0, dropped: 0}}
- end
-
- def handle_cast(:reset_stats, state) do
- {:noreply, %{state | delivered: 0, dropped: 0}}
- end
-
- def handle_cast(
- {:maybe_enqueue, data, transport, retries},
- %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state
- ) do
- case get_retry_params(retries) do
- {:retry, timeout} ->
- :ets.insert(queue_table, {timeout, {:send, data, transport, retries}})
- running_jobs = maybe_start_job(running_jobs, queue_table)
- {:noreply, %{state | running_jobs: running_jobs}}
-
- {:drop, message} ->
- Logger.debug(message)
- {:noreply, %{state | dropped: drop_count + 1}}
- end
- end
-
- def handle_cast(:kickoff_timer, state) do
- retry_interval = get_retry_timer_interval()
- Process.send_after(__MODULE__, :retry_timer_run, retry_interval)
- {:noreply, state}
- end
-
- def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do
- {:noreply, %{state | delivered: delivery_count + 1}}
- end
-
- def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do
- {:noreply, %{state | dropped: drop_count + 1}}
- end
-
- def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
- case transport.publish_one(data) do
- {:ok, _} ->
- {:noreply, %{state | delivered: delivery_count + 1}}
-
- {:error, _reason} ->
- enqueue(data, transport, retries)
- {:noreply, state}
- end
- end
-
- def handle_info(
- :retry_timer_run,
- %{queue_table: queue_table, running_jobs: running_jobs} = state
- ) do
- maybe_kickoff_timer()
- running_jobs = maybe_start_job(running_jobs, queue_table)
- {:noreply, %{state | running_jobs: running_jobs}}
- end
-
- def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
- %{running_jobs: running_jobs, queue_table: queue_table} = state
- running_jobs = :sets.del_element(ref, running_jobs)
- running_jobs = maybe_start_job(running_jobs, queue_table)
- {:noreply, %{state | running_jobs: running_jobs}}
- end
-
- def handle_info(unknown, state) do
- Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
- {:noreply, state}
- end
-
- if Pleroma.Config.get(:env) == :test do
- defp growth_function(_retries) do
- _shutit = Pleroma.Config.get([__MODULE__, :initial_timeout])
- DateTime.to_unix(DateTime.utc_now()) - 1
- end
- else
- defp growth_function(retries) do
- round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) +
- DateTime.to_unix(DateTime.utc_now())
- end
- end
-
- defp maybe_kickoff_timer do
- GenServer.cast(__MODULE__, :kickoff_timer)
- end
-end
end
end
+ def publish_one(%{recipient_id: recipient_id} = params) do
+ recipient = User.get_by_id(recipient_id)
+
+ params
+ |> Map.delete(:recipient_id)
+ |> Map.put(:recipient, recipient)
+ |> publish_one()
+ end
+
def publish_one(_), do: :noop
@supported_activities [
Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
Publisher.enqueue_one(__MODULE__, %{
- recipient: remote_user,
+ recipient_id: remote_user.id,
feed: feed,
unreachable_since: reachable_urls_metadata[remote_user.info.salmon]
})
--- /dev/null
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Publisher do
+ use Oban.Worker, queue: "federator_outgoing", max_attempts: 5
+
+ @impl Oban.Worker
+ def perform(%Oban.Job{args: %{module: module_name, params: params}}) do
+ module_name
+ |> String.to_atom()
+ |> apply(:publish_one, [params])
+ end
+end
{:phoenix_ecto, "~> 4.0"},
{:ecto_sql, "~> 3.1"},
{:postgrex, ">= 0.13.5"},
+ {:oban, "~> 0.6"},
{:gettext, "~> 0.15"},
{:comeonin, "~> 4.1.1"},
{:pbkdf2_elixir, "~> 0.12.3"},
"mogrify": {:hex, :mogrify, "0.6.1", "de1b527514f2d95a7bbe9642eb556061afb337e220cf97adbf3a4e6438ed70af", [:mix], [], "hexpm"},
"mox": {:hex, :mox, "0.5.1", "f86bb36026aac1e6f924a4b6d024b05e9adbed5c63e8daa069bd66fb3292165b", [:mix], [], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"},
+ "oban": {:hex, :oban, "0.6.0", "8b9b861355610e703e58a878bc29959f3f0e1b4cd1e90d785cf2bb2498d3b893", [:mix], [{:ecto_sql, "~> 3.1", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"},
"pbkdf2_elixir": {:hex, :pbkdf2_elixir, "0.12.3", "6706a148809a29c306062862c803406e88f048277f6e85b68faf73291e820b84", [:mix], [], "hexpm"},
"phoenix": {:hex, :phoenix, "1.4.9", "746d098e10741c334d88143d3c94cab1756435f94387a63441792e66ec0ee974", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 1.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.8.1 or ~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
--- /dev/null
+defmodule Pleroma.Repo.Migrations.AddObanJobsTable do
+ use Ecto.Migration
+
+ defdelegate up, to: Oban.Migrations
+ defdelegate down, to: Oban.Migrations
+end
alias Pleroma.Web.CommonAPI
use Pleroma.DataCase
+ use Oban.Testing, repo: Pleroma.Repo
import Pleroma.Factory
- import Mock
setup_all do
Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
refute Activity.get_by_id(repeat.id)
end
- test_with_mock "it sends out User Delete activity",
- %{user: user},
- Pleroma.Web.ActivityPub.Publisher,
- [:passthrough],
- [] do
+ test "it sends out User Delete activity", %{user: user} do
config_path = [:instance, :federating]
initial_setting = Pleroma.Config.get(config_path)
Pleroma.Config.put(config_path, true)
{:ok, _user} = User.delete(user)
- assert called(
- Pleroma.Web.ActivityPub.Publisher.publish_one(%{
- inbox: "http://mastodon.example.org/inbox"
- })
- )
+ assert [%{args: %{"params" => %{"inbox" => "http://mastodon.example.org/inbox"}}}] =
+ all_enqueued(worker: Pleroma.Workers.Publisher)
Pleroma.Config.put(config_path, initial_setting)
end
assert called(
Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{
inbox: "https://domain.com/users/nick1/inbox",
- actor: actor,
+ actor_id: actor.id,
id: note_activity.data["id"]
})
)
alias Pleroma.Instances
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Federator
+
use Pleroma.DataCase
+ use Oban.Testing, repo: Pleroma.Repo
+
import Pleroma.Factory
import Mock
:ok
end
- describe "Publisher.perform" do
- test "call `perform` with unknown task" do
- assert {
- :error,
- "Don't know what to do with this"
- } = Pleroma.Web.Federator.Publisher.perform("test", :ok, :ok)
- end
- end
-
describe "Publish an activity" do
setup do
user = insert(:user)
end
describe "Targets reachability filtering in `publish`" do
- test_with_mock "it federates only to reachable instances via AP",
- Pleroma.Web.ActivityPub.Publisher,
- [:passthrough],
- [] do
+ test "it federates only to reachable instances via AP" do
user = insert(:user)
{inbox1, inbox2} =
{:ok, _activity} =
CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
- assert called(
- Pleroma.Web.ActivityPub.Publisher.publish_one(%{
- inbox: inbox1,
- unreachable_since: dt
- })
- )
+ expected_dt = NaiveDateTime.to_iso8601(dt)
- refute called(Pleroma.Web.ActivityPub.Publisher.publish_one(%{inbox: inbox2}))
+ assert [%{args: %{"params" => %{"inbox" => ^inbox1, "unreachable_since" => ^expected_dt}}}] =
+ all_enqueued(worker: Pleroma.Workers.Publisher)
end
- test_with_mock "it federates only to reachable instances via Websub",
- Pleroma.Web.Websub,
- [:passthrough],
- [] do
+ test "it federates only to reachable instances via Websub" do
user = insert(:user)
websub_topic = Pleroma.Web.OStatus.feed_path(user)
{:ok, _activity} = CommonAPI.post(user, %{"status" => "HI"})
- assert called(
- Pleroma.Web.Websub.publish_one(%{
- callback: sub2.callback,
- unreachable_since: dt
- })
- )
-
- refute called(Pleroma.Web.Websub.publish_one(%{callback: sub1.callback}))
+ expected_callback = sub2.callback
+ expected_dt = NaiveDateTime.to_iso8601(dt)
+
+ assert [
+ %{
+ args: %{
+ "params" => %{
+ "callback" => ^expected_callback,
+ "unreachable_since" => ^expected_dt
+ }
+ }
+ }
+ ] = all_enqueued(worker: Pleroma.Workers.Publisher)
end
- test_with_mock "it federates only to reachable instances via Salmon",
- Pleroma.Web.Salmon,
- [:passthrough],
- [] do
+ test "it federates only to reachable instances via Salmon" do
user = insert(:user)
- remote_user1 =
+ _remote_user1 =
insert(:user, %{
local: false,
nickname: "nick1@domain.com",
info: %{salmon: "https://domain2.com/salmon"}
})
+ remote_user2_id = remote_user2.id
+
dt = NaiveDateTime.utc_now()
Instances.set_unreachable(remote_user2.ap_id, dt)
{:ok, _activity} =
CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
- assert called(
- Pleroma.Web.Salmon.publish_one(%{
- recipient: remote_user2,
- unreachable_since: dt
- })
- )
-
- refute called(Pleroma.Web.Salmon.publish_one(%{recipient: remote_user1}))
+ expected_dt = NaiveDateTime.to_iso8601(dt)
+
+ assert [
+ %{
+ args: %{
+ "params" => %{
+ "recipient_id" => ^remote_user2_id,
+ "unreachable_since" => ^expected_dt
+ }
+ }
+ }
+ ] = all_enqueued(worker: Pleroma.Workers.Publisher)
end
end
+++ /dev/null
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule MockActivityPub do
- def publish_one({ret, waiter}) do
- send(waiter, :complete)
- {ret, "success"}
- end
-end
-
-defmodule Pleroma.Web.Federator.RetryQueueTest do
- use Pleroma.DataCase
- alias Pleroma.Web.Federator.RetryQueue
-
- @small_retry_count 0
- @hopeless_retry_count 10
-
- setup do
- RetryQueue.reset_stats()
- end
-
- test "RetryQueue responds to stats request" do
- assert %{delivered: 0, dropped: 0} == RetryQueue.get_stats()
- end
-
- test "failed posts are retried" do
- {:retry, _timeout} = RetryQueue.get_retry_params(@small_retry_count)
-
- wait_task =
- Task.async(fn ->
- receive do
- :complete -> :ok
- end
- end)
-
- RetryQueue.enqueue({:ok, wait_task.pid}, MockActivityPub, @small_retry_count)
- Task.await(wait_task)
- assert %{delivered: 1, dropped: 0} == RetryQueue.get_stats()
- end
-
- test "posts that have been tried too many times are dropped" do
- {:drop, _timeout} = RetryQueue.get_retry_params(@hopeless_retry_count)
-
- RetryQueue.enqueue({:ok, nil}, MockActivityPub, @hopeless_retry_count)
- assert %{delivered: 0, dropped: 1} == RetryQueue.get_stats()
- end
-end
Salmon.publish(user, activity)
- assert called(Publisher.enqueue_one(Salmon, %{recipient: mentioned_user}))
+ assert called(Publisher.enqueue_one(Salmon, %{recipient_id: mentioned_user.id}))
end
end