- defp open_conn(key, uri, state, %{proxy: {proxy_host, proxy_port}} = opts) do
- connect_opts =
- uri
- |> destination_opts()
- |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
-
- with open_opts <- Map.delete(opts, :tls_opts),
- {:ok, conn} <- API.open(proxy_host, proxy_port, open_opts),
- {:ok, _} <- API.await_up(conn),
- stream <- API.connect(conn, connect_opts),
- {:response, :fin, 200, _} <- API.await(conn, stream),
- state <-
- put_in(state.conns[key], %Conn{
- conn: conn,
- gun_state: :up,
- conn_state: :active,
- last_reference: :os.system_time(:second)
- }) do
- {:noreply, state}
- else
- error ->
- Logger.warn(
- "Received error on opening connection with http proxy #{uri.scheme}://#{
- compose_uri(uri)
- }: #{inspect(error)}"
- )
-
- {:noreply, state}
- end
- end
-
- defp open_conn(key, uri, state, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
- version =
- proxy_type
- |> to_string()
- |> String.last()
- |> case do
- "4" -> 4
- _ -> 5
- end
-
- socks_opts =
- uri
- |> destination_opts()
- |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
- |> Map.put(:version, version)
-
- opts =
- opts
- |> Map.put(:protocols, [:socks])
- |> Map.put(:socks_opts, socks_opts)
-
- with {:ok, conn} <- API.open(proxy_host, proxy_port, opts),
- {:ok, _} <- API.await_up(conn),
- state <-
- put_in(state.conns[key], %Conn{
- conn: conn,
- gun_state: :up,
- conn_state: :active,
- last_reference: :os.system_time(:second)
- }) do
- {:noreply, state}
- else
- error ->
- Logger.warn(
- "Received error on opening connection with socks proxy #{uri.scheme}://#{
- compose_uri(uri)
- }: #{inspect(error)}"
- )
-
- {:noreply, state}
- end
- end
-
- defp open_conn(key, %URI{host: host, port: port} = uri, state, opts) do
- Logger.debug("opening conn #{compose_uri(uri)}")
- {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
-
- with {:ok, conn} <- API.open(host, port, opts),
- {:ok, _} <- API.await_up(conn),
- state <-
- put_in(state.conns[key], %Conn{
- conn: conn,
- gun_state: :up,
- conn_state: :active,
- last_reference: :os.system_time(:second)
- }) do
- Logger.debug("new conn opened #{compose_uri(uri)}")
- Logger.debug("replying to the call #{compose_uri(uri)}")
- {:noreply, state}
- else
- error ->
- Logger.warn(
- "Received error on opening connection #{uri.scheme}://#{compose_uri(uri)}: #{
- inspect(error)
- }"
- )
-
- {:noreply, state}
- end
- end
-
- defp destination_opts(%URI{host: host, port: port}) do
- {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
- %{host: host, port: port}
- end
-
- defp add_http2_opts(opts, "https", tls_opts) do
- Map.merge(opts, %{protocols: [:http2], transport: :tls, tls_opts: tls_opts})
- end
-
- defp add_http2_opts(opts, _, _), do: opts
-
- @spec get_unused_conns(map()) :: [{domain(), conn()}]
- def get_unused_conns(conns) do
- conns
- |> Enum.filter(fn {_k, v} ->
- v.conn_state == :idle and v.used_by == []
- end)
- |> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
- x.crf <= y.crf and x.last_reference <= y.last_reference
- end)
- end
-
- defp try_to_open_conn(key, uri, state, opts) do
- Logger.debug("try to open conn #{compose_uri(uri)}")
-
- with [{close_key, least_used} | _conns] <- get_unused_conns(state.conns),
- :ok <- API.close(least_used.conn),
- state <-
- put_in(
- state.conns,
- Map.delete(state.conns, close_key)
- ) do
- Logger.debug(
- "least used conn found and closed #{inspect(least_used.conn)} #{compose_uri(uri)}"
- )
-
- open_conn(key, uri, state, opts)
- else
- [] -> {:noreply, state}
- end
- end
-