Merge remote-tracking branch 'remotes/origin/develop' into feature/object-hashtags...
authorIvan Tashkinov <ivantashkinov@gmail.com>
Sun, 31 Jan 2021 17:38:58 +0000 (20:38 +0300)
committerIvan Tashkinov <ivantashkinov@gmail.com>
Sun, 31 Jan 2021 17:38:58 +0000 (20:38 +0300)
# Conflicts:
# CHANGELOG.md

39 files changed:
CHANGELOG.md
config/config.exs
config/description.exs
lib/mix/tasks/pleroma/database.ex
lib/pleroma/activity.ex
lib/pleroma/activity/ir/topics.ex
lib/pleroma/application.ex
lib/pleroma/config.ex
lib/pleroma/data_migration.ex [new file with mode: 0644]
lib/pleroma/delivery.ex
lib/pleroma/ecto_enums.ex
lib/pleroma/hashtag.ex [new file with mode: 0644]
lib/pleroma/migrators/hashtags_table_migrator.ex [new file with mode: 0644]
lib/pleroma/migrators/hashtags_table_migrator/state.ex [new file with mode: 0644]
lib/pleroma/object.ex
lib/pleroma/repo.ex
lib/pleroma/web/activity_pub/activity_pub.ex
lib/pleroma/web/activity_pub/mrf/simple_policy.ex
lib/pleroma/web/activity_pub/transmogrifier.ex
lib/pleroma/web/feed/feed_view.ex
lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex
lib/pleroma/web/mastodon_api/views/status_view.ex
lib/pleroma/web/templates/feed/feed/_activity.atom.eex
lib/pleroma/web/templates/feed/feed/_activity.rss.eex
lib/pleroma/web/templates/feed/feed/_tag_activity.atom.eex
lib/pleroma/workers/cron/hashtags_cleanup_worker.ex [new file with mode: 0644]
priv/repo/migrations/20190711042021_create_safe_jsonb_set.exs
priv/repo/migrations/20201221202251_create_hashtags.exs [new file with mode: 0644]
priv/repo/migrations/20201221203824_create_hashtags_objects.exs [new file with mode: 0644]
priv/repo/migrations/20210105195018_create_data_migrations.exs [new file with mode: 0644]
priv/repo/migrations/20210106183301_data_migration_create_populate_hashtags_table.exs [new file with mode: 0644]
priv/repo/migrations/20210111172254_create_data_migration_failed_ids.exs [new file with mode: 0644]
test/pleroma/activity/ir/topics_test.exs
test/pleroma/hashtag_test.exs [new file with mode: 0644]
test/pleroma/object_test.exs
test/pleroma/web/activity_pub/activity_pub_test.exs
test/pleroma/web/activity_pub/transmogrifier/note_handling_test.exs
test/pleroma/web/common_api_test.exs
test/pleroma/web/mastodon_api/views/status_view_test.exs

index c4f3867a26d3d8797b0c4e5bcb8cd79ab8de6735..47c5078b8f953c222621c41249c164334f2d6375 100644 (file)
@@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 - Admin API: Reports now ordered by newest
 - Deprecated `Pleroma.Uploaders.S3, :public_endpoint`. Now `Pleroma.Upload, :base_url` is the standard configuration key for all uploaders.
 - Improved Apache webserver support: updated sample configuration, MediaProxy cache invalidation verified with the included sample script
+- Extracted object hashtags into a separate table in order to improve hashtag timeline performance (via background migration in `Pleroma.Migrators.HashtagsTableMigrator`).
 
 ### Added
 
index b9af250c5f2f1e28fb6d1fcad718512e7946e8a0..36c60993683f19b35cb8e8565634b20624a76400 100644 (file)
@@ -555,10 +555,12 @@ config :pleroma, Oban,
     remote_fetcher: 2,
     attachments_cleanup: 1,
     new_users_digest: 1,
+    hashtags_cleanup: 1,
     mute_expire: 5
   ],
   plugins: [Oban.Plugins.Pruner],
   crontab: [
+    {"0 1 * * *", Pleroma.Workers.Cron.HashtagsCleanupWorker},
     {"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker},
     {"0 0 * * *", Pleroma.Workers.Cron.NewUsersDigestWorker}
   ]
index f84b52a4f6904822f75f037990a2aeacf38b09c7..ed3a534a034da0a5d4a38c32821732a442e864e4 100644 (file)
@@ -941,6 +941,12 @@ config :pleroma, :config_description, [
         key: :show_reactions,
         type: :boolean,
         description: "Let favourites and emoji reactions be viewed through the API."
+      },
+      %{
+        key: :improved_hashtag_timeline,
+        type: :keyword,
+        description:
+          "If `true`, hashtags will be fetched from `hashtags` table for hashtags timeline. When `false`, object-embedded hashtags will be used (slower). Is auto-set to `true` (unless overridden) when HashtagsTableMigrator completes."
       }
     ]
   },
