[#3213] Prototype of data migrations functionality / HashtagsTableMigrator.
authorIvan Tashkinov <ivantashkinov@gmail.com>
Wed, 13 Jan 2021 19:07:38 +0000 (22:07 +0300)
committerIvan Tashkinov <ivantashkinov@gmail.com>
Wed, 13 Jan 2021 19:07:38 +0000 (22:07 +0300)
lib/mix/tasks/pleroma/database.ex
lib/pleroma/application.ex
lib/pleroma/config.ex
lib/pleroma/data_migration.ex [new file with mode: 0644]
lib/pleroma/delivery.ex
lib/pleroma/ecto_enums.ex
lib/pleroma/migrators/hashtags_table_migrator.ex [new file with mode: 0644]
lib/pleroma/web/activity_pub/activity_pub.ex
priv/repo/migrations/20210105195018_create_data_migrations.exs [new file with mode: 0644]
priv/repo/migrations/20210106183301_data_migration_create_populate_hashtags_table.exs [new file with mode: 0644]
priv/repo/migrations/20210111172254_create_data_migration_failed_ids.exs [new file with mode: 0644]

index e9686fc1ba5ebd06cf24e4699d75df60a4d28b20..08ede9eef971d98872168306e57fab2fc805ff55 100644 (file)
@@ -4,7 +4,6 @@
 
 defmodule Mix.Tasks.Pleroma.Database do
   alias Pleroma.Conversation
-  alias Pleroma.Hashtag
   alias Pleroma.Maintenance
   alias Pleroma.Object
   alias Pleroma.Repo
@@ -171,63 +170,4 @@ defmodule Mix.Tasks.Pleroma.Database do
     end)
     |> Stream.run()
   end
-
-  def run(["transfer_hashtags"]) do
-    import Ecto.Query
-
-    start_pleroma()
-
-    Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")
-
-    # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out)
-    from(
-      object in Object,
-      left_join: hashtag in assoc(object, :hashtags),
-      where: is_nil(hashtag.id),
-      where:
-        fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data),
-      select: %{
-        id: object.id,
-        tag: fragment("(?)->'tag'", object.data)
-      }
-    )
-    |> Repo.chunk_stream(100, :one, timeout: :infinity)
-    |> Stream.each(&transfer_object_hashtags(&1))
-    |> Stream.run()
-
-    Logger.info("Done transferring hashtags. Please check logs to ensure no errors.")
-  end
-
-  defp transfer_object_hashtags(object) do
-    hashtags = Object.object_data_hashtags(%{"tag" => object.tag})
-
-    Repo.transaction(fn ->
-      with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
-        for hashtag_record <- hashtag_records do
-          with {:ok, _} <-
-                 Repo.query(
-                   "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);",
-                   [hashtag_record.id, object.id]
-                 ) do
-            nil
-          else
-            {:error, e} ->
-              error =
-                "ERROR: could not link object #{object.id} and hashtag " <>
-                  "#{hashtag_record.id}: #{inspect(e)}"
-
-              Logger.error(error)
-              Repo.rollback(object.id)
-          end
-        end
-
-        object.id
-      else
-        e ->
-          error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}"
-          Logger.error(error)
-          Repo.rollback(object.id)
-      end
-    end)
-  end
 end
index bd568d8580c75539cc88354823cccbfcdc93b3b9..962529dfde9eb9dbfde1cde4f1609db51a88256e 100644 (file)
@@ -104,7 +104,8 @@ defmodule Pleroma.Application do
         chat_child(chat_enabled?()) ++
         [
           Pleroma.Web.Endpoint,
-          Pleroma.Gopher.Server
+          Pleroma.Gopher.Server,
+          Pleroma.Migrators.HashtagsTableMigrator
         ]
 
     # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
index ee0167f4ef537138983a9d338b411ee9e0e0e84b..dbfb114d69ea8a28ccad42a0b372febaca06cfc9 100644 (file)
@@ -96,7 +96,9 @@ defmodule Pleroma.Config do
     end
   end
 
-  def object_embedded_hashtags?, do: !get([:instance, :improved_hashtag_timeline])
+  def improved_hashtag_timeline_path, do: [:instance, :improved_hashtag_timeline]
+  def improved_hashtag_timeline, do: get(improved_hashtag_timeline_path())
+  def object_embedded_hashtags?, do: !improved_hashtag_timeline()
 
   def oauth_consumer_strategies, do: get([:auth, :oauth_consumer_strategies], [])
 
