Merge remote-tracking branch 'remotes/origin/develop' into 2168-media-preview-proxy
[akkoma] / lib / pleroma / reverse_proxy / reverse_proxy.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.ReverseProxy do
# Request headers that ask for a partial body; their presence disables `accept-encoding`
# forwarding (see build_req_range_or_encoding_header/2).
@range_headers ["range", "if-range"]

# Client request headers that are forwarded to the upstream; every other header is dropped.
@keep_req_headers [
                    "accept",
                    "user-agent",
                    "accept-encoding",
                    "cache-control",
                    "if-modified-since",
                    "if-unmodified-since",
                    "if-none-match"
                  ] ++ @range_headers

# Upstream response headers that carry caching information.
@resp_cache_headers ["etag", "date", "last-modified"]

# Upstream response headers that are passed back to the client; every other header is dropped.
@keep_resp_headers @resp_cache_headers ++
                     [
                       "content-length",
                       "content-type",
                       "content-disposition",
                       "content-encoding",
                       "content-range",
                       "accept-ranges",
                       "vary"
                     ]

# `cache-control` value written on proxied responses (1209600 s = 14 days).
@default_cache_control_header "public, max-age=1209600"

# Upstream status codes that are proxied; anything else is treated as a failed request.
@valid_resp_codes [200, 206, 304]

# Defaults for the `max_read_duration`, `max_body_length` and `failed_request_ttl` options.
@max_read_duration :timer.seconds(30)
@max_body_length :infinity
@failed_request_ttl :timer.seconds(60)

# HTTP methods accepted by call/3.
@methods ["GET", "HEAD"]
19
# Exposes the built-in default of the `max_read_duration` option (milliseconds).
def max_read_duration_default do
  @max_read_duration
end
21
22 @moduledoc """
23 A reverse proxy.
24
25 Pleroma.ReverseProxy.call(conn, url, options)
26
27 It is not meant to be added into a plug pipeline, but to be called from another plug or controller.
28
29 Supports `#{inspect(@methods)}` HTTP methods, and only allows `#{inspect(@valid_resp_codes)}` status codes.
30
31 Responses are chunked to the client while downloading from the upstream.
32
33 Some request / responses headers are preserved:
34
35 * request: `#{inspect(@keep_req_headers)}`
36 * response: `#{inspect(@keep_resp_headers)}`
37
38 Options:
39
* `redirect_on_failure` (default `false`): redirects the client to the real remote URL if there are any HTTP
  errors. Errors that occur during body processing are not redirected, because the response is already being
  chunked. This may expose the remote URL, client IPs, ….
43
44 * `max_body_length` (default `#{inspect(@max_body_length)}`): limits the content length to be approximately the
45 specified length. It is validated with the `content-length` header and also verified when proxying.
46
47 * `max_read_duration` (default `#{inspect(@max_read_duration)}` ms): the total time the connection is allowed to
48 read from the remote upstream.
49
50 * `failed_request_ttl` (default `#{inspect(@failed_request_ttl)}` ms): the time the failed request is cached and cannot be retried.
51
52 * `inline_content_types`:
53 * `true` will not alter `content-disposition` (up to the upstream),
54 * `false` will add `content-disposition: attachment` to any request,
55 * a list of whitelisted content types
56
57 * `keep_user_agent` will forward the client's user-agent to the upstream. This may be useful if the upstream is
58 doing content transformation (encoding, …) depending on the request.
59
60 * `req_headers`, `resp_headers` additional headers.
61
62 * `http`: options for [hackney](https://github.com/benoitc/hackney) or [gun](https://github.com/ninenines/gun).
63
64 """
# HTTP client options used for every upstream request; entries of the `:http` option override them.
@default_options [{:pool, :media}]

