Merge remote-tracking branch 'pleroma/develop' into features/poll-validation
[akkoma] / lib / pleroma / reverse_proxy / reverse_proxy.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.ReverseProxy do
# Request headers that ask the upstream for a partial body.
@range_headers ~w(range if-range)

# Client request headers that are forwarded upstream; anything else is dropped
# by build_req_headers/2.
@keep_req_headers ~w(
                    accept user-agent accept-encoding cache-control
                    if-modified-since if-unmodified-since if-none-match
                  ) ++ @range_headers

# Upstream response headers that carry cache validators.
@resp_cache_headers ~w(etag date last-modified)

# Upstream response headers that are relayed to the client; anything else is
# dropped by build_resp_headers/2.
@keep_resp_headers @resp_cache_headers ++
                     ~w(
                       content-length content-type content-disposition content-encoding
                       content-range accept-ranges vary
                     )

# cache-control value stored on every proxied response.
@default_cache_control_header "public, max-age=1209600"

# Upstream status codes that are relayed; request/4 turns any other code into
# {:error, {:invalid_http_response, code}}.
@valid_resp_codes [200, 206, 304]

# Defaults for the `max_read_duration`, `max_body_length` and
# `failed_request_ttl` options.
@max_read_duration :timer.seconds(30)
@max_body_length :infinity
@failed_request_ttl :timer.seconds(60)

# HTTP methods accepted by call/3; other methods get a 400 response.
@methods ~w(GET HEAD)
19
20 @moduledoc """
21 A reverse proxy.
22
23 Pleroma.ReverseProxy.call(conn, url, options)
24
25 It is not meant to be added into a plug pipeline, but to be called from another plug or controller.
26
27 Supports `#{inspect(@methods)}` HTTP methods, and only allows `#{inspect(@valid_resp_codes)}` status codes.
28
29 Responses are chunked to the client while downloading from the upstream.
30
31 Some request / response headers are preserved:
32
33 * request: `#{inspect(@keep_req_headers)}`
34 * response: `#{inspect(@keep_resp_headers)}`
35
36 Options:
37
38 * `redirect_on_failure` (default `false`): redirects the client to the real remote URL if there are any HTTP
39 errors. Errors during body processing are not redirected, as the response is already being chunked. This may
40 expose the remote URL, client IPs, ….
41
42 * `max_body_length` (default `#{inspect(@max_body_length)}`): limits the content length to be approximately the
43 specified length. It is validated with the `content-length` header and also verified when proxying.
44
45 * `max_read_duration` (default `#{inspect(@max_read_duration)}` ms): the total time the connection is allowed to
46 read from the remote upstream.
47
48 * `failed_request_ttl` (default `#{inspect(@failed_request_ttl)}` ms): the time the failed request is cached and cannot be retried.
49
50 * `inline_content_types`:
51 * `true` will not alter `content-disposition` (up to the upstream),
52 * `false` will add `content-disposition: attachment` to any response,
53 * a list of whitelisted content types
54
55 * `keep_user_agent` will forward the client's user-agent to the upstream. This may be useful if the upstream is
56 doing content transformation (encoding, …) depending on the request.
57
58 * `req_headers`, `resp_headers` additional headers.
59
60 * `http`: options for [hackney](https://github.com/benoitc/hackney) or [gun](https://github.com/ninenines/gun).
61
62 """
# Base HTTP client options; call/3 merges the caller's `:http` options on top.
@default_options [pool: :media]

# Content types that keep the upstream content-disposition when the caller
# does not pass `:inline_content_types`; every other type is forced to
# `attachment` by build_resp_content_disposition_header/2.
@inline_content_types ~w(
                        image/gif image/jpeg image/jpg image/png image/svg+xml
                        audio/mpeg audio/mp3
                        video/webm video/mp4 video/quicktime
                      )
77
78 require Logger
79 import Plug.Conn
80
# A single entry of the option list accepted by call/3.
#
# `:http` is passed through to the HTTP client and merged over the module's
# default client options, so it is typed as a keyword list rather than the
# literal empty list `[]`.
@type option() ::
        {:keep_user_agent, boolean}
        | {:max_read_duration, :timer.time() | :infinity}
        | {:max_body_length, non_neg_integer() | :infinity}
        | {:failed_request_ttl, :timer.time() | :infinity}
        | {:http, keyword()}
        | {:req_headers, [{String.t(), String.t()}]}
        | {:resp_headers, [{String.t(), String.t()}]}
        | {:inline_content_types, boolean() | [String.t()]}
        | {:redirect_on_failure, boolean()}
