|> Stream.run()
end
- def run(["fill_old_hashtags"]) do
+ def run(["fill_old_hashtags", month_limit]) do
import Ecto.Query
start_pleroma()
- from(
- o in Object,
- where: fragment("(?)->>'hashtags' is null", o.data),
- where: fragment("(?)->>'tag' != '[]'", o.data),
- select: %{id: o.id, tag: fragment("(?)->>'tag'", o.data)},
- order_by: [:desc, o.id]
- )
- |> Pleroma.Repo.chunk_stream(200, :batches)
- |> Stream.each(fn objects ->
- Repo.transaction(fn ->
- objects_first = objects |> List.first()
- objects_last = objects |> List.last()
-
- Logger.info(
- "fill_old_hashtags: #{objects_first.id} (#{objects_first.inserted_at}) -- #{
- objects_last.id
- } (#{objects_last.inserted_at})"
- )
+ month_limit = String.to_integer(month_limit)
- objects
- |> Enum.map(fn object ->
- tags =
- object.tag
- |> Jason.decode!()
- |> Enum.filter(&is_bitstring(&1))
-
- Object
- |> where([o], o.id == ^object.id)
- |> update([o],
- set: [data: fragment("safe_jsonb_set(?, '{hashtags}', ?, true)", o.data, ^tags)]
+ if month_limit < 1 do
+ shell_error("Invalid `month_limit` argument, needs to be greater than 1")
+ else
+ time_limit = DateTime.utc_now() |> Timex.shift(months: -month_limit)
+
+ from(
+ o in Object,
+ where: fragment("(?)->>'hashtags' is null", o.data),
+ where: fragment("(?)->>'tag' != '[]'", o.data),
+ where: o.inserted_at < ^time_limit,
+ select: %{id: o.id, tag: fragment("(?)->>'tag'", o.data)}
+ )
+ |> Pleroma.Repo.chunk_stream(200, :batches)
+ |> Stream.each(fn objects ->
+ Repo.transaction(fn ->
+ objects_first = objects |> List.first()
+ objects_last = objects |> List.last()
+
+ Logger.info(
+ "fill_old_hashtags: #{objects_first.id} (#{objects_first.inserted_at}) -- #{
+ objects_last.id
+ } (#{objects_last.inserted_at})"
)
- |> Repo.update_all([], timeout: :infinity)
+
+ objects
+ |> Enum.map(fn object ->
+ tags =
+ object.tag
+ |> Jason.decode!()
+ |> Enum.filter(&is_bitstring(&1))
+
+ Object
+ |> where([o], o.id == ^object.id)
+ |> update([o],
+ set: [data: fragment("safe_jsonb_set(?, '{hashtags}', ?, true)", o.data, ^tags)]
+ )
+ |> Repo.update_all([], timeout: :infinity)
+ end)
end)
end)
- end)
- |> Stream.run()
+ |> Stream.run()
+ end
end
def run(["vacuum", args]) do