Merge branch 'develop' into gun
[akkoma] / lib / pleroma / pool / connections.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Pool.Connections do
6 use GenServer
7
8 alias Pleroma.Config
9 alias Pleroma.Gun
10
11 require Logger
12
13 @type domain :: String.t()
14 @type conn :: Pleroma.Gun.Conn.t()
15
16 @type t :: %__MODULE__{
17 conns: %{domain() => conn()},
18 opts: keyword()
19 }
20
21 defstruct conns: %{}, opts: []
22
23 @spec start_link({atom(), keyword()}) :: {:ok, pid()}
24 def start_link({name, opts}) do
25 GenServer.start_link(__MODULE__, opts, name: name)
26 end
27
28 @impl true
29 def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}
30
31 @spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
32 def checkin(url, name)
33 def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name)
34
35 def checkin(%URI{} = uri, name) do
36 timeout = Config.get([:connections_pool, :checkin_timeout], 250)
37
38 GenServer.call(name, {:checkin, uri}, timeout)
39 end
40
41 @spec alive?(atom()) :: boolean()
42 def alive?(name) do
43 if pid = Process.whereis(name) do
44 Process.alive?(pid)
45 else
46 false
47 end
48 end
49
50 @spec get_state(atom()) :: t()
51 def get_state(name) do
52 GenServer.call(name, :state)
53 end
54
55 @spec count(atom()) :: pos_integer()
56 def count(name) do
57 GenServer.call(name, :count)
58 end
59
60 @spec get_unused_conns(atom()) :: [{domain(), conn()}]
61 def get_unused_conns(name) do
62 GenServer.call(name, :unused_conns)
63 end
64
65 @spec checkout(pid(), pid(), atom()) :: :ok
66 def checkout(conn, pid, name) do
67 GenServer.cast(name, {:checkout, conn, pid})
68 end
69
70 @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok
71 def add_conn(name, key, conn) do
72 GenServer.cast(name, {:add_conn, key, conn})
73 end
74
75 @spec remove_conn(atom(), String.t()) :: :ok
76 def remove_conn(name, key) do
77 GenServer.cast(name, {:remove_conn, key})
78 end
79
80 @impl true
81 def handle_cast({:add_conn, key, conn}, state) do
82 state = put_in(state.conns[key], conn)
83
84 Process.monitor(conn.conn)
85 {:noreply, state}
86 end
87
88 @impl true
89 def handle_cast({:checkout, conn_pid, pid}, state) do
90 state =
91 with true <- Process.alive?(conn_pid),
92 {key, conn} <- find_conn(state.conns, conn_pid),
93 used_by <- List.keydelete(conn.used_by, pid, 0) do
94 conn_state = if used_by == [], do: :idle, else: conn.conn_state
95
96 put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
97 else
98 false ->
99 Logger.debug("checkout for closed conn #{inspect(conn_pid)}")
100 state
101
102 nil ->
103 Logger.debug("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
104 state
105 end
106
107 {:noreply, state}
108 end
109
110 @impl true
111 def handle_cast({:remove_conn, key}, state) do
112 state = put_in(state.conns, Map.delete(state.conns, key))
113 {:noreply, state}
114 end
115
116 @impl true
117 def handle_call({:checkin, uri}, from, state) do
118 key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
119
120 case state.conns[key] do
121 %{conn: pid, gun_state: :up} = conn ->
122 time = :os.system_time(:second)
123 last_reference = time - conn.last_reference
124 crf = crf(last_reference, 100, conn.crf)
125
126 state =
127 put_in(state.conns[key], %{
128 conn
129 | last_reference: time,
130 crf: crf,
131 conn_state: :active,
132 used_by: [from | conn.used_by]
133 })
134
135 {:reply, pid, state}
136
137 %{gun_state: :down} ->
138 {:reply, nil, state}
139
140 nil ->
141 {:reply, nil, state}
142 end
143 end
144
145 @impl true
146 def handle_call(:state, _from, state), do: {:reply, state, state}
147
148 @impl true
149 def handle_call(:count, _from, state) do
150 {:reply, Enum.count(state.conns), state}
151 end
152
153 @impl true
154 def handle_call(:unused_conns, _from, state) do
155 unused_conns =
156 state.conns
157 |> Enum.filter(&filter_conns/1)
158 |> Enum.sort(&sort_conns/2)
159
160 {:reply, unused_conns, state}
161 end
162
163 defp filter_conns({_, %{conn_state: :idle, used_by: []}}), do: true
164 defp filter_conns(_), do: false
165
166 defp sort_conns({_, c1}, {_, c2}) do
167 c1.crf <= c2.crf and c1.last_reference <= c2.last_reference
168 end
169
170 defp find_conn_from_gun_info(conns, pid) do
171 # TODO: temp fix for gun MatchError https://github.com/ninenines/gun/issues/222
172 # TODO: REMOVE LATER
173 try do
174 %{origin_host: host, origin_scheme: scheme, origin_port: port} = Gun.info(pid)
175
176 host =
177 case :inet.ntoa(host) do
178 {:error, :einval} -> host
179 ip -> ip
180 end
181
182 key = "#{scheme}:#{host}:#{port}"
183 find_conn(conns, pid, key)
184 rescue
185 MatcheError -> find_conn(conns, pid)
186 end
187 end
188
189 @impl true
190 def handle_info({:gun_up, conn_pid, _protocol}, state) do
191 state =
192 with {key, conn} <- find_conn_from_gun_info(state.conns, conn_pid),
193 {true, key} <- {Process.alive?(conn_pid), key} do
194 put_in(state.conns[key], %{
195 conn
196 | gun_state: :up,
197 conn_state: :active,
198 retries: 0
199 })
200 else
201 {false, key} ->
202 put_in(
203 state.conns,
204 Map.delete(state.conns, key)
205 )
206
207 nil ->
208 :ok = Gun.close(conn_pid)
209
210 state
211 end
212
213 {:noreply, state}
214 end
215
216 @impl true
217 def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
218 retries = Config.get([:connections_pool, :retry], 1)
219 # we can't get info on this pid, because pid is dead
220 state =
221 with {key, conn} <- find_conn(state.conns, conn_pid),
222 {true, key} <- {Process.alive?(conn_pid), key} do
223 if conn.retries == retries do
224 :ok = Gun.close(conn.conn)
225
226 put_in(
227 state.conns,
228 Map.delete(state.conns, key)
229 )
230 else
231 put_in(state.conns[key], %{
232 conn
233 | gun_state: :down,
234 retries: conn.retries + 1
235 })
236 end
237 else
238 {false, key} ->
239 put_in(
240 state.conns,
241 Map.delete(state.conns, key)
242 )
243
244 nil ->
245 Logger.debug(":gun_down for conn which isn't found in state")
246
247 state
248 end
249
250 {:noreply, state}
251 end
252
253 @impl true
254 def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do
255 Logger.debug("received DOWM message for #{inspect(conn_pid)} reason -> #{inspect(reason)}")
256
257 state =
258 with {key, conn} <- find_conn(state.conns, conn_pid) do
259 Enum.each(conn.used_by, fn {pid, _ref} ->
260 Process.exit(pid, reason)
261 end)
262
263 put_in(
264 state.conns,
265 Map.delete(state.conns, key)
266 )
267 else
268 nil ->
269 Logger.debug(":DOWN for conn which isn't found in state")
270
271 state
272 end
273
274 {:noreply, state}
275 end
276
277 defp find_conn(conns, conn_pid) do
278 Enum.find(conns, fn {_key, conn} ->
279 conn.conn == conn_pid
280 end)
281 end
282
283 defp find_conn(conns, conn_pid, conn_key) do
284 Enum.find(conns, fn {key, conn} ->
285 key == conn_key and conn.conn == conn_pid
286 end)
287 end
288
289 def crf(current, steps, crf) do
290 1 + :math.pow(0.5, current / steps) * crf
291 end
292 end