Merge branch 'develop' into gun
[akkoma] / lib / pleroma / reverse_proxy / reverse_proxy.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.ReverseProxy do
  @keep_req_headers ~w(accept user-agent accept-encoding cache-control if-modified-since) ++
                      ~w(if-unmodified-since if-none-match if-range range)
  @resp_cache_headers ~w(etag date last-modified cache-control)
  @keep_resp_headers @resp_cache_headers ++
                       ~w(content-type content-disposition content-encoding content-range) ++
                       ~w(accept-ranges vary)
  @default_cache_control_header "public, max-age=1209600"
  @valid_resp_codes [200, 206, 304]
  @max_read_duration :timer.seconds(30)
  @max_body_length :infinity
  @failed_request_ttl :timer.seconds(60)
  @methods ~w(GET HEAD)

  @moduledoc """
  A reverse proxy.

      Pleroma.ReverseProxy.call(conn, url, options)

  It is not meant to be added into a plug pipeline, but to be called from another plug or controller.

  Supports `#{inspect(@methods)}` HTTP methods, and only allows `#{inspect(@valid_resp_codes)}` status codes.

  Responses are chunked to the client while downloading from the upstream.

  Some request / response headers are preserved:

  * request: `#{inspect(@keep_req_headers)}`
  * response: `#{inspect(@keep_resp_headers)}`

  If no caching headers (`#{inspect(@resp_cache_headers)}`) are returned by upstream, `cache-control` will be
  set to `#{inspect(@default_cache_control_header)}`.

  Options:

  * `redirect_on_failure` (default `false`). Redirects the client to the real remote URL if there are any HTTP
  errors. Any error during body processing will not be redirected as the response is chunked. This may expose
  the remote URL, client IPs, ….

  * `max_body_length` (default `#{inspect(@max_body_length)}`): limits the content length to be approximately the
  specified length. It is validated with the `content-length` header and also verified when proxying.
  `:infinity` (or any non-positive value) disables the limit.

  * `max_read_duration` (default `#{inspect(@max_read_duration)}` ms): the total time the connection is allowed to
  read from the remote upstream. `:infinity` (or any non-positive value) disables the limit.

  * `failed_request_ttl` (default `#{inspect(@failed_request_ttl)}` ms): the time the failed request is cached and
  cannot be retried.

  * `inline_content_types` (default: a built-in list of common image, audio and video types):
    * `true` will not alter `content-disposition` (up to the upstream),
    * `false` will add `content-disposition: attachment` to any request,
    * a list of whitelisted content types

  * `keep_user_agent` (default `false`): when `true`, the `user-agent` header sent to the upstream is set to
  Pleroma's own user agent (`Pleroma.Application.user_agent/0`); otherwise the client's `user-agent` header
  (if any) is forwarded as-is.

  * `req_headers`, `resp_headers` additional headers.

  * `http`: options for [gun](https://github.com/ninenines/gun).

  """
  @default_options [pool: :media]

  @inline_content_types [
    "image/gif",
    "image/jpeg",
    "image/jpg",
    "image/png",
    "image/svg+xml",
    "audio/mpeg",
    "audio/mp3",
    "video/webm",
    "video/mp4",
    "video/quicktime"
  ]

  require Logger
  import Plug.Conn

  @type option() ::
          {:keep_user_agent, boolean}
          | {:max_read_duration, :timer.time() | :infinity}
          | {:max_body_length, non_neg_integer() | :infinity}
          | {:failed_request_ttl, :timer.time() | :infinity}
          | {:http, keyword()}
          | {:req_headers, [{String.t(), String.t()}]}
          | {:resp_headers, [{String.t(), String.t()}]}
          | {:inline_content_types, boolean() | [String.t()]}
          | {:redirect_on_failure, boolean()}
          | {:attachment_name, String.t()}

  @doc """
  Requests `url` from the upstream using the method of `conn` (`GET` or `HEAD`) and relays the
  response on `conn`.

  Any other method gets a `400` response. The returned conn is always halted. See the module
  documentation for the supported `opts`.
  """
  @spec call(Plug.Conn.t(), url :: String.t(), [option()]) :: Plug.Conn.t()
  def call(_conn, _url, _opts \\ [])

  def call(conn = %{method: method}, url, opts) when method in @methods do
    client_opts = Keyword.merge(@default_options, Keyword.get(opts, :http, []))

    req_headers = build_req_headers(conn.req_headers, opts)

    opts =
      if filename = Pleroma.Web.MediaProxy.filename(url) do
        Keyword.put_new(opts, :attachment_name, filename)
      else
        opts
      end

    with {:ok, nil} <- Cachex.get(:failed_proxy_url_cache, url),
         {:ok, code, headers, client} <- request(method, url, req_headers, client_opts),
         :ok <- check_header_length(client, headers, opts) do
      response(conn, client, url, code, headers, opts)
    else
      # The URL recently failed and is still in the failure cache: don't hit the upstream again.
      {:ok, true} ->
        error_or_redirect(conn, url, 500, "Request failed", opts)

      # Body-less (e.g. HEAD) response.
      {:ok, code, headers} ->
        conn
        |> head_response(url, code, headers, opts)
        |> halt()

      {:error, {:invalid_http_response, code}} ->
        Logger.error("#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{code}")
        track_failed_url(url, code, opts)

        error_or_redirect(conn, url, code, "Request failed: " <> reason_phrase(code), opts)

      {:error, error} ->
        Logger.error("#{__MODULE__}: request to #{inspect(url)} failed: #{inspect(error)}")
        track_failed_url(url, error, opts)

        error_or_redirect(conn, url, 500, "Request failed", opts)
    end
  end

  def call(conn, _, _) do
    conn
    |> send_resp(400, Plug.Conn.Status.reason_phrase(400))
    |> halt()
  end

  # Performs the upstream request and normalizes the result:
  #   * `{:ok, code, headers, client}` – valid status with a body to stream,
  #   * `{:ok, code, headers}` – valid status without a body,
  #   * `{:error, {:invalid_http_response, code}}` – status not in @valid_resp_codes,
  #   * `{:error, reason}` – transport/client error.
  # Response header names are downcased.
  defp request(method, url, headers, opts) do
    Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}")
    # Safe: `method` is restricted to @methods by the `call/3` guard.
    method = method |> String.downcase() |> String.to_existing_atom()

    case client().request(method, url, headers, "", opts) do
      {:ok, code, headers, client} when code in @valid_resp_codes ->
        {:ok, code, downcase_headers(headers), client}

      {:ok, code, headers} when code in @valid_resp_codes ->
        {:ok, code, downcase_headers(headers)}

      {:ok, code, _, client} ->
        # The body will never be read, so release the upstream connection.
        client().close(client)
        {:error, {:invalid_http_response, code}}

      {:ok, code, _} ->
        {:error, {:invalid_http_response, code}}

      {:error, error} ->
        {:error, error}
    end
  end

  # Enforces `max_body_length` against the upstream `content-length` header. When the check
  # fails, the upstream connection is closed because its body will not be streamed.
  defp check_header_length(client, headers, opts) do
    case header_length_constraint(headers, Keyword.get(opts, :max_body_length, @max_body_length)) do
      :ok ->
        :ok

      error ->
        client().close(client)
        error
    end
  end

  # Sends the filtered upstream headers, then streams the body to the client in chunks.
  # The upstream client is closed on any streaming error. The conn is always halted.
  defp response(conn, client, url, status, headers, opts) do
    result =
      conn
      |> put_resp_headers(build_resp_headers(headers, opts))
      |> send_chunked(status)
      |> chunk_reply(client, opts)

    case result do
      {:ok, conn} ->
        halt(conn)

      {:error, :closed, conn} ->
        client().close(client)
        halt(conn)

      {:error, error, conn} ->
        Logger.warn(
          "#{__MODULE__} request to #{url} failed while reading/chunking: #{inspect(error)}"
        )

        client().close(client)
        halt(conn)
    end
  end

  defp chunk_reply(conn, client, opts) do
    chunk_reply(conn, client, opts, 0, 0)
  end

  # Reads the upstream body piece by piece and forwards each piece as a chunk. It stops with
  # `{:ok, conn}` when the client reports `:done`, and with `{:error, reason, conn}` when the
  # read-duration or body-size limits are exceeded or when reading/chunking fails.
  defp chunk_reply(conn, client, opts, sent_so_far, duration) do
    with {:ok, duration} <-
           check_read_duration(
             duration,
             Keyword.get(opts, :max_read_duration, @max_read_duration)
           ),
         {:ok, data, client} <- client().stream_body(client),
         {:ok, duration} <- increase_read_duration(duration),
         sent_so_far = sent_so_far + byte_size(data),
         :ok <-
           body_size_constraint(
             sent_so_far,
             Keyword.get(opts, :max_body_length, @max_body_length)
           ),
         {:ok, conn} <- chunk(conn, data) do
      chunk_reply(conn, client, opts, sent_so_far, duration)
    else
      :done -> {:ok, conn}
      {:error, error} -> {:error, error, conn}
    end
  end

  defp head_response(conn, _url, code, headers, opts) do
    conn
    |> put_resp_headers(build_resp_headers(headers, opts))
    |> send_resp(code, "")
  end

  # Responds with `code`/`body`, or redirects the client to `url` when `redirect_on_failure`
  # is set. The conn is always halted.
  defp error_or_redirect(conn, url, code, body, opts) do
    if Keyword.get(opts, :redirect_on_failure, false) do
      conn
      |> Phoenix.Controller.redirect(external: url)
      |> halt()
    else
      conn
      |> send_resp(code, body)
      |> halt()
    end
  end

  # `Plug.Conn.Status.reason_phrase/1` raises `ArgumentError` for status codes it does not
  # know (e.g. non-standard upstream codes such as 520). Fall back to a generic phrase instead
  # of crashing on the error path.
  defp reason_phrase(code) do
    Plug.Conn.Status.reason_phrase(code)
  rescue
    ArgumentError -> "HTTP #{code}"
  end

  defp downcase_headers(headers) do
    Enum.map(headers, fn {k, v} ->
      {String.downcase(k), v}
    end)
  end

  # Returns the bare media type (parameters such as `; charset=...` stripped, lowercased, since
  # media types are case-insensitive), or "application/octet-stream" when no content-type is set.
  defp get_content_type(headers) do
    {_, content_type} =
      List.keyfind(headers, "content-type", 0, {"content-type", "application/octet-stream"})

    [content_type | _] = String.split(content_type, ";")

    content_type
    |> String.trim()
    |> String.downcase()
  end

  defp put_resp_headers(conn, headers) do
    Enum.reduce(headers, conn, fn {k, v}, conn ->
      put_resp_header(conn, k, v)
    end)
  end

  # Keeps only whitelisted client request headers, appends the caller's `req_headers`, then
  # applies the `keep_user_agent` policy.
  defp build_req_headers(headers, opts) do
    kept =
      headers
      |> downcase_headers()
      |> Enum.filter(fn {k, _} -> k in @keep_req_headers end)

    build_req_user_agent_header(kept ++ Keyword.get(opts, :req_headers, []), opts)
  end

  # With `keep_user_agent: true`, the (first) `user-agent` header is replaced with, or set to,
  # Pleroma's own user agent. Otherwise the headers are left untouched.
  defp build_req_user_agent_header(headers, opts) do
    if Keyword.get(opts, :keep_user_agent, false) do
      List.keystore(
        headers,
        "user-agent",
        0,
        {"user-agent", Pleroma.Application.user_agent()}
      )
    else
      headers
    end
  end

  # Filters upstream response headers, fills in caching and content-disposition headers, and
  # appends the caller's `resp_headers`.
  defp build_resp_headers(headers, opts) do
    built =
      headers
      |> Enum.filter(fn {k, _} -> k in @keep_resp_headers end)
      |> build_resp_cache_headers(opts)
      |> build_resp_content_disposition_header(opts)

    built ++ Keyword.get(opts, :resp_headers, [])
  end

  defp build_resp_cache_headers(headers, _opts) do
    has_cache? = Enum.any?(headers, fn {k, _} -> k in @resp_cache_headers end)
    has_cache_control? = List.keymember?(headers, "cache-control", 0)

    cond do
      has_cache? && has_cache_control? ->
        headers

      has_cache? ->
        # There's a caching header present but no cache-control -- we need to explicitly override
        # it to public, as Plug defaults to "max-age=0, private, must-revalidate".
        List.keystore(headers, "cache-control", 0, {"cache-control", "public"})

      true ->
        List.keystore(
          headers,
          "cache-control",
          0,
          {"cache-control", @default_cache_control_header}
        )
    end
  end

  # Forces `content-disposition: attachment` unless the content type may be displayed inline
  # according to the `inline_content_types` option.
  defp build_resp_content_disposition_header(headers, opts) do
    inline_types = Keyword.get(opts, :inline_content_types, @inline_content_types)

    if attachment?(inline_types, get_content_type(headers)) do
      disposition = "attachment; filename=\"#{attachment_filename(headers, opts)}\""

      List.keystore(headers, "content-disposition", 0, {"content-disposition", disposition})
    else
      headers
    end
  end

  # `false` forces attachments; a list whitelists inline types; anything else (e.g. `true`)
  # leaves the upstream disposition alone.
  defp attachment?(false, _content_type), do: true
  defp attachment?(types, content_type) when is_list(types), do: content_type not in types
  defp attachment?(_, _), do: false

  # Reuses the quoted `filename="..."` from the upstream content-disposition when present,
  # otherwise falls back to the `attachment_name` option (or "attachment").
  defp attachment_filename(headers, opts) do
    filename_pattern = ~r/filename="((?:[^"\\]|\\.)*)"/u

    with {_, content_disposition} <- List.keyfind(headers, "content-disposition", 0),
         [name | _] <-
           Regex.run(filename_pattern, content_disposition || "", capture: :all_but_first) do
      name
    else
      _ -> Keyword.get(opts, :attachment_name, "attachment")
    end
  end

  # Rejects up front when the upstream announces a `content-length` above `limit`. A missing or
  # unparsable header passes here; the streamed size is checked later in `chunk_reply/5`.
  defp header_length_constraint(headers, limit) when is_integer(limit) and limit > 0 do
    with {_, size} <- List.keyfind(headers, "content-length", 0),
         {size, _} <- Integer.parse(size),
         true <- size <= limit do
      :ok
    else
      false ->
        {:error, :body_too_large}

      _ ->
        :ok
    end
  end

  defp header_length_constraint(_, _), do: :ok

  # Same bound as `header_length_constraint/2`: a body of exactly `limit` bytes is accepted.
  defp body_size_constraint(size, limit) when is_integer(limit) and limit > 0 and size > limit do
    {:error, :body_too_large}
  end

  defp body_size_constraint(_, _), do: :ok

  # Read-duration accounting for `chunk_reply/5`. `check_read_duration/2` fails once the time
  # accumulated so far exceeds `max`. Otherwise it returns `{accumulated_ms, read_started_at_ms}`
  # for `increase_read_duration/1` to add the latest read to. Without a positive integer limit
  # (e.g. `:infinity`), both functions return `{:ok, :no_duration_limit}`, a 2-tuple, so that the
  # `{:ok, duration}` patterns in `chunk_reply/5` keep matching.
  defp check_read_duration(duration, max)
       when is_integer(duration) and is_integer(max) and max > 0 do
    if duration > max do
      {:error, :read_duration_exceeded}
    else
      {:ok, {duration, :erlang.system_time(:millisecond)}}
    end
  end

  defp check_read_duration(_, _), do: {:ok, :no_duration_limit}

  defp increase_read_duration({previous_duration, started})
       when is_integer(previous_duration) and is_integer(started) do
    duration = :erlang.system_time(:millisecond) - started
    {:ok, previous_duration + duration}
  end

  defp increase_read_duration(_), do: {:ok, :no_duration_limit}

  defp client, do: Pleroma.ReverseProxy.Client

  # Marks `url` as failed in the failure cache, so that `call/3` refuses it without contacting
  # the upstream. `:body_too_large` and HTTP 400/204 are stored with `ttl: nil` (no per-entry
  # expiry). Other failures expire after `failed_request_ttl`, where `:infinity` is mapped to
  # that same `nil`.
  defp track_failed_url(url, error, opts) do
    ttl =
      if error in [:body_too_large, 400, 204] do
        nil
      else
        case Keyword.get(opts, :failed_request_ttl, @failed_request_ttl) do
          :infinity -> nil
          ttl -> ttl
        end
      end

    Cachex.put(:failed_proxy_url_cache, url, true, ttl: ttl)
  end
end