@@ -1950,6 +1956,7 @@ config :pleroma, :config_description, [
         type: {:list, :tuple},
         description: "Settings for cron background jobs",
         suggestions: [
+          {"0 1 * * *", Pleroma.Workers.Cron.HashtagsCleanupWorker},
           {"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker},
           {"0 0 * * *", Pleroma.Workers.Cron.NewUsersDigestWorker}
         ]
index 6261910f026ec29dbddd4a53d4caff09ae78147b..4ddace9c97f58a0006435ff504903f3e3bcdc348 100644 (file)
@@ -8,10 +8,13 @@ defmodule Mix.Tasks.Pleroma.Database do
   alias Pleroma.Object
   alias Pleroma.Repo
   alias Pleroma.User
+
   require Logger
   require Pleroma.Constants
+
   import Ecto.Query
   import Mix.Pleroma
+
   use Mix.Task
 
   @shortdoc "A collection of database related tasks"
index 6542e684e1c82ef04794010889b4b774188a331a..d594038849bbd6ed68bd7f774adb52cacec7b911 100644 (file)
@@ -113,6 +113,7 @@ defmodule Pleroma.Activity do
     from([a] in query,
       left_join: b in Bookmark,
       on: b.user_id == ^user.id and b.activity_id == a.id,
+      as: :bookmark,
       preload: [bookmark: b]
     )
   end
@@ -123,6 +124,7 @@ defmodule Pleroma.Activity do
     from([a] in query,
       left_join: r in ReportNote,
       on: a.id == r.activity_id,
+      as: :report_note,
       preload: [report_notes: r]
     )
   end
index d94395fc175c7f97f39ca2abdf6db6c99b765302..7a603a61524a46deb07ebb8f4b6a48036bc6eddc 100644 (file)
@@ -48,14 +48,12 @@ defmodule Pleroma.Activity.Ir.Topics do
     tags
   end
 
-  defp hashtags_to_topics(%{data: %{"tag" => tags}}) do
-    tags
-    |> Enum.filter(&is_bitstring(&1))
-    |> Enum.map(fn tag -> "hashtag:" <> tag end)
+  defp hashtags_to_topics(object) do
+    object
+    |> Object.hashtags()
+    |> Enum.map(fn hashtag -> "hashtag:" <> hashtag end)
   end
 
-  defp hashtags_to_topics(_), do: []
-
   defp remote_topics(%{local: true}), do: []
 
   defp remote_topics(%{actor: actor}) when is_binary(actor),
index 9e262235eef83c86b2426924508699ac7c7dc1ef..d39451a9dcb8bf6b26fe018a8a2708e119b67295 100644 (file)
@@ -104,7 +104,8 @@ defmodule Pleroma.Application do
         chat_child(chat_enabled?()) ++
         [
           Pleroma.Web.Endpoint,
-          Pleroma.Gopher.Server
+          Pleroma.Gopher.Server,
+          Pleroma.Migrators.HashtagsTableMigrator
         ]
 
     # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
index f17e141282883d7c29e876a253003556a66a7747..0a6ac0ad083062ee9fb71368af74a6f5c531727b 100644 (file)
@@ -96,6 +96,9 @@ defmodule Pleroma.Config do
     end
   end
 
+  # Config path (as accepted by `get/1`) of the improved hashtag timeline setting.
+  def improved_hashtag_timeline_path do
+    [:instance, :improved_hashtag_timeline]
+  end
+  # Current value stored under `[:instance, :improved_hashtag_timeline]`.
+  def improved_hashtag_timeline do
+    get(improved_hashtag_timeline_path())
+  end
+
   def oauth_consumer_strategies, do: get([:auth, :oauth_consumer_strategies], [])
 
   def oauth_consumer_enabled?, do: oauth_consumer_strategies() != []
diff --git a/lib/pleroma/data_migration.ex b/lib/pleroma/data_migration.ex
new file mode 100644 (file)
index 0000000..64fa155
--- /dev/null
@@ -0,0 +1,46 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.DataMigration do
+  @moduledoc """
+  Ecto schema for rows of the `data_migrations` table.
+
+  Each row tracks one named data migration: its `state` (a `Pleroma.DataMigration.State`
+  enum value, `:pending` by default), a `feature_lock` flag, and two free-form maps
+  (`params` and `data`).
+  """
+
+  use Ecto.Schema
+
+  alias Pleroma.DataMigration
+  alias Pleroma.DataMigration.State
+  alias Pleroma.Repo
+
+  import Ecto.Changeset
+
+  schema "data_migrations" do
+    field(:name, :string)
+    field(:state, State, default: :pending)
+    field(:feature_lock, :boolean, default: false)
+    field(:params, :map, default: %{})
+    field(:data, :map, default: %{})
+
+    timestamps()
+  end
+
+  @doc "Builds a changeset; `name` is required and covered by a unique constraint."
+  def changeset(data_migration, params \\ %{}) do
+    castable_fields = [:name, :state, :feature_lock, :params, :data]
+
+    data_migration
+    |> cast(params, castable_fields)
+    |> validate_required([:name])
+    |> unique_constraint(:name)
+  end
+
+  @doc "Applies `params` through `changeset/2` and persists the record."
+  def update(data_migration, params \\ %{}) do
+    Repo.update(changeset(data_migration, params))
+  end
+
+  @doc "Persists `new_state` as the record's `state`."
+  def update_state(data_migration, new_state),
+    do: update(data_migration, %{state: new_state})
+
+  @doc "Fetches the data migration with the given `name` via `Repo.get_by/2`."
+  def get_by_name(name), do: Repo.get_by(DataMigration, name: name)
+
+  @doc "Fetches the data migration named `populate_hashtags_table`."
+  def populate_hashtags_table, do: get_by_name("populate_hashtags_table")
+end
index e8d53676742e871dda026151cc6e106708372e04..511d5cf58258abb773e2434f449834837afacb93 100644 (file)
@@ -9,7 +9,6 @@ defmodule Pleroma.Delivery do
   alias Pleroma.Object
   alias Pleroma.Repo
   alias Pleroma.User
-  alias Pleroma.User
 
   import Ecto.Changeset
   import Ecto.Query
index f198cccb750cd26d2cb32c47c6def9f85dc2ab12..2a9addabcecff4553bff21c483e9db83c8beb270 100644 (file)
@@ -17,3 +17,11 @@ defenum(Pleroma.FollowingRelationship.State,
   follow_accept: 2,
   follow_reject: 3
 )
+
+defenum(Pleroma.DataMigration.State,
+  pending: 1,
+  running: 2,
+  complete: 3,
+  failed: 4,
+  manual: 5
+)
diff --git a/lib/pleroma/hashtag.ex b/lib/pleroma/hashtag.ex
new file mode 100644 (file)
index 0000000..b059275
--- /dev/null
@@ -0,0 +1,58 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Hashtag do
+  @moduledoc """
+  Ecto schema for the `hashtags` table.
+
+  Hashtag names are stored downcased (see `changeset/2`) and are linked to objects through
+  the `hashtags_objects` join table.
+  """
+
+  use Ecto.Schema
+
+  import Ecto.Changeset
+
+  alias Pleroma.Hashtag
+  alias Pleroma.Repo
+
+  @derive {Jason.Encoder, only: [:data]}
+
+  schema "hashtags" do
+    field(:name, :string)
+    field(:data, :map, default: %{})
+
+    many_to_many(:objects, Pleroma.Object, join_through: "hashtags_objects", on_replace: :delete)
+
+    timestamps()
+  end
+
+  @doc "Fetches a hashtag by its exact stored `name`; returns `nil` when there is none."
+  def get_by_name(name) do
+    Repo.get_by(Hashtag, name: name)
+  end
+
+  @doc """
+  Returns `{:ok, hashtag}` for the hashtag called `name`, inserting it when missing.
+
+  The name is downcased before the lookup so that it matches what `changeset/2` stores;
+  looking up the raw name would miss the existing record for a mixed-case name and then
+  fail on the unique constraint. Insert failures are returned as `{:error, changeset}`.
+  """
+  def get_or_create_by_name(name) when is_binary(name) do
+    normalized_name = String.downcase(name)
+
+    case get_by_name(normalized_name) do
+      %Hashtag{} = hashtag ->
+        {:ok, hashtag}
+
+      _ ->
+        %Hashtag{}
+        |> changeset(%{name: normalized_name})
+        |> Repo.insert()
+    end
+  end
+
+  @doc """
+  Resolves every name via `get_or_create_by_name/1`, preserving the order of `names`.
+
+  Returns `{:ok, hashtags}`, or the first non-`{:ok, %Hashtag{}}` result, at which point
+  processing stops.
+  """
+  def get_or_create_by_names(names) when is_list(names) do
+    names
+    |> Enum.reduce_while({:ok, []}, fn name, {:ok, reversed} ->
+      case get_or_create_by_name(name) do
+        {:ok, %Hashtag{} = hashtag} ->
+          # Prepend (O(1)); the list is reversed once at the end.
+          {:cont, {:ok, [hashtag | reversed]}}
+
+        error ->
+          {:halt, error}
+      end
+    end)
+    |> case do
+      {:ok, reversed} when is_list(reversed) -> {:ok, Enum.reverse(reversed)}
+      error -> error
+    end
+  end
+
+  @doc "Casts `name` and `data`; downcases, requires and uniqueness-checks `name`."
+  def changeset(%Hashtag{} = struct, params) do
+    struct
+    |> cast(params, [:name, :data])
+    |> update_change(:name, &String.downcase/1)
+    |> validate_required([:name])
+    |> unique_constraint(:name)
+  end
+end
diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex
new file mode 100644 (file)
index 0000000..07b42a7
--- /dev/null
@@ -0,0 +1,309 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Migrators.HashtagsTableMigrator do
+  @moduledoc """
+  Background data migration that copies hashtags embedded in object data (`data->'tag'`)
+  into the `hashtags` / `hashtags_objects` tables.
+
+  Runs as a globally registered GenServer. Runtime statistics are kept in the companion
+  `State` agent and persisted into the `populate_hashtags_table` data migration record
+  after every processed batch. Ids of objects that could not be transferred are recorded
+  in `data_migration_failed_ids` (see `retry_failed/0`).
+  """
+
+  use GenServer
+
+  require Logger
+
+  import Ecto.Query
+
+  alias __MODULE__.State
+  alias Pleroma.Config
+  alias Pleroma.DataMigration
+  alias Pleroma.Hashtag
+  alias Pleroma.Object
+  alias Pleroma.Repo
+
+  defdelegate state(), to: State, as: :get
+  defdelegate put_stat(key, value), to: State, as: :put
+  defdelegate increment_stat(key, increment), to: State, as: :increment
+
+  defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table
+
+  @reg_name {:global, __MODULE__}
+
+  @doc "Returns the pid of the globally registered migrator, or `nil` when none is running."
+  def whereis, do: GenServer.whereis(@reg_name)
+
+  @doc "Starts the migrator unless one is already globally registered (then returns its pid)."
+  def start_link(_) do
+    case whereis() do
+      nil ->
+        GenServer.start_link(__MODULE__, nil, name: @reg_name)
+
+      pid ->
+        {:ok, pid}
+    end
+  end
+
+  @impl true
+  def init(_) do
+    # Defer the actual setup so that `init/1` returns immediately.
+    {:ok, nil, {:continue, :init_state}}
+  end
+
+  @impl true
+  def handle_continue(:init_state, _state) do
+    {:ok, _} = State.start_link(nil)
+
+    update_status(:init)
+
+    data_migration = data_migration()
+    manual_migrations = Config.get([:instance, :manual_data_migrations], [])
+
+    cond do
+      Config.get(:env) == :test ->
+        update_status(:noop)
+
+      is_nil(data_migration) ->
+        update_status(:halt, "Data migration does not exist.")
+
+      data_migration.state == :manual or data_migration.name in manual_migrations ->
+        update_status(:noop, "Data migration is in manual execution state.")
+
+      data_migration.state == :complete ->
+        handle_success(data_migration)
+
+      true ->
+        send(self(), :migrate_hashtags)
+    end
+
+    {:noreply, nil}
+  end
+
+  @impl true
+  def handle_info(:migrate_hashtags, state) do
+    State.clear()
+
+    data_migration = data_migration()
+
+    # Only the resume position survives a (re)start; other persisted stats are reset.
+    persistent_data = Map.take(data_migration.data, ["max_processed_id"])
+
+    {:ok, data_migration} =
+      DataMigration.update(data_migration, %{state: :running, data: persistent_data})
+
+    update_status(:running)
+    put_stat(:started_at, NaiveDateTime.utc_now())
+
+    Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")
+
+    max_processed_id = data_migration.data["max_processed_id"] || 0
+
+    query()
+    |> where([object], object.id > ^max_processed_id)
+    |> Repo.chunk_stream(100, :batches, timeout: :infinity)
+    |> Stream.each(fn objects ->
+      object_ids = Enum.map(objects, & &1.id)
+
+      failed_ids =
+        objects
+        |> Enum.map(&transfer_object_hashtags(&1))
+        |> Enum.filter(&(elem(&1, 0) == :error))
+        |> Enum.map(&elem(&1, 1))
+
+      # Record the failures of this batch...
+      for failed_id <- failed_ids do
+        _ =
+          Repo.query(
+            "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <>
+              "VALUES ($1, $2) ON CONFLICT DO NOTHING;",
+            [data_migration.id, failed_id]
+          )
+      end
+
+      # ...and clear earlier failure records of objects that succeeded this time.
+      _ =
+        Repo.query(
+          "DELETE FROM data_migration_failed_ids " <>
+            "WHERE data_migration_id = $1 AND record_id = ANY($2)",
+          [data_migration.id, object_ids -- failed_ids]
+        )
+
+      max_object_id = List.last(object_ids)
+
+      put_stat(:max_processed_id, max_object_id)
+      increment_stat(:processed_count, length(object_ids))
+      increment_stat(:failed_count, length(failed_ids))
+
+      # Elapsed seconds are clamped to at least 1 to avoid division by zero.
+      put_stat(
+        :records_per_second,
+        state()[:processed_count] /
+          Enum.max([NaiveDateTime.diff(NaiveDateTime.utc_now(), state()[:started_at]), 1])
+      )
+
+      persist_stats(data_migration)
+
+      # A quick and dirty approach to controlling the load this background migration imposes
+      sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0)
+      Process.sleep(sleep_interval)
+    end)
+    |> Stream.run()
+
+    with 0 <- failures_count(data_migration.id) do
+      {:ok, data_migration} = DataMigration.update_state(data_migration, :complete)
+
+      handle_success(data_migration)
+    else
+      _ ->
+        _ = DataMigration.update_state(data_migration, :failed)
+
+        update_status(:failed, "Please check data_migration_failed_ids records.")
+    end
+
+    {:noreply, state}
+  end
+
+  # Objects with a non-empty `tag` list that have no `hashtags_objects` rows yet.
+  defp query do
+    # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out)
+    # Note: not checking activity type; HashtagsCleanupWorker should clean up unused records later
+    from(
+      object in Object,
+      where:
+        fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data),
+      select: %{
+        id: object.id,
+        tag: fragment("(?)->'tag'", object.data)
+      }
+    )
+    |> join(:left, [o], hashtags_objects in fragment("SELECT object_id FROM hashtags_objects"),
+      on: hashtags_objects.object_id == o.id
+    )
+    |> where([_o, hashtags_objects], is_nil(hashtags_objects.object_id))
+  end
+
+  # Accepts either a `%{id: _, tag: _}` row selected by `query/0` or a record exposing
+  # `data["tag"]`. Returns `{:ok, object_id}` or `{:error, object_id}`.
+  defp transfer_object_hashtags(object) do
+    embedded_tags = if Map.has_key?(object, :tag), do: object.tag, else: object.data["tag"]
+    hashtags = Object.object_data_hashtags(%{"tag" => embedded_tags})
+
+    if Enum.any?(hashtags) do
+      transfer_object_hashtags(object, hashtags)
+    else
+      {:ok, object.id}
+    end
+  end
+
+  defp transfer_object_hashtags(object, hashtags) do
+    Repo.transaction(fn ->
+      with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
+        for hashtag_record <- hashtag_records do
+          # `on conflict do nothing` keeps the transfer idempotent: a link that already
+          # exists (e.g. created concurrently through `Object.change/2`, or left over
+          # from an earlier attempt) is not a failure.
+          with {:ok, _} <-
+                 Repo.query(
+                   "insert into hashtags_objects(hashtag_id, object_id) " <>
+                     "values ($1, $2) on conflict do nothing;",
+                   [hashtag_record.id, object.id]
+                 ) do
+            nil
+          else
+            {:error, e} ->
+              error =
+                "ERROR: could not link object #{object.id} and hashtag " <>
+                  "#{hashtag_record.id}: #{inspect(e)}"
+
+              Logger.error(error)
+              Repo.rollback(object.id)
+          end
+        end
+
+        object.id
+      else
+        e ->
+          error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}"
+          Logger.error(error)
+          Repo.rollback(object.id)
+      end
+    end)
+  end
+
+  @doc "Approximate count for current iteration (including processed records count)"
+  def count(force \\ false, timeout \\ :infinity) do
+    stored_count = state()[:count]
+
+    if stored_count && !force do
+      stored_count
+    else
+      processed_count = state()[:processed_count] || 0
+      max_processed_id = data_migration().data["max_processed_id"] || 0
+      query = where(query(), [object], object.id > ^max_processed_id)
+
+      count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count
+      put_stat(:count, count)
+      count
+    end
+  end
+
+  # Saves the in-memory stats (everything except `:status`) into the migration record.
+  defp persist_stats(data_migration) do
+    runner_state = Map.drop(state(), [:status])
+    _ = DataMigration.update(data_migration, %{data: runner_state})
+  end
+
+  # Marks the run complete and enables the improved hashtag timeline, unless the record
+  # is feature-locked or the setting already has a (non-nil) value.
+  defp handle_success(data_migration) do
+    update_status(:complete)
+
+    cond do
+      data_migration.feature_lock ->
+        :noop
+
+      not is_nil(Config.improved_hashtag_timeline()) ->
+        :noop
+
+      true ->
+        Config.put(Config.improved_hashtag_timeline_path(), true)
+        :ok
+    end
+  end
+
+  @doc "Query for objects recorded as failed for this data migration, ordered by id."
+  def failed_objects_query do
+    from(o in Object)
+    |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
+      on: dmf.record_id == o.id
+    )
+    |> where([_o, dmf], dmf.data_migration_id == ^data_migration().id)
+    |> order_by([o], asc: o.id)
+  end
+
+  @doc """
+  Number of failure records for the given (or the current) data migration.
+
+  When the count query itself fails, its non-matching result is returned as is.
+  """
+  def failures_count(data_migration_id \\ nil) do
+    data_migration_id = data_migration_id || data_migration().id
+
+    with {:ok, %{rows: [[count]]}} <-
+           Repo.query(
+             "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;",
+             [data_migration_id]
+           ) do
+      count
+    end
+  end
+
+  @doc "Re-attempts the transfer for every failed object, deleting the record on success."
+  def retry_failed do
+    data_migration = data_migration()
+
+    failed_objects_query()
+    |> Repo.chunk_stream(100, :one)
+    |> Stream.each(fn object ->
+      with {:ok, _} <- transfer_object_hashtags(object) do
+        _ =
+          Repo.query(
+            "DELETE FROM data_migration_failed_ids " <>
+              "WHERE data_migration_id = $1 AND record_id = $2",
+            [data_migration.id, object.id]
+          )
+      end
+    end)
+    |> Stream.run()
+  end
+
+  @doc """
+  Asks the running migrator to (re)start processing.
+
+  Returns `{:error, :not_running}` when no migrator process is registered.
+  """
+  def force_continue do
+    case whereis() do
+      # `send/2` raises on a `nil` destination, so report the condition instead.
+      nil -> {:error, :not_running}
+      pid -> send(pid, :migrate_hashtags)
+    end
+  end
+
+  @doc "Resets the migration record to `:pending` with empty data, then `force_continue/0`."
+  def force_restart do
+    {:ok, _} = DataMigration.update(data_migration(), %{state: :pending, data: %{}})
+    force_continue()
+  end
+
+  @doc "Marks the migration `:complete` without processing and runs the success handling."
+  def force_complete do
+    {:ok, data_migration} = DataMigration.update_state(data_migration(), :complete)
+
+    handle_success(data_migration)
+  end
+
+  defp update_status(status, message \\ nil) do
+    put_stat(:status, status)
+    put_stat(:message, message)
+  end
+end
diff --git a/lib/pleroma/migrators/hashtags_table_migrator/state.ex b/lib/pleroma/migrators/hashtags_table_migrator/state.ex
new file mode 100644 (file)
index 0000000..9015634
--- /dev/null
@@ -0,0 +1,35 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Migrators.HashtagsTableMigrator.State do
+  @moduledoc """
+  Globally registered `Agent` holding a map of runtime stats for the hashtags table
+  migrator. The map starts out (and is reset by `clear/0` to) empty.
+  """
+
+  use Agent
+
+  @init_state %{}
+  @reg_name {:global, __MODULE__}
+
+  @doc "Starts the agent with an empty stats map under its global name."
+  def start_link(_), do: Agent.start_link(fn -> @init_state end, name: @reg_name)
+
+  @doc "Resets the stats map to its initial (empty) value."
+  def clear, do: Agent.update(@reg_name, fn _previous -> @init_state end)
+
+  @doc "Returns the whole stats map."
+  def get, do: Agent.get(@reg_name, fn stats -> stats end)
+
+  @doc "Stores `value` under `key`."
+  def put(key, value), do: Agent.update(@reg_name, &Map.put(&1, key, value))
+
+  @doc "Adds `increment` to the value under `key`; a missing or `nil` value counts as 0."
+  def increment(key, increment \\ 1) do
+    Agent.update(@reg_name, fn stats ->
+      current = Map.get(stats, key) || 0
+      Map.put(stats, key, current + increment)
+    end)
+  end
+end
index aaf12384004c37911063e4e82e9b8a3ec49a88e1..52b77e41ca2b269636e57d09be5788c767d1025d 100644 (file)
@@ -10,6 +10,7 @@ defmodule Pleroma.Object do
 
   alias Pleroma.Activity
   alias Pleroma.Config
