ReverseProxy: Fix a gun connection leak when there is an error with no
[akkoma] / lib / pleroma / reverse_proxy / client / tesla.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.ReverseProxy.Client.Tesla do
6 @behaviour Pleroma.ReverseProxy.Client
7
8 alias Pleroma.Gun.ConnectionPool
9
10 @type headers() :: [{String.t(), String.t()}]
11 @type status() :: pos_integer()
12
13 @spec request(atom(), String.t(), headers(), String.t(), keyword()) ::
14 {:ok, status(), headers}
15 | {:ok, status(), headers, map()}
16 | {:error, atom() | String.t()}
17 | no_return()
18
19 @impl true
20 def request(method, url, headers, body, opts \\ []) do
21 check_adapter()
22
23 opts = Keyword.put(opts, :body_as, :chunks)
24
25 with {:ok, response} <-
26 Pleroma.HTTP.request(
27 method,
28 url,
29 body,
30 headers,
31 Keyword.put(opts, :adapter, opts)
32 ) do
33 if is_map(response.body) and method != :head do
34 {:ok, response.status, response.headers, response.body}
35 else
36 conn_pid = response.opts[:adapter][:conn]
37 ConnectionPool.release_conn(conn_pid)
38 {:ok, response.status, response.headers}
39 end
40 else
41 {:error, error} -> {:error, error}
42 end
43 end
44
45 @impl true
46 @spec stream_body(map()) ::
47 {:ok, binary(), map()} | {:error, atom() | String.t()} | :done | no_return()
48 def stream_body(%{pid: pid, opts: opts, fin: true}) do
49 # if connection was reused, but in tesla were redirects,
50 # tesla returns new opened connection, which must be closed manually
51 if opts[:old_conn], do: Tesla.Adapter.Gun.close(pid)
52 # if there were redirects we need to checkout old conn
53 conn = opts[:old_conn] || opts[:conn]
54
55 if conn, do: :ok = ConnectionPool.release_conn(conn)
56
57 :done
58 end
59
60 def stream_body(client) do
61 case read_chunk!(client) do
62 {:fin, body} ->
63 {:ok, body, Map.put(client, :fin, true)}
64
65 {:nofin, part} ->
66 {:ok, part, client}
67
68 {:error, error} ->
69 {:error, error}
70 end
71 end
72
73 defp read_chunk!(%{pid: pid, stream: stream, opts: opts}) do
74 adapter = check_adapter()
75 adapter.read_chunk(pid, stream, opts)
76 end
77
78 @impl true
79 @spec close(map) :: :ok | no_return()
80 def close(%{pid: pid}) do
81 ConnectionPool.release_conn(pid)
82 end
83
84 defp check_adapter do
85 adapter = Application.get_env(:tesla, :adapter)
86
87 unless adapter == Tesla.Adapter.Gun do
88 raise "#{adapter} doesn't support reading body in chunks"
89 end
90
91 adapter
92 end
93 end