Merge remote-tracking branch 'upstream/develop' into registration-workflow
[akkoma] / lib / pleroma / reverse_proxy.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.ReverseProxy do
# Request headers that indicate a range (partial content) request.
@range_headers ~w(range if-range)
# Client request headers that are forwarded to the upstream; all others are dropped.
@keep_req_headers ~w(accept user-agent accept-encoding cache-control if-modified-since) ++
                    ~w(if-unmodified-since if-none-match) ++ @range_headers
# Upstream response headers that carry cache validation information.
@resp_cache_headers ~w(etag date last-modified)
# Upstream response headers that are passed back to the client; all others are dropped.
@keep_resp_headers @resp_cache_headers ++
                     ~w(content-length content-type content-disposition content-encoding) ++
                     ~w(content-range accept-ranges vary)
# `cache-control` value set on proxied responses (1209600 seconds = 14 days).
@default_cache_control_header "public, max-age=1209600"
# Upstream status codes that are proxied; any other status is reported as a failed request.
@valid_resp_codes [200, 206, 304]
# Defaults for the `max_read_duration`, `max_body_length` and `failed_request_ttl` options.
@max_read_duration :timer.seconds(30)
@max_body_length :infinity
@failed_request_ttl :timer.seconds(60)
# HTTP methods accepted by `call/3`; requests with any other method are answered with a 400.
@methods ~w(GET HEAD)
19
@doc "Returns the default value of the `max_read_duration` option (`@max_read_duration`)."
def max_read_duration_default do
  @max_read_duration
end
@doc "Returns the `cache-control` header value that is set on proxied responses."
def default_cache_control_header do
  @default_cache_control_header
end
22
@moduledoc """
A reverse proxy.

    Pleroma.ReverseProxy.call(conn, url, options)

It is not meant to be added into a plug pipeline, but to be called from another plug or controller.

Supports `#{inspect(@methods)}` HTTP methods, and only allows `#{inspect(@valid_resp_codes)}` status codes.

Responses are chunked to the client while downloading from the upstream.

Some request / response headers are preserved:

* request: `#{inspect(@keep_req_headers)}`
* response: `#{inspect(@keep_resp_headers)}`

Options:

* `redirect_on_failure` (default `false`). Redirects the client to the real remote URL if there are any HTTP
errors. Errors that occur during body processing are not redirected, because the response is already being
chunked. This may expose the remote URL, client IPs, ….

* `max_body_length` (default `#{inspect(@max_body_length)}`): limits the content length to be approximately the
specified length. It is validated with the `content-length` header and also verified when proxying.

* `max_read_duration` (default `#{inspect(@max_read_duration)}` ms): the total time the connection is allowed to
read from the remote upstream.

* `failed_request_ttl` (default `#{inspect(@failed_request_ttl)}` ms): the time the failed request is cached and cannot be retried.

* `inline_content_types`:
    * `true` will not alter `content-disposition` (up to the upstream),
    * `false` will add `content-disposition: attachment` to any response,
    * a list of whitelisted content types

* `keep_user_agent` will forward the client's user-agent to the upstream. This may be useful if the upstream is
doing content transformation (encoding, …) depending on the request.
NOTE(review): the current implementation appears to do the opposite (when the option is truthy the `user-agent`
is replaced with `Pleroma.Application.user_agent()`, otherwise the client's value is passed through) — confirm
the intended behaviour.

* `req_headers`, `resp_headers` additional headers.

* `http`: options for [hackney](https://github.com/benoitc/hackney) or [gun](https://github.com/ninenines/gun).

"""
# Default HTTP client options; `call/3` merges the `http` option on top of these.
@default_options [pool: :media]

