Merge pull request 'Additional timeline query improvements from upstream' (#291)...
[akkoma] / test / support / data_case.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.DataCase do
6 @moduledoc """
7 This module defines the setup for tests requiring
8 access to the application's data layer.
9
10 You may define functions here to be used as helpers in
11 your tests.
12
13 Finally, if the test case interacts with the database,
14 it cannot be async. For this reason, every test runs
15 inside a transaction which is reset at the beginning
16 of the test unless the test case is marked as async.
17 """
18
19 use ExUnit.CaseTemplate
20
21 import Pleroma.Tests.Helpers, only: [clear_config: 2]
22
23 using do
24 quote do
25 alias Pleroma.Repo
26
27 import Ecto
28 import Ecto.Changeset
29 import Ecto.Query
30 import Pleroma.DataCase
31 use Pleroma.Tests.Helpers
32
33 # Sets up OAuth access with specified scopes
34 defp oauth_access(scopes, opts \\ []) do
35 user =
36 Keyword.get_lazy(opts, :user, fn ->
37 Pleroma.Factory.insert(:user)
38 end)
39
40 token =
41 Keyword.get_lazy(opts, :oauth_token, fn ->
42 Pleroma.Factory.insert(:oauth_token, user: user, scopes: scopes)
43 end)
44
45 %{user: user, token: token}
46 end
47 end
48 end
49
50 def clear_cachex do
51 Pleroma.Supervisor
52 |> Supervisor.which_children()
53 |> Enum.each(fn
54 {name, _, _, [Cachex]} ->
55 name
56 |> to_string
57 |> String.trim_leading("cachex_")
58 |> Kernel.<>("_cache")
59 |> String.to_existing_atom()
60 |> Cachex.clear()
61
62 _ ->
63 nil
64 end)
65 end
66
67 def setup_multi_process_mode(tags) do
68 :ok = Ecto.Adapters.SQL.Sandbox.checkout(Pleroma.Repo)
69
70 if tags[:async] do
71 Mox.stub_with(Pleroma.CachexMock, Pleroma.NullCache)
72 Mox.set_mox_private()
73 else
74 Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()})
75
76 Mox.set_mox_global()
77 Mox.stub_with(Pleroma.CachexMock, Pleroma.CachexProxy)
78 clear_cachex()
79 end
80
81 :ok
82 end
83
84 def setup_streamer(tags) do
85 if tags[:needs_streamer] do
86 start_supervised(%{
87 id: Pleroma.Web.Streamer.registry(),
88 start:
89 {Registry, :start_link, [[keys: :duplicate, name: Pleroma.Web.Streamer.registry()]]}
90 })
91 end
92
93 :ok
94 end
95
96 setup tags do
97 setup_multi_process_mode(tags)
98 setup_streamer(tags)
99 stub_pipeline()
100
101 Mox.verify_on_exit!()
102
103 :ok
104 end
105
106 def stub_pipeline do
107 Mox.stub_with(Pleroma.Web.ActivityPub.SideEffectsMock, Pleroma.Web.ActivityPub.SideEffects)
108
109 Mox.stub_with(
110 Pleroma.Web.ActivityPub.ObjectValidatorMock,
111 Pleroma.Web.ActivityPub.ObjectValidator
112 )
113
114 Mox.stub_with(Pleroma.Web.ActivityPub.MRFMock, Pleroma.Web.ActivityPub.MRF)
115 Mox.stub_with(Pleroma.Web.ActivityPub.ActivityPubMock, Pleroma.Web.ActivityPub.ActivityPub)
116 Mox.stub_with(Pleroma.Web.FederatorMock, Pleroma.Web.Federator)
117 Mox.stub_with(Pleroma.ConfigMock, Pleroma.Config)
118 end
119
120 def ensure_local_uploader(context) do
121 test_uploader = Map.get(context, :uploader) || Pleroma.Uploaders.Local
122
123 clear_config([Pleroma.Upload, :uploader], test_uploader)
124 clear_config([Pleroma.Upload, :filters], [])
125
126 :ok
127 end
128
129 @doc """
130 A helper that transform changeset errors to a map of messages.
131
132 changeset = Accounts.create_user(%{password: "short"})
133 assert "password is too short" in errors_on(changeset).password
134 assert %{password: ["password is too short"]} = errors_on(changeset)
135
136 """
137 def errors_on(changeset) do
138 Ecto.Changeset.traverse_errors(changeset, fn {message, opts} ->
139 Enum.reduce(opts, message, fn {key, value}, acc ->
140 String.replace(acc, "%{#{key}}", to_string(value))
141 end)
142 end)
143 end
144 end