diff --git a/lib/pleroma/data_migration.ex b/lib/pleroma/data_migration.ex
new file mode 100644 (file)
index 0000000..64fa155
--- /dev/null
@@ -0,0 +1,46 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.DataMigration do
+  use Ecto.Schema
+
+  alias Pleroma.DataMigration
+  alias Pleroma.DataMigration.State
+  alias Pleroma.Repo
+
+  import Ecto.Changeset
+
+  schema "data_migrations" do
+    field(:name, :string)
+    field(:state, State, default: :pending)
+    field(:feature_lock, :boolean, default: false)
+    field(:params, :map, default: %{})
+    field(:data, :map, default: %{})
+
+    timestamps()
+  end
+
+  def changeset(data_migration, params \\ %{}) do
+    data_migration
+    |> cast(params, [:name, :state, :feature_lock, :params, :data])
+    |> validate_required([:name])
+    |> unique_constraint(:name)
+  end
+
+  def update(data_migration, params \\ %{}) do
+    data_migration
+    |> changeset(params)
+    |> Repo.update()
+  end
+
+  def update_state(data_migration, new_state) do
+    update(data_migration, %{state: new_state})
+  end
+
+  def get_by_name(name) do
+    Repo.get_by(DataMigration, name: name)
+  end
+
+  def populate_hashtags_table, do: get_by_name("populate_hashtags_table")
+end
index 0ded2855c79dc76783a609979b7513e7be2d6840..baf79dda79a029e7c7f376a06c9c61654b5dee7b 100644 (file)
@@ -9,7 +9,6 @@ defmodule Pleroma.Delivery do
   alias Pleroma.Object
   alias Pleroma.Repo
   alias Pleroma.User
-  alias Pleroma.User
 
   import Ecto.Changeset
   import Ecto.Query
index 6fc47620c7760c74d85a4b0be04656920a924550..f0ae658a440b4bd405a37e0bb8cfdb2866b5e45f 100644 (file)
@@ -17,3 +17,11 @@ defenum(Pleroma.FollowingRelationship.State,
   follow_accept: 2,
   follow_reject: 3
 )
+
+defenum(Pleroma.DataMigration.State,
+  pending: 1,
+  running: 2,
+  complete: 3,
+  failed: 4,
+  manual: 5
+)
diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex
new file mode 100644 (file)
index 0000000..a7e3de5
--- /dev/null
@@ -0,0 +1,211 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Migrators.HashtagsTableMigrator do
+  defmodule State do
+    use Agent
+
+    @init_state %{}
+
+    def start_link(_) do
+      Agent.start_link(fn -> @init_state end, name: __MODULE__)
+    end
+
+    def get do
+      Agent.get(__MODULE__, & &1)
+    end
+
+    def put(key, value) do
+      Agent.update(__MODULE__, fn state ->
+        Map.put(state, key, value)
+      end)
+    end
+
+    def increment(key, increment \\ 1) do
+      Agent.update(__MODULE__, fn state ->
+        updated_value = (state[key] || 0) + increment
+        Map.put(state, key, updated_value)
+      end)
+    end
+  end
+
+  use GenServer
+
+  require Logger
+
+  import Ecto.Query
+
+  alias Pleroma.Config
+  alias Pleroma.DataMigration
+  alias Pleroma.Hashtag
+  alias Pleroma.Object
+  alias Pleroma.Repo
+
+  defdelegate state(), to: State, as: :get
+  defdelegate put_state(key, value), to: State, as: :put
+  defdelegate increment_state(key, increment), to: State, as: :increment
+
+  defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table
+
+  def start_link(_) do
+    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
+  end
+
+  @impl true
+  def init(_) do
+    {:ok, nil, {:continue, :init_state}}
+  end
+
+  @impl true
+  def handle_continue(:init_state, _state) do
+    {:ok, _} = State.start_link(nil)
+
+    put_state(:status, :init)
+
+    dm = data_migration()
+
+    cond do
+      Config.get(:env) == :test ->
+        put_state(:status, :noop)
+
+      is_nil(dm) ->
+        put_state(:status, :halt)
+        put_state(:message, "Data migration does not exist.")
+
+      dm.state == :manual ->
+        put_state(:status, :noop)
+        put_state(:message, "Data migration is in manual execution state.")
+
+      dm.state == :complete ->
+        handle_success()
+
+      true ->
+        send(self(), :migrate_hashtags)
+    end
+
+    {:noreply, nil}
+  end
+
+  @impl true
+  def handle_info(:migrate_hashtags, state) do
+    data_migration = data_migration()
+
+    {:ok, data_migration} = DataMigration.update_state(data_migration, :running)
+    put_state(:status, :running)
+
+    Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")
+
+    max_processed_id = data_migration.data["max_processed_id"] || 0
+
+    # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out)
+    from(
+      object in Object,
+      left_join: hashtag in assoc(object, :hashtags),
+      where: object.id > ^max_processed_id,
+      where: is_nil(hashtag.id),
+      where:
+        fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data),
+      select: %{
+        id: object.id,
+        tag: fragment("(?)->'tag'", object.data)
+      }
+    )
+    |> Repo.chunk_stream(100, :batches, timeout: :infinity)
+    |> Stream.each(fn objects ->
+      object_ids = Enum.map(objects, & &1.id)
+
+      failed_ids =
+        objects
+        |> Enum.map(&transfer_object_hashtags(&1))
+        |> Enum.filter(&(elem(&1, 0) == :error))
+        |> Enum.map(&elem(&1, 1))
+
+      for failed_id <- failed_ids do
+        _ =
+          Repo.query(
+            "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <>
+              "VALUES ($1, $2) ON CONFLICT DO NOTHING;",
+            [data_migration.id, failed_id]
+          )
+      end
+
+      _ =
+        Repo.query(
+          "DELETE FROM data_migration_failed_ids WHERE id = ANY($1)",
+          [object_ids -- failed_ids]
+        )
+
+      max_object_id = Enum.at(object_ids, -1)
+      _ = DataMigration.update(data_migration, %{data: %{"max_processed_id" => max_object_id}})
+
+      increment_state(:processed_count, length(object_ids))
+      increment_state(:failed_count, length(failed_ids))
+
+      # A quick and dirty approach to controlling the load this background migration imposes
+      sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0)
+      Process.sleep(sleep_interval)
+    end)
+    |> Stream.run()
+
+    with {:ok, %{rows: [[0]]}} <-
+           Repo.query(
+             "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;",
+             [data_migration.id]
+           ) do
+      put_state(:status, :complete)
+      _ = DataMigration.update_state(data_migration, :complete)
+
+      handle_success()
+    else
+      _ ->
+        put_state(:status, :failed)
+        put_state(:message, "Please check data_migration_failed_ids records.")
+    end
+
+    {:noreply, state}
+  end
+
+  defp transfer_object_hashtags(object) do
+    hashtags = Object.object_data_hashtags(%{"tag" => object.tag})
+
+    Repo.transaction(fn ->
+      with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
+        for hashtag_record <- hashtag_records do
+          with {:ok, _} <-
+                 Repo.query(
+                   "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);",
+                   [hashtag_record.id, object.id]
+                 ) do
+            nil
+          else
+            {:error, e} ->
+              error =
+                "ERROR: could not link object #{object.id} and hashtag " <>
+                  "#{hashtag_record.id}: #{inspect(e)}"
+
+              Logger.error(error)
+              Repo.rollback(object.id)
+          end
+        end
+
+        object.id
+      else
+        e ->
+          error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}"
+          Logger.error(error)
+          Repo.rollback(object.id)
+      end
+    end)
+  end
+
+  defp handle_success do
+    put_state(:status, :complete)
+
+    unless Config.improved_hashtag_timeline() do
+      Config.put(Config.improved_hashtag_timeline_path(), true)
+    end
+
+    :ok
+  end
+end
index 339843330f8a3c96fa376b0d5580e0d6f6e26deb..6131ae85b76582a64ca2bdf7eae8706416a27dae 100644 (file)
@@ -1276,7 +1276,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
         |> restrict_tag_all(opts)
 
       # TODO: benchmark (initial approach preferring non-aggregate ops when possible)
