ec69a177988d30b9c25c451fc1ca5431aea9c51f
[akkoma] / lib / pleroma / reverse_proxy.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.ReverseProxy do
6 @range_headers ~w(range if-range)
7 @keep_req_headers ~w(accept accept-encoding cache-control if-modified-since) ++
8 ~w(if-unmodified-since if-none-match) ++ @range_headers
9 @resp_cache_headers ~w(etag date last-modified)
10 @keep_resp_headers @resp_cache_headers ++
11 ~w(content-length content-type content-disposition content-encoding) ++
12 ~w(content-range accept-ranges vary)
13 @default_cache_control_header "public, max-age=1209600"
14 @valid_resp_codes [200, 206, 304]
15 @max_read_duration :timer.seconds(30)
16 @max_body_length :infinity
17 @failed_request_ttl :timer.seconds(60)
18 @methods ~w(GET HEAD)
19
20 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
21
22 def max_read_duration_default, do: @max_read_duration
23 def default_cache_control_header, do: @default_cache_control_header
24
25 @moduledoc """
26 A reverse proxy.
27
28 Pleroma.ReverseProxy.call(conn, url, options)
29
30 It is not meant to be added into a plug pipeline, but to be called from another plug or controller.
31
32 Supports `#{inspect(@methods)}` HTTP methods, and only allows `#{inspect(@valid_resp_codes)}` status codes.
33
34 Responses are chunked to the client while downloading from the upstream.
35
36 Some request / responses headers are preserved:
37
38 * request: `#{inspect(@keep_req_headers)}`
39 * response: `#{inspect(@keep_resp_headers)}`
40
41 Options:
42
43 * `redirect_on_failure` (default `false`). Redirects the client to the real remote URL if there's any HTTP
44 errors. Any error during body processing will not be redirected as the response is chunked. This may expose
45 remote URL, clients IPs, ….
46
47 * `max_body_length` (default `#{inspect(@max_body_length)}`): limits the content length to be approximately the
48 specified length. It is validated with the `content-length` header and also verified when proxying.
49
50 * `max_read_duration` (default `#{inspect(@max_read_duration)}` ms): the total time the connection is allowed to
51 read from the remote upstream.
52
53 * `failed_request_ttl` (default `#{inspect(@failed_request_ttl)}` ms): the time the failed request is cached and cannot be retried.
54
55 * `inline_content_types`:
56 * `true` will not alter `content-disposition` (up to the upstream),
57 * `false` will add `content-disposition: attachment` to any request,
58 * a list of whitelisted content types
59
60 * `req_headers`, `resp_headers` additional headers.
61
62 * `http`: options for [hackney](https://github.com/benoitc/hackney) or [gun](https://github.com/ninenines/gun).
63
64 """
65 @default_options [pool: :media]
66
67 @inline_content_types [
68 "image/gif",
69 "image/jpeg",
70 "image/jpg",
71 "image/png",
72 "image/svg+xml",
73 "audio/mpeg",
74 "audio/mp3",
75 "video/webm",
76 "video/mp4",
77 "video/quicktime"
78 ]
79
80 require Logger
81 import Plug.Conn
82
83 @type option() ::
84 {:max_read_duration, :timer.time() | :infinity}
85 | {:max_body_length, non_neg_integer() | :infinity}
86 | {:failed_request_ttl, :timer.time() | :infinity}
87 | {:http, []}
88 | {:req_headers, [{String.t(), String.t()}]}
89 | {:resp_headers, [{String.t(), String.t()}]}
90 | {:inline_content_types, boolean() | [String.t()]}
91 | {:redirect_on_failure, boolean()}
92
93 @spec call(Plug.Conn.t(), url :: String.t(), [option()]) :: Plug.Conn.t()
94 def call(_conn, _url, _opts \\ [])
95
96 def call(conn = %{method: method}, url, opts) when method in @methods do
97 client_opts = Keyword.merge(@default_options, Keyword.get(opts, :http, []))
98
99 req_headers = build_req_headers(conn.req_headers, opts)
100
101 opts =
102 if filename = Pleroma.Web.MediaProxy.filename(url) do
103 Keyword.put_new(opts, :attachment_name, filename)
104 else
105 opts
106 end
107
108 with {:ok, nil} <- @cachex.get(:failed_proxy_url_cache, url),
109 {:ok, code, headers, client} <- request(method, url, req_headers, client_opts),
110 :ok <-
111 header_length_constraint(
112 headers,
113 Keyword.get(opts, :max_body_length, @max_body_length)
114 ) do
115 response(conn, client, url, code, headers, opts)
116 else
117 {:ok, true} ->
118 conn
119 |> error_or_redirect(url, 500, "Request failed", opts)
120 |> halt()
121
122 {:ok, code, headers} ->
123 head_response(conn, url, code, headers, opts)
124 |> halt()
125
126 {:error, {:invalid_http_response, code}} ->
127 Logger.error("#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{code}")
128 track_failed_url(url, code, opts)
129
130 conn
131 |> error_or_redirect(
132 url,
133 code,
134 "Request failed: " <> Plug.Conn.Status.reason_phrase(code),
135 opts
136 )
137 |> halt()
138
139 {:error, error} ->
140 Logger.error("#{__MODULE__}: request to #{inspect(url)} failed: #{inspect(error)}")
141 track_failed_url(url, error, opts)
142
143 conn
144 |> error_or_redirect(url, 500, "Request failed", opts)
145 |> halt()
146 end
147 end
148
149 def call(conn, _, _) do
150 conn
151 |> send_resp(400, Plug.Conn.Status.reason_phrase(400))
152 |> halt()
153 end
154
155 defp request(method, url, headers, opts) do
156 Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}")
157 method = method |> String.downcase() |> String.to_existing_atom()
158
159 case client().request(method, url, headers, "", opts) do
160 {:ok, code, headers, client} when code in @valid_resp_codes ->
161 {:ok, code, downcase_headers(headers), client}
162
163 {:ok, code, headers} when code in @valid_resp_codes ->
164 {:ok, code, downcase_headers(headers)}
165
166 {:ok, code, _, _} ->
167 {:error, {:invalid_http_response, code}}
168
169 {:ok, code, _} ->
170 {:error, {:invalid_http_response, code}}
171
172 {:error, error} ->
173 {:error, error}
174 end
175 end
176
177 defp response(conn, client, url, status, headers, opts) do
178 Logger.debug("#{__MODULE__} #{status} #{url} #{inspect(headers)}")
179
180 result =
181 conn
182 |> put_resp_headers(build_resp_headers(headers, opts))
183 |> send_chunked(status)
184 |> chunk_reply(client, opts)
185
186 case result do
187 {:ok, conn} ->
188 halt(conn)
189
190 {:error, :closed, conn} ->
191 client().close(client)
192 halt(conn)
193
194 {:error, error, conn} ->
195 Logger.warn(
196 "#{__MODULE__} request to #{url} failed while reading/chunking: #{inspect(error)}"
197 )
198
199 client().close(client)
200 halt(conn)
201 end
202 end
203
204 defp chunk_reply(conn, client, opts) do
205 chunk_reply(conn, client, opts, 0, 0)
206 end
207
208 defp chunk_reply(conn, client, opts, sent_so_far, duration) do
209 with {:ok, duration} <-
210 check_read_duration(
211 duration,
212 Keyword.get(opts, :max_read_duration, @max_read_duration)
213 ),
214 {:ok, data, client} <- client().stream_body(client),
215 {:ok, duration} <- increase_read_duration(duration),
216 sent_so_far = sent_so_far + byte_size(data),
217 :ok <-
218 body_size_constraint(
219 sent_so_far,
220 Keyword.get(opts, :max_body_length, @max_body_length)
221 ),
222 {:ok, conn} <- chunk(conn, data) do
223 chunk_reply(conn, client, opts, sent_so_far, duration)
224 else
225 :done -> {:ok, conn}
226 {:error, error} -> {:error, error, conn}
227 end
228 end
229
230 defp head_response(conn, url, code, headers, opts) do
231 Logger.debug("#{__MODULE__} #{code} #{url} #{inspect(headers)}")
232
233 conn
234 |> put_resp_headers(build_resp_headers(headers, opts))
235 |> send_resp(code, "")
236 end
237
238 defp error_or_redirect(conn, url, code, body, opts) do
239 if Keyword.get(opts, :redirect_on_failure, false) do
240 conn
241 |> Phoenix.Controller.redirect(external: url)
242 |> halt()
243 else
244 conn
245 |> send_resp(code, body)
246 |> halt
247 end
248 end
249
250 defp downcase_headers(headers) do
251 Enum.map(headers, fn {k, v} ->
252 {String.downcase(k), v}
253 end)
254 end
255
256 defp get_content_type(headers) do
257 {_, content_type} =
258 List.keyfind(headers, "content-type", 0, {"content-type", "application/octet-stream"})
259
260 [content_type | _] = String.split(content_type, ";")
261 content_type
262 end
263
264 defp put_resp_headers(conn, headers) do
265 Enum.reduce(headers, conn, fn {k, v}, conn ->
266 put_resp_header(conn, k, v)
267 end)
268 end
269
270 defp build_req_headers(headers, opts) do
271 headers
272 |> downcase_headers()
273 |> Enum.filter(fn {k, _} -> k in @keep_req_headers end)
274 |> build_req_range_or_encoding_header(opts)
275 |> build_req_user_agent_header(opts)
276 |> Keyword.merge(Keyword.get(opts, :req_headers, []))
277 end
278
279 # Disable content-encoding if any @range_headers are requested (see #1823).
280 defp build_req_range_or_encoding_header(headers, _opts) do
281 range? = Enum.any?(headers, fn {header, _} -> Enum.member?(@range_headers, header) end)
282
283 if range? && List.keymember?(headers, "accept-encoding", 0) do
284 List.keydelete(headers, "accept-encoding", 0)
285 else
286 headers
287 end
288 end
289
290 defp build_req_user_agent_header(headers, _opts) do
291 List.keystore(
292 headers,
293 "user-agent",
294 0,
295 {"user-agent", Pleroma.Application.user_agent()}
296 )
297 end
298
299 defp build_resp_headers(headers, opts) do
300 headers
301 |> Enum.filter(fn {k, _} -> k in @keep_resp_headers end)
302 |> build_resp_cache_headers(opts)
303 |> build_resp_content_disposition_header(opts)
304 |> Keyword.merge(Keyword.get(opts, :resp_headers, []))
305 end
306
307 defp build_resp_cache_headers(headers, _opts) do
308 has_cache? = Enum.any?(headers, fn {k, _} -> k in @resp_cache_headers end)
309
310 cond do
311 has_cache? ->
312 # There's caching header present but no cache-control -- we need to set our own
313 # as Plug defaults to "max-age=0, private, must-revalidate"
314 List.keystore(
315 headers,
316 "cache-control",
317 0,
318 {"cache-control", @default_cache_control_header}
319 )
320
321 true ->
322 List.keystore(
323 headers,
324 "cache-control",
325 0,
326 {"cache-control", @default_cache_control_header}
327 )
328 end
329 end
330
331 defp build_resp_content_disposition_header(headers, opts) do
332 opt = Keyword.get(opts, :inline_content_types, @inline_content_types)
333
334 content_type = get_content_type(headers)
335
336 attachment? =
337 cond do
338 is_list(opt) && !Enum.member?(opt, content_type) -> true
339 opt == false -> true
340 true -> false
341 end
342
343 if attachment? do
344 name =
345 try do
346 {{"content-disposition", content_disposition_string}, _} =
347 List.keytake(headers, "content-disposition", 0)
348
349 [name | _] =
350 Regex.run(
351 ~r/filename="((?:[^"\\]|\\.)*)"/u,
352 content_disposition_string || "",
353 capture: :all_but_first
354 )
355
356 name
357 rescue
358 MatchError -> Keyword.get(opts, :attachment_name, "attachment")
359 end
360
361 disposition = "attachment; filename=\"#{name}\""
362
363 List.keystore(headers, "content-disposition", 0, {"content-disposition", disposition})
364 else
365 headers
366 end
367 end
368
369 defp header_length_constraint(headers, limit) when is_integer(limit) and limit > 0 do
370 with {_, size} <- List.keyfind(headers, "content-length", 0),
371 {size, _} <- Integer.parse(size),
372 true <- size <= limit do
373 :ok
374 else
375 false ->
376 {:error, :body_too_large}
377
378 _ ->
379 :ok
380 end
381 end
382
383 defp header_length_constraint(_, _), do: :ok
384
385 defp body_size_constraint(size, limit) when is_integer(limit) and limit > 0 and size >= limit do
386 {:error, :body_too_large}
387 end
388
389 defp body_size_constraint(_, _), do: :ok
390
391 defp check_read_duration(nil = _duration, max), do: check_read_duration(@max_read_duration, max)
392
393 defp check_read_duration(duration, max)
394 when is_integer(duration) and is_integer(max) and max > 0 do
395 if duration > max do
396 {:error, :read_duration_exceeded}
397 else
398 {:ok, {duration, :erlang.system_time(:millisecond)}}
399 end
400 end
401
402 defp check_read_duration(_, _), do: {:ok, :no_duration_limit, :no_duration_limit}
403
404 defp increase_read_duration({previous_duration, started})
405 when is_integer(previous_duration) and is_integer(started) do
406 duration = :erlang.system_time(:millisecond) - started
407 {:ok, previous_duration + duration}
408 end
409
410 defp increase_read_duration(_) do
411 {:ok, :no_duration_limit, :no_duration_limit}
412 end
413
414 defp client, do: Pleroma.ReverseProxy.Client.Wrapper
415
416 defp track_failed_url(url, error, opts) do
417 ttl =
418 unless error in [:body_too_large, 400, 204] do
419 Keyword.get(opts, :failed_request_ttl, @failed_request_ttl)
420 else
421 nil
422 end
423
424 @cachex.put(:failed_proxy_url_cache, url, true, ttl: ttl)
425 end
426 end