+  alias Pleroma.Hashtag
   alias Pleroma.Object
   alias Pleroma.Object.Fetcher
   alias Pleroma.ObjectTombstone
@@ -28,6 +29,8 @@ defmodule Pleroma.Object do
   schema "objects" do
     field(:data, :map)
 
+    many_to_many(:hashtags, Hashtag, join_through: "hashtags_objects", on_replace: :delete)
+
     timestamps()
   end
 
@@ -49,7 +52,8 @@ defmodule Pleroma.Object do
   end
 
   def create(data) do
-    Object.change(%Object{}, %{data: data})
+    %Object{}
+    |> Object.change(%{data: data})
     |> Repo.insert()
   end
 
@@ -58,8 +62,38 @@ defmodule Pleroma.Object do
     |> cast(params, [:data])
     |> validate_required([:data])
     |> unique_constraint(:ap_id, name: :objects_unique_apid_index)
+    |> maybe_handle_hashtags_change(struct)
+  end
+
+  # Note: not checking activity type; HashtagsCleanupWorker should clean up unused records later
+  # When the changed `data` carries a different set of hashtags than the struct currently
+  # embeds, resolves (or creates) the hashtag records and puts them into the `:hashtags`
+  # association. A failure to resolve a hashtag becomes a validation error on `:data`.
+  defp maybe_handle_hashtags_change(changeset, struct) do
+    changed_data = get_change(changeset, :data)
+
+    if hashtags_changed?(struct, changed_data) do
+      resolved =
+        changed_data
+        |> object_data_hashtags()
+        |> Hashtag.get_or_create_by_names()
+
+      case resolved do
+        {:ok, hashtag_records} ->
+          put_assoc(changeset, :hashtags, hashtag_records)
+
+        {:error, hashtag_changeset} ->
+          failed_hashtag = get_field(hashtag_changeset, :name)
+
+          validate_change(changeset, :data, fn _, _ ->
+            [data: "error referencing hashtag: #{failed_hashtag}"]
+          end)
+      end
+    else
+      changeset
+    end
+  end
+
+  defp hashtags_changed?(%Object{} = struct, %{"tag" => _} = data) do
+    Enum.sort(embedded_hashtags(struct)) !=
+      Enum.sort(object_data_hashtags(data))
   end
 