91
@doc """
Proxies the request in `conn` to `url` and relays the upstream response.

Only the methods listed in `@methods` are proxied; any other method is answered
with a 400 and the connection is halted.

The returned connection is always halted. Depending on the outcome it carries:

  * the chunked upstream body (GET) or the upstream headers only (HEAD),
  * a redirect to `url` when `redirect_on_failure` is set and the request failed,
  * otherwise an error status with a short plain-text body.

Failed URLs are recorded in the `:failed_proxy_url_cache` cache and rejected
without contacting the upstream while the entry is present.
"""
@spec call(Plug.Conn.t(), url :: String.t(), [option()]) :: Plug.Conn.t()
def call(_conn, _url, _opts \\ [])

def call(conn = %{method: method}, url, opts) when method in @methods do
  client_opts = Keyword.merge(@default_options, Keyword.get(opts, :http, []))
  req_headers = build_req_headers(conn.req_headers, opts)
  max_body_length = Keyword.get(opts, :max_body_length, @max_body_length)

  # Remember the file name derived from the URL so it can be used as the
  # attachment name when the upstream does not provide one.
  opts =
    if filename = Pleroma.Web.MediaProxy.filename(url) do
      Keyword.put_new(opts, :attachment_name, filename)
    else
      opts
    end

  with {:ok, nil} <- Cachex.get(:failed_proxy_url_cache, url),
       {:ok, code, headers, client} <- request(method, url, req_headers, client_opts),
       :ok <- checked_header_length(client, headers, max_body_length) do
    response(conn, client, url, code, headers, opts)
  else
    # The URL is in the failed-request cache: do not hit the upstream again.
    {:ok, true} ->
      conn
      |> error_or_redirect(url, 500, "Request failed", opts)
      |> halt()

    # Three-element success tuple: the client returned no body reference
    # (HEAD requests), so only the headers are relayed.
    {:ok, code, headers} ->
      head_response(conn, url, code, headers, opts)
      |> halt()

    {:error, {:invalid_http_response, code}} ->
      Logger.error("#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{code}")
      track_failed_url(url, code, opts)

      conn
      |> error_or_redirect(
        url,
        code,
        "Request failed: " <> Plug.Conn.Status.reason_phrase(code),
        opts
      )
      |> halt()

    {:error, error} ->
      Logger.error("#{__MODULE__}: request to #{inspect(url)} failed: #{inspect(error)}")
      track_failed_url(url, error, opts)

      conn
      |> error_or_redirect(url, 500, "Request failed", opts)
      |> halt()
  end
end

def call(conn, _, _) do
  conn
  |> send_resp(400, Plug.Conn.Status.reason_phrase(400))
  |> halt()
end

# Validates the announced content-length and, when it is rejected, closes the
# already opened upstream client: its body will never be read, so leaving it
# open would leak the connection.
defp checked_header_length(client, headers, limit) do
  with {:error, _reason} = error <- header_length_constraint(headers, limit) do
    client().close(client)
    error
  end
end
153
# Performs the upstream request through the configured client module.
#
# Returns:
#   * `{:ok, code, headers, client}` - accepted status with a body reference,
#   * `{:ok, code, headers}` - accepted status without a body reference,
#   * `{:error, {:invalid_http_response, code}}` - status not in @valid_resp_codes,
#   * `{:error, reason}` - the client failed.
#
# Header names in the success tuples are downcased.
defp request(method, url, headers, opts) do
  Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}")
  # Safe: call/3 only lets methods from @methods through, and the
  # corresponding atoms already exist.
  method = method |> String.downcase() |> String.to_existing_atom()

  case client().request(method, url, headers, "", opts) do
    {:ok, code, headers, client} when code in @valid_resp_codes ->
      {:ok, code, downcase_headers(headers), client}

    {:ok, code, headers} when code in @valid_resp_codes ->
      {:ok, code, downcase_headers(headers)}

    {:ok, code, _headers, client} ->
      # The body of a rejected response is never read, so the upstream
      # connection has to be released here or it would stay open.
      client().close(client)
      {:error, {:invalid_http_response, code}}

    {:ok, code, _headers} ->
      {:error, {:invalid_http_response, code}}

    {:error, error} ->
      {:error, error}
  end