# Default for the `inline_content_types` option: responses whose content type is not in
# this list get a `content-disposition: attachment` header.
@inline_content_types [
  "image/gif",
  "image/jpeg",
  "image/jpg",
  "image/png",
  "image/svg+xml",
  "audio/mpeg",
  "audio/mp3",
  "video/webm",
  "video/mp4",
  "video/quicktime"
]
80
81 require Logger
82 import Plug.Conn
83
# Options accepted by `call/3`.
# The `:http` value is a keyword list of client options; the previous `[]` type only
# described the empty list.
@type option() ::
        {:keep_user_agent, boolean}
        | {:max_read_duration, :timer.time() | :infinity}
        | {:max_body_length, non_neg_integer() | :infinity}
        | {:failed_request_ttl, :timer.time() | :infinity}
        | {:http, keyword()}
        | {:req_headers, [{String.t(), String.t()}]}
        | {:resp_headers, [{String.t(), String.t()}]}
        | {:inline_content_types, boolean() | [String.t()]}
        | {:redirect_on_failure, boolean()}
94
@doc """
Proxies the request in `conn` to `url` and streams the upstream response back to the client.

Only the methods in `@methods` are proxied; any other method is answered with a 400.
URLs present in the `:failed_proxy_url_cache` are not requested again and are answered
with an error (or a redirect when `redirect_on_failure` is set).

The returned connection is always halted.
"""
@spec call(Plug.Conn.t(), url :: String.t(), [option()]) :: Plug.Conn.t()
def call(_conn, _url, _opts \\ [])

def call(conn = %{method: method}, url, opts) when method in @methods do
  client_opts = Keyword.merge(@default_options, Keyword.get(opts, :http, []))

  req_headers = build_req_headers(conn.req_headers, opts)

  max_body_length = Keyword.get(opts, :max_body_length, @max_body_length)

  opts =
    if filename = Pleroma.Web.MediaProxy.filename(url) do
      Keyword.put_new(opts, :attachment_name, filename)
    else
      opts
    end

  with {:ok, nil} <- Cachex.get(:failed_proxy_url_cache, url),
       {:ok, code, headers, client} <- request(method, url, req_headers, client_opts),
       :ok <- check_header_length(client, headers, max_body_length) do
    response(conn, client, url, code, headers, opts)
  else
    # The URL is in the failed-request cache: do not hit the upstream again.
    {:ok, true} ->
      conn
      |> error_or_redirect(url, 500, "Request failed", opts)
      |> halt()

    # The client returned no body reference (no client handle), so only headers are sent.
    {:ok, code, headers} ->
      head_response(conn, url, code, headers, opts)
      |> halt()

    {:error, {:invalid_http_response, code}} ->
      Logger.error("#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{code}")
      track_failed_url(url, code, opts)

      conn
      |> error_or_redirect(
        url,
        code,
        "Request failed: " <> Plug.Conn.Status.reason_phrase(code),
        opts
      )
      |> halt()

    {:error, error} ->
      Logger.error("#{__MODULE__}: request to #{inspect(url)} failed: #{inspect(error)}")
      track_failed_url(url, error, opts)

      conn
      |> error_or_redirect(url, 500, "Request failed", opts)
      |> halt()
  end
end

def call(conn, _, _) do
  conn
  |> send_resp(400, Plug.Conn.Status.reason_phrase(400))
  |> halt()
end

# Validates the upstream `content-length` against `limit`. When the response is rejected
# the body will never be streamed, so the upstream client is closed here instead of
# being left open.
defp check_header_length(client, headers, limit) do
  case header_length_constraint(headers, limit) do
    :ok ->
      :ok

    {:error, _reason} = error ->
      client().close(client)
      error
  end
end
156
# Performs the upstream request through `client()` and normalises the result.
#
# Responses whose status is in @valid_resp_codes are returned as `{:ok, code, headers, client}`
# or `{:ok, code, headers}` with downcased header names; any other status becomes
# `{:error, {:invalid_http_response, code}}`; client errors are passed through.
defp request(method, url, headers, opts) do
  Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}")

  verb =
    method
    |> String.downcase()
    |> String.to_existing_atom()

  case client().request(verb, url, headers, "", opts) do
    {:ok, status, resp_headers, ref} when status in @valid_resp_codes ->
      {:ok, status, downcase_headers(resp_headers), ref}

    {:ok, status, resp_headers} when status in @valid_resp_codes ->
      {:ok, status, downcase_headers(resp_headers)}

    {:ok, status, _resp_headers, _ref} ->
      {:error, {:invalid_http_response, status}}

    {:ok, status, _resp_headers} ->
      {:error, {:invalid_http_response, status}}

    {:error, reason} ->
      {:error, reason}
  end
