Merge branch 'develop' of https://git.pleroma.social/pleroma/pleroma into develop
[akkoma] / lib / pleroma / reverse_proxy / reverse_proxy.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.ReverseProxy do
6 alias Pleroma.HTTP
7
# Client request headers that are forwarded to the upstream (names are lowercase).
@keep_req_headers [
  "accept",
  "user-agent",
  "accept-encoding",
  "cache-control",
  "if-modified-since",
  "if-unmodified-since",
  "if-none-match",
  "if-range",
  "range"
]

# Upstream response headers that count as caching information.
@resp_cache_headers ["etag", "date", "last-modified", "cache-control"]

# Upstream response headers that are passed through to the client.
@keep_resp_headers @resp_cache_headers ++
                     [
                       "content-type",
                       "content-disposition",
                       "content-encoding",
                       "content-range",
                       "accept-ranges",
                       "vary"
                     ]

# Used as `cache-control` when the upstream sends no caching header at all.
@default_cache_control_header "public, max-age=1209600"

# Upstream status codes that are proxied; any other status is treated as a failure.
@valid_resp_codes [200, 206, 304]

# Defaults for the `max_read_duration`, `max_body_length` and `failed_request_ttl` options.
@max_read_duration :timer.seconds(30)
@max_body_length :infinity
@failed_request_ttl :timer.seconds(60)

# HTTP methods accepted by `call/3`.
@methods ["GET", "HEAD"]
20
21 @moduledoc """
22 A reverse proxy.
23
24 Pleroma.ReverseProxy.call(conn, url, options)
25
26 It is not meant to be added into a plug pipeline, but to be called from another plug or controller.
27
28 Supports `#{inspect(@methods)}` HTTP methods, and only allows `#{inspect(@valid_resp_codes)}` status codes.
29
30 Responses are chunked to the client while downloading from the upstream.
31
32 Some request / response headers are preserved:
33
34 * request: `#{inspect(@keep_req_headers)}`
35 * response: `#{inspect(@keep_resp_headers)}`
36
37 If no caching headers (`#{inspect(@resp_cache_headers)}`) are returned by upstream, `cache-control` will be
38 set to `#{inspect(@default_cache_control_header)}`.
39
40 Options:
41
42 * `redirect_on_failure` (default `false`). Redirects the client to the real remote URL if there are any HTTP
43 errors. Any error during body processing will not be redirected as the response is chunked. This may expose
44 the remote URL, client IPs, etc.
45
46 * `max_body_length` (default `#{inspect(@max_body_length)}`): limits the content length to be approximately the
47 specified length. It is validated with the `content-length` header and also verified when proxying.
48
49 * `max_read_duration` (default `#{inspect(@max_read_duration)}` ms): the total time the connection is allowed to
50 read from the remote upstream.
51
52 * `failed_request_ttl` (default `#{inspect(@failed_request_ttl)}` ms): the time the failed request is cached and cannot be retried.
53
54 * `inline_content_types`:
55 * `true` will not alter `content-disposition` (up to the upstream),
56 * `false` will add `content-disposition: attachment` to any response,
57 * a list of whitelisted content types
58
59 * `keep_user_agent` will forward the client's user-agent to the upstream. This may be useful if the upstream is
60 doing content transformation (encoding, …) depending on the request.
61
62 * `req_headers`, `resp_headers` additional headers.
63
64 * `http`: options for [hackney](https://github.com/benoitc/hackney).
65
66 """
# Base hackney options; entries of the caller's `:http` option are merged over them.
@default_hackney_options [pool: :media]

