import Ecto.Query
import Pleroma.DataCase
use Pleroma.Tests.Helpers
+
+ # Sets up OAuth access with specified scopes
+ defp oauth_access(scopes, opts \\ []) do
+ user =
+ Keyword.get_lazy(opts, :user, fn ->
+ Pleroma.Factory.insert(:user)
+ end)
+
+ token =
+ Keyword.get_lazy(opts, :oauth_token, fn ->
+ Pleroma.Factory.insert(:oauth_token, user: user, scopes: scopes)
+ end)
+
+ %{user: user, token: token}
+ end
end
end
+ def clear_cachex do
+ Pleroma.Supervisor
+ |> Supervisor.which_children()
+ |> Enum.each(fn
+ {name, _, _, [Cachex]} ->
+ name
+ |> to_string
+ |> String.trim_leading("cachex_")
+ |> Kernel.<>("_cache")
+ |> String.to_existing_atom()
+ |> Cachex.clear()
+
+ _ ->
+ nil
+ end)
+ end
+
setup tags do
- Cachex.clear(:user_cache)
- Cachex.clear(:object_cache)
:ok = Ecto.Adapters.SQL.Sandbox.checkout(Pleroma.Repo)
- unless tags[:async] do
+ if tags[:async] do
+ Mox.stub_with(Pleroma.CachexMock, Pleroma.NullCache)
+ Mox.set_mox_private()
+ else
Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()})
+ Mox.stub_with(Pleroma.CachexMock, Pleroma.CachexProxy)
+ Mox.set_mox_global()
+ clear_cachex()
end
if tags[:needs_streamer] do
- start_supervised(Pleroma.Web.Streamer.supervisor())
+ start_supervised(%{
+ id: Pleroma.Web.Streamer.registry(),
+ start:
+ {Registry, :start_link, [[keys: :duplicate, name: Pleroma.Web.Streamer.registry()]]}
+ })
end
:ok