+  defp hashtags_changed?(_, _), do: false
+
   def get_by_id(nil), do: nil
   def get_by_id(id), do: Repo.get(Object, id)
 
@@ -349,4 +383,39 @@ defmodule Pleroma.Object do
 
   def self_replies(object, opts \\ []),
     do: replies(object, Keyword.put(opts, :self_only, true))
+
+  # Raw `tag` list of the object's data; `[]` when it is absent or not a list.
+  def tags(%Object{data: %{"tag" => tag_list}}) when is_list(tag_list) do
+    tag_list
+  end
+
+  def tags(_other), do: []
+
+  # Note: the embedded hashtags are always used here, whether or not they have been
+  #   migrated to the hashtags table (embedded hashtags stay in sync anyway, and this
+  #   avoids extra joins and preload hassle)
+  def hashtags(%Object{} = object), do: embedded_hashtags(object)
+
+  # Hashtags extracted from the object's own `data`; `[]` for anything but an `Object`.
+  def embedded_hashtags(%Object{data: object_data}), do: object_data_hashtags(object_data)
+
+  def embedded_hashtags(_other), do: []
+
+  # Extracts the downcased, de-duplicated hashtag names from an object data map.
+  #
+  # Accepted `tag` elements are `Hashtag`-typed maps with a string `name` (one leading
+  # "#" is stripped) and plain strings. Everything else is ignored, including `Hashtag`
+  # maps whose `name` is missing or not a string, which would otherwise crash in
+  # `String.downcase/1`. Returns `[]` when `tag` is absent or not a list.
+  def object_data_hashtags(%{"tag" => tags}) when is_list(tags) do
+    tags
+    |> Enum.flat_map(fn
+      %{"type" => "Hashtag", "name" => "#" <> hashtag} -> [hashtag]
+      %{"type" => "Hashtag", "name" => hashtag} when is_binary(hashtag) -> [hashtag]
+      plain_text when is_binary(plain_text) -> [plain_text]
+      _ -> []
+    end)
+    |> Enum.map(&String.downcase/1)
+    |> Enum.uniq()
+    # Note: "" elements (plain text) might occur in `data.tag` for incoming objects
+    |> Enum.reject(&(&1 == ""))
+  end
+
+  def object_data_hashtags(_), do: []
 end
