Merge remote-tracking branch 'remotes/origin/develop' into 2168-media-preview-proxy
[akkoma] / lib / pleroma / reverse_proxy / reverse_proxy.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.ReverseProxy do
  @range_headers ~w(range if-range)
  @keep_req_headers ~w(accept user-agent accept-encoding cache-control if-modified-since) ++
                      ~w(if-unmodified-since if-none-match) ++ @range_headers
  @resp_cache_headers ~w(etag date last-modified)
  @keep_resp_headers @resp_cache_headers ++
                       ~w(content-length content-type content-disposition content-encoding) ++
                       ~w(content-range accept-ranges vary)
  @default_cache_control_header "public, max-age=1209600"
  @valid_resp_codes [200, 206, 304]
  @max_read_duration :timer.seconds(30)
  @max_body_length :infinity
  @failed_request_ttl :timer.seconds(60)
  @methods ~w(GET HEAD)

  # Failure reasons / upstream status codes that are cached without a TTL,
  # i.e. not retried (see track_failed_url/3).
  @no_ttl_failures [:body_too_large, 400, 204]

  @moduledoc """
  A reverse proxy.

      Pleroma.ReverseProxy.call(conn, url, options)

  It is not meant to be added into a plug pipeline, but to be called from another plug or controller.

  Supports `#{inspect(@methods)}` HTTP methods, and only allows `#{inspect(@valid_resp_codes)}` status codes.

  Responses are chunked to the client while downloading from the upstream.

  Some request / response headers are preserved:

  * request: `#{inspect(@keep_req_headers)}`
  * response: `#{inspect(@keep_resp_headers)}`

  The upstream `cache-control` header is never forwarded: responses always carry
  `cache-control: #{@default_cache_control_header}` unless overridden via `resp_headers`.

  Options:

  * `redirect_on_failure` (default `false`): redirects the client to the real remote URL if there are any HTTP
    errors. Errors during body processing are not redirected, as the response is already being chunked. This may
    expose the remote URL, client IPs, etc.

  * `max_body_length` (default `#{inspect(@max_body_length)}`): maximum size of the proxied body, in bytes. It is
    checked against the upstream `content-length` header and again while streaming.

  * `max_read_duration` (default `#{inspect(@max_read_duration)}` ms): the total time the connection is allowed to
    read from the remote upstream. `:infinity` disables the limit.

  * `failed_request_ttl` (default `#{inspect(@failed_request_ttl)}` ms): how long a failed request is cached and
    cannot be retried. Failures caused by an oversized body or an upstream `400`/`204` are cached without a TTL.

  * `inline_content_types` (default: a built-in list of common image, audio and video types):
    * `true` will not alter `content-disposition` (up to the upstream),
    * `false` will add `content-disposition: attachment` to any request,
    * a list of allowed content types; any other type gets `content-disposition: attachment`.

  * `keep_user_agent` (default `false`): when `true`, the `user-agent` sent upstream is replaced with Pleroma's
    own user agent. Otherwise the client's `user-agent` (if any) is forwarded unchanged.

  * `req_headers`, `resp_headers`: additional `{name, value}` headers; they override headers of the same name.

  * `http`: options for [hackney](https://github.com/benoitc/hackney) or [gun](https://github.com/ninenines/gun).
  """

  @default_options [pool: :media]

  @inline_content_types [
    "image/gif",
    "image/jpeg",
    "image/jpg",
    "image/png",
    "image/svg+xml",
    "audio/mpeg",
    "audio/mp3",
    "video/webm",
    "video/mp4",
    "video/quicktime"
  ]

  require Logger
  import Plug.Conn

  @type option() ::
          {:keep_user_agent, boolean}
          | {:max_read_duration, :timer.time() | :infinity}
          | {:max_body_length, non_neg_integer() | :infinity}
          | {:failed_request_ttl, :timer.time() | :infinity}
          | {:http, keyword()}
          | {:req_headers, [{String.t(), String.t()}]}
          | {:resp_headers, [{String.t(), String.t()}]}
          | {:inline_content_types, boolean() | [String.t()]}
          | {:redirect_on_failure, boolean()}

  @doc "Returns the default `max_read_duration`, in milliseconds."
  @spec max_read_duration_default() :: non_neg_integer()
  def max_read_duration_default, do: @max_read_duration

  @doc """
  Proxies `url` to `conn`, streaming the upstream body to the client.

  Only `#{inspect(@methods)}` requests are proxied; any other method gets a `400` response.
  The returned conn is always halted. See the module documentation for `opts`.
  """
  @spec call(Plug.Conn.t(), url :: String.t(), [option()]) :: Plug.Conn.t()
  def call(_conn, _url, _opts \\ [])

  def call(%{method: method} = conn, url, opts) when method in @methods do
    client_opts = Keyword.merge(@default_options, Keyword.get(opts, :http, []))
    req_headers = build_req_headers(conn.req_headers, opts)

    opts =
      if filename = Pleroma.Web.MediaProxy.filename(url) do
        Keyword.put_new(opts, :attachment_name, filename)
      else
        opts
      end

    max_body_length = Keyword.get(opts, :max_body_length, @max_body_length)

    with {:ok, nil} <- Cachex.get(:failed_proxy_url_cache, url),
         {:ok, code, headers, client} <- request(method, url, req_headers, client_opts),
         :ok <- check_header_length(headers, client, max_body_length) do
      response(conn, client, url, code, headers, opts)
    else
      # The URL failed recently and is still cached as failed.
      {:ok, true} ->
        conn
        |> error_or_redirect(url, 500, "Request failed", opts)
        |> halt()

      # Bodiless upstream response (e.g. HEAD).
      {:ok, code, headers} ->
        conn
        |> head_response(url, code, headers, opts)
        |> halt()

      {:error, {:invalid_http_response, code}} ->
        Logger.error("#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{code}")
        track_failed_url(url, code, opts)

        conn
        |> error_or_redirect(url, code, "Request failed: " <> status_reason(code), opts)
        |> halt()

      {:error, error} ->
        Logger.error("#{__MODULE__}: request to #{inspect(url)} failed: #{inspect(error)}")
        track_failed_url(url, error, opts)

        conn
        |> error_or_redirect(url, 500, "Request failed", opts)
        |> halt()
    end
  end

  def call(conn, _, _) do
    conn
    |> send_resp(400, Plug.Conn.Status.reason_phrase(400))
    |> halt()
  end

  # Performs the upstream request and normalizes the result:
  #   {:ok, code, headers, client} - streamable body, accepted status
  #   {:ok, code, headers}         - no streamable body, accepted status
  #   {:error, {:invalid_http_response, code}} - status not in @valid_resp_codes
  #   {:error, reason}             - transport error
  # Response header names are downcased.
  defp request(method, url, headers, opts) do
    Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}")
    # call/3's guard restricts `method` to @methods, so only :get / :head are produced here.
    method = method |> String.downcase() |> String.to_existing_atom()

    case client().request(method, url, headers, "", opts) do
      {:ok, code, headers, ref} when code in @valid_resp_codes ->
        {:ok, code, downcase_headers(headers), ref}

      {:ok, code, headers} when code in @valid_resp_codes ->
        {:ok, code, downcase_headers(headers)}

      {:ok, code, _headers, ref} ->
        # The body of a rejected response is never read; release the upstream connection.
        client().close(ref)
        {:error, {:invalid_http_response, code}}

      {:ok, code, _headers} ->
        {:error, {:invalid_http_response, code}}

      {:error, error} ->
        {:error, error}
    end
  end

  # Validates the upstream content-length against `limit`. On rejection the
  # already-opened upstream client is closed so its connection is not leaked.
  defp check_header_length(headers, client, limit) do
    case header_length_constraint(headers, limit) do
      :ok ->
        :ok

      {:error, _reason} = error ->
        client().close(client)
        error
    end
  end

  # Sends the filtered upstream headers with `status`, then streams the body
  # chunk by chunk. Always returns a halted conn; the upstream client is closed
  # when streaming stops with an error.
  defp response(conn, client, url, status, headers, opts) do
    Logger.debug("#{__MODULE__} #{status} #{url} #{inspect(headers)}")

    result =
      conn
      |> put_resp_headers(build_resp_headers(headers, opts))
      |> send_chunked(status)
      |> chunk_reply(client, opts)

    case result do
      {:ok, conn} ->
        halt(conn)

      {:error, :closed, conn} ->
        client().close(client)
        halt(conn)

      {:error, error, conn} ->
        Logger.warn(
          "#{__MODULE__} request to #{url} failed while reading/chunking: #{inspect(error)}"
        )

        client().close(client)
        halt(conn)
    end
  end

  defp chunk_reply(conn, client, opts) do
    chunk_reply(conn, client, opts, 0, 0)
  end

  # Reads one upstream chunk and forwards it, recursing until the upstream
  # reports :done. Aborts with {:error, reason, conn} when the read duration
  # or body size limit is exceeded, or reading/sending fails.
  defp chunk_reply(conn, client, opts, sent_so_far, duration) do
    with {:ok, duration} <-
           check_read_duration(
             duration,
             Keyword.get(opts, :max_read_duration, @max_read_duration)
           ),
         {:ok, data, client} <- client().stream_body(client),
         {:ok, duration} <- increase_read_duration(duration),
         sent_so_far = sent_so_far + byte_size(data),
         :ok <-
           body_size_constraint(
             sent_so_far,
             Keyword.get(opts, :max_body_length, @max_body_length)
           ),
         {:ok, conn} <- chunk(conn, data) do
      chunk_reply(conn, client, opts, sent_so_far, duration)
    else
      :done -> {:ok, conn}
      {:error, error} -> {:error, error, conn}
    end
  end

  # Replies to a bodiless upstream response with its (filtered) headers and status.
  defp head_response(conn, url, code, headers, opts) do
    Logger.debug("#{__MODULE__} #{code} #{url} #{inspect(headers)}")

    conn
    |> put_resp_headers(build_resp_headers(headers, opts))
    |> send_resp(code, "")
  end

  # Either redirects the client to the upstream URL (`redirect_on_failure: true`)
  # or sends `code` with `body`. The conn is halted in both cases.
  defp error_or_redirect(conn, url, code, body, opts) do
    if Keyword.get(opts, :redirect_on_failure, false) do
      conn
      |> Phoenix.Controller.redirect(external: url)
      |> halt()
    else
      conn
      |> send_resp(code, body)
      |> halt()
    end
  end

  # Plug.Conn.Status.reason_phrase/1 raises ArgumentError for status codes it
  # does not know (non-standard codes); fall back to a generic phrase instead.
  defp status_reason(code) do
    Plug.Conn.Status.reason_phrase(code)
  rescue
    ArgumentError -> "HTTP #{code}"
  end

  defp downcase_headers(headers) do
    Enum.map(headers, fn {k, v} ->
      {String.downcase(k), v}
    end)
  end

  # Returns the bare media type (parameters stripped, trimmed and downcased,
  # as media types are case-insensitive); defaults to application/octet-stream.
  defp get_content_type(headers) do
    {_, content_type} =
      List.keyfind(headers, "content-type", 0, {"content-type", "application/octet-stream"})

    content_type
    |> String.split(";", parts: 2)
    |> hd()
    |> String.trim()
    |> String.downcase()
  end

  defp put_resp_headers(conn, headers) do
    Enum.reduce(headers, conn, fn {k, v}, conn ->
      put_resp_header(conn, k, v)
    end)
  end

  # Appends `extra` headers, replacing existing headers with the same name.
  # Keyword.merge/2 cannot be used here: header names are strings, not atoms,
  # and it raises ArgumentError for non-keyword lists.
  defp merge_headers(headers, []), do: headers

  defp merge_headers(headers, extra) do
    extra = downcase_headers(extra)
    extra_names = Enum.map(extra, fn {name, _} -> name end)

    Enum.reject(headers, fn {name, _} -> name in extra_names end) ++ extra
  end

  # Keeps only the allow-listed client headers, then applies the range/encoding
  # and user-agent rules and finally the caller-supplied `req_headers`.
  defp build_req_headers(headers, opts) do
    headers
    |> downcase_headers()
    |> Enum.filter(fn {k, _} -> k in @keep_req_headers end)
    |> build_req_range_or_encoding_header(opts)
    |> build_req_user_agent_header(opts)
    |> merge_headers(Keyword.get(opts, :req_headers, []))
  end

  # Disable content-encoding if any @range_headers are requested (see #1823).
  defp build_req_range_or_encoding_header(headers, _opts) do
    if Enum.any?(headers, fn {name, _} -> name in @range_headers end) do
      Enum.reject(headers, fn {name, _} -> name == "accept-encoding" end)
    else
      headers
    end
  end

  # NOTE(review): despite its name, `keep_user_agent: true` *replaces* the
  # client's user-agent with Pleroma's own; with the default `false` the
  # client's user-agent passes through (it is in @keep_req_headers).
  defp build_req_user_agent_header(headers, opts) do
    if Keyword.get(opts, :keep_user_agent, false) do
      List.keystore(
        headers,
        "user-agent",
        0,
        {"user-agent", Pleroma.Application.user_agent()}
      )
    else
      headers
    end
  end

  # Keeps only the allow-listed upstream headers, forces cache-control and,
  # when needed, content-disposition, then applies caller `resp_headers`.
  defp build_resp_headers(headers, opts) do
    headers
    |> Enum.filter(fn {k, _} -> k in @keep_resp_headers end)
    |> build_resp_cache_headers(opts)
    |> build_resp_content_disposition_header(opts)
    |> merge_headers(Keyword.get(opts, :resp_headers, []))
  end

  # The upstream cache-control is never in @keep_resp_headers, so always set our
  # own; otherwise Plug defaults to "max-age=0, private, must-revalidate".
  defp build_resp_cache_headers(headers, _opts) do
    List.keystore(
      headers,
      "cache-control",
      0,
      {"cache-control", @default_cache_control_header}
    )
  end

  # Forces `content-disposition: attachment` for content types that may not be
  # displayed inline, keeping the upstream filename when one is present.
  defp build_resp_content_disposition_header(headers, opts) do
    opt = Keyword.get(opts, :inline_content_types, @inline_content_types)

    if attachment?(opt, get_content_type(headers)) do
      disposition = "attachment; filename=\"#{attachment_filename(headers, opts)}\""
      List.keystore(headers, "content-disposition", 0, {"content-disposition", disposition})
    else
      headers
    end
  end

  defp attachment?(false, _content_type), do: true
  defp attachment?(types, content_type) when is_list(types), do: content_type not in types
  defp attachment?(_, _), do: false

  # Filename from the upstream content-disposition header, falling back to the
  # `:attachment_name` option and finally to "attachment".
  defp attachment_filename(headers, opts) do
    with {_, disposition} <- List.keyfind(headers, "content-disposition", 0),
         [name | _] <-
           Regex.run(~r/filename="((?:[^"\\]|\\.)*)"/u, disposition, capture: :all_but_first) do
      name
    else
      _ -> Keyword.get(opts, :attachment_name, "attachment")
    end
  end

  # Rejects responses whose declared content-length exceeds `limit`; a missing
  # or unparsable content-length is accepted (the body is checked while streaming).
  defp header_length_constraint(headers, limit) when is_integer(limit) and limit > 0 do
    with {_, size} <- List.keyfind(headers, "content-length", 0),
         {size, _} <- Integer.parse(size),
         true <- size <= limit do
      :ok
    else
      false ->
        {:error, :body_too_large}

      _ ->
        :ok
    end
  end

  defp header_length_constraint(_, _), do: :ok

  # A body of exactly `limit` bytes is allowed, matching header_length_constraint/2.
  defp body_size_constraint(size, limit) when is_integer(limit) and limit > 0 and size > limit do
    {:error, :body_too_large}
  end

  defp body_size_constraint(_, _), do: :ok

  # NOTE(review): chunk_reply/3 starts at 0, so this nil clause looks unreachable.
  defp check_read_duration(nil = _duration, max), do: check_read_duration(@max_read_duration, max)

  # Returns {:ok, {elapsed_ms, started_at_ms}} while under `max`, or
  # {:ok, :no_duration_limit} when no positive integer limit applies.
  defp check_read_duration(duration, max)
       when is_integer(duration) and is_integer(max) and max > 0 do
    if duration > max do
      {:error, :read_duration_exceeded}
    else
      {:ok, {duration, :erlang.system_time(:millisecond)}}
    end
  end

  defp check_read_duration(_, _), do: {:ok, :no_duration_limit}

  defp increase_read_duration({previous_duration, started})
       when is_integer(previous_duration) and is_integer(started) do
    duration = :erlang.system_time(:millisecond) - started
    {:ok, previous_duration + duration}
  end

  defp increase_read_duration(_) do
    {:ok, :no_duration_limit}
  end

  defp client, do: Pleroma.ReverseProxy.Client

  # Marks `url` as failed so subsequent calls short-circuit. Errors listed in
  # @no_ttl_failures (and a `failed_request_ttl` of :infinity) are cached with
  # `ttl: nil`, i.e. without expiry.
  defp track_failed_url(url, error, opts) do
    ttl =
      if error in @no_ttl_failures do
        nil
      else
        case Keyword.get(opts, :failed_request_ttl, @failed_request_ttl) do
          :infinity -> nil
          ttl -> ttl
        end
      end

    Cachex.put(:failed_proxy_url_cache, url, true, ttl: ttl)
  end
end