Merge remote-tracking branch 'remotes/origin/develop' into feature/object-hashtags...
authorIvan Tashkinov <ivantashkinov@gmail.com>
Tue, 23 Feb 2021 10:58:35 +0000 (13:58 +0300)
committerIvan Tashkinov <ivantashkinov@gmail.com>
Tue, 23 Feb 2021 10:58:35 +0000 (13:58 +0300)
# Conflicts:
# lib/pleroma/application.ex
# lib/pleroma/config.ex

42 files changed:
CHANGELOG.md
config/config.exs
config/description.exs
docs/configuration/cheatsheet.md
lib/mix/tasks/pleroma/database.ex
lib/pleroma/activity.ex
lib/pleroma/activity/ir/topics.ex
lib/pleroma/application.ex
lib/pleroma/config.ex
lib/pleroma/data_migration.ex [new file with mode: 0644]
lib/pleroma/delivery.ex
lib/pleroma/ecto_enums.ex
lib/pleroma/hashtag.ex [new file with mode: 0644]
lib/pleroma/migrators/hashtags_table_migrator.ex [new file with mode: 0644]
lib/pleroma/migrators/hashtags_table_migrator/state.ex [new file with mode: 0644]
lib/pleroma/object.ex
lib/pleroma/repo.ex
lib/pleroma/web/activity_pub/activity_pub.ex
lib/pleroma/web/activity_pub/mrf/simple_policy.ex
lib/pleroma/web/activity_pub/transmogrifier.ex
lib/pleroma/web/feed/feed_view.ex
lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex
lib/pleroma/web/mastodon_api/views/status_view.ex
lib/pleroma/web/templates/feed/feed/_activity.atom.eex
lib/pleroma/web/templates/feed/feed/_activity.rss.eex
lib/pleroma/web/templates/feed/feed/_tag_activity.atom.eex
priv/repo/migrations/20190711042021_create_safe_jsonb_set.exs
priv/repo/migrations/20201221202251_create_hashtags.exs [new file with mode: 0644]
priv/repo/migrations/20201221202252_remove_data_from_hashtags.exs [new file with mode: 0644]
priv/repo/migrations/20201221203824_create_hashtags_objects.exs [new file with mode: 0644]
priv/repo/migrations/20210105195018_create_data_migrations.exs [new file with mode: 0644]
priv/repo/migrations/20210106183301_data_migration_create_populate_hashtags_table.exs [new file with mode: 0644]
priv/repo/migrations/20210111172254_create_data_migration_failed_ids.exs [new file with mode: 0644]
priv/repo/migrations/20210222183840_remove_hashtags_objects_duplicate_index.exs [new file with mode: 0644]
priv/repo/migrations/20210222184616_change_hashtags_name_to_text.exs [new file with mode: 0644]
test/pleroma/activity/ir/topics_test.exs
test/pleroma/hashtag_test.exs [new file with mode: 0644]
test/pleroma/object_test.exs
test/pleroma/web/activity_pub/activity_pub_test.exs
test/pleroma/web/activity_pub/transmogrifier/note_handling_test.exs
test/pleroma/web/common_api_test.exs
test/pleroma/web/mastodon_api/views/status_view_test.exs

index 74473b3d080b32b887983e0c7ff41ab701c18e1a..974925e4fa060a7205fb116c99232803a3ca7316 100644 (file)
@@ -39,6 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 - Pleroma API: `GET /api/v1/pleroma/chats` is deprecated in favor of `GET /api/v2/pleroma/chats`.
 
 </details>
+- Improved hashtag timeline performance (requires a background migration). 
 
 ### Added
 
index 66aee3264a0fa0a305a6748ceba77d1b846d1369..23ada389b4fb8c0554bd1f65eec293abd4ff8685 100644 (file)
@@ -654,6 +654,10 @@ config :pleroma, :oauth2,
 
 config :pleroma, :database, rum_enabled: false
 
+config :pleroma, :features, improved_hashtag_timeline: :auto
+
+config :pleroma, :populate_hashtags_table, fault_rate_allowance: 0.01
+
 config :pleroma, :env, Mix.env()
 
 config :http_signatures,
index d9b15e68418eeed6e515e2a6fb871b9ef49d1759..41e5e4056e6b3e79e7dee9ce470f546239c5e209 100644 (file)
@@ -459,6 +459,42 @@ config :pleroma, :config_description, [
       }
     ]
   },
