[#3213] Refactoring of HashtagsTableMigrator. Hashtag timeline performance optimizati...
authorIvan Tashkinov <ivantashkinov@gmail.com>
Sat, 16 Jan 2021 17:22:14 +0000 (20:22 +0300)
committerIvan Tashkinov <ivantashkinov@gmail.com>
Sat, 16 Jan 2021 17:22:14 +0000 (20:22 +0300)
CHANGELOG.md
config/description.exs
lib/pleroma/migrators/hashtags_table_migrator.ex
lib/pleroma/migrators/hashtags_table_migrator/state.ex
lib/pleroma/web/activity_pub/activity_pub.ex
test/pleroma/web/activity_pub/activity_pub_test.exs

index 25b24bf07a660bb912f7122748fab7a0f8d9bcfb..9a053156fe9f89e61fb5d2c98def7e3d32448398 100644 (file)
@@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 - Search: When using Postgres 11+, Pleroma will use the `websearch_to_tsvector` function to parse search queries.
 - Emoji: Support the full Unicode 13.1 set of Emoji for reactions, plus regional indicators.
 - Admin API: Reports now ordered by newest
+- Extracted object hashtags into a separate table in order to improve hashtag timeline performance (via a background migration in `Pleroma.Migrators.HashtagsTableMigrator`).
 
 ### Added
 
index f438a88abc10a394670705f9aca9fb9a77a8e67d..c73d50f7db82d13ba4e918611597ca70d8c4378b 100644 (file)
@@ -941,6 +941,12 @@ config :pleroma, :config_description, [
         key: :show_reactions,
         type: :boolean,
         description: "Let favourites and emoji reactions be viewed through the API."
+      },
+      %{
+        key: :improved_hashtag_timeline,
+        type: :keyword,
+        description:
+          "If `true` / `:prefer_aggregation` / `:avoid_aggregation`, hashtags table and selected strategy will be used for hashtags timeline. When `false`, object-embedded hashtags will be used (slower). Is auto-set to `true` (unless overridden) when HashtagsTableMigrator completes."
       }
     ]
   },
index 9f1a00f9c43f10c0da73cc83d5baea86c8effe02..b40578d507d69f778810940f9a06f08cdeb6254d 100644 (file)
@@ -45,25 +45,23 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
   def handle_continue(:init_state, _state) do
     {:ok, _} = State.start_link(nil)
 
-    put_stat(:status, :init)
+    update_status(:init)
 
-    dm = data_migration()
+    data_migration = data_migration()
     manual_migrations = Config.get([:instance, :manual_data_migrations], [])
 
     cond do
       Config.get(:env) == :test ->
-        put_stat(:status, :noop)
+        update_status(:noop)
 
-      is_nil(dm) ->
-        put_stat(:status, :halt)
-        put_stat(:message, "Data migration does not exist.")
+      is_nil(data_migration) ->
+        update_status(:halt, "Data migration does not exist.")
 
-      dm.state == :manual or dm.name in manual_migrations ->
-        put_stat(:status, :noop)
-        put_stat(:message, "Data migration is in manual execution state.")
+      data_migration.state == :manual or data_migration.name in manual_migrations ->
+        update_status(:noop, "Data migration is in manual execution state.")
 
-      dm.state == :complete ->
-        handle_success()
+      data_migration.state == :complete ->
+        handle_success(data_migration)
 
       true ->
         send(self(), :migrate_hashtags)
@@ -81,7 +79,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     {:ok, data_migration} =
       DataMigration.update(data_migration, %{state: :running, data: persistent_data})
 
-    put_stat(:status, :running)
+    update_status(:running)
 
     Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")
 
@@ -146,13 +144,12 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
            ) do
       _ = DataMigration.update_state(data_migration, :complete)
 
-      handle_success()
+      handle_success(data_migration)
     else
       _ ->
         _ = DataMigration.update_state(data_migration, :failed)
 
-        put_stat(:status, :failed)
-        put_stat(:message, "Please check data_migration_failed_ids records.")
+        update_status(:failed, "Please check data_migration_failed_ids records.")
     end
 
     {:noreply, state}
@@ -196,16 +193,25 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     _ = DataMigration.update(data_migration, %{data: runner_state})
   end
 
-  defp handle_success do
-    put_stat(:status, :complete)
+  defp handle_success(data_migration) do
+    update_status(:complete)
 
-    unless Config.improved_hashtag_timeline() do
+    unless data_migration.feature_lock || Config.improved_hashtag_timeline() do
       Config.put(Config.improved_hashtag_timeline_path(), true)
     end
 
     :ok
   end
 
+  def failed_objects_query do
+    from(o in Object)
+    |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
+      on: dmf.record_id == o.id
+    )
+    |> where([_o, dmf], dmf.data_migration_id == ^data_migration().id)
+    |> order_by([o], asc: o.id)
+  end
+
   def force_continue do
     send(whereis(), :migrate_hashtags)
   end
@@ -214,4 +220,9 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     {:ok, _} = DataMigration.update(data_migration(), %{state: :pending, data: %{}})
     force_continue()
   end
+
+  defp update_status(status, message \\ nil) do
+    put_stat(:status, status)
+    put_stat(:message, message)
+  end
 end
index 79926892cb4bf0ae764240b2f0befc743b4bea29..c1a2709fc3402ff49746c74a2f9deac1d483d83f 100644 (file)
@@ -2,23 +2,24 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator.State do
   use Agent
 
   @init_state %{}
+  @reg_name {:global, __MODULE__}
 
   def start_link(_) do
-    Agent.start_link(fn -> @init_state end, name: __MODULE__)
+    Agent.start_link(fn -> @init_state end, name: @reg_name)
   end
 
   def get do