end
175
# Relays an accepted upstream response: sets the filtered response headers,
# starts a chunked reply with the upstream status and streams the body.
#
# The connection is halted in every case. When streaming stops with an error
# the upstream client is closed; errors other than `:closed` are also logged.
defp response(conn, client, url, status, headers, opts) do
  Logger.debug("#{__MODULE__} #{status} #{url} #{inspect(headers)}")

  outcome =
    conn
    |> put_resp_headers(build_resp_headers(headers, opts))
    |> send_chunked(status)
    |> chunk_reply(client, opts)

  case outcome do
    {:ok, streamed_conn} ->
      halt(streamed_conn)

    {:error, reason, failed_conn} ->
      # `:closed` is not logged; every other reason gets a warning.
      if reason != :closed do
        Logger.warn(
          "#{__MODULE__} request to #{url} failed while reading/chunking: #{inspect(reason)}"
        )
      end

      client().close(client)
      halt(failed_conn)
  end
end
202
# Streams the upstream body to the client, starting with nothing sent and no
# read time accumulated.
defp chunk_reply(conn, client, opts), do: chunk_reply(conn, client, opts, 0, 0)

# One iteration of the streaming loop: read a piece of the upstream body,
# enforce the read-duration and body-size limits, forward the piece to the
# client and recurse.
#
# Returns `{:ok, conn}` once the client reports `:done`, or
# `{:error, reason, conn}` when a limit is exceeded or reading/sending fails.
defp chunk_reply(conn, client, opts, sent_so_far, duration) do
  max_duration = Keyword.get(opts, :max_read_duration, @max_read_duration)
  max_length = Keyword.get(opts, :max_body_length, @max_body_length)

  with {:ok, timer} <- check_read_duration(duration, max_duration),
       {:ok, data, client} <- client().stream_body(client),
       {:ok, elapsed} <- increase_read_duration(timer),
       total_sent = sent_so_far + byte_size(data),
       :ok <- body_size_constraint(total_sent, max_length),
       {:ok, conn} <- chunk(conn, data) do
    chunk_reply(conn, client, opts, total_sent, elapsed)
  else
    :done -> {:ok, conn}
    {:error, reason} -> {:error, reason, conn}
  end
end
228
# Answers with the upstream status and the filtered response headers, and an
# empty body.
defp head_response(conn, url, code, headers, opts) do
  Logger.debug("#{__MODULE__} #{code} #{url} #{inspect(headers)}")

  resp_headers = build_resp_headers(headers, opts)

  conn
  |> put_resp_headers(resp_headers)
  |> send_resp(code, "")
end
236
# Finishes a failed proxy request: redirects the client to `url` when the
# `redirect_on_failure` option is truthy, otherwise replies with `code` and
# `body`. The connection is halted either way.
defp error_or_redirect(conn, url, code, body, opts) do
  replied =
    case Keyword.get(opts, :redirect_on_failure, false) do
      disabled when disabled in [nil, false] -> send_resp(conn, code, body)
      _enabled -> Phoenix.Controller.redirect(conn, external: url)
    end

  halt(replied)
end
248
# Lower-cases the name of every `{name, value}` header; values are untouched.
defp downcase_headers(headers) do
  Enum.map(headers, fn {name, value} -> {String.downcase(name), value} end)
end
254
# Returns the media type from the "content-type" header, i.e. everything
# before the first ";". Falls back to "application/octet-stream" when the
# header is absent.
defp get_content_type(headers) do
  fallback = {"content-type", "application/octet-stream"}
  {_name, value} = List.keyfind(headers, "content-type", 0, fallback)

  value
  |> String.split(";", parts: 2)
  |> hd()
end
262
# Applies every `{name, value}` pair to the connection with put_resp_header/3,
# in list order.
defp put_resp_headers(conn, headers) do
  Enum.reduce(headers, conn, fn {name, value}, acc ->
    put_resp_header(acc, name, value)
  end)