+  %{
+    group: :pleroma,
+    key: :features,
+    type: :group,
+    description: "Customizable features",
+    children: [
+      %{
+        key: :improved_hashtag_timeline,
+        type: {:dropdown, :atom},
+        description:
+          "Setting to force toggle / force disable improved hashtags timeline. `:enabled` forces hashtags to be fetched from `hashtags` table for hashtags timeline. `:disabled` forces object-embedded hashtags to be used (slower). Keep it `:auto` for automatic behaviour (it is auto-set to `:enabled` [unless overridden] when HashtagsTableMigrator completes).",
+        suggestions: [:auto, :enabled, :disabled]
+      }
+    ]
+  },
+  %{
+    group: :pleroma,
+    key: :populate_hashtags_table,
+    type: :group,
+    description: "`populate_hashtags_table` background migration settings",
+    children: [
+      %{
+        key: :fault_rate_allowance,
+        type: :float,
+        description:
+          "Max accepted rate of objects that failed in the migration. Any value from 0.0 which tolerates no errors to 1.0 which will enable the feature even if hashtags transfer failed for all records.",
+        suggestions: [0.01]
+      },
+      %{
+        key: :sleep_interval_ms,
+        type: :integer,
+        description:
+          "Sleep interval between each chunk of processed records in order to decrease the load on the system (defaults to 0 and should be keep default on most instances)."
+      }
+    ]
+  },
   %{
     group: :pleroma,
     key: :instance,
index ad57684656f3f1422bc0a7017f49249d3d2c32e7..db1deb6653fc4b318e89771012b96618de330a30 100644 (file)
@@ -65,6 +65,13 @@ To add configuration to your config file, you can copy it from the base config.
 * `show_reactions`: Let favourites and emoji reactions be viewed through the API (default: `true`).
 * `password_reset_token_validity`: The time after which reset tokens aren't accepted anymore, in seconds (default: one day).
 
+## :database
+* `improved_hashtag_timeline`: Setting to force toggle / force disable improved hashtags timeline. `:enabled` forces hashtags to be fetched from `hashtags` table for hashtags timeline. `:disabled` forces object-embedded hashtags to be used (slower). Keep it `:auto` for automatic behaviour (it is auto-set to `:enabled` [unless overridden] when HashtagsTableMigrator completes).
+
+## Background migrations
+* `populate_hashtags_table/sleep_interval_ms`: Sleep interval between each chunk of processed records in order to decrease the load on the system (defaults to 0 and should be keep default on most instances).
+* `populate_hashtags_table/fault_rate_allowance`: Max rate of failed objects to actually processed objects in order to enable the feature (any value from 0.0 which tolerates no errors to 1.0 which will enable the feature even if hashtags transfer failed for all records).
+
 ## Welcome
 * `direct_message`: - welcome message sent as a direct message.
   * `enabled`: Enables the send a direct message to a newly registered user. Defaults to `false`.
index 2403ed5813f40c6a311669f6b82fafe8f7aee184..2136ddb0243a4d414066663ad0405cc3567694a8 100644 (file)
@@ -8,10 +8,13 @@ defmodule Mix.Tasks.Pleroma.Database do
   alias Pleroma.Object
   alias Pleroma.Repo
   alias Pleroma.User
+
   require Logger
   require Pleroma.Constants
+
   import Ecto.Query
   import Mix.Pleroma
+
   use Mix.Task
 
   @shortdoc "A collection of database related tasks"
@@ -214,4 +217,33 @@ defmodule Mix.Tasks.Pleroma.Database do
       shell_info('Done.')
     end
   end
+
+  # Rolls back a specific migration (leaving subsequent migrations applied).
+  # WARNING: imposes a risk of unrecoverable data loss — proceed at your own responsibility.
+  # Based on https://stackoverflow.com/a/53825840
+  def run(["rollback", version]) do
+    prompt = "SEVERE WARNING: this operation may result in unrecoverable data loss. Continue?"
+
+    if shell_prompt(prompt, "n") in ~w(Yn Y y) do
+      {_, result, _} =
+        Ecto.Migrator.with_repo(Pleroma.Repo, fn repo ->
+          version = String.to_integer(version)
+          re = ~r/^#{version}_.*\.exs/
+          path = Ecto.Migrator.migrations_path(repo)
+
+          with {:find, "" <> file} <- {:find, Enum.find(File.ls!(path), &String.match?(&1, re))},
+               {:compile, [{mod, _} | _]} <- {:compile, Code.compile_file(Path.join(path, file))},
+               {:rollback, :ok} <- {:rollback, Ecto.Migrator.down(repo, version, mod)} do
+            {:ok, "Reversed migration: #{file}"}
+          else
+            {:find, _} -> {:error, "No migration found with version prefix: #{version}"}
+            {:compile, e} -> {:error, "Problem compiling migration module: #{inspect(e)}"}
+            {:rollback, e} -> {:error, "Problem reversing migration: #{inspect(e)}"}
+            e -> {:error, "Something unexpected happened: #{inspect(e)}"}
+          end
+        end)
+
+      IO.inspect(result)
+    end
+  end
 end
index 6542e684e1c82ef04794010889b4b774188a331a..d594038849bbd6ed68bd7f774adb52cacec7b911 100644 (file)
@@ -113,6 +113,7 @@ defmodule Pleroma.Activity do
     from([a] in query,
       left_join: b in Bookmark,
       on: b.user_id == ^user.id and b.activity_id == a.id,
+      as: :bookmark,
       preload: [bookmark: b]
     )
   end
@@ -123,6 +124,7 @@ defmodule Pleroma.Activity do
     from([a] in query,
       left_join: r in ReportNote,
       on: a.id == r.activity_id,
+      as: :report_note,
       preload: [report_notes: r]
     )
   end
index d94395fc175c7f97f39ca2abdf6db6c99b765302..7a603a61524a46deb07ebb8f4b6a48036bc6eddc 100644 (file)
@@ -48,14 +48,12 @@ defmodule Pleroma.Activity.Ir.Topics do
     tags
   end
 
-  defp hashtags_to_topics(%{data: %{"tag" => tags}}) do
-    tags
-    |> Enum.filter(&is_bitstring(&1))
-    |> Enum.map(fn tag -> "hashtag:" <> tag end)
+  defp hashtags_to_topics(object) do
+    object
+    |> Object.hashtags()
+    |> Enum.map(fn hashtag -> "hashtag:" <> hashtag end)
   end
 
-  defp hashtags_to_topics(_), do: []
-
   defp remote_topics(%{local: true}), do: []
 
   defp remote_topics(%{actor: actor}) when is_binary(actor),
index c853a2bb4564409c22bae36d5fdc1a622913a35a..2ff7562e24d5f89e28e1a8a58be1a04139da928f 100644 (file)
@@ -104,6 +104,7 @@ defmodule Pleroma.Application do
         dont_run_in_test(@mix_env) ++
         chat_child(chat_enabled?()) ++
         [
+          Pleroma.Migrators.HashtagsTableMigrator,
           Pleroma.Gopher.Server
         ]
 
index 2e15a37193ada875eeb73746d30b2bc37e464a68..54e332595d73762f155ce5c438ea8bfd494184c0 100644 (file)
@@ -99,4 +99,8 @@ defmodule Pleroma.Config do
   def oauth_consumer_strategies, do: get([:auth, :oauth_consumer_strategies], [])
 
   def oauth_consumer_enabled?, do: oauth_consumer_strategies() != []
+
+  def feature_enabled?(feature_name) do
+    get([:features, feature_name]) not in [nil, false, :disabled, :auto]
+  end
 end
diff --git a/lib/pleroma/data_migration.ex b/lib/pleroma/data_migration.ex
new file mode 100644 (file)
index 0000000..1377af1
--- /dev/null
@@ -0,0 +1,45 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.DataMigration do
+  use Ecto.Schema
+
+  alias Pleroma.DataMigration
+  alias Pleroma.DataMigration.State
+  alias Pleroma.Repo
+
+  import Ecto.Changeset
+  import Ecto.Query
+
+  schema "data_migrations" do
+    field(:name, :string)
+    field(:state, State, default: :pending)
+    field(:feature_lock, :boolean, default: false)
+    field(:params, :map, default: %{})
+    field(:data, :map, default: %{})
+
+    timestamps()
+  end
+
+  def changeset(data_migration, params \\ %{}) do
+    data_migration
+    |> cast(params, [:name, :state, :feature_lock, :params, :data])
+    |> validate_required([:name])
+    |> unique_constraint(:name)
+  end
+
+  def update_one_by_id(id, params \\ %{}) do
+    with {1, _} <-
+           from(dm in DataMigration, where: dm.id == ^id)
+           |> Repo.update_all(set: params) do
+      :ok
+    end
+  end
+
+  def get_by_name(name) do
+    Repo.get_by(DataMigration, name: name)
+  end
+
+  def populate_hashtags_table, do: get_by_name("populate_hashtags_table")
+end
index e8d53676742e871dda026151cc6e106708372e04..511d5cf58258abb773e2434f449834837afacb93 100644 (file)
@@ -9,7 +9,6 @@ defmodule Pleroma.Delivery do
   alias Pleroma.Object
   alias Pleroma.Repo
   alias Pleroma.User
-  alias Pleroma.User
 
   import Ecto.Changeset
   import Ecto.Query
index f198cccb750cd26d2cb32c47c6def9f85dc2ab12..2a9addabcecff4553bff21c483e9db83c8beb270 100644 (file)
@@ -17,3 +17,11 @@ defenum(Pleroma.FollowingRelationship.State,
   follow_accept: 2,
   follow_reject: 3
 )
+
+defenum(Pleroma.DataMigration.State,
+  pending: 1,
+  running: 2,
+  complete: 3,
+  failed: 4,
+  manual: 5
+)
diff --git a/lib/pleroma/hashtag.ex b/lib/pleroma/hashtag.ex
new file mode 100644 (file)
index 0000000..53e2e9c
--- /dev/null
@@ -0,0 +1,106 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Hashtag do
+  use Ecto.Schema
+
+  import Ecto.Changeset
+  import Ecto.Query
+
+  alias Ecto.Multi
+  alias Pleroma.Hashtag
+  alias Pleroma.Object
+  alias Pleroma.Repo
+
+  schema "hashtags" do
+    field(:name, :string)
+
+    many_to_many(:objects, Object, join_through: "hashtags_objects", on_replace: :delete)
+
+    timestamps()
+  end
+
+  def normalize_name(name) do
+    name
+    |> String.downcase()
+    |> String.trim()
+  end
+
+  def get_or_create_by_name(name) do
+    changeset = changeset(%Hashtag{}, %{name: name})
+
+    Repo.insert(
+      changeset,
+      on_conflict: [set: [name: get_field(changeset, :name)]],
+      conflict_target: :name,
+      returning: true
+    )
+  end
+
+  def get_or_create_by_names(names) when is_list(names) do
+    names = Enum.map(names, &normalize_name/1)
+    timestamp = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
+
+    structs =
+      Enum.map(names, fn name ->
+        %Hashtag{}
+        |> changeset(%{name: name})
+        |> Map.get(:changes)
+        |> Map.merge(%{inserted_at: timestamp, updated_at: timestamp})
+      end)
+
+    try do
+      with {:ok, %{query_op: hashtags}} <-
+             Multi.new()
+             |> Multi.insert_all(:insert_all_op, Hashtag, structs,
+               on_conflict: :nothing,
+               conflict_target: :name
+             )
+             |> Multi.run(:query_op, fn _repo, _changes ->
+               {:ok, Repo.all(from(ht in Hashtag, where: ht.name in ^names))}
+             end)
+             |> Repo.transaction() do
+        {:ok, hashtags}
+      else
+        {:error, _name, value, _changes_so_far} -> {:error, value}
+      end
+    rescue
+      e -> {:error, e}
+    end
+  end
+
+  def changeset(%Hashtag{} = struct, params) do
+    struct
+    |> cast(params, [:name])
+    |> update_change(:name, &normalize_name/1)
+    |> validate_required([:name])
+    |> unique_constraint(:name)
+  end
+
+  def unlink(%Object{id: object_id}) do
+    with {_, hashtag_ids} <-
+           from(hto in "hashtags_objects",
+             where: hto.object_id == ^object_id,
+             select: hto.hashtag_id
+           )
+           |> Repo.delete_all(),
+         {:ok, unreferenced_count} <- delete_unreferenced(hashtag_ids) do
+      {:ok, length(hashtag_ids), unreferenced_count}
+    end
+  end
+
+  @delete_unreferenced_query """
+  DELETE FROM hashtags WHERE id IN
+    (SELECT hashtags.id FROM hashtags
+      LEFT OUTER JOIN hashtags_objects
+        ON hashtags_objects.hashtag_id = hashtags.id
+      WHERE hashtags_objects.hashtag_id IS NULL AND hashtags.id = ANY($1));
+  """
+
+  def delete_unreferenced(ids) do
+    with {:ok, %{num_rows: deleted_count}} <- Repo.query(@delete_unreferenced_query, [ids]) do
+      {:ok, deleted_count}
+    end
+  end
+end
diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex
new file mode 100644 (file)
index 0000000..6123c88
--- /dev/null
@@ -0,0 +1,369 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Migrators.HashtagsTableMigrator do
+  use GenServer
+
+  require Logger
+
+  import Ecto.Query
+
+  alias __MODULE__.State
+  alias Pleroma.Config
+  alias Pleroma.Hashtag
+  alias Pleroma.Object
+  alias Pleroma.Repo
+
+  defdelegate data_migration(), to: Pleroma.DataMigration, as: :populate_hashtags_table
+  defdelegate data_migration_id(), to: State
+
+  defdelegate state(), to: State
+  defdelegate persist_state(), to: State, as: :persist_to_db
+  defdelegate get_stat(key, value \\ nil), to: State, as: :get_data_key
+  defdelegate put_stat(key, value), to: State, as: :put_data_key
+  defdelegate increment_stat(key, increment), to: State, as: :increment_data_key
+
+  @feature_config_path [:features, :improved_hashtag_timeline]
+  @reg_name {:global, __MODULE__}
+
+  def whereis, do: GenServer.whereis(@reg_name)
+
+  def feature_state, do: Config.get(@feature_config_path)
+
+  def start_link(_) do
+    case whereis() do
+      nil ->
+        GenServer.start_link(__MODULE__, nil, name: @reg_name)
+
+      pid ->
+        {:ok, pid}
+    end
+  end
+
+  @impl true
+  def init(_) do
+    {:ok, nil, {:continue, :init_state}}
+  end
+
+  @impl true
+  def handle_continue(:init_state, _state) do
+    {:ok, _} = State.start_link(nil)
+
+    data_migration = data_migration()
+    manual_migrations = Config.get([:instance, :manual_data_migrations], [])
+
+    cond do
+      Config.get(:env) == :test ->
+        update_status(:noop)
+
+      is_nil(data_migration) ->
+        message = "Data migration does not exist."
+        update_status(:failed, message)
+        Logger.error("#{__MODULE__}: #{message}")
+
+      data_migration.state == :manual or data_migration.name in manual_migrations ->
+        message = "Data migration is in manual execution or manual fix mode."
+        update_status(:manual, message)
+        Logger.warn("#{__MODULE__}: #{message}")
+
+      data_migration.state == :complete ->
+        on_complete(data_migration)
+
+      true ->
+        send(self(), :migrate_hashtags)
+    end
+
+    {:noreply, nil}
+  end
+
+  @impl true
+  def handle_info(:migrate_hashtags, state) do
+    State.reinit()
+
+    update_status(:running)
+    put_stat(:iteration_processed_count, 0)
+    put_stat(:started_at, NaiveDateTime.utc_now())
+
+    data_migration_id = data_migration_id()
+    max_processed_id = get_stat(:max_processed_id, 0)
+
+    Logger.info("Transferring embedded hashtags to `hashtags` (from oid: #{max_processed_id})...")
+
+    query()
+    |> where([object], object.id > ^max_processed_id)
+    |> Repo.chunk_stream(100, :batches, timeout: :infinity)
+    |> Stream.each(fn objects ->
+      object_ids = Enum.map(objects, & &1.id)
+
+      results = Enum.map(objects, &transfer_object_hashtags(&1))
+
+      failed_ids =
+        results
+        |> Enum.filter(&(elem(&1, 0) == :error))
+        |> Enum.map(&elem(&1, 1))
+
+      # Count of objects with hashtags (`{:noop, id}` is returned for objects having other AS2 tags)
+      chunk_affected_count =
+        results
+        |> Enum.filter(&(elem(&1, 0) == :ok))
+        |> length()
+
+      for failed_id <- failed_ids do
+        _ =
+          Repo.query(
+            "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <>
+              "VALUES ($1, $2) ON CONFLICT DO NOTHING;",
+            [data_migration_id, failed_id]
+          )
+      end
+
+      _ =
+        Repo.query(
+          "DELETE FROM data_migration_failed_ids " <>
+            "WHERE data_migration_id = $1 AND record_id = ANY($2)",
+          [data_migration_id, object_ids -- failed_ids]
+        )
+
+      max_object_id = Enum.at(object_ids, -1)
+
+      put_stat(:max_processed_id, max_object_id)
+      increment_stat(:iteration_processed_count, length(object_ids))
+      increment_stat(:processed_count, length(object_ids))
+      increment_stat(:failed_count, length(failed_ids))
+      increment_stat(:affected_count, chunk_affected_count)
+      put_stat(:records_per_second, records_per_second())
+      persist_state()
+
+      # A quick and dirty approach to controlling the load this background migration imposes
+      sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0)
+      Process.sleep(sleep_interval)
+    end)
+    |> Stream.run()
+
+    fault_rate = fault_rate()
+    put_stat(:fault_rate, fault_rate)
+    fault_rate_allowance = Config.get([:populate_hashtags_table, :fault_rate_allowance], 0)
+
+    cond do
+      fault_rate == 0 ->
+        set_complete()
+
+      is_float(fault_rate) and fault_rate <= fault_rate_allowance ->
+        message = """
+        Done with fault rate of #{fault_rate} which doesn't exceed #{fault_rate_allowance}.
+        Putting data migration to manual fix mode. Check `retry_failed/0`.
+        """
+
+        Logger.warn("#{__MODULE__}: #{message}")
+        update_status(:manual, message)
+        on_complete(data_migration())
+
+      true ->
+        message = "Too many failures. Check data_migration_failed_ids records / `retry_failed/0`."
+        Logger.error("#{__MODULE__}: #{message}")
+        update_status(:failed, message)
+    end
+
+    persist_state()
+    {:noreply, state}
+  end
+
+  def fault_rate do
+    with failures_count when is_integer(failures_count) <- failures_count() do
+      failures_count / Enum.max([get_stat(:affected_count, 0), 1])
+    else
+      _ -> :error
+    end
+  end
+
+  defp records_per_second do
+    get_stat(:iteration_processed_count, 0) / Enum.max([running_time(), 1])
+  end
+
+  defp running_time do
+    NaiveDateTime.diff(NaiveDateTime.utc_now(), get_stat(:started_at, NaiveDateTime.utc_now()))
+  end
+
+  @hashtags_objects_cleanup_query """
+  DELETE FROM hashtags_objects WHERE object_id IN
+    (SELECT DISTINCT objects.id FROM objects
+      JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities
+        ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') =
+          (objects.data->>'id')
+        AND activities.data->>'type' = 'Create'
+      WHERE activities.id IS NULL);
+  """
+
+  @hashtags_cleanup_query """
+  DELETE FROM hashtags WHERE id IN
+    (SELECT hashtags.id FROM hashtags
+      LEFT OUTER JOIN hashtags_objects
+        ON hashtags_objects.hashtag_id = hashtags.id
+      WHERE hashtags_objects.hashtag_id IS NULL);
+  """
+
+  @doc """
+  Deletes `hashtags_objects` for legacy objects not asoociated with Create activity.
+  Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`).
+  """
+  def delete_non_create_activities_hashtags do
+    {:ok, %{num_rows: hashtags_objects_count}} =
+      Repo.query(@hashtags_objects_cleanup_query, [], timeout: :infinity)
+
+    {:ok, %{num_rows: hashtags_count}} =
+      Repo.query(@hashtags_cleanup_query, [], timeout: :infinity)
+
+    {:ok, hashtags_objects_count, hashtags_count}
+  end
+
+  defp query do
+    # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out)
+    # Note: not checking activity type, expecting remove_non_create_objects_hashtags/_ to clean up
+    from(
+      object in Object,
+      where:
+        fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data),
+      select: %{
+        id: object.id,
+        tag: fragment("(?)->'tag'", object.data)
+      }
+    )
+    |> join(:left, [o], hashtags_objects in fragment("SELECT object_id FROM hashtags_objects"),
+      on: hashtags_objects.object_id == o.id
+    )
+    |> where([_o, hashtags_objects], is_nil(hashtags_objects.object_id))
+  end
+
+  @spec transfer_object_hashtags(Map.t()) :: {:noop | :ok | :error, integer()}
+  defp transfer_object_hashtags(object) do
+    embedded_tags = if Map.has_key?(object, :tag), do: object.tag, else: object.data["tag"]
+    hashtags = Object.object_data_hashtags(%{"tag" => embedded_tags})
+
+    if Enum.any?(hashtags) do
+      transfer_object_hashtags(object, hashtags)
+    else
+      {:noop, object.id}
+    end
+  end
+
+  defp transfer_object_hashtags(object, hashtags) do
+    Repo.transaction(fn ->
+      with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
+        maps = Enum.map(hashtag_records, &%{hashtag_id: &1.id, object_id: object.id})
+        base_error = "ERROR when inserting hashtags_objects for object with id #{object.id}"
+
+        try do
+          with {rows_count, _} when is_integer(rows_count) <-
+                 Repo.insert_all("hashtags_objects", maps, on_conflict: :nothing) do
+            object.id
+          else
+            e ->
+              Logger.error("#{base_error}: #{inspect(e)}")
+              Repo.rollback(object.id)
+          end
+        rescue
+          e ->
+            Logger.error("#{base_error}: #{inspect(e)}")
+            Repo.rollback(object.id)
+        end
+      else
+        e ->
+          error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}"
+          Logger.error(error)
+          Repo.rollback(object.id)
+      end
+    end)
+  end
+
+  @doc "Approximate count for current iteration (including processed records count)"
+  def count(force \\ false, timeout \\ :infinity) do
+    stored_count = get_stat(:count)
+
+    if stored_count && !force do
+      stored_count
+    else
+      processed_count = get_stat(:processed_count, 0)
+      max_processed_id = get_stat(:max_processed_id, 0)
+      query = where(query(), [object], object.id > ^max_processed_id)
+
+      count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count
+      put_stat(:count, count)
+      persist_state()
+
+      count
+    end
+  end
+
+  defp on_complete(data_migration) do
+    if data_migration.feature_lock || feature_state() == :disabled do
+      Logger.warn("#{__MODULE__}: migration complete but feature is locked; consider enabling.")
+      :noop
+    else
+      Config.put(@feature_config_path, :enabled)
+      :ok
+    end
+  end
+
+  def failed_objects_query do
+    from(o in Object)
+    |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
+      on: dmf.record_id == o.id
+    )
+    |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id())
+    |> order_by([o], asc: o.id)
+  end
+
+  def failures_count do
+    with {:ok, %{rows: [[count]]}} <-
+           Repo.query(
+             "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;",
+             [data_migration_id()]
+           ) do
+      count
+    end
+  end
+
+  def retry_failed do
+    data_migration_id = data_migration_id()
+
+    failed_objects_query()
+    |> Repo.chunk_stream(100, :one)
+    |> Stream.each(fn object ->
+      with {res, _} when res != :error <- transfer_object_hashtags(object) do
+        _ =
+          Repo.query(
+            "DELETE FROM data_migration_failed_ids " <>
+              "WHERE data_migration_id = $1 AND record_id = $2",
+            [data_migration_id, object.id]
+          )
+      end
+    end)
+    |> Stream.run()
+
+    put_stat(:failed_count, failures_count())
+    persist_state()
+
+    force_continue()
+  end
+
+  def force_continue do
+    send(whereis(), :migrate_hashtags)
+  end
+
+  def force_restart do
+    :ok = State.reset()
+    force_continue()
+  end
+
+  def set_complete do
+    update_status(:complete)
+    persist_state()
+    on_complete(data_migration())
+  end
+
+  defp update_status(status, message \\ nil) do
+    put_stat(:state, status)
+    put_stat(:message, message)
+  end
+end
diff --git a/lib/pleroma/migrators/hashtags_table_migrator/state.ex b/lib/pleroma/migrators/hashtags_table_migrator/state.ex
new file mode 100644 (file)
index 0000000..ee0009b
--- /dev/null
@@ -0,0 +1,104 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Migrators.HashtagsTableMigrator.State do
+  use Agent
+
+  alias Pleroma.DataMigration
+
+  defdelegate data_migration(), to: Pleroma.Migrators.HashtagsTableMigrator
+
+  @reg_name {:global, __MODULE__}
+
+  def start_link(_) do
+    Agent.start_link(fn -> load_state_from_db() end, name: @reg_name)
+  end
+
+  defp load_state_from_db do
+    data_migration = data_migration()
+
+    data =
+      if data_migration do
+        Map.new(data_migration.data, fn {k, v} -> {String.to_atom(k), v} end)
+      else
+        %{}
+      end
+
+    %{
+      data_migration_id: data_migration && data_migration.id,
+      data: data
+    }
+  end
+
+  def persist_to_db do
+    %{data_migration_id: data_migration_id, data: data} = state()
+
+    if data_migration_id do
+      DataMigration.update_one_by_id(data_migration_id, data: data)
+    else
+      {:error, :nil_data_migration_id}
+    end
+  end
+
+  def reset do
+    %{data_migration_id: data_migration_id} = state()
+
+    with false <- is_nil(data_migration_id),
+         :ok <-
+           DataMigration.update_one_by_id(data_migration_id,
+             state: :pending,
+             data: %{}
+           ) do
+      reinit()
+    else
+      true -> {:error, :nil_data_migration_id}
+      e -> e
+    end
+  end
+
+  def reinit do
+    Agent.update(@reg_name, fn _state -> load_state_from_db() end)
+  end
+
+  def state do
+    Agent.get(@reg_name, & &1)
+  end
+
+  def get_data_key(key, default \\ nil) do
+    get_in(state(), [:data, key]) || default
+  end
+
+  def put_data_key(key, value) do
+    _ = persist_non_data_change(key, value)
+
+    Agent.update(@reg_name, fn state ->
+      put_in(state, [:data, key], value)
+    end)
+  end
+
+  def increment_data_key(key, increment \\ 1) do
+    Agent.update(@reg_name, fn state ->
+      initial_value = get_in(state, [:data, key]) || 0
+      updated_value = initial_value + increment
+      put_in(state, [:data, key], updated_value)
+    end)
+  end
+
+  defp persist_non_data_change(:state, value) do
+    with true <- get_data_key(:state) != value,
+         true <- value in Pleroma.DataMigration.State.__valid_values__(),
+         %{data_migration_id: data_migration_id} when not is_nil(data_migration_id) <- state() do
+      DataMigration.update_one_by_id(data_migration_id, state: value)
+    else
+      false -> :ok
+      _ -> {:error, :nil_data_migration_id}
+    end
+  end
+
+  defp persist_non_data_change(_, _) do
+    nil
+  end
+
+  def data_migration_id, do: Map.get(state(), :data_migration_id)
+end
index aaf12384004c37911063e4e82e9b8a3ec49a88e1..3ba749d1a36944e1120c53a07b28aca8d21e9003 100644 (file)
@@ -10,6 +10,7 @@ defmodule Pleroma.Object do
 
   alias Pleroma.Activity
   alias Pleroma.Config
