Merge remote-tracking branch 'remotes/origin/develop' into 2168-media-preview-proxy
[akkoma] / lib / pleroma / reverse_proxy / reverse_proxy.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.ReverseProxy do
# Names (lowercase) of the client request headers that are forwarded to the upstream.
@keep_req_headers [
  "accept",
  "user-agent",
  "accept-encoding",
  "cache-control",
  "if-modified-since",
  "if-unmodified-since",
  "if-none-match",
  "if-range",
  "range"
]

# Upstream response headers that are treated as caching-related.
@resp_cache_headers ["etag", "date", "last-modified"]

# Names (lowercase) of the upstream response headers that are passed back to the client.
@keep_resp_headers @resp_cache_headers ++
                     [
                       "content-type",
                       "content-disposition",
                       "content-encoding",
                       "content-range",
                       "accept-ranges",
                       "vary"
                     ]

# `cache-control` value set on every proxied response.
@default_cache_control_header "public, max-age=1209600"

# Upstream status codes that are proxied; any other code is reported as a failure.
@valid_resp_codes [200, 206, 304]

# Defaults for the `max_read_duration`, `max_body_length` and `failed_request_ttl` options.
@max_read_duration :timer.seconds(30)
@max_body_length :infinity
@failed_request_ttl :timer.seconds(60)

# HTTP methods `call/3` accepts; anything else is answered with a 400.
@methods ["GET", "HEAD"]
18
@doc """
Returns the default value of the `max_read_duration` option, in milliseconds.
"""
@spec max_read_duration_default() :: pos_integer()
def max_read_duration_default do
  @max_read_duration
end
20
21 @moduledoc """
22 A reverse proxy.
23
24 Pleroma.ReverseProxy.call(conn, url, options)
25
26 It is not meant to be added into a plug pipeline, but to be called from another plug or controller.
27
28 Supports `#{inspect(@methods)}` HTTP methods, and only allows `#{inspect(@valid_resp_codes)}` status codes.
29
30 Responses are chunked to the client while downloading from the upstream.
31
Some request / response headers are preserved:
33
34 * request: `#{inspect(@keep_req_headers)}`
35 * response: `#{inspect(@keep_resp_headers)}`
36
37 Options:
38
* `redirect_on_failure` (default `false`). Redirects the client to the real remote URL if there are any HTTP
  errors. Errors during body processing are not redirected, as the response is already being chunked. This may
  expose the remote URL, client IPs, ….
42
43 * `max_body_length` (default `#{inspect(@max_body_length)}`): limits the content length to be approximately the
44 specified length. It is validated with the `content-length` header and also verified when proxying.
45
46 * `max_read_duration` (default `#{inspect(@max_read_duration)}` ms): the total time the connection is allowed to
47 read from the remote upstream.
48
49 * `failed_request_ttl` (default `#{inspect(@failed_request_ttl)}` ms): the time the failed request is cached and cannot be retried.
50
51 * `inline_content_types`:
52 * `true` will not alter `content-disposition` (up to the upstream),
53 * `false` will add `content-disposition: attachment` to any request,
54 * a list of whitelisted content types
55
56 * `keep_user_agent` will forward the client's user-agent to the upstream. This may be useful if the upstream is
57 doing content transformation (encoding, …) depending on the request.
58
59 * `req_headers`, `resp_headers` additional headers.
60
61 * `http`: options for [hackney](https://github.com/benoitc/hackney) or [gun](https://github.com/ninenines/gun).
62
63 """
# Base options for the HTTP client; `call/3` merges the `:http` option on top of these.
@default_options [pool: :media]

# Content types that keep the upstream `content-disposition` when the
# `inline_content_types` option is not given; every other type is served as an attachment.
@inline_content_types ~w(image/gif image/jpeg image/jpg image/png image/svg+xml) ++
                        ~w(audio/mpeg audio/mp3 video/webm video/mp4 video/quicktime)
78
79 require Logger
80 import Plug.Conn
81
# Options accepted by `call/3`; their meaning is described in the module documentation.
# `:http` is typed as `keyword()`: a bare `[]` in a typespec only admits the empty list,
# which would make any non-empty client option list a Dialyzer contract violation.
@type option() ::
        {:keep_user_agent, boolean()}
        | {:max_read_duration, :timer.time() | :infinity}
        | {:max_body_length, non_neg_integer() | :infinity}
        | {:failed_request_ttl, :timer.time() | :infinity}
        | {:http, keyword()}
        | {:req_headers, [{String.t(), String.t()}]}
        | {:resp_headers, [{String.t(), String.t()}]}
        | {:inline_content_types, boolean() | [String.t()]}
        | {:redirect_on_failure, boolean()}
