added example cache purge script
[akkoma] / lib / pleroma / object.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Object do
6 use Ecto.Schema
7
8 import Ecto.Query
9 import Ecto.Changeset
10
11 alias Pleroma.Activity
12 alias Pleroma.Config
13 alias Pleroma.Object
14 alias Pleroma.Object.Fetcher
15 alias Pleroma.ObjectTombstone
16 alias Pleroma.Repo
17 alias Pleroma.User
18 alias Pleroma.Workers.AttachmentsCleanupWorker
19
20 require Logger
21
# Struct type of a row in the "objects" table.
@type t :: %__MODULE__{}

# Only the `data` field is emitted when an object is encoded with Jason.
@derive {Jason.Encoder, only: [:data]}

schema "objects" do
  # Map holding the object document; the queries in this module look up
  # entries such as "id", "actor" and "inReplyTo" inside it.
  field :data, :map

  timestamps()
end
31
@doc """
Joins `query` with the `Activity` rows that reference each object.

An activity matches when the id of its `"object"` entry (either the `"id"` of
an embedded object or the bare string value) equals the object's `"id"` and
its `"type"` equals `activity_type`. The joined binding is named
`:object_activity`.

`join_type` is passed straight to `Ecto.Query.join/5` as the join qualifier.
"""
def with_joined_activity(query, activity_type \\ "Create", join_type \\ :inner) do
  # Use the binding aliased as :object when the query declares one,
  # otherwise fall back to the first binding of the query.
  object_index = Map.get(query.aliases, :object, 0)

  query
  |> join(join_type, [{object, object_index}], activity in Activity,
    on:
      fragment(
        "COALESCE(?->'object'->>'id', ?->>'object') = (? ->> 'id') AND (?->>'type' = ?) ",
        activity.data,
        activity.data,
        object.data,
        activity.data,
        ^activity_type
      ),
    as: :object_activity
  )
end
48
@doc """
Builds a changeset for a new object holding `data` and inserts it.

Returns whatever `Repo.insert/1` returns.
"""
def create(data) do
  changeset = Object.change(%Object{}, %{data: data})
  Repo.insert(changeset)
end
53
@doc """
Builds a changeset for `struct` from `params`.

Only `:data` is cast and it is required. Violations of the
`objects_unique_apid_index` database constraint are reported as an error on
`:ap_id` instead of raising.
"""
def change(struct, params \\ %{}) do
  changeset = cast(struct, params, [:data])

  changeset
  |> validate_required([:data])
  |> unique_constraint(:ap_id, name: :objects_unique_apid_index)
end
60
@doc """
Fetches the object with the given database `id`.

Returns `nil` without touching the database when `id` is `nil`.
"""
def get_by_id(id) do
  if is_nil(id), do: nil, else: Repo.get(Object, id)
end
63
@doc """
Fetches the object with database `id`, refetching it when it is stale.

When `opts[:interval]` is set and the object's `updated_at` is more than that
many seconds in the past, the object is refetched through
`Fetcher.refetch_object/1`. If the refetch fails, the failure is logged and
the stored object is returned unchanged.

Returns `nil` when no object with `id` exists.
"""
def get_by_id_and_maybe_refetch(id, opts \\ []) do
  # `with` lets a missing row (nil) fall through as the return value instead
  # of raising a MatchError on the destructuring.
  with %Object{updated_at: updated_at} = object <- get_by_id(id) do
    if opts[:interval] &&
         NaiveDateTime.diff(NaiveDateTime.utc_now(), updated_at) > opts[:interval] do
      case Fetcher.refetch_object(object) do
        {:ok, %Object{} = refreshed} ->
          refreshed

        e ->
          Logger.error("Couldn't refresh #{object.data["id"]}:\n#{inspect(e)}")
          object
      end
    else
      object
    end
  end
end
81
@doc """
Fetches the object whose `data` has the given `"id"` straight from the
database. Returns `nil` for a `nil` id or when nothing matches.
"""
def get_by_ap_id(nil), do: nil

def get_by_ap_id(ap_id) do
  Object
  |> where([o], fragment("(?)->>'id' = ?", o.data, ^ap_id))
  |> Repo.one()
end
87
@doc """
Gets a single attachment by its name and href.

Both values are compared against the `"name"` and `"href"` entries of the
object's `data`. Returns `nil` when nothing matches; `Repo.one/1` raises if
more than one row matches.
"""
@spec get_attachment_by_name_and_href(String.t(), String.t()) :: Object.t() | nil
def get_attachment_by_name_and_href(name, href) do
  Object
  |> where([o], fragment("(?)->>'name' = ?", o.data, ^name))
  |> where([o], fragment("(?)->>'href' = ?", o.data, ^href))
  |> Repo.one()
