- Mastodon API: Unsubscribe followers when they unfollow a user
- AdminAPI: Add "godmode" while fetching user statuses (i.e. admin can see private statuses)
- Improve digest email template
+- Replaced [pleroma_job_queue](https://git.pleroma.social/pleroma/pleroma_job_queue) with [Oban](https://github.com/sorentwo/oban)
+- Introduced [quantum](https://github.com/quantum-elixir/quantum-core) job scheduler
### Fixed
+ - Following from Osada
- Not being able to pin unlisted posts
- Objects being re-embedded to activities after being updated (e.g faved/reposted). Running 'mix pleroma.database prune_objects' again is advised.
- Favorites timeline doing database-intensive queries
"web"
]
-config :pleroma, Pleroma.Web.Federator.RetryQueue,
- enabled: false,
- max_jobs: 20,
- initial_timeout: 30,
- max_retries: 5
-
-config :pleroma_job_queue, :queues,
- activity_expiration: 10,
- federator_incoming: 50,
- federator_outgoing: 50,
- web_push: 50,
- mailer: 10,
- transmogrifier: 20,
- scheduled_activities: 10,
- background: 5
+config :pleroma, Oban,
+ repo: Pleroma.Repo,
+ verbose: false,
+ prune: {:maxage, 60 * 60 * 24 * 7},
+ queues: [
++ activity_expiration: 10,
+ federator_incoming: 50,
+ federator_outgoing: 50,
+ web_push: 50,
+ mailer: 10,
+ transmogrifier: 20,
+ scheduled_activities: 10,
+ background: 5
+ ]
+
+config :pleroma, :workers,
+ retries: [
+ federator_incoming: 5,
+ federator_outgoing: 5
+ ]
config :pleroma, :fetch_initial_posts,
enabled: false,
--- /dev/null
- PleromaJobQueue.enqueue(:activity_expiration, __MODULE__, [:execute, expiration.id])
+ # Pleroma: A lightweight social networking server
+ # Copyright © 2019 Pleroma Authors <https://pleroma.social/>
+ # SPDX-License-Identifier: AGPL-3.0-only
+
+ defmodule Pleroma.ActivityExpirationWorker do
+ alias Pleroma.Activity
+ alias Pleroma.ActivityExpiration
+ alias Pleroma.Config
+ alias Pleroma.Repo
+ alias Pleroma.User
+ alias Pleroma.Web.CommonAPI
++ alias Pleroma.Workers.BackgroundWorker
++
+ require Logger
+ use GenServer
+ import Ecto.Query
+
++ defdelegate worker_args(queue), to: Pleroma.Workers.Helper
++
+ @schedule_interval :timer.minutes(1)
+
+ def start_link(_) do
+ GenServer.start_link(__MODULE__, nil)
+ end
+
+ @impl true
+ def init(_) do
+ if Config.get([ActivityExpiration, :enabled]) do
+ schedule_next()
+ {:ok, nil}
+ else
+ :ignore
+ end
+ end
+
+ def perform(:execute, expiration_id) do
+ try do
+ expiration =
+ ActivityExpiration
+ |> where([e], e.id == ^expiration_id)
+ |> Repo.one!()
+
+ activity = Activity.get_by_id_with_object(expiration.activity_id)
+ user = User.get_by_ap_id(activity.object.data["actor"])
+ CommonAPI.delete(activity.id, user)
+ rescue
+ error ->
+ Logger.error("#{__MODULE__} Couldn't delete expired activity: #{inspect(error)}")
+ end
+ end
+
+ @impl true
+ def handle_info(:perform, state) do
+ ActivityExpiration.due_expirations(@schedule_interval)
+ |> Enum.each(fn expiration ->
++ %{
++ "op" => "activity_expiration",
++ "activity_expiration_id" => expiration.id
++ }
++ |> BackgroundWorker.new(worker_args(:activity_expiration))
++ |> Repo.insert()
+ end)
+
+ schedule_next()
+ {:noreply, state}
+ end
+
+ defp schedule_next do
+ Process.send_after(self(), :perform, @schedule_interval)
+ end
+ end
@doc """
Enqueues an activity for federation if it's local
"""
+ @spec maybe_federate(any()) :: :ok
def maybe_federate(%Activity{local: true} = activity) do
if Pleroma.Config.get!([:instance, :federating]) do
- priority =
- case activity.data["type"] do
- "Delete" -> 10
- "Create" -> 1
- _ -> 5
- end
-
- Pleroma.Web.Federator.publish(activity, priority)
+ Pleroma.Web.Federator.publish(activity)
end
:ok
--- /dev/null
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.BackgroundWorker do
+ alias Pleroma.Activity
+ alias Pleroma.User
+ alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
+ alias Pleroma.Web.OAuth.Token.CleanWorker
+
+ # Note: `max_attempts` is intended to be overridden in `new/1` call
+ use Oban.Worker,
+ queue: "background",
+ max_attempts: 1
+
+ @impl Oban.Worker
+ def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do
+ user = User.get_by_id(user_id)
+ User.perform(:fetch_initial_posts, user)
+ end
+
+ def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do
+ user = User.get_by_id(user_id)
+ User.perform(:deactivate_async, user, status)
+ end
+
+ def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do
+ user = User.get_by_id(user_id)
+ User.perform(:delete, user)
+ end
+
+ def perform(
+ %{
+ "op" => "blocks_import",
+ "blocker_id" => blocker_id,
+ "blocked_identifiers" => blocked_identifiers
+ },
+ _job
+ ) do
+ blocker = User.get_by_id(blocker_id)
+ User.perform(:blocks_import, blocker, blocked_identifiers)
+ end
+
+ def perform(
+ %{
+ "op" => "follow_import",
+ "follower_id" => follower_id,
+ "followed_identifiers" => followed_identifiers
+ },
+ _job
+ ) do
+ follower = User.get_by_id(follower_id)
+ User.perform(:follow_import, follower, followed_identifiers)
+ end
+
+ def perform(%{"op" => "clean_expired_tokens"}, _job) do
+ CleanWorker.perform(:clean)
+ end
+
+ def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do
+ MediaProxyWarmingPolicy.perform(:preload, message)
+ end
+
+ def perform(%{"op" => "media_proxy_prefetch", "url" => url}, _job) do
+ MediaProxyWarmingPolicy.perform(:prefetch, url)
+ end
+
+ def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}, _job) do
+ activity = Activity.get_by_id(activity_id)
+ Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
+ end
++
++ def perform(
++ %{"op" => "activity_expiration", "activity_expiration_id" => activity_expiration_id},
++ _job
++ ) do
++ Pleroma.ActivityExpirationWorker.perform(:execute, activity_expiration_id)
++ end
+end
alias Pleroma.Activity
alias Pleroma.Instances
alias Pleroma.Object
+ alias Pleroma.Tests.ObanHelpers
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ObjectView
+ alias Pleroma.Web.ActivityPub.Relay
alias Pleroma.Web.ActivityPub.UserView
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.CommonAPI
end
describe "like an object" do
- assert called(Pleroma.Web.Federator.publish(like_activity, 5))
+ test_with_mock "sends an activity to federation", Pleroma.Web.Federator, [:passthrough], [] do
+ Pleroma.Config.put([:instance, :federating], true)
+ note_activity = insert(:note_activity)
+ assert object_activity = Object.normalize(note_activity)
+
+ user = insert(:user)
+
+ {:ok, like_activity, _object} = ActivityPub.like(user, object_activity)
++ assert called(Pleroma.Web.Federator.publish(like_activity))
+ end
+
+ test "returns exist activity if object already liked" do
+ note_activity = insert(:note_activity)
+ assert object_activity = Object.normalize(note_activity)
+
+ user = insert(:user)
+
+ {:ok, like_activity, _object} = ActivityPub.like(user, object_activity)
+
+ {:ok, like_activity_exist, _object} = ActivityPub.like(user, object_activity)
+ assert like_activity == like_activity_exist
+ end
+
test "adds a like activity to the db" do
note_activity = insert(:note_activity)
assert object = Object.normalize(note_activity)
end
describe "unliking" do
- assert called(Pleroma.Web.Federator.publish(unlike_activity, 5))
+ test_with_mock "sends an activity to federation", Pleroma.Web.Federator, [:passthrough], [] do
+ Pleroma.Config.put([:instance, :federating], true)
+
+ note_activity = insert(:note_activity)
+ object = Object.normalize(note_activity)
+ user = insert(:user)
+
+ {:ok, object} = ActivityPub.unlike(user, object)
+ refute called(Pleroma.Web.Federator.publish())
+
+ {:ok, _like_activity, object} = ActivityPub.like(user, object)
+ assert object.data["like_count"] == 1
+
+ {:ok, unlike_activity, _, object} = ActivityPub.unlike(user, object)
+ assert object.data["like_count"] == 0
+
++ assert called(Pleroma.Web.Federator.publish(unlike_activity))
+ end
+
test "unliking a previously liked object" do
note_activity = insert(:note_activity)
object = Object.normalize(note_activity)
assert Relay.publish(activity) == {:error, false}
end
- test "returns announce activity" do
+ test "returns error when object is unknown" do
+ activity =
+ insert(:note_activity,
+ data: %{
+ "type" => "Create",
+ "object" => "http://mastodon.example.org/eee/99541947525187367"
+ }
+ )
+
+ assert Relay.publish(activity) == {:error, nil}
+ end
+
+ test_with_mock "returns announce activity and publish to federate",
+ Pleroma.Web.Federator,
+ [:passthrough],
+ [] do
+ Pleroma.Config.put([:instance, :federating], true)
+ service_actor = Relay.get_actor()
+ note = insert(:note_activity)
+ assert {:ok, %Activity{} = activity, %Object{} = obj} = Relay.publish(note)
+ assert activity.data["type"] == "Announce"
+ assert activity.data["actor"] == service_actor.ap_id
+ assert activity.data["object"] == obj.data["id"]
- assert called(Pleroma.Web.Federator.publish(activity, 5))
++ assert called(Pleroma.Web.Federator.publish(activity))
+ end
+
+ test_with_mock "returns announce activity and not publish to federate",
+ Pleroma.Web.Federator,
+ [:passthrough],
+ [] do
+ Pleroma.Config.put([:instance, :federating], false)
service_actor = Relay.get_actor()
note = insert(:note_activity)
assert {:ok, %Activity{} = activity, %Object{} = obj} = Relay.publish(note)
assert activity.data["type"] == "Announce"
assert activity.data["actor"] == service_actor.ap_id
assert activity.data["object"] == obj.data["id"]
- refute called(Pleroma.Web.Federator.publish(activity, 5))
++ refute called(Pleroma.Web.Federator.publish(activity))
end
end
end