# Default for the `inline_content_types` option: responses with any other content type
# get a `content-disposition: attachment` header.
@inline_content_types ~w(
  image/gif
  image/jpeg
  image/jpg
  image/png
  image/svg+xml
  audio/mpeg
  audio/mp3
  video/webm
  video/mp4
  video/quicktime
)
79
80 require Logger
81 import Plug.Conn
82
# A single entry of the options list accepted by call/3.
# `:http` is a keyword list of HTTP client options (it is merged over @default_options),
# so it is typed as `keyword()` rather than as the empty list `[]`.
@type option() ::
        {:keep_user_agent, boolean()}
        | {:max_read_duration, :timer.time() | :infinity}
        | {:max_body_length, non_neg_integer() | :infinity}
        | {:failed_request_ttl, :timer.time() | :infinity}
        | {:http, keyword()}
        | {:req_headers, [{String.t(), String.t()}]}
        | {:resp_headers, [{String.t(), String.t()}]}
        | {:inline_content_types, boolean() | [String.t()]}
        | {:redirect_on_failure, boolean()}
93
@spec call(Plug.Conn.t(), url :: String.t(), [option()]) :: Plug.Conn.t()
def call(_conn, _url, _opts \\ [])

# Proxies a GET/HEAD request for `url` and always returns a halted connection.
#
# Steps: skip URLs recorded in :failed_proxy_url_cache, perform the upstream request,
# validate the advertised content-length, then stream the body to the client.
# Any failure is answered through error_or_redirect/5.
def call(%{method: method} = conn, url, opts) when method in @methods do
  http_opts = Keyword.get(opts, :http, [])
  client_opts = Keyword.merge(@default_options, http_opts)

  req_headers = build_req_headers(conn.req_headers, opts)

  # A filename derived from the URL is kept as the fallback attachment name,
  # unless the caller already supplied :attachment_name.
  opts =
    case Pleroma.Web.MediaProxy.filename(url) do
      missing when missing in [nil, false] -> opts
      filename -> Keyword.put_new(opts, :attachment_name, filename)
    end

  max_body_length = Keyword.get(opts, :max_body_length, @max_body_length)

  with {:ok, nil} <- Cachex.get(:failed_proxy_url_cache, url),
       {:ok, code, headers, client} <- request(method, url, req_headers, client_opts),
       :ok <- header_length_constraint(headers, max_body_length) do
    response(conn, client, url, code, headers, opts)
  else
    # The URL is currently recorded as failed: do not contact the upstream again.
    {:ok, true} ->
      halt(error_or_redirect(conn, url, 500, "Request failed", opts))

    # The client returned no body reference, so only status and headers are sent.
    {:ok, code, headers} ->
      halt(head_response(conn, url, code, headers, opts))

    # The upstream answered with a status outside @valid_resp_codes.
    {:error, {:invalid_http_response, code}} ->
      Logger.error("#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{code}")
      track_failed_url(url, code, opts)

      reason = "Request failed: " <> Plug.Conn.Status.reason_phrase(code)
      halt(error_or_redirect(conn, url, code, reason, opts))

    {:error, error} ->
      Logger.error("#{__MODULE__}: request to #{inspect(url)} failed: #{inspect(error)}")
      track_failed_url(url, error, opts)

      halt(error_or_redirect(conn, url, 500, "Request failed", opts))
  end
end
149
# Fallback clause: any request not handled by the clause above is answered with a
# 400 response and the connection is halted.
def call(conn, _url, _opts) do
  reason = Plug.Conn.Status.reason_phrase(400)
  halt(send_resp(conn, 400, reason))
end
155
# Performs the upstream request and normalizes the client's reply.
#
# Returns `{:ok, status, headers, client}` or `{:ok, status, headers}` (header names
# downcased) when the status is one of @valid_resp_codes,
# `{:error, {:invalid_http_response, status}}` for any other status, and passes
# `{:error, reason}` through untouched.
defp request(method, url, headers, opts) do
  Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}")

  verb =
    method
    |> String.downcase()
    |> String.to_existing_atom()

  case client().request(verb, url, headers, "", opts) do
    {:ok, status, resp_headers, ref} when status in @valid_resp_codes ->
      {:ok, status, downcase_headers(resp_headers), ref}

    {:ok, status, resp_headers} when status in @valid_resp_codes ->
      {:ok, status, downcase_headers(resp_headers)}

    {:ok, status, _resp_headers, _ref} ->
      {:error, {:invalid_http_response, status}}

    {:ok, status, _resp_headers} ->
      {:error, {:invalid_http_response, status}}

    {:error, _reason} = failure ->
      failure
  end
