Object.Fetcher: Fix getting transmogrifier reject reason
[akkoma] / lib / pleroma / object / fetcher.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Object.Fetcher do
6 alias Pleroma.HTTP
7 alias Pleroma.Maps
8 alias Pleroma.Object
9 alias Pleroma.Object.Containment
10 alias Pleroma.Repo
11 alias Pleroma.Signature
12 alias Pleroma.Web.ActivityPub.InternalFetchActor
13 alias Pleroma.Web.ActivityPub.ObjectValidator
14 alias Pleroma.Web.ActivityPub.Transmogrifier
15 alias Pleroma.Web.Federator
16
17 require Logger
18 require Pleroma.Constants
19
# Records "now" as the `:updated_at` change of `changeset`.
# The timestamp is the current UTC time truncated to whole seconds.
defp touch_changeset(changeset) do
  now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)

  Ecto.Changeset.put_change(changeset, :updated_at, now)
end
27
# Carries the internal fields of an already stored object over to freshly
# fetched data. The field names come from
# `Pleroma.Constants.object_internal_fields/0`; values found on the stored
# object take precedence over the same keys in the fetched data.
defp maybe_reinject_internal_fields(%{data: %{} = stored_data}, fetched_data) do
  preserved = Map.take(stored_data, Pleroma.Constants.object_internal_fields())

  Map.merge(fetched_data, preserved)
end

# Nothing to carry over when the first argument has no map under `:data`.
defp maybe_reinject_internal_fields(_object, fetched_data), do: fetched_data
35
# Replaces the data of `object` with freshly fetched `new_data`, keeping the
# object's internal fields, then persists the result and refreshes the cache.
#
# "Question" objects are run through `ObjectValidator.validate/2`; every other
# object is passed through `Transmogrifier.fix_object/1` first.
#
# Returns `{:ok, object}` on success. On failure the problem is logged and
# `{:error, reason}` is returned; a failing step that already produced an
# `{:error, reason}` tuple is passed through as-is rather than being nested
# into `{:error, {:error, reason}}`.
@spec reinject_object(struct(), map()) :: {:ok, Object.t()} | {:error, any()}
defp reinject_object(%Object{data: %{"type" => "Question"}} = object, new_data) do
  Logger.debug("Reinjecting object #{new_data["id"]}")

  with data <- maybe_reinject_internal_fields(object, new_data),
       {:ok, data, _} <- ObjectValidator.validate(data, %{}) do
    store_reinjected(object, data)
  else
    e -> reinject_error(e)
  end
end

defp reinject_object(%Object{} = object, new_data) do
  Logger.debug("Reinjecting object #{new_data["id"]}")

  fixed = Transmogrifier.fix_object(new_data)
  data = maybe_reinject_internal_fields(object, fixed)

  store_reinjected(object, data)
end

# Writes `data` onto `object`, bumps `updated_at`, saves and re-caches it.
defp store_reinjected(object, data) do
  changeset =
    object
    |> Object.change(%{data: data})
    |> touch_changeset()

  with {:ok, object} <- Repo.insert_or_update(changeset),
       {:ok, object} <- Object.set_cache(object) do
    {:ok, object}
  else
    e -> reinject_error(e)
  end
end

# Logs a reinjection failure and normalizes it to a single `{:error, reason}`.
defp reinject_error(e) do
  Logger.error("Error while processing object: #{inspect(e)}")

  case e do
    {:error, _} -> e
    _ -> {:error, e}
  end
end
70
@doc """
Fetches the current remote version of `object` and stores it locally.

Objects for which `Object.local?/1` is true are returned unchanged without
any network access.

Returns `{:ok, object}` or `{:error, reason}`.
"""
def refetch_object(%Object{data: %{"id" => id}} = object) do
  with {:local, false} <- {:local, Object.local?(object)},
       {:ok, new_data} <- fetch_and_contain_remote_object_from_id(id),
       {:ok, object} <- reinject_object(object, new_data) do
    {:ok, object}
  else
    {:local, true} ->
      {:ok, object}

    # The failing steps already return tagged errors; hand them back as they
    # are instead of nesting them into `{:error, {:error, reason}}`.
    {:error, _} = error ->
      error

    e ->
      {:error, e}
  end