+  alias Pleroma.Hashtag
   alias Pleroma.Object
   alias Pleroma.Object.Fetcher
   alias Pleroma.ObjectTombstone
@@ -28,6 +29,8 @@ defmodule Pleroma.Object do
   schema "objects" do
     field(:data, :map)
 
+    many_to_many(:hashtags, Hashtag, join_through: "hashtags_objects", on_replace: :delete)
+
     timestamps()
   end
 
@@ -49,7 +52,8 @@ defmodule Pleroma.Object do
   end
 
   def create(data) do
-    Object.change(%Object{}, %{data: data})
+    %Object{}
+    |> Object.change(%{data: data})
     |> Repo.insert()
   end
 
@@ -58,8 +62,41 @@ defmodule Pleroma.Object do
     |> cast(params, [:data])
     |> validate_required([:data])
     |> unique_constraint(:ap_id, name: :objects_unique_apid_index)
+    # Expecting `maybe_handle_hashtags_change/1` to run last:
+    |> maybe_handle_hashtags_change(struct)
+  end
+
+  # Note: not checking activity type (assuming non-legacy objects are associated with Create act.)
+  defp maybe_handle_hashtags_change(changeset, struct) do
+    with %Ecto.Changeset{valid?: true} <- changeset,
+         data_hashtags_change = get_change(changeset, :data),
+         {_, true} <- {:changed, hashtags_changed?(struct, data_hashtags_change)},
+         {:ok, hashtag_records} <-
+           data_hashtags_change
+           |> object_data_hashtags()
+           |> Hashtag.get_or_create_by_names() do
+      put_assoc(changeset, :hashtags, hashtag_records)
+    else
+      %{valid?: false} ->
+        changeset
+
+      {:changed, false} ->
+        changeset
+
+      {:error, _} ->
+        validate_change(changeset, :data, fn _, _ ->
+          [data: "error referencing hashtags"]
+        end)
+    end
+  end
+
+  defp hashtags_changed?(%Object{} = struct, %{"tag" => _} = data) do
+    Enum.sort(embedded_hashtags(struct)) !=
+      Enum.sort(object_data_hashtags(data))
   end
 