92
@doc """
Proxies the request in `conn` to `url` and replies on `conn`.

Only the methods in `@methods` are proxied; any other method is answered with a 400.
URLs currently present in the `:failed_proxy_url_cache` are not requested again and are
answered with an error (or a redirect when `redirect_on_failure` is set).

The returned connection is always halted.
"""
@spec call(Plug.Conn.t(), url :: String.t(), [option()]) :: Plug.Conn.t()
def call(_conn, _url, _opts \\ [])

def call(%{method: method} = conn, url, opts) when method in @methods do
  client_opts = Keyword.merge(@default_options, Keyword.get(opts, :http, []))

  req_headers = build_req_headers(conn.req_headers, opts)

  # Remember a filename derived from the URL as a fallback attachment name.
  opts =
    if filename = Pleroma.Web.MediaProxy.filename(url) do
      Keyword.put_new(opts, :attachment_name, filename)
    else
      opts
    end

  with {:ok, nil} <- Cachex.get(:failed_proxy_url_cache, url),
       {:ok, code, headers, client} <- request(method, url, req_headers, client_opts),
       :ok <-
         header_length_constraint(
           headers,
           Keyword.get(opts, :max_body_length, @max_body_length)
         ) do
    response(conn, client, url, code, headers, opts)
  else
    # The URL is cached as failed (`track_failed_url/3` stores `true`).
    {:ok, true} ->
      conn
      |> error_or_redirect(url, 500, "Request failed", opts)
      |> halt()

    # The client answered without a body handle, so there is nothing to stream.
    {:ok, code, headers} ->
      head_response(conn, url, code, headers, opts)
      |> halt()

    {:error, {:invalid_http_response, code}} ->
      Logger.error("#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{code}")
      track_failed_url(url, code, opts)

      conn
      |> error_or_redirect(
        url,
        code,
        "Request failed: " <> status_reason_phrase(code),
        opts
      )
      |> halt()

    {:error, error} ->
      Logger.error("#{__MODULE__}: request to #{inspect(url)} failed: #{inspect(error)}")
      track_failed_url(url, error, opts)

      conn
      |> error_or_redirect(url, 500, "Request failed", opts)
      |> halt()
  end
end

def call(conn, _, _) do
  conn
  |> send_resp(400, Plug.Conn.Status.reason_phrase(400))
  |> halt()
end

# Reason phrase for an upstream status code. `Plug.Conn.Status.reason_phrase/1` raises
# `ArgumentError` for codes Plug does not know, and an upstream may answer with any code,
# so fall back to the bare number instead of crashing the error path.
defp status_reason_phrase(code) do
  Plug.Conn.Status.reason_phrase(code)
rescue
  ArgumentError -> Integer.to_string(code)
end
154
# Performs the upstream request through the configured client and normalises its reply.
#
# Returns `{:ok, code, headers, client}` or `{:ok, code, headers}` (header names downcased)
# for the status codes in `@valid_resp_codes`, `{:error, {:invalid_http_response, code}}`
# for any other status, and `{:error, reason}` when the client itself fails.
defp request(method, url, headers, opts) do
  Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}")
  method = method |> String.downcase() |> String.to_existing_atom()

  case client().request(method, url, headers, "", opts) do
    {:ok, code, headers, client} when code in @valid_resp_codes ->
      {:ok, code, downcase_headers(headers), client}

    {:ok, code, headers} when code in @valid_resp_codes ->
      {:ok, code, downcase_headers(headers)}

    {:ok, code, _headers, _client} ->
      {:error, {:invalid_http_response, code}}

    # A reply without a body handle and with an unexpected status used to match no clause
    # and raise `CaseClauseError`; report it like any other invalid status.
    {:ok, code, _headers} ->
      {:error, {:invalid_http_response, code}}

    {:error, error} ->
      {:error, error}
  end
end
173
# Sends the filtered upstream headers, then streams the upstream body to the client as a
# chunked response. The client handle is closed whenever streaming stops with an error;
# a `:closed` error is not logged. The returned connection is halted.
defp response(conn, client, url, status, headers, opts) do
  resp_headers = build_resp_headers(headers, opts)

  outcome =
    conn
    |> put_resp_headers(resp_headers)
    |> send_chunked(status)
    |> chunk_reply(client, opts)

  case outcome do
    {:ok, conn} ->
      halt(conn)

    {:error, error, conn} ->
      if error != :closed do
        Logger.warn(
          "#{__MODULE__} request to #{url} failed while reading/chunking: #{inspect(error)}"
        )
      end

      client().close(client)
      halt(conn)
  end
end
198
# Streams the upstream body to `conn`, starting with nothing sent and no time spent reading.
defp chunk_reply(conn, client, opts), do: chunk_reply(conn, client, opts, 0, 0)