# Content types that are served inline when `:inline_content_types` is not given.
@inline_content_types ~w(
  image/gif
  image/jpeg
  image/jpg
  image/png
  image/svg+xml
  audio/mpeg
  audio/mp3
  video/webm
  video/mp4
  video/quicktime
)
81
82 require Logger
83 import Plug.Conn
84
# Options accepted by `call/3`.
#
# `:http` is a keyword list of client options (the previous `[]` type only admitted the
# empty list). `:attachment_name` is the fallback file name used when a
# `content-disposition: attachment` header has to be generated.
@type option() ::
        {:keep_user_agent, boolean()}
        | {:max_read_duration, :timer.time() | :infinity}
        | {:max_body_length, non_neg_integer() | :infinity}
        | {:failed_request_ttl, :timer.time() | :infinity}
        | {:http, keyword()}
        | {:req_headers, [{String.t(), String.t()}]}
        | {:resp_headers, [{String.t(), String.t()}]}
        | {:inline_content_types, boolean() | [String.t()]}
        | {:redirect_on_failure, boolean()}
        | {:attachment_name, String.t()}
95
@doc """
Proxies `url` to the client behind `conn`.

Only the methods in `@methods` are proxied; every other method is answered with a
400 response. The returned connection is always halted.
"""
@spec call(Plug.Conn.t(), url :: String.t(), [option()]) :: Plug.Conn.t()
def call(_conn, _url, _opts \\ [])

def call(%{method: method} = conn, url, opts) when method in @methods do
  caller_http_opts = Keyword.get(opts, :http, [])

  hackney_opts =
    []
    |> Pleroma.HTTP.Connection.hackney_options()
    |> Keyword.merge(@default_hackney_options)
    |> Keyword.merge(caller_http_opts)
    |> HTTP.process_request_options()

  req_headers = build_req_headers(conn.req_headers, opts)

  # A file name derived from the URL is only a fallback: `put_new` keeps an
  # `:attachment_name` supplied by the caller.
  opts =
    case Pleroma.Web.MediaProxy.filename(url) do
      falsy when falsy in [nil, false] -> opts
      filename -> Keyword.put_new(opts, :attachment_name, filename)
    end

  max_body_length = Keyword.get(opts, :max_body_length, @max_body_length)

  with {:ok, nil} <- Cachex.get(:failed_proxy_url_cache, url),
       {:ok, code, headers, client} <- request(method, url, req_headers, hackney_opts),
       :ok <- header_length_constraint(headers, max_body_length) do
    response(conn, client, url, code, headers, opts)
  else
    # The URL is marked as failed in the cache: do not contact the upstream again.
    {:ok, true} ->
      failed = error_or_redirect(conn, url, 500, "Request failed", opts)
      halt(failed)

    # Upstream reply without a client reference, i.e. there is no body to stream.
    {:ok, code, headers} ->
      replied = head_response(conn, url, code, headers, opts)
      halt(replied)

    # Must stay ahead of the generic error clause below, which would also match it.
    {:error, {:invalid_http_response, code}} ->
      Logger.error("#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{code}")
      track_failed_url(url, code, opts)

      reason = "Request failed: " <> Plug.Conn.Status.reason_phrase(code)
      failed = error_or_redirect(conn, url, code, reason, opts)
      halt(failed)

    {:error, error} ->
      Logger.error("#{__MODULE__}: request to #{inspect(url)} failed: #{inspect(error)}")
      track_failed_url(url, error, opts)

      failed = error_or_redirect(conn, url, 500, "Request failed", opts)
      halt(failed)
  end
end

def call(conn, _, _) do
  body = Plug.Conn.Status.reason_phrase(400)

  conn
  |> send_resp(400, body)
  |> halt()
end
161
# Performs the upstream request through `client()`.
#
# Returns:
#   * `{:ok, code, headers, client}` - accepted status, body can be streamed via `client`
#   * `{:ok, code, headers}`         - accepted status, reply carries no client reference
#   * `{:error, {:invalid_http_response, code}}` - status not in `@valid_resp_codes`
#   * `{:error, reason}`             - the client reported an error
#
# Header names of successful replies are lowercased.
defp request(method, url, headers, hackney_opts) do
  Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}")
  # `call/3` only lets the methods in `@methods` through, so the atom already exists.
  method = method |> String.downcase() |> String.to_existing_atom()

  case client().request(method, url, headers, "", hackney_opts) do
    {:ok, code, headers, client} when code in @valid_resp_codes ->
      {:ok, code, downcase_headers(headers), client}

    {:ok, code, headers} when code in @valid_resp_codes ->
      {:ok, code, downcase_headers(headers)}

    # NOTE(review): the client reference is dropped here without being closed --
    # confirm this does not keep a pooled connection checked out.
    {:ok, code, _, _} ->
      {:error, {:invalid_http_response, code}}

    # A client-less reply with a rejected status used to match no clause at all
    # and crashed with a CaseClauseError; report it like any other bad status.
    {:ok, code, _} ->
      {:error, {:invalid_http_response, code}}

    {:error, error} ->
      {:error, error}
  end