end
178
# Sends the filtered upstream headers, then streams the upstream body to the client in
# chunks. The connection is halted in every case; on a streaming error the upstream
# client is closed, and every error other than `:closed` is logged as a warning.
defp response(conn, client, url, status, headers, opts) do
  Logger.debug("#{__MODULE__} #{status} #{url} #{inspect(headers)}")

  resp_headers = build_resp_headers(headers, opts)

  streamed =
    conn
    |> put_resp_headers(resp_headers)
    |> send_chunked(status)
    |> chunk_reply(client, opts)

  case streamed do
    {:ok, conn} ->
      halt(conn)

    {:error, reason, conn} ->
      if reason != :closed do
        Logger.warn(
          "#{__MODULE__} request to #{url} failed while reading/chunking: #{inspect(reason)}"
        )
      end

      client().close(client)
      halt(conn)
  end
end
205
# Starts streaming with zero bytes sent and zero elapsed read time.
defp chunk_reply(conn, client, opts), do: chunk_reply(conn, client, opts, 0, 0)

# Reads one piece of the upstream body, checks the read-duration and body-size limits,
# sends the piece to the client and recurses. Returns `{:ok, conn}` once the client
# reports `:done`, or `{:error, error, conn}` as soon as any step yields `{:error, error}`.
defp chunk_reply(conn, client, opts, sent_so_far, duration) do
  read_limit = Keyword.get(opts, :max_read_duration, @max_read_duration)
  size_limit = Keyword.get(opts, :max_body_length, @max_body_length)

  with {:ok, timer} <- check_read_duration(duration, read_limit),
       {:ok, data, client} <- client().stream_body(client),
       {:ok, elapsed} <- increase_read_duration(timer),
       total_sent = sent_so_far + byte_size(data),
       :ok <- body_size_constraint(total_sent, size_limit),
       {:ok, conn} <- chunk(conn, data) do
    chunk_reply(conn, client, opts, total_sent, elapsed)
  else
    :done -> {:ok, conn}
    {:error, error} -> {:error, error, conn}
  end
end
231
# Replies with the filtered upstream headers and an empty body.
defp head_response(conn, url, code, headers, opts) do
  Logger.debug("#{__MODULE__} #{code} #{url} #{inspect(headers)}")

  resp_headers = build_resp_headers(headers, opts)

  conn
  |> put_resp_headers(resp_headers)
  |> send_resp(code, "")
end
239
# Answers a failed proxy request: redirects the client to `url` when the
# `redirect_on_failure` option is truthy, otherwise sends `body` with status `code`.
# The connection is halted in both cases.
defp error_or_redirect(conn, url, code, body, opts) do
  redirect? = Keyword.get(opts, :redirect_on_failure, false)

  reply =
    if redirect? do
      Phoenix.Controller.redirect(conn, external: url)
    else
      send_resp(conn, code, body)
    end

  halt(reply)
end
251
# Lowercases the name of every `{name, value}` header; values are left untouched.
defp downcase_headers(headers) do
  Enum.map(headers, fn {name, value} -> {String.downcase(name), value} end)
end
257
# Returns the media type from the `content-type` header, i.e. everything before the
# first ";". Falls back to "application/octet-stream" when the header is absent.
defp get_content_type(headers) do
  fallback = {"content-type", "application/octet-stream"}
  {_name, value} = List.keyfind(headers, "content-type", 0, fallback)

  value
  |> String.split(";")
  |> hd()
end
265
# Sets every `{name, value}` pair of `headers` as a response header on `conn`.
defp put_resp_headers(conn, headers) do
  Enum.reduce(headers, conn, fn {name, value}, acc ->
    put_resp_header(acc, name, value)
  end)
