X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fmix%2Ftasks%2Fpleroma%2Fdatabase.ex;h=726a22d411a81e65fb52b4c564f8f6ff7e824ef3;hb=57eef6d76492e772f83acba2402d50ecb6a69f6b;hp=2403ed5813f40c6a311669f6b82fafe8f7aee184;hpb=a853218701c662ce452757a05395c8c6002c5a3d;p=akkoma diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex index 2403ed581..726a22d41 100644 --- a/lib/mix/tasks/pleroma/database.ex +++ b/lib/mix/tasks/pleroma/database.ex @@ -8,14 +8,17 @@ defmodule Mix.Tasks.Pleroma.Database do alias Pleroma.Object alias Pleroma.Repo alias Pleroma.User + require Logger require Pleroma.Constants + import Ecto.Query import Mix.Pleroma + use Mix.Task @shortdoc "A collection of database related tasks" - @moduledoc File.read!("docs/administration/CLI_tasks/database.md") + @moduledoc File.read!("docs/docs/administration/CLI_tasks/database.md") def run(["remove_embedded_objects" | args]) do {options, [], []} = @@ -64,40 +67,166 @@ defmodule Mix.Tasks.Pleroma.Database do OptionParser.parse( args, strict: [ - vacuum: :boolean + vacuum: :boolean, + keep_threads: :boolean, + keep_non_public: :boolean, + prune_orphaned_activities: :boolean ] ) start_pleroma() deadline = Pleroma.Config.get([:instance, :remote_post_retention_days]) + time_deadline = NaiveDateTime.utc_now() |> NaiveDateTime.add(-(deadline * 86_400)) - Logger.info("Pruning objects older than #{deadline} days") + log_message = "Pruning objects older than #{deadline} days" - time_deadline = - NaiveDateTime.utc_now() - |> NaiveDateTime.add(-(deadline * 86_400)) + log_message = + if Keyword.get(options, :keep_non_public) do + log_message <> ", keeping non public posts" + else + log_message + end - from(o in Object, - where: - fragment( - "?->'to' \\? ? OR ?->'cc' \\? ?", - o.data, - ^Pleroma.Constants.as_public(), - o.data, - ^Pleroma.Constants.as_public() - ), - where: o.inserted_at < ^time_deadline, - where: + log_message = + if Keyword.get(options, :keep_threads) do + log_message <> ", keeping threads intact" + else + log_message + end + + log_message = + if Keyword.get(options, :prune_orphaned_activities) do + log_message <> ", pruning orphaned activities" + else + log_message + end + + log_message = + if Keyword.get(options, :vacuum) do + log_message <> + ", doing a full vacuum (you shouldn't do this as a recurring maintanance task)" + else + log_message + end + + Logger.info(log_message) + + if Keyword.get(options, :keep_threads) do + # We want to delete objects from threads where + # 1. the newest post is still old + # 2. none of the activities is local + # 3. none of the activities is bookmarked + # 4. optionally none of the posts is non-public + deletable_context = + if Keyword.get(options, :keep_non_public) do + Pleroma.Activity + |> join(:left, [a], b in Pleroma.Bookmark, on: a.id == b.activity_id) + |> group_by([a], fragment("? ->> 'context'::text", a.data)) + |> having( + [a], + not fragment( + # Posts (checked on Create Activity) is non-public + "bool_or((not(?->'to' \\? ? OR ?->'cc' \\? ?)) and ? ->> 'type' = 'Create')", + a.data, + ^Pleroma.Constants.as_public(), + a.data, + ^Pleroma.Constants.as_public(), + a.data + ) + ) + else + Pleroma.Activity + |> join(:left, [a], b in Pleroma.Bookmark, on: a.id == b.activity_id) + |> group_by([a], fragment("? ->> 'context'::text", a.data)) + end + |> having([a], max(a.updated_at) < ^time_deadline) + |> having([a], not fragment("bool_or(?)", a.local)) + |> having([_, b], fragment("max(?::text) is null", b.id)) + |> select([a], fragment("? ->> 'context'::text", a.data)) + + Pleroma.Object + |> where([o], fragment("? ->> 'context'::text", o.data) in subquery(deletable_context)) + else + if Keyword.get(options, :keep_non_public) do + Pleroma.Object + |> where( + [o], + fragment( + "?->'to' \\? ? OR ?->'cc' \\? ?", + o.data, + ^Pleroma.Constants.as_public(), + o.data, + ^Pleroma.Constants.as_public() + ) + ) + else + Pleroma.Object + end + |> where([o], o.updated_at < ^time_deadline) + |> where( + [o], fragment("split_part(?->>'actor', '/', 3) != ?", o.data, ^Pleroma.Web.Endpoint.host()) - ) + ) + end |> Repo.delete_all(timeout: :infinity) + if Keyword.get(options, :prune_orphaned_activities) do + # Prune activities who link to a single object + """ + delete from public.activities + where id in ( + select a.id from public.activities a + left join public.objects o on a.data ->> 'object' = o.data ->> 'id' + left join public.activities a2 on a.data ->> 'object' = a2.data ->> 'id' + left join public.users u on a.data ->> 'object' = u.ap_id + where not a.local + and jsonb_typeof(a."data" -> 'object') = 'string' + and o.id is null + and a2.id is null + and u.id is null + ) + """ + |> Repo.query([], timeout: :infinity) + + # Prune activities who link to an array of objects + """ + delete from public.activities + where id in ( + select a.id from public.activities a + join json_array_elements_text((a."data" -> 'object')::json) as j on jsonb_typeof(a."data" -> 'object') = 'array' + left join public.objects o on j.value = o.data ->> 'id' + left join public.activities a2 on j.value = a2.data ->> 'id' + left join public.users u on j.value = u.ap_id + group by a.id + having max(o.data ->> 'id') is null + and max(a2.data ->> 'id') is null + and max(u.ap_id) is null + ) + """ + |> Repo.query([], timeout: :infinity) + end + + """ + DELETE FROM hashtags AS ht + WHERE NOT EXISTS ( + SELECT 1 FROM hashtags_objects hto + WHERE ht.id = hto.hashtag_id) + """ + |> Repo.query() + if Keyword.get(options, :vacuum) do Maintenance.vacuum("full") end end + def run(["prune_task"]) do + start_pleroma() + + nil + |> Pleroma.Workers.Cron.PruneDatabaseWorker.perform() + end + def run(["fix_likes_collections"]) do start_pleroma() @@ -197,7 +326,9 @@ defmodule Mix.Tasks.Pleroma.Database do new.fts_content := to_tsvector(new.data->>'content'); RETURN new; END - $$ LANGUAGE plpgsql" + $$ LANGUAGE plpgsql", + [], + timeout: :infinity ) shell_info("Refresh RUM index") @@ -207,11 +338,41 @@ defmodule Mix.Tasks.Pleroma.Database do Ecto.Adapters.SQL.query!( Pleroma.Repo, - "CREATE INDEX objects_fts ON objects USING gin(to_tsvector('#{tsconfig}', data->>'content')); " + "CREATE INDEX CONCURRENTLY objects_fts ON objects USING gin(to_tsvector('#{tsconfig}', data->>'content')); ", + [], + timeout: :infinity ) end shell_info('Done.') end end + + # Rolls back a specific migration (leaving subsequent migrations applied). + # WARNING: imposes a risk of unrecoverable data loss — proceed at your own responsibility. + # Based on https://stackoverflow.com/a/53825840 + def run(["rollback", version]) do + prompt = "SEVERE WARNING: this operation may result in unrecoverable data loss. Continue?" + + if shell_prompt(prompt, "n") in ~w(Yn Y y) do + {_, result, _} = + Ecto.Migrator.with_repo(Pleroma.Repo, fn repo -> + version = String.to_integer(version) + re = ~r/^#{version}_.*\.exs/ + path = Ecto.Migrator.migrations_path(repo) + + with {_, "" <> file} <- {:find, Enum.find(File.ls!(path), &String.match?(&1, re))}, + {_, [{mod, _} | _]} <- {:compile, Code.compile_file(Path.join(path, file))}, + {_, :ok} <- {:rollback, Ecto.Migrator.down(repo, version, mod)} do + {:ok, "Reversed migration: #{file}"} + else + {:find, _} -> {:error, "No migration found with version prefix: #{version}"} + {:compile, e} -> {:error, "Problem compiling migration module: #{inspect(e)}"} + {:rollback, e} -> {:error, "Problem reversing migration: #{inspect(e)}"} + end + end) + + shell_info(inspect(result)) + end + end end