end
81
@doc """
Returns the object identified by `id`, fetching it from its origin when it is
not cached locally.

Note: will create a Create activity, which we need internally at the moment.

Steps, in order: cache lookup, thread-distance check on `options[:depth]`,
remote fetch, local lookup of the fetched data, origin containment of the
synthetic Create params, and `Transmogrifier.handle_incoming/2`.

Returns `{:ok, object}`, `{:error, reason}` or `{:reject, reason}`. A step
result that none of the handlers below recognise is returned as the tagged
tuple it failed with.
"""
def fetch_object_from_id(id, options \\ []) do
  with {:fetch_object, nil} <- {:fetch_object, Object.get_cached_by_ap_id(id)},
       {:allowed_depth, true} <-
         {:allowed_depth, Federator.allowed_thread_distance?(options[:depth])},
       {:fetch, {:ok, fetched}} <- {:fetch, fetch_and_contain_remote_object_from_id(id)},
       {:normalize, nil} <- {:normalize, Object.normalize(fetched, fetch: false)},
       create_params = prepare_activity_params(fetched),
       {:containment, :ok} <-
         {:containment, Containment.contain_origin(id, create_params)},
       {:transmogrifier, {:ok, activity}} <-
         {:transmogrifier, Transmogrifier.handle_incoming(create_params, options)},
       {:object, _fetched, %Object{} = stored} <-
         {:object, fetched, Object.normalize(activity, fetch: false)} do
    {:ok, stored}
  else
    # Already cached: no fetch needed.
    {:fetch_object, %Object{} = cached} ->
      {:ok, cached}

    {:allowed_depth, false} ->
      {:error, "Max thread distance exceeded."}

    {:fetch, {:error, reason}} ->
      {:error, reason}

    # The fetched data resolves to an object we already have.
    {:normalize, %Object{} = known} ->
      {:ok, known}

    {:containment, _} ->
      {:error, "Object containment failed."}

    # The three transmogrifier handlers overlap; keep most specific first.
    {:transmogrifier, {:error, {:reject, reason}}} ->
      {:reject, reason}

    {:transmogrifier, {:reject, reason}} ->
      {:reject, reason}

    {:transmogrifier, _} = failure ->
      {:error, failure}

    # The activity was handled but yielded no object: store the fetched data.
    {:object, fetched, nil} ->
      reinject_object(%Object{}, fetched)

    unexpected ->
      unexpected
  end
end
127
# Wraps fetched object `data` in the params of a synthetic "Create" activity.
# The actor falls back to the object's "attributedTo"; each addressing field
# is handed to `Maps.put_if_present/3` in the order to, cc, bto, bcc.
defp prepare_activity_params(data) do
  create = %{
    "type" => "Create",
    # Should we seriously keep this attributedTo thing?
    "actor" => data["actor"] || data["attributedTo"],
    "object" => data
  }

  Enum.reduce(["to", "cc", "bto", "bcc"], create, fn field, params ->
    Maps.put_if_present(params, field, data[field])
  end)
end
140
@doc """
Same as `fetch_object_from_id/2`, but returns the bare object on success and
`nil` on any failure.

Rejections are logged at info level and other failures at error level;
`Tesla.Mock.Error` and "Object has been deleted" errors are not logged.
"""
def fetch_object_from_id!(id, options \\ []) do
  case fetch_object_from_id(id, options) do
    {:ok, object} ->
      object

    {:error, %Tesla.Mock.Error{}} ->
      nil

    {:error, "Object has been deleted"} ->
      nil

    {:reject, reason} ->
      Logger.info("Rejected #{id} while fetching: #{inspect(reason)}")
      nil

    failure ->
      Logger.error("Error while fetching #{id}: #{inspect(failure)}")
      nil
  end
end
160
# Builds the `{"signature", value}` header for a signed GET of `id`, signing
# `(request-target)`, `host` and `date` as the internal fetch actor.
#
# The signed `(request-target)` has to name the path that is actually
# requested. A URI without a path component (e.g. `https://example.com`) is
# requested as `/`, so an empty path is normalized to `/` instead of signing
# the bare string "get ".
#
# NOTE(review): the query string of `id` is not part of the signed target;
# confirm what remote verifiers expect before changing that.
defp make_signature(id, date) do
  uri = URI.parse(id)
  path = if uri.path in [nil, ""], do: "/", else: uri.path

  signature =
    InternalFetchActor.get_actor()
    |> Signature.sign(%{
      "(request-target)": "get #{path}",
      host: uri.host,
      date: date
    })

  {"signature", signature}