end
180
# Sends the filtered upstream headers, then streams the upstream body to the client
# as a chunked response. The connection is halted whatever the outcome; on a
# streaming error the upstream client is closed as well.
defp response(conn, client, url, status, headers, opts) do
  resp_headers = build_resp_headers(headers, opts)

  chunked_conn =
    conn
    |> put_resp_headers(resp_headers)
    |> send_chunked(status)

  case chunk_reply(chunked_conn, client, opts) do
    {:ok, streamed} ->
      halt(streamed)

    # `:closed` is not logged, unlike the other errors below.
    {:error, :closed, streamed} ->
      client().close(client)
      halt(streamed)

    {:error, error, streamed} ->
      Logger.warn(
        "#{__MODULE__} request to #{url} failed while reading/chunking: #{inspect(error)}"
      )

      client().close(client)
      halt(streamed)
  end
end
205
# Streams the upstream body to the client, starting with zero bytes sent and zero
# read time accumulated.
defp chunk_reply(conn, client, opts), do: chunk_reply(conn, client, opts, 0, 0)

# One iteration per upstream chunk: check the read-time budget, read a chunk, add
# the time spent reading, check the size budget, forward the chunk, then recurse.
#
# Returns `{:ok, conn}` once the client reports `:done`, or `{:error, reason, conn}`
# as soon as any step yields `{:error, reason}`. Any other non-matching value is not
# handled by the `else` clauses and raises.
defp chunk_reply(conn, client, opts, sent_so_far, duration) do
  read_limit = Keyword.get(opts, :max_read_duration, @max_read_duration)
  size_limit = Keyword.get(opts, :max_body_length, @max_body_length)

  with {:ok, timer} <- check_read_duration(duration, read_limit),
       {:ok, data} <- client().stream_body(client),
       {:ok, elapsed} <- increase_read_duration(timer),
       total = sent_so_far + byte_size(data),
       :ok <- body_size_constraint(total, size_limit),
       {:ok, conn} <- chunk(conn, data) do
    chunk_reply(conn, client, opts, total, elapsed)
  else
    :done -> {:ok, conn}
    {:error, error} -> {:error, error, conn}
  end
end
231
# Replies with the filtered upstream headers, the upstream status and an empty body.
defp head_response(conn, _url, code, headers, opts) do
  resp_headers = build_resp_headers(headers, opts)
  with_headers = put_resp_headers(conn, resp_headers)

  send_resp(with_headers, code, "")
end
237
# Answers a failed proxy request: redirects the client to `url` when the
# `:redirect_on_failure` option is truthy, otherwise sends `code` with `body`.
# The connection is halted either way.
defp error_or_redirect(conn, url, code, body, opts) do
  replied =
    case Keyword.get(opts, :redirect_on_failure, false) do
      falsy when falsy in [nil, false] -> send_resp(conn, code, body)
      _ -> Phoenix.Controller.redirect(conn, external: url)
    end

  halt(replied)
end
249
# Lowercases the name of every `{name, value}` header; values are left untouched.
defp downcase_headers(headers) do
  Enum.map(headers, &downcase_header/1)
end

# Lowercases the name of a single header tuple.
defp downcase_header({name, value}), do: {String.downcase(name), value}
255
# Returns the media type of the `content-type` header without its parameters,
# defaulting to "application/octet-stream" when the header is absent.
#
# The result is trimmed and lowercased: media types are case-insensitive and may be
# followed by whitespace before the `;`, so without normalisation a value such as
# "Image/PNG" or "image/png ; charset=utf-8" would never equal a whitelist entry.
defp get_content_type(headers) do
  {_, raw} =
    List.keyfind(headers, "content-type", 0, {"content-type", "application/octet-stream"})

  raw
  |> String.split(";", parts: 2)
  |> hd()
  |> String.trim()
  |> String.downcase()