# One iteration per upstream chunk: check the read-time budget, read a chunk, account for
# the time spent reading, check the size budget, then forward the chunk and recurse.
#
# Returns `{:ok, conn}` once the client reports `:done`, or `{:error, reason, conn}` as soon
# as any step yields `{:error, reason}`.
defp chunk_reply(conn, client, opts, sent_so_far, duration) do
  max_duration = Keyword.get(opts, :max_read_duration, @max_read_duration)
  max_length = Keyword.get(opts, :max_body_length, @max_body_length)

  with {:ok, timer} <- check_read_duration(duration, max_duration),
       {:ok, data, client} <- client().stream_body(client),
       {:ok, elapsed} <- increase_read_duration(timer),
       total_sent = sent_so_far + byte_size(data),
       :ok <- body_size_constraint(total_sent, max_length),
       {:ok, conn} <- chunk(conn, data) do
    chunk_reply(conn, client, opts, total_sent, elapsed)
  else
    # NOTE(review): presumably what the client's stream_body returns once the body is exhausted.
    :done -> {:ok, conn}
    {:error, reason} -> {:error, reason, conn}
  end
end
224
# Replies with the filtered upstream headers and an empty body.
defp head_response(conn, _url, code, headers, opts) do
  resp_headers = build_resp_headers(headers, opts)

  conn
  |> put_resp_headers(resp_headers)
  |> send_resp(code, "")
end
230
# Answers a failed proxy attempt: redirects the client to the remote `url` when the
# `redirect_on_failure` option is truthy, otherwise sends `body` with status `code`.
# The returned connection is halted in both cases.
defp error_or_redirect(conn, url, code, body, opts) do
  redirect? = Keyword.get(opts, :redirect_on_failure, false)

  replied =
    if redirect? do
      Phoenix.Controller.redirect(conn, external: url)
    else
      send_resp(conn, code, body)
    end

  halt(replied)
end
242
# Lowercases the name of every `{name, value}` header; values are left untouched.
defp downcase_headers(headers) do
  lowercase_name = fn {name, value} -> {String.downcase(name), value} end

  Enum.map(headers, lowercase_name)
end
248
# Returns the media type from the `content-type` header, without its parameters
# (`"text/html; charset=utf-8"` -> `"text/html"`). Defaults to `"application/octet-stream"`
# when the header is absent.
defp get_content_type(headers) do
  {_, value} =
    List.keyfind(headers, "content-type", 0, {"content-type", "application/octet-stream"})

  # Optional whitespace is allowed around the `;` separator, so trim it; otherwise
  # `"image/png ; charset=x"` would yield `"image/png "` and miss the inline allowlist.
  value
  |> String.split(";", parts: 2)
  |> hd()
  |> String.trim()
end
256
# Sets every `{name, value}` pair in `headers` as a response header on `conn`, in list order.
defp put_resp_headers(conn, headers) do
  List.foldl(headers, conn, fn {name, value}, acc ->
    put_resp_header(acc, name, value)
  end)
end
262
# Builds the headers sent to the upstream: the client's headers restricted to
# `@keep_req_headers`, followed by the extra headers from the `req_headers` option.
#
# When `keep_user_agent` is truthy, the first `user-agent` header is replaced by (or, if
# absent, one is appended with) `Pleroma.Application.user_agent()`.
# NOTE(review): the module documentation describes `keep_user_agent` as forwarding the
# client's user-agent, while this substitutes the instance's own — confirm which is intended.
defp build_req_headers(headers, opts) do
  forwarded =
    headers
    |> downcase_headers()
    |> Enum.filter(fn {name, _value} -> name in @keep_req_headers end)

  merged = forwarded ++ Keyword.get(opts, :req_headers, [])

  if Keyword.get(opts, :keep_user_agent, false) do
    List.keystore(merged, "user-agent", 0, {"user-agent", Pleroma.Application.user_agent()})
  else
    merged
  end
end
282
# Builds the headers sent back to the client: the upstream headers restricted to
# `@keep_resp_headers`, with `cache-control` and `content-disposition` adjusted, followed by
# the extra headers from the `resp_headers` option.
defp build_resp_headers(headers, opts) do
  kept = Enum.filter(headers, fn {name, _value} -> name in @keep_resp_headers end)

  adjusted =
    kept
    |> build_resp_cache_headers(opts)
    |> build_resp_content_disposition_header(opts)

  adjusted ++ Keyword.get(opts, :resp_headers, [])
end
290
# Sets `cache-control` to the module default on every response.
#
# We always set our own value because Plug otherwise defaults to
# "max-age=0, private, must-revalidate". The previous implementation branched on whether a
# caching header (`@resp_cache_headers`) was present, but both branches did exactly this.
defp build_resp_cache_headers(headers, _opts) do
  List.keystore(
    headers,
    "cache-control",
    0,
    {"cache-control", @default_cache_control_header}
  )