end
174
# Prepends a signature header for `id` to `headers` when the
# `[:activitypub, :sign_object_fetches]` setting is truthy; otherwise the
# headers are returned untouched.
defp sign_fetch(headers, id, date) do
  case Pleroma.Config.get([:activitypub, :sign_object_fetches]) do
    off when off in [nil, false] -> headers
    _on -> [make_signature(id, date) | headers]
  end
end
182
# Prepends a `date` header to `headers` when the
# `[:activitypub, :sign_object_fetches]` setting is truthy; otherwise the
# headers are returned untouched.
defp maybe_date_fetch(headers, date) do
  case Pleroma.Config.get([:activitypub, :sign_object_fetches]) do
    off when off in [nil, false] -> headers
    _on -> [{"date", date} | headers]
  end
end
190
@doc """
Fetches the object behind `id` over HTTP, decodes it and checks it against
`Containment.contain_origin_from_id/2`.

`id` is either the id string or a map holding it under `"id"`. Only
`http://` and `https://` ids are fetched.

Returns `{:ok, data}` or `{:error, reason}`.
"""
def fetch_and_contain_remote_object_from_id(id)

def fetch_and_contain_remote_object_from_id(%{"id" => id}),
  do: fetch_and_contain_remote_object_from_id(id)

def fetch_and_contain_remote_object_from_id(id) when is_binary(id) do
  Logger.debug("Fetching object #{id} via AP")

  # Require the full scheme separator: a bare "http" prefix check would also
  # let through ids such as "http+unix://..." or "httpx://...", which then
  # reach the HTTP client.
  with {:scheme, true} <- {:scheme, String.starts_with?(id, ["http://", "https://"])},
       {:ok, body} <- get_object(id),
       {:ok, data} <- safe_json_decode(body),
       :ok <- Containment.contain_origin_from_id(id, data) do
    {:ok, data}
  else
    {:scheme, _} ->
      {:error, "Unsupported URI scheme"}

    # Already a tagged error: pass it through without nesting.
    {:error, e} ->
      {:error, e}

    e ->
      {:error, e}
  end
end

def fetch_and_contain_remote_object_from_id(_id),
  do: {:error, "id must be a string"}
218
# Performs the (optionally dated and signed) GET for `id` and returns
# `{:ok, body}` only for a 2xx response whose content type is ActivityPub
# JSON. 404 and 410 are reported as a deleted object; anything else is
# returned wrapped in an error tuple.
defp get_object(id) do
  date = Pleroma.Signature.signed_date()

  request_headers =
    [{"accept", "application/activity+json"}]
    |> maybe_date_fetch(date)
    |> sign_fetch(id, date)

  handle_fetch_response(HTTP.get(id, request_headers))
end

# Maps the HTTP client result onto `{:ok, body}` / `{:error, reason}`.
defp handle_fetch_response({:ok, %{body: body, status: status, headers: headers}})
     when status in 200..299 do
  case List.keyfind(headers, "content-type", 0) do
    {_, content_type} ->
      if activitypub_media_type?(content_type) do
        {:ok, body}
      else
        {:error, {:content_type, content_type}}
      end

    _ ->
      {:error, {:content_type, nil}}
  end
end

defp handle_fetch_response({:ok, %{status: status}}) when status in [404, 410],
  do: {:error, "Object has been deleted"}

defp handle_fetch_response({:error, reason}), do: {:error, reason}

defp handle_fetch_response(other), do: {:error, other}

# True for `application/activity+json` and for `application/ld+json` carrying
# the ActivityStreams profile parameter.
defp activitypub_media_type?(content_type) do
  case Plug.Conn.Utils.media_type(content_type) do
    {:ok, "application", "activity+json", _} ->
      true

    {:ok, "application", "ld+json", %{"profile" => "https://www.w3.org/ns/activitystreams"}} ->
      true

    _ ->
      false
  end
end
257
# Decodes a response body with `Jason.decode/1`. A `nil` body is not handed
# to the decoder and yields `{:ok, nil}`.
defp safe_json_decode(json) do
  case json do
    nil -> {:ok, nil}
    body -> Jason.decode(body)
  end
end
260 end