[#2332] Misc. improvements per code change requests.
[akkoma] / lib / pleroma / reverse_proxy / reverse_proxy.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.ReverseProxy do
6 alias Pleroma.HTTP
7
# Client request headers that are forwarded to the upstream.
@keep_req_headers [
  "accept",
  "user-agent",
  "accept-encoding",
  "cache-control",
  "if-modified-since",
  "if-unmodified-since",
  "if-none-match",
  "if-range",
  "range"
]

# Upstream response headers whose presence is treated as caching information.
@resp_cache_headers ["etag", "date", "last-modified"]

# Upstream response headers that are passed back to the client.
@keep_resp_headers @resp_cache_headers ++
                     [
                       "content-type",
                       "content-disposition",
                       "content-encoding",
                       "content-range",
                       "accept-ranges",
                       "vary"
                     ]

# `cache-control` value set on proxied responses.
@default_cache_control_header "public, max-age=1209600"

# Upstream status codes that are proxied; any other code is an error.
@valid_resp_codes [200, 206, 304]

# Defaults (milliseconds / bytes) for the options of the same name.
@max_read_duration :timer.seconds(30)
@max_body_length :infinity
@failed_request_ttl :timer.seconds(60)

# HTTP methods accepted by `call/3`.
@methods ["GET", "HEAD"]
20
@moduledoc """
A reverse proxy.

    Pleroma.ReverseProxy.call(conn, url, options)

It is not meant to be added into a plug pipeline, but to be called from another plug or controller.

Supports `#{inspect(@methods)}` HTTP methods, and only allows `#{inspect(@valid_resp_codes)}` status codes.

Responses are chunked to the client while downloading from the upstream.

Some request / response headers are preserved:

* request: `#{inspect(@keep_req_headers)}`
* response: `#{inspect(@keep_resp_headers)}`

Options:

* `redirect_on_failure` (default `false`): redirects the client to the real remote URL if there are any HTTP
  errors. Errors during body processing are not redirected, as the response is already being chunked. This may
  expose the remote URL, client IPs, ….

* `max_body_length` (default `#{inspect(@max_body_length)}`): limits the content length to be approximately the
  specified length. It is validated with the `content-length` header and also verified when proxying.

* `max_read_duration` (default `#{inspect(@max_read_duration)}` ms): the total time the connection is allowed to
  read from the remote upstream.

* `failed_request_ttl` (default `#{inspect(@failed_request_ttl)}` ms): the time a failed request is cached, during
  which it cannot be retried.

* `inline_content_types` (defaults to a built-in list of image, audio and video types):
  * `true` will not alter `content-disposition` (up to the upstream),
  * `false` will add `content-disposition: attachment` to any response,
  * a list of whitelisted content types; any other content type is served as an attachment.

* `keep_user_agent` (default `false`): controls the `user-agent` header sent to the upstream. As implemented,
  enabling it sets the header to `Pleroma.Application.user_agent()`; otherwise the client's `user-agent`
  (one of the preserved request headers) is passed through. This may matter if the upstream is doing content
  transformation (encoding, …) depending on the request.

* `req_headers`, `resp_headers`: additional headers appended to the upstream request / the client response.

* `http`: options for [hackney](https://github.com/benoitc/hackney).

"""
# Baseline hackney options; the caller's `:http` option is merged on top of them.
@default_hackney_options [{:pool, :media}]

# Content types left inline when the `inline_content_types` option is not given.
@inline_content_types ~w(
  image/gif
  image/jpeg
  image/jpg
  image/png
  image/svg+xml
  audio/mpeg
  audio/mp3
  video/webm
  video/mp4
  video/quicktime
)
78
79 require Logger
80 import Plug.Conn
81
@typedoc """
A single option accepted by `call/3`.

`:http` is a keyword list (it is merged with `Keyword.merge/2`), and
`:attachment_name` is the fallback file name used when a response is served
as an attachment.
"""
@type option() ::
        {:keep_user_agent, boolean()}
        | {:max_read_duration, :timer.time() | :infinity}
        | {:max_body_length, non_neg_integer() | :infinity}
        | {:failed_request_ttl, :timer.time() | :infinity}
        | {:http, keyword()}
        | {:req_headers, [{String.t(), String.t()}]}
        | {:resp_headers, [{String.t(), String.t()}]}
        | {:inline_content_types, boolean() | [String.t()]}
        | {:redirect_on_failure, boolean()}
        | {:attachment_name, String.t()}
92
@doc """
Proxies the request in `conn` to `url` and returns the halted connection.

Only the methods in `@methods` are proxied; any other method is answered
with a 400. URLs found in the `:failed_proxy_url_cache` are answered (or
redirected, see `redirect_on_failure`) without contacting the upstream.
"""
@spec call(Plug.Conn.t(), url :: String.t(), [option()]) :: Plug.Conn.t()
def call(_conn, _url, _opts \\ [])

def call(%{method: method} = conn, url, opts) when method in @methods do
  http_opts =
    []
    |> Pleroma.HTTP.Connection.hackney_options()
    |> Keyword.merge(@default_hackney_options)
    |> Keyword.merge(Keyword.get(opts, :http, []))
    |> HTTP.process_request_options()

  upstream_req_headers = build_req_headers(conn.req_headers, opts)

  # Remember a file name derived from the URL (if any) as the attachment
  # name, unless the caller already supplied one.
  opts =
    case Pleroma.Web.MediaProxy.filename(url) do
      none when none in [nil, false] -> opts
      filename -> Keyword.put_new(opts, :attachment_name, filename)
    end

  max_length = Keyword.get(opts, :max_body_length, @max_body_length)

  with {:ok, nil} <- Cachex.get(:failed_proxy_url_cache, url),
       {:ok, code, headers, client} <- request(method, url, upstream_req_headers, http_opts),
       :ok <- header_length_constraint(headers, max_length) do
    response(conn, client, url, code, headers, opts)
  else
    # The URL is marked as failed in the cache.
    {:ok, true} ->
      conn
      |> error_or_redirect(url, 500, "Request failed", opts)
      |> halt()

    # Reply without a client reference: there is no body to stream.
    {:ok, code, headers} ->
      conn
      |> head_response(url, code, headers, opts)
      |> halt()

    {:error, {:invalid_http_response, code}} ->
      Logger.error("#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{code}")
      track_failed_url(url, code, opts)

      message = "Request failed: " <> Plug.Conn.Status.reason_phrase(code)

      conn
      |> error_or_redirect(url, code, message, opts)
      |> halt()

    {:error, error} ->
      Logger.error("#{__MODULE__}: request to #{inspect(url)} failed: #{inspect(error)}")
      track_failed_url(url, error, opts)

      conn
      |> error_or_redirect(url, 500, "Request failed", opts)
      |> halt()
  end
end

def call(conn, _, _) do
  bad_request = Plug.Conn.Status.reason_phrase(400)

  conn
  |> send_resp(400, bad_request)
  |> halt()
end
158
# Performs the upstream request through `client()`.
#
# Returns:
#   * `{:ok, code, headers, client}` - accepted status, body still to be streamed
#   * `{:ok, code, headers}`         - accepted status, reply without a client reference
#   * `{:error, {:invalid_http_response, code}}` - status not in `@valid_resp_codes`
#   * `{:error, reason}`             - the request itself failed
#
# Header names in the `:ok` results are downcased.
defp request(method, url, headers, hackney_opts) do
  Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}")
  # `method` is one of `@methods` (guarded in `call/3`), so the atom exists.
  method = method |> String.downcase() |> String.to_existing_atom()

  case client().request(method, url, headers, "", hackney_opts) do
    {:ok, code, headers, client} when code in @valid_resp_codes ->
      {:ok, code, downcase_headers(headers), client}

    {:ok, code, headers} when code in @valid_resp_codes ->
      {:ok, code, downcase_headers(headers)}

    {:ok, code, _, _} ->
      {:error, {:invalid_http_response, code}}

    # A reply without a client reference can carry a rejected status too;
    # without this clause it would raise a CaseClauseError.
    {:ok, code, _} ->
      {:error, {:invalid_http_response, code}}

    {:error, error} ->
      {:error, error}
  end