end
101
# Emits two debug log entries: a hint that `normalize/3` had to look the object
# up itself, followed by the current stacktrace of the calling process so the
# call site can be located.
defp warn_on_no_object_preloaded(ap_id) do
  Logger.debug(
    "Object.normalize() called without preloaded object (#{inspect(ap_id)}). Consider preloading the object"
  )

  Logger.debug("Backtrace: #{inspect(Process.info(self(), :current_stacktrace))}")
end
108
@doc """
Resolves the given value to an `%Object{}`.

Accepted inputs:

  * an `%Object{}` - returned as is;
  * an `%Activity{}` - its preloaded object when present, otherwise the object
    referenced by `data["object"]` is looked up by id;
  * a map with an `"id"` key, or a binary id - looked up by id.

When `fetch_remote` is `false` the lookup goes through
`get_cached_by_ap_id/1`; when it is `true` the id is handed to
`Fetcher.fetch_object_from_id!/2` together with `options`.

Returns `nil` for any other input.
"""
def normalize(_, fetch_remote \\ true, options \\ [])

# If we pass an Activity to Object.normalize(), we can try to use the preloaded object.
# Use this whenever possible, especially when walking graphs in an O(N) loop!
def normalize(%Object{} = object, _, _), do: object
def normalize(%Activity{object: %Object{} = object}, _, _), do: object

# A hack for fake activities
def normalize(%Activity{data: %{"object" => %{"fake" => true} = data}}, _, _) do
  %Object{id: "pleroma:fake_object_id", data: data}
end

# No preloaded object, embedded object map: resolve by its id.
# `options` is forwarded so fetcher settings are not lost on this path.
def normalize(%Activity{data: %{"object" => %{"id" => ap_id}}}, fetch_remote, options) do
  warn_on_no_object_preloaded(ap_id)
  normalize(ap_id, fetch_remote, options)
end

# No preloaded object, bare object reference.
def normalize(%Activity{data: %{"object" => ap_id}}, fetch_remote, options) do
  warn_on_no_object_preloaded(ap_id)
  normalize(ap_id, fetch_remote, options)
end

# Old way, try fetching the object through cache.
def normalize(%{"id" => ap_id}, fetch_remote, options),
  do: normalize(ap_id, fetch_remote, options)

def normalize(ap_id, false, _) when is_binary(ap_id), do: get_cached_by_ap_id(ap_id)

def normalize(ap_id, true, options) when is_binary(ap_id) do
  Fetcher.fetch_object_from_id!(ap_id, options)
end

def normalize(_, _, _), do: nil
142
@doc """
Tells whether `user` may mutate `object`.

Objects that record an `"actor"` may only be mutated by the user whose
`ap_id` equals that actor. Objects without an `"actor"` entry (legacy
objects) may be mutated by anybody.
"""
def authorize_mutation(%Object{data: data}, %User{ap_id: ap_id}) do
  case data do
    # Owned objects can only be mutated by their owner
    %{"actor" => actor} -> actor == ap_id
    # Legacy objects can be mutated by anybody
    _ -> true
  end
end
149
@doc """
Looks up an object by its `"id"`, going through `:object_cache` first.

On a cache hit the cached value is returned. On a miss the object is read
with `get_by_ap_id/1` and, when found, stored in the cache under
`"object:<ap_id>"`.

The cache is treated as best effort: if reading from or writing to it fails,
the database result is still returned instead of raising.
"""
@spec get_cached_by_ap_id(String.t()) :: Object.t() | nil
def get_cached_by_ap_id(ap_id) do
  key = "object:#{ap_id}"

  case Cachex.get(:object_cache, key) do
    {:ok, cached} when not is_nil(cached) ->
      cached

    # Either a miss ({:ok, nil}) or a cache error; both fall back to the
    # database.
    _miss_or_error ->
      with %Object{} = object <- get_by_ap_id(ap_id) do
        # A failed write must not change the result of the lookup.
        _ = Cachex.put(:object_cache, key, object)
        object
      end
  end
end
163
@doc """
Builds (without inserting) a changeset for a new object whose `data` only
carries `context` as its `"id"`.
"""
def context_mapping(context) do
  params = %{data: %{"id" => context}}
  Object.change(%Object{}, params)
end
167
@doc """
Builds the tombstone data for `object` as a plain map.

The map is an `%ObjectTombstone{}` converted with `Map.from_struct/1`,
carrying the object's `"id"`, its `"type"` as `formerType`, and `deleted`
(defaults to the current UTC time).
"""
def make_tombstone(%Object{data: %{"id" => id, "type" => type}}, deleted \\ DateTime.utc_now()) do
  Map.from_struct(%ObjectTombstone{id: id, formerType: type, deleted: deleted})