end
263
# Puts every `{name, value}` pair of `headers` on the connection as a response header.
defp put_resp_headers(conn, headers) do
  Enum.reduce(headers, conn, &put_one_resp_header/2)
end

# Reducer for `put_resp_headers/2`: sets a single response header.
defp put_one_resp_header({name, value}, conn), do: put_resp_header(conn, name, value)
269
# Builds the header list sent to the upstream: the client's headers restricted to
# `@keep_req_headers` (names lowercased), followed by the `:req_headers` option.
#
# NOTE(review): when `:keep_user_agent` is truthy the `user-agent` entry is replaced
# with `Pleroma.Application.user_agent()`; otherwise the client's own value is left
# in place. The module documentation describes the option the other way round --
# confirm which behaviour is intended before changing either.
defp build_req_headers(headers, opts) do
  kept =
    headers
    |> downcase_headers()
    |> Enum.filter(fn {name, _} -> name in @keep_req_headers end)

  merged = kept ++ Keyword.get(opts, :req_headers, [])

  case Keyword.get(opts, :keep_user_agent, false) do
    falsy when falsy in [nil, false] ->
      merged

    _ ->
      own_agent = {"user-agent", Pleroma.Application.user_agent()}
      List.keystore(merged, "user-agent", 0, own_agent)
  end
end
289
# Builds the headers sent to the client: upstream headers restricted to
# `@keep_resp_headers`, completed with cache-control and content-disposition
# handling, followed by the `:resp_headers` option.
defp build_resp_headers(headers, opts) do
  extra = Keyword.get(opts, :resp_headers, [])

  base =
    headers
    |> Enum.filter(fn {name, _} -> name in @keep_resp_headers end)
    |> build_resp_cache_headers(opts)
    |> build_resp_content_disposition_header(opts)

  base ++ extra
end
297
# Makes sure the response carries a `cache-control` header:
#   * upstream sent one                        -> headers are returned unchanged
#   * other caching headers but no such header -> `cache-control: public`
#   * no caching header at all                 -> `@default_cache_control_header`
defp build_resp_cache_headers(headers, _opts) do
  any_cache_header? = Enum.any?(headers, fn {name, _} -> name in @resp_cache_headers end)
  cache_control? = List.keymember?(headers, "cache-control", 0)

  case {any_cache_header?, cache_control?} do
    {true, true} ->
      headers

    {true, false} ->
      # Caching headers are present but cache-control is not: it has to be set to
      # public explicitly, as Plug defaults to "max-age=0, private, must-revalidate".
      List.keystore(headers, "cache-control", 0, {"cache-control", "public"})

    {false, _} ->
      fallback = {"cache-control", @default_cache_control_header}
      List.keystore(headers, "cache-control", 0, fallback)
  end
end
320
# Forces `content-disposition: attachment` for content that must not be shown inline.
#
# The `:inline_content_types` option (default `@inline_content_types`) decides:
#   * a list  -> attachment unless the response content type is a member of it
#   * `false` -> always attachment
#   * other   -> headers are returned unchanged
#
# The attachment file name is the quoted `filename="..."` parameter of the upstream
# `content-disposition` header when there is one, else the `:attachment_name` option,
# else "attachment".
defp build_resp_content_disposition_header(headers, opts) do
  allowed = Keyword.get(opts, :inline_content_types, @inline_content_types)
  content_type = get_content_type(headers)

  attachment? =
    allowed == false or (is_list(allowed) and not Enum.member?(allowed, content_type))

  if attachment? do
    # Pattern failures select the fallback name through `with`/`else` rather than
    # through a rescued MatchError, so exceptions are not used as control flow.
    name =
      with {{"content-disposition", upstream_value}, _rest} <-
             List.keytake(headers, "content-disposition", 0),
           [captured | _] <-
             Regex.run(
               ~r/filename="((?:[^"\\]|\\.)*)"/u,
               upstream_value || "",
               capture: :all_but_first
             ) do
        captured
      else
        _ -> Keyword.get(opts, :attachment_name, "attachment")
      end

    disposition = "attachment; filename=\"#{name}\""

    List.keystore(headers, "content-disposition", 0, {"content-disposition", disposition})
  else
    headers
  end