end
177
# Sends the filtered upstream headers, then streams the upstream body to the client
# as a chunked response. Always returns a halted connection.
#
# If streaming stops with an error the upstream client is closed; every error other
# than `:closed` is also logged as a warning.
defp response(conn, client, url, status, headers, opts) do
  Logger.debug("#{__MODULE__} #{status} #{url} #{inspect(headers)}")

  resp_headers = build_resp_headers(headers, opts)

  chunked_conn =
    conn
    |> put_resp_headers(resp_headers)
    |> send_chunked(status)

  case chunk_reply(chunked_conn, client, opts) do
    {:ok, conn} ->
      halt(conn)

    {:error, error, conn} ->
      if error != :closed do
        Logger.warn(
          "#{__MODULE__} request to #{url} failed while reading/chunking: #{inspect(error)}"
        )
      end

      client().close(client)
      halt(conn)
  end
end
204
# Streams the upstream body to the client, starting with zero bytes sent and zero
# milliseconds spent reading.
defp chunk_reply(conn, client, opts), do: chunk_reply(conn, client, opts, 0, 0)

# One iteration of the streaming loop: check the read-time budget, read the next
# piece of the body, update the time and byte counters, enforce the body-size limit,
# and forward the piece to the client.
#
# Returns `{:ok, conn}` once the client reports `:done`, or `{:error, reason, conn}`.
defp chunk_reply(conn, client, opts, sent_so_far, duration) do
  read_limit = Keyword.get(opts, :max_read_duration, @max_read_duration)
  size_limit = Keyword.get(opts, :max_body_length, @max_body_length)

  with {:ok, timer} <- check_read_duration(duration, read_limit),
       {:ok, data, client} <- client().stream_body(client),
       {:ok, elapsed} <- increase_read_duration(timer),
       total = sent_so_far + byte_size(data),
       :ok <- body_size_constraint(total, size_limit),
       {:ok, conn} <- chunk(conn, data) do
    chunk_reply(conn, client, opts, total, elapsed)
  else
    # `conn` here is the connection this iteration was called with.
    :done -> {:ok, conn}
    {:error, error} -> {:error, error, conn}
  end
end
230
# Replies with the upstream status and the filtered upstream headers, and an empty body.
defp head_response(conn, url, code, headers, opts) do
  Logger.debug("#{__MODULE__} #{code} #{url} #{inspect(headers)}")

  resp_headers = build_resp_headers(headers, opts)
  send_resp(put_resp_headers(conn, resp_headers), code, "")
end
238
# Answers a failed proxy attempt and halts the connection.
#
# With a truthy `:redirect_on_failure` option the client is redirected to `url`;
# otherwise a response with status `code` and body `body` is sent.
defp error_or_redirect(conn, url, code, body, opts) do
  replied =
    case Keyword.get(opts, :redirect_on_failure, false) do
      disabled when disabled in [nil, false] -> send_resp(conn, code, body)
      _enabled -> Phoenix.Controller.redirect(conn, external: url)
    end

  halt(replied)
end
250
# Lower-cases the name of every `{name, value}` header; values are left untouched.
defp downcase_headers(headers) do
  lower_name = fn {name, value} -> {String.downcase(name), value} end
  Enum.map(headers, lower_name)
end
256
# Returns the media type of the `content-type` header, i.e. everything before the
# first ";". Falls back to "application/octet-stream" when the header is absent.
defp get_content_type(headers) do
  fallback = {"content-type", "application/octet-stream"}
  {_name, value} = List.keyfind(headers, "content-type", 0, fallback)

  value
  |> String.split(";")
  |> hd()
end
264
# Sets every `{name, value}` pair of `headers` as a response header on `conn`,
# in list order.
defp put_resp_headers(conn, headers) do
  set_header = fn {name, value}, acc -> put_resp_header(acc, name, value) end
  Enum.reduce(headers, conn, set_header)
