X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=lib%2Fmix%2Ftasks%2Fpleroma%2Fdatabase.ex;h=a098525038e008b77fcadbfa6fce1aa651295e24;hb=d0c2479710b40a88b3a74c85c9ed9e72e9a2bfaf;hp=7d8f00b089b50f6d2dc686a6865a8eec72a38ff1;hpb=724ed354f25fb83f65dff2fbadd4b5a121fb77d0;p=akkoma diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex index 7d8f00b08..a09852503 100644 --- a/lib/mix/tasks/pleroma/database.ex +++ b/lib/mix/tasks/pleroma/database.ex @@ -48,9 +48,15 @@ defmodule Mix.Tasks.Pleroma.Database do def run(["update_users_following_followers_counts"]) do start_pleroma() - User - |> Repo.all() - |> Enum.each(&User.update_follower_count/1) + Repo.transaction( + fn -> + from(u in User, select: u) + |> Repo.stream() + |> Stream.each(&User.update_follower_count/1) + |> Stream.run() + end, + timeout: :infinity + ) end def run(["prune_objects" | args]) do @@ -99,7 +105,7 @@ defmodule Mix.Tasks.Pleroma.Database do where: fragment("(?)->>'likes' is not null", object.data), select: %{id: object.id, likes: fragment("(?)->>'likes'", object.data)} ) - |> Pleroma.RepoStreamer.chunk_stream(100) + |> Pleroma.Repo.chunk_stream(100, :batches) |> Stream.each(fn objects -> ids = objects @@ -122,6 +128,57 @@ defmodule Mix.Tasks.Pleroma.Database do |> Stream.run() end + def run(["fill_old_hashtags", month_limit]) do + import Ecto.Query + + start_pleroma() + + month_limit = String.to_integer(month_limit) + + if month_limit < 1 do + shell_error("Invalid `month_limit` argument, needs to be greater than 1") + else + time_limit = DateTime.utc_now() |> Timex.shift(months: -month_limit) + + from( + o in Object, + where: fragment("(?)->>'hashtags' is null", o.data), + where: fragment("(?)->>'tag' != '[]'", o.data), + where: o.inserted_at < ^time_limit, + select: %{id: o.id, tag: fragment("(?)->>'tag'", o.data)} + ) + |> Pleroma.Repo.chunk_stream(200, :batches) + |> Stream.each(fn objects -> + Repo.transaction(fn -> + objects_first = objects |> List.first() + objects_last = objects |> List.last() + + Logger.info( + "fill_old_hashtags: #{objects_first.id} (#{objects_first.inserted_at}) -- #{ + objects_last.id + } (#{objects_last.inserted_at})" + ) + + objects + |> Enum.map(fn object -> + tags = + object.tag + |> Jason.decode!() + |> Enum.filter(&is_bitstring(&1)) + + Object + |> where([o], o.id == ^object.id) + |> update([o], + set: [data: fragment("safe_jsonb_set(?, '{hashtags}', ?, true)", o.data, ^tags)] + ) + |> Repo.update_all([], timeout: :infinity) + end) + end) + end) + |> Stream.run() + end + end + def run(["vacuum", args]) do start_pleroma() @@ -133,8 +190,7 @@ defmodule Mix.Tasks.Pleroma.Database do days = Pleroma.Config.get([:mrf_activity_expiration, :days], 365) Pleroma.Activity - |> join(:left, [a], u in assoc(a, :expiration)) - |> join(:inner, [a, _u], o in Object, + |> join(:inner, [a], o in Object, on: fragment( "(?->>'id') = COALESCE((?)->'object'->> 'id', (?)->>'object')", @@ -144,14 +200,20 @@ defmodule Mix.Tasks.Pleroma.Database do ) ) |> where(local: true) - |> where([a, u], is_nil(u)) |> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data)) - |> where([_a, _u, o], fragment("?->>'type' = 'Note'", o.data)) - |> Pleroma.RepoStreamer.chunk_stream(100) + |> where([_a, o], fragment("?->>'type' = 'Note'", o.data)) + |> Pleroma.Repo.chunk_stream(100, :batches) |> Stream.each(fn activities -> Enum.each(activities, fn activity -> - expires_at = Timex.shift(activity.inserted_at, days: days) - Pleroma.ActivityExpiration.create(activity, expires_at, false) + expires_at = + activity.inserted_at + |> DateTime.from_naive!("Etc/UTC") + |> Timex.shift(days: days) + + Pleroma.Workers.PurgeExpiredActivity.enqueue(%{ + activity_id: activity.id, + expires_at: expires_at + }) end) end) |> Stream.run()