end
177
# Sends the filtered upstream headers, then streams the upstream body to the
# client as a chunked response. The returned connection is always halted.
defp response(conn, client, url, status, headers, opts) do
  resp_headers = build_resp_headers(headers, opts)

  streamed =
    conn
    |> put_resp_headers(resp_headers)
    |> send_chunked(status)
    |> chunk_reply(client, opts)

  case streamed do
    {:ok, conn} ->
      halt(conn)

    {:error, reason, conn} ->
      # A `:closed` error is not logged; every other failure is.
      if reason != :closed do
        Logger.warn(
          "#{__MODULE__} request to #{url} failed while reading/chunking: #{inspect(reason)}"
        )
      end

      client().close(client)
      halt(conn)
  end
end
202
# Streams the upstream body to the client, starting with zero bytes sent and
# zero read time.
defp chunk_reply(conn, client, opts), do: chunk_reply(conn, client, opts, 0, 0)

# One iteration per upstream chunk. `sent_so_far` is the number of body bytes
# read so far and `duration` the accumulated time spent reading from the
# upstream. Returns `{:ok, conn}` once the upstream reports `:done`, or
# `{:error, reason, conn}` as soon as a limit is hit or reading/sending fails.
defp chunk_reply(conn, client, opts, sent_so_far, duration) do
  max_duration = Keyword.get(opts, :max_read_duration, @max_read_duration)
  max_length = Keyword.get(opts, :max_body_length, @max_body_length)

  with {:ok, read_timer} <- check_read_duration(duration, max_duration),
       {:ok, data} <- client().stream_body(client),
       {:ok, elapsed} <- increase_read_duration(read_timer),
       total_sent = sent_so_far + byte_size(data),
       # The size limit is checked before the chunk is forwarded.
       :ok <- body_size_constraint(total_sent, max_length),
       {:ok, conn} <- chunk(conn, data) do
    chunk_reply(conn, client, opts, total_sent, elapsed)
  else
    :done -> {:ok, conn}
    {:error, error} -> {:error, error, conn}
  end
