Merge branch 'develop' into gun
[akkoma] / lib / pleroma / reverse_proxy / client / tesla.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.ReverseProxy.Client.Tesla do
6 @type headers() :: [{String.t(), String.t()}]
7 @type status() :: pos_integer()
8
9 @behaviour Pleroma.ReverseProxy.Client
10
11 @spec request(atom(), String.t(), headers(), String.t(), keyword()) ::
12 {:ok, status(), headers}
13 | {:ok, status(), headers, map()}
14 | {:error, atom() | String.t()}
15 | no_return()
16
17 @impl true
18 def request(method, url, headers, body, opts \\ []) do
19 _adapter = check_adapter()
20
21 with opts <- Keyword.merge(opts, body_as: :chunks, mode: :passive),
22 {:ok, response} <-
23 Pleroma.HTTP.request(
24 method,
25 url,
26 body,
27 headers,
28 Keyword.put(opts, :adapter, opts)
29 ) do
30 if is_map(response.body) and method != :head do
31 {:ok, response.status, response.headers, response.body}
32 else
33 {:ok, response.status, response.headers}
34 end
35 else
36 {:error, error} -> {:error, error}
37 end
38 end
39
40 @impl true
41 @spec stream_body(map()) :: {:ok, binary(), map()} | {:error, atom() | String.t()} | :done
42 def stream_body(%{pid: pid, opts: opts, fin: true}) do
43 # if connection was sended and there were redirects, we need to close new conn - pid manually
44 if opts[:old_conn], do: Tesla.Adapter.Gun.close(pid)
45 # if there were redirects we need to checkout old conn
46 conn = opts[:old_conn] || opts[:conn]
47
48 if conn, do: :ok = Pleroma.Pool.Connections.checkout(conn, self(), :gun_connections)
49
50 :done
51 end
52
53 def stream_body(client) do
54 case read_chunk!(client) do
55 {:fin, body} ->
56 {:ok, body, Map.put(client, :fin, true)}
57
58 {:nofin, part} ->
59 {:ok, part, client}
60
61 {:error, error} ->
62 {:error, error}
63 end
64 end
65
66 defp read_chunk!(%{pid: pid, stream: stream, opts: opts}) do
67 adapter = check_adapter()
68 adapter.read_chunk(pid, stream, opts)
69 end
70
71 @impl true
72 @spec close(map) :: :ok | no_return()
73 def close(%{pid: pid}) do
74 adapter = check_adapter()
75 adapter.close(pid)
76 end
77
78 defp check_adapter do
79 adapter = Application.get_env(:tesla, :adapter)
80
81 unless adapter == Tesla.Adapter.Gun do
82 raise "#{adapter} doesn't support reading body in chunks"
83 end
84
85 adapter
86 end
87 end