end
270
# Builds the header list sent to the upstream.
#
# Client headers are downcased and reduced to @keep_req_headers, then adjusted for
# range requests and the user-agent option. Headers given through the `:req_headers`
# option are appended last and replace forwarded headers of the same name.
#
# The lists are merged by header name by hand: headers are `{String.t(), String.t()}`
# tuples, and `Keyword.merge/2` raises `ArgumentError` for non-atom keys as soon as
# both lists are non-empty.
defp build_req_headers(headers, opts) do
  extra = Keyword.get(opts, :req_headers, [])

  forwarded =
    headers
    |> downcase_headers()
    |> Enum.filter(fn {k, _} -> k in @keep_req_headers end)
    |> build_req_range_or_encoding_header(opts)
    |> build_req_user_agent_header(opts)
    |> Enum.reject(fn {k, _} -> List.keymember?(extra, k, 0) end)

  forwarded ++ extra
end
279
# Disable content-encoding if any @range_headers are requested (see #1823).
# The first `accept-encoding` entry is removed in that case; otherwise the headers
# are returned as given.
defp build_req_range_or_encoding_header(headers, _opts) do
  wants_range? = Enum.any?(headers, fn {name, _value} -> name in @range_headers end)

  if wants_range? do
    List.keydelete(headers, "accept-encoding", 0)
  else
    headers
  end
end
290
# With a truthy `:keep_user_agent` option, sets the `user-agent` request header to
# `Pleroma.Application.user_agent()`, replacing any value already in `headers`.
# Otherwise the headers are returned unchanged.
#
# NOTE(review): the module documentation describes `keep_user_agent` as forwarding the
# client's user-agent, which is not what this code does -- confirm which one is intended.
defp build_req_user_agent_header(headers, opts) do
  case Keyword.get(opts, :keep_user_agent, false) do
    disabled when disabled in [nil, false] ->
      headers

    _enabled ->
      agent_header = {"user-agent", Pleroma.Application.user_agent()}
      List.keystore(headers, "user-agent", 0, agent_header)
  end
end
303
# Builds the header list sent back to the client.
#
# Upstream headers (names already downcased) are reduced to @keep_resp_headers, then
# `cache-control` and `content-disposition` are adjusted. Headers given through the
# `:resp_headers` option are appended last and replace headers of the same name.
#
# The lists are merged by header name by hand: headers are `{String.t(), String.t()}`
# tuples, and `Keyword.merge/2` raises `ArgumentError` for non-atom keys as soon as
# both lists are non-empty.
defp build_resp_headers(headers, opts) do
  extra = Keyword.get(opts, :resp_headers, [])

  kept =
    headers
    |> Enum.filter(fn {k, _} -> k in @keep_resp_headers end)
    |> build_resp_cache_headers(opts)
    |> build_resp_content_disposition_header(opts)
    |> Enum.reject(fn {k, _} -> List.keymember?(extra, k, 0) end)

  kept ++ extra
end
311
# Sets `cache-control` to @default_cache_control_header on every proxied response.
#
# We need to set our own value because Plug defaults to
# "max-age=0, private, must-revalidate". The previous implementation branched on
# whether a caching header was present, but both branches did exactly this.
defp build_resp_cache_headers(headers, _opts) do
  List.keystore(
    headers,
    "cache-control",
    0,
    {"cache-control", @default_cache_control_header}
  )
end
335
# Forces `content-disposition: attachment` for responses that must not be shown inline.
#
# The `:inline_content_types` option (default @inline_content_types) decides:
#   * a list    -> attachment unless the response content type is in the list,
#   * `false`   -> always attachment,
#   * otherwise -> headers are returned unchanged.
#
# The attachment filename is taken from the upstream `content-disposition` header when
# it has a quoted `filename="..."` part; otherwise the `:attachment_name` option is
# used, and "attachment" as the last resort.
defp build_resp_content_disposition_header(headers, opts) do
  allowed = Keyword.get(opts, :inline_content_types, @inline_content_types)
  content_type = get_content_type(headers)

  attachment? =
    cond do
      is_list(allowed) -> content_type not in allowed
      allowed == false -> true
      true -> false
    end

  if attachment? do
    # Plain pattern matching in `with` replaces the former try/rescue on MatchError.
    name =
      with {_, upstream_value} <- List.keyfind(headers, "content-disposition", 0),
           [filename | _] <-
             Regex.run(
               ~r/filename="((?:[^"\\]|\\.)*)"/u,
               upstream_value || "",
               capture: :all_but_first
             ) do
        filename
      else
        _ -> Keyword.get(opts, :attachment_name, "attachment")
      end

    disposition = "attachment; filename=\"#{name}\""

    List.keystore(headers, "content-disposition", 0, {"content-disposition", disposition})
  else
    headers
  end
