[#3213] HashtagsTableMigrator: failures handling fix, retry function.
authorIvan Tashkinov <ivantashkinov@gmail.com>
Thu, 21 Jan 2021 17:19:09 +0000 (20:19 +0300)
committerIvan Tashkinov <ivantashkinov@gmail.com>
Thu, 21 Jan 2021 17:23:08 +0000 (20:23 +0300)
Changed default hashtags filtering strategy to non-aggregate approach.

config/description.exs
lib/pleroma/migrators/hashtags_table_migrator.ex
lib/pleroma/web/activity_pub/activity_pub.ex
test/pleroma/web/activity_pub/activity_pub_test.exs

index b48616b229d06a98be08a06bb7b1ecf81c3a4087..46f085c70de263935c40e2a391b2381a73351d1d 100644 (file)
@@ -940,7 +940,7 @@ config :pleroma, :config_description, [
         key: :improved_hashtag_timeline,
         type: :keyword,
         description:
-          "If `true` / `:prefer_aggregation` / `:avoid_aggregation`, hashtags table and selected strategy will be used for hashtags timeline. When `false`, object-embedded hashtags will be used (slower). Is auto-set to `true` (unless overridden) when HashtagsTableMigrator completes."
+          "If `true` / `:prefer_aggregation`, hashtags table and selected strategy will be used for hashtags timeline. When `false`, object-embedded hashtags will be used (slower). Is auto-set to `true` (unless overridden) when HashtagsTableMigrator completes."
       }
     ]
   },
index e9dd9b70c62738a17d014566324ae9851cb300b5..8ad2c8c73a8930c2cc7e50b43b58c19a613032af 100644 (file)
@@ -109,8 +109,9 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
 
       _ =
         Repo.query(
-          "DELETE FROM data_migration_failed_ids WHERE id = ANY($1)",
-          [object_ids -- failed_ids]
+          "DELETE FROM data_migration_failed_ids " <>
+            "WHERE data_migration_id = $1 AND record_id = ANY($2)",
+          [data_migration.id, object_ids -- failed_ids]
         )
 
       max_object_id = Enum.at(object_ids, -1)
@@ -133,12 +134,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     end)
     |> Stream.run()
 
-    with {:ok, %{rows: [[0]]}} <-
-           Repo.query(
-             "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;",
-             [data_migration.id]
-           ) do
-      _ = DataMigration.update_state(data_migration, :complete)
+    with 0 <- failures_count(data_migration.id) do
+      {:ok, data_migration} = DataMigration.update_state(data_migration, :complete)
 
       handle_success(data_migration)
     else
@@ -167,7 +164,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
   end
 
   defp transfer_object_hashtags(object) do
-    hashtags = Object.object_data_hashtags(%{"tag" => object.tag})
+    embedded_tags = (Map.has_key?(object, :tag) && object.tag) || object.data["tag"]
+    hashtags = Object.object_data_hashtags(%{"tag" => embedded_tags})
 
     Repo.transaction(fn ->
       with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
@@ -246,6 +244,36 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     |> order_by([o], asc: o.id)
   end
 
+  def failures_count(data_migration_id \\ nil) do
+    data_migration_id = data_migration_id || data_migration().id
+
+    with {:ok, %{rows: [[count]]}} <-
+           Repo.query(
+             "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;",
+             [data_migration_id]
+           ) do
+      count
+    end
+  end
+
+  def retry_failed do
+    data_migration = data_migration()
+
+    failed_objects_query()
+    |> Repo.chunk_stream(100, :one)
+    |> Stream.each(fn object ->
+      with {:ok, _} <- transfer_object_hashtags(object) do
+        _ =
+          Repo.query(
+            "DELETE FROM data_migration_failed_ids " <>
+              "WHERE data_migration_id = $1 AND record_id = $2",
+            [data_migration.id, object.id]
+          )
+      end
+    end)
+    |> Stream.run()
+  end
+
   def force_continue do
     send(whereis(), :migrate_hashtags)
   end
@@ -255,6 +283,12 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     force_continue()
   end
 
+  def force_complete do
+    {:ok, data_migration} = DataMigration.update_state(data_migration(), :complete)
+
+    handle_success(data_migration)
+  end
+
   defp update_status(status, message \\ nil) do
     put_stat(:status, status)
     put_stat(:message, message)
index 0609827ecf0a1c024c9b3025b0770d370c9529e8..dbfd3839d6e32a9bf3f0e69bac34cc5aa76ae45c 100644 (file)
@@ -727,6 +727,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     |> Enum.map(&List.wrap(&1))
   end
 
+  # Note: times out on larger instances (with default timeout), intended for complex queries
   defp restrict_hashtag_agg(query, opts) do
     [tag_any, tag_all, tag_reject] = hashtag_conditions(opts)
     has_conditions = Enum.any?([tag_any, tag_all, tag_reject], &Enum.any?(&1))
@@ -1290,25 +1291,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
       hashtag_timeline_strategy == :prefer_aggregation ->
         restrict_hashtag_agg(query, opts)
 
-      hashtag_timeline_strategy == :avoid_aggregation or avoid_hashtags_aggregation?(opts) ->
+      true ->
         query
         |> distinct([activity], true)
         |> restrict_hashtag_any(opts)
         |> restrict_hashtag_all(opts)
         |> restrict_hashtag_reject_any(opts)
-
-      true ->
-        restrict_hashtag_agg(query, opts)
     end
   end
 
-  defp avoid_hashtags_aggregation?(opts) do
-    [tag_any, tag_all, tag_reject] = hashtag_conditions(opts)
-
-    joins_count = length(tag_all) + if Enum.any?(tag_any), do: 1, else: 0
-    Enum.empty?(tag_reject) and joins_count <= 2
-  end
-
   def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
     list_memberships = Pleroma.List.memberships(opts[:user])
 
index 36fd65c769ebb6adae642a518994067aca9e24c1..1fcaf74d3037a8a4b31c69f95c7c2486bc7b2f38 100644 (file)
@@ -217,7 +217,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubTest do
     {:ok, status_two} = CommonAPI.post(user, %{status: ". #essais"})
     {:ok, status_three} = CommonAPI.post(user, %{status: ". #test #reject"})
 
-    for hashtag_timeline_strategy <- [true, :prefer_aggregation, :avoid_aggregation, false] do
+    for hashtag_timeline_strategy <- [true, :prefer_aggregation, false] do
       clear_config([:instance, :improved_hashtag_timeline], hashtag_timeline_strategy)
 
       fetch_one = ActivityPub.fetch_activities([], %{type: "Create", tag: "test"})