end
271
# Builds the headers sent to the upstream: the client's headers are downcased and
# reduced to @keep_req_headers, adjusted for range requests and the user-agent option,
# and finally overridden/extended by the `req_headers` option.
defp build_req_headers(headers, opts) do
  extra = Keyword.get(opts, :req_headers, [])

  headers
  |> downcase_headers()
  |> Enum.filter(fn {k, _} -> k in @keep_req_headers end)
  |> build_req_range_or_encoding_header(opts)
  |> build_req_user_agent_header(opts)
  # `req_headers` is a list of `{String.t(), String.t()}` tuples, which `Keyword.merge/2`
  # rejects with an ArgumentError (it requires atom keys). Merge by header name instead:
  # drop the headers that are overridden, then append the extra ones.
  |> Enum.reject(fn {k, _} -> List.keymember?(extra, k, 0) end)
  |> Kernel.++(extra)
end
280
# Disable content-encoding if any @range_headers are requested (see #1823).
# The `accept-encoding` header is dropped from range requests; all other requests are
# returned unchanged.
defp build_req_range_or_encoding_header(headers, _opts) do
  range_request? = Enum.any?(headers, fn {name, _value} -> name in @range_headers end)

  if range_request? do
    List.keydelete(headers, "accept-encoding", 0)
  else
    headers
  end
end
291
# When the `keep_user_agent` option is truthy, sets (or replaces) the `user-agent`
# header with `Pleroma.Application.user_agent()`; otherwise returns `headers` unchanged.
# NOTE(review): the module documentation describes `keep_user_agent` as forwarding the
# client's user-agent, which looks like the opposite of what happens here — confirm intent.
defp build_req_user_agent_header(headers, opts) do
  if Keyword.get(opts, :keep_user_agent, false) do
    own_agent = {"user-agent", Pleroma.Application.user_agent()}
    List.keystore(headers, "user-agent", 0, own_agent)
  else
    headers
  end
end
304
# Builds the headers sent to the client: the (already downcased) upstream headers are
# reduced to @keep_resp_headers, get cache-control and content-disposition applied, and
# are finally overridden/extended by the `resp_headers` option.
defp build_resp_headers(headers, opts) do
  extra = Keyword.get(opts, :resp_headers, [])

  headers
  |> Enum.filter(fn {k, _} -> k in @keep_resp_headers end)
  |> build_resp_cache_headers(opts)
  |> build_resp_content_disposition_header(opts)
  # `resp_headers` is a list of `{String.t(), String.t()}` tuples, which `Keyword.merge/2`
  # rejects with an ArgumentError (it requires atom keys). Merge by header name instead:
  # drop the headers that are overridden, then append the extra ones.
  |> Enum.reject(fn {k, _} -> List.keymember?(extra, k, 0) end)
  |> Kernel.++(extra)
end
312
# Sets `cache-control` to @default_cache_control_header on every proxied response.
#
# We always need to set our own value, as Plug defaults to
# "max-age=0, private, must-revalidate". The previous implementation branched on whether
# a caching header (@resp_cache_headers) was present, but both branches stored the same
# value, so the check was dead code.
defp build_resp_cache_headers(headers, _opts) do
  List.keystore(
    headers,
    "cache-control",
    0,
    {"cache-control", @default_cache_control_header}
  )
end
336
# Forces `content-disposition: attachment` when the response's content type must not be
# displayed inline, as decided by the `inline_content_types` option:
#
#   * a list: attachment unless the content type is a member of the list,
#   * `false`: always attachment,
#   * anything else (e.g. `true`): headers are returned unchanged.
#
# The attachment file name is taken from the upstream `content-disposition` header when it
# carries a quoted `filename="..."`, otherwise from the `attachment_name` option, and
# finally defaults to "attachment".
defp build_resp_content_disposition_header(headers, opts) do
  allowed = Keyword.get(opts, :inline_content_types, @inline_content_types)

  content_type = get_content_type(headers)

  attachment? =
    cond do
      is_list(allowed) -> not Enum.member?(allowed, content_type)
      true -> allowed == false
    end

  if attachment? do
    # Pattern-match with `with` instead of raising and rescuing MatchError for control flow.
    name =
      with {_, upstream_disposition} <- List.keyfind(headers, "content-disposition", 0),
           [name | _] <-
             Regex.run(
               ~r/filename="((?:[^"\\]|\\.)*)"/u,
               upstream_disposition || "",
               capture: :all_but_first
             ) do
        name
      else
        _ -> Keyword.get(opts, :attachment_name, "attachment")
      end

    disposition = "attachment; filename=\"#{name}\""

    List.keystore(headers, "content-disposition", 0, {"content-disposition", disposition})
  else
    headers
  end
