Fix addressing
[akkoma] / lib / pleroma / object / fetcher.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Object.Fetcher do
6 alias Pleroma.HTTP
7 alias Pleroma.Maps
8 alias Pleroma.Object
9 alias Pleroma.Object.Containment
10 alias Pleroma.Repo
11 alias Pleroma.Signature
12 alias Pleroma.Web.ActivityPub.InternalFetchActor
13 alias Pleroma.Web.ActivityPub.ObjectValidator
14 alias Pleroma.Web.ActivityPub.Transmogrifier
15 alias Pleroma.Web.Federator
16
17 require Logger
18 require Pleroma.Constants
19
# Stamps the changeset with a fresh `:updated_at` value: the current UTC time,
# truncated to whole seconds.
defp touch_changeset(changeset) do
  now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)

  Ecto.Changeset.put_change(changeset, :updated_at, now)
end
27
# Carries the fields listed by `Pleroma.Constants.object_internal_fields/0` over
# from the previously stored object data onto `new_data`. On key conflicts the
# stored value wins. When the first argument does not hold a map under `:data`,
# `new_data` is returned untouched.
defp maybe_reinject_internal_fields(old_object, new_data) do
  case old_object do
    %{data: %{} = stored} ->
      Map.merge(new_data, Map.take(stored, Pleroma.Constants.object_internal_fields()))

    _ ->
      new_data
  end
end
35
# Replaces the data of `object` with freshly fetched `new_data`, keeping the
# internal fields of the old data, then saves and re-caches the result.
#
# "Question" objects are run through `ObjectValidator.validate/2`; every other
# object is run through `Transmogrifier.fix_object/1` instead.
#
# Any step that does not succeed is logged and returned as `{:error, failure}`,
# where `failure` is the raw value that step produced.
@spec reinject_object(struct(), map()) :: {:ok, Object.t()} | {:error, any()}
defp reinject_object(%Object{data: %{"type" => "Question"}} = object, new_data) do
  Logger.debug("Reinjecting object #{new_data["id"]}")

  merged = maybe_reinject_internal_fields(object, new_data)

  with {:ok, validated, _meta} <- ObjectValidator.validate(merged, %{}),
       {:ok, stored} <- persist_reinjected_object(object, validated) do
    {:ok, stored}
  else
    failure ->
      Logger.error("Error while processing object: #{inspect(failure)}")
      {:error, failure}
  end
end

defp reinject_object(%Object{} = object, new_data) do
  Logger.debug("Reinjecting object #{new_data["id"]}")

  fixed = Transmogrifier.fix_object(new_data)
  merged = maybe_reinject_internal_fields(object, fixed)

  case persist_reinjected_object(object, merged) do
    {:ok, stored} ->
      {:ok, stored}

    failure ->
      Logger.error("Error while processing object: #{inspect(failure)}")
      {:error, failure}
  end
end

# Writes `data` into `object` through a touched changeset, then refreshes the
# object cache. Returns whatever the first non-`{:ok, _}` step returned, or the
# result of `Object.set_cache/1`.
defp persist_reinjected_object(%Object{} = object, data) do
  changeset =
    object
    |> Object.change(%{data: data})
    |> touch_changeset()

  with {:ok, saved} <- Repo.insert_or_update(changeset) do
    Object.set_cache(saved)
  end
end
70
@doc """
Fetches the object's id again and reinjects the received data into `object`.

Objects for which `Object.local?/1` returns `true` are returned as-is without
any fetch. Any failure while fetching or reinjecting is wrapped as
`{:error, failure}`.
"""
def refetch_object(%Object{data: %{"id" => id}} = object) do
  case Object.local?(object) do
    true ->
      {:ok, object}

    false ->
      with {:ok, new_data} <- fetch_and_contain_remote_object_from_id(id),
           {:ok, refreshed} <- reinject_object(object, new_data) do
        {:ok, refreshed}
      else
        failure -> {:error, failure}
      end

    other ->
      {:error, {:local, other}}
  end
