ae4301738a0d240aa6b250d558d9400bed621522
[akkoma] / lib / pleroma / object / fetcher.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Object.Fetcher do
6 alias Pleroma.HTTP
7 alias Pleroma.Object
8 alias Pleroma.Object.Containment
9 alias Pleroma.Repo
10 alias Pleroma.Signature
11 alias Pleroma.Web.ActivityPub.InternalFetchActor
12 alias Pleroma.Web.ActivityPub.ObjectValidator
13 alias Pleroma.Web.ActivityPub.Transmogrifier
14 alias Pleroma.Web.Federator
15 alias Pleroma.Web.FedSockets
16
17 require Logger
18 require Pleroma.Constants
19
20 defp touch_changeset(changeset) do
21 updated_at =
22 NaiveDateTime.utc_now()
23 |> NaiveDateTime.truncate(:second)
24
25 Ecto.Changeset.put_change(changeset, :updated_at, updated_at)
26 end
27
28 defp maybe_reinject_internal_fields(%{data: %{} = old_data}, new_data) do
29 internal_fields = Map.take(old_data, Pleroma.Constants.object_internal_fields())
30
31 Map.merge(new_data, internal_fields)
32 end
33
34 defp maybe_reinject_internal_fields(_, new_data), do: new_data
35
36 @spec reinject_object(struct(), map()) :: {:ok, Object.t()} | {:error, any()}
37 defp reinject_object(%Object{data: %{"type" => "Question"}} = object, new_data) do
38 Logger.debug("Reinjecting object #{new_data["id"]}")
39
40 with data <- maybe_reinject_internal_fields(object, new_data),
41 {:ok, data, _} <- ObjectValidator.validate(data, %{}),
42 changeset <- Object.change(object, %{data: data}),
43 changeset <- touch_changeset(changeset),
44 {:ok, object} <- Repo.insert_or_update(changeset),
45 {:ok, object} <- Object.set_cache(object) do
46 {:ok, object}
47 else
48 e ->
49 Logger.error("Error while processing object: #{inspect(e)}")
50 {:error, e}
51 end
52 end
53
54 defp reinject_object(%Object{} = object, new_data) do
55 Logger.debug("Reinjecting object #{new_data["id"]}")
56
57 with new_data <- Transmogrifier.fix_object(new_data),
58 data <- maybe_reinject_internal_fields(object, new_data),
59 changeset <- Object.change(object, %{data: data}),
60 changeset <- touch_changeset(changeset),
61 {:ok, object} <- Repo.insert_or_update(changeset),
62 {:ok, object} <- Object.set_cache(object) do
63 {:ok, object}
64 else
65 e ->
66 Logger.error("Error while processing object: #{inspect(e)}")
67 {:error, e}
68 end
69 end
70
71 def refetch_object(%Object{data: %{"id" => id}} = object) do
72 with {:local, false} <- {:local, Object.local?(object)},
73 {:ok, new_data} <- fetch_and_contain_remote_object_from_id(id),
74 {:ok, object} <- reinject_object(object, new_data) do
75 {:ok, object}
76 else
77 {:local, true} -> {:ok, object}
78 e -> {:error, e}
79 end
80 end
81
82 # Note: will create a Create activity, which we need internally at the moment.
83 def fetch_object_from_id(id, options \\ []) do
84 with {_, nil} <- {:fetch_object, Object.get_cached_by_ap_id(id)},
85 {_, true} <- {:allowed_depth, Federator.allowed_thread_distance?(options[:depth])},
86 {_, {:ok, data}} <- {:fetch, fetch_and_contain_remote_object_from_id(id)},
87 {_, nil} <- {:normalize, Object.normalize(data, false)},
88 params <- prepare_activity_params(data),
89 {_, :ok} <- {:containment, Containment.contain_origin(id, params)},
90 {_, {:ok, activity}} <-
91 {:transmogrifier, Transmogrifier.handle_incoming(params, options)},
92 {_, _data, %Object{} = object} <-
93 {:object, data, Object.normalize(activity, false)} do
94 {:ok, object}
95 else
96 {:allowed_depth, false} ->
97 {:error, "Max thread distance exceeded."}
98
99 {:containment, _} ->
100 {:error, "Object containment failed."}
101
102 {:transmogrifier, {:error, {:reject, e}}} ->
103 {:reject, e}
104
105 {:transmogrifier, _} = e ->
106 {:error, e}
107
108 {:object, data, nil} ->
109 reinject_object(%Object{}, data)
110
111 {:normalize, object = %Object{}} ->
112 {:ok, object}
113
114 {:fetch_object, %Object{} = object} ->
115 {:ok, object}
116
117 {:fetch, {:error, error}} ->
118 {:error, error}
119
120 e ->
121 e
122 end
123 end
124
125 defp prepare_activity_params(data) do
126 %{
127 "type" => "Create",
128 "to" => data["to"] || [],
129 "cc" => data["cc"] || [],
130 # Should we seriously keep this attributedTo thing?
131 "actor" => data["actor"] || data["attributedTo"],
132 "object" => data
133 }
134 end
135
136 def fetch_object_from_id!(id, options \\ []) do
137 with {:ok, object} <- fetch_object_from_id(id, options) do
138 object
139 else
140 {:error, %Tesla.Mock.Error{}} ->
141 nil
142
143 {:error, "Object has been deleted"} ->
144 nil
145
146 {:reject, reason} ->
147 Logger.info("Rejected #{id} while fetching: #{inspect(reason)}")
148 nil
149
150 e ->
151 Logger.error("Error while fetching #{id}: #{inspect(e)}")
152 nil
153 end
154 end
155
156 defp make_signature(id, date) do
157 uri = URI.parse(id)
158
159 signature =
160 InternalFetchActor.get_actor()
161 |> Signature.sign(%{
162 "(request-target)": "get #{uri.path}",
163 host: uri.host,
164 date: date
165 })
166
167 {"signature", signature}
168 end
169
170 defp sign_fetch(headers, id, date) do
171 if Pleroma.Config.get([:activitypub, :sign_object_fetches]) do
172 [make_signature(id, date) | headers]
173 else
174 headers
175 end
176 end
177
178 defp maybe_date_fetch(headers, date) do
179 if Pleroma.Config.get([:activitypub, :sign_object_fetches]) do
180 [{"date", date} | headers]
181 else
182 headers
183 end
184 end
185
186 def fetch_and_contain_remote_object_from_id(prm, opts \\ [])
187
188 def fetch_and_contain_remote_object_from_id(%{"id" => id}, opts),
189 do: fetch_and_contain_remote_object_from_id(id, opts)
190
191 def fetch_and_contain_remote_object_from_id(id, opts) when is_binary(id) do
192 Logger.debug("Fetching object #{id} via AP")
193
194 with {:scheme, true} <- {:scheme, String.starts_with?(id, "http")},
195 {:ok, body} <- get_object(id, opts),
196 {:ok, data} <- safe_json_decode(body),
197 :ok <- Containment.contain_origin_from_id(id, data) do
198 {:ok, data}
199 else
200 {:scheme, _} ->
201 {:error, "Unsupported URI scheme"}
202
203 {:error, e} ->
204 {:error, e}
205
206 e ->
207 {:error, e}
208 end
209 end
210
211 def fetch_and_contain_remote_object_from_id(_id, _opts),
212 do: {:error, "id must be a string"}
213
214 defp get_object(id, opts) do
215 with false <- Keyword.get(opts, :force_http, false),
216 {:ok, fedsocket} <- FedSockets.get_or_create_fed_socket(id) do
217 Logger.debug("fetching via fedsocket - #{inspect(id)}")
218 FedSockets.fetch(fedsocket, id)
219 else
220 _other ->
221 Logger.debug("fetching via http - #{inspect(id)}")
222 get_object_http(id)
223 end
224 end
225
226 defp get_object_http(id) do
227 date = Pleroma.Signature.signed_date()
228
229 headers =
230 [{"accept", "application/activity+json"}]
231 |> maybe_date_fetch(date)
232 |> sign_fetch(id, date)
233
234 case HTTP.get(id, headers) do
235 {:ok, %{body: body, status: code, headers: headers}} when code in 200..299 ->
236 case List.keyfind(headers, "content-type", 0) do
237 {_, content_type} ->
238 case Plug.Conn.Utils.media_type(content_type) do
239 {:ok, "application", "activity+json", _} ->
240 {:ok, body}
241
242 {:ok, "application", "ld+json",
243 %{"profile" => "https://www.w3.org/ns/activitystreams"}} ->
244 {:ok, body}
245
246 _ ->
247 {:error, {:content_type, content_type}}
248 end
249
250 _ ->
251 {:error, {:content_type, nil}}
252 end
253
254 {:ok, %{status: code}} when code in [404, 410] ->
255 {:error, "Object has been deleted"}
256
257 {:error, e} ->
258 {:error, e}
259
260 e ->
261 {:error, e}
262 end
263 end
264
265 defp safe_json_decode(nil), do: {:ok, nil}
266 defp safe_json_decode(json), do: Jason.decode(json)
267 end