index 4556352d073bc079c7b4f9a0e2a903c351b9741b..61b64ed3ef997263ef0cb3a6ad0c6cbe275588ca 100644 (file)
@@ -63,8 +63,8 @@ defmodule Pleroma.Repo do
   iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500, :batches)
   """
   @spec chunk_stream(Ecto.Query.t(), integer(), atom()) :: Enumerable.t()
-  def chunk_stream(query, chunk_size, returns_as \\ :one) do
-    # We don't actually need start and end funcitons of resource streaming,
+  def chunk_stream(query, chunk_size, returns_as \\ :one, query_options \\ []) do
+    # We don't actually need start and end functions of resource streaming,
     # but it seems to be the only way to not fetch records one-by-one and
     # have individual records be the elements of the stream, instead of
     # lists of records
@@ -76,7 +76,7 @@ defmodule Pleroma.Repo do
           |> order_by(asc: :id)
           |> where([r], r.id > ^last_id)
           |> limit(^chunk_size)
-          |> all()
+          |> all(query_options)
           |> case do
             [] ->
               {:halt, last_id}
index d0bb07aab8f2327a2acab59901bf1f7a671a2a70..cda8d3f547809afa281bbaaef0e45584e6ef9b29 100644 (file)
@@ -669,51 +669,141 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
   defp restrict_since(query, _), do: query
 
-  defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
-    raise "Can't use the child object without preloading!"
+  defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
+    raise_on_missing_preload()
+  end
+
+  defp restrict_embedded_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
+    from(
+      [_activity, object] in query,
+      where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
+    )
+  end
+
+  defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
+    restrict_embedded_tag_any(query, %{tag: tag})
   end
 
-  defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
+  defp restrict_embedded_tag_all(query, _), do: query
+
+  defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
+    raise_on_missing_preload()
+  end
+
+  defp restrict_embedded_tag_any(query, %{tag: tag}) when is_list(tag) do
+    from(
+      [_activity, object] in query,
+      where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
+    )
+  end
+
+  defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
+    restrict_embedded_tag_any(query, %{tag: [tag]})
+  end
+
+  defp restrict_embedded_tag_any(query, _), do: query
+
+  defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
+    raise_on_missing_preload()
+  end
+
+  defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
+       when is_list(tag_reject) do
     from(
       [_activity, object] in query,
       where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
     )
   end
 
-  defp restrict_tag_reject(query, _), do: query
+  defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
+       when is_binary(tag_reject) do
+    restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
+  end
 
-  defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
-    raise "Can't use the child object without preloading!"
+  defp restrict_embedded_tag_reject_any(query, _), do: query
+
+  defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
+    raise_on_missing_preload()
   end
 
-  defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
+  defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do
     from(
       [_activity, object] in query,
-      where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
+      where:
+        fragment(
+          """
+          (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
+            ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
+              AND hashtags_objects.object_id = ?) @> ?
+          """,
+          ^tags,
+          object.id,
+          ^tags
+        )
     )
   end
 
-  defp restrict_tag_all(query, _), do: query
+  defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
+    restrict_hashtag_any(query, %{tag: tag})
+  end
+
+  defp restrict_hashtag_all(query, _), do: query
 
-  defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
-    raise "Can't use the child object without preloading!"
+  defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
+    raise_on_missing_preload()
   end
 
-  defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
+  defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do
     from(
       [_activity, object] in query,
-      where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
+      where:
+        fragment(
+          """
+          EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
+            ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
+              AND hashtags_objects.object_id = ? LIMIT 1)
+          """,
+          ^tags,
+          object.id
+        )
     )
   end
 
-  defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
+  defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
+    restrict_hashtag_any(query, %{tag: [tag]})
+  end
+
+  defp restrict_hashtag_any(query, _), do: query
+
+  defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
+    raise_on_missing_preload()
+  end
+
+  defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do
     from(
       [_activity, object] in query,
-      where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
+      where:
+        fragment(
+          """
+          NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
+            ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
+              AND hashtags_objects.object_id = ? LIMIT 1)
+          """,
+          ^tags_reject,
+          object.id
+        )
     )
   end
 
-  defp restrict_tag(query, _), do: query
+  defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
+    restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
+  end
+
+  defp restrict_hashtag_reject_any(query, _), do: query
+
+  defp raise_on_missing_preload do
+    raise "Can't use the child object without preloading!"
+  end
 
   defp restrict_recipients(query, [], _user), do: query
 
@@ -1098,40 +1188,50 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
       skip_thread_containment: Config.get([:instance, :skip_thread_containment])
     }
 
-    Activity
-    |> maybe_preload_objects(opts)
-    |> maybe_preload_bookmarks(opts)
-    |> maybe_preload_report_notes(opts)
-    |> maybe_set_thread_muted_field(opts)
-    |> maybe_order(opts)
-    |> restrict_recipients(recipients, opts[:user])
-    |> restrict_replies(opts)
-    |> restrict_tag(opts)
-    |> restrict_tag_reject(opts)
-    |> restrict_tag_all(opts)
-    |> restrict_since(opts)
-    |> restrict_local(opts)
-    |> restrict_actor(opts)
-    |> restrict_type(opts)
-    |> restrict_state(opts)
-    |> restrict_favorited_by(opts)
-    |> restrict_blocked(restrict_blocked_opts)
-    |> restrict_muted(restrict_muted_opts)
-    |> restrict_filtered(opts)
-    |> restrict_media(opts)
-    |> restrict_visibility(opts)
-    |> restrict_thread_visibility(opts, config)
-    |> restrict_reblogs(opts)
-    |> restrict_pinned(opts)
-    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
-    |> restrict_instance(opts)
-    |> restrict_announce_object_actor(opts)
-    |> restrict_filtered(opts)
-    |> Activity.restrict_deactivated_users()
-    |> exclude_poll_votes(opts)
-    |> exclude_chat_messages(opts)
-    |> exclude_invisible_actors(opts)
-    |> exclude_visibility(opts)
+    query =
+      Activity
+      |> maybe_preload_objects(opts)
+      |> maybe_preload_bookmarks(opts)
+      |> maybe_preload_report_notes(opts)
+      |> maybe_set_thread_muted_field(opts)
+      |> maybe_order(opts)
+      |> restrict_recipients(recipients, opts[:user])
+      |> restrict_replies(opts)
+      |> restrict_since(opts)
+      |> restrict_local(opts)
+      |> restrict_actor(opts)
+      |> restrict_type(opts)
+      |> restrict_state(opts)
+      |> restrict_favorited_by(opts)
+      |> restrict_blocked(restrict_blocked_opts)
+      |> restrict_muted(restrict_muted_opts)
+      |> restrict_filtered(opts)
+      |> restrict_media(opts)
+      |> restrict_visibility(opts)
+      |> restrict_thread_visibility(opts, config)
+      |> restrict_reblogs(opts)
+      |> restrict_pinned(opts)
+      |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
+      |> restrict_instance(opts)
+      |> restrict_announce_object_actor(opts)
+      |> restrict_filtered(opts)
+      |> Activity.restrict_deactivated_users()
+      |> exclude_poll_votes(opts)
+      |> exclude_chat_messages(opts)
+      |> exclude_invisible_actors(opts)
+      |> exclude_visibility(opts)
+
+    if Config.improved_hashtag_timeline() do
+      query
+      |> restrict_hashtag_any(opts)
+      |> restrict_hashtag_all(opts)
+      |> restrict_hashtag_reject_any(opts)
+    else
+      query
+      |> restrict_embedded_tag_any(opts)
+      |> restrict_embedded_tag_all(opts)
+      |> restrict_embedded_tag_reject_any(opts)
+    end
   end
 
   def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
index bb3838d2c18f009efefee8c2f18d1f373fac0abb..0b1be8c517bcecc24d0e7cb105dc6966a686709c 100644 (file)
@@ -74,9 +74,11 @@ defmodule Pleroma.Web.ActivityPub.MRF.SimplePolicy do
 
     object =
       if MRF.subdomain_match?(media_nsfw, actor_host) do
-        tags = (child_object["tag"] || []) ++ ["nsfw"]
-        child_object = Map.put(child_object, "tag", tags)
-        child_object = Map.put(child_object, "sensitive", true)
+        child_object =
+          child_object
+          |> Map.put("tag", (child_object["tag"] || []) ++ ["nsfw"])
+          |> Map.put("sensitive", true)
+
         Map.put(object, "object", child_object)
       else
         object
index 4d9a5617eb9b5b3fcdebfb44b0e3cf0ee9c8aed3..0a701334f00d08f7f57e2a8ac963e2e526414c2e 100644 (file)
@@ -32,18 +32,18 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
   """
   def fix_object(object, options \\ []) do
     object
