# make bulk user creation from admin work as a transaction
# [akkoma] / lib / pleroma / object.ex
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.Object do
  @moduledoc """
  Ecto schema and helpers for rows of the `objects` table.

  Each row keeps its payload as a map in the `data` field. The helpers in this
  module look objects up by the `"id"` key of that map, cache them in the
  `:object_cache` Cachex cache under the key `"object:<id>"`, and replace
  deleted objects with tombstones.
  """

  use Ecto.Schema

  alias Pleroma.Activity
  alias Pleroma.Object
  alias Pleroma.Object.Fetcher
  alias Pleroma.ObjectTombstone
  alias Pleroma.Repo
  alias Pleroma.User

  import Ecto.Query
  import Ecto.Changeset

  require Logger

  schema "objects" do
    field(:data, :map)

    timestamps()
  end

  @doc """
  Builds a changeset for a new object holding `data` and inserts it.

  Returns whatever `Repo.insert/1` returns.
  """
  def create(data) do
    %Object{}
    |> Object.change(%{data: data})
    |> Repo.insert()
  end

  @doc """
  Returns a changeset for `struct` that casts and requires `:data`.

  A violation of the `objects_unique_apid_index` database constraint is
  reported as a changeset error on `:ap_id`.
  """
  def change(struct, params \\ %{}) do
    struct
    |> cast(params, [:data])
    |> validate_required([:data])
    |> unique_constraint(:ap_id, name: :objects_unique_apid_index)
  end

  @doc """
  Queries the database for the object whose `data["id"]` equals `ap_id`.

  Returns `nil` without querying when `ap_id` is `nil`.
  """
  def get_by_ap_id(nil), do: nil

  def get_by_ap_id(ap_id) do
    Repo.one(from(object in Object, where: fragment("(?)->>'id' = ?", object.data, ^ap_id)))
  end

  @doc """
  Resolves the first argument to an object.

  Accepted inputs, in matching order:

    * an `%Object{}` - returned unchanged;
    * an `%Activity{}` with a preloaded `%Object{}` - the preloaded object;
    * an `%Activity{}` whose `data["object"]` is a map with `"fake" => true` -
      an unsaved `%Object{}` wrapping that map;
    * any other `%Activity{}` with a `data["object"]` - a debug message is
      logged and the referenced id is resolved as below;
    * a map with an `"id"` key, or a binary id - looked up through
      `get_cached_by_ap_id/1` when `fetch_remote` is `false`, or passed to
      `Fetcher.fetch_object_from_id!/1` when it is `true` (the default).

  Anything else yields `nil`.
  """
  def normalize(_, fetch_remote \\ true)
  # If we pass an Activity to Object.normalize(), we can try to use the preloaded object.
  # Use this whenever possible, especially when walking graphs in an O(N) loop!
  def normalize(%Object{} = object, _), do: object
  def normalize(%Activity{object: %Object{} = object}, _), do: object

  # A hack for fake activities
  def normalize(%Activity{data: %{"object" => %{"fake" => true} = data}}, _) do
    %Object{id: "pleroma:fake_object_id", data: data}
  end

  # Catch and log Object.normalize() calls where the Activity's child object is not
  # preloaded.
  def normalize(%Activity{data: %{"object" => %{"id" => ap_id}}}, fetch_remote) do
    log_missing_preload(ap_id)
    normalize(ap_id, fetch_remote)
  end

  def normalize(%Activity{data: %{"object" => ap_id}}, fetch_remote) do
    log_missing_preload(ap_id)
    normalize(ap_id, fetch_remote)
  end

  # Old way, try fetching the object through cache.
  def normalize(%{"id" => ap_id}, fetch_remote), do: normalize(ap_id, fetch_remote)
  def normalize(ap_id, false) when is_binary(ap_id), do: get_cached_by_ap_id(ap_id)
  def normalize(ap_id, true) when is_binary(ap_id), do: Fetcher.fetch_object_from_id!(ap_id)
  def normalize(_, _), do: nil

  # Emits the two debug messages shared by the non-preloaded Activity clauses of
  # normalize/2: a hint to preload, and the current stacktrace of the calling process.
  defp log_missing_preload(ap_id) do
    Logger.debug(
      "Object.normalize() called without preloaded object (#{ap_id}). Consider preloading the object!"
    )

    Logger.debug("Backtrace: #{inspect(Process.info(:erlang.self(), :current_stacktrace))}")
  end

  @doc """
  Tells whether `user` may mutate `object`.

  An object whose data carries an `"actor"` may only be mutated by the user
  whose `ap_id` equals that actor. Objects without an `"actor"` (legacy
  objects) may be mutated by any user.
  """
  # Owned objects can only be mutated by their owner
  def authorize_mutation(%Object{data: %{"actor" => actor}}, %User{ap_id: ap_id}),
    do: actor == ap_id

  # Legacy objects can be mutated by anybody
  def authorize_mutation(%Object{}, %User{}), do: true

  @doc """
  Returns the object for `ap_id` from `:object_cache`, loading it with
  `get_by_ap_id/1` on a cache miss.

  Only found objects are committed to the cache; a miss in the database
  returns `nil` and leaves the cache untouched.
  """
  def get_cached_by_ap_id(ap_id) do
    Cachex.fetch!(:object_cache, "object:#{ap_id}", fn _key ->
      case get_by_ap_id(ap_id) do
        nil -> {:ignore, nil}
        object -> {:commit, object}
      end
    end)
  end

  @doc """
  Returns a changeset (not inserted) for a new object whose data is
  `%{"id" => context}`.
  """
  def context_mapping(context) do
    Object.change(%Object{}, %{data: %{"id" => context}})
  end

  @doc """
  Builds the tombstone for `object` as a plain map.

  The map is an `%ObjectTombstone{}` converted with `Map.from_struct/1`, with
  `id` and `formerType` taken from the object's `"id"` and `"type"`, and
  `deleted` set to the given timestamp (the current UTC time by default).
  Only objects whose data has both `"id"` and `"type"` are accepted.
  """
  def make_tombstone(%Object{data: %{"id" => id, "type" => type}}, deleted \\ DateTime.utc_now()) do
    %ObjectTombstone{
      id: id,
      formerType: type,
      deleted: deleted
    }
    |> Map.from_struct()
  end

  @doc """
  Replaces the data of `object` with its tombstone and updates the row.

  Returns whatever `Repo.update/1` returns. Raises `FunctionClauseError` via
  `make_tombstone/2` when the object's data lacks `"id"` or `"type"`.
  """
  def swap_object_with_tombstone(object) do
    tombstone = make_tombstone(object)

    object
    |> Object.change(%{data: tombstone})
    |> Repo.update()
  end

  @doc """
  Deletes `object`: swaps it for a tombstone, deletes the activities that
  reference its id, and evicts it from `:object_cache`.

  Returns `{:ok, object, deleted_activity}` on success, where `object` is the
  struct that was passed in. If the tombstone update or the cache eviction
  does not succeed, that step's result is returned as-is.
  """
  def delete(%Object{data: %{"id" => id}} = object) do
    # `<-` (not `=`) so a failed update falls through as an error tuple instead
    # of raising MatchError, consistent with the Cachex clause below.
    with {:ok, _obj} <- swap_object_with_tombstone(object),
         deleted_activity = Activity.delete_by_ap_id(id),
         {:ok, true} <- Cachex.del(:object_cache, "object:#{id}") do
      {:ok, object, deleted_activity}
    end
  end

  @doc """
  Stores `object` in `:object_cache` under `"object:<id>"`.

  Always returns `{:ok, object}`; the result of the cache write is ignored.
  """
  def set_cache(%Object{data: %{"id" => ap_id}} = object) do
    Cachex.put(:object_cache, "object:#{ap_id}", object)
    {:ok, object}
  end

  @doc """
  Applies `changeset` with `Repo.update/1` and caches the updated object.

  Returns `{:ok, object}` on success; any other result of `Repo.update/1` is
  returned unchanged.
  """
  def update_and_set_cache(changeset) do
    with {:ok, object} <- Repo.update(changeset) do
      set_cache(object)
    end
  end

  @doc """
  Increments `repliesCount` in the data of the object identified by `ap_id`,
  treating a missing counter as 0, and refreshes the cache.

  Returns `{:ok, object}` when exactly one row was updated, otherwise
  `{:error, "Not found"}`.
  """
  def increase_replies_count(ap_id) do
    Object
    |> where([o], fragment("?->>'id' = ?::text", o.data, ^to_string(ap_id)))
    |> update([o],
      set: [
        data:
          fragment(
            """
            jsonb_set(?, '{repliesCount}',
              (coalesce((?->>'repliesCount')::int, 0) + 1)::varchar::jsonb, true)
            """,
            o.data,
            o.data
          )
      ]
    )
    |> update_one_and_cache()
  end

  @doc """
  Decrements `repliesCount` in the data of the object identified by `ap_id`,
  never going below 0, and refreshes the cache.

  Returns `{:ok, object}` when exactly one row was updated, otherwise
  `{:error, "Not found"}`.
  """
  def decrease_replies_count(ap_id) do
    Object
    |> where([o], fragment("?->>'id' = ?::text", o.data, ^to_string(ap_id)))
    |> update([o],
      set: [
        data:
          fragment(
            """
            jsonb_set(?, '{repliesCount}',
              (greatest(0, (?->>'repliesCount')::int - 1))::varchar::jsonb, true)
            """,
            o.data,
            o.data
          )
      ]
    )
    |> update_one_and_cache()
  end

  # Runs an update query and caches the single updated object.
  #
  # The `select` makes `Repo.update_all/2` return the updated rows; without it
  # the second tuple element is `nil` and the `{1, [object]}` clause can never
  # match, so the cache would never be refreshed.
  defp update_one_and_cache(query) do
    query
    |> select([o], o)
    |> Repo.update_all([])
    |> case do
      {1, [object]} -> set_cache(object)
      _ -> {:error, "Not found"}
    end
  end
end