Merge branch 'develop' into 'remove-avatar-header'
[akkoma] / lib / pleroma / web / websub / websub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Websub do
6 alias Ecto.Changeset
7 alias Pleroma.Activity
8 alias Pleroma.HTTP
9 alias Pleroma.Instances
10 alias Pleroma.Repo
11 alias Pleroma.User
12 alias Pleroma.Web.ActivityPub.Visibility
13 alias Pleroma.Web.Endpoint
14 alias Pleroma.Web.Federator
15 alias Pleroma.Web.Federator.Publisher
16 alias Pleroma.Web.OStatus
17 alias Pleroma.Web.OStatus.FeedRepresenter
18 alias Pleroma.Web.Router.Helpers
19 alias Pleroma.Web.Websub.WebsubClientSubscription
20 alias Pleroma.Web.Websub.WebsubServerSubscription
21 alias Pleroma.Web.XML
22 require Logger
23
24 import Ecto.Query
25
26 @behaviour Pleroma.Web.Federator.Publisher
27
28 def verify(subscription, getter \\ &HTTP.get/3) do
29 challenge = Base.encode16(:crypto.strong_rand_bytes(8))
30 lease_seconds = NaiveDateTime.diff(subscription.valid_until, subscription.updated_at)
31 lease_seconds = lease_seconds |> to_string
32
33 params = %{
34 "hub.challenge": challenge,
35 "hub.lease_seconds": lease_seconds,
36 "hub.topic": subscription.topic,
37 "hub.mode": "subscribe"
38 }
39
40 url = hd(String.split(subscription.callback, "?"))
41 query = URI.parse(subscription.callback).query || ""
42 params = Map.merge(params, URI.decode_query(query))
43
44 with {:ok, response} <- getter.(url, [], params: params),
45 ^challenge <- response.body do
46 changeset = Changeset.change(subscription, %{state: "active"})
47 Repo.update(changeset)
48 else
49 e ->
50 Logger.debug("Couldn't verify subscription")
51 Logger.debug(inspect(e))
52 {:error, subscription}
53 end
54 end
55
56 @supported_activities [
57 "Create",
58 "Follow",
59 "Like",
60 "Announce",
61 "Undo",
62 "Delete"
63 ]
64
65 def is_representable?(%Activity{data: %{"type" => type}} = activity)
66 when type in @supported_activities,
67 do: Visibility.is_public?(activity)
68
69 def is_representable?(_), do: false
70
71 def publish(topic, user, %{data: %{"type" => type}} = activity)
72 when type in @supported_activities do
73 response =
74 user
75 |> FeedRepresenter.to_simple_form([activity], [user])
76 |> :xmerl.export_simple(:xmerl_xml)
77 |> to_string
78
79 query =
80 from(
81 sub in WebsubServerSubscription,
82 where: sub.topic == ^topic and sub.state == "active",
83 where: fragment("? > (NOW() at time zone 'UTC')", sub.valid_until)
84 )
85
86 subscriptions = Repo.all(query)
87
88 callbacks = Enum.map(subscriptions, & &1.callback)
89 reachable_callbacks_metadata = Instances.filter_reachable(callbacks)
90 reachable_callbacks = Map.keys(reachable_callbacks_metadata)
91
92 subscriptions
93 |> Enum.filter(&(&1.callback in reachable_callbacks))
94 |> Enum.each(fn sub ->
95 data = %{
96 xml: response,
97 topic: topic,
98 callback: sub.callback,
99 secret: sub.secret,
100 unreachable_since: reachable_callbacks_metadata[sub.callback]
101 }
102
103 Publisher.enqueue_one(__MODULE__, data)
104 end)
105 end
106
107 def publish(_, _, _), do: ""
108
109 def publish(actor, activity), do: publish(Pleroma.Web.OStatus.feed_path(actor), actor, activity)
110
111 def sign(secret, doc) do
112 :crypto.hmac(:sha, secret, to_string(doc)) |> Base.encode16() |> String.downcase()
113 end
114
115 def incoming_subscription_request(user, %{"hub.mode" => "subscribe"} = params) do
116 with {:ok, topic} <- valid_topic(params, user),
117 {:ok, lease_time} <- lease_time(params),
118 secret <- params["hub.secret"],
119 callback <- params["hub.callback"] do
120 subscription = get_subscription(topic, callback)
121
122 data = %{
123 state: subscription.state || "requested",
124 topic: topic,
125 secret: secret,
126 callback: callback
127 }
128
129 change = Changeset.change(subscription, data)
130 websub = Repo.insert_or_update!(change)
131
132 change =
133 Changeset.change(websub, %{valid_until: NaiveDateTime.add(websub.updated_at, lease_time)})
134
135 websub = Repo.update!(change)
136
137 Federator.verify_websub(websub)
138
139 {:ok, websub}
140 else
141 {:error, reason} ->
142 Logger.debug("Couldn't create subscription")
143 Logger.debug(inspect(reason))
144
145 {:error, reason}
146 end
147 end
148
149 def incoming_subscription_request(user, params) do
150 Logger.info("Unhandled WebSub request for #{user.nickname}: #{inspect(params)}")
151
152 {:error, "Invalid WebSub request"}
153 end
154
155 defp get_subscription(topic, callback) do
156 Repo.get_by(WebsubServerSubscription, topic: topic, callback: callback) ||
157 %WebsubServerSubscription{}
158 end
159
160 # Temp hack for mastodon.
161 defp lease_time(%{"hub.lease_seconds" => ""}) do
162 # three days
163 {:ok, 60 * 60 * 24 * 3}
164 end
165
166 defp lease_time(%{"hub.lease_seconds" => lease_seconds}) do
167 {:ok, String.to_integer(lease_seconds)}
168 end
169
170 defp lease_time(_) do
171 # three days
172 {:ok, 60 * 60 * 24 * 3}
173 end
174
175 defp valid_topic(%{"hub.topic" => topic}, user) do
176 if topic == OStatus.feed_path(user) do
177 {:ok, OStatus.feed_path(user)}
178 else
179 {:error, "Wrong topic requested, expected #{OStatus.feed_path(user)}, got #{topic}"}
180 end
181 end
182
183 def subscribe(subscriber, subscribed, requester \\ &request_subscription/1) do
184 topic = subscribed.info.topic
185 # FIXME: Race condition, use transactions
186 {:ok, subscription} =
187 with subscription when not is_nil(subscription) <-
188 Repo.get_by(WebsubClientSubscription, topic: topic) do
189 subscribers = [subscriber.ap_id | subscription.subscribers] |> Enum.uniq()
190 change = Ecto.Changeset.change(subscription, %{subscribers: subscribers})
191 Repo.update(change)
192 else
193 _e ->
194 subscription = %WebsubClientSubscription{
195 topic: topic,
196 hub: subscribed.info.hub,
197 subscribers: [subscriber.ap_id],
198 state: "requested",
199 secret: :crypto.strong_rand_bytes(8) |> Base.url_encode64(),
200 user: subscribed
201 }
202
203 Repo.insert(subscription)
204 end
205
206 requester.(subscription)
207 end
208
209 def gather_feed_data(topic, getter \\ &HTTP.get/1) do
210 with {:ok, response} <- getter.(topic),
211 status when status in 200..299 <- response.status,
212 body <- response.body,
213 doc <- XML.parse_document(body),
214 uri when not is_nil(uri) <- XML.string_from_xpath("/feed/author[1]/uri", doc),
215 hub when not is_nil(hub) <- XML.string_from_xpath(~S{/feed/link[@rel="hub"]/@href}, doc) do
216 name = XML.string_from_xpath("/feed/author[1]/name", doc)
217 preferred_username = XML.string_from_xpath("/feed/author[1]/poco:preferredUsername", doc)
218 display_name = XML.string_from_xpath("/feed/author[1]/poco:displayName", doc)
219 avatar = OStatus.make_avatar_object(doc)
220 bio = XML.string_from_xpath("/feed/author[1]/summary", doc)
221
222 {:ok,
223 %{
224 "uri" => uri,
225 "hub" => hub,
226 "nickname" => preferred_username || name,
227 "name" => display_name || name,
228 "host" => URI.parse(uri).host,
229 "avatar" => avatar,
230 "bio" => bio
231 }}
232 else
233 e ->
234 {:error, e}
235 end
236 end
237
238 def request_subscription(websub, poster \\ &HTTP.post/3, timeout \\ 10_000) do
239 data = [
240 "hub.mode": "subscribe",
241 "hub.topic": websub.topic,
242 "hub.secret": websub.secret,
243 "hub.callback": Helpers.websub_url(Endpoint, :websub_subscription_confirmation, websub.id)
244 ]
245
246 # This checks once a second if we are confirmed yet
247 websub_checker = fn ->
248 helper = fn helper ->
249 :timer.sleep(1000)
250 websub = Repo.get_by(WebsubClientSubscription, id: websub.id, state: "accepted")
251 if websub, do: websub, else: helper.(helper)
252 end
253
254 helper.(helper)
255 end
256
257 task = Task.async(websub_checker)
258
259 with {:ok, %{status: 202}} <-
260 poster.(websub.hub, {:form, data}, "Content-type": "application/x-www-form-urlencoded"),
261 {:ok, websub} <- Task.yield(task, timeout) do
262 {:ok, websub}
263 else
264 e ->
265 Task.shutdown(task)
266
267 change = Ecto.Changeset.change(websub, %{state: "rejected"})
268 {:ok, websub} = Repo.update(change)
269
270 Logger.debug(fn -> "Couldn't confirm subscription: #{inspect(websub)}" end)
271 Logger.debug(fn -> "error: #{inspect(e)}" end)
272
273 {:error, websub}
274 end
275 end
276
277 def refresh_subscriptions(delta \\ 60 * 60 * 24) do
278 Logger.debug("Refreshing subscriptions")
279
280 cut_off = NaiveDateTime.add(NaiveDateTime.utc_now(), delta)
281
282 query = from(sub in WebsubClientSubscription, where: sub.valid_until < ^cut_off)
283
284 subs = Repo.all(query)
285
286 Enum.each(subs, fn sub ->
287 Federator.request_subscription(sub)
288 end)
289 end
290
291 def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret} = params) do
292 signature = sign(secret || "", xml)
293 Logger.info(fn -> "Pushing #{topic} to #{callback}" end)
294
295 with {:ok, %{status: code}} when code in 200..299 <-
296 HTTP.post(
297 callback,
298 xml,
299 [
300 {"Content-Type", "application/atom+xml"},
301 {"X-Hub-Signature", "sha1=#{signature}"}
302 ]
303 ) do
304 if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
305 do: Instances.set_reachable(callback)
306
307 Logger.info(fn -> "Pushed to #{callback}, code #{code}" end)
308 {:ok, code}
309 else
310 {_post_result, response} ->
311 unless params[:unreachable_since], do: Instances.set_reachable(callback)
312 Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(response)}" end)
313 {:error, response}
314 end
315 end
316
317 def gather_webfinger_links(%User{} = user) do
318 [
319 %{
320 "rel" => "http://schemas.google.com/g/2010#updates-from",
321 "type" => "application/atom+xml",
322 "href" => OStatus.feed_path(user)
323 },
324 %{
325 "rel" => "http://ostatus.org/schema/1.0/subscribe",
326 "template" => OStatus.remote_follow_path()
327 }
328 ]
329 end
330
331 def gather_nodeinfo_protocol_names, do: ["ostatus"]
332 end