end
81
@doc """
Returns the object identified by `id`, fetching it remotely if it is not cached.

Note: will create a Create activity, which we need internally at the moment.

The steps, in order:

  * a cached object is returned right away;
  * `options[:depth]` is checked with `Federator.allowed_thread_distance?/1`;
  * the object is fetched with `fetch_and_contain_remote_object_from_id/1`;
  * if the fetched data already normalizes to an object, that object is returned;
  * otherwise the data is wrapped in Create activity params, checked with
    `Containment.contain_origin/2` and passed to `Transmogrifier.handle_incoming/2`.

Returns `{:ok, object}`, `{:error, reason}` or `{:reject, reason}`. Any other
unexpected intermediate value is returned unchanged.
"""
def fetch_object_from_id(id, options \\ []) do
  with {:fetch_object, nil} <- {:fetch_object, Object.get_cached_by_ap_id(id)},
       {:allowed_depth, true} <-
         {:allowed_depth, Federator.allowed_thread_distance?(options[:depth])},
       {:fetch, {:ok, data}} <- {:fetch, fetch_and_contain_remote_object_from_id(id)},
       {:normalize, nil} <- {:normalize, Object.normalize(data, fetch: false)},
       params = prepare_activity_params(data),
       {:containment, :ok} <- {:containment, Containment.contain_origin(id, params)},
       {:transmogrifier, {:ok, activity}} <-
         {:transmogrifier, Transmogrifier.handle_incoming(params, options)},
       {:object, _data, %Object{} = object} <-
         {:object, data, Object.normalize(activity, fetch: false)} do
    {:ok, object}
  else
    # Already cached: nothing to fetch.
    {:fetch_object, %Object{} = cached} ->
      {:ok, cached}

    {:allowed_depth, false} ->
      {:error, "Max thread distance exceeded."}

    {:fetch, {:error, reason}} ->
      {:error, reason}

    # The fetched data resolved to an object we already know.
    {:normalize, %Object{} = known} ->
      {:ok, known}

    {:containment, _} ->
      {:error, "Object containment failed."}

    # Must stay ahead of the generic :transmogrifier clause below.
    {:transmogrifier, {:error, {:reject, reason}}} ->
      {:reject, reason}

    {:transmogrifier, _} = failure ->
      {:error, failure}

    # The activity was handled but yielded no object: store the fetched data directly.
    {:object, fetched, nil} ->
      reinject_object(%Object{}, fetched)

    other ->
      other
  end
end
124
# Wraps fetched object data in the params of a "Create" activity. The actor is
# taken from "actor", falling back to "attributedTo". The addressing fields
# ("to", "cc", "bto", "bcc") are copied over through `Maps.put_if_present/3`.
defp prepare_activity_params(data) do
  create = %{
    "type" => "Create",
    # Should we seriously keep this attributedTo thing?
    "actor" => data["actor"] || data["attributedTo"],
    "object" => data
  }

  Enum.reduce(["to", "cc", "bto", "bcc"], create, fn field, params ->
    Maps.put_if_present(params, field, data[field])
  end)
end
137
@doc """
Same as `fetch_object_from_id/2`, but returns the bare object or `nil`.

Despite the bang in its name, the function does not raise on a failed fetch:
every non-`{:ok, object}` result becomes `nil`. Rejections are logged at info
level and other failures at error level, except for `Tesla.Mock.Error` errors
and deleted objects, which are dropped silently.
"""
def fetch_object_from_id!(id, options \\ []) do
  case fetch_object_from_id(id, options) do
    {:ok, object} ->
      object

    {:error, %Tesla.Mock.Error{}} ->
      nil

    {:error, "Object has been deleted"} ->
      nil

    {:reject, reason} ->
      Logger.info("Rejected #{id} while fetching: #{inspect(reason)}")
      nil

    failure ->
      Logger.error("Error while fetching #{id}: #{inspect(failure)}")
      nil
  end
end
157
# Builds the `{"signature", value}` request header for a GET of `id`, signed as
# the internal fetch actor over the request target, host and date.
#
# `id` is the URL being fetched; `date` is the value that is also sent in the
# "date" header.
defp make_signature(id, date) do
  uri = URI.parse(id)

  # `URI.parse/1` yields a nil path for ids without a path component (for
  # example "https://example.com"). The request for such a URL targets "/", so
  # sign "/" rather than an empty path that would never match on the remote side.
  path =
    case uri.path do
      empty when empty in [nil, ""] -> "/"
      present -> present
    end

  signature =
    InternalFetchActor.get_actor()
    |> Signature.sign(%{
      "(request-target)": "get #{path}",
      host: uri.host,
      date: date
    })

  {"signature", signature}
