[#3213] Added query options support for Repo.chunk_stream/4.
author    Ivan Tashkinov <ivantashkinov@gmail.com>
          Thu, 31 Dec 2020 06:36:26 +0000 (09:36 +0300)
committer Ivan Tashkinov <ivantashkinov@gmail.com>
          Thu, 31 Dec 2020 06:36:26 +0000 (09:36 +0300)
Used infinite timeout in transfer_hashtags select query.
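
Net effect: chunk_stream/4 now forwards an optional keyword list to Repo.all/2,
so any shared Ecto query option (such as :timeout) applies to each chunk's
select. A minimal usage sketch, assuming an iex -S mix session inside a
Pleroma checkout; Pleroma.Object as the queried schema is illustrative:

    Pleroma.Object
    |> Pleroma.Repo.chunk_stream(100, :batches, timeout: :infinity)
    |> Stream.each(fn objects ->
      # with :batches, each stream element is a list of up to 100 records
      IO.puts("got #{length(objects)} objects")
    end)
    |> Stream.run()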

lib/mix/tasks/pleroma/database.ex
lib/pleroma/repo.ex

diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex
index d44bd34784b24b2c2bbd9d727c4464fb6d7fac9d..f903cf75bdc18fac39119cd44a0761f9ed2a8f88 100644
@@ -149,9 +149,9 @@ defmodule Mix.Tasks.Pleroma.Database do
         tag: fragment("(?)->>'tag'", object.data)
       }
     )
-    |> Pleroma.Repo.chunk_stream(100, :batches)
+    |> Repo.chunk_stream(100, :batches, timeout: :infinity)
     |> Stream.each(fn objects ->
-      Logger.info("Processing #{length(objects)} objects...")
+      Logger.info("Processing #{length(objects)} objects starting from id #{hd(objects).id}...")
 
       Enum.map(
         objects,
@@ -165,10 +165,9 @@ defmodule Mix.Tasks.Pleroma.Database do
             with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
               for hashtag_record <- hashtag_records do
                 with {:ok, _} <-
-                       Ecto.Adapters.SQL.query(
-                         Repo,
-                         "insert into hashtags_objects(hashtag_id, object_id) values " <>
-                           "(#{hashtag_record.id}, #{object.id});"
+                       Repo.query(
+                         "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);",
+                         [hashtag_record.id, object.id]
                        ) do
                   :noop
                 else
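
The hunk above replaces string-interpolated SQL with a parameterized
Repo.query/2 call (a convenience that use Ecto.Repo with a SQL adapter
provides around Ecto.Adapters.SQL.query/4), so the ids travel as Postgres
bind parameters ($1, $2) rather than being spliced into the statement.
A sketch of the call shape and its result, with hypothetical ids:

    # hypothetical ids; in the mix task these come from hashtag_record and object
    hashtag_id = 1
    object_id = 42

    case Pleroma.Repo.query(
           "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);",
           [hashtag_id, object_id]
         ) do
      {:ok, %Postgrex.Result{num_rows: 1}} -> :ok
      {:error, %Postgrex.Error{} = error} -> {:error, error}
    end
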
diff --git a/lib/pleroma/repo.ex b/lib/pleroma/repo.ex
index 4524bd5e2c766636382ba2c0c34d409d10c289b4..78711e6ac16eed14696cbb61444dee1f0f0ba59b 100644
@@ -63,8 +63,8 @@ defmodule Pleroma.Repo do
   iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500, :batches)
   """
-  @spec chunk_stream(Ecto.Query.t(), integer(), atom()) :: Enumerable.t()
+  @spec chunk_stream(Ecto.Query.t(), integer(), atom(), keyword()) :: Enumerable.t()
-  def chunk_stream(query, chunk_size, returns_as \\ :one) do
-    # We don't actually need start and end funcitons of resource streaming,
+  def chunk_stream(query, chunk_size, returns_as \\ :one, query_options \\ []) do
+    # We don't actually need start and end functions of resource streaming,
     # but it seems to be the only way to not fetch records one-by-one and
     # have individual records be the elements of the stream, instead of
     # lists of records
@@ -76,7 +76,7 @@ defmodule Pleroma.Repo do
           |> order_by(asc: :id)
           |> where([r], r.id > ^last_id)
           |> limit(^chunk_size)
-          |> all()
+          |> all(query_options)
           |> case do
             [] ->
               {:halt, last_id}
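
Because query_options is handed straight to all/2, callers can thread through
any Repo.all/2 shared option, not only timeout: :infinity. A sketch reusing
the query from the @doc example above, with a hypothetical actor and a
5-minute per-chunk timeout:

    ap_id = "https://example.com/users/alice"  # hypothetical actor

    Pleroma.Activity.Queries.by_actor(ap_id)
    |> Pleroma.Repo.chunk_stream(500, :one, timeout: :timer.minutes(5))
    |> Enum.take(1_000)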