Merge remote-tracking branch 'upstream/develop' into earmark
[akkoma] / lib / pleroma / repo.ex
index 6d85d70bc66c9ac6de1d0266c4937572402c2e95..b8ea06e33c9a33effe0bce819e0135ae77cc1bf2 100644 (file)
@@ -1,5 +1,5 @@
 # Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Repo do
@@ -8,12 +8,12 @@ defmodule Pleroma.Repo do
     adapter: Ecto.Adapters.Postgres,
     migration_timestamps: [type: :naive_datetime_usec]
 
+  use Ecto.Explain
+
   import Ecto.Query
   require Logger
 
-  defmodule Instrumenter do
-    use Prometheus.EctoInstrumenter
-  end
+  defmodule Instrumenter, do: use(Prometheus.EctoInstrumenter)
 
   @doc """
   Dynamically loads the repository url from the
@@ -51,37 +51,22 @@ defmodule Pleroma.Repo do
     end
   end
 
-  def check_migrations_applied!() do
-    unless Pleroma.Config.get(
-             [:i_am_aware_this_may_cause_data_loss, :disable_migration_check],
-             false
-           ) do
-      Ecto.Migrator.with_repo(__MODULE__, fn repo ->
-        down_migrations =
-          Ecto.Migrator.migrations(repo)
-          |> Enum.reject(fn
-            {:up, _, _} -> true
-            {:down, _, _} -> false
-          end)
-
-        if length(down_migrations) > 0 do
-          down_migrations_text =
-            Enum.map(down_migrations, fn {:down, id, name} -> "- #{name} (#{id})\n" end)
-
-          Logger.error(
-            "The following migrations were not applied:\n#{down_migrations_text}If you want to start Pleroma anyway, set\nconfig :pleroma, :i_am_aware_this_may_cause_data_loss, disable_migration_check: true"
-          )
-
-          raise Pleroma.Repo.UnappliedMigrationsError
-        end
-      end)
-    else
-      :ok
-    end
-  end
+  @doc """
+  Returns a lazy enumerable that emits all entries from the data store matching the given query.
+
+  `returns_as` use to group records. use the `batches` option to fetch records in bulk.
 
-  def chunk_stream(query, chunk_size) do
-    # We don't actually need start and end funcitons of resource streaming,
+  ## Examples
+
+  # fetch records one-by-one
+  iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500)
+
+  # fetch records in bulk
+  iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500, :batches)
+  """
+  @spec chunk_stream(Ecto.Query.t(), integer(), atom()) :: Enumerable.t()
+  def chunk_stream(query, chunk_size, returns_as \\ :one, query_options \\ []) do
+    # We don't actually need start and end functions of resource streaming,
     # but it seems to be the only way to not fetch records one-by-one and
     # have individual records be the elements of the stream, instead of
     # lists of records
@@ -93,21 +78,22 @@ defmodule Pleroma.Repo do
           |> order_by(asc: :id)
           |> where([r], r.id > ^last_id)
           |> limit(^chunk_size)
-          |> all()
+          |> all(query_options)
           |> case do
             [] ->
               {:halt, last_id}
 
             records ->
               last_id = List.last(records).id
-              {records, last_id}
+
+              if returns_as == :one do
+                {records, last_id}
+              else
+                {[records], last_id}
+              end
           end
       end,
       fn _ -> :ok end
     )
   end
 end
-
-defmodule Pleroma.Repo.UnappliedMigrationsError do
-  defexception message: "Unapplied Migrations detected"
-end