end
374
# Checks the upstream `content-length` header against `limit`.
#
# Returns `{:error, :body_too_large}` only when the header is present, starts with an
# integer and that integer is greater than `limit`; returns `:ok` otherwise, including
# when `limit` is not a positive integer (e.g. `:infinity`).
defp header_length_constraint(headers, limit) when is_integer(limit) and limit > 0 do
  case List.keyfind(headers, "content-length", 0) do
    {_name, raw_length} ->
      case Integer.parse(raw_length) do
        {length, _rest} when length > limit -> {:error, :body_too_large}
        _ -> :ok
      end

    _ ->
      :ok
  end
end

defp header_length_constraint(_headers, _limit), do: :ok
390
# Checks the number of body bytes received so far against `limit`.
#
# Returns `{:error, :body_too_large}` once `size` exceeds a positive integer `limit`, and
# `:ok` otherwise (including when `limit` is `:infinity`). The comparison is strict so that
# it agrees with `header_length_constraint/2`, which accepts a `content-length` equal to
# the limit; with `>=` such a body passed the header check but was cut off while streaming.
defp body_size_constraint(size, limit) when is_integer(limit) and limit > 0 and size > limit do
  {:error, :body_too_large}
end

defp body_size_constraint(_, _), do: :ok
396
# Read-duration bookkeeping used while streaming the upstream body.
#
# `check_read_duration/2` is called before each read with the time spent reading so far:
#
#   * `{:error, :read_duration_exceeded}` when that time is greater than `max`,
#   * `{:ok, {duration, started}}` otherwise, where `started` is the current system time
#     in milliseconds,
#   * `{:ok, :no_duration_limit}` when no limit applies (`max` is not a positive integer,
#     or the duration is already the `:no_duration_limit` marker).
#
# `increase_read_duration/1` is called after the read with that value and returns
# `{:ok, new_duration}`, which is fed into the next `check_read_duration/2` call.
#
# Both "no limit" clauses used to return the 3-tuple
# `{:ok, :no_duration_limit, :no_duration_limit}`. Callers match on `{:ok, duration}`,
# so disabling the limit (e.g. `max_read_duration: :infinity`) raised a WithClauseError
# instead of streaming without a limit.
defp check_read_duration(nil = _duration, max), do: check_read_duration(@max_read_duration, max)

defp check_read_duration(duration, max)
     when is_integer(duration) and is_integer(max) and max > 0 do
  if duration > max do
    {:error, :read_duration_exceeded}
  else
    {:ok, {duration, :erlang.system_time(:millisecond)}}
  end
end

defp check_read_duration(_, _), do: {:ok, :no_duration_limit}

defp increase_read_duration({previous_duration, started})
     when is_integer(previous_duration) and is_integer(started) do
  duration = :erlang.system_time(:millisecond) - started
  {:ok, previous_duration + duration}
end

defp increase_read_duration(_) do
  {:ok, :no_duration_limit}
end
419
# Module through which all upstream requests, body streaming and closing are performed.
defp client do
  Pleroma.ReverseProxy.Client
end
421
# Records `url` in the `:failed_proxy_url_cache` so that `call/3` does not request it again.
#
# The entry is stored with the `failed_request_ttl` option (default @failed_request_ttl),
# except for the errors `:body_too_large`, `400` and `204`, which are stored with
# `ttl: nil` (presumably no expiry — confirm against the Cachex configuration).
defp track_failed_url(url, error, opts) do
  ttl =
    if error in [:body_too_large, 400, 204] do
      nil
    else
      Keyword.get(opts, :failed_request_ttl, @failed_request_ttl)
    end

  Cachex.put(:failed_proxy_url_cache, url, true, ttl: ttl)
end
432 end