1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Pool.Connections do
12 @type domain :: String.t()
13 @type conn :: Pleroma.Gun.Conn.t()
15 @type t :: %__MODULE__{
16 conns: %{domain() => conn()},
20 defstruct conns: %{}, opts: []
24 @spec start_link({atom(), keyword()}) :: {:ok, pid()}
25 def start_link({name, opts}) do
26 GenServer.start_link(__MODULE__, opts, name: name)
30 def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}
32 @spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
33 def checkin(url, name)
34 def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name)
36 def checkin(%URI{} = uri, name) do
37 timeout = Config.get([:connections_pool, :receive_connection_timeout], 250)
46 @spec alive?(atom()) :: boolean()
48 pid = Process.whereis(name)
49 if pid, do: Process.alive?(pid), else: false
52 @spec get_state(atom()) :: t()
53 def get_state(name) do
54 GenServer.call(name, :state)
57 @spec count(atom()) :: pos_integer()
59 GenServer.call(name, :count)
62 @spec get_unused_conns(atom()) :: [{domain(), conn()}]
63 def get_unused_conns(name) do
64 GenServer.call(name, :unused_conns)
67 @spec checkout(pid(), pid(), atom()) :: :ok
68 def checkout(conn, pid, name) do
69 GenServer.cast(name, {:checkout, conn, pid})
72 @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok
73 def add_conn(name, key, conn) do
74 GenServer.cast(name, {:add_conn, key, conn})
77 @spec remove_conn(atom(), String.t()) :: :ok
78 def remove_conn(name, key) do
79 GenServer.cast(name, {:remove_conn, key})
83 def handle_cast({:add_conn, key, conn}, state) do
84 state = put_in(state.conns[key], conn)
86 Process.monitor(conn.conn)
91 def handle_cast({:checkout, conn_pid, pid}, state) do
92 Logger.debug("checkout #{inspect(conn_pid)}")
95 with true <- Process.alive?(conn_pid),
96 {key, conn} <- find_conn(state.conns, conn_pid),
97 used_by <- List.keydelete(conn.used_by, pid, 0) do
105 put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
108 Logger.debug("checkout for closed conn #{inspect(conn_pid)}")
112 Logger.debug("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
120 def handle_cast({:remove_conn, key}, state) do
121 state = put_in(state.conns, Map.delete(state.conns, key))
126 def handle_call({:checkin, uri}, from, state) do
127 key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
128 Logger.debug("checkin #{key}")
130 case state.conns[key] do
131 %{conn: conn, gun_state: gun_state} = current_conn when gun_state == :up ->
132 Logger.debug("reusing conn #{key}")
134 with time <- :os.system_time(:second),
135 last_reference <- time - current_conn.last_reference,
136 current_crf <- crf(last_reference, 100, current_conn.crf),
138 put_in(state.conns[key], %{
140 | last_reference: time,
143 used_by: [from | current_conn.used_by]
145 {:reply, conn, state}
148 %{gun_state: gun_state} when gun_state == :down ->
157 def handle_call(:state, _from, state), do: {:reply, state, state}
160 def handle_call(:count, _from, state) do
161 {:reply, Enum.count(state.conns), state}
165 def handle_call(:unused_conns, _from, state) do
168 |> Enum.filter(fn {_k, v} ->
169 v.conn_state == :idle and v.used_by == []
171 |> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
172 x.crf <= y.crf and x.last_reference <= y.last_reference
175 {:reply, unused_conns, state}
179 def handle_info({:gun_up, conn_pid, _protocol}, state) do
181 with conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid),
182 {key, conn} <- find_conn(state.conns, conn_pid, conn_key),
183 {true, key} <- {Process.alive?(conn_pid), key},
184 time <- :os.system_time(:second),
185 last_reference <- time - conn.last_reference,
186 current_crf <- crf(last_reference, 100, conn.crf) do
187 put_in(state.conns[key], %{
190 last_reference: time,
197 Logger.debug(":gun.info caused error")
201 Logger.debug(":gun_up message for closed conn #{inspect(conn_pid)}")
205 Map.delete(state.conns, key)
209 Logger.debug(":gun_up message for conn which is not found in state")
211 :ok = API.close(conn_pid)
220 def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
221 retries = Config.get([:connections_pool, :retry], 0)
222 # we can't get info on this pid, because pid is dead
224 with {key, conn} <- find_conn(state.conns, conn_pid),
225 {true, key} <- {Process.alive?(conn_pid), key} do
226 if conn.retries == retries do
227 Logger.debug("closing conn if retries is eq #{inspect(conn_pid)}")
228 :ok = API.close(conn.conn)
232 Map.delete(state.conns, key)
235 put_in(state.conns[key], %{
238 retries: conn.retries + 1
243 # gun can send gun_down for closed conn, maybe connection is not closed yet
244 Logger.debug(":gun_down message for closed conn #{inspect(conn_pid)}")
248 Map.delete(state.conns, key)
252 Logger.debug(":gun_down message for conn which is not found in state")
254 :ok = API.close(conn_pid)
263 def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do
264 Logger.debug("received DOWM message for #{inspect(conn_pid)} reason -> #{inspect(reason)}")
267 with {key, conn} <- find_conn(state.conns, conn_pid) do
268 Enum.each(conn.used_by, fn {pid, _ref} ->
269 Process.exit(pid, reason)
274 Map.delete(state.conns, key)
278 Logger.debug(":DOWN message for conn which is not found in state")
286 defp compose_key_gun_info(pid) do
288 # sometimes :gun.info can raise MatchError, which lead to pool terminate
289 %{origin_host: origin_host, origin_scheme: scheme, origin_port: port} = API.info(pid)
292 case :inet.ntoa(origin_host) do
293 {:error, :einval} -> origin_host
297 "#{scheme}:#{host}:#{port}"
303 defp find_conn(conns, conn_pid) do
304 Enum.find(conns, fn {_key, conn} ->
305 conn.conn == conn_pid
309 defp find_conn(conns, conn_pid, conn_key) do
310 Enum.find(conns, fn {key, conn} ->
311 key == conn_key and conn.conn == conn_pid
315 def crf(current, steps, crf) do
316 1 + :math.pow(0.5, current / steps) * crf
319 def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do
320 "#{scheme}://#{host}#{path}"