end
228
# Answers with the filtered upstream headers and an empty body; used for
# upstream replies that come without a body to stream.
defp head_response(conn, _url, code, headers, opts) do
  resp_headers = build_resp_headers(headers, opts)
  with_headers = put_resp_headers(conn, resp_headers)

  send_resp(with_headers, code, "")
end
234
# Ends a failed request: redirects the client to `url` when the
# `redirect_on_failure` option is truthy, otherwise replies with `code` and
# `body`. The connection is halted in both cases.
defp error_or_redirect(conn, url, code, body, opts) do
  replied =
    case Keyword.get(opts, :redirect_on_failure, false) do
      disabled when disabled in [nil, false] -> send_resp(conn, code, body)
      _enabled -> Phoenix.Controller.redirect(conn, external: url)
    end

  halt(replied)
end
246
# Lower-cases the name of every `{name, value}` header; values are untouched.
defp downcase_headers(headers), do: Enum.map(headers, &downcase_header_name/1)

defp downcase_header_name({name, value}), do: {String.downcase(name), value}
252
# Returns the bare media type of the `content-type` header (parameters such
# as `; charset=...` removed), or "application/octet-stream" when the header
# is missing. Expects downcased header names.
#
# The type is trimmed and lower-cased because media types are
# case-insensitive and may be followed by whitespace before the `;`;
# without that, e.g. "Image/PNG" or "image/png ; q=1" would never match the
# inline whitelist.
defp get_content_type(headers) do
  {_, content_type} =
    List.keyfind(headers, "content-type", 0, {"content-type", "application/octet-stream"})

  content_type
  |> String.split(";", parts: 2)
  |> hd()
  |> String.trim()
  |> String.downcase()