end
373
# Checks the upstream `content-length` header against a positive integer `limit`.
#
# Returns `{:error, :body_too_large}` only when the header parses to an integer
# greater than `limit`. A missing or unparsable header, or a `limit` that is not a
# positive integer, yields `:ok`.
defp header_length_constraint(headers, limit) when is_integer(limit) and limit > 0 do
  case List.keyfind(headers, "content-length", 0) do
    {_name, raw} ->
      case Integer.parse(raw) do
        {size, _rest} when size > limit -> {:error, :body_too_large}
        _ -> :ok
      end

    _ ->
      :ok
  end
end

defp header_length_constraint(_headers, _limit), do: :ok
389
# Checks the number of bytes read so far against a positive integer `limit`.
#
# Returns `{:error, :body_too_large}` once `size` reaches the limit (the bound is
# inclusive: `size == limit` already fails), and `:ok` otherwise or when `limit` is
# not a positive integer.
defp body_size_constraint(size, limit) do
  if is_integer(limit) and limit > 0 and size >= limit do
    {:error, :body_too_large}
  else
    :ok
  end
end
395
# Read-time accounting used by the streaming loop.
#
# check_read_duration/2 is called before each read:
#   * `{:error, :read_duration_exceeded}` when the accumulated `duration` (ms) is
#     greater than `max`,
#   * `{:ok, {duration, started_at}}` otherwise, where `started_at` is the current
#     system time in ms,
#   * `{:ok, :no_duration_limit}` when `duration` is not an integer or `max` is not a
#     positive integer (for example `:infinity`).
#
# increase_read_duration/1 is called after the read with the value from above and
# returns `{:ok, new_duration}`, or `{:ok, :no_duration_limit}` when there is no limit.
#
# Both no-limit clauses return 2-tuples. They used to return
# `{:ok, :no_duration_limit, :no_duration_limit}`, which does not match the
# `{:ok, duration} <- ...` clauses of the caller's `with`.
defp check_read_duration(nil = _duration, max), do: check_read_duration(@max_read_duration, max)

defp check_read_duration(duration, max)
     when is_integer(duration) and is_integer(max) and max > 0 do
  if duration > max do
    {:error, :read_duration_exceeded}
  else
    {:ok, {duration, :erlang.system_time(:millisecond)}}
  end
end

defp check_read_duration(_, _), do: {:ok, :no_duration_limit}

defp increase_read_duration({previous_duration, started})
     when is_integer(previous_duration) and is_integer(started) do
  duration = :erlang.system_time(:millisecond) - started
  {:ok, previous_duration + duration}
end

defp increase_read_duration(_) do
  {:ok, :no_duration_limit}
end
418
# Module through which all upstream HTTP calls (request, stream_body, close) are made.
defp client do
  Pleroma.ReverseProxy.Client
end
420
# Records `url` as failed in :failed_proxy_url_cache so that call/3 does not retry it.
#
# The entry gets the `:failed_request_ttl` option (default @failed_request_ttl) as
# TTL, except for the errors `:body_too_large`, `400` and `204`, which are stored with
# `ttl: nil` (presumably meaning "no expiry" in Cachex -- confirm).
defp track_failed_url(url, error, opts) do
  ttl =
    if error in [:body_too_large, 400, 204] do
      nil
    else
      Keyword.get(opts, :failed_request_ttl, @failed_request_ttl)
    end

  Cachex.put(:failed_proxy_url_cache, url, true, ttl: ttl)
end
431 end