From: Ivan Tashkinov Date: Sat, 31 Aug 2019 11:25:43 +0000 (+0300) Subject: [#1149] Added Oban job for "activity_expiration". Merged remote-tracking branch ... X-Git-Url: https://git.squeep.com/?a=commitdiff_plain;h=e890ea7e821d61fca75084d46f70ed125acf1fc8;p=akkoma [#1149] Added Oban job for "activity_expiration". Merged remote-tracking branch 'remotes/upstream/develop' into 1149-oban-job-queue # Conflicts: # config/config.exs --- e890ea7e821d61fca75084d46f70ed125acf1fc8 diff --cc CHANGELOG.md index fd81b3087,4acb749ac..8b73c783f --- a/CHANGELOG.md +++ b/CHANGELOG.md @@@ -19,10 -19,9 +19,11 @@@ The format is based on [Keep a Changelo - Mastodon API: Unsubscribe followers when they unfollow a user - AdminAPI: Add "godmode" while fetching user statuses (i.e. admin can see private statuses) - Improve digest email template +- Replaced [pleroma_job_queue](https://git.pleroma.social/pleroma/pleroma_job_queue) with [Oban](https://github.com/sorentwo/oban) +- Introduced [quantum](https://github.com/quantum-elixir/quantum-core) job scheduler ### Fixed + - Following from Osada - Not being able to pin unlisted posts - Objects being re-embedded to activities after being updated (e.g faved/reposted). Running 'mix pleroma.database prune_objects' again is advised. - Favorites timeline doing database-intensive queries diff --cc config/config.exs index 9a8c69448,f630771a3..da89aa3e9 --- a/config/config.exs +++ b/config/config.exs @@@ -467,25 -449,21 +467,26 @@@ config :pleroma, Pleroma.User "web" ] -config :pleroma, Pleroma.Web.Federator.RetryQueue, - enabled: false, - max_jobs: 20, - initial_timeout: 30, - max_retries: 5 - -config :pleroma_job_queue, :queues, - activity_expiration: 10, - federator_incoming: 50, - federator_outgoing: 50, - web_push: 50, - mailer: 10, - transmogrifier: 20, - scheduled_activities: 10, - background: 5 +config :pleroma, Oban, + repo: Pleroma.Repo, + verbose: false, + prune: {:maxage, 60 * 60 * 24 * 7}, + queues: [ ++ activity_expiration: 10, + federator_incoming: 50, + federator_outgoing: 50, + web_push: 50, + mailer: 10, + transmogrifier: 20, + scheduled_activities: 10, + background: 5 + ] + +config :pleroma, :workers, + retries: [ + federator_incoming: 5, + federator_outgoing: 5 + ] config :pleroma, :fetch_initial_posts, enabled: false, diff --cc lib/pleroma/activity_expiration_worker.ex index 000000000,0f9e715f8..5c0c53232 mode 000000,100644..100644 --- a/lib/pleroma/activity_expiration_worker.ex +++ b/lib/pleroma/activity_expiration_worker.ex @@@ -1,0 -1,62 +1,71 @@@ + # Pleroma: A lightweight social networking server + # Copyright © 2019 Pleroma Authors + # SPDX-License-Identifier: AGPL-3.0-only + + defmodule Pleroma.ActivityExpirationWorker do + alias Pleroma.Activity + alias Pleroma.ActivityExpiration + alias Pleroma.Config + alias Pleroma.Repo + alias Pleroma.User + alias Pleroma.Web.CommonAPI ++ alias Pleroma.Workers.BackgroundWorker ++ + require Logger + use GenServer + import Ecto.Query + ++ defdelegate worker_args(queue), to: Pleroma.Workers.Helper ++ + @schedule_interval :timer.minutes(1) + + def start_link(_) do + GenServer.start_link(__MODULE__, nil) + end + + @impl true + def init(_) do + if Config.get([ActivityExpiration, :enabled]) do + schedule_next() + {:ok, nil} + else + :ignore + end + end + + def perform(:execute, expiration_id) do + try do + expiration = + ActivityExpiration + |> where([e], e.id == ^expiration_id) + |> Repo.one!() + + activity = Activity.get_by_id_with_object(expiration.activity_id) + user = User.get_by_ap_id(activity.object.data["actor"]) + CommonAPI.delete(activity.id, user) + rescue + error -> + Logger.error("#{__MODULE__} Couldn't delete expired activity: #{inspect(error)}") + end + end + + @impl true + def handle_info(:perform, state) do + ActivityExpiration.due_expirations(@schedule_interval) + |> Enum.each(fn expiration -> - PleromaJobQueue.enqueue(:activity_expiration, __MODULE__, [:execute, expiration.id]) ++ %{ ++ "op" => "activity_expiration", ++ "activity_expiration_id" => expiration.id ++ } ++ |> BackgroundWorker.new(worker_args(:activity_expiration)) ++ |> Repo.insert() + end) + + schedule_next() + {:noreply, state} + end + + defp schedule_next do + Process.send_after(self(), :perform, @schedule_interval) + end + end diff --cc lib/pleroma/web/activity_pub/utils.ex index 8502ca3be,c9c0c3763..52f4b0194 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@@ -166,9 -166,17 +166,10 @@@ defmodule Pleroma.Web.ActivityPub.Util @doc """ Enqueues an activity for federation if it's local """ + @spec maybe_federate(any()) :: :ok def maybe_federate(%Activity{local: true} = activity) do if Pleroma.Config.get!([:instance, :federating]) do - priority = - case activity.data["type"] do - "Delete" -> 10 - "Create" -> 1 - _ -> 5 - end - - Pleroma.Web.Federator.publish(activity, priority) + Pleroma.Web.Federator.publish(activity) end :ok diff --cc lib/pleroma/workers/background_worker.ex index 3c021b9b4,000000000..fbce7d789 mode 100644,000000..100644 --- a/lib/pleroma/workers/background_worker.ex +++ b/lib/pleroma/workers/background_worker.ex @@@ -1,72 -1,0 +1,79 @@@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.BackgroundWorker do + alias Pleroma.Activity + alias Pleroma.User + alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy + alias Pleroma.Web.OAuth.Token.CleanWorker + + # Note: `max_attempts` is intended to be overridden in `new/1` call + use Oban.Worker, + queue: "background", + max_attempts: 1 + + @impl Oban.Worker + def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do + user = User.get_by_id(user_id) + User.perform(:fetch_initial_posts, user) + end + + def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do + user = User.get_by_id(user_id) + User.perform(:deactivate_async, user, status) + end + + def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do + user = User.get_by_id(user_id) + User.perform(:delete, user) + end + + def perform( + %{ + "op" => "blocks_import", + "blocker_id" => blocker_id, + "blocked_identifiers" => blocked_identifiers + }, + _job + ) do + blocker = User.get_by_id(blocker_id) + User.perform(:blocks_import, blocker, blocked_identifiers) + end + + def perform( + %{ + "op" => "follow_import", + "follower_id" => follower_id, + "followed_identifiers" => followed_identifiers + }, + _job + ) do + follower = User.get_by_id(follower_id) + User.perform(:follow_import, follower, followed_identifiers) + end + + def perform(%{"op" => "clean_expired_tokens"}, _job) do + CleanWorker.perform(:clean) + end + + def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do + MediaProxyWarmingPolicy.perform(:preload, message) + end + + def perform(%{"op" => "media_proxy_prefetch", "url" => url}, _job) do + MediaProxyWarmingPolicy.perform(:prefetch, url) + end + + def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}, _job) do + activity = Activity.get_by_id(activity_id) + Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity) + end ++ ++ def perform( ++ %{"op" => "activity_expiration", "activity_expiration_id" => activity_expiration_id}, ++ _job ++ ) do ++ Pleroma.ActivityExpirationWorker.perform(:execute, activity_expiration_id) ++ end +end diff --cc test/web/activity_pub/activity_pub_controller_test.exs index a214f57a6,5192e734f..a1b567a46 --- a/test/web/activity_pub/activity_pub_controller_test.exs +++ b/test/web/activity_pub/activity_pub_controller_test.exs @@@ -10,9 -8,9 +10,10 @@@ defmodule Pleroma.Web.ActivityPub.Activ alias Pleroma.Activity alias Pleroma.Instances alias Pleroma.Object + alias Pleroma.Tests.ObanHelpers alias Pleroma.User alias Pleroma.Web.ActivityPub.ObjectView + alias Pleroma.Web.ActivityPub.Relay alias Pleroma.Web.ActivityPub.UserView alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.CommonAPI diff --cc test/web/activity_pub/activity_pub_test.exs index 1515f4eb6,f72b44aed..d0118fefa --- a/test/web/activity_pub/activity_pub_test.exs +++ b/test/web/activity_pub/activity_pub_test.exs @@@ -676,6 -678,29 +678,29 @@@ defmodule Pleroma.Web.ActivityPub.Activ end describe "like an object" do + test_with_mock "sends an activity to federation", Pleroma.Web.Federator, [:passthrough], [] do + Pleroma.Config.put([:instance, :federating], true) + note_activity = insert(:note_activity) + assert object_activity = Object.normalize(note_activity) + + user = insert(:user) + + {:ok, like_activity, _object} = ActivityPub.like(user, object_activity) - assert called(Pleroma.Web.Federator.publish(like_activity, 5)) ++ assert called(Pleroma.Web.Federator.publish(like_activity)) + end + + test "returns exist activity if object already liked" do + note_activity = insert(:note_activity) + assert object_activity = Object.normalize(note_activity) + + user = insert(:user) + + {:ok, like_activity, _object} = ActivityPub.like(user, object_activity) + + {:ok, like_activity_exist, _object} = ActivityPub.like(user, object_activity) + assert like_activity == like_activity_exist + end + test "adds a like activity to the db" do note_activity = insert(:note_activity) assert object = Object.normalize(note_activity) @@@ -706,6 -731,25 +731,25 @@@ end describe "unliking" do + test_with_mock "sends an activity to federation", Pleroma.Web.Federator, [:passthrough], [] do + Pleroma.Config.put([:instance, :federating], true) + + note_activity = insert(:note_activity) + object = Object.normalize(note_activity) + user = insert(:user) + + {:ok, object} = ActivityPub.unlike(user, object) + refute called(Pleroma.Web.Federator.publish()) + + {:ok, _like_activity, object} = ActivityPub.like(user, object) + assert object.data["like_count"] == 1 + + {:ok, unlike_activity, _, object} = ActivityPub.unlike(user, object) + assert object.data["like_count"] == 0 + - assert called(Pleroma.Web.Federator.publish(unlike_activity, 5)) ++ assert called(Pleroma.Web.Federator.publish(unlike_activity)) + end + test "unliking a previously liked object" do note_activity = insert(:note_activity) object = Object.normalize(note_activity) diff --cc test/web/activity_pub/relay_test.exs index e10b808f7,4f7d592a6..a64011ff0 --- a/test/web/activity_pub/relay_test.exs +++ b/test/web/activity_pub/relay_test.exs @@@ -63,13 -69,44 +69,44 @@@ defmodule Pleroma.Web.ActivityPub.Relay assert Relay.publish(activity) == {:error, false} end - test "returns announce activity" do + test "returns error when object is unknown" do + activity = + insert(:note_activity, + data: %{ + "type" => "Create", + "object" => "http://mastodon.example.org/eee/99541947525187367" + } + ) + + assert Relay.publish(activity) == {:error, nil} + end + + test_with_mock "returns announce activity and publish to federate", + Pleroma.Web.Federator, + [:passthrough], + [] do + Pleroma.Config.put([:instance, :federating], true) + service_actor = Relay.get_actor() + note = insert(:note_activity) + assert {:ok, %Activity{} = activity, %Object{} = obj} = Relay.publish(note) + assert activity.data["type"] == "Announce" + assert activity.data["actor"] == service_actor.ap_id + assert activity.data["object"] == obj.data["id"] - assert called(Pleroma.Web.Federator.publish(activity, 5)) ++ assert called(Pleroma.Web.Federator.publish(activity)) + end + + test_with_mock "returns announce activity and not publish to federate", + Pleroma.Web.Federator, + [:passthrough], + [] do + Pleroma.Config.put([:instance, :federating], false) service_actor = Relay.get_actor() note = insert(:note_activity) assert {:ok, %Activity{} = activity, %Object{} = obj} = Relay.publish(note) assert activity.data["type"] == "Announce" assert activity.data["actor"] == service_actor.ap_id assert activity.data["object"] == obj.data["id"] - refute called(Pleroma.Web.Federator.publish(activity, 5)) ++ refute called(Pleroma.Web.Federator.publish(activity)) end end end