[#3213] Reorganized hashtags cleanup. Transaction-wrapped Hashtag.get_or_create_by_na...
authorIvan Tashkinov <ivantashkinov@gmail.com>
Thu, 11 Feb 2021 16:30:21 +0000 (19:30 +0300)
committerIvan Tashkinov <ivantashkinov@gmail.com>
Thu, 11 Feb 2021 16:30:21 +0000 (19:30 +0300)
config/config.exs
config/description.exs
lib/pleroma/hashtag.ex
lib/pleroma/migrators/hashtags_table_migrator.ex
lib/pleroma/object.ex
lib/pleroma/workers/cron/hashtags_cleanup_worker.ex [deleted file]

index 36c60993683f19b35cb8e8565634b20624a76400..91888c5127e4dfe93d79798996eaf92b5e5d33eb 100644 (file)
@@ -560,7 +560,6 @@ config :pleroma, Oban,
   ],
   plugins: [Oban.Plugins.Pruner],
   crontab: [
-    {"0 1 * * *", Pleroma.Workers.Cron.HashtagsCleanupWorker},
     {"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker},
     {"0 0 * * *", Pleroma.Workers.Cron.NewUsersDigestWorker}
   ]
index 02cdf2ff3f42566701280103a1b8738b5f3c1ddc..b2f301e2dca39ddf1984592b52ad96f1f4c6df00 100644 (file)
@@ -1964,7 +1964,6 @@ config :pleroma, :config_description, [
         type: {:list, :tuple},
         description: "Settings for cron background jobs",
         suggestions: [
-          {"0 1 * * *", Pleroma.Workers.Cron.HashtagsCleanupWorker},
           {"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker},
           {"0 0 * * *", Pleroma.Workers.Cron.NewUsersDigestWorker}
         ]
index 9e4c6c89446cdc998655b613359efea1d68189ab..de52c4dae66b5a98e744ea7d4db259e98ec5921f 100644 (file)
@@ -6,14 +6,17 @@ defmodule Pleroma.Hashtag do
   use Ecto.Schema
 
   import Ecto.Changeset
+  import Ecto.Query
 
+  alias Ecto.Multi
   alias Pleroma.Hashtag
+  alias Pleroma.Object
   alias Pleroma.Repo
 
   schema "hashtags" do
     field(:name, :string)
 
-    many_to_many(:objects, Pleroma.Object, join_through: "hashtags_objects", on_replace: :delete)
+    many_to_many(:objects, Object, join_through: "hashtags_objects", on_replace: :delete)
 
     timestamps()
   end
@@ -34,15 +37,27 @@ defmodule Pleroma.Hashtag do
   end
 
   def get_or_create_by_names(names) when is_list(names) do
-    Enum.reduce_while(names, {:ok, []}, fn name, {:ok, list} ->
-      case get_or_create_by_name(name) do
-        {:ok, %Hashtag{} = hashtag} ->
-          {:cont, {:ok, list ++ [hashtag]}}
-
-        error ->
-          {:halt, error}
-      end
-    end)
+    timestamp = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
+
+    structs =
+      Enum.map(names, fn name ->
+        %Hashtag{}
+        |> changeset(%{name: name})
+        |> Map.get(:changes)
+        |> Map.merge(%{inserted_at: timestamp, updated_at: timestamp})
+      end)
+
+    with {:ok, %{query_op: hashtags}} <-
+           Multi.new()
+           |> Multi.insert_all(:insert_all_op, Hashtag, structs, on_conflict: :nothing)
+           |> Multi.run(:query_op, fn _repo, _changes ->
+             {:ok, Repo.all(from(ht in Hashtag, where: ht.name in ^names))}
+           end)
+           |> Repo.transaction() do
+      {:ok, hashtags}
+    else
+      {:error, _name, value, _changes_so_far} -> {:error, value}
+    end
   end
 
   def changeset(%Hashtag{} = struct, params) do
@@ -52,4 +67,29 @@ defmodule Pleroma.Hashtag do
     |> validate_required([:name])
     |> unique_constraint(:name)
   end
+
+  def unlink(%Object{id: object_id}) do
+    with {_, hashtag_ids} <-
+           from(hto in "hashtags_objects",
+             where: hto.object_id == ^object_id,
+             select: hto.hashtag_id
+           )
+           |> Repo.delete_all() do
+      delete_unreferenced(hashtag_ids)
+    end
+  end
+
+  @delete_unreferenced_query """
+  DELETE FROM hashtags WHERE id IN
+    (SELECT hashtags.id FROM hashtags
+      LEFT OUTER JOIN hashtags_objects
+        ON hashtags_objects.hashtag_id = hashtags.id
+      WHERE hashtags_objects.hashtag_id IS NULL AND hashtags.id = ANY($1));
+  """
+
+  def delete_unreferenced(ids) do
+    with {:ok, %{num_rows: deleted_count}} <- Repo.query(@delete_unreferenced_query, [ids]) do
+      {:ok, deleted_count}
+    end
+  end
 end
index 9a036e0b298506d8d0652184a1cad2c1888c6a0a..c53f6be127188104a1d8a510186a98bcbf317f9c 100644 (file)
@@ -74,16 +74,15 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
   def handle_info(:migrate_hashtags, state) do
     State.clear()
 
-    data_migration = data_migration()
+    update_status(:running)
+    put_stat(:started_at, NaiveDateTime.utc_now())
 
+    data_migration = data_migration()
     persistent_data = Map.take(data_migration.data, ["max_processed_id"])
 
     {:ok, data_migration} =
       DataMigration.update(data_migration, %{state: :running, data: persistent_data})
 
-    update_status(:running)
-    put_stat(:started_at, NaiveDateTime.utc_now())
-
     Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")
 
     max_processed_id = data_migration.data["max_processed_id"] || 0
@@ -137,6 +136,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     |> Stream.run()
 
     with 0 <- failures_count(data_migration.id) do
+      _ = delete_non_create_activities_hashtags()
+
       {:ok, data_migration} = DataMigration.update_state(data_migration, :complete)
 
       handle_success(data_migration)
@@ -150,9 +151,37 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
     {:noreply, state}
   end
 
+  @hashtags_objects_cleanup_query """
+  DELETE FROM hashtags_objects WHERE object_id IN
+    (SELECT DISTINCT objects.id FROM objects
+      JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities
+        ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') =
+          (objects.data->>'id')
+        AND activities.data->>'type' = 'Create'
+      WHERE activities.id IS NULL);
+  """
+
+  @hashtags_cleanup_query """
+  DELETE FROM hashtags WHERE id IN
+    (SELECT hashtags.id FROM hashtags
+      LEFT OUTER JOIN hashtags_objects
+        ON hashtags_objects.hashtag_id = hashtags.id
+      WHERE hashtags_objects.hashtag_id IS NULL);
+  """
+
+  def delete_non_create_activities_hashtags do
+    {:ok, %{num_rows: hashtags_objects_count}} =
+      Repo.query(@hashtags_objects_cleanup_query, [], timeout: :infinity)
+
+    {:ok, %{num_rows: hashtags_count}} =
+      Repo.query(@hashtags_cleanup_query, [], timeout: :infinity)
+
+    {:ok, hashtags_objects_count, hashtags_count}
+  end
+
   defp query do
     # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out)
-    # Note: not checking activity type; HashtagsCleanupWorker should clean up unused records later
+    # Note: not checking activity type, expecting remove_non_create_objects_hashtags/_ to clean up
     from(
       object in Object,
       where:
@@ -182,25 +211,20 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
   defp transfer_object_hashtags(object, hashtags) do
     Repo.transaction(fn ->
       with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
-        for hashtag_record <- hashtag_records do
-          with {:ok, _} <-
-                 Repo.query(
-                   "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);",
-                   [hashtag_record.id, object.id]
-                 ) do
-            nil
-          else
-            {:error, e} ->
-              error =
-                "ERROR: could not link object #{object.id} and hashtag " <>
-                  "#{hashtag_record.id}: #{inspect(e)}"
-
-              Logger.error(error)
-              Repo.rollback(object.id)
-          end
+        maps = Enum.map(hashtag_records, &%{hashtag_id: &1.id, object_id: object.id})
+        expected_rows = length(hashtag_records)
+
+        with {^expected_rows, _} <- Repo.insert_all("hashtags_objects", maps) do
+          object.id
+        else
+          e ->
+            error =
+              "ERROR when inserting #{expected_rows} hashtags_objects " <>
+                "for object #{object.id}: #{inspect(e)}"
+
+            Logger.error(error)
+            Repo.rollback(object.id)
         end
-
-        object.id
       else
         e ->
           error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}"
index 52b77e41ca2b269636e57d09be5788c767d1025d..3ba749d1a36944e1120c53a07b28aca8d21e9003 100644 (file)
@@ -62,27 +62,30 @@ defmodule Pleroma.Object do
     |> cast(params, [:data])
     |> validate_required([:data])
     |> unique_constraint(:ap_id, name: :objects_unique_apid_index)
+    # Expecting `maybe_handle_hashtags_change/1` to run last:
     |> maybe_handle_hashtags_change(struct)
   end
 
-  # Note: not checking activity type; HashtagsCleanupWorker should clean up unused records later
+  # Note: not checking activity type (assuming non-legacy objects are associated with Create act.)
   defp maybe_handle_hashtags_change(changeset, struct) do
-    with data_hashtags_change = get_change(changeset, :data),
-         true <- hashtags_changed?(struct, data_hashtags_change),
+    with %Ecto.Changeset{valid?: true} <- changeset,
+         data_hashtags_change = get_change(changeset, :data),
+         {_, true} <- {:changed, hashtags_changed?(struct, data_hashtags_change)},
          {:ok, hashtag_records} <-
            data_hashtags_change
            |> object_data_hashtags()
            |> Hashtag.get_or_create_by_names() do
       put_assoc(changeset, :hashtags, hashtag_records)
     else
-      false ->
+      %{valid?: false} ->
         changeset
 
-      {:error, hashtag_changeset} ->
-        failed_hashtag = get_field(hashtag_changeset, :name)
+      {:changed, false} ->
+        changeset
 
+      {:error, _} ->
         validate_change(changeset, :data, fn _, _ ->
-          [data: "error referencing hashtag: #{failed_hashtag}"]
+          [data: "error referencing hashtags"]
         end)
     end
   end
@@ -221,9 +224,13 @@ defmodule Pleroma.Object do
   def swap_object_with_tombstone(object) do
     tombstone = make_tombstone(object)
 
-    object
-    |> Object.change(%{data: tombstone})
-    |> Repo.update()
+    with {:ok, object} <-
+           object
+           |> Object.change(%{data: tombstone})
+           |> Repo.update() do
+      Hashtag.unlink(object)
+      {:ok, object}
+    end
   end
 
   def delete(%Object{data: %{"id" => id}} = object) do
diff --git a/lib/pleroma/workers/cron/hashtags_cleanup_worker.ex b/lib/pleroma/workers/cron/hashtags_cleanup_worker.ex
deleted file mode 100644 (file)
index b319067..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Workers.Cron.HashtagsCleanupWorker do
-  @moduledoc """
-  The worker to clean up unused hashtags_objects and hashtags.
-  """
-
-  use Oban.Worker, queue: "hashtags_cleanup"
-
-  alias Pleroma.Repo
-
-  require Logger
-
-  @hashtags_objects_query """
-  DELETE FROM hashtags_objects WHERE object_id IN
-    (SELECT DISTINCT objects.id FROM objects
-      JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities
-        ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') =
-          (objects.data->>'id')
-        AND activities.data->>'type' = 'Create'
-      WHERE activities.id IS NULL);
-  """
-
-  @hashtags_query """
-  DELETE FROM hashtags WHERE id IN
-    (SELECT hashtags.id FROM hashtags
-      LEFT OUTER JOIN hashtags_objects
-        ON hashtags_objects.hashtag_id = hashtags.id
-      WHERE hashtags_objects.hashtag_id IS NULL AND hashtags.inserted_at < $1);
-  """
-
-  @impl Oban.Worker
-  def perform(_job) do
-    Logger.info("Cleaning up unused `hashtags_objects` records...")
-
-    {:ok, %{num_rows: hashtags_objects_count}} =
-      Repo.query(@hashtags_objects_query, [], timeout: :infinity)
-
-    Logger.info("Deleted #{hashtags_objects_count} unused `hashtags_objects` records.")
-
-    Logger.info("Cleaning up unused `hashtags` records...")
-
-    # Note: ignoring recently created hashtags since references are added after hashtag is created
-    {:ok, %{num_rows: hashtags_count}} =
-      Repo.query(@hashtags_query, [NaiveDateTime.add(NaiveDateTime.utc_now(), -3600 * 24)],
-        timeout: :infinity
-      )
-
-    Logger.info("Deleted #{hashtags_count} unused `hashtags` records.")
-
-    Logger.info("HashtagsCleanupWorker complete.")
-
-    :ok
-  end
-end