-      Config.get([:instance, :improved_hashtag_timeline]) == :join ->
+      Config.improved_hashtag_timeline() == :join ->
         query
         |> distinct([activity], true)
         |> restrict_hashtag_any(opts)
diff --git a/priv/repo/migrations/20210105195018_create_data_migrations.exs b/priv/repo/migrations/20210105195018_create_data_migrations.exs
new file mode 100644 (file)
index 0000000..5f2e8d9
--- /dev/null
@@ -0,0 +1,17 @@
+defmodule Pleroma.Repo.Migrations.CreateDataMigrations do
+  use Ecto.Migration
+
+  def change do
+    create_if_not_exists table(:data_migrations) do
+      add(:name, :string, null: false)
+      add(:state, :integer, default: 1)
+      add(:feature_lock, :boolean, default: false)
+      add(:params, :map, default: %{})
+      add(:data, :map, default: %{})
+
+      timestamps()
+    end
+
+    create_if_not_exists(unique_index(:data_migrations, [:name]))
+  end
+end
diff --git a/priv/repo/migrations/20210106183301_data_migration_create_populate_hashtags_table.exs b/priv/repo/migrations/20210106183301_data_migration_create_populate_hashtags_table.exs
new file mode 100644 (file)
index 0000000..2a965f0
--- /dev/null
@@ -0,0 +1,14 @@
+defmodule Pleroma.Repo.Migrations.DataMigrationCreatePopulateHashtagsTable do
+  use Ecto.Migration
+
+  def up do
+    dt = NaiveDateTime.utc_now()
+
+    execute(
+      "INSERT INTO data_migrations(name, inserted_at, updated_at) " <>
+        "VALUES ('populate_hashtags_table', '#{dt}', '#{dt}') ON CONFLICT DO NOTHING;"
+    )
+  end
+
+  def down, do: :ok
+end
diff --git a/priv/repo/migrations/20210111172254_create_data_migration_failed_ids.exs b/priv/repo/migrations/20210111172254_create_data_migration_failed_ids.exs
new file mode 100644 (file)
index 0000000..ba0be98
--- /dev/null
@@ -0,0 +1,14 @@
+defmodule Pleroma.Repo.Migrations.CreateDataMigrationFailedIds do
+  use Ecto.Migration
+
+  def change do
+    create_if_not_exists table(:data_migration_failed_ids, primary_key: false) do
+      add(:data_migration_id, references(:data_migrations), null: false)
+      add(:record_id, :bigint, null: false)
+    end
+
+    create_if_not_exists(
+      unique_index(:data_migration_failed_ids, [:data_migration_id, :record_id])
+    )
+  end
+end