end
176
@doc """
Replaces the stored `data` of `object` with its tombstone
(see `make_tombstone/2`) and persists the change.

Returns whatever `Repo.update/1` returns.
"""
def swap_object_with_tombstone(object) do
  changeset = Object.change(object, %{data: make_tombstone(object)})
  Repo.update(changeset)
end
184
@doc """
Deletes `object` by turning it into a tombstone.

Steps, in order:

  1. the stored data is swapped for a tombstone;
  2. all activities referencing the object's id are deleted;
  3. the cache entries for the object are invalidated;
  4. attachment cleanup is requested according to the
     `[:instance, :cleanup_attachments]` setting.

Returns `{:ok, object, deleted_activity}` with the original (pre-tombstone)
object. If the tombstone update or the cache invalidation does not succeed,
that step's result is returned as is.
"""
def delete(%Object{data: %{"id" => id}} = object) do
  # `<-` (not `=`) so a failed update is returned to the caller instead of
  # raising a MatchError.
  with {:ok, _tombstoned} <- swap_object_with_tombstone(object),
       deleted_activity = Activity.delete_all_by_object_ap_id(id),
       {:ok, _} <- invalid_object_cache(object) do
    # The original object is passed on purpose: the tombstone no longer
    # carries the data of the deleted object.
    cleanup_attachments(
      Config.get([:instance, :cleanup_attachments]),
      %{"object" => object}
    )

    {:ok, object, deleted_activity}
  end
end
197
@doc """
Enqueues an attachment cleanup job when cleanup is enabled.

With `true` as the first argument and params containing the string key
`"object"`, a `"cleanup_attachments"` job is enqueued through
`AttachmentsCleanupWorker`. Any other combination (including a `nil`
setting) is a no-op that returns `{:ok, nil}`.
"""
@spec cleanup_attachments(boolean() | nil, %{required(String.t()) => map()}) ::
        {:ok, Oban.Job.t() | nil}
def cleanup_attachments(true, %{"object" => _} = params) do
  AttachmentsCleanupWorker.enqueue("cleanup_attachments", params)
end

def cleanup_attachments(_, _), do: {:ok, nil}
205
@doc """
Removes `object` from the database and invalidates its cache entries.

Returns `{:ok, deleted_object}`; if the delete or the cache invalidation does
not succeed, that step's result is returned as is.
"""
def prune(%Object{data: %{"id" => _id}} = object) do
  with {:ok, deleted} <- Repo.delete(object),
       {:ok, _} <- invalid_object_cache(deleted) do
    {:ok, deleted}
  end
end
212
@doc """
Drops the cache entries belonging to an object.

First removes `"object:<id>"` from `:object_cache`; only when that reports
`{:ok, true}` is the path component of the id removed from `:web_resp_cache`,
and the result of that second deletion is returned. Any other result of the
first deletion is returned unchanged.
"""
def invalid_object_cache(%Object{data: %{"id" => id}}) do
  case Cachex.del(:object_cache, "object:#{id}") do
    {:ok, true} -> Cachex.del(:web_resp_cache, URI.parse(id).path)
    other -> other
  end
end
218
@doc """
Stores `object` in `:object_cache` under `"object:<id>"`.

Always returns `{:ok, object}`; the result of the cache write is ignored.
"""
def set_cache(%Object{data: %{"id" => ap_id}} = object) do
  cache_key = "object:#{ap_id}"
  Cachex.put(:object_cache, cache_key, object)

  {:ok, object}
end
223
@doc """
Persists `changeset` and, on success, refreshes the cached copy of the
object. A failed update is returned unchanged.
"""
def update_and_set_cache(changeset) do
  case Repo.update(changeset) do
    {:ok, object} -> set_cache(object)
    failure -> failure
  end
end
229
@doc """
Increments the `repliesCount` entry of the object with the given id by one.

The update runs as a single SQL statement; a missing `repliesCount` is
treated as 0. When exactly one row is updated, the refreshed object is put
into the cache and `{:ok, object}` is returned; otherwise
`{:error, "Not found"}`.
"""
def increase_replies_count(ap_id) do
  Object
  |> where([o], fragment("?->>'id' = ?::text", o.data, ^to_string(ap_id)))
  # Without a select, Repo.update_all/2 returns `{count, nil}`, so the updated
  # row would never be available for the cache refresh below.
  |> select([o], o)
  |> update([o],
    set: [
      data:
        fragment(
          """
          safe_jsonb_set(?, '{repliesCount}',
            (coalesce((?->>'repliesCount')::int, 0) + 1)::varchar::jsonb, true)
          """,
          o.data,
          o.data
        )
    ]
  )
  |> Repo.update_all([])
  |> case do
    {1, [object]} -> set_cache(object)
    _ -> {:error, "Not found"}
  end