end
268
# Builds the header list sent to the upstream from the client's request
# headers: names are downcased, only @keep_req_headers survive, the encoding
# and user-agent rules are applied, and finally the caller's `:req_headers`
# option is merged in.
#
# Headers are `{String.t(), String.t()}` pairs, so the merge is done by hand:
# Keyword.merge/2 only accepts atom-keyed keyword lists and raises as soon as
# `:req_headers` is non-empty. An extra header replaces every existing header
# with the same name and is appended at the end.
defp build_req_headers(headers, opts) do
  extra_headers = Keyword.get(opts, :req_headers, [])

  headers
  |> downcase_headers()
  |> Enum.filter(fn {k, _} -> k in @keep_req_headers end)
  |> build_req_range_or_encoding_header(opts)
  |> build_req_user_agent_header(opts)
  |> Enum.reject(fn {k, _} -> List.keymember?(extra_headers, k, 0) end)
  |> Kernel.++(extra_headers)
end
277
# Drops "accept-encoding" from the request when any of the @range_headers is
# present, so that range requests are not combined with content encoding
# (see #1823). Without a range header the list is returned unchanged.
defp build_req_range_or_encoding_header(headers, _opts) do
  range_requested? = Enum.any?(headers, fn {name, _value} -> name in @range_headers end)

  # List.keydelete/3 returns the list as-is when the key is absent, so no
  # separate membership check is needed.
  if range_requested? do
    List.keydelete(headers, "accept-encoding", 0)
  else
    headers
  end
end
288
# Decides which user-agent is sent to the upstream.
#
#   * `keep_user_agent: true` - the headers are returned untouched, so the
#     client's own "user-agent" (if it sent one) is forwarded, as the module
#     documentation promises.
#   * otherwise (the default) - the "user-agent" header is set to
#     `Pleroma.Application.user_agent()`, replacing the client's value or
#     being appended when there was none.
#
# The condition used to be inverted: the client's user-agent was forwarded by
# default and replaced only when `keep_user_agent` was set.
defp build_req_user_agent_header(headers, opts) do
  if Keyword.get(opts, :keep_user_agent, false) do
    headers
  else
    List.keystore(
      headers,
      "user-agent",
      0,
      {"user-agent", Pleroma.Application.user_agent()}
    )
  end
end
301
# Builds the header list sent to the client from the (already downcased)
# upstream response headers: only @keep_resp_headers survive, cache-control
# and content-disposition are set, and finally the caller's `:resp_headers`
# option is merged in.
#
# Headers are `{String.t(), String.t()}` pairs, so the merge is done by hand:
# Keyword.merge/2 only accepts atom-keyed keyword lists and raises as soon as
# `:resp_headers` is non-empty. An extra header replaces every existing header
# with the same name and is appended at the end.
defp build_resp_headers(headers, opts) do
  extra_headers = Keyword.get(opts, :resp_headers, [])

  headers
  |> Enum.filter(fn {k, _} -> k in @keep_resp_headers end)
  |> build_resp_cache_headers(opts)
  |> build_resp_content_disposition_header(opts)
  |> Enum.reject(fn {k, _} -> List.keymember?(extra_headers, k, 0) end)
  |> Kernel.++(extra_headers)
end
309
# Sets "cache-control" to @default_cache_control_header on every response.
#
# The upstream's own cache-control never reaches this point (it is not part
# of @keep_resp_headers), and without an explicit value Plug defaults to
# "max-age=0, private, must-revalidate".
#
# The previous implementation branched on whether a cache validator header
# was present, but both branches stored the same value, so the check was
# dead work and has been removed.
defp build_resp_cache_headers(headers, _opts) do
  List.keystore(
    headers,
    "cache-control",
    0,
    {"cache-control", @default_cache_control_header}
  )
end
333
# Forces `content-disposition: attachment` on responses whose content type
# must not be rendered inline.
#
# The `:inline_content_types` option (default: @inline_content_types) decides:
#   * a list  - attachment unless the response content type is in the list,
#   * `false` - always attachment,
#   * anything else (e.g. `true`) - headers are returned unchanged.
#
# The attachment file name is taken from the upstream content-disposition
# `filename="..."` parameter when present, otherwise from the
# `:attachment_name` option, otherwise it is "attachment".
defp build_resp_content_disposition_header(headers, opts) do
  content_type = get_content_type(headers)

  attachment? =
    case Keyword.get(opts, :inline_content_types, @inline_content_types) do
      allowed when is_list(allowed) -> content_type not in allowed
      false -> true
      _ -> false
    end

  if attachment? do
    # A missing header or a header without a quoted filename simply falls
    # through to the default name; no exception is needed for that.
    name =
      with {{"content-disposition", upstream_disposition}, _rest} <-
             List.keytake(headers, "content-disposition", 0),
           [filename | _] <-
             Regex.run(
               ~r/filename="((?:[^"\\]|\\.)*)"/u,
               upstream_disposition || "",
               capture: :all_but_first
             ) do
        filename
      else
        _ -> Keyword.get(opts, :attachment_name, "attachment")
      end

    disposition = "attachment; filename=\"#{name}\""

    List.keystore(headers, "content-disposition", 0, {"content-disposition", disposition})
  else
    headers
  end
