],
plugins: [Oban.Plugins.Pruner],
crontab: [
- {"0 1 * * *", Pleroma.Workers.Cron.HashtagsCleanupWorker},
{"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker},
{"0 0 * * *", Pleroma.Workers.Cron.NewUsersDigestWorker}
]
type: {:list, :tuple},
description: "Settings for cron background jobs",
suggestions: [
- {"0 1 * * *", Pleroma.Workers.Cron.HashtagsCleanupWorker},
{"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker},
{"0 0 * * *", Pleroma.Workers.Cron.NewUsersDigestWorker}
]
use Ecto.Schema
import Ecto.Changeset
+ import Ecto.Query
+ alias Ecto.Multi
alias Pleroma.Hashtag
+ alias Pleroma.Object
alias Pleroma.Repo
schema "hashtags" do
field(:name, :string)
- many_to_many(:objects, Pleroma.Object, join_through: "hashtags_objects", on_replace: :delete)
+ many_to_many(:objects, Object, join_through: "hashtags_objects", on_replace: :delete)
timestamps()
end
end
def get_or_create_by_names(names) when is_list(names) do
- Enum.reduce_while(names, {:ok, []}, fn name, {:ok, list} ->
- case get_or_create_by_name(name) do
- {:ok, %Hashtag{} = hashtag} ->
- {:cont, {:ok, list ++ [hashtag]}}
-
- error ->
- {:halt, error}
- end
- end)
+    # Bulk variant: inserts all missing hashtags with a single `insert_all` and then
+    # loads every requested hashtag, both inside one transaction.
+    #
+    # Returns `{:ok, hashtags}` on success, `{:error, changeset}` for the first name
+    # that fails `changeset/2` validation, or `{:error, value}` when a Multi step fails.
+    # The order of `hashtags` comes from the SELECT, not from the order of `names`.
+    timestamp = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
+
+    changesets = Enum.map(names, &changeset(%Hashtag{}, %{name: &1}))
+
+    # `insert_all` bypasses changeset validation, so reject invalid names up front
+    # instead of letting the database raise on them.
+    with nil <- Enum.find(changesets, &(not &1.valid?)),
+         structs =
+           Enum.map(changesets, fn %Ecto.Changeset{changes: changes} ->
+             Map.merge(changes, %{inserted_at: timestamp, updated_at: timestamp})
+           end),
+         # Look rows up by the names as they were actually inserted (i.e. after
+         # `changeset/2` casting), not by the raw input.
+         stored_names = Enum.map(structs, & &1.name),
+         {:ok, %{query_op: hashtags}} <-
+           Multi.new()
+           |> Multi.insert_all(:insert_all_op, Hashtag, structs, on_conflict: :nothing)
+           |> Multi.run(:query_op, fn _repo, _changes ->
+             {:ok, Repo.all(from(ht in Hashtag, where: ht.name in ^stored_names))}
+           end)
+           |> Repo.transaction() do
+      {:ok, hashtags}
+    else
+      %Ecto.Changeset{} = invalid_changeset -> {:error, invalid_changeset}
+      {:error, _name, value, _changes_so_far} -> {:error, value}
+    end
end
def changeset(%Hashtag{} = struct, params) do
|> validate_required([:name])
|> unique_constraint(:name)
end
+
+  # Removes every `hashtags_objects` row pointing at the given object, then hands the
+  # hashtag ids of the removed rows to `delete_unreferenced/1` and returns its result.
+  def unlink(%Object{id: object_id}) do
+    object_links =
+      from(hto in "hashtags_objects",
+        where: hto.object_id == ^object_id,
+        select: hto.hashtag_id
+      )
+
+    case Repo.delete_all(object_links) do
+      {_deleted_count, hashtag_ids} -> delete_unreferenced(hashtag_ids)
+      other -> other
+    end
+  end
+
+ @delete_unreferenced_query """
+ DELETE FROM hashtags WHERE id IN
+ (SELECT hashtags.id FROM hashtags
+ LEFT OUTER JOIN hashtags_objects
+ ON hashtags_objects.hashtag_id = hashtags.id
+ WHERE hashtags_objects.hashtag_id IS NULL AND hashtags.id = ANY($1));
+ """
+
+  # Deletes those hashtags among `ids` that no `hashtags_objects` row references.
+  #
+  # Returns `{:ok, deleted_count}`, or the non-matching result of `Repo.query/2`
+  # unchanged when the query fails.
+  #
+  # Nothing can be deleted for an empty id list, so skip the database round-trip.
+  def delete_unreferenced([]), do: {:ok, 0}
+
+  def delete_unreferenced(ids) do
+    with {:ok, %{num_rows: deleted_count}} <- Repo.query(@delete_unreferenced_query, [ids]) do
+      {:ok, deleted_count}
+    end
+  end
end
def handle_info(:migrate_hashtags, state) do
State.clear()
- data_migration = data_migration()
+ update_status(:running)
+ put_stat(:started_at, NaiveDateTime.utc_now())
+ data_migration = data_migration()
persistent_data = Map.take(data_migration.data, ["max_processed_id"])
{:ok, data_migration} =
DataMigration.update(data_migration, %{state: :running, data: persistent_data})
- update_status(:running)
- put_stat(:started_at, NaiveDateTime.utc_now())
-
Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")
max_processed_id = data_migration.data["max_processed_id"] || 0
|> Stream.run()
with 0 <- failures_count(data_migration.id) do
+ _ = delete_non_create_activities_hashtags()
+
{:ok, data_migration} = DataMigration.update_state(data_migration, :complete)
handle_success(data_migration)
{:noreply, state}
end
+ @hashtags_objects_cleanup_query """
+ DELETE FROM hashtags_objects WHERE object_id IN
+ (SELECT DISTINCT objects.id FROM objects
+ JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities
+ ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') =
+ (objects.data->>'id')
+ AND activities.data->>'type' = 'Create'
+ WHERE activities.id IS NULL);
+ """
+
+ @hashtags_cleanup_query """
+ DELETE FROM hashtags WHERE id IN
+ (SELECT hashtags.id FROM hashtags
+ LEFT OUTER JOIN hashtags_objects
+ ON hashtags_objects.hashtag_id = hashtags.id
+ WHERE hashtags_objects.hashtag_id IS NULL);
+ """
+
+  # Runs the two cleanup statements in order: first the `hashtags_objects` rows of
+  # objects without a `Create` activity, then the hashtags left without any
+  # `hashtags_objects` row. Returns `{:ok, hashtags_objects_count, hashtags_count}`;
+  # a failed statement raises `MatchError`.
+  def delete_non_create_activities_hashtags do
+    [hashtags_objects_count, hashtags_count] =
+      Enum.map([@hashtags_objects_cleanup_query, @hashtags_cleanup_query], fn sql ->
+        {:ok, %{num_rows: deleted_rows}} = Repo.query(sql, [], timeout: :infinity)
+        deleted_rows
+      end)
+
+    {:ok, hashtags_objects_count, hashtags_count}
+  end
+
defp query do
# Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out)
- # Note: not checking activity type; HashtagsCleanupWorker should clean up unused records later
+ # Note: not checking activity type, expecting delete_non_create_activities_hashtags/0 to clean up
from(
object in Object,
where:
defp transfer_object_hashtags(object, hashtags) do
Repo.transaction(fn ->
with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
- for hashtag_record <- hashtag_records do
- with {:ok, _} <-
- Repo.query(
- "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);",
- [hashtag_record.id, object.id]
- ) do
- nil
- else
- {:error, e} ->
- error =
- "ERROR: could not link object #{object.id} and hashtag " <>
- "#{hashtag_record.id}: #{inspect(e)}"
-
- Logger.error(error)
- Repo.rollback(object.id)
- end
+ maps = Enum.map(hashtag_records, &%{hashtag_id: &1.id, object_id: object.id})
+ expected_rows = length(hashtag_records)
+
+ with {^expected_rows, _} <- Repo.insert_all("hashtags_objects", maps) do
+ object.id
+ else
+ e ->
+ error =
+ "ERROR when inserting #{expected_rows} hashtags_objects " <>
+ "for object #{object.id}: #{inspect(e)}"
+
+ Logger.error(error)
+ Repo.rollback(object.id)
end
-
- object.id
else
e ->
error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}"
|> cast(params, [:data])
|> validate_required([:data])
|> unique_constraint(:ap_id, name: :objects_unique_apid_index)
+ # Expecting `maybe_handle_hashtags_change/1` to run last:
|> maybe_handle_hashtags_change(struct)
end
- # Note: not checking activity type; HashtagsCleanupWorker should clean up unused records later
+ # Note: not checking activity type (assuming non-legacy objects are associated with Create act.)
+ # Syncs the `:hashtags` association with the hashtags extracted from the changed `:data`.
+ # Returns the changeset untouched when it is already invalid or when `hashtags_changed?/2`
+ # reports no change; registers a `:data` validation error when
+ # `Hashtag.get_or_create_by_names/1` returns an `{:error, _}` tuple.
defp maybe_handle_hashtags_change(changeset, struct) do
- with data_hashtags_change = get_change(changeset, :data),
- true <- hashtags_changed?(struct, data_hashtags_change),
+ # Skip hashtag lookups/inserts entirely for a changeset that is already invalid.
+ with %Ecto.Changeset{valid?: true} <- changeset,
+ data_hashtags_change = get_change(changeset, :data),
+ # Tagged so the `else` branch can tell "hashtags unchanged" apart from other results.
+ {_, true} <- {:changed, hashtags_changed?(struct, data_hashtags_change)},
{:ok, hashtag_records} <-
data_hashtags_change
|> object_data_hashtags()
|> Hashtag.get_or_create_by_names() do
put_assoc(changeset, :hashtags, hashtag_records)
else
- false ->
+ %{valid?: false} ->
changeset
- {:error, hashtag_changeset} ->
- failed_hashtag = get_field(hashtag_changeset, :name)
+ {:changed, false} ->
+ changeset
+ {:error, _} ->
validate_change(changeset, :data, fn _, _ ->
- [data: "error referencing hashtag: #{failed_hashtag}"]
+ [data: "error referencing hashtags"]
end)
end
end
def swap_object_with_tombstone(object) do
tombstone = make_tombstone(object)
- object
- |> Object.change(%{data: tombstone})
- |> Repo.update()
+    # Replace the object's data with the tombstone; on success also drop its hashtag
+    # links. The result of `Hashtag.unlink/1` is not inspected, and a failed update
+    # is returned to the caller as-is.
+    update_result =
+      object
+      |> Object.change(%{data: tombstone})
+      |> Repo.update()
+
+    case update_result do
+      {:ok, tombstoned} ->
+        Hashtag.unlink(tombstoned)
+        {:ok, tombstoned}
+
+      failure ->
+        failure
+    end
end
def delete(%Object{data: %{"id" => id}} = object) do
+++ /dev/null
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Workers.Cron.HashtagsCleanupWorker do
- @moduledoc """
- The worker to clean up unused hashtags_objects and hashtags.
- """
-
- use Oban.Worker, queue: "hashtags_cleanup"
-
- alias Pleroma.Repo
-
- require Logger
-
- @hashtags_objects_query """
- DELETE FROM hashtags_objects WHERE object_id IN
- (SELECT DISTINCT objects.id FROM objects
- JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities
- ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') =
- (objects.data->>'id')
- AND activities.data->>'type' = 'Create'
- WHERE activities.id IS NULL);
- """
-
- @hashtags_query """
- DELETE FROM hashtags WHERE id IN
- (SELECT hashtags.id FROM hashtags
- LEFT OUTER JOIN hashtags_objects
- ON hashtags_objects.hashtag_id = hashtags.id
- WHERE hashtags_objects.hashtag_id IS NULL AND hashtags.inserted_at < $1);
- """
-
- @impl Oban.Worker
- def perform(_job) do
- Logger.info("Cleaning up unused `hashtags_objects` records...")
-
- {:ok, %{num_rows: hashtags_objects_count}} =
- Repo.query(@hashtags_objects_query, [], timeout: :infinity)
-
- Logger.info("Deleted #{hashtags_objects_count} unused `hashtags_objects` records.")
-
- Logger.info("Cleaning up unused `hashtags` records...")
-
- # Note: ignoring recently created hashtags since references are added after hashtag is created
- {:ok, %{num_rows: hashtags_count}} =
- Repo.query(@hashtags_query, [NaiveDateTime.add(NaiveDateTime.utc_now(), -3600 * 24)],
- timeout: :infinity
- )
-
- Logger.info("Deleted #{hashtags_count} unused `hashtags` records.")
-
- Logger.info("HashtagsCleanupWorker complete.")
-
- :ok
- end
-end