end
171
# Prepends a "signature" header to `headers` when the
# `[:activitypub, :sign_object_fetches]` setting is truthy; otherwise returns
# `headers` unchanged.
defp sign_fetch(headers, id, date) do
  case Pleroma.Config.get([:activitypub, :sign_object_fetches]) do
    disabled when disabled in [nil, false] -> headers
    _enabled -> [make_signature(id, date) | headers]
  end
end
179
# Prepends a "date" header to `headers` when the
# `[:activitypub, :sign_object_fetches]` setting is truthy; otherwise returns
# `headers` unchanged.
defp maybe_date_fetch(headers, date) do
  case Pleroma.Config.get([:activitypub, :sign_object_fetches]) do
    disabled when disabled in [nil, false] -> headers
    _enabled -> [{"date", date} | headers]
  end
end
187
@doc """
Fetches the ActivityPub document at `id` and checks that it passes
`Containment.contain_origin_from_id/2`.

`id` may be a string or a map with an `"id"` key. Returns `{:ok, data}` with the
decoded JSON on success and `{:error, reason}` otherwise; ids that do not start
with `"http"` and non-string ids are rejected without any request being made.
"""
def fetch_and_contain_remote_object_from_id(id)

def fetch_and_contain_remote_object_from_id(%{"id" => id}),
  do: fetch_and_contain_remote_object_from_id(id)

def fetch_and_contain_remote_object_from_id(id) when is_binary(id) do
  Logger.debug("Fetching object #{id} via AP")

  if String.starts_with?(id, "http") do
    with {:ok, body} <- get_object(id),
         {:ok, data} <- safe_json_decode(body),
         :ok <- Containment.contain_origin_from_id(id, data) do
      {:ok, data}
    else
      {:error, _} = error -> error
      # Anything that is not already an error tuple gets wrapped into one.
      other -> {:error, other}
    end
  else
    {:error, "Unsupported URI scheme"}
  end
end

def fetch_and_contain_remote_object_from_id(_id),
  do: {:error, "id must be a string"}
215
# Performs the HTTP GET for `id`, asking for "application/activity+json" and
# adding date/signature headers when fetch signing is enabled.
#
# Returns `{:ok, body}` for a 2xx response with an accepted content type,
# `{:error, "Object has been deleted"}` for 404/410, and `{:error, reason}` for
# everything else.
defp get_object(id) do
  date = Signature.signed_date()

  request_headers =
    [{"accept", "application/activity+json"}]
    |> maybe_date_fetch(date)
    |> sign_fetch(id, date)

  case HTTP.get(id, request_headers) do
    {:ok, %{body: body, status: code, headers: response_headers}} when code in 200..299 ->
      check_content_type(response_headers, body)

    {:ok, %{status: code}} when code in [404, 410] ->
      {:error, "Object has been deleted"}

    {:error, _} = error ->
      error

    other ->
      {:error, other}
  end
end

# Accepts the body only if the response declares "application/activity+json",
# or "application/ld+json" with the ActivityStreams profile.
defp check_content_type(response_headers, body) do
  case List.keyfind(response_headers, "content-type", 0) do
    {_name, content_type} ->
      case Plug.Conn.Utils.media_type(content_type) do
        {:ok, "application", "activity+json", _params} ->
          {:ok, body}

        {:ok, "application", "ld+json",
         %{"profile" => "https://www.w3.org/ns/activitystreams"}} ->
          {:ok, body}

        _ ->
          {:error, {:content_type, content_type}}
      end

    _ ->
      {:error, {:content_type, nil}}
  end
end
254
# Decodes a JSON body with `Jason.decode/1`; a `nil` body is passed through as
# `{:ok, nil}` instead of being handed to the decoder.
defp safe_json_decode(json) do
  case json do
    nil -> {:ok, nil}
    _ -> Jason.decode(json)
  end
end
257 end