end
371
# Checks the announced "content-length" against `limit`.
#
# Returns `{:error, :body_too_large}` only when the header is present, starts
# with an integer and that integer is greater than `limit`. A missing or
# unparsable header, or a limit that is not a positive integer (e.g.
# `:infinity`), yields `:ok`.
defp header_length_constraint(headers, limit) when is_integer(limit) and limit > 0 do
  announced =
    case List.keyfind(headers, "content-length", 0) do
      {_name, raw} -> Integer.parse(raw)
      _ -> :error
    end

  case announced do
    {size, _rest} when size > limit -> {:error, :body_too_large}
    _ -> :ok
  end
end

defp header_length_constraint(_headers, _limit), do: :ok
387
# Checks the number of body bytes received so far against `limit`.
#
# Returns `{:error, :body_too_large}` once `size` exceeds `limit`; a limit
# that is not a positive integer (e.g. `:infinity`) disables the check.
#
# The comparison is strict so that it agrees with header_length_constraint/2,
# which accepts a content-length equal to the limit: a body of exactly `limit`
# bytes must not be cut off after its header was accepted.
defp body_size_constraint(size, limit) when is_integer(limit) and limit > 0 and size > limit do
  {:error, :body_too_large}
end

defp body_size_constraint(_, _), do: :ok
393
# Read-duration accounting for the streaming loop.
#
# check_read_duration/2 is called before each read with the time accumulated
# so far (milliseconds) and the configured maximum:
#   * `{:error, :read_duration_exceeded}` when the accumulated time is over the
#     maximum,
#   * `{:ok, {duration, started}}` otherwise, where `started` is the current
#     system time in milliseconds,
#   * `{:ok, :no_duration_limit}` when either argument is not an integer or the
#     maximum is not positive (e.g. `:infinity`), i.e. no limit is enforced.
#
# The fallback used to return a three-element tuple, which the caller's
# `{:ok, duration}` pattern could not match, so disabling the limit crashed
# the stream instead of lifting the limit.
defp check_read_duration(duration, max)
     when is_integer(duration) and is_integer(max) and max > 0 do
  if duration > max do
    {:error, :read_duration_exceeded}
  else
    {:ok, {duration, :erlang.system_time(:millisecond)}}
  end
end

defp check_read_duration(_, _), do: {:ok, :no_duration_limit}

# Called after each read with the value produced by check_read_duration/2.
# Adds the time elapsed since `started` to the previous total and returns
# `{:ok, new_total}`; the `:no_duration_limit` marker is passed through so the
# next check_read_duration/2 call keeps the limit disabled.
defp increase_read_duration({previous_duration, started})
     when is_integer(previous_duration) and is_integer(started) do
  duration = :erlang.system_time(:millisecond) - started
  {:ok, previous_duration + duration}
end

defp increase_read_duration(_), do: {:ok, :no_duration_limit}
414
# The module through which all upstream requests, body reads and closes go.
defp client do
  Pleroma.ReverseProxy.Client
end
416
# Records `url` in the :failed_proxy_url_cache so that call/3 rejects it
# without contacting the upstream.
#
# The entry gets the `failed_request_ttl` option (default @failed_request_ttl)
# as its TTL, except for `:body_too_large`, 400 and 204, which are stored with
# `ttl: nil`.
# NOTE(review): presumably `ttl: nil` means the entry uses the cache's own
# default expiry - confirm against the Cachex documentation.
defp track_failed_url(url, error, opts) do
  ttl =
    if error in [:body_too_large, 400, 204] do
      nil
    else
      Keyword.get(opts, :failed_request_ttl, @failed_request_ttl)
    end

  Cachex.put(:failed_proxy_url_cache, url, true, ttl: ttl)
end
427 end