Merge branch 'deprecate-public_endpoint' into 'develop'
[akkoma] / lib / pleroma / reverse_proxy.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.ReverseProxy do
  @range_headers ~w(range if-range)
  @keep_req_headers ~w(accept user-agent accept-encoding cache-control if-modified-since) ++
                      ~w(if-unmodified-since if-none-match) ++ @range_headers
  @resp_cache_headers ~w(etag date last-modified)
  @keep_resp_headers @resp_cache_headers ++
                       ~w(content-length content-type content-disposition content-encoding) ++
                       ~w(content-range accept-ranges vary)
  @default_cache_control_header "public, max-age=1209600"
  @valid_resp_codes [200, 206, 304]
  @max_read_duration :timer.seconds(30)
  @max_body_length :infinity
  @failed_request_ttl :timer.seconds(60)
  @methods ~w(GET HEAD)

  # Cache provider is resolved from config at compile time (defaults to Cachex).
  @cachex Pleroma.Config.get([:cachex, :provider], Cachex)

  @moduledoc """
  A reverse proxy.

      Pleroma.ReverseProxy.call(conn, url, options)

  It is not meant to be added into a plug pipeline, but to be called from another plug or controller.

  Supports `#{inspect(@methods)}` HTTP methods, and only allows `#{inspect(@valid_resp_codes)}` status codes.

  Responses are chunked to the client while downloading from the upstream.

  Some request / response headers are preserved:

  * request: `#{inspect(@keep_req_headers)}`
  * response: `#{inspect(@keep_resp_headers)}`

  Options:

  * `redirect_on_failure` (default `false`). Redirects the client to the real remote URL if there are any HTTP
  errors. Any error during body processing will not be redirected as the response is chunked. This may expose
  the remote URL, client IPs, ….

  * `max_body_length` (default `#{inspect(@max_body_length)}`): limits the content length to be approximately the
  specified length. It is validated with the `content-length` header and also verified when proxying.

  * `max_read_duration` (default `#{inspect(@max_read_duration)}` ms): the total time the connection is allowed to
  read from the remote upstream. Any value that is not a positive integer (e.g. `:infinity`) disables the limit.

  * `failed_request_ttl` (default `#{inspect(@failed_request_ttl)}` ms): how long a failed request is cached and
  cannot be retried.

  * `inline_content_types` (defaults to a built-in list of common image, audio and video types):
    * `true` will not alter `content-disposition` (up to the upstream),
    * `false` will add `content-disposition: attachment` to any response,
    * a list of content types allowed inline; any other type gets `content-disposition: attachment`.

  * `attachment_name`: filename used in `content-disposition: attachment` when the upstream does not provide
  one. Defaults to the filename derived from the URL by `Pleroma.Web.MediaProxy.filename/1`, otherwise
  `"attachment"`.

  * `keep_user_agent` (default `false`): when `true`, the upstream request's `user-agent` is replaced with
  Pleroma's own user agent (`Pleroma.Application.user_agent/0`); when `false`, the client's `user-agent`
  header, if any, is forwarded unchanged.

  * `req_headers`, `resp_headers`: additional headers. An entry replaces any forwarded header with exactly the
  same name (forwarded header names are lowercase).

  * `http`: options for [hackney](https://github.com/benoitc/hackney) or [gun](https://github.com/ninenines/gun),
  merged over `[pool: :media]`.

  """

  @doc "Default value of the `max_read_duration` option, in milliseconds."
  @spec max_read_duration_default() :: non_neg_integer()
  def max_read_duration_default, do: @max_read_duration

  @doc "Default `cache-control` value set on proxied responses."
  @spec default_cache_control_header() :: String.t()
  def default_cache_control_header, do: @default_cache_control_header

  @default_options [pool: :media]

  @inline_content_types [
    "image/gif",
    "image/jpeg",
    "image/jpg",
    "image/png",
    "image/svg+xml",
    "audio/mpeg",
    "audio/mp3",
    "video/webm",
    "video/mp4",
    "video/quicktime"
  ]

  require Logger
  import Plug.Conn

  @type option() ::
          {:keep_user_agent, boolean}
          | {:max_read_duration, :timer.time() | :infinity}
          | {:max_body_length, non_neg_integer() | :infinity}
          | {:failed_request_ttl, :timer.time() | :infinity}
          | {:http, keyword()}
          | {:req_headers, [{String.t(), String.t()}]}
          | {:resp_headers, [{String.t(), String.t()}]}
          | {:inline_content_types, boolean() | [String.t()]}
          | {:redirect_on_failure, boolean()}
          | {:attachment_name, String.t()}

  @doc """
  Proxies `url` to `conn`, streaming the upstream body back to the client in chunks.

  Only `#{inspect(@methods)}` requests are proxied; any other method gets a `400`.
  The returned `conn` is always halted.
  """
  @spec call(Plug.Conn.t(), url :: String.t(), [option()]) :: Plug.Conn.t()
  def call(_conn, _url, _opts \\ [])

  def call(%{method: method} = conn, url, opts) when method in @methods do
    client_opts = Keyword.merge(@default_options, Keyword.get(opts, :http, []))
    req_headers = build_req_headers(conn.req_headers, opts)
    max_body_length = Keyword.get(opts, :max_body_length, @max_body_length)

    opts =
      if filename = Pleroma.Web.MediaProxy.filename(url) do
        Keyword.put_new(opts, :attachment_name, filename)
      else
        opts
      end

    with {:ok, nil} <- @cachex.get(:failed_proxy_url_cache, url),
         {:ok, code, headers, client} <- request(method, url, req_headers, client_opts),
         :ok <- check_header_length(headers, client, max_body_length) do
      response(conn, client, url, code, headers, opts)
    else
      # The URL is in the failed-request cache: fail without contacting the upstream.
      {:ok, true} ->
        conn
        |> error_or_redirect(url, 500, "Request failed", opts)
        |> halt()

      # The upstream returned no body handle (depends on the HTTP client, e.g. for HEAD):
      # only status and headers are relayed.
      {:ok, code, headers} ->
        conn
        |> head_response(url, code, headers, opts)
        |> halt()

      {:error, {:invalid_http_response, code}} ->
        Logger.error("#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{code}")
        track_failed_url(url, code, opts)

        conn
        |> error_or_redirect(
          url,
          code,
          "Request failed: " <> Plug.Conn.Status.reason_phrase(code),
          opts
        )
        |> halt()

      {:error, error} ->
        Logger.error("#{__MODULE__}: request to #{inspect(url)} failed: #{inspect(error)}")
        track_failed_url(url, error, opts)

        conn
        |> error_or_redirect(url, 500, "Request failed", opts)
        |> halt()
    end
  end

  def call(conn, _, _) do
    conn
    |> send_resp(400, Plug.Conn.Status.reason_phrase(400))
    |> halt()
  end

  # Issues the upstream request. Accepts only @valid_resp_codes; any other status becomes
  # `{:error, {:invalid_http_response, code}}`. Response header names are downcased.
  defp request(method, url, headers, opts) do
    Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}")
    # `method` is guaranteed to be one of @methods by call/3's guard.
    method = method |> String.downcase() |> String.to_existing_atom()

    case client().request(method, url, headers, "", opts) do
      {:ok, code, headers, client} when code in @valid_resp_codes ->
        {:ok, code, downcase_headers(headers), client}

      {:ok, code, headers} when code in @valid_resp_codes ->
        {:ok, code, downcase_headers(headers)}

      {:ok, code, _, _} ->
        {:error, {:invalid_http_response, code}}

      {:ok, code, _} ->
        {:error, {:invalid_http_response, code}}

      {:error, error} ->
        {:error, error}
    end
  end

  # Validates the upstream `content-length` against `limit`. When it is rejected the body
  # is never read, so the upstream handle is closed here instead of being leaked.
  defp check_header_length(headers, client, limit) do
    case header_length_constraint(headers, limit) do
      :ok ->
        :ok

      error ->
        client().close(client)
        error
    end
  end

  # Sends the filtered headers, then streams the upstream body as chunks and halts `conn`.
  # The upstream handle is closed if streaming stops early.
  defp response(conn, client, url, status, headers, opts) do
    Logger.debug("#{__MODULE__} #{status} #{url} #{inspect(headers)}")

    result =
      conn
      |> put_resp_headers(build_resp_headers(headers, opts))
      |> send_chunked(status)
      |> chunk_reply(client, opts)

    case result do
      {:ok, conn} ->
        halt(conn)

      {:error, :closed, conn} ->
        client().close(client)
        halt(conn)

      {:error, error, conn} ->
        Logger.warn(
          "#{__MODULE__} request to #{url} failed while reading/chunking: #{inspect(error)}"
        )

        client().close(client)
        halt(conn)
    end
  end

  defp chunk_reply(conn, client, opts) do
    chunk_reply(conn, client, opts, 0, 0)
  end

  # Reads the upstream body one piece at a time and forwards it, enforcing the
  # `max_read_duration` and `max_body_length` limits on every iteration.
  defp chunk_reply(conn, client, opts, sent_so_far, duration) do
    with {:ok, duration} <-
           check_read_duration(
             duration,
             Keyword.get(opts, :max_read_duration, @max_read_duration)
           ),
         {:ok, data, client} <- client().stream_body(client),
         {:ok, duration} <- increase_read_duration(duration),
         sent_so_far = sent_so_far + byte_size(data),
         :ok <-
           body_size_constraint(
             sent_so_far,
             Keyword.get(opts, :max_body_length, @max_body_length)
           ),
         {:ok, conn} <- chunk(conn, data) do
      chunk_reply(conn, client, opts, sent_so_far, duration)
    else
      :done -> {:ok, conn}
      {:error, error} -> {:error, error, conn}
    end
  end

  defp head_response(conn, url, code, headers, opts) do
    Logger.debug("#{__MODULE__} #{code} #{url} #{inspect(headers)}")

    conn
    |> put_resp_headers(build_resp_headers(headers, opts))
    |> send_resp(code, "")
  end

  # Either redirects the client to `url` (when `redirect_on_failure` is set) or replies with
  # `code` and `body`; both paths halt `conn`.
  defp error_or_redirect(conn, url, code, body, opts) do
    if Keyword.get(opts, :redirect_on_failure, false) do
      conn
      |> Phoenix.Controller.redirect(external: url)
      |> halt()
    else
      conn
      |> send_resp(code, body)
      |> halt()
    end
  end

  defp downcase_headers(headers) do
    Enum.map(headers, fn {k, v} ->
      {String.downcase(k), v}
    end)
  end

  # Returns the bare media type (parameters such as `charset` stripped, trimmed, lowercased),
  # defaulting to "application/octet-stream" when the header is absent.
  defp get_content_type(headers) do
    {_, content_type} =
      List.keyfind(headers, "content-type", 0, {"content-type", "application/octet-stream"})

    content_type
    |> String.split(";", parts: 2)
    |> hd()
    |> String.trim()
    |> String.downcase()
  end

  defp put_resp_headers(conn, headers) do
    Enum.reduce(headers, conn, fn {k, v}, conn ->
      put_resp_header(conn, k, v)
    end)
  end

  # Appends `extra` to `headers`, dropping every entry of `headers` whose name also appears in
  # `extra`. `Keyword.merge/2` is not used because header names are strings, which recent
  # Elixir versions reject with `ArgumentError`.
  defp merge_headers(headers, []), do: headers

  defp merge_headers(headers, extra) do
    Enum.reject(headers, fn {name, _} -> List.keymember?(extra, name, 0) end) ++ extra
  end

  defp build_req_headers(headers, opts) do
    headers
    |> downcase_headers()
    |> Enum.filter(fn {k, _} -> k in @keep_req_headers end)
    |> build_req_range_or_encoding_header(opts)
    |> build_req_user_agent_header(opts)
    |> merge_headers(Keyword.get(opts, :req_headers, []))
  end

  # Disable content-encoding if any @range_headers are requested (see #1823).
  defp build_req_range_or_encoding_header(headers, _opts) do
    range? = Enum.any?(headers, fn {header, _} -> Enum.member?(@range_headers, header) end)

    if range? && List.keymember?(headers, "accept-encoding", 0) do
      List.keydelete(headers, "accept-encoding", 0)
    else
      headers
    end
  end

  # With `keep_user_agent: true` the user-agent is overwritten with Pleroma's own; otherwise
  # the client's user-agent (kept by @keep_req_headers) passes through untouched.
  # NOTE(review): the option name suggests the opposite semantics — confirm intent with callers.
  defp build_req_user_agent_header(headers, opts) do
    if Keyword.get(opts, :keep_user_agent, false) do
      List.keystore(
        headers,
        "user-agent",
        0,
        {"user-agent", Pleroma.Application.user_agent()}
      )
    else
      headers
    end
  end

  defp build_resp_headers(headers, opts) do
    headers
    |> Enum.filter(fn {k, _} -> k in @keep_resp_headers end)
    |> build_resp_cache_headers(opts)
    |> build_resp_content_disposition_header(opts)
    |> merge_headers(Keyword.get(opts, :resp_headers, []))
  end

  # Always sets our own cache-control: the upstream one is not in @keep_resp_headers, and
  # without it Plug defaults to "max-age=0, private, must-revalidate".
  defp build_resp_cache_headers(headers, _opts) do
    List.keystore(headers, "cache-control", 0, {"cache-control", @default_cache_control_header})
  end

  # Forces `content-disposition: attachment` for content types not allowed inline, keeping the
  # upstream filename when present.
  defp build_resp_content_disposition_header(headers, opts) do
    inline = Keyword.get(opts, :inline_content_types, @inline_content_types)

    if attachment?(inline, get_content_type(headers)) do
      name =
        upstream_filename(headers) ||
          escape_quoted(Keyword.get(opts, :attachment_name, "attachment"))

      disposition = "attachment; filename=\"#{name}\""

      List.keystore(headers, "content-disposition", 0, {"content-disposition", disposition})
    else
      headers
    end
  end

  defp attachment?(false, _content_type), do: true
  defp attachment?(inline, content_type) when is_list(inline), do: content_type not in inline
  defp attachment?(_inline, _content_type), do: false

  # Returns the quoted `filename` parameter of the upstream content-disposition exactly as
  # sent (still backslash-escaped), or nil when there is none.
  defp upstream_filename(headers) do
    with {_, disposition} <- List.keyfind(headers, "content-disposition", 0),
         [name | _] <-
           Regex.run(~r/filename="((?:[^"\\]|\\.)*)"/u, disposition, capture: :all_but_first) do
      name
    else
      _ -> nil
    end
  end

  # Escapes backslashes and double quotes so `name` is safe inside a quoted header parameter.
  defp escape_quoted(name) do
    name
    |> String.replace("\\", "\\\\")
    |> String.replace("\"", "\\\"")
  end

  defp header_length_constraint(headers, limit) when is_integer(limit) and limit > 0 do
    with {_, size} <- List.keyfind(headers, "content-length", 0),
         {size, _} <- Integer.parse(size),
         true <- size <= limit do
      :ok
    else
      false ->
        {:error, :body_too_large}

      # Missing or unparsable content-length: rely on the streaming check instead.
      _ ->
        :ok
    end
  end

  defp header_length_constraint(_, _), do: :ok

  # Same bound as header_length_constraint/2: exactly `limit` bytes is still allowed.
  defp body_size_constraint(size, limit) when is_integer(limit) and limit > 0 and size > limit do
    {:error, :body_too_large}
  end

  defp body_size_constraint(_, _), do: :ok

  # NOTE(review): unreachable from chunk_reply/5 (which starts at 0); kept for compatibility.
  defp check_read_duration(nil = _duration, max), do: check_read_duration(@max_read_duration, max)

  # Returns the elapsed total plus a start timestamp so increase_read_duration/1 can add
  # the time spent on the next read.
  defp check_read_duration(duration, max)
       when is_integer(duration) and is_integer(max) and max > 0 do
    if duration > max do
      {:error, :read_duration_exceeded}
    else
      {:ok, {duration, :erlang.system_time(:millisecond)}}
    end
  end

  # No positive integer limit: must be a 2-tuple so chunk_reply/5's `with` keeps matching.
  defp check_read_duration(_, _), do: {:ok, :no_duration_limit}

  defp increase_read_duration({previous_duration, started})
       when is_integer(previous_duration) and is_integer(started) do
    duration = :erlang.system_time(:millisecond) - started
    {:ok, previous_duration + duration}
  end

  defp increase_read_duration(_), do: {:ok, :no_duration_limit}

  defp client, do: Pleroma.ReverseProxy.Client

  # Marks `url` as failed. Errors treated as permanent get a nil ttl (presumably no expiry
  # in the cache provider — confirm); others use `failed_request_ttl`.
  defp track_failed_url(url, error, opts) do
    ttl =
      if error in [:body_too_large, 400, 204] do
        nil
      else
        Keyword.get(opts, :failed_request_ttl, @failed_request_ttl)
      end

    @cachex.put(:failed_proxy_url_cache, url, true, ttl: ttl)
  end
end