Merge branch 'develop' into gun
[akkoma] / lib / pleroma / pool / connections.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Pool.Connections do
6 use GenServer
7
8 alias Pleroma.Config
9 alias Pleroma.Gun
10
11 require Logger
12
@typedoc "Key under which a connection is stored in the pool state."
@type domain :: String.t()

@typedoc "A connection record kept by the pool."
@type conn :: Pleroma.Gun.Conn.t()

@typedoc "Server state: the stored connections by key and the options given at start."
@type t :: %__MODULE__{
        conns: %{domain() => conn()},
        opts: keyword()
      }

defstruct opts: [], conns: %{}
22
@doc """
Starts the pool server and registers it under `name`; `opts` is kept in the
server state.

Returns the result of `GenServer.start_link/3` unchanged, which may be
`{:ok, pid}`, `:ignore` or `{:error, reason}` (for example when `name` is
already registered).
"""
@spec start_link({atom(), keyword()}) :: GenServer.on_start()
def start_link({name, opts}) do
  GenServer.start_link(__MODULE__, opts, name: name)
end
27
# Initial server state: an empty connection map and the options as given.
@impl true
def init(opts) do
  initial_state = %__MODULE__{conns: %{}, opts: opts}
  {:ok, initial_state}
end
30
@doc """
Asks the pool server `name` for a connection to `url`.

`url` may be a string (parsed with `URI.parse/1`) or a `URI` struct. The
server replies with the connection pid, or `nil` when it has no usable
connection for that target. The call timeout is read from the
`[:connections_pool, :checkin_timeout]` setting, with 250 passed as default.
"""
@spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
def checkin(url, name)

def checkin(%URI{} = uri, name) do
  GenServer.call(
    name,
    {:checkin, uri},
    Config.get([:connections_pool, :checkin_timeout], 250)
  )
end

def checkin(url, name) when is_binary(url) do
  url
  |> URI.parse()
  |> checkin(name)
end
40
@doc """
Returns `true` when a process is registered under `name` and is alive,
`false` otherwise.
"""
@spec alive?(atom()) :: boolean()
def alive?(name) do
  case Process.whereis(name) do
    nil -> false
    pid -> Process.alive?(pid)
  end
end
49
@doc "Fetches the complete state of the pool server `name`."
@spec get_state(atom()) :: t()
def get_state(name), do: GenServer.call(name, :state)
54
@doc """
Returns how many connections the pool server `name` currently stores.

The result is `0` for an empty pool.
"""
@spec count(atom()) :: non_neg_integer()
def count(name) do
  GenServer.call(name, :count)
end
59
@doc """
Returns the `{key, conn}` pairs the pool server `name` reports as unused,
i.e. connections that are idle and not held by any process.
"""
@spec get_unused_conns(atom()) :: [{domain(), conn()}]
def get_unused_conns(name), do: GenServer.call(name, :unused_conns)
64
@doc """
Tells the pool server `name` that process `pid` is done with connection `conn`.

Sent as a cast, so it returns `:ok` without waiting for the server.
"""
@spec checkout(pid(), pid(), atom()) :: :ok
def checkout(conn, pid, name), do: GenServer.cast(name, {:checkout, conn, pid})
69
@doc """
Asks the pool server `name` to store `conn` under `key`.

Sent as a cast, so it returns `:ok` without waiting for the server.
"""
@spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok
def add_conn(name, key, conn), do: GenServer.cast(name, {:add_conn, key, conn})
74
@doc """
Asks the pool server `name` to drop the connection stored under `key`.

Sent as a cast, so it returns `:ok` without waiting for the server.
"""
@spec remove_conn(atom(), String.t()) :: :ok
def remove_conn(name, key), do: GenServer.cast(name, {:remove_conn, key})
79
# Stores `conn` under `key` and monitors the connection process, so the
# server gets a `:DOWN` message when that process terminates.
@impl true
def handle_cast({:add_conn, key, conn}, state) do
  conns = Map.put(state.conns, key, conn)
  Process.monitor(conn.conn)

  {:noreply, %{state | conns: conns}}