-    |> strip_internal_fields
-    |> fix_actor
-    |> fix_url
-    |> fix_attachments
-    |> fix_context
+    |> strip_internal_fields()
+    |> fix_actor()
+    |> fix_url()
+    |> fix_attachments()
+    |> fix_context()
     |> fix_in_reply_to(options)
-    |> fix_emoji
-    |> fix_tag
-    |> set_sensitive
-    |> fix_content_map
-    |> fix_addressing
-    |> fix_summary
+    |> fix_emoji()
+    |> fix_tag()
+    |> set_sensitive()
+    |> fix_content_map()
+    |> fix_addressing()
+    |> fix_summary()
     |> fix_type(options)
   end
 
@@ -315,10 +315,9 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
     tags =
       tag
       |> Enum.filter(fn data -> data["type"] == "Hashtag" and data["name"] end)
-      |> Enum.map(fn %{"name" => name} ->
-        name
-        |> String.slice(1..-1)
-        |> String.downcase()
+      |> Enum.map(fn
+        %{"name" => "#" <> hashtag} -> String.downcase(hashtag)
+        %{"name" => hashtag} -> String.downcase(hashtag)
       end)
 
     Map.put(object, "tag", tag ++ tags)
index df97d2f46e1476df600585816a9940d70773a350..66940f3110a1abb7cbc46ccc9382851c1dd62da7 100644 (file)
@@ -32,6 +32,7 @@ defmodule Pleroma.Web.Feed.FeedView do
 
     %{
       activity: activity,
+      object: object,
       data: Map.get(object, :data),
       actor: actor
     }
index 08e6f23b98cf8eb8fa8a261c030795359e37a7da..1fb954a9b1007f695eb13ee5116171fc26db5252 100644 (file)
@@ -134,9 +134,9 @@ defmodule Pleroma.Web.MastodonAPI.TimelineController do
     tags =
       [params[:tag], params[:any]]
       |> List.flatten()
-      |> Enum.uniq()
       |> Enum.reject(&is_nil/1)
       |> Enum.map(&String.downcase/1)
+      |> Enum.uniq()
 
     tag_all =
       params
index 2cd6732fe0a5ee9cac257cc5dd9c5ca863936814..bbe0b11ecd221878d750ef25a00fb54e549ba46e 100644 (file)
@@ -201,8 +201,10 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
     like_count = object.data["like_count"] || 0
     announcement_count = object.data["announcement_count"] || 0
 
-    tags = object.data["tag"] || []
-    sensitive = object.data["sensitive"] || Enum.member?(tags, "nsfw")
+    hashtags = Object.hashtags(object)
+    sensitive = object.data["sensitive"] || Enum.member?(hashtags, "nsfw")
+
+    tags = Object.tags(object)
 
     tag_mentions =
       tags
index 3fd150c4e7570b6d54e449a87ffac7d1adc74d7a..6688830baa954d081ac736703fe870e514535259 100644 (file)
@@ -22,7 +22,7 @@
     <link type="text/html" href='<%= @data["external_url"] %>' rel="alternate"/>
   <% end %>
 
-  <%= for tag <- @data["tag"] || [] do %>
+  <%= for tag <- Pleroma.Object.hashtags(@object) do %>
     <category term="<%= tag %>"></category>
   <% end %>
 
index 42960de7d45f58926546a23145a8d1db60ad108f..fc6d74b42f1d9aab417d6e99f5eb9a57085c106d 100644 (file)
@@ -21,7 +21,7 @@
     <link><%= @data["external_url"] %></link>
   <% end %>
 
-  <%= for tag <- @data["tag"] || [] do %>
+  <%= for tag <- Pleroma.Object.hashtags(@object) do %>
     <category term="<%= tag %>"></category>
   <% end %>
 
index cf5874a91341cb8108631829aa150ec9fd70e15b..c2de28fe48888bea7d8f3fc8c71d6a576cc5a89b 100644 (file)
@@ -41,7 +41,7 @@
       <% end %>
     <% end %>
 
-    <%= for tag <- @data["tag"] || [] do %>
+    <%= for tag <- Pleroma.Object.hashtags(@object) do %>
       <category term="<%= tag %>"></category>
     <% end %>
 
diff --git a/lib/pleroma/workers/cron/hashtags_cleanup_worker.ex b/lib/pleroma/workers/cron/hashtags_cleanup_worker.ex
new file mode 100644 (file)
index 0000000..b319067
--- /dev/null
@@ -0,0 +1,57 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Cron.HashtagsCleanupWorker do
+  @moduledoc """
+  The worker to clean up unused hashtags_objects and hashtags.
+  """
+
+  use Oban.Worker, queue: "hashtags_cleanup"
+
+  alias Pleroma.Repo
+
+  require Logger
+
+  # Deletes `hashtags_objects` rows whose object has no matching `Create` activity:
+  # the LEFT JOIN on `activities` finds no row (`activities.id IS NULL`). The activity's
+  # object reference is read from `data->'object'->>'id'`, falling back to `data->>'object'`.
+  @hashtags_objects_query """
+  DELETE FROM hashtags_objects WHERE object_id IN
+    (SELECT DISTINCT objects.id FROM objects
+      JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities
+        ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') =
+          (objects.data->>'id')
+        AND activities.data->>'type' = 'Create'
+      WHERE activities.id IS NULL);
+  """
+
+  # Deletes `hashtags` rows that have no `hashtags_objects` reference and were inserted
+  # before the timestamp bound to `$1`.
+  @hashtags_query """
+  DELETE FROM hashtags WHERE id IN
+    (SELECT hashtags.id FROM hashtags
+      LEFT OUTER JOIN hashtags_objects
+        ON hashtags_objects.hashtag_id = hashtags.id
+      WHERE hashtags_objects.hashtag_id IS NULL AND hashtags.inserted_at < $1);
+  """
+
+  # Runs both cleanup statements in order (join rows first, then hashtags), logging the
+  # number of deleted rows after each, and returns `:ok`. Both statements run with
+  # `timeout: :infinity`; any result other than `{:ok, _}` fails the pattern match.
+  @impl Oban.Worker
+  def perform(_job) do
+    Logger.info("Cleaning up unused `hashtags_objects` records...")
+
+    {:ok, %{num_rows: hashtags_objects_count}} =
+      Repo.query(@hashtags_objects_query, [], timeout: :infinity)
+
+    Logger.info("Deleted #{hashtags_objects_count} unused `hashtags_objects` records.")
+
+    Logger.info("Cleaning up unused `hashtags` records...")
+
+    # Note: ignoring recently created hashtags since references are added after hashtag is created
+    # The cutoff passed as `$1` is 24 hours (3600 * 24 seconds) before the current UTC time.
+    {:ok, %{num_rows: hashtags_count}} =
+      Repo.query(@hashtags_query, [NaiveDateTime.add(NaiveDateTime.utc_now(), -3600 * 24)],
+        timeout: :infinity
+      )
+
+    Logger.info("Deleted #{hashtags_count} unused `hashtags` records.")
+
+    Logger.info("HashtagsCleanupWorker complete.")
+
+    :ok
+  end
+end
index 43d61670555ad019d4fe1b7659d12d23f508b79d..bfac09f9e397e1bfbc734ec636f6f8ffff244e33 100644 (file)
@@ -9,7 +9,7 @@ defmodule Pleroma.Repo.Migrations.CreateSafeJsonbSet do
     begin
       result := jsonb_set(target, path, coalesce(new_value, 'null'::jsonb), create_missing);
       if result is NULL then
-        raise 'jsonb_set tried to wipe the object, please report this incindent to Pleroma bug tracker. https://git.pleroma.social/pleroma/pleroma/issues/new';
+        raise 'jsonb_set tried to wipe the object, please report this incident to Pleroma bug tracker. https://git.pleroma.social/pleroma/pleroma/issues/new';
         return target;
       else
         return result;
diff --git a/priv/repo/migrations/20201221202251_create_hashtags.exs b/priv/repo/migrations/20201221202251_create_hashtags.exs
new file mode 100644 (file)
index 0000000..afc5220
--- /dev/null
@@ -0,0 +1,14 @@
+# Creates the `hashtags` table (non-null citext `name`, map `data` defaulting to `%{}`,
+# timestamps) and a unique index on `name`. Each step is skipped if it already exists.
+defmodule Pleroma.Repo.Migrations.CreateHashtags do
+  use Ecto.Migration
+
+  def change do
+    create_if_not_exists table(:hashtags) do
+      add(:name, :citext, null: false)
+      add(:data, :map, default: %{})
+
+      timestamps()
+    end
+
+    create_if_not_exists(unique_index(:hashtags, [:name]))
+  end
+end
diff --git a/priv/repo/migrations/20201221203824_create_hashtags_objects.exs b/priv/repo/migrations/20201221203824_create_hashtags_objects.exs
new file mode 100644 (file)
index 0000000..214ea81
--- /dev/null
@@ -0,0 +1,13 @@
+# Creates the `hashtags_objects` join table (declared with `primary_key: false`) holding
+# non-null references to `hashtags` and `objects`.
+defmodule Pleroma.Repo.Migrations.CreateHashtagsObjects do
+  use Ecto.Migration
+
+  def change do
+    create_if_not_exists table(:hashtags_objects, primary_key: false) do
+      add(:hashtag_id, references(:hashtags), null: false)
+      add(:object_id, references(:objects), null: false)
+    end
+
+    # Each (hashtag, object) pair may appear once; a separate index covers lookups by object.
+    create_if_not_exists(unique_index(:hashtags_objects, [:hashtag_id, :object_id]))
+    create_if_not_exists(index(:hashtags_objects, [:object_id]))
+  end
+end
diff --git a/priv/repo/migrations/20210105195018_create_data_migrations.exs b/priv/repo/migrations/20210105195018_create_data_migrations.exs
new file mode 100644 (file)
index 0000000..5f2e8d9
--- /dev/null
@@ -0,0 +1,17 @@
+# Creates the `data_migrations` table and a unique index on its `name` column.
+# Each step is skipped if it already exists.
+defmodule Pleroma.Repo.Migrations.CreateDataMigrations do
+  use Ecto.Migration
+
+  def change do
+    create_if_not_exists table(:data_migrations) do
+      add(:name, :string, null: false)
+      # NOTE(review): the meaning of the integer `state` values (default 1) is not
+      # defined in this migration; confirm against the schema/enum that maps them.
+      add(:state, :integer, default: 1)
+      add(:feature_lock, :boolean, default: false)
+      add(:params, :map, default: %{})
+      add(:data, :map, default: %{})
+
+      timestamps()
+    end
+
+    create_if_not_exists(unique_index(:data_migrations, [:name]))
+  end
+end
diff --git a/priv/repo/migrations/20210106183301_data_migration_create_populate_hashtags_table.exs b/priv/repo/migrations/20210106183301_data_migration_create_populate_hashtags_table.exs
new file mode 100644 (file)
index 0000000..2a965f0
--- /dev/null
@@ -0,0 +1,14 @@
+# Inserts the `populate_hashtags_table` row into `data_migrations`.
+defmodule Pleroma.Repo.Migrations.DataMigrationCreatePopulateHashtagsTable do
+  use Ecto.Migration
+
+  def up do
+    # The same timestamp is used for both `inserted_at` and `updated_at`.
+    dt = NaiveDateTime.utc_now()
+
+    # `ON CONFLICT DO NOTHING`: an insert that conflicts with an existing row is skipped.
+    execute(
+      "INSERT INTO data_migrations(name, inserted_at, updated_at) " <>
+        "VALUES ('populate_hashtags_table', '#{dt}', '#{dt}') ON CONFLICT DO NOTHING;"
+    )
+  end
+
+  # Rollback does nothing; the inserted row is left in place.
+  def down, do: :ok
+end
diff --git a/priv/repo/migrations/20210111172254_create_data_migration_failed_ids.exs b/priv/repo/migrations/20210111172254_create_data_migration_failed_ids.exs
new file mode 100644 (file)
index 0000000..ba0be98
--- /dev/null
@@ -0,0 +1,14 @@
+# Creates the `data_migration_failed_ids` table (declared with `primary_key: false`):
+# a non-null reference to `data_migrations` plus a non-null bigint `record_id`,
+# with a unique index over the pair.
+defmodule Pleroma.Repo.Migrations.CreateDataMigrationFailedIds do
+  use Ecto.Migration
+
+  def change do
+    create_if_not_exists table(:data_migration_failed_ids, primary_key: false) do
+      add(:data_migration_id, references(:data_migrations), null: false)
+      add(:record_id, :bigint, null: false)
+    end
+
+    create_if_not_exists(
+      unique_index(:data_migration_failed_ids, [:data_migration_id, :record_id])
+    )
+  end
+end
index 6b848e04d868f166ff8c88c34d49551bca0679e6..9c8e5d93276a5d1674ee216d8af238b024c55568 100644 (file)
@@ -11,6 +11,8 @@ defmodule Pleroma.Activity.Ir.TopicsTest do
 
   require Pleroma.Constants
 
+  import Mock
+
   describe "poll answer" do
     test "produce no topics" do
       activity = %Activity{object: %Object{data: %{"type" => "Answer"}}}
@@ -77,14 +79,13 @@ defmodule Pleroma.Activity.Ir.TopicsTest do
       refute Enum.member?(topics, "public:local:media")
     end
 
-    test "converts tags to hash tags", %{activity: %{object: %{data: data} = object} = activity} do
-      tagged_data = Map.put(data, "tag", ["foo", "bar"])
-      activity = %{activity | object: %{object | data: tagged_data}}
-
-      topics = Topics.get_activity_topics(activity)
+    test "converts tags to hash tags", %{activity: activity} do
+      # Object.hashtags/1 is stubbed to return ["foo", "bar"]; `:passthrough` leaves the
+      # module's other functions unmocked.
+      with_mock(Object, [:passthrough], hashtags: fn _ -> ["foo", "bar"] end) do
+        topics = Topics.get_activity_topics(activity)
 
-      assert Enum.member?(topics, "hashtag:foo")
-      assert Enum.member?(topics, "hashtag:bar")
+        assert Enum.member?(topics, "hashtag:foo")
+        assert Enum.member?(topics, "hashtag:bar")
+      end
     end
 
     test "only converts strings to hash tags", %{
diff --git a/test/pleroma/hashtag_test.exs b/test/pleroma/hashtag_test.exs
new file mode 100644 (file)
index 0000000..0264dea
--- /dev/null
@@ -0,0 +1,17 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.HashtagTest do
+  use Pleroma.DataCase
+
+  alias Pleroma.Hashtag
+
+  describe "changeset validations" do
+    # An empty-string `:name` must yield the `:required` validation error on `:name`.
+    test "ensure non-blank :name" do
+      changeset = Hashtag.changeset(%Hashtag{}, %{name: ""})
+
+      assert {:name, {"can't be blank", [validation: :required]}} in changeset.errors
+    end
+  end
+end
index db7678d5d183ce9ab41079792924fccefc6513b9..8320660a50cacc3ae51acbb96ba5e9948745da31 100644 (file)
@@ -5,10 +5,13 @@
 defmodule Pleroma.ObjectTest do
   use Pleroma.DataCase
   use Oban.Testing, repo: Pleroma.Repo
+
   import ExUnit.CaptureLog
   import Pleroma.Factory
   import Tesla.Mock
+
   alias Pleroma.Activity
+  alias Pleroma.Hashtag
   alias Pleroma.Object
   alias Pleroma.Repo
   alias Pleroma.Tests.ObanHelpers
@@ -417,4 +420,28 @@ defmodule Pleroma.ObjectTest do
       assert updated_object.data["like_count"] == 1
     end
   end
+
+  describe ":hashtags association" do
+    test "Hashtag records are created with Object record and updated on its change" do
+      user = insert(:user)
+
+      # Posting a status with two hashtags must yield two Hashtag records on the object.
+      {:ok, %{object: object}} =
+        CommonAPI.post(user, %{status: "some text #hashtag1 #hashtag2 ..."})
+
+      assert [%Hashtag{name: "hashtag1"}, %Hashtag{name: "hashtag2"}] =
+               Enum.sort_by(object.hashtags, & &1.name)
+
+      # Clearing "tag" must empty the association on the returned struct...
+      {:ok, object} = Object.update_data(object, %{"tag" => []})
+
+      assert [] = object.hashtags
+
+      # ...and also after re-reading the object and preloading `:hashtags`.
+      object = Object.get_by_id(object.id) |> Repo.preload(:hashtags)
+      assert [] = object.hashtags
+
+      # Setting plain string tags must populate the association again.
+      {:ok, object} = Object.update_data(object, %{"tag" => ["abc", "def"]})
+
+      assert [%Hashtag{name: "abc"}, %Hashtag{name: "def"}] =
+               Enum.sort_by(object.hashtags, & &1.name)
+    end
+  end
 end
index f4023856cec81971bb6a0ff10255ede627a86c31..5b9fc061ee9d2cff6033f077e28d8b4ebe0a2676 100644 (file)
@@ -217,28 +217,49 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubTest do
     {:ok, status_two} = CommonAPI.post(user, %{status: ". #essais"})
     {:ok, status_three} = CommonAPI.post(user, %{status: ". #test #reject"})
 
-    fetch_one = ActivityPub.fetch_activities([], %{type: "Create", tag: "test"})
+    {:ok, status_four} = CommonAPI.post(user, %{status: ". #any1 #any2"})
+    {:ok, status_five} = CommonAPI.post(user, %{status: ". #any2 #any1"})
 
-    fetch_two = ActivityPub.fetch_activities([], %{type: "Create", tag: ["test", "essais"]})
+    for hashtag_timeline_strategy <- [true, false] do
+      clear_config([:instance, :improved_hashtag_timeline], hashtag_timeline_strategy)
 
-    fetch_three =
-      ActivityPub.fetch_activities([], %{
-        type: "Create",
-        tag: ["test", "essais"],
-        tag_reject: ["reject"]
-      })
+      fetch_one = ActivityPub.fetch_activities([], %{type: "Create", tag: "test"})
 
-    fetch_four =
-      ActivityPub.fetch_activities([], %{
-        type: "Create",
-        tag: ["test"],
-        tag_all: ["test", "reject"]
-      })
+      fetch_two = ActivityPub.fetch_activities([], %{type: "Create", tag: ["test", "essais"]})
 
-    assert fetch_one == [status_one, status_three]
-    assert fetch_two == [status_one, status_two, status_three]
-    assert fetch_three == [status_one, status_two]
-    assert fetch_four == [status_three]
+      fetch_three =
+        ActivityPub.fetch_activities([], %{
+          type: "Create",
+          tag: ["test", "essais"],
+          tag_reject: ["reject"]
+        })
+
+      fetch_four =
+        ActivityPub.fetch_activities([], %{
+          type: "Create",
+          tag: ["test"],
+          tag_all: ["test", "reject"]
+        })
+
+      # Testing that deduplication (if needed) is done on DB (not Ecto) level; :limit is important
+      fetch_five =
+        ActivityPub.fetch_activities([], %{
+          type: "Create",
+          tag: ["any1", "any2"],
+          limit: 2
+        })
+
+      [fetch_one, fetch_two, fetch_three, fetch_four, fetch_five] =
+        Enum.map([fetch_one, fetch_two, fetch_three, fetch_four, fetch_five], fn statuses ->
+          Enum.map(statuses, fn s -> Repo.preload(s, object: :hashtags) end)
+        end)
+
+      assert fetch_one == [status_one, status_three]
+      assert fetch_two == [status_one, status_two, status_three]
+      assert fetch_three == [status_one, status_two]
+      assert fetch_four == [status_three]
+      assert fetch_five == [status_four, status_five]
+    end
   end
 
   describe "insertion" do
index 31586abc90ec6cfd408cf07678176a33250ebba0..deb956410f3fa5544e02d1080303908eecc9d5cb 100644 (file)
@@ -39,7 +39,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier.NoteHandlingTest do
       {:ok, %Activity{data: data, local: false}} = Transmogrifier.handle_incoming(data)
       object = Object.normalize(data["object"], fetch: false)
 
-      assert "test" in object.data["tag"]
+      assert "test" in Object.tags(object)
+      assert Object.hashtags(object) == ["test"]
     end
 
     test "it cleans up incoming notices which are not really DMs" do
@@ -220,7 +221,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier.NoteHandlingTest do
       {:ok, %Activity{data: data, local: false}} = Transmogrifier.handle_incoming(data)
       object = Object.normalize(data["object"], fetch: false)
 
-      assert Enum.at(object.data["tag"], 2) == "moo"
+      assert Enum.at(Object.tags(object), 2) == "moo"
+      assert Object.hashtags(object) == ["moo"]
     end
 
     test "it works for incoming notices with contentMap" do
index adfe58def268babb03fbdeb771a956dc00bfaf66..9d005697cd0efc6d4dbe812d99513161dea30f9c 100644 (file)
@@ -493,7 +493,7 @@ defmodule Pleroma.Web.CommonAPITest do
 
     object = Object.normalize(activity, fetch: false)
 
-    assert object.data["tag"] == ["2hu"]
+    assert Object.tags(object) == ["2hu"]
   end
 
   test "it adds emoji in the object" do
index ed59cf285652cab805ea999b225c8d672cc26975..48f5f0dd0cda5eb7d62643b7a329948dbd9acd27 100644 (file)
@@ -262,8 +262,8 @@ defmodule Pleroma.Web.MastodonAPI.StatusViewTest do
       mentions: [],
       tags: [
         %{
-          name: "#{object_data["tag"]}",
-          url: "http://localhost:4001/tag/#{object_data["tag"]}"
+          name: "#{hd(object_data["tag"])}",
+          url: "http://localhost:4001/tag/#{hd(object_data["tag"])}"
         }
       ],
       application: %{