[#1149] Added Oban job for "activity_expiration". Merged remote-tracking branch ...
authorIvan Tashkinov <ivantashkinov@gmail.com>
Sat, 31 Aug 2019 11:25:43 +0000 (14:25 +0300)
committerIvan Tashkinov <ivantashkinov@gmail.com>
Sat, 31 Aug 2019 11:25:43 +0000 (14:25 +0300)
# Conflicts:
# config/config.exs

20 files changed:
1  2 
CHANGELOG.md
config/config.exs
config/test.exs
docs/config.md
lib/pleroma/activity_expiration_worker.ex
lib/pleroma/application.ex
lib/pleroma/user.ex
lib/pleroma/web/activity_pub/activity_pub.ex
lib/pleroma/web/activity_pub/publisher.ex
lib/pleroma/web/activity_pub/transmogrifier.ex
lib/pleroma/web/activity_pub/utils.ex
lib/pleroma/workers/background_worker.ex
test/activity_test.exs
test/user_test.exs
test/web/activity_pub/activity_pub_controller_test.exs
test/web/activity_pub/activity_pub_test.exs
test/web/activity_pub/relay_test.exs
test/web/activity_pub/transmogrifier_test.exs
test/web/admin_api/admin_api_controller_test.exs
test/web/mastodon_api/mastodon_api_controller_test.exs

diff --cc CHANGELOG.md
index fd81b3087723dd51a5fa834726776e5a8892241a,4acb749aca91f3bcbacd64c47a0d430ab50466a4..8b73c783fd2ce757a3f6bc59deaf39abb7950999
@@@ -19,10 -19,9 +19,11 @@@ The format is based on [Keep a Changelo
  - Mastodon API: Unsubscribe followers when they unfollow a user
  - AdminAPI: Add "godmode" while fetching user statuses (i.e. admin can see private statuses)
  - Improve digest email template
 +- Replaced [pleroma_job_queue](https://git.pleroma.social/pleroma/pleroma_job_queue) with [Oban](https://github.com/sorentwo/oban)
 +- Introduced [quantum](https://github.com/quantum-elixir/quantum-core) job scheduler
  
  ### Fixed
+ - Following from Osada
  - Not being able to pin unlisted posts
  - Objects being re-embedded to activities after being updated (e.g faved/reposted). Running 'mix pleroma.database prune_objects' again is advised.
  - Favorites timeline doing database-intensive queries
index 9a8c69448d3414e6d5f328e496b2c958922b83c0,f630771a3d28a39b8605877b3b82460cdad5554c..da89aa3e956074efb02ec43c64161788b08997cc
@@@ -467,25 -449,21 +467,26 @@@ config :pleroma, Pleroma.User
      "web"
    ]
  
 -config :pleroma, Pleroma.Web.Federator.RetryQueue,
 -  enabled: false,
 -  max_jobs: 20,
 -  initial_timeout: 30,
 -  max_retries: 5
 -
 -config :pleroma_job_queue, :queues,
 -  activity_expiration: 10,
 -  federator_incoming: 50,
 -  federator_outgoing: 50,
 -  web_push: 50,
 -  mailer: 10,
 -  transmogrifier: 20,
 -  scheduled_activities: 10,
 -  background: 5
 +config :pleroma, Oban,
 +  repo: Pleroma.Repo,
 +  verbose: false,
 +  prune: {:maxage, 60 * 60 * 24 * 7},
 +  queues: [
++    activity_expiration: 10,
 +    federator_incoming: 50,
 +    federator_outgoing: 50,
 +    web_push: 50,
 +    mailer: 10,
 +    transmogrifier: 20,
 +    scheduled_activities: 10,
 +    background: 5
 +  ]
 +
 +config :pleroma, :workers,
 +  retries: [
 +    federator_incoming: 5,
 +    federator_outgoing: 5
 +  ]
  
  config :pleroma, :fetch_initial_posts,
    enabled: false,
diff --cc config/test.exs
Simple merge
diff --cc docs/config.md
Simple merge
index 0000000000000000000000000000000000000000,0f9e715f8f385eee5c86d973011a732211fa04a3..5c0c532325a670098fbe92a3c2b1febe31ba88b2
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,62 +1,71 @@@
 -      PleromaJobQueue.enqueue(:activity_expiration, __MODULE__, [:execute, expiration.id])
+ # Pleroma: A lightweight social networking server
+ # Copyright © 2019 Pleroma Authors <https://pleroma.social/>
+ # SPDX-License-Identifier: AGPL-3.0-only
+ defmodule Pleroma.ActivityExpirationWorker do
+   alias Pleroma.Activity
+   alias Pleroma.ActivityExpiration
+   alias Pleroma.Config
+   alias Pleroma.Repo
+   alias Pleroma.User
+   alias Pleroma.Web.CommonAPI
++  alias Pleroma.Workers.BackgroundWorker
++
+   require Logger
+   use GenServer
+   import Ecto.Query
++  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
++
+   @schedule_interval :timer.minutes(1)
+   def start_link(_) do
+     GenServer.start_link(__MODULE__, nil)
+   end
+   @impl true
+   def init(_) do
+     if Config.get([ActivityExpiration, :enabled]) do
+       schedule_next()
+       {:ok, nil}
+     else
+       :ignore
+     end
+   end
+   def perform(:execute, expiration_id) do
+     try do
+       expiration =
+         ActivityExpiration
+         |> where([e], e.id == ^expiration_id)
+         |> Repo.one!()
+       activity = Activity.get_by_id_with_object(expiration.activity_id)
+       user = User.get_by_ap_id(activity.object.data["actor"])
+       CommonAPI.delete(activity.id, user)
+     rescue
+       error ->
+         Logger.error("#{__MODULE__} Couldn't delete expired activity: #{inspect(error)}")
+     end
+   end
+   @impl true
+   def handle_info(:perform, state) do
+     ActivityExpiration.due_expirations(@schedule_interval)
+     |> Enum.each(fn expiration ->
++      %{
++        "op" => "activity_expiration",
++        "activity_expiration_id" => expiration.id
++      }
++      |> BackgroundWorker.new(worker_args(:activity_expiration))
++      |> Repo.insert()
+     end)
+     schedule_next()
+     {:noreply, state}
+   end
+   defp schedule_next do
+     Process.send_after(self(), :perform, @schedule_interval)
+   end
+ end
Simple merge
Simple merge
index 8502ca3be2ad1c5cacaa8fcaa467fceb5f148329,c9c0c376335f5bc43b627f8a35b4589faac7d428..52f4b0194376457b866698e4fb031ce4b3e3f0e7
@@@ -166,9 -166,17 +166,10 @@@ defmodule Pleroma.Web.ActivityPub.Util
    @doc """
    Enqueues an activity for federation if it's local
    """
+   @spec maybe_federate(any()) :: :ok
    def maybe_federate(%Activity{local: true} = activity) do
      if Pleroma.Config.get!([:instance, :federating]) do
 -      priority =
 -        case activity.data["type"] do
 -          "Delete" -> 10
 -          "Create" -> 1
 -          _ -> 5
 -        end
 -
 -      Pleroma.Web.Federator.publish(activity, priority)
 +      Pleroma.Web.Federator.publish(activity)
      end
  
      :ok
index 3c021b9b476bfffead44889814783dffbd5c3f28,0000000000000000000000000000000000000000..fbce7d789242e79d2f56c19bd6cc07744d9ea6d0
mode 100644,000000..100644
--- /dev/null
@@@ -1,72 -1,0 +1,79 @@@
 +# Pleroma: A lightweight social networking server
 +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
 +# SPDX-License-Identifier: AGPL-3.0-only
 +
 +defmodule Pleroma.Workers.BackgroundWorker do
 +  alias Pleroma.Activity
 +  alias Pleroma.User
 +  alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
 +  alias Pleroma.Web.OAuth.Token.CleanWorker
 +
 +  # Note: `max_attempts` is intended to be overridden in `new/1` call
 +  use Oban.Worker,
 +    queue: "background",
 +    max_attempts: 1
 +
 +  @impl Oban.Worker
 +  def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do
 +    user = User.get_by_id(user_id)
 +    User.perform(:fetch_initial_posts, user)
 +  end
 +
 +  def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do
 +    user = User.get_by_id(user_id)
 +    User.perform(:deactivate_async, user, status)
 +  end
 +
 +  def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do
 +    user = User.get_by_id(user_id)
 +    User.perform(:delete, user)
 +  end
 +
 +  def perform(
 +        %{
 +          "op" => "blocks_import",
 +          "blocker_id" => blocker_id,
 +          "blocked_identifiers" => blocked_identifiers
 +        },
 +        _job
 +      ) do
 +    blocker = User.get_by_id(blocker_id)
 +    User.perform(:blocks_import, blocker, blocked_identifiers)
 +  end
 +
 +  def perform(
 +        %{
 +          "op" => "follow_import",
 +          "follower_id" => follower_id,
 +          "followed_identifiers" => followed_identifiers
 +        },
 +        _job
 +      ) do
 +    follower = User.get_by_id(follower_id)
 +    User.perform(:follow_import, follower, followed_identifiers)
 +  end
 +
 +  def perform(%{"op" => "clean_expired_tokens"}, _job) do
 +    CleanWorker.perform(:clean)
 +  end
 +
 +  def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do
 +    MediaProxyWarmingPolicy.perform(:preload, message)
 +  end
 +
 +  def perform(%{"op" => "media_proxy_prefetch", "url" => url}, _job) do
 +    MediaProxyWarmingPolicy.perform(:prefetch, url)
 +  end
 +
 +  def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}, _job) do
 +    activity = Activity.get_by_id(activity_id)
 +    Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
 +  end
++
++  def perform(
++        %{"op" => "activity_expiration", "activity_expiration_id" => activity_expiration_id},
++        _job
++      ) do
++    Pleroma.ActivityExpirationWorker.perform(:execute, activity_expiration_id)
++  end
 +end
Simple merge
Simple merge
index a214f57a670ca3bfe644366ab95ddc4c269b437c,5192e734f7246be87efc138469ea9810c6bc1247..a1b567a464cba26c10066eb37769093b268d3923
@@@ -10,9 -8,9 +10,10 @@@ defmodule Pleroma.Web.ActivityPub.Activ
    alias Pleroma.Activity
    alias Pleroma.Instances
    alias Pleroma.Object
 +  alias Pleroma.Tests.ObanHelpers
    alias Pleroma.User
    alias Pleroma.Web.ActivityPub.ObjectView
+   alias Pleroma.Web.ActivityPub.Relay
    alias Pleroma.Web.ActivityPub.UserView
    alias Pleroma.Web.ActivityPub.Utils
    alias Pleroma.Web.CommonAPI
index 1515f4eb6a677d62602e673693e8b8266c758803,f72b44aed4c0f531b6ebda36e778da5fc4e7c825..d0118fefadfe1aae84c4f725583c1b58fef5ef04
@@@ -676,6 -678,29 +678,29 @@@ defmodule Pleroma.Web.ActivityPub.Activ
    end
  
    describe "like an object" do
 -      assert called(Pleroma.Web.Federator.publish(like_activity, 5))
+     test_with_mock "sends an activity to federation", Pleroma.Web.Federator, [:passthrough], [] do
+       Pleroma.Config.put([:instance, :federating], true)
+       note_activity = insert(:note_activity)
+       assert object_activity = Object.normalize(note_activity)
+       user = insert(:user)
+       {:ok, like_activity, _object} = ActivityPub.like(user, object_activity)
++      assert called(Pleroma.Web.Federator.publish(like_activity))
+     end
+     test "returns exist activity if object already liked" do
+       note_activity = insert(:note_activity)
+       assert object_activity = Object.normalize(note_activity)
+       user = insert(:user)
+       {:ok, like_activity, _object} = ActivityPub.like(user, object_activity)
+       {:ok, like_activity_exist, _object} = ActivityPub.like(user, object_activity)
+       assert like_activity == like_activity_exist
+     end
      test "adds a like activity to the db" do
        note_activity = insert(:note_activity)
        assert object = Object.normalize(note_activity)
    end
  
    describe "unliking" do
 -      assert called(Pleroma.Web.Federator.publish(unlike_activity, 5))
+     test_with_mock "sends an activity to federation", Pleroma.Web.Federator, [:passthrough], [] do
+       Pleroma.Config.put([:instance, :federating], true)
+       note_activity = insert(:note_activity)
+       object = Object.normalize(note_activity)
+       user = insert(:user)
+       {:ok, object} = ActivityPub.unlike(user, object)
+       refute called(Pleroma.Web.Federator.publish())
+       {:ok, _like_activity, object} = ActivityPub.like(user, object)
+       assert object.data["like_count"] == 1
+       {:ok, unlike_activity, _, object} = ActivityPub.unlike(user, object)
+       assert object.data["like_count"] == 0
++      assert called(Pleroma.Web.Federator.publish(unlike_activity))
+     end
      test "unliking a previously liked object" do
        note_activity = insert(:note_activity)
        object = Object.normalize(note_activity)
index e10b808f7220eb45e2e1d35326ff09a3766f8b42,4f7d592a66694e1e374a63ca0a9802aeb1801164..a64011ff06f0884af4f9c1f0838d295d29d1ad42
@@@ -63,13 -69,44 +69,44 @@@ defmodule Pleroma.Web.ActivityPub.Relay
        assert Relay.publish(activity) == {:error, false}
      end
  
-     test "returns announce activity" do
+     test "returns error when object is unknown" do
+       activity =
+         insert(:note_activity,
+           data: %{
+             "type" => "Create",
+             "object" => "http://mastodon.example.org/eee/99541947525187367"
+           }
+         )
+       assert Relay.publish(activity) == {:error, nil}
+     end
+     test_with_mock "returns announce activity and publish to federate",
+                    Pleroma.Web.Federator,
+                    [:passthrough],
+                    [] do
+       Pleroma.Config.put([:instance, :federating], true)
+       service_actor = Relay.get_actor()
+       note = insert(:note_activity)
+       assert {:ok, %Activity{} = activity, %Object{} = obj} = Relay.publish(note)
+       assert activity.data["type"] == "Announce"
+       assert activity.data["actor"] == service_actor.ap_id
+       assert activity.data["object"] == obj.data["id"]
 -      assert called(Pleroma.Web.Federator.publish(activity, 5))
++      assert called(Pleroma.Web.Federator.publish(activity))
+     end
+     test_with_mock "returns announce activity and not publish to federate",
+                    Pleroma.Web.Federator,
+                    [:passthrough],
+                    [] do
+       Pleroma.Config.put([:instance, :federating], false)
        service_actor = Relay.get_actor()
        note = insert(:note_activity)
        assert {:ok, %Activity{} = activity, %Object{} = obj} = Relay.publish(note)
        assert activity.data["type"] == "Announce"
        assert activity.data["actor"] == service_actor.ap_id
        assert activity.data["object"] == obj.data["id"]
 -      refute called(Pleroma.Web.Federator.publish(activity, 5))
++      refute called(Pleroma.Web.Federator.publish(activity))
      end
    end
  end