X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fmix%2Ftasks%2Fpleroma%2Fdatabase.ex;h=778de162f1687b7eadfbc1126bfa1c8f96e07b45;hb=4134abef63e1165f5701741c1012e64cb908654c;hp=bcc2052d6867283835eaa3405128c6c40dd166c2;hpb=8340fe8fcce15a22e2bef9d5db41ad58c3c009e0;p=akkoma diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex index bcc2052d6..093c7dd30 100644 --- a/lib/mix/tasks/pleroma/database.ex +++ b/lib/mix/tasks/pleroma/database.ex @@ -1,46 +1,26 @@ # Pleroma: A lightweight social networking server -# Copyright © 2017-2018 Pleroma Authors +# Copyright © 2017-2020 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only defmodule Mix.Tasks.Pleroma.Database do alias Pleroma.Conversation + alias Pleroma.Hashtag + alias Pleroma.Maintenance alias Pleroma.Object alias Pleroma.Repo alias Pleroma.User + require Logger require Pleroma.Constants + + import Ecto.Query import Mix.Pleroma + use Mix.Task @shortdoc "A collection of database related tasks" - @moduledoc """ - A collection of database related tasks - - ## Replace embedded objects with their references - - Replaces embedded objects with references to them in the `objects` table. Only needs to be ran once. The reason why this is not a migration is because it could significantly increase the database size after being ran, however after this `VACUUM FULL` will be able to reclaim about 20% (really depends on what is in the database, your mileage may vary) of the db size before the migration. - - mix pleroma.database remove_embedded_objects - - Options: - - `--vacuum` - run `VACUUM FULL` after the embedded objects are replaced with their references - - ## Prune old objects from the database - - mix pleroma.database prune_objects - - ## Create a conversation for all existing DMs. Can be safely re-run. - - mix pleroma.database bump_all_conversations - - ## Remove duplicated items from following and update followers count for all users - - mix pleroma.database update_users_following_followers_counts + @moduledoc File.read!("docs/administration/CLI_tasks/database.md") - ## Fix the pre-existing "likes" collections for all objects - - mix pleroma.database fix_likes_collections - """ def run(["remove_embedded_objects" | args]) do {options, [], []} = OptionParser.parse( @@ -54,19 +34,13 @@ defmodule Mix.Tasks.Pleroma.Database do Logger.info("Removing embedded objects") Repo.query!( - "update activities set data = jsonb_set(data, '{object}'::text[], data->'object'->'id') where data->'object'->>'id' is not null;", + "update activities set data = safe_jsonb_set(data, '{object}'::text[], data->'object'->'id') where data->'object'->>'id' is not null;", [], timeout: :infinity ) if Keyword.get(options, :vacuum) do - Logger.info("Runnning VACUUM FULL") - - Repo.query!( - "vacuum full;", - [], - timeout: :infinity - ) + Maintenance.vacuum("full") end end @@ -78,14 +52,18 @@ defmodule Mix.Tasks.Pleroma.Database do def run(["update_users_following_followers_counts"]) do start_pleroma() - users = Repo.all(User) - Enum.each(users, &User.remove_duplicated_following/1) - Enum.each(users, &User.update_follower_count/1) + Repo.transaction( + fn -> + from(u in User, select: u) + |> Repo.stream() + |> Stream.each(&User.update_follower_count/1) + |> Stream.run() + end, + timeout: :infinity + ) end def run(["prune_objects" | args]) do - import Ecto.Query - {options, [], []} = OptionParser.parse( args, @@ -120,26 +98,18 @@ defmodule Mix.Tasks.Pleroma.Database do |> Repo.delete_all(timeout: :infinity) if Keyword.get(options, :vacuum) do - Logger.info("Runnning VACUUM FULL") - - Repo.query!( - "vacuum full;", - [], - timeout: :infinity - ) + Maintenance.vacuum("full") end end def run(["fix_likes_collections"]) do - import Ecto.Query - start_pleroma() from(object in Object, where: fragment("(?)->>'likes' is not null", object.data), select: %{id: object.id, likes: fragment("(?)->>'likes'", object.data)} ) - |> Pleroma.RepoStreamer.chunk_stream(100) + |> Pleroma.Repo.chunk_stream(100, :batches) |> Stream.each(fn objects -> ids = objects @@ -152,7 +122,7 @@ defmodule Mix.Tasks.Pleroma.Database do set: [ data: fragment( - "jsonb_set(?, '{likes}', '[]'::jsonb, true)", + "safe_jsonb_set(?, '{likes}', '[]'::jsonb, true)", object.data ) ] @@ -161,4 +131,104 @@ defmodule Mix.Tasks.Pleroma.Database do end) |> Stream.run() end + + def run(["transfer_hashtags"]) do + import Ecto.Query + + start_pleroma() + + from( + object in Object, + left_join: hashtag in assoc(object, :hashtags), + where: is_nil(hashtag.id), + where: fragment("(?)->>'tag' != '[]'", object.data), + select: %{ + id: object.id, + inserted_at: object.inserted_at, + tag: fragment("(?)->>'tag'", object.data) + }, + order_by: [desc: object.id] + ) + |> Pleroma.Repo.chunk_stream(100, :batches) + |> Stream.each(fn objects -> + chunk_start = List.first(objects) + chunk_end = List.last(objects) + + Logger.info( + "transfer_hashtags: " <> + "#{chunk_start.id} (#{chunk_start.inserted_at}) -- " <> + "#{chunk_end.id} (#{chunk_end.inserted_at})" + ) + + Enum.map( + objects, + fn object -> + hashtags = + object.tag + |> Jason.decode!() + |> Enum.filter(&is_bitstring(&1)) + + with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do + Repo.transaction(fn -> + for hashtag_record <- hashtag_records do + with {:error, _} <- + Ecto.Adapters.SQL.query( + Repo, + "insert into hashtags_objects(hashtag_id, object_id) values " <> + "(#{hashtag_record.id}, #{object.id});" + ) do + Logger.warn( + "ERROR: could not link object #{object.id} and hashtag #{hashtag_record.id}" + ) + end + end + end) + else + e -> Logger.warn("ERROR: could not process object #{object.id}: #{inspect(e)}") + end + end + ) + end) + |> Stream.run() + end + + def run(["vacuum", args]) do + start_pleroma() + + Maintenance.vacuum(args) + end + + def run(["ensure_expiration"]) do + start_pleroma() + days = Pleroma.Config.get([:mrf_activity_expiration, :days], 365) + + Pleroma.Activity + |> join(:inner, [a], o in Object, + on: + fragment( + "(?->>'id') = COALESCE((?)->'object'->> 'id', (?)->>'object')", + o.data, + a.data, + a.data + ) + ) + |> where(local: true) + |> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data)) + |> where([_a, o], fragment("?->>'type' = 'Note'", o.data)) + |> Pleroma.Repo.chunk_stream(100, :batches) + |> Stream.each(fn activities -> + Enum.each(activities, fn activity -> + expires_at = + activity.inserted_at + |> DateTime.from_naive!("Etc/UTC") + |> Timex.shift(days: days) + + Pleroma.Workers.PurgeExpiredActivity.enqueue(%{ + activity_id: activity.id, + expires_at: expires_at + }) + end) + end) + |> Stream.run() + end end