end
87
# Releases one usage of the connection `conn_pid` held by process `pid`.
# When nobody uses the connection afterwards it is marked `:idle`; otherwise
# its `conn_state` is left as it was. Checkouts for a dead or unknown
# connection only produce a debug log.
@impl true
def handle_cast({:checkout, conn_pid, pid}, state) do
  Logger.debug("checkout #{inspect(conn_pid)}")

  new_state =
    if Process.alive?(conn_pid) do
      case find_conn(state.conns, conn_pid) do
        {key, conn} ->
          # `used_by` entries are tuples whose first element is the user's pid.
          remaining = List.keydelete(conn.used_by, pid, 0)
          conn_state = if remaining == [], do: :idle, else: conn.conn_state

          put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: remaining})

        nil ->
          Logger.debug("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
          state
      end
    else
      Logger.debug("checkout for closed conn #{inspect(conn_pid)}")
      state
    end

  {:noreply, new_state}
end
116
# Drops the connection stored under `key`; a missing key is a no-op.
@impl true
def handle_cast({:remove_conn, key}, state) do
  {:noreply, %{state | conns: Map.delete(state.conns, key)}}
end
122
# Hands out the stored connection for `uri`, if there is one and it is up.
#
# The lookup key is "scheme:host:port". On a hit the caller (`from`) is
# recorded in `used_by`, the connection is marked `:active`, and its
# `last_reference` / `crf` bookkeeping is refreshed. In every other case
# (no connection, connection down, or any other `gun_state`) the reply is
# `nil` and the state is unchanged.
@impl true
def handle_call({:checkin, uri}, from, state) do
  key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
  Logger.debug("checkin #{key}")

  case state.conns[key] do
    %{conn: conn, gun_state: :up} = current_conn ->
      Logger.debug("reusing conn #{key}")

      time = :os.system_time(:second)
      last_reference = time - current_conn.last_reference
      current_crf = crf(last_reference, 100, current_conn.crf)

      state =
        put_in(state.conns[key], %{
          current_conn
          | last_reference: time,
            crf: current_crf,
            conn_state: :active,
            used_by: [from | current_conn.used_by]
        })

      {:reply, conn, state}

    # Catch-all instead of separate `:down` / `nil` clauses: a connection in
    # any state other than `:up` is simply not available, and must not crash
    # the pool process with a CaseClauseError.
    _unavailable ->
      {:reply, nil, state}
  end
end
154
# Replies with the whole server state, leaving it unchanged.
@impl true
def handle_call(:state, _from, state) do
  {:reply, state, state}
end
157
# Replies with the number of stored connections.
@impl true
def handle_call(:count, _from, state) do
  total = map_size(state.conns)
  {:reply, total, state}
end
162
# Replies with the `{key, conn}` pairs that are idle and used by nobody,
# ordered by ascending `crf` and, for equal `crf`, by ascending
# `last_reference`.
#
# The ordering uses a `{crf, last_reference}` sort key. A comparator of the
# form `crf <= and last_reference <=` is not a total order (two entries can
# each fail to be "<=" the other), which makes the result depend on the
# map's iteration order.
@impl true
def handle_call(:unused_conns, _from, state) do
  unused_conns =
    state.conns
    |> Enum.filter(fn {_key, conn} ->
      conn.conn_state == :idle and conn.used_by == []
    end)
    |> Enum.sort_by(fn {_key, conn} -> {conn.crf, conn.last_reference} end)

  {:reply, unused_conns, state}
end
176
# Handles gun's `:gun_up` message for `conn_pid`.
#
# The pool key is rebuilt from the connection's info and the stored record is
# looked up by both key and pid. A live, known connection is marked up and
# active with its retry counter reset; a known but dead one is removed; an
# unknown one is closed.
@impl true
def handle_info({:gun_up, conn_pid, _protocol}, state) do
  new_state =
    with conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid),
         {key, conn} <- find_conn(state.conns, conn_pid, conn_key) do
      if Process.alive?(conn_pid) do
        now = :os.system_time(:second)
        updated_crf = crf(now - conn.last_reference, 100, conn.crf)

        refreshed = %{
          conn
          | gun_state: :up,
            last_reference: now,
            crf: updated_crf,
            conn_state: :active,
            retries: 0
        }

        put_in(state.conns[key], refreshed)
      else
        Logger.debug(":gun_up message for closed conn #{inspect(conn_pid)}")

        %{state | conns: Map.delete(state.conns, key)}
      end
    else
      :error_gun_info ->
        Logger.debug(":gun.info caused error")
        state

      nil ->
        Logger.debug(":gun_up message for conn which is not found in state")

        :ok = Gun.close(conn_pid)

        state
    end

  {:noreply, new_state}