end
252
@doc """
Decrements the `repliesCount` entry of the object with the given id by one,
never going below 0.

The update runs as a single SQL statement. When exactly one row is updated,
the refreshed object is put into the cache and `{:ok, object}` is returned;
otherwise `{:error, "Not found"}`.
"""
def decrease_replies_count(ap_id) do
  Object
  |> where([o], fragment("?->>'id' = ?::text", o.data, ^to_string(ap_id)))
  # Without a select, Repo.update_all/2 returns `{count, nil}`, so the updated
  # row would never be available for the cache refresh below.
  |> select([o], o)
  |> update([o],
    set: [
      data:
        fragment(
          """
          safe_jsonb_set(?, '{repliesCount}',
            (greatest(0, (?->>'repliesCount')::int - 1))::varchar::jsonb, true)
          """,
          o.data,
          o.data
        )
    ]
  )
  |> Repo.update_all([])
  |> case do
    {1, [object]} -> set_cache(object)
    _ -> {:error, "Not found"}
  end
end
275
@doc """
Records a vote by `actor` for the option called `name` on a poll.

`ap_id` is resolved with `normalize/1`. For objects of type `"Question"`, the
`replies.totalItems` counter of the matching option is incremented, `actor`
is added to the `"voters"` list (without duplicates), and the object is
updated and re-cached.

The poll's options are read from `"anyOf"` only when that entry is a
non-empty list; otherwise `"oneOf"` is used. An empty or `nil` `"anyOf"` next
to a populated `"oneOf"` therefore still counts as a single-choice poll.

Returns the result of `update_and_set_cache/1`, or `:noop` when the object
cannot be resolved or is not a `"Question"`.
"""
def increase_vote_count(ap_id, name, actor) do
  with %Object{} = object <- Object.normalize(ap_id),
       "Question" <- object.data["type"] do
    # Key presence alone is not enough: an empty "anyOf" list must not shadow
    # the options stored under "oneOf".
    options_key =
      case object.data do
        %{"anyOf" => [_ | _]} -> "anyOf"
        _ -> "oneOf"
      end

    options =
      Enum.map(object.data[options_key] || [], fn
        %{"name" => ^name} = option ->
          Kernel.update_in(option["replies"]["totalItems"], &(&1 + 1))

        option ->
          option
      end)

    voters = Enum.uniq([actor | object.data["voters"] || []])

    data =
      object.data
      |> Map.put(options_key, options)
      |> Map.put("voters", voters)

    object
    |> Object.change(%{data: data})
    |> update_and_set_cache()
  else
    _ -> :noop
  end
end
308
@doc """
Updates the data field of an object.

`attrs` is merged on top of the current `data` (a `nil` `data` is treated as
an empty map) and the result is persisted with `Repo.update/1`.
"""
def update_data(%Object{data: data} = object, attrs \\ %{}) do
  merged = Map.merge(data || %{}, attrs)
  changeset = Object.change(object, %{data: merged})

  Repo.update(changeset)
end
315
@doc """
Returns `true` when the object's `"id"` starts with this instance's base URL
followed by a slash.
"""
def local?(%Object{data: %{"id" => id}}) do
  local_prefix = Pleroma.Web.base_url() <> "/"
  String.starts_with?(id, local_prefix)
end
319
@doc """
Builds a query for the objects that reply to `object`.

`object` is first resolved with `normalize/1`. The query selects objects
whose `"inReplyTo"` equals the resolved object's `"id"`, ordered by ascending
database id. With `self_only: true` in `opts`, only replies whose `"actor"`
equals the parent's `"actor"` are kept.

Returns an `Ecto.Query`; nothing is executed here.
"""
def replies(object, opts \\ []) do
  parent = Object.normalize(object)
  parent_id = parent.data["id"]

  base_query =
    from(o in Object,
      where: fragment("(?)->>'inReplyTo' = ?", o.data, ^parent_id),
      order_by: [asc: o.id]
    )

  if opts[:self_only] do
    parent_actor = parent.data["actor"]
    where(base_query, [o], fragment("(?)->>'actor' = ?", o.data, ^parent_actor))
  else
    base_query
  end
end
338
@doc """
Same as `replies/2`, restricted to replies written by the parent object's
own actor (forces `self_only: true`).
"""
def self_replies(object, opts \\ []) do
  self_only_opts = Keyword.put(opts, :self_only, true)
  replies(object, self_only_opts)
end
341 end