end
260
# Sets every `{name, value}` pair of `headers` on the connection, in order;
# a later pair with the same name overrides an earlier one.
defp put_resp_headers(conn, headers) do
  Enum.reduce(headers, conn, fn {name, value}, acc ->
    put_resp_header(acc, name, value)
  end)
end
266
# Builds the headers sent to the upstream: the client's headers restricted to
# `@keep_req_headers`, followed by the caller's `:req_headers`.
#
# NOTE(review): the module documentation originally described
# `keep_user_agent` as forwarding the client's user-agent, but when the
# option is set this code replaces the header with
# `Pleroma.Application.user_agent()`, and otherwise lets the client's
# user-agent through - confirm which behaviour is intended.
defp build_req_headers(headers, opts) do
  extra_headers = Keyword.get(opts, :req_headers, [])

  kept_headers =
    headers
    |> downcase_headers()
    |> Enum.filter(fn {name, _value} -> name in @keep_req_headers end)

  upstream_headers = kept_headers ++ extra_headers

  case Keyword.get(opts, :keep_user_agent, false) do
    disabled when disabled in [nil, false] ->
      upstream_headers

    _enabled ->
      server_agent = {"user-agent", Pleroma.Application.user_agent()}
      List.keystore(upstream_headers, "user-agent", 0, server_agent)
  end
end
286
# Builds the headers sent to the client: the upstream headers restricted to
# `@keep_resp_headers`, with `cache-control` and (when needed)
# `content-disposition` set, followed by the caller's `:resp_headers`.
defp build_resp_headers(headers, opts) do
  kept_headers = Enum.filter(headers, fn {name, _value} -> name in @keep_resp_headers end)
  with_cache = build_resp_cache_headers(kept_headers, opts)
  with_disposition = build_resp_content_disposition_header(with_cache, opts)

  with_disposition ++ Keyword.get(opts, :resp_headers, [])
end
294
# Sets `cache-control` to `@default_cache_control_header`, replacing any
# existing entry.
#
# We always need to set our own value, as Plug defaults to
# "max-age=0, private, must-revalidate". The previous implementation branched
# on whether a caching header (`@resp_cache_headers`) was present, but both
# branches stored the same value, so the check was dead code.
defp build_resp_cache_headers(headers, _opts) do
  List.keystore(
    headers,
    "cache-control",
    0,
    {"cache-control", @default_cache_control_header}
  )
end
318
# Forces `content-disposition: attachment` for responses that must not be
# displayed inline.
#
# The `inline_content_types` option decides: a list is a whitelist of content
# types left untouched, `false` turns every response into an attachment, and
# any other value (e.g. `true`) leaves the headers as they are.
#
# The attachment file name is taken from the upstream `content-disposition`
# header when it has a quoted `filename="..."`, otherwise from the
# `:attachment_name` option, otherwise it is "attachment".
defp build_resp_content_disposition_header(headers, opts) do
  opt = Keyword.get(opts, :inline_content_types, @inline_content_types)
  content_type = get_content_type(headers)

  attachment? =
    cond do
      is_list(opt) -> content_type not in opt
      true -> opt == false
    end

  if attachment? do
    name =
      with {_, upstream_disposition} <- List.keyfind(headers, "content-disposition", 0),
           [name | _] <-
             Regex.run(
               ~r/filename="((?:[^"\\]|\\.)*)"/u,
               upstream_disposition || "",
               capture: :all_but_first
             ) do
        name
      else
        # Header missing, or it carries no quoted file name.
        _ -> Keyword.get(opts, :attachment_name, "attachment")
      end

    disposition = "attachment; filename=\"#{name}\""

    List.keystore(headers, "content-disposition", 0, {"content-disposition", disposition})
  else
    headers
  end
