Merge branch 'ecto-rollback-in-test-env' into 'develop'
[akkoma] / lib / pleroma / object.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Object do
6 use Ecto.Schema
7
8 import Ecto.Query
9 import Ecto.Changeset
10
11 alias Pleroma.Activity
12 alias Pleroma.Config
13 alias Pleroma.Object
14 alias Pleroma.Object.Fetcher
15 alias Pleroma.ObjectTombstone
16 alias Pleroma.Repo
17 alias Pleroma.User
18 alias Pleroma.Workers.AttachmentsCleanupWorker
19
20 require Logger
21
22 @type t() :: %__MODULE__{}
23
24 @derive {Jason.Encoder, only: [:data]}
25
26 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
27
28 schema "objects" do
29 field(:data, :map)
30
31 timestamps()
32 end
33
34 def with_joined_activity(query, activity_type \\ "Create", join_type \\ :inner) do
35 object_position = Map.get(query.aliases, :object, 0)
36
37 join(query, join_type, [{object, object_position}], a in Activity,
38 on:
39 fragment(
40 "COALESCE(?->'object'->>'id', ?->>'object') = (? ->> 'id') AND (?->>'type' = ?) ",
41 a.data,
42 a.data,
43 object.data,
44 a.data,
45 ^activity_type
46 ),
47 as: :object_activity
48 )
49 end
50
51 def create(data) do
52 Object.change(%Object{}, %{data: data})
53 |> Repo.insert()
54 end
55
56 def change(struct, params \\ %{}) do
57 struct
58 |> cast(params, [:data])
59 |> validate_required([:data])
60 |> unique_constraint(:ap_id, name: :objects_unique_apid_index)
61 end
62
63 def get_by_id(nil), do: nil
64 def get_by_id(id), do: Repo.get(Object, id)
65
66 def get_by_id_and_maybe_refetch(id, opts \\ []) do
67 %{updated_at: updated_at} = object = get_by_id(id)
68
69 if opts[:interval] &&
70 NaiveDateTime.diff(NaiveDateTime.utc_now(), updated_at) > opts[:interval] do
71 case Fetcher.refetch_object(object) do
72 {:ok, %Object{} = object} ->
73 object
74
75 e ->
76 Logger.error("Couldn't refresh #{object.data["id"]}:\n#{inspect(e)}")
77 object
78 end
79 else
80 object
81 end
82 end
83
84 def get_by_ap_id(nil), do: nil
85
86 def get_by_ap_id(ap_id) do
87 Repo.one(from(object in Object, where: fragment("(?)->>'id' = ?", object.data, ^ap_id)))
88 end
89
90 @doc """
91 Get a single attachment by it's name and href
92 """
93 @spec get_attachment_by_name_and_href(String.t(), String.t()) :: Object.t() | nil
94 def get_attachment_by_name_and_href(name, href) do
95 query =
96 from(o in Object,
97 where: fragment("(?)->>'name' = ?", o.data, ^name),
98 where: fragment("(?)->>'href' = ?", o.data, ^href)
99 )
100
101 Repo.one(query)
102 end
103
104 defp warn_on_no_object_preloaded(ap_id) do
105 "Object.normalize() called without preloaded object (#{inspect(ap_id)}). Consider preloading the object"
106 |> Logger.debug()
107
108 Logger.debug("Backtrace: #{inspect(Process.info(:erlang.self(), :current_stacktrace))}")
109 end
110
111 def normalize(_, options \\ [fetch: false])
112
113 # If we pass an Activity to Object.normalize(), we can try to use the preloaded object.
114 # Use this whenever possible, especially when walking graphs in an O(N) loop!
115 def normalize(%Object{} = object, _), do: object
116 def normalize(%Activity{object: %Object{} = object}, _), do: object
117
118 # A hack for fake activities
119 def normalize(%Activity{data: %{"object" => %{"fake" => true} = data}}, _) do
120 %Object{id: "pleroma:fake_object_id", data: data}
121 end
122
123 # No preloaded object
124 def normalize(%Activity{data: %{"object" => %{"id" => ap_id}}}, options) do
125 warn_on_no_object_preloaded(ap_id)
126 normalize(ap_id, options)
127 end
128
129 # No preloaded object
130 def normalize(%Activity{data: %{"object" => ap_id}}, options) do
131 warn_on_no_object_preloaded(ap_id)
132 normalize(ap_id, options)
133 end
134
135 # Old way, try fetching the object through cache.
136 def normalize(%{"id" => ap_id}, options), do: normalize(ap_id, options)
137
138 def normalize(ap_id, options) when is_binary(ap_id) do
139 if Keyword.get(options, :fetch) do
140 Fetcher.fetch_object_from_id!(ap_id, options)
141 else
142 get_cached_by_ap_id(ap_id)
143 end
144 end
145
146 def normalize(_, _), do: nil
147
148 # Owned objects can only be accessed by their owner
149 def authorize_access(%Object{data: %{"actor" => actor}}, %User{ap_id: ap_id}) do
150 if actor == ap_id do
151 :ok
152 else
153 {:error, :forbidden}
154 end
155 end
156
157 # Legacy objects can be accessed by anybody
158 def authorize_access(%Object{}, %User{}), do: :ok
159
160 @spec get_cached_by_ap_id(String.t()) :: Object.t() | nil
161 def get_cached_by_ap_id(ap_id) do
162 key = "object:#{ap_id}"
163
164 with {:ok, nil} <- @cachex.get(:object_cache, key),
165 object when not is_nil(object) <- get_by_ap_id(ap_id),
166 {:ok, true} <- @cachex.put(:object_cache, key, object) do
167 object
168 else
169 {:ok, object} -> object
170 nil -> nil
171 end
172 end
173
174 def context_mapping(context) do
175 Object.change(%Object{}, %{data: %{"id" => context}})
176 end
177
178 def make_tombstone(%Object{data: %{"id" => id, "type" => type}}, deleted \\ DateTime.utc_now()) do
179 %ObjectTombstone{
180 id: id,
181 formerType: type,
182 deleted: deleted
183 }
184 |> Map.from_struct()
185 end
186
187 def swap_object_with_tombstone(object) do
188 tombstone = make_tombstone(object)
189
190 object
191 |> Object.change(%{data: tombstone})
192 |> Repo.update()
193 end
194
195 def delete(%Object{data: %{"id" => id}} = object) do
196 with {:ok, _obj} = swap_object_with_tombstone(object),
197 deleted_activity = Activity.delete_all_by_object_ap_id(id),
198 {:ok, _} <- invalid_object_cache(object) do
199 cleanup_attachments(
200 Config.get([:instance, :cleanup_attachments]),
201 %{"object" => object}
202 )
203
204 {:ok, object, deleted_activity}
205 end
206 end
207
208 @spec cleanup_attachments(boolean(), %{required(:object) => map()}) ::
209 {:ok, Oban.Job.t() | nil}
210 def cleanup_attachments(true, %{"object" => _} = params) do
211 AttachmentsCleanupWorker.enqueue("cleanup_attachments", params)
212 end
213
214 def cleanup_attachments(_, _), do: {:ok, nil}
215
216 def prune(%Object{data: %{"id" => _id}} = object) do
217 with {:ok, object} <- Repo.delete(object),
218 {:ok, _} <- invalid_object_cache(object) do
219 {:ok, object}
220 end
221 end
222
223 def invalid_object_cache(%Object{data: %{"id" => id}}) do
224 with {:ok, true} <- @cachex.del(:object_cache, "object:#{id}") do
225 @cachex.del(:web_resp_cache, URI.parse(id).path)
226 end
227 end
228
229 def set_cache(%Object{data: %{"id" => ap_id}} = object) do
230 @cachex.put(:object_cache, "object:#{ap_id}", object)
231 {:ok, object}
232 end
233
234 def update_and_set_cache(changeset) do
235 with {:ok, object} <- Repo.update(changeset) do
236 set_cache(object)
237 end
238 end
239
240 def increase_replies_count(ap_id) do
241 Object
242 |> where([o], fragment("?->>'id' = ?::text", o.data, ^to_string(ap_id)))
243 |> update([o],
244 set: [
245 data:
246 fragment(
247 """
248 safe_jsonb_set(?, '{repliesCount}',
249 (coalesce((?->>'repliesCount')::int, 0) + 1)::varchar::jsonb, true)
250 """,
251 o.data,
252 o.data
253 )
254 ]
255 )
256 |> Repo.update_all([])
257 |> case do
258 {1, [object]} -> set_cache(object)
259 _ -> {:error, "Not found"}
260 end
261 end
262
263 defp poll_is_multiple?(%Object{data: %{"anyOf" => [_ | _]}}), do: true
264
265 defp poll_is_multiple?(_), do: false
266
267 def decrease_replies_count(ap_id) do
268 Object
269 |> where([o], fragment("?->>'id' = ?::text", o.data, ^to_string(ap_id)))
270 |> update([o],
271 set: [
272 data:
273 fragment(
274 """
275 safe_jsonb_set(?, '{repliesCount}',
276 (greatest(0, (?->>'repliesCount')::int - 1))::varchar::jsonb, true)
277 """,
278 o.data,
279 o.data
280 )
281 ]
282 )
283 |> Repo.update_all([])
284 |> case do
285 {1, [object]} -> set_cache(object)
286 _ -> {:error, "Not found"}
287 end
288 end
289
290 def increase_vote_count(ap_id, name, actor) do
291 with %Object{} = object <- Object.normalize(ap_id, fetch: false),
292 "Question" <- object.data["type"] do
293 key = if poll_is_multiple?(object), do: "anyOf", else: "oneOf"
294
295 options =
296 object.data[key]
297 |> Enum.map(fn
298 %{"name" => ^name} = option ->
299 Kernel.update_in(option["replies"]["totalItems"], &(&1 + 1))
300
301 option ->
302 option
303 end)
304
305 voters = [actor | object.data["voters"] || []] |> Enum.uniq()
306
307 data =
308 object.data
309 |> Map.put(key, options)
310 |> Map.put("voters", voters)
311
312 object
313 |> Object.change(%{data: data})
314 |> update_and_set_cache()
315 else
316 _ -> :noop
317 end
318 end
319
320 @doc "Updates data field of an object"
321 def update_data(%Object{data: data} = object, attrs \\ %{}) do
322 object
323 |> Object.change(%{data: Map.merge(data || %{}, attrs)})
324 |> Repo.update()
325 end
326
327 def local?(%Object{data: %{"id" => id}}) do
328 String.starts_with?(id, Pleroma.Web.base_url() <> "/")
329 end
330
331 def replies(object, opts \\ []) do
332 object = Object.normalize(object, fetch: false)
333
334 query =
335 Object
336 |> where(
337 [o],
338 fragment("(?)->>'inReplyTo' = ?", o.data, ^object.data["id"])
339 )
340 |> order_by([o], asc: o.id)
341
342 if opts[:self_only] do
343 actor = object.data["actor"]
344 where(query, [o], fragment("(?)->>'actor' = ?", o.data, ^actor))
345 else
346 query
347 end
348 end
349
350 def self_replies(object, opts \\ []),
351 do: replies(object, Keyword.put(opts, :self_only, true))
352 end