end
218
# Handles gun's `:gun_down` message for `conn_pid`.
#
# A known connection whose process is alive is either marked down with its
# retry counter bumped, or, once the counter has reached the configured
# `[:connections_pool, :retry]` limit, closed and removed. A known connection
# whose process is dead is removed; an unknown one is closed.
@impl true
def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
  retries = Config.get([:connections_pool, :retry], 1)

  # The record is looked up by pid only; the key is not rebuilt from the
  # connection's info here (original note: info can't be fetched for this pid).
  state =
    with {key, conn} <- find_conn(state.conns, conn_pid),
         {true, key} <- {Process.alive?(conn_pid), key} do
      # `>=` rather than `==`: the limit is read from config on every message,
      # so it may drop below a counter that has already been incremented; with
      # `==` such a connection would never be closed.
      if conn.retries >= retries do
        Logger.debug("closing conn if retries is eq #{inspect(conn_pid)}")
        :ok = Gun.close(conn.conn)

        put_in(
          state.conns,
          Map.delete(state.conns, key)
        )
      else
        put_in(state.conns[key], %{
          conn
          | gun_state: :down,
            retries: conn.retries + 1
        })
      end
    else
      {false, key} ->
        # gun can send gun_down for closed conn, maybe connection is not closed yet
        Logger.debug(":gun_down message for closed conn #{inspect(conn_pid)}")

        put_in(
          state.conns,
          Map.delete(state.conns, key)
        )

      nil ->
        Logger.debug(":gun_down message for conn which is not found in state")

        :ok = Gun.close(conn_pid)

        state
    end

  {:noreply, state}
end
261
# Handles the monitor `:DOWN` message for a connection process.
#
# Every process recorded in the connection's `used_by` list is sent an exit
# signal with the same `reason`, and the connection is removed from the
# state. A `:DOWN` for an unknown connection is only logged.
@impl true
def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do
  Logger.debug("received DOWN message for #{inspect(conn_pid)} reason -> #{inspect(reason)}")

  state =
    with {key, conn} <- find_conn(state.conns, conn_pid) do
      # `used_by` holds the `from` tuples recorded at checkin; the first
      # element is the user's pid.
      Enum.each(conn.used_by, fn {pid, _ref} ->
        Process.exit(pid, reason)
      end)

      put_in(
        state.conns,
        Map.delete(state.conns, key)
      )
    else
      nil ->
        Logger.debug(":DOWN message for conn which is not found in state")

        state
    end

  {:noreply, state}
end
285
# Rebuilds the pool key ("scheme:host:port") of a connection from the data
# returned by `Gun.info/1`.
#
# Returns the key as a binary, or `:error_gun_info` when the info cannot be
# obtained or turned into a key. The `:gun_up` handler already has an
# `:error_gun_info` branch; without the rescue/catch below a failing info
# call (for example on a process that has just died) would raise or exit
# inside the pool server and take it down.
defp compose_key_gun_info(pid) do
  %{origin_host: origin_host, origin_scheme: scheme, origin_port: port} = Gun.info(pid)

  # `:inet.ntoa/1` formats an IP address tuple; anything else (e.g. a host
  # name) yields `{:error, :einval}` and is used as-is.
  host =
    case :inet.ntoa(origin_host) do
      {:error, :einval} -> origin_host
      ip -> ip
    end

  "#{scheme}:#{host}:#{port}"
rescue
  # Deliberately broad: any failure here means "no key could be built", and
  # the caller only needs to know that.
  _error -> :error_gun_info
catch
  :exit, _reason -> :error_gun_info
end
297
# Returns the first `{key, conn}` entry whose connection pid is `conn_pid`,
# or `nil` when there is none.
defp find_conn(conns, conn_pid) do
  owned_by_pid? = fn {_key, conn} -> conn.conn == conn_pid end

  Enum.find(conns, owned_by_pid?)
end
303
# Returns the `{key, conn}` entry stored under `conn_key` whose connection pid
# is `conn_pid`, or `nil` when there is no such entry.
defp find_conn(conns, conn_pid, conn_key) do
  Enum.find(conns, fn
    {key, conn} when key == conn_key -> conn.conn == conn_pid
    _other -> false
  end)
end
309
@doc """
Computes the next `crf` score of a connection.

The previous score `crf` is scaled by `0.5` raised to `current / steps`
(so it is halved for every `steps` units of `current`) and `1` is added.
"""
@spec crf(number(), number(), number()) :: float()
def crf(current, steps, crf) do
  decay = :math.pow(0.5, current / steps)
  1 + decay * crf
end
313
@doc """
Renders a `URI` as `scheme://host` followed by its path, for use in log
messages. Other URI parts (port, query, ...) are left out.
"""
@spec compose_uri_log(URI.t()) :: String.t()
def compose_uri_log(%URI{} = uri) do
  "#{uri.scheme}://#{uri.host}#{uri.path}"
end
317 end