def handle_continue(:init_state, _state) do
{:ok, _} = State.start_link(nil)
- put_stat(:status, :init)
+ update_status(:init)
- dm = data_migration()
+ data_migration = data_migration()
manual_migrations = Config.get([:instance, :manual_data_migrations], [])
cond do
Config.get(:env) == :test ->
- put_stat(:status, :noop)
+ update_status(:noop)
- is_nil(dm) ->
- put_stat(:status, :halt)
- put_stat(:message, "Data migration does not exist.")
+ is_nil(data_migration) ->
+ update_status(:halt, "Data migration does not exist.")
- dm.state == :manual or dm.name in manual_migrations ->
- put_stat(:status, :noop)
- put_stat(:message, "Data migration is in manual execution state.")
+ data_migration.state == :manual or data_migration.name in manual_migrations ->
+ update_status(:noop, "Data migration is in manual execution state.")
- dm.state == :complete ->
- handle_success()
+ data_migration.state == :complete ->
+ handle_success(data_migration)
true ->
send(self(), :migrate_hashtags)
{:ok, data_migration} =
DataMigration.update(data_migration, %{state: :running, data: persistent_data})
- put_stat(:status, :running)
+ update_status(:running)
Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")
) do
_ = DataMigration.update_state(data_migration, :complete)
- handle_success()
+ handle_success(data_migration)
else
_ ->
_ = DataMigration.update_state(data_migration, :failed)
- put_stat(:status, :failed)
- put_stat(:message, "Please check data_migration_failed_ids records.")
+ update_status(:failed, "Please check data_migration_failed_ids records.")
end
{:noreply, state}
_ = DataMigration.update(data_migration, %{data: runner_state})
end
# Records the :complete status through update_status/2 and then, unless the
# migration record's `feature_lock` is set or Config.improved_hashtag_timeline()
# is already truthy, writes `true` at Config.improved_hashtag_timeline_path().
# Always returns :ok.
# The `-` lines are the former zero-arity version, which did not look at
# `feature_lock` before touching the config.
- defp handle_success do
- put_stat(:status, :complete)
+ defp handle_success(data_migration) do
+ update_status(:complete)
- unless Config.improved_hashtag_timeline() do
+ unless data_migration.feature_lock || Config.improved_hashtag_timeline() do
# Only reached when neither the lock nor an existing truthy setting is present.
Config.put(Config.improved_hashtag_timeline_path(), true)
end
:ok
end
# Builds (without executing) an Ecto query over `Object` rows whose id appears as
# `record_id` in `data_migration_failed_ids` for the migration returned by
# data_migration/0, ordered by ascending object id.
# The failed-ids table is joined through a raw SQL fragment, not a schema module.
# NOTE(review): handle_continue/2 checks data_migration/0 for nil; if it returns
# nil here, `.id` would raise while the query is being built — confirm callers
# only use this once the migration record exists.
+ def failed_objects_query do
+ from(o in Object)
+ |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
+ on: dmf.record_id == o.id
+ )
+ |> where([_o, dmf], dmf.data_migration_id == ^data_migration().id)
+ |> order_by([o], asc: o.id)
+ end
+
# Sends `:migrate_hashtags` to the process returned by whereis/0 — the same
# message handle_continue/2 sends to itself when it starts a migration run.
# Returns the message, as send/2 does.
# NOTE(review): whereis/0 is defined outside this view; if it can return nil
# (process not running), send/2 would raise — confirm.
def force_continue do
send(whereis(), :migrate_hashtags)
end
{:ok, _} = DataMigration.update(data_migration(), %{state: :pending, data: %{}})
force_continue()
end
+
# Writes `status` to the :status stat and `message` to the :message stat in one call.
# `message` defaults to nil, so a status-only call also writes nil to :message
# (presumably clearing a message left by an earlier state — put_stat/2 is
# defined outside this view).
+ defp update_status(status, message \\ nil) do
+ put_stat(:status, status)
+ put_stat(:message, message)
+ end
end
defp restrict_since(query, _), do: query
# Filters out activities whose object has any of the `tag_reject` values in the
# `tag` entry embedded in `object.data` (Postgres `?|` any-of operator; `\\?` is
# the escaped literal `?` inside an Ecto fragment).
# Clause order matters:
#   1. `skip_preload: true` together with `tag_reject` -> raise_on_missing_preload()
#      (presumably because the `object` binding used below is absent — confirm);
#   2. list of tags -> adds the negated WHERE condition;
#   3. single binary tag -> wrapped in a list and handled by clause 2;
#   4. anything else -> query returned unchanged.
# The `-` lines carry the former name `restrict_tag_reject`; only the name (and
# line wrapping of the binary clause) changed.
- defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
+ defp restrict_embedded_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
raise_on_missing_preload()
end
- defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
+ defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
from(
[_activity, object] in query,
where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
)
end
- defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
- restrict_tag_reject(query, %{tag_reject: [tag_reject]})
+ defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject})
+ when is_binary(tag_reject) do
+ restrict_embedded_tag_reject(query, %{tag_reject: [tag_reject]})
end
- defp restrict_tag_reject(query, _), do: query
+ defp restrict_embedded_tag_reject(query, _), do: query
# Keeps only activities whose object has every one of the `tag_all` values in the
# `tag` entry embedded in `object.data` (Postgres `?&` all-of operator; `\\?` is
# the escaped literal `?` inside an Ecto fragment).
# Clause order matters:
#   1. `skip_preload: true` together with `tag_all` -> raise_on_missing_preload();
#   2. list of tags -> adds the all-of WHERE condition;
#   3. single binary tag -> delegated to restrict_embedded_tag/2 as `%{tag: tag}`,
#      since "all of one tag" and "any of one tag" are the same condition;
#   4. anything else -> query returned unchanged.
# The `-` lines carry the former names `restrict_tag_all` / `restrict_tag`.
- defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
+ defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
raise_on_missing_preload()
end
- defp restrict_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
+ defp restrict_embedded_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
from(
[_activity, object] in query,
where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
)
end
- defp restrict_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
- restrict_tag(query, %{tag: tag})
+ defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
+ restrict_embedded_tag(query, %{tag: tag})
end
- defp restrict_tag_all(query, _), do: query
+ defp restrict_embedded_tag_all(query, _), do: query
# Keeps only activities whose object has at least one of the `tag` values in the
# `tag` entry embedded in `object.data` (Postgres `?|` any-of operator; `\\?` is
# the escaped literal `?` inside an Ecto fragment).
# Clause order matters:
#   1. `skip_preload: true` together with `tag` -> raise_on_missing_preload();
#   2. list of tags -> adds the any-of WHERE condition;
#   3. single binary tag -> wrapped in a list and handled by clause 2;
#   4. anything else -> query returned unchanged.
# The `-` lines carry the former name `restrict_tag`; only the name changed.
- defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
+ defp restrict_embedded_tag(_query, %{tag: _tag, skip_preload: true}) do
raise_on_missing_preload()
end
- defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
+ defp restrict_embedded_tag(query, %{tag: tag}) when is_list(tag) do
from(
[_activity, object] in query,
where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
)
end
- defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
- restrict_tag(query, %{tag: [tag]})
+ defp restrict_embedded_tag(query, %{tag: tag}) when is_binary(tag) do
+ restrict_embedded_tag(query, %{tag: [tag]})
end
- defp restrict_tag(query, _), do: query
+ defp restrict_embedded_tag(query, _), do: query
- defp restrict_hashtag(query, opts) do
- [tag_any, tag_all, tag_reject] =
- [:tag, :tag_all, :tag_reject]
- |> Enum.map(&opts[&1])
- |> Enum.map(&List.wrap(&1))
# Extracts the three hashtag filters from `opts` and returns them as a
# three-element list in the fixed order [:tag, :tag_all, :tag_reject], which
# callers destructure as `[tag_any, tag_all, tag_reject]`.
# Each value goes through List.wrap/1, so a missing (nil) option becomes [] and
# a single non-list value becomes a one-element list.
+ defp hashtag_conditions(opts) do
+ [:tag, :tag_all, :tag_reject]
+ |> Enum.map(&opts[&1])
+ |> Enum.map(&List.wrap(&1))
+ end
+ defp restrict_hashtag_agg(query, opts) do
+ [tag_any, tag_all, tag_reject] = hashtag_conditions(opts)
has_conditions = Enum.any?([tag_any, tag_all, tag_reject], &Enum.any?(&1))
cond do
|> exclude_invisible_actors(opts)
|> exclude_visibility(opts)
+ hashtag_timeline_strategy = Config.improved_hashtag_timeline()
+
cond do
- Config.object_embedded_hashtags?() ->
+ !hashtag_timeline_strategy ->
query
- |> restrict_tag(opts)
- |> restrict_tag_reject(opts)
- |> restrict_tag_all(opts)
+ |> restrict_embedded_tag(opts)
+ |> restrict_embedded_tag_reject(opts)
+ |> restrict_embedded_tag_all(opts)
+
+ hashtag_timeline_strategy == :prefer_aggregation ->
+ restrict_hashtag_agg(query, opts)
- # TODO: benchmark (initial approach preferring non-aggregate ops when possible)
- Config.improved_hashtag_timeline() == :join ->
+ hashtag_timeline_strategy == :avoid_aggregation or avoid_hashtags_aggregation?(opts) ->
query
|> distinct([activity], true)
|> restrict_hashtag_any(opts)
|> restrict_hashtag_reject_any(opts)
true ->
- restrict_hashtag(query, opts)
+ restrict_hashtag_agg(query, opts)
end
end
# Returns true when the hashtag filters in `opts` contain no reject tags and
# `joins_count` is at most 2; the strategy `cond` uses this to pick the
# non-aggregating query path over restrict_hashtag_agg/2.
# `joins_count` is one per `tag_all` entry plus one more if `tag_any` has any
# truthy element (presumably the number of joins that path would need — confirm).
+ defp avoid_hashtags_aggregation?(opts) do
+ [tag_any, tag_all, tag_reject] = hashtag_conditions(opts)
+
# The no-parens `if ..., do: 1, else: 0` is parsed as the whole right-hand
# operand of `+`, i.e. length(tag_all) + (0 or 1).
+ joins_count = length(tag_all) + if Enum.any?(tag_any), do: 1, else: 0
+ Enum.empty?(tag_reject) and joins_count <= 2
+ end
+
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
list_memberships = Pleroma.List.memberships(opts[:user])