end
356
# Checks the declared `content-length` against `limit`.
#
# Returns `{:error, :body_too_large}` only when the header is present, starts
# with an integer and that integer is greater than `limit`. A missing or
# unparsable header, or a limit that is not a positive integer (such as
# `:infinity`), yields `:ok`.
defp header_length_constraint(headers, limit) when is_integer(limit) and limit > 0 do
  case List.keyfind(headers, "content-length", 0) do
    {_name, raw_length} ->
      case Integer.parse(raw_length) do
        {declared, _rest} when declared > limit -> {:error, :body_too_large}
        _ -> :ok
      end

    _ ->
      :ok
  end
end

defp header_length_constraint(_, _), do: :ok
372
# Checks the number of body bytes read so far against `limit`.
#
# Returns `{:error, :body_too_large}` once `size` exceeds a positive integer
# `limit`; any other limit (such as `:infinity`) yields `:ok`.
#
# A body of exactly `limit` bytes is accepted, matching
# `header_length_constraint/2`, which accepts `content-length <= limit`.
# Rejecting `size == limit` here would cut off a response mid-stream after
# its `content-length` had already been accepted.
defp body_size_constraint(size, limit) when is_integer(limit) and limit > 0 and size > limit do
  {:error, :body_too_large}
end

defp body_size_constraint(_, _), do: :ok
378
# Read-duration bookkeeping for the chunk loop.
#
# `check_read_duration/2` is called before a chunk is read. With a positive
# integer `max` it returns `{:error, :read_duration_exceeded}` when the
# accumulated `duration` (ms) is above `max`, otherwise
# `{:ok, {duration, started}}` where `started` is the current monotonic time.
# Without a usable limit (e.g. `:infinity`) it returns
# `{:ok, :no_duration_limit}`.
#
# Both fallback clauses return 2-tuples: the caller matches `{:ok, _}`, and
# a 3-tuple would match neither that pattern nor its error clauses.
defp check_read_duration(duration, max)
     when is_integer(duration) and is_integer(max) and max > 0 do
  if duration > max do
    {:error, :read_duration_exceeded}
  else
    # Monotonic time, so wall-clock adjustments cannot distort the measured
    # duration.
    {:ok, {duration, System.monotonic_time(:millisecond)}}
  end
end

defp check_read_duration(_, _), do: {:ok, :no_duration_limit}

# Called after a chunk was read: adds the time elapsed since `started` to the
# previous total and returns `{:ok, total}`. When no limit is tracked the
# `:no_duration_limit` marker is passed through unchanged.
defp increase_read_duration({previous_duration, started})
     when is_integer(previous_duration) and is_integer(started) do
  duration = System.monotonic_time(:millisecond) - started
  {:ok, previous_duration + duration}
end

defp increase_read_duration(_), do: {:ok, :no_duration_limit}
399
# Module through which upstream requests are made (`request`, `stream_body`
# and `close` are called on it).
defp client do
  Pleroma.ReverseProxy.Client
end
401
# Marks `url` as failed in `:failed_proxy_url_cache`.
#
# For `:body_too_large` and the status codes 400 and 204 the entry is stored
# with `ttl: nil`; every other error uses the `failed_request_ttl` option
# (default `@failed_request_ttl`).
# NOTE(review): what a `nil` ttl means is up to Cachex - presumably the
# cache's default expiration applies; confirm.
defp track_failed_url(url, error, opts) do
  ttl =
    if error in [:body_too_large, 400, 204] do
      nil
    else
      Keyword.get(opts, :failed_request_ttl, @failed_request_ttl)
    end

  Cachex.put(:failed_proxy_url_cache, url, true, ttl: ttl)
end
412 end