8309ef64a042de3ffbb1da16a36c6869fca50fc8
[akkoma] / lib / pleroma / object / fetcher.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Object.Fetcher do
6 alias Pleroma.HTTP
7 alias Pleroma.Instances
8 alias Pleroma.Maps
9 alias Pleroma.Object
10 alias Pleroma.Object.Containment
11 alias Pleroma.Repo
12 alias Pleroma.Signature
13 alias Pleroma.Web.ActivityPub.InternalFetchActor
14 alias Pleroma.Web.ActivityPub.ObjectValidator
15 alias Pleroma.Web.ActivityPub.Transmogrifier
16 alias Pleroma.Web.Federator
17
18 require Logger
19 require Pleroma.Constants
20
21 defp touch_changeset(changeset) do
22 updated_at =
23 NaiveDateTime.utc_now()
24 |> NaiveDateTime.truncate(:second)
25
26 Ecto.Changeset.put_change(changeset, :updated_at, updated_at)
27 end
28
29 defp maybe_reinject_internal_fields(%{data: %{} = old_data}, new_data) do
30 has_history? = fn
31 %{"formerRepresentations" => %{"orderedItems" => list}} when is_list(list) -> true
32 _ -> false
33 end
34
35 internal_fields = Map.take(old_data, Pleroma.Constants.object_internal_fields())
36
37 remote_history_exists? = has_history?.(new_data)
38
39 # If the remote history exists, we treat that as the only source of truth.
40 new_data =
41 if has_history?.(old_data) and not remote_history_exists? do
42 Map.put(new_data, "formerRepresentations", old_data["formerRepresentations"])
43 else
44 new_data
45 end
46
47 # If the remote does not have history information, we need to manage it ourselves
48 new_data =
49 if not remote_history_exists? do
50 changed? =
51 Pleroma.Constants.status_updatable_fields()
52 |> Enum.any?(fn field -> Map.get(old_data, field) != Map.get(new_data, field) end)
53
54 %{updated_object: updated_object} =
55 new_data
56 |> Object.Updater.maybe_update_history(old_data,
57 updated: changed?,
58 use_history_in_new_object?: false
59 )
60
61 updated_object
62 else
63 new_data
64 end
65
66 Map.merge(new_data, internal_fields)
67 end
68
69 defp maybe_reinject_internal_fields(_, new_data), do: new_data
70
71 @spec reinject_object(struct(), map()) :: {:ok, Object.t()} | {:error, any()}
72 defp reinject_object(%Object{data: %{"type" => "Question"}} = object, new_data) do
73 Logger.debug("Reinjecting object #{new_data["id"]}")
74
75 with data <- maybe_reinject_internal_fields(object, new_data),
76 {:ok, data, _} <- ObjectValidator.validate(data, %{}),
77 changeset <- Object.change(object, %{data: data}),
78 changeset <- touch_changeset(changeset),
79 {:ok, object} <- Repo.insert_or_update(changeset),
80 {:ok, object} <- Object.set_cache(object) do
81 {:ok, object}
82 else
83 e ->
84 Logger.error("Error while processing object: #{inspect(e)}")
85 {:error, e}
86 end
87 end
88
89 defp reinject_object(%Object{} = object, new_data) do
90 Logger.debug("Reinjecting object #{new_data["id"]}")
91
92 with new_data <- Transmogrifier.fix_object(new_data),
93 data <- maybe_reinject_internal_fields(object, new_data),
94 changeset <- Object.change(object, %{data: data}),
95 changeset <- touch_changeset(changeset),
96 {:ok, object} <- Repo.insert_or_update(changeset),
97 {:ok, object} <- Object.set_cache(object) do
98 {:ok, object}
99 else
100 e ->
101 Logger.error("Error while processing object: #{inspect(e)}")
102 {:error, e}
103 end
104 end
105
106 def refetch_object(%Object{data: %{"id" => id}} = object) do
107 with {:local, false} <- {:local, Object.local?(object)},
108 {:ok, new_data} <- fetch_and_contain_remote_object_from_id(id),
109 {:ok, object} <- reinject_object(object, new_data) do
110 {:ok, object}
111 else
112 {:local, true} -> {:ok, object}
113 e -> {:error, e}
114 end
115 end
116
117 # Note: will create a Create activity, which we need internally at the moment.
118 def fetch_object_from_id(id, options \\ []) do
119 with %URI{} = uri <- URI.parse(id),
120 # If we have instance restrictions, apply them here to prevent fetching from unwanted instances
121 {:ok, nil} <- Pleroma.Web.ActivityPub.MRF.SimplePolicy.check_reject(uri),
122 {:ok, _} <- Pleroma.Web.ActivityPub.MRF.SimplePolicy.check_accept(uri),
123 {_, nil} <- {:fetch_object, Object.get_cached_by_ap_id(id)},
124 {_, true} <- {:allowed_depth, Federator.allowed_thread_distance?(options[:depth])},
125 {_, {:ok, data}} <- {:fetch, fetch_and_contain_remote_object_from_id(id)},
126 {_, nil} <- {:normalize, Object.normalize(data, fetch: false)},
127 params <- prepare_activity_params(data),
128 {_, :ok} <- {:containment, Containment.contain_origin(id, params)},
129 {_, {:ok, activity}} <-
130 {:transmogrifier, Transmogrifier.handle_incoming(params, options)},
131 {_, _data, %Object{} = object} <-
132 {:object, data, Object.normalize(activity, fetch: false)} do
133 {:ok, object}
134 else
135 {:allowed_depth, false} ->
136 {:error, "Max thread distance exceeded."}
137
138 {:containment, _} ->
139 {:error, "Object containment failed."}
140
141 {:transmogrifier, {:error, {:reject, e}}} ->
142 {:reject, e}
143
144 {:transmogrifier, {:reject, e}} ->
145 {:reject, e}
146
147 {:transmogrifier, _} = e ->
148 {:error, e}
149
150 {:object, data, nil} ->
151 reinject_object(%Object{}, data)
152
153 {:normalize, object = %Object{}} ->
154 {:ok, object}
155
156 {:fetch_object, %Object{} = object} ->
157 {:ok, object}
158
159 {:fetch, {:error, error}} ->
160 {:error, error}
161
162 {:reject, reason} ->
163 {:reject, reason}
164
165 e ->
166 e
167 end
168 end
169
170 defp prepare_activity_params(data) do
171 %{
172 "type" => "Create",
173 # Should we seriously keep this attributedTo thing?
174 "actor" => data["actor"] || data["attributedTo"],
175 "object" => data
176 }
177 |> Maps.put_if_present("to", data["to"])
178 |> Maps.put_if_present("cc", data["cc"])
179 |> Maps.put_if_present("bto", data["bto"])
180 |> Maps.put_if_present("bcc", data["bcc"])
181 end
182
183 def fetch_object_from_id!(id, options \\ []) do
184 with {:ok, object} <- fetch_object_from_id(id, options) do
185 object
186 else
187 {:error, %Tesla.Mock.Error{}} ->
188 nil
189
190 {:error, {"Object has been deleted", _id, _code}} ->
191 nil
192
193 {:reject, reason} ->
194 Logger.debug("Rejected #{id} while fetching: #{inspect(reason)}")
195 nil
196
197 e ->
198 Logger.error("Error while fetching #{id}: #{inspect(e)}")
199 nil
200 end
201 end
202
203 defp make_signature(id, date) do
204 uri = URI.parse(id)
205
206 signature =
207 InternalFetchActor.get_actor()
208 |> Signature.sign(%{
209 "(request-target)": "get #{uri.path}",
210 host: uri.host,
211 date: date
212 })
213
214 {"signature", signature}
215 end
216
217 defp sign_fetch(headers, id, date) do
218 if Pleroma.Config.get([:activitypub, :sign_object_fetches]) do
219 [make_signature(id, date) | headers]
220 else
221 headers
222 end
223 end
224
225 defp maybe_date_fetch(headers, date) do
226 if Pleroma.Config.get([:activitypub, :sign_object_fetches]) do
227 [{"date", date} | headers]
228 else
229 headers
230 end
231 end
232
233 def fetch_and_contain_remote_object_from_id(id)
234
235 def fetch_and_contain_remote_object_from_id(%{"id" => id}),
236 do: fetch_and_contain_remote_object_from_id(id)
237
238 def fetch_and_contain_remote_object_from_id(id) when is_binary(id) do
239 Logger.debug("Fetching object #{id} via AP")
240
241 with {:scheme, true} <- {:scheme, String.starts_with?(id, "http")},
242 {:ok, body} <- get_object(id),
243 {:ok, data} <- safe_json_decode(body),
244 :ok <- Containment.contain_origin_from_id(id, data) do
245 unless Instances.reachable?(id) do
246 Instances.set_reachable(id)
247 end
248
249 {:ok, data}
250 else
251 {:scheme, _} ->
252 {:error, "Unsupported URI scheme"}
253
254 {:error, e} ->
255 {:error, e}
256
257 e ->
258 {:error, e}
259 end
260 end
261
262 def fetch_and_contain_remote_object_from_id(_id),
263 do: {:error, "id must be a string"}
264
265 defp get_object(id) do
266 date = Pleroma.Signature.signed_date()
267
268 headers =
269 [{"accept", "application/activity+json"}]
270 |> maybe_date_fetch(date)
271 |> sign_fetch(id, date)
272
273 case HTTP.get(id, headers) do
274 {:ok, %{body: body, status: code, headers: headers}} when code in 200..299 ->
275 case List.keyfind(headers, "content-type", 0) do
276 {_, content_type} ->
277 case Plug.Conn.Utils.media_type(content_type) do
278 {:ok, "application", "activity+json", _} ->
279 {:ok, body}
280
281 {:ok, "application", "ld+json",
282 %{"profile" => "https://www.w3.org/ns/activitystreams"}} ->
283 {:ok, body}
284
285 _ ->
286 {:error, {:content_type, content_type}}
287 end
288
289 _ ->
290 {:error, {:content_type, nil}}
291 end
292
293 {:ok, %{status: code}} when code in [404, 410] ->
294 {:error, {"Object has been deleted", id, code}}
295
296 {:error, e} ->
297 {:error, e}
298
299 e ->
300 {:error, e}
301 end
302 end
303
304 defp safe_json_decode(nil), do: {:ok, nil}
305 defp safe_json_decode(json), do: Jason.decode(json)
306 end