end
358
# Checks the upstream `content-length` header against `limit`.
#
# Returns `{:error, :body_too_large}` only when the limit is a positive integer and
# the header parses to a larger integer. A missing or unparsable header, or a
# non-integer limit such as `:infinity`, yields `:ok`.
defp header_length_constraint(headers, limit) when is_integer(limit) and limit > 0 do
  declared =
    case List.keyfind(headers, "content-length", 0) do
      {_, raw} -> Integer.parse(raw)
      _ -> :error
    end

  case declared do
    {size, _} when size > limit -> {:error, :body_too_large}
    _ -> :ok
  end
end

defp header_length_constraint(_, _), do: :ok
374
# Checks the number of body bytes read so far against `limit`.
#
# Returns `{:error, :body_too_large}` once `size` exceeds a positive integer limit,
# `:ok` otherwise (including for a non-integer limit such as `:infinity`).
#
# The comparison is strict so that it agrees with `header_length_constraint/2`, which
# accepts a `content-length` equal to the limit; with `>=` a body of exactly `limit`
# bytes passed the header check and was then cut off before its last chunk.
defp body_size_constraint(size, limit) when is_integer(limit) and limit > 0 and size > limit do
  {:error, :body_too_large}
end

defp body_size_constraint(_, _), do: :ok
380
# Read-time accounting used while streaming the upstream body.
#
# `check_read_duration/2` is called before each read with the milliseconds spent
# reading so far and the allowed maximum:
#   * `{:error, :read_duration_exceeded}` when the accumulated time is over `max`
#   * `{:ok, {duration, started}}` otherwise, `started` being the read's start time
#   * `{:ok, :no_duration_limit}` when no integer limit applies (e.g. `:infinity`)
#
# The no-limit clauses used to return a 3-tuple, which matched neither the
# `{:ok, duration}` pattern nor any `else` clause of the caller's `with`, so proxying
# without a read limit raised instead of streaming.
#
# Elapsed time is measured with the monotonic clock so that wall-clock adjustments
# cannot shorten or lengthen the measured duration.
defp check_read_duration(duration, max)
     when is_integer(duration) and is_integer(max) and max > 0 do
  if duration > max do
    {:error, :read_duration_exceeded}
  else
    {:ok, {duration, System.monotonic_time(:millisecond)}}
  end
end

defp check_read_duration(_, _), do: {:ok, :no_duration_limit}

# Called after each read with the value produced by `check_read_duration/2`; returns
# `{:ok, total_ms}` with the time of the read just finished added, or passes the
# `:no_duration_limit` marker through unchanged.
defp increase_read_duration({previous_duration, started})
     when is_integer(previous_duration) and is_integer(started) do
  elapsed = System.monotonic_time(:millisecond) - started
  {:ok, previous_duration + elapsed}
end

defp increase_read_duration(_), do: {:ok, :no_duration_limit}
401
# Module whose `request`, `stream_body` and `close` functions perform the upstream
# HTTP calls made by this module.
defp client do
  Pleroma.ReverseProxy.Client
end
403
# Marks `url` as failed in `:failed_proxy_url_cache`.
#
# For `:body_too_large`, 400 and 204 the entry is written with `ttl: nil`; every other
# error uses the `:failed_request_ttl` option (default `@failed_request_ttl`).
# NOTE(review): presumably a nil ttl falls back to the cache's own default
# expiration -- confirm against the Cachex configuration.
defp track_failed_url(url, error, opts) do
  ttl =
    if error in [:body_too_large, 400, 204] do
      nil
    else
      Keyword.get(opts, :failed_request_ttl, @failed_request_ttl)
    end

  Cachex.put(:failed_proxy_url_cache, url, true, ttl: ttl)
end
414 end