Merge branch 'fix/mediaproxy-cache-settings-types' into 'develop'
lib/pleroma/reverse_proxy/reverse_proxy.ex
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.ReverseProxy do
  @range_headers ~w(range if-range)
  @keep_req_headers ~w(accept user-agent accept-encoding cache-control if-modified-since) ++
                      ~w(if-unmodified-since if-none-match) ++ @range_headers
  @resp_cache_headers ~w(etag date last-modified)
  @keep_resp_headers @resp_cache_headers ++
                       ~w(content-length content-type content-disposition content-encoding) ++
                       ~w(content-range accept-ranges vary)
  @default_cache_control_header "public, max-age=1209600"
  @valid_resp_codes [200, 206, 304]
  @max_read_duration :timer.seconds(30)
  @max_body_length :infinity
  @failed_request_ttl :timer.seconds(60)
  @methods ~w(GET HEAD)

  @moduledoc """
  A reverse proxy.

      Pleroma.ReverseProxy.call(conn, url, options)

  It is not meant to be added to a plug pipeline, but to be called from another plug or controller.

  Supports `#{inspect(@methods)}` HTTP methods, and only allows `#{inspect(@valid_resp_codes)}` status codes.

  Responses are chunked to the client while downloading from the upstream.

  Some request/response headers are preserved:

  * request: `#{inspect(@keep_req_headers)}`
  * response: `#{inspect(@keep_resp_headers)}`

  Options:

  * `redirect_on_failure` (default `false`): redirects the client to the real remote URL if there is any HTTP
    error. Errors during body processing cannot be redirected, as the response is already chunked. This may
    expose the remote URL, the clients' IPs, ….

  * `max_body_length` (default `#{inspect(@max_body_length)}`): limits the content length to approximately the
    specified length. It is validated against the `content-length` header and also enforced while proxying.

  * `max_read_duration` (default `#{inspect(@max_read_duration)}` ms): the total time the connection is allowed to
    read from the remote upstream.

  * `failed_request_ttl` (default `#{inspect(@failed_request_ttl)}` ms): how long a failed request is cached and
    cannot be retried.

  * `inline_content_types`:
    * `true` will not alter `content-disposition` (it is left up to the upstream),
    * `false` will add `content-disposition: attachment` to any response,
    * a list of whitelisted content types that may be served inline.

  * `keep_user_agent` will forward the client's user-agent to the upstream. This may be useful if the upstream is
    doing content transformation (encoding, …) depending on the request.

  * `req_headers`, `resp_headers`: additional request/response headers.

  * `http`: options for [hackney](https://github.com/benoitc/hackney) or [gun](https://github.com/ninenines/gun).

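
  For example, a controller action could call it roughly like this (an illustrative
  sketch only; the action name and option values below are hypothetical, not part of
  this module):

      def proxy(conn, %{"url" => url}) do
        Pleroma.ReverseProxy.call(conn, url,
          max_body_length: 25 * 1_048_576,
          max_read_duration: :timer.seconds(30)
        )
      end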
  """
  @default_options [pool: :media]

  @inline_content_types [
    "image/gif",
    "image/jpeg",
    "image/jpg",
    "image/png",
    "image/svg+xml",
    "audio/mpeg",
    "audio/mp3",
    "video/webm",
    "video/mp4",
    "video/quicktime"
  ]

  require Logger
  import Plug.Conn

  @type option() ::
          {:keep_user_agent, boolean()}
          | {:max_read_duration, :timer.time() | :infinity}
          | {:max_body_length, non_neg_integer() | :infinity}
          | {:failed_request_ttl, :timer.time() | :infinity}
          | {:http, keyword()}
          | {:req_headers, [{String.t(), String.t()}]}
          | {:resp_headers, [{String.t(), String.t()}]}
          | {:inline_content_types, boolean() | [String.t()]}
          | {:redirect_on_failure, boolean()}

  @spec call(Plug.Conn.t(), url :: String.t(), [option()]) :: Plug.Conn.t()
  def call(_conn, _url, _opts \\ [])

  def call(conn = %{method: method}, url, opts) when method in @methods do
    client_opts = Keyword.merge(@default_options, Keyword.get(opts, :http, []))

    req_headers = build_req_headers(conn.req_headers, opts)

    opts =
      if filename = Pleroma.Web.MediaProxy.filename(url) do
        Keyword.put_new(opts, :attachment_name, filename)
      else
        opts
      end

    with {:ok, nil} <- Cachex.get(:failed_proxy_url_cache, url),
         {:ok, code, headers, client} <- request(method, url, req_headers, client_opts),
         :ok <-
           header_length_constraint(
             headers,
             Keyword.get(opts, :max_body_length, @max_body_length)
           ) do
      response(conn, client, url, code, headers, opts)
    else
      {:ok, true} ->
        conn
        |> error_or_redirect(url, 500, "Request failed", opts)
        |> halt()

      {:ok, code, headers} ->
        head_response(conn, url, code, headers, opts)
        |> halt()

      {:error, {:invalid_http_response, code}} ->
        Logger.error("#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{code}")
        track_failed_url(url, code, opts)

        conn
        |> error_or_redirect(
          url,
          code,
          "Request failed: " <> Plug.Conn.Status.reason_phrase(code),
          opts
        )
        |> halt()

      {:error, error} ->
        Logger.error("#{__MODULE__}: request to #{inspect(url)} failed: #{inspect(error)}")
        track_failed_url(url, error, opts)

        conn
        |> error_or_redirect(url, 500, "Request failed", opts)
        |> halt()
    end
  end

  def call(conn, _, _) do
    conn
    |> send_resp(400, Plug.Conn.Status.reason_phrase(400))
    |> halt()
  end

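  # Performs the upstream request through the configured HTTP client and normalizes
  # the result: only @valid_resp_codes are accepted, and response headers are
  # downcased for the header lookups done later on.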
  defp request(method, url, headers, opts) do
    Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}")
    method = method |> String.downcase() |> String.to_existing_atom()

    case client().request(method, url, headers, "", opts) do
      {:ok, code, headers, client} when code in @valid_resp_codes ->
        {:ok, code, downcase_headers(headers), client}

      {:ok, code, headers} when code in @valid_resp_codes ->
        {:ok, code, downcase_headers(headers)}

      {:ok, code, _, _} ->
        {:error, {:invalid_http_response, code}}

      {:error, error} ->
        {:error, error}
    end
  end

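  # Streams an accepted upstream response back to the client as a chunked response,
  # closing the upstream client whenever reading or chunking fails.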
  defp response(conn, client, url, status, headers, opts) do
    Logger.debug("#{__MODULE__} #{status} #{url} #{inspect(headers)}")

    result =
      conn
      |> put_resp_headers(build_resp_headers(headers, opts))
      |> send_chunked(status)
      |> chunk_reply(client, opts)

    case result do
      {:ok, conn} ->
        halt(conn)

      {:error, :closed, conn} ->
        client().close(client)
        halt(conn)

      {:error, error, conn} ->
        Logger.warn(
          "#{__MODULE__} request to #{url} failed while reading/chunking: #{inspect(error)}"
        )

        client().close(client)
        halt(conn)
    end
  end

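  # Read/send loop: pulls the next chunk of the upstream body, enforces the
  # max_read_duration and max_body_length limits, and forwards the chunk to the
  # client until the upstream signals :done.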
  defp chunk_reply(conn, client, opts) do
    chunk_reply(conn, client, opts, 0, 0)
  end

  defp chunk_reply(conn, client, opts, sent_so_far, duration) do
    with {:ok, duration} <-
           check_read_duration(
             duration,
             Keyword.get(opts, :max_read_duration, @max_read_duration)
           ),
         {:ok, data, client} <- client().stream_body(client),
         {:ok, duration} <- increase_read_duration(duration),
         sent_so_far = sent_so_far + byte_size(data),
         :ok <-
           body_size_constraint(
             sent_so_far,
             Keyword.get(opts, :max_body_length, @max_body_length)
           ),
         {:ok, conn} <- chunk(conn, data) do
      chunk_reply(conn, client, opts, sent_so_far, duration)
    else
      :done -> {:ok, conn}
      {:error, error} -> {:error, error, conn}
    end
  end

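  # Used when the upstream returned only status and headers (HEAD requests):
  # sends the filtered headers with an empty body.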
  defp head_response(conn, url, code, headers, opts) do
    Logger.debug("#{__MODULE__} #{code} #{url} #{inspect(headers)}")

    conn
    |> put_resp_headers(build_resp_headers(headers, opts))
    |> send_resp(code, "")
  end

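  # Either sends the given error response or, when :redirect_on_failure is set,
  # redirects the client directly to the remote URL.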
  defp error_or_redirect(conn, url, code, body, opts) do
    if Keyword.get(opts, :redirect_on_failure, false) do
      conn
      |> Phoenix.Controller.redirect(external: url)
      |> halt()
    else
      conn
      |> send_resp(code, body)
      |> halt()
    end
  end

  defp downcase_headers(headers) do
    Enum.map(headers, fn {k, v} ->
      {String.downcase(k), v}
    end)
  end

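  # Returns the bare media type of the response, defaulting to
  # application/octet-stream and dropping any parameters (e.g. "; charset=...").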
  defp get_content_type(headers) do
    {_, content_type} =
      List.keyfind(headers, "content-type", 0, {"content-type", "application/octet-stream"})

    [content_type | _] = String.split(content_type, ";")
    content_type
  end

  defp put_resp_headers(conn, headers) do
    Enum.reduce(headers, conn, fn {k, v}, conn ->
      put_resp_header(conn, k, v)
    end)
  end

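  # Filters the client's request headers down to @keep_req_headers and applies the
  # range/encoding, user-agent and :req_headers adjustments.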
  defp build_req_headers(headers, opts) do
    headers
    |> downcase_headers()
    |> Enum.filter(fn {k, _} -> k in @keep_req_headers end)
    |> build_req_range_or_encoding_header(opts)
    |> build_req_user_agent_header(opts)
    |> Keyword.merge(Keyword.get(opts, :req_headers, []))
  end

  # Disable content-encoding if any @range_headers are requested (see #1823).
  defp build_req_range_or_encoding_header(headers, _opts) do
    range? = Enum.any?(headers, fn {header, _} -> Enum.member?(@range_headers, header) end)

    if range? && List.keymember?(headers, "accept-encoding", 0) do
      List.keydelete(headers, "accept-encoding", 0)
    else
      headers
    end
  end

  defp build_req_user_agent_header(headers, opts) do
    if Keyword.get(opts, :keep_user_agent, false) do
      List.keystore(
        headers,
        "user-agent",
        0,
        {"user-agent", Pleroma.Application.user_agent()}
      )
    else
      headers
    end
  end

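  # Filters the upstream response headers down to @keep_resp_headers and applies the
  # cache-control, content-disposition and :resp_headers adjustments.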
  defp build_resp_headers(headers, opts) do
    headers
    |> Enum.filter(fn {k, _} -> k in @keep_resp_headers end)
    |> build_resp_cache_headers(opts)
    |> build_resp_content_disposition_header(opts)
    |> Keyword.merge(Keyword.get(opts, :resp_headers, []))
  end

  defp build_resp_cache_headers(headers, _opts) do
    # Always set our own cache-control header, whether or not the upstream sent
    # caching headers, as Plug defaults to "max-age=0, private, must-revalidate".
    List.keystore(
      headers,
      "cache-control",
      0,
      {"cache-control", @default_cache_control_header}
    )
  end

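  # Decides whether the response may be rendered inline based on
  # :inline_content_types; otherwise forces `content-disposition: attachment`,
  # reusing the upstream filename when one was provided.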
  defp build_resp_content_disposition_header(headers, opts) do
    opt = Keyword.get(opts, :inline_content_types, @inline_content_types)

    content_type = get_content_type(headers)

    attachment? =
      cond do
        is_list(opt) && !Enum.member?(opt, content_type) -> true
        opt == false -> true
        true -> false
      end

    if attachment? do
      name =
        try do
          {{"content-disposition", content_disposition_string}, _} =
            List.keytake(headers, "content-disposition", 0)

          [name | _] =
            Regex.run(
              ~r/filename="((?:[^"\\]|\\.)*)"/u,
              content_disposition_string || "",
              capture: :all_but_first
            )

          name
        rescue
          MatchError -> Keyword.get(opts, :attachment_name, "attachment")
        end

      disposition = "attachment; filename=\"#{name}\""

      List.keystore(headers, "content-disposition", 0, {"content-disposition", disposition})
    else
      headers
    end
  end

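  # Rejects the response early when the advertised content-length already exceeds
  # the configured limit.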
  defp header_length_constraint(headers, limit) when is_integer(limit) and limit > 0 do
    with {_, size} <- List.keyfind(headers, "content-length", 0),
         {size, _} <- Integer.parse(size),
         true <- size <= limit do
      :ok
    else
      false ->
        {:error, :body_too_large}

      _ ->
        :ok
    end
  end

  defp header_length_constraint(_, _), do: :ok

  defp body_size_constraint(size, limit) when is_integer(limit) and limit > 0 and size >= limit do
    {:error, :body_too_large}
  end

  defp body_size_constraint(_, _), do: :ok

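  # Read-duration bookkeeping: check_read_duration/2 aborts once the accumulated
  # read time exceeds the limit, increase_read_duration/1 adds the time spent on
  # the last read.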
  defp check_read_duration(duration, max)
       when is_integer(duration) and is_integer(max) and max > 0 do
    if duration > max do
      {:error, :read_duration_exceeded}
    else
      {:ok, {duration, :erlang.system_time(:millisecond)}}
    end
  end

  # No limit configured: return a two-element tuple so the `with` in chunk_reply/5
  # still matches.
  defp check_read_duration(_, _), do: {:ok, :no_duration_limit}

  defp increase_read_duration({previous_duration, started})
       when is_integer(previous_duration) and is_integer(started) do
    duration = :erlang.system_time(:millisecond) - started
    {:ok, previous_duration + duration}
  end

  defp increase_read_duration(_) do
    {:ok, :no_duration_limit}
  end

  defp client, do: Pleroma.ReverseProxy.Client

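  # Remembers a failed URL in :failed_proxy_url_cache so it is not retried until the
  # TTL expires; permanent failures (:body_too_large, 400, 204) are cached without
  # expiry.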
  defp track_failed_url(url, error, opts) do
    ttl =
      unless error in [:body_too_large, 400, 204] do
        Keyword.get(opts, :failed_request_ttl, @failed_request_ttl)
      else
        nil
      end

    Cachex.put(:failed_proxy_url_cache, url, true, ttl: ttl)
  end
end