Merge remote-tracking branch 'upstream/develop' into earmark
[akkoma] / lib / pleroma / repo.ex
index f621384663266e7122484971af82d3da5ba90b79..b8ea06e33c9a33effe0bce819e0135ae77cc1bf2 100644 (file)
@@ -1,5 +1,5 @@
 # Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Repo do
@@ -8,11 +8,12 @@ defmodule Pleroma.Repo do
     adapter: Ecto.Adapters.Postgres,
     migration_timestamps: [type: :naive_datetime_usec]
 
+  use Ecto.Explain
+
+  import Ecto.Query
   require Logger
 
-  defmodule Instrumenter do
-    use Prometheus.EctoInstrumenter
-  end
+  defmodule Instrumenter, do: use(Prometheus.EctoInstrumenter)
 
   @doc """
   Dynamically loads the repository url from the
@@ -50,36 +51,49 @@ defmodule Pleroma.Repo do
     end
   end
 
-  def check_migrations_applied!() do
-    unless Pleroma.Config.get(
-             [:i_am_aware_this_may_cause_data_loss, :disable_migration_check],
-             false
-           ) do
-      Ecto.Migrator.with_repo(__MODULE__, fn repo ->
-        down_migrations =
-          Ecto.Migrator.migrations(repo)
-          |> Enum.reject(fn
-            {:up, _, _} -> true
-            {:down, _, _} -> false
-          end)
-
-        if length(down_migrations) > 0 do
-          down_migrations_text =
-            Enum.map(down_migrations, fn {:down, id, name} -> "- #{name} (#{id})\n" end)
-
-          Logger.error(
-            "The following migrations were not applied:\n#{down_migrations_text}If you want to start Pleroma anyway, set\nconfig :pleroma, :i_am_aware_this_may_cause_data_loss, disable_migration_check: true"
-          )
-
-          raise Pleroma.Repo.UnappliedMigrationsError
-        end
-      end)
-    else
-      :ok
-    end
-  end
-end
+  @doc """
+  Returns a lazy enumerable that emits all entries from the data store matching the given query.
+
+  `returns_as` use to group records. use the `batches` option to fetch records in bulk.
 
-defmodule Pleroma.Repo.UnappliedMigrationsError do
-  defexception message: "Unapplied Migrations detected"
+  ## Examples
+
+  # fetch records one-by-one
+  iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500)
+
+  # fetch records in bulk
+  iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500, :batches)
+  """
+  @spec chunk_stream(Ecto.Query.t(), integer(), atom()) :: Enumerable.t()
+  def chunk_stream(query, chunk_size, returns_as \\ :one, query_options \\ []) do
+    # We don't actually need start and end functions of resource streaming,
+    # but it seems to be the only way to not fetch records one-by-one and
+    # have individual records be the elements of the stream, instead of
+    # lists of records
+    Stream.resource(
+      fn -> 0 end,
+      fn
+        last_id ->
+          query
+          |> order_by(asc: :id)
+          |> where([r], r.id > ^last_id)
+          |> limit(^chunk_size)
+          |> all(query_options)
+          |> case do
+            [] ->
+              {:halt, last_id}
+
+            records ->
+              last_id = List.last(records).id
+
+              if returns_as == :one do
+                {records, last_id}
+              else
+                {[records], last_id}
+              end
+          end
+      end,
+      fn _ -> :ok end
+    )
+  end
 end