+  defp hashtags_changed?(_, _), do: false
+
   def get_by_id(nil), do: nil
   def get_by_id(id), do: Repo.get(Object, id)
 
@@ -187,9 +224,13 @@ defmodule Pleroma.Object do
   def swap_object_with_tombstone(object) do
     tombstone = make_tombstone(object)
 
-    object
-    |> Object.change(%{data: tombstone})
-    |> Repo.update()
+    with {:ok, object} <-
+           object
+           |> Object.change(%{data: tombstone})
+           |> Repo.update() do
+      Hashtag.unlink(object)
+      {:ok, object}
+    end
   end
 
   def delete(%Object{data: %{"id" => id}} = object) do
@@ -349,4 +390,39 @@ defmodule Pleroma.Object do
 
   def self_replies(object, opts \\ []),
     do: replies(object, Keyword.put(opts, :self_only, true))
+
+  def tags(%Object{data: %{"tag" => tags}}) when is_list(tags), do: tags
+
+  def tags(_), do: []
+
+  def hashtags(%Object{} = object) do
+    # Note: always using embedded hashtags regardless whether they are migrated to hashtags table
+    #   (embedded hashtags stay in sync anyways, and we avoid extra joins and preload hassle)
+    embedded_hashtags(object)
+  end
+
+  def embedded_hashtags(%Object{data: data}) do
+    object_data_hashtags(data)
+  end
+
+  def embedded_hashtags(_), do: []
+
+  def object_data_hashtags(%{"tag" => tags}) when is_list(tags) do
+    tags
+    |> Enum.filter(fn
+      %{"type" => "Hashtag"} = data -> Map.has_key?(data, "name")
+      plain_text when is_bitstring(plain_text) -> true
+      _ -> false
+    end)
+    |> Enum.map(fn
+      %{"name" => "#" <> hashtag} -> String.downcase(hashtag)
+      %{"name" => hashtag} -> String.downcase(hashtag)
+      hashtag when is_bitstring(hashtag) -> String.downcase(hashtag)
+    end)
+    |> Enum.uniq()
+    # Note: "" elements (plain text) might occur in `data.tag` for incoming objects
+    |> Enum.filter(&(&1 not in [nil, ""]))
+  end
+
+  def object_data_hashtags(_), do: []
 end
