e235a7c4382e6b28a2954b9932c641d7501c6eaa
[akkoma] / lib / pleroma / web / fed_sockets / outgoing_handler.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.FedSockets.OutgoingHandler do
6 use GenServer
7
8 require Logger
9
10 alias Pleroma.Application
11 alias Pleroma.Web.ActivityPub.InternalFetchActor
12 alias Pleroma.Web.FedSockets
13 alias Pleroma.Web.FedSockets.FedRegistry
14 alias Pleroma.Web.FedSockets.FedSocket
15 alias Pleroma.Web.FedSockets.SocketInfo
16
17 def start_link(uri) do
18 GenServer.start_link(__MODULE__, %{uri: uri})
19 end
20
21 def init(%{uri: uri}) do
22 case initiate_connection(uri) do
23 {:ok, ws_origin, conn_pid} ->
24 FedRegistry.add_fed_socket(ws_origin, conn_pid)
25
26 {:error, reason} ->
27 Logger.debug("Outgoing connection failed - #{inspect(reason)}")
28 :ignore
29 end
30 end
31
32 def handle_info({:gun_ws, conn_pid, _ref, {:text, data}}, socket_info) do
33 socket_info = SocketInfo.touch(socket_info)
34
35 case FedSocket.receive_package(socket_info, data) do
36 {:noreply, _} ->
37 {:noreply, socket_info}
38
39 {:reply, reply} ->
40 :gun.ws_send(conn_pid, {:text, Jason.encode!(reply)})
41 {:noreply, socket_info}
42
43 {:error, reason} ->
44 Logger.error("incoming error - receive_package: #{inspect(reason)}")
45 {:noreply, socket_info}
46 end
47 end
48
49 def handle_info(:close, state) do
50 Logger.debug("Sending close frame !!!!!!!")
51 {:close, state}
52 end
53
54 def handle_info({:gun_down, _pid, _prot, :closed, _}, state) do
55 {:stop, :normal, state}
56 end
57
58 def handle_info({:send, data}, %{conn_pid: conn_pid} = socket_info) do
59 socket_info = SocketInfo.touch(socket_info)
60 :gun.ws_send(conn_pid, {:text, data})
61 {:noreply, socket_info}
62 end
63
64 def handle_info({:gun_ws, _, _, :pong}, state) do
65 {:noreply, state, :hibernate}
66 end
67
68 def handle_info(msg, state) do
69 Logger.debug("#{__MODULE__} unhandled event #{inspect(msg)}")
70 {:noreply, state}
71 end
72
73 def terminate(reason, state) do
74 Logger.debug(
75 "#{__MODULE__} terminating outgoing connection for #{inspect(state)} for #{inspect(reason)}"
76 )
77
78 {:ok, state}
79 end
80
81 def initiate_connection(uri) do
82 ws_uri =
83 uri
84 |> SocketInfo.origin()
85 |> FedSockets.uri_for_origin()
86
87 %{host: host, port: port, path: path} = URI.parse(ws_uri)
88
89 with {:ok, conn_pid} <- :gun.open(to_charlist(host), port, %{protocols: [:http]}),
90 {:ok, _} <- :gun.await_up(conn_pid),
91 reference <-
92 :gun.get(conn_pid, to_charlist(path), [
93 {'user-agent', to_charlist(Application.user_agent())}
94 ]),
95 {:response, :fin, 204, _} <- :gun.await(conn_pid, reference),
96 headers <- build_headers(uri),
97 ref <- :gun.ws_upgrade(conn_pid, to_charlist(path), headers, %{silence_pings: false}) do
98 receive do
99 {:gun_upgrade, ^conn_pid, ^ref, [<<"websocket">>], _} ->
100 {:ok, ws_uri, conn_pid}
101 after
102 15_000 ->
103 Logger.debug("Fedsocket timeout connecting to #{inspect(uri)}")
104 {:error, :timeout}
105 end
106 else
107 {:response, :nofin, 404, _} ->
108 {:error, :fedsockets_not_supported}
109
110 e ->
111 Logger.debug("Fedsocket error connecting to #{inspect(uri)}")
112 {:error, e}
113 end
114 end
115
116 defp build_headers(uri) do
117 host_for_sig = uri |> URI.parse() |> host_signature()
118
119 shake = FedSocket.shake()
120 digest = "SHA-256=" <> (:crypto.hash(:sha256, shake) |> Base.encode64())
121 date = Pleroma.Signature.signed_date()
122 shake_size = byte_size(shake)
123
124 signature_opts = %{
125 "(request-target)": shake,
126 "content-length": to_charlist("#{shake_size}"),
127 date: date,
128 digest: digest,
129 host: host_for_sig
130 }
131
132 signature = Pleroma.Signature.sign(InternalFetchActor.get_actor(), signature_opts)
133
134 [
135 {'signature', to_charlist(signature)},
136 {'date', date},
137 {'digest', to_charlist(digest)},
138 {'content-length', to_charlist("#{shake_size}")},
139 {to_charlist("(request-target)"), to_charlist(shake)},
140 {'user-agent', to_charlist(Application.user_agent())}
141 ]
142 end
143
144 defp host_signature(%{host: host, scheme: scheme, port: port}) do
145 if port == URI.default_port(scheme) do
146 host
147 else
148 "#{host}:#{port}"
149 end
150 end
151 end