end
314
# Forces `content-disposition: attachment` unless the response may be displayed inline.
#
# The `inline_content_types` option decides: `false` always forces an attachment, a list
# forces it for content types not in the list, anything else leaves the headers untouched.
# The attachment filename is taken from the upstream `content-disposition` header when it
# carries a quoted `filename="…"`, otherwise from the `attachment_name` option, otherwise
# it is `"attachment"`.
defp build_resp_content_disposition_header(headers, opts) do
  inline = Keyword.get(opts, :inline_content_types, @inline_content_types)
  content_type = get_content_type(headers)

  attachment? = inline == false or (is_list(inline) and content_type not in inline)

  if attachment? do
    # Plain pattern matching instead of rescuing `MatchError`: a missing header or a
    # header without a quoted filename simply falls through to the default name.
    name =
      with {_, value} <- List.keyfind(headers, "content-disposition", 0),
           [filename | _] <-
             Regex.run(
               ~r/filename="((?:[^"\\]|\\.)*)"/u,
               value || "",
               capture: :all_but_first
             ) do
        filename
      else
        _ -> Keyword.get(opts, :attachment_name, "attachment")
      end

    disposition = "attachment; filename=\"#{name}\""

    List.keystore(headers, "content-disposition", 0, {"content-disposition", disposition})
  else
    headers
  end
end
352
# Checks the upstream `content-length` header against `limit`.
#
# Returns `{:error, :body_too_large}` only when the header is present, starts with an
# integer and that integer exceeds `limit`; a missing or unparsable header passes.
# A `limit` that is not a positive integer (e.g. `:infinity`) disables the check.
defp header_length_constraint(headers, limit) when is_integer(limit) and limit > 0 do
  case List.keyfind(headers, "content-length", 0) do
    {_, raw} ->
      case Integer.parse(raw) do
        {size, _} when size > limit -> {:error, :body_too_large}
        _ -> :ok
      end

    _ ->
      :ok
  end
end

defp header_length_constraint(_, _), do: :ok
368
# Checks the number of body bytes read so far against `limit`.
#
# Uses `size > limit` so that it agrees with `header_length_constraint/2`, which accepts a
# `content-length` equal to the limit; with `>=` a body of exactly `limit` bytes passed the
# header check and was then cut off while streaming.
# A `limit` that is not a positive integer (e.g. `:infinity`) disables the check.
defp body_size_constraint(size, limit) when is_integer(limit) and limit > 0 and size > limit do
  {:error, :body_too_large}
end

defp body_size_constraint(_, _), do: :ok
374
# Read-time accounting for the streaming loop.
#
# `check_read_duration/2` is called before reading a chunk. With an integer `duration`
# and a positive integer `max` it returns `{:error, :read_duration_exceeded}` once the
# accumulated duration exceeds `max`, otherwise `{:ok, {duration, started_at_ms}}` so the
# time spent on the next read can be added by `increase_read_duration/1`.
#
# When there is no usable limit (e.g. `max` is `:infinity`) both functions return
# `{:ok, :no_duration_limit}`. They previously returned a 3-tuple here, which the
# `{:ok, duration}` patterns of the streaming loop could not match, so disabling the limit
# raised `WithClauseError` on the first chunk.
defp check_read_duration(nil = _duration, max), do: check_read_duration(@max_read_duration, max)

defp check_read_duration(duration, max)
     when is_integer(duration) and is_integer(max) and max > 0 do
  if duration > max do
    {:error, :read_duration_exceeded}
  else
    {:ok, {duration, :erlang.system_time(:millisecond)}}
  end
end

defp check_read_duration(_, _), do: {:ok, :no_duration_limit}

# Adds the time elapsed since `started` (milliseconds) to the duration accumulated so far.
defp increase_read_duration({previous_duration, started})
     when is_integer(previous_duration) and is_integer(started) do
  duration = :erlang.system_time(:millisecond) - started
  {:ok, previous_duration + duration}
end

defp increase_read_duration(_) do
  {:ok, :no_duration_limit}
end
397
# Module used to perform upstream requests, stream their bodies and close them.
defp client do
  Pleroma.ReverseProxy.Client
end
399
# Records `url` in `:failed_proxy_url_cache` so that `call/3` refuses it while the entry lives.
#
# The entry gets the `failed_request_ttl` option (or its default) as TTL, except for the
# errors `:body_too_large`, `400` and `204`, which are stored with `ttl: nil`.
# NOTE(review): presumably a `nil` TTL means the entry does not expire on its own — confirm
# against the Cachex configuration of this cache.
defp track_failed_url(url, error, opts) do
  ttl =
    if error in [:body_too_large, 400, 204] do
      nil
    else
      Keyword.get(opts, :failed_request_ttl, @failed_request_ttl)
    end

  Cachex.put(:failed_proxy_url_cache, url, true, ttl: ttl)
end
410 end