index 4556352d073bc079c7b4f9a0e2a903c351b9741b..61b64ed3ef997263ef0cb3a6ad0c6cbe275588ca 100644 (file)
@@ -63,8 +63,8 @@ defmodule Pleroma.Repo do
   iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500, :batches)
   """
   @spec chunk_stream(Ecto.Query.t(), integer(), atom()) :: Enumerable.t()
-  def chunk_stream(query, chunk_size, returns_as \\ :one) do
-    # We don't actually need start and end funcitons of resource streaming,
+  def chunk_stream(query, chunk_size, returns_as \\ :one, query_options \\ []) do
+    # We don't actually need start and end functions of resource streaming,
     # but it seems to be the only way to not fetch records one-by-one and
     # have individual records be the elements of the stream, instead of
     # lists of records
@@ -76,7 +76,7 @@ defmodule Pleroma.Repo do
           |> order_by(asc: :id)
           |> where([r], r.id > ^last_id)
           |> limit(^chunk_size)
-          |> all()
+          |> all(query_options)
           |> case do
             [] ->
               {:halt, last_id}
index 5b45e2ca1dca30c5488b53c4477d73958c25aeb3..9d557c2cdf8781e50a1be5684bd9dcc85dd64824 100644 (file)
@@ -10,6 +10,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
   alias Pleroma.Conversation
   alias Pleroma.Conversation.Participation
   alias Pleroma.Filter
+  alias Pleroma.Hashtag
   alias Pleroma.Maps
   alias Pleroma.Notification
   alias Pleroma.Object
@@ -693,51 +694,144 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
   defp restrict_since(query, _), do: query
 
-  defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
-    raise "Can't use the child object without preloading!"
+  defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
+    raise_on_missing_preload()
+  end
+
+  defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
+    from(
+      [_activity, object] in query,
+      where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
+    )
+  end
+
+  defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
+    restrict_embedded_tag_any(query, %{tag: tag})
+  end
+
+  defp restrict_embedded_tag_all(query, _), do: query
+
+  defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
+    raise_on_missing_preload()
+  end
+
+  defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
+    from(
+      [_activity, object] in query,
+      where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
+    )
+  end
+
+  defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
+    restrict_embedded_tag_any(query, %{tag: [tag]})
   end
 
-  defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
+  defp restrict_embedded_tag_any(query, _), do: query
+
+  defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
+    raise_on_missing_preload()
+  end
+
+  defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
     from(
       [_activity, object] in query,
       where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
     )
   end
 
-  defp restrict_tag_reject(query, _), do: query
+  defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
+       when is_binary(tag_reject) do
+    restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
+  end
 
-  defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
-    raise "Can't use the child object without preloading!"
+  defp restrict_embedded_tag_reject_any(query, _), do: query
+
+  defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
+    raise_on_missing_preload()
+  end
+
+  defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
+    restrict_hashtag_any(query, %{tag: single_tag})
   end
 
-  defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
+  defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
     from(
       [_activity, object] in query,
-      where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
+      where:
+        fragment(
+          """
+          (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
+            ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
+              AND hashtags_objects.object_id = ?) @> ?
+          """,
+          ^tags,
+          object.id,
+          ^tags
+        )
     )
   end
 
-  defp restrict_tag_all(query, _), do: query
+  defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
+    restrict_hashtag_all(query, %{tag_all: [tag]})
+  end
 
-  defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
-    raise "Can't use the child object without preloading!"
+  defp restrict_hashtag_all(query, _), do: query
+
+  defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
+    raise_on_missing_preload()
   end
 
-  defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
+  defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
     from(
       [_activity, object] in query,
-      where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
+      where:
+        fragment(
+          """
+          EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
+            ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
+              AND hashtags_objects.object_id = ? LIMIT 1)
+          """,
+          ^tags,
+          object.id
+        )
     )
   end
 
-  defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
+  defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
+    restrict_hashtag_any(query, %{tag: [tag]})
+  end
+
+  defp restrict_hashtag_any(query, _), do: query
+
+  defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
+    raise_on_missing_preload()
+  end
+
+  defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
     from(
       [_activity, object] in query,
-      where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
+      where:
+        fragment(
+          """
+          NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
+            ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
+              AND hashtags_objects.object_id = ? LIMIT 1)
+          """,
+          ^tags_reject,
+          object.id
+        )
     )
   end
 
-  defp restrict_tag(query, _), do: query
+  defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
+    restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
+  end
+
+  defp restrict_hashtag_reject_any(query, _), do: query
+
+  defp raise_on_missing_preload do
+    raise "Can't use the child object without preloading!"
+  end
 
   defp restrict_recipients(query, [], _user), do: query
 
@@ -1098,6 +1192,21 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
   defp maybe_order(query, _), do: query
 
+  defp normalize_fetch_activities_query_opts(opts) do
+    Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
+      case opts[key] do
+        value when is_bitstring(value) ->
+          Map.put(opts, key, Hashtag.normalize_name(value))
+
+        value when is_list(value) ->
+          Map.put(opts, key, Enum.map(value, &Hashtag.normalize_name/1))
+
+        _ ->
+          opts
+      end
+    end)
+  end
+
   defp fetch_activities_query_ap_ids_ops(opts) do
     source_user = opts[:muting_user]
     ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
@@ -1121,6 +1230,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
   end
 
   def fetch_activities_query(recipients, opts \\ %{}) do
+    opts = normalize_fetch_activities_query_opts(opts)
+
     {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
       fetch_activities_query_ap_ids_ops(opts)
 
@@ -1128,41 +1239,51 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
       skip_thread_containment: Config.get([:instance, :skip_thread_containment])
     }
 
-    Activity
-    |> maybe_preload_objects(opts)
-    |> maybe_preload_bookmarks(opts)
-    |> maybe_preload_report_notes(opts)
-    |> maybe_set_thread_muted_field(opts)
-    |> maybe_order(opts)
-    |> restrict_recipients(recipients, opts[:user])
-    |> restrict_replies(opts)
-    |> restrict_tag(opts)
-    |> restrict_tag_reject(opts)
-    |> restrict_tag_all(opts)
-    |> restrict_since(opts)
-    |> restrict_local(opts)
-    |> restrict_remote(opts)
-    |> restrict_actor(opts)
-    |> restrict_type(opts)
-    |> restrict_state(opts)
-    |> restrict_favorited_by(opts)
-    |> restrict_blocked(restrict_blocked_opts)
-    |> restrict_muted(restrict_muted_opts)
-    |> restrict_filtered(opts)
-    |> restrict_media(opts)
-    |> restrict_visibility(opts)
-    |> restrict_thread_visibility(opts, config)
-    |> restrict_reblogs(opts)
-    |> restrict_pinned(opts)
-    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
-    |> restrict_instance(opts)
-    |> restrict_announce_object_actor(opts)
-    |> restrict_filtered(opts)
-    |> Activity.restrict_deactivated_users()
-    |> exclude_poll_votes(opts)
-    |> exclude_chat_messages(opts)
-    |> exclude_invisible_actors(opts)
-    |> exclude_visibility(opts)
+    query =
+      Activity
+      |> maybe_preload_objects(opts)
+      |> maybe_preload_bookmarks(opts)
+      |> maybe_preload_report_notes(opts)
+      |> maybe_set_thread_muted_field(opts)
+      |> maybe_order(opts)
+      |> restrict_recipients(recipients, opts[:user])
+      |> restrict_replies(opts)
+      |> restrict_since(opts)
+      |> restrict_local(opts)
+      |> restrict_remote(opts)
+      |> restrict_actor(opts)
+      |> restrict_type(opts)
+      |> restrict_state(opts)
+      |> restrict_favorited_by(opts)
+      |> restrict_blocked(restrict_blocked_opts)
+      |> restrict_muted(restrict_muted_opts)
+      |> restrict_filtered(opts)
+      |> restrict_media(opts)
+      |> restrict_visibility(opts)
+      |> restrict_thread_visibility(opts, config)
+      |> restrict_reblogs(opts)
+      |> restrict_pinned(opts)
+      |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
+      |> restrict_instance(opts)
+      |> restrict_announce_object_actor(opts)
+      |> restrict_filtered(opts)
+      |> Activity.restrict_deactivated_users()
+      |> exclude_poll_votes(opts)
+      |> exclude_chat_messages(opts)
+      |> exclude_invisible_actors(opts)
+      |> exclude_visibility(opts)
+
+    if Config.feature_enabled?(:improved_hashtag_timeline) do
+      query
+      |> restrict_hashtag_any(opts)
+      |> restrict_hashtag_all(opts)
+      |> restrict_hashtag_reject_any(opts)
+    else
+      query
+      |> restrict_embedded_tag_any(opts)
+      |> restrict_embedded_tag_all(opts)
+      |> restrict_embedded_tag_reject_any(opts)
+    end
   end
 
   def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
index bb3838d2c18f009efefee8c2f18d1f373fac0abb..0b1be8c517bcecc24d0e7cb105dc6966a686709c 100644 (file)
@@ -74,9 +74,11 @@ defmodule Pleroma.Web.ActivityPub.MRF.SimplePolicy do
 
     object =
       if MRF.subdomain_match?(media_nsfw, actor_host) do
-        tags = (child_object["tag"] || []) ++ ["nsfw"]
-        child_object = Map.put(child_object, "tag", tags)
-        child_object = Map.put(child_object, "sensitive", true)
+        child_object =
+          child_object
+          |> Map.put("tag", (child_object["tag"] || []) ++ ["nsfw"])
+          |> Map.put("sensitive", true)
+
         Map.put(object, "object", child_object)
       else
         object
index 4d9a5617eb9b5b3fcdebfb44b0e3cf0ee9c8aed3..0a701334f00d08f7f57e2a8ac963e2e526414c2e 100644 (file)
@@ -32,18 +32,18 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
   """
   def fix_object(object, options \\ []) do
     object
