Merge remote-tracking branch 'pleroma/develop' into cycles-router-mediaproxy
[akkoma] / lib / pleroma / object.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Object do
6 use Ecto.Schema
7
8 import Ecto.Query
9 import Ecto.Changeset
10
11 alias Pleroma.Activity
12 alias Pleroma.Config
13 alias Pleroma.Hashtag
14 alias Pleroma.Object
15 alias Pleroma.Object.Fetcher
16 alias Pleroma.ObjectTombstone
17 alias Pleroma.Repo
18 alias Pleroma.User
19 alias Pleroma.Workers.AttachmentsCleanupWorker
20
21 require Logger
22
23 @type t() :: %__MODULE__{}
24
25 @derive {Jason.Encoder, only: [:data]}
26
27 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
28
29 schema "objects" do
30 field(:data, :map)
31
32 many_to_many(:hashtags, Hashtag, join_through: "hashtags_objects", on_replace: :delete)
33
34 timestamps()
35 end
36
37 def with_joined_activity(query, activity_type \\ "Create", join_type \\ :inner) do
38 object_position = Map.get(query.aliases, :object, 0)
39
40 join(query, join_type, [{object, object_position}], a in Activity,
41 on:
42 fragment(
43 "COALESCE(?->'object'->>'id', ?->>'object') = (? ->> 'id') AND (?->>'type' = ?) ",
44 a.data,
45 a.data,
46 object.data,
47 a.data,
48 ^activity_type
49 ),
50 as: :object_activity
51 )
52 end
53
54 def create(data) do
55 %Object{}
56 |> Object.change(%{data: data})
57 |> Repo.insert()
58 end
59
60 def change(struct, params \\ %{}) do
61 struct
62 |> cast(params, [:data])
63 |> validate_required([:data])
64 |> unique_constraint(:ap_id, name: :objects_unique_apid_index)
65 # Expecting `maybe_handle_hashtags_change/1` to run last:
66 |> maybe_handle_hashtags_change(struct)
67 end
68
69 # Note: not checking activity type (assuming non-legacy objects are associated with Create act.)
70 defp maybe_handle_hashtags_change(changeset, struct) do
71 with %Ecto.Changeset{valid?: true} <- changeset,
72 data_hashtags_change = get_change(changeset, :data),
73 {_, true} <- {:changed, hashtags_changed?(struct, data_hashtags_change)},
74 {:ok, hashtag_records} <-
75 data_hashtags_change
76 |> object_data_hashtags()
77 |> Hashtag.get_or_create_by_names() do
78 put_assoc(changeset, :hashtags, hashtag_records)
79 else
80 %{valid?: false} ->
81 changeset
82
83 {:changed, false} ->
84 changeset
85
86 {:error, _} ->
87 validate_change(changeset, :data, fn _, _ ->
88 [data: "error referencing hashtags"]
89 end)
90 end
91 end
92
93 defp hashtags_changed?(%Object{} = struct, %{"tag" => _} = data) do
94 Enum.sort(embedded_hashtags(struct)) !=
95 Enum.sort(object_data_hashtags(data))
96 end
97
98 defp hashtags_changed?(_, _), do: false
99
100 def get_by_id(nil), do: nil
101 def get_by_id(id), do: Repo.get(Object, id)
102
103 def get_by_id_and_maybe_refetch(id, opts \\ []) do
104 %{updated_at: updated_at} = object = get_by_id(id)
105
106 if opts[:interval] &&
107 NaiveDateTime.diff(NaiveDateTime.utc_now(), updated_at) > opts[:interval] do
108 case Fetcher.refetch_object(object) do
109 {:ok, %Object{} = object} ->
110 object
111
112 e ->
113 Logger.error("Couldn't refresh #{object.data["id"]}:\n#{inspect(e)}")
114 object
115 end
116 else
117 object
118 end
119 end
120
121 def get_by_ap_id(nil), do: nil
122
123 def get_by_ap_id(ap_id) do
124 Repo.one(from(object in Object, where: fragment("(?)->>'id' = ?", object.data, ^ap_id)))
125 end
126
127 @doc """
128 Get a single attachment by it's name and href
129 """
130 @spec get_attachment_by_name_and_href(String.t(), String.t()) :: Object.t() | nil
131 def get_attachment_by_name_and_href(name, href) do
132 query =
133 from(o in Object,
134 where: fragment("(?)->>'name' = ?", o.data, ^name),
135 where: fragment("(?)->>'href' = ?", o.data, ^href)
136 )
137
138 Repo.one(query)
139 end
140
141 defp warn_on_no_object_preloaded(ap_id) do
142 "Object.normalize() called without preloaded object (#{inspect(ap_id)}). Consider preloading the object"
143 |> Logger.debug()
144
145 Logger.debug("Backtrace: #{inspect(Process.info(:erlang.self(), :current_stacktrace))}")
146 end
147
148 def normalize(_, options \\ [fetch: false])
149
150 # If we pass an Activity to Object.normalize(), we can try to use the preloaded object.
151 # Use this whenever possible, especially when walking graphs in an O(N) loop!
152 def normalize(%Object{} = object, _), do: object
153 def normalize(%Activity{object: %Object{} = object}, _), do: object
154
155 # A hack for fake activities
156 def normalize(%Activity{data: %{"object" => %{"fake" => true} = data}}, _) do
157 %Object{id: "pleroma:fake_object_id", data: data}
158 end
159
160 # No preloaded object
161 def normalize(%Activity{data: %{"object" => %{"id" => ap_id}}}, options) do
162 warn_on_no_object_preloaded(ap_id)
163 normalize(ap_id, options)
164 end
165
166 # No preloaded object
167 def normalize(%Activity{data: %{"object" => ap_id}}, options) do
168 warn_on_no_object_preloaded(ap_id)
169 normalize(ap_id, options)
170 end
171
172 # Old way, try fetching the object through cache.
173 def normalize(%{"id" => ap_id}, options), do: normalize(ap_id, options)
174
175 def normalize(ap_id, options) when is_binary(ap_id) do
176 if Keyword.get(options, :fetch) do
177 Fetcher.fetch_object_from_id!(ap_id, options)
178 else
179 get_cached_by_ap_id(ap_id)
180 end
181 end
182
183 def normalize(_, _), do: nil
184
185 # Owned objects can only be accessed by their owner
186 def authorize_access(%Object{data: %{"actor" => actor}}, %User{ap_id: ap_id}) do
187 if actor == ap_id do
188 :ok
189 else
190 {:error, :forbidden}
191 end
192 end
193
194 # Legacy objects can be accessed by anybody
195 def authorize_access(%Object{}, %User{}), do: :ok
196
197 @spec get_cached_by_ap_id(String.t()) :: Object.t() | nil
198 def get_cached_by_ap_id(ap_id) do
199 key = "object:#{ap_id}"
200
201 with {:ok, nil} <- @cachex.get(:object_cache, key),
202 object when not is_nil(object) <- get_by_ap_id(ap_id),
203 {:ok, true} <- @cachex.put(:object_cache, key, object) do
204 object
205 else
206 {:ok, object} -> object
207 nil -> nil
208 end
209 end
210
211 def context_mapping(context) do
212 Object.change(%Object{}, %{data: %{"id" => context}})
213 end
214
215 def make_tombstone(%Object{data: %{"id" => id, "type" => type}}, deleted \\ DateTime.utc_now()) do
216 %ObjectTombstone{
217 id: id,
218 formerType: type,
219 deleted: deleted
220 }
221 |> Map.from_struct()
222 end
223
224 def swap_object_with_tombstone(object) do
225 tombstone = make_tombstone(object)
226
227 with {:ok, object} <-
228 object
229 |> Object.change(%{data: tombstone})
230 |> Repo.update() do
231 Hashtag.unlink(object)
232 {:ok, object}
233 end
234 end
235
236 def delete(%Object{data: %{"id" => id}} = object) do
237 with {:ok, _obj} = swap_object_with_tombstone(object),
238 deleted_activity = Activity.delete_all_by_object_ap_id(id),
239 {:ok, _} <- invalid_object_cache(object) do
240 cleanup_attachments(
241 Config.get([:instance, :cleanup_attachments]),
242 %{"object" => object}
243 )
244
245 {:ok, object, deleted_activity}
246 end
247 end
248
249 @spec cleanup_attachments(boolean(), %{required(:object) => map()}) ::
250 {:ok, Oban.Job.t() | nil}
251 def cleanup_attachments(true, %{"object" => _} = params) do
252 AttachmentsCleanupWorker.enqueue("cleanup_attachments", params)
253 end
254
255 def cleanup_attachments(_, _), do: {:ok, nil}
256
257 def prune(%Object{data: %{"id" => _id}} = object) do
258 with {:ok, object} <- Repo.delete(object),
259 {:ok, _} <- invalid_object_cache(object) do
260 {:ok, object}
261 end
262 end
263
264 def invalid_object_cache(%Object{data: %{"id" => id}}) do
265 with {:ok, true} <- @cachex.del(:object_cache, "object:#{id}") do
266 @cachex.del(:web_resp_cache, URI.parse(id).path)
267 end
268 end
269
270 def set_cache(%Object{data: %{"id" => ap_id}} = object) do
271 @cachex.put(:object_cache, "object:#{ap_id}", object)
272 {:ok, object}
273 end
274
275 def update_and_set_cache(changeset) do
276 with {:ok, object} <- Repo.update(changeset) do
277 set_cache(object)
278 end
279 end
280
281 def increase_replies_count(ap_id) do
282 Object
283 |> where([o], fragment("?->>'id' = ?::text", o.data, ^to_string(ap_id)))
284 |> update([o],
285 set: [
286 data:
287 fragment(
288 """
289 safe_jsonb_set(?, '{repliesCount}',
290 (coalesce((?->>'repliesCount')::int, 0) + 1)::varchar::jsonb, true)
291 """,
292 o.data,
293 o.data
294 )
295 ]
296 )
297 |> Repo.update_all([])
298 |> case do
299 {1, [object]} -> set_cache(object)
300 _ -> {:error, "Not found"}
301 end
302 end
303
304 defp poll_is_multiple?(%Object{data: %{"anyOf" => [_ | _]}}), do: true
305
306 defp poll_is_multiple?(_), do: false
307
308 def decrease_replies_count(ap_id) do
309 Object
310 |> where([o], fragment("?->>'id' = ?::text", o.data, ^to_string(ap_id)))
311 |> update([o],
312 set: [
313 data:
314 fragment(
315 """
316 safe_jsonb_set(?, '{repliesCount}',
317 (greatest(0, (?->>'repliesCount')::int - 1))::varchar::jsonb, true)
318 """,
319 o.data,
320 o.data
321 )
322 ]
323 )
324 |> Repo.update_all([])
325 |> case do
326 {1, [object]} -> set_cache(object)
327 _ -> {:error, "Not found"}
328 end
329 end
330
331 def increase_vote_count(ap_id, name, actor) do
332 with %Object{} = object <- Object.normalize(ap_id, fetch: false),
333 "Question" <- object.data["type"] do
334 key = if poll_is_multiple?(object), do: "anyOf", else: "oneOf"
335
336 options =
337 object.data[key]
338 |> Enum.map(fn
339 %{"name" => ^name} = option ->
340 Kernel.update_in(option["replies"]["totalItems"], &(&1 + 1))
341
342 option ->
343 option
344 end)
345
346 voters = [actor | object.data["voters"] || []] |> Enum.uniq()
347
348 data =
349 object.data
350 |> Map.put(key, options)
351 |> Map.put("voters", voters)
352
353 object
354 |> Object.change(%{data: data})
355 |> update_and_set_cache()
356 else
357 _ -> :noop
358 end
359 end
360
361 @doc "Updates data field of an object"
362 def update_data(%Object{data: data} = object, attrs \\ %{}) do
363 object
364 |> Object.change(%{data: Map.merge(data || %{}, attrs)})
365 |> Repo.update()
366 end
367
368 def local?(%Object{data: %{"id" => id}}) do
369 String.starts_with?(id, Pleroma.Web.base_url() <> "/")
370 end
371
372 def replies(object, opts \\ []) do
373 object = Object.normalize(object, fetch: false)
374
375 query =
376 Object
377 |> where(
378 [o],
379 fragment("(?)->>'inReplyTo' = ?", o.data, ^object.data["id"])
380 )
381 |> order_by([o], asc: o.id)
382
383 if opts[:self_only] do
384 actor = object.data["actor"]
385 where(query, [o], fragment("(?)->>'actor' = ?", o.data, ^actor))
386 else
387 query
388 end
389 end
390
391 def self_replies(object, opts \\ []),
392 do: replies(object, Keyword.put(opts, :self_only, true))
393
394 def tags(%Object{data: %{"tag" => tags}}) when is_list(tags), do: tags
395
396 def tags(_), do: []
397
398 def hashtags(%Object{} = object) do
399 # Note: always using embedded hashtags regardless whether they are migrated to hashtags table
400 # (embedded hashtags stay in sync anyways, and we avoid extra joins and preload hassle)
401 embedded_hashtags(object)
402 end
403
404 def embedded_hashtags(%Object{data: data}) do
405 object_data_hashtags(data)
406 end
407
408 def embedded_hashtags(_), do: []
409
410 def object_data_hashtags(%{"tag" => tags}) when is_list(tags) do
411 tags
412 |> Enum.filter(fn
413 %{"type" => "Hashtag"} = data -> Map.has_key?(data, "name")
414 plain_text when is_bitstring(plain_text) -> true
415 _ -> false
416 end)
417 |> Enum.map(fn
418 %{"name" => "#" <> hashtag} -> String.downcase(hashtag)
419 %{"name" => hashtag} -> String.downcase(hashtag)
420 hashtag when is_bitstring(hashtag) -> String.downcase(hashtag)
421 end)
422 |> Enum.uniq()
423 # Note: "" elements (plain text) might occur in `data.tag` for incoming objects
424 |> Enum.filter(&(&1 not in [nil, ""]))
425 end
426
427 def object_data_hashtags(_), do: []
428 end