-    Agent.get(__MODULE__, & &1)
+    Agent.get(@reg_name, & &1)
   end
 
   def put(key, value) do
-    Agent.update(__MODULE__, fn state ->
+    Agent.update(@reg_name, fn state ->
       Map.put(state, key, value)
     end)
   end
 
   def increment(key, increment \\ 1) do
-    Agent.update(__MODULE__, fn state ->
+    Agent.update(@reg_name, fn state ->
       updated_value = (state[key] || 0) + increment
       Map.put(state, key, updated_value)
     end)
index f5563b0fde7fe121145e7c0401df5e0a39a0c2ed..0609827ecf0a1c024c9b3025b0770d370c9529e8 100644 (file)
@@ -669,63 +669,66 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
   defp restrict_since(query, _), do: query
 
-  defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
+  defp restrict_embedded_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
     raise_on_missing_preload()
   end
 
-  defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
+  defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
     from(
       [_activity, object] in query,
       where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
     )
   end
 
-  defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
-    restrict_tag_reject(query, %{tag_reject: [tag_reject]})
+  defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject})
+       when is_binary(tag_reject) do
+    restrict_embedded_tag_reject(query, %{tag_reject: [tag_reject]})
   end
 
-  defp restrict_tag_reject(query, _), do: query
+  defp restrict_embedded_tag_reject(query, _), do: query
 
-  defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
+  defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
     raise_on_missing_preload()
   end
 
-  defp restrict_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
+  defp restrict_embedded_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
     from(
       [_activity, object] in query,
       where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
     )
   end
 
-  defp restrict_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
-    restrict_tag(query, %{tag: tag})
+  defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
+    restrict_embedded_tag(query, %{tag: tag})
   end
 
-  defp restrict_tag_all(query, _), do: query
+  defp restrict_embedded_tag_all(query, _), do: query
 
-  defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
+  defp restrict_embedded_tag(_query, %{tag: _tag, skip_preload: true}) do
     raise_on_missing_preload()
   end
 
-  defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
+  defp restrict_embedded_tag(query, %{tag: tag}) when is_list(tag) do
     from(
       [_activity, object] in query,
       where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
     )
   end
 
-  defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
-    restrict_tag(query, %{tag: [tag]})
+  defp restrict_embedded_tag(query, %{tag: tag}) when is_binary(tag) do
+    restrict_embedded_tag(query, %{tag: [tag]})
   end
 
-  defp restrict_tag(query, _), do: query
+  defp restrict_embedded_tag(query, _), do: query
 
-  defp restrict_hashtag(query, opts) do
-    [tag_any, tag_all, tag_reject] =
-      [:tag, :tag_all, :tag_reject]
-      |> Enum.map(&opts[&1])
-      |> Enum.map(&List.wrap(&1))
+  defp hashtag_conditions(opts) do
+    [:tag, :tag_all, :tag_reject]
+    |> Enum.map(&opts[&1])
+    |> Enum.map(&List.wrap(&1))
+  end
 
+  defp restrict_hashtag_agg(query, opts) do
+    [tag_any, tag_all, tag_reject] = hashtag_conditions(opts)
     has_conditions = Enum.any?([tag_any, tag_all, tag_reject], &Enum.any?(&1))
 
     cond do
@@ -1275,15 +1278,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
       |> exclude_invisible_actors(opts)
       |> exclude_visibility(opts)
 
+    hashtag_timeline_strategy = Config.improved_hashtag_timeline()
+
     cond do
-      Config.object_embedded_hashtags?() ->
+      !hashtag_timeline_strategy ->
         query
-        |> restrict_tag(opts)
-        |> restrict_tag_reject(opts)
-        |> restrict_tag_all(opts)
+        |> restrict_embedded_tag(opts)
+        |> restrict_embedded_tag_reject(opts)
+        |> restrict_embedded_tag_all(opts)
+
+      hashtag_timeline_strategy == :prefer_aggregation ->
+        restrict_hashtag_agg(query, opts)
 
-      # TODO: benchmark (initial approach preferring non-aggregate ops when possible)
-      Config.improved_hashtag_timeline() == :join ->
+      hashtag_timeline_strategy == :avoid_aggregation or avoid_hashtags_aggregation?(opts) ->
         query
         |> distinct([activity], true)
         |> restrict_hashtag_any(opts)
@@ -1291,10 +1298,17 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
         |> restrict_hashtag_reject_any(opts)
 
       true ->
-        restrict_hashtag(query, opts)
+        restrict_hashtag_agg(query, opts)
     end
   end
 
+  defp avoid_hashtags_aggregation?(opts) do
+    [tag_any, tag_all, tag_reject] = hashtag_conditions(opts)
+
+    joins_count = length(tag_all) + if Enum.any?(tag_any), do: 1, else: 0
+    Enum.empty?(tag_reject) and joins_count <= 2
+  end
+
   def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
     list_memberships = Pleroma.List.memberships(opts[:user])
 
index f86d0a265d30b384b88fc3589eb97f655a7ed643..36fd65c769ebb6adae642a518994067aca9e24c1 100644 (file)
@@ -217,8 +217,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubTest do
     {:ok, status_two} = CommonAPI.post(user, %{status: ". #essais"})
     {:ok, status_three} = CommonAPI.post(user, %{status: ". #test #reject"})
 
-    for new_timeline_enabled <- [true, false] do
-      clear_config([:instance, :improved_hashtag_timeline], new_timeline_enabled)
+    for hashtag_timeline_strategy <- [true, :prefer_aggregation, :avoid_aggregation, false] do
+      clear_config([:instance, :improved_hashtag_timeline], hashtag_timeline_strategy)
 
       fetch_one = ActivityPub.fetch_activities([], %{type: "Create", tag: "test"})