-    |> strip_internal_fields
-    |> fix_actor
-    |> fix_url
-    |> fix_attachments
-    |> fix_context
+    |> strip_internal_fields()
+    |> fix_actor()
+    |> fix_url()
+    |> fix_attachments()
+    |> fix_context()
     |> fix_in_reply_to(options)
-    |> fix_emoji
-    |> fix_tag
-    |> set_sensitive
-    |> fix_content_map
-    |> fix_addressing
-    |> fix_summary
+    |> fix_emoji()
+    |> fix_tag()
+    |> set_sensitive()
+    |> fix_content_map()
+    |> fix_addressing()
+    |> fix_summary()
     |> fix_type(options)
   end
 
@@ -315,10 +315,9 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
     tags =
       tag
       |> Enum.filter(fn data -> data["type"] == "Hashtag" and data["name"] end)
-      |> Enum.map(fn %{"name" => name} ->
-        name
-        |> String.slice(1..-1)
-        |> String.downcase()
+      |> Enum.map(fn
+        %{"name" => "#" <> hashtag} -> String.downcase(hashtag)
+        %{"name" => hashtag} -> String.downcase(hashtag)
       end)
 
     Map.put(object, "tag", tag ++ tags)
index df97d2f46e1476df600585816a9940d70773a350..66940f3110a1abb7cbc46ccc9382851c1dd62da7 100644 (file)
@@ -32,6 +32,7 @@ defmodule Pleroma.Web.Feed.FeedView do
 
     %{
       activity: activity,
+      object: object,
       data: Map.get(object, :data),
       actor: actor
     }
index cef299aa49a1579719b3f017aade8edfced9076a..87effa00bcce3f431f93d4a4077a926c692e4192 100644 (file)
@@ -136,9 +136,9 @@ defmodule Pleroma.Web.MastodonAPI.TimelineController do
     tags =
       [params[:tag], params[:any]]
       |> List.flatten()
-      |> Enum.uniq()
       |> Enum.reject(&is_nil/1)
       |> Enum.map(&String.downcase/1)
+      |> Enum.uniq()
 
     tag_all =
       params
index 2cd6732fe0a5ee9cac257cc5dd9c5ca863936814..bbe0b11ecd221878d750ef25a00fb54e549ba46e 100644 (file)
@@ -201,8 +201,10 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
     like_count = object.data["like_count"] || 0
     announcement_count = object.data["announcement_count"] || 0
 
-    tags = object.data["tag"] || []
-    sensitive = object.data["sensitive"] || Enum.member?(tags, "nsfw")
+    hashtags = Object.hashtags(object)
+    sensitive = object.data["sensitive"] || Enum.member?(hashtags, "nsfw")
+
+    tags = Object.tags(object)
 
     tag_mentions =
       tags
index 3fd150c4e7570b6d54e449a87ffac7d1adc74d7a..6688830baa954d081ac736703fe870e514535259 100644 (file)
@@ -22,7 +22,7 @@
     <link type="text/html" href='<%= @data["external_url"] %>' rel="alternate"/>
   <% end %>
 
-  <%= for tag <- @data["tag"] || [] do %>
+  <%= for tag <- Pleroma.Object.hashtags(@object) do %>
     <category term="<%= tag %>"></category>
   <% end %>
 
index 947bbb09970fa6f01788cf813414a1554eaf44a4..592b9dcdc4331b86ab4ee5c2c2c52203c0745593 100644 (file)
@@ -22,7 +22,7 @@
 
   <link rel="ostatus:conversation"><%= activity_context(@activity) %></link>
 
-  <%= for tag <- @data["tag"] || [] do %>
+  <%= for tag <- Pleroma.Object.hashtags(@object) do %>
     <category term="<%= tag %>"></category>
   <% end %>
 
index cf5874a91341cb8108631829aa150ec9fd70e15b..c2de28fe48888bea7d8f3fc8c71d6a576cc5a89b 100644 (file)
@@ -41,7 +41,7 @@
       <% end %>
     <% end %>
 
-    <%= for tag <- @data["tag"] || [] do %>
+    <%= for tag <- Pleroma.Object.hashtags(@object) do %>
       <category term="<%= tag %>"></category>
     <% end %>
 
index 43d61670555ad019d4fe1b7659d12d23f508b79d..bfac09f9e397e1bfbc734ec636f6f8ffff244e33 100644 (file)
@@ -9,7 +9,7 @@ defmodule Pleroma.Repo.Migrations.CreateSafeJsonbSet do
     begin
       result := jsonb_set(target, path, coalesce(new_value, 'null'::jsonb), create_missing);
       if result is NULL then
-        raise 'jsonb_set tried to wipe the object, please report this incindent to Pleroma bug tracker. https://git.pleroma.social/pleroma/pleroma/issues/new';
+        raise 'jsonb_set tried to wipe the object, please report this incident to Pleroma bug tracker. https://git.pleroma.social/pleroma/pleroma/issues/new';
         return target;
       else
         return result;
diff --git a/priv/repo/migrations/20201221202251_create_hashtags.exs b/priv/repo/migrations/20201221202251_create_hashtags.exs
new file mode 100644 (file)
index 0000000..8d2e9ae
--- /dev/null
@@ -0,0 +1,13 @@
+defmodule Pleroma.Repo.Migrations.CreateHashtags do
+  use Ecto.Migration
+
+  def change do
+    create_if_not_exists table(:hashtags) do
+      add(:name, :citext, null: false)
+
+      timestamps()
+    end
+
+    create_if_not_exists(unique_index(:hashtags, [:name]))
+  end
+end
diff --git a/priv/repo/migrations/20201221202252_remove_data_from_hashtags.exs b/priv/repo/migrations/20201221202252_remove_data_from_hashtags.exs
new file mode 100644 (file)
index 0000000..0442c3b
--- /dev/null
@@ -0,0 +1,15 @@
+defmodule Pleroma.Repo.Migrations.RemoveDataFromHashtags do
+  use Ecto.Migration
+
+  def up do
+    alter table(:hashtags) do
+      remove_if_exists(:data, :map)
+    end
+  end
+
+  def down do
+    alter table(:hashtags) do
+      add_if_not_exists(:data, :map, default: %{})
+    end
+  end
+end
diff --git a/priv/repo/migrations/20201221203824_create_hashtags_objects.exs b/priv/repo/migrations/20201221203824_create_hashtags_objects.exs
new file mode 100644 (file)
index 0000000..581f32b
--- /dev/null
@@ -0,0 +1,13 @@
+defmodule Pleroma.Repo.Migrations.CreateHashtagsObjects do
+  use Ecto.Migration
+
+  def change do
+    create_if_not_exists table(:hashtags_objects, primary_key: false) do
+      add(:hashtag_id, references(:hashtags), null: false, primary_key: true)
+      add(:object_id, references(:objects), null: false, primary_key: true)
+    end
+
+    # Note: PK index: "hashtags_objects_pkey" PRIMARY KEY, btree (hashtag_id, object_id)
+    create_if_not_exists(index(:hashtags_objects, [:object_id]))
+  end
+end
diff --git a/priv/repo/migrations/20210105195018_create_data_migrations.exs b/priv/repo/migrations/20210105195018_create_data_migrations.exs
new file mode 100644 (file)
index 0000000..5f2e8d9
--- /dev/null
@@ -0,0 +1,17 @@
+defmodule Pleroma.Repo.Migrations.CreateDataMigrations do
+  use Ecto.Migration
+
+  def change do
+    create_if_not_exists table(:data_migrations) do
+      add(:name, :string, null: false)
+      add(:state, :integer, default: 1)
+      add(:feature_lock, :boolean, default: false)
+      add(:params, :map, default: %{})
+      add(:data, :map, default: %{})
+
+      timestamps()
+    end
+
+    create_if_not_exists(unique_index(:data_migrations, [:name]))
+  end
+end
diff --git a/priv/repo/migrations/20210106183301_data_migration_create_populate_hashtags_table.exs b/priv/repo/migrations/20210106183301_data_migration_create_populate_hashtags_table.exs
new file mode 100644 (file)
index 0000000..cf3cf26
--- /dev/null
@@ -0,0 +1,16 @@
+defmodule Pleroma.Repo.Migrations.DataMigrationCreatePopulateHashtagsTable do
+  use Ecto.Migration
+
+  def up do
+    dt = NaiveDateTime.utc_now()
+
+    execute(
+      "INSERT INTO data_migrations(name, inserted_at, updated_at) " <>
+        "VALUES ('populate_hashtags_table', '#{dt}', '#{dt}') ON CONFLICT DO NOTHING;"
+    )
+  end
+
+  def down do
+    execute("DELETE FROM data_migrations WHERE name = 'populate_hashtags_table';")
+  end
+end
diff --git a/priv/repo/migrations/20210111172254_create_data_migration_failed_ids.exs b/priv/repo/migrations/20210111172254_create_data_migration_failed_ids.exs
new file mode 100644 (file)
index 0000000..18afa74
--- /dev/null
@@ -0,0 +1,14 @@
+defmodule Pleroma.Repo.Migrations.CreateDataMigrationFailedIds do
+  use Ecto.Migration
+
+  def change do
+    create_if_not_exists table(:data_migration_failed_ids, primary_key: false) do
+      add(:data_migration_id, references(:data_migrations), null: false, primary_key: true)
+      add(:record_id, :bigint, null: false, primary_key: true)
+    end
+
+    create_if_not_exists(
+      unique_index(:data_migration_failed_ids, [:data_migration_id, :record_id])
+    )
+  end
+end
diff --git a/priv/repo/migrations/20210222183840_remove_hashtags_objects_duplicate_index.exs b/priv/repo/migrations/20210222183840_remove_hashtags_objects_duplicate_index.exs
new file mode 100644 (file)
index 0000000..6c4a2df
--- /dev/null
@@ -0,0 +1,11 @@
+defmodule Pleroma.Repo.Migrations.RemoveHashtagsObjectsDuplicateIndex do
+  use Ecto.Migration
+
+  @moduledoc "Removes `hashtags_objects_hashtag_id_object_id_index` index (duplicate of PK index)."
+
+  def up do
+    drop_if_exists(unique_index(:hashtags_objects, [:hashtag_id, :object_id]))
+  end
+
+  def down, do: nil
+end
diff --git a/priv/repo/migrations/20210222184616_change_hashtags_name_to_text.exs b/priv/repo/migrations/20210222184616_change_hashtags_name_to_text.exs
new file mode 100644 (file)
index 0000000..8940b6c
--- /dev/null
@@ -0,0 +1,15 @@
+defmodule Pleroma.Repo.Migrations.ChangeHashtagsNameToText do
+  use Ecto.Migration
+
+  def up do
+    alter table(:hashtags) do
+      modify(:name, :text)
+    end
+  end
+
+  def down do
+    alter table(:hashtags) do
+      modify(:name, :citext)
+    end
+  end
+end
index 6b848e04d868f166ff8c88c34d49551bca0679e6..9c8e5d93276a5d1674ee216d8af238b024c55568 100644 (file)
@@ -11,6 +11,8 @@ defmodule Pleroma.Activity.Ir.TopicsTest do
 
   require Pleroma.Constants
 
+  import Mock
+
   describe "poll answer" do
     test "produce no topics" do
       activity = %Activity{object: %Object{data: %{"type" => "Answer"}}}
@@ -77,14 +79,13 @@ defmodule Pleroma.Activity.Ir.TopicsTest do
       refute Enum.member?(topics, "public:local:media")
     end
 
-    test "converts tags to hash tags", %{activity: %{object: %{data: data} = object} = activity} do
-      tagged_data = Map.put(data, "tag", ["foo", "bar"])
-      activity = %{activity | object: %{object | data: tagged_data}}
-
-      topics = Topics.get_activity_topics(activity)
+    test "converts tags to hash tags", %{activity: activity} do
+      with_mock(Object, [:passthrough], hashtags: fn _ -> ["foo", "bar"] end) do
+        topics = Topics.get_activity_topics(activity)
 
-      assert Enum.member?(topics, "hashtag:foo")
-      assert Enum.member?(topics, "hashtag:bar")
+        assert Enum.member?(topics, "hashtag:foo")
+        assert Enum.member?(topics, "hashtag:bar")
+      end
     end
 
     test "only converts strings to hash tags", %{
diff --git a/test/pleroma/hashtag_test.exs b/test/pleroma/hashtag_test.exs
new file mode 100644 (file)
index 0000000..0264dea
--- /dev/null
@@ -0,0 +1,17 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.HashtagTest do
+  use Pleroma.DataCase
+
+  alias Pleroma.Hashtag
+
+  describe "changeset validations" do
+    test "ensure non-blank :name" do
+      changeset = Hashtag.changeset(%Hashtag{}, %{name: ""})
+
+      assert {:name, {"can't be blank", [validation: :required]}} in changeset.errors
+    end
+  end
+end
index db7678d5d183ce9ab41079792924fccefc6513b9..8320660a50cacc3ae51acbb96ba5e9948745da31 100644 (file)
@@ -5,10 +5,13 @@
 defmodule Pleroma.ObjectTest do
   use Pleroma.DataCase
   use Oban.Testing, repo: Pleroma.Repo
+
   import ExUnit.CaptureLog
   import Pleroma.Factory
   import Tesla.Mock
+
   alias Pleroma.Activity
+  alias Pleroma.Hashtag
   alias Pleroma.Object
   alias Pleroma.Repo
   alias Pleroma.Tests.ObanHelpers
@@ -417,4 +420,28 @@ defmodule Pleroma.ObjectTest do
       assert updated_object.data["like_count"] == 1
     end
   end
+
+  describe ":hashtags association" do
+    test "Hashtag records are created with Object record and updated on its change" do
+      user = insert(:user)
+
+      {:ok, %{object: object}} =
+        CommonAPI.post(user, %{status: "some text #hashtag1 #hashtag2 ..."})
+
+      assert [%Hashtag{name: "hashtag1"}, %Hashtag{name: "hashtag2"}] =
+               Enum.sort_by(object.hashtags, & &1.name)
+
+      {:ok, object} = Object.update_data(object, %{"tag" => []})
+
+      assert [] = object.hashtags
+
+      object = Object.get_by_id(object.id) |> Repo.preload(:hashtags)
+      assert [] = object.hashtags
+
+      {:ok, object} = Object.update_data(object, %{"tag" => ["abc", "def"]})
+
+      assert [%Hashtag{name: "abc"}, %Hashtag{name: "def"}] =
+               Enum.sort_by(object.hashtags, & &1.name)
+    end
+  end
 end
index f4023856cec81971bb6a0ff10255ede627a86c31..f92323abe916c7e65053b5a8a845f563aff13cbb 100644 (file)
@@ -213,32 +213,64 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubTest do
   test "it fetches the appropriate tag-restricted posts" do
     user = insert(:user)
 
-    {:ok, status_one} = CommonAPI.post(user, %{status: ". #test"})
+    {:ok, status_one} = CommonAPI.post(user, %{status: ". #TEST"})
     {:ok, status_two} = CommonAPI.post(user, %{status: ". #essais"})
-    {:ok, status_three} = CommonAPI.post(user, %{status: ". #test #reject"})
+    {:ok, status_three} = CommonAPI.post(user, %{status: ". #test #Reject"})
 
-    fetch_one = ActivityPub.fetch_activities([], %{type: "Create", tag: "test"})
+    {:ok, status_four} = CommonAPI.post(user, %{status: ". #Any1 #any2"})
+    {:ok, status_five} = CommonAPI.post(user, %{status: ". #Any2 #any1"})
 
-    fetch_two = ActivityPub.fetch_activities([], %{type: "Create", tag: ["test", "essais"]})
+    for hashtag_timeline_strategy <- [:eanbled, :disabled] do
+      clear_config([:features, :improved_hashtag_timeline], hashtag_timeline_strategy)
 
-    fetch_three =
-      ActivityPub.fetch_activities([], %{
-        type: "Create",
-        tag: ["test", "essais"],
-        tag_reject: ["reject"]
-      })
+      fetch_one = ActivityPub.fetch_activities([], %{type: "Create", tag: "test"})
 
-    fetch_four =
-      ActivityPub.fetch_activities([], %{
-        type: "Create",
-        tag: ["test"],
-        tag_all: ["test", "reject"]
-      })
+      fetch_two = ActivityPub.fetch_activities([], %{type: "Create", tag: ["TEST", "essais"]})
+
+      fetch_three =
+        ActivityPub.fetch_activities([], %{
+          type: "Create",
+          tag: ["test", "Essais"],
+          tag_reject: ["reject"]
+        })
+
+      fetch_four =
+        ActivityPub.fetch_activities([], %{
+          type: "Create",
+          tag: ["test"],
+          tag_all: ["test", "REJECT"]
+        })
 
-    assert fetch_one == [status_one, status_three]
-    assert fetch_two == [status_one, status_two, status_three]
-    assert fetch_three == [status_one, status_two]
-    assert fetch_four == [status_three]
+      # Testing that deduplication (if needed) is done on DB (not Ecto) level; :limit is important
+      fetch_five =
+        ActivityPub.fetch_activities([], %{
+          type: "Create",
+          tag: ["ANY1", "any2"],
+          limit: 2
+        })
+
+      fetch_six =
+        ActivityPub.fetch_activities([], %{
+          type: "Create",
+          tag: ["any1", "Any2"],
+          tag_all: [],
+          tag_reject: []
+        })
+
+      # Regression test: passing empty lists as filter options shouldn't affect the results
+      assert fetch_five == fetch_six
+
+      [fetch_one, fetch_two, fetch_three, fetch_four, fetch_five] =
+        Enum.map([fetch_one, fetch_two, fetch_three, fetch_four, fetch_five], fn statuses ->
+          Enum.map(statuses, fn s -> Repo.preload(s, object: :hashtags) end)
+        end)
+
+      assert fetch_one == [status_one, status_three]
+      assert fetch_two == [status_one, status_two, status_three]
+      assert fetch_three == [status_one, status_two]
+      assert fetch_four == [status_three]
+      assert fetch_five == [status_four, status_five]
+    end
   end
 
   describe "insertion" do
index 31586abc90ec6cfd408cf07678176a33250ebba0..deb956410f3fa5544e02d1080303908eecc9d5cb 100644 (file)
@@ -39,7 +39,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier.NoteHandlingTest do
       {:ok, %Activity{data: data, local: false}} = Transmogrifier.handle_incoming(data)
       object = Object.normalize(data["object"], fetch: false)
 
-      assert "test" in object.data["tag"]
+      assert "test" in Object.tags(object)
+      assert Object.hashtags(object) == ["test"]
     end
 
     test "it cleans up incoming notices which are not really DMs" do
@@ -220,7 +221,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier.NoteHandlingTest do
       {:ok, %Activity{data: data, local: false}} = Transmogrifier.handle_incoming(data)
       object = Object.normalize(data["object"], fetch: false)
 
-      assert Enum.at(object.data["tag"], 2) == "moo"
+      assert Enum.at(Object.tags(object), 2) == "moo"
+      assert Object.hashtags(object) == ["moo"]
     end
 
     test "it works for incoming notices with contentMap" do
index adfe58def268babb03fbdeb771a956dc00bfaf66..9d005697cd0efc6d4dbe812d99513161dea30f9c 100644 (file)
@@ -493,7 +493,7 @@ defmodule Pleroma.Web.CommonAPITest do
 
     object = Object.normalize(activity, fetch: false)
 
-    assert object.data["tag"] == ["2hu"]
+    assert Object.tags(object) == ["2hu"]
   end
 
   test "it adds emoji in the object" do
index ed59cf285652cab805ea999b225c8d672cc26975..48f5f0dd0cda5eb7d62643b7a329948dbd9acd27 100644 (file)
@@ -262,8 +262,8 @@ defmodule Pleroma.Web.MastodonAPI.StatusViewTest do
       mentions: [],
       tags: [
         %{
-          name: "#{object_data["tag"]}",
-          url: "http://localhost:4001/tag/#{object_data["tag"]}"
+          name: "#{hd(object_data["tag"])}",
+          url: "http://localhost:4001/tag/#{hd(object_data["tag"])}"
         }
       ],
       application: %{