make bulk user creation from admin works as a transaction
[akkoma] / lib / pleroma / web / websub / websub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Websub do
6 alias Ecto.Changeset
7 alias Pleroma.Activity
8 alias Pleroma.Instances
9 alias Pleroma.Repo
10 alias Pleroma.User
11 alias Pleroma.Web.ActivityPub.Visibility
12 alias Pleroma.Web.Endpoint
13 alias Pleroma.Web.Federator
14 alias Pleroma.Web.Federator.Publisher
15 alias Pleroma.Web.OStatus
16 alias Pleroma.Web.OStatus.FeedRepresenter
17 alias Pleroma.Web.Router.Helpers
18 alias Pleroma.Web.Websub.WebsubClientSubscription
19 alias Pleroma.Web.Websub.WebsubServerSubscription
20 alias Pleroma.Web.XML
21 require Logger
22
23 import Ecto.Query
24
25 @behaviour Pleroma.Web.Federator.Publisher
26
27 @httpoison Application.get_env(:pleroma, :httpoison)
28
29 def verify(subscription, getter \\ &@httpoison.get/3) do
30 challenge = Base.encode16(:crypto.strong_rand_bytes(8))
31 lease_seconds = NaiveDateTime.diff(subscription.valid_until, subscription.updated_at)
32 lease_seconds = lease_seconds |> to_string
33
34 params = %{
35 "hub.challenge": challenge,
36 "hub.lease_seconds": lease_seconds,
37 "hub.topic": subscription.topic,
38 "hub.mode": "subscribe"
39 }
40
41 url = hd(String.split(subscription.callback, "?"))
42 query = URI.parse(subscription.callback).query || ""
43 params = Map.merge(params, URI.decode_query(query))
44
45 with {:ok, response} <- getter.(url, [], params: params),
46 ^challenge <- response.body do
47 changeset = Changeset.change(subscription, %{state: "active"})
48 Repo.update(changeset)
49 else
50 e ->
51 Logger.debug("Couldn't verify subscription")
52 Logger.debug(inspect(e))
53 {:error, subscription}
54 end
55 end
56
57 @supported_activities [
58 "Create",
59 "Follow",
60 "Like",
61 "Announce",
62 "Undo",
63 "Delete"
64 ]
65
66 def is_representable?(%Activity{data: %{"type" => type}} = activity)
67 when type in @supported_activities,
68 do: Visibility.is_public?(activity)
69
70 def is_representable?(_), do: false
71
72 def publish(topic, user, %{data: %{"type" => type}} = activity)
73 when type in @supported_activities do
74 response =
75 user
76 |> FeedRepresenter.to_simple_form([activity], [user])
77 |> :xmerl.export_simple(:xmerl_xml)
78 |> to_string
79
80 query =
81 from(
82 sub in WebsubServerSubscription,
83 where: sub.topic == ^topic and sub.state == "active",
84 where: fragment("? > (NOW() at time zone 'UTC')", sub.valid_until)
85 )
86
87 subscriptions = Repo.all(query)
88
89 callbacks = Enum.map(subscriptions, & &1.callback)
90 reachable_callbacks_metadata = Instances.filter_reachable(callbacks)
91 reachable_callbacks = Map.keys(reachable_callbacks_metadata)
92
93 subscriptions
94 |> Enum.filter(&(&1.callback in reachable_callbacks))
95 |> Enum.each(fn sub ->
96 data = %{
97 xml: response,
98 topic: topic,
99 callback: sub.callback,
100 secret: sub.secret,
101 unreachable_since: reachable_callbacks_metadata[sub.callback]
102 }
103
104 Publisher.enqueue_one(__MODULE__, data)
105 end)
106 end
107
108 def publish(_, _, _), do: ""
109
110 def publish(actor, activity), do: publish(Pleroma.Web.OStatus.feed_path(actor), actor, activity)
111
112 def sign(secret, doc) do
113 :crypto.hmac(:sha, secret, to_string(doc)) |> Base.encode16() |> String.downcase()
114 end
115
116 def incoming_subscription_request(user, %{"hub.mode" => "subscribe"} = params) do
117 with {:ok, topic} <- valid_topic(params, user),
118 {:ok, lease_time} <- lease_time(params),
119 secret <- params["hub.secret"],
120 callback <- params["hub.callback"] do
121 subscription = get_subscription(topic, callback)
122
123 data = %{
124 state: subscription.state || "requested",
125 topic: topic,
126 secret: secret,
127 callback: callback
128 }
129
130 change = Changeset.change(subscription, data)
131 websub = Repo.insert_or_update!(change)
132
133 change =
134 Changeset.change(websub, %{valid_until: NaiveDateTime.add(websub.updated_at, lease_time)})
135
136 websub = Repo.update!(change)
137
138 Federator.verify_websub(websub)
139
140 {:ok, websub}
141 else
142 {:error, reason} ->
143 Logger.debug("Couldn't create subscription")
144 Logger.debug(inspect(reason))
145
146 {:error, reason}
147 end
148 end
149
150 def incoming_subscription_request(user, params) do
151 Logger.info("Unhandled WebSub request for #{user.nickname}: #{inspect(params)}")
152
153 {:error, "Invalid WebSub request"}
154 end
155
156 defp get_subscription(topic, callback) do
157 Repo.get_by(WebsubServerSubscription, topic: topic, callback: callback) ||
158 %WebsubServerSubscription{}
159 end
160
161 # Temp hack for mastodon.
162 defp lease_time(%{"hub.lease_seconds" => ""}) do
163 # three days
164 {:ok, 60 * 60 * 24 * 3}
165 end
166
167 defp lease_time(%{"hub.lease_seconds" => lease_seconds}) do
168 {:ok, String.to_integer(lease_seconds)}
169 end
170
171 defp lease_time(_) do
172 # three days
173 {:ok, 60 * 60 * 24 * 3}
174 end
175
176 defp valid_topic(%{"hub.topic" => topic}, user) do
177 if topic == OStatus.feed_path(user) do
178 {:ok, OStatus.feed_path(user)}
179 else
180 {:error, "Wrong topic requested, expected #{OStatus.feed_path(user)}, got #{topic}"}
181 end
182 end
183
184 def subscribe(subscriber, subscribed, requester \\ &request_subscription/1) do
185 topic = subscribed.info.topic
186 # FIXME: Race condition, use transactions
187 {:ok, subscription} =
188 with subscription when not is_nil(subscription) <-
189 Repo.get_by(WebsubClientSubscription, topic: topic) do
190 subscribers = [subscriber.ap_id | subscription.subscribers] |> Enum.uniq()
191 change = Ecto.Changeset.change(subscription, %{subscribers: subscribers})
192 Repo.update(change)
193 else
194 _e ->
195 subscription = %WebsubClientSubscription{
196 topic: topic,
197 hub: subscribed.info.hub,
198 subscribers: [subscriber.ap_id],
199 state: "requested",
200 secret: :crypto.strong_rand_bytes(8) |> Base.url_encode64(),
201 user: subscribed
202 }
203
204 Repo.insert(subscription)
205 end
206
207 requester.(subscription)
208 end
209
210 def gather_feed_data(topic, getter \\ &@httpoison.get/1) do
211 with {:ok, response} <- getter.(topic),
212 status when status in 200..299 <- response.status,
213 body <- response.body,
214 doc <- XML.parse_document(body),
215 uri when not is_nil(uri) <- XML.string_from_xpath("/feed/author[1]/uri", doc),
216 hub when not is_nil(hub) <- XML.string_from_xpath(~S{/feed/link[@rel="hub"]/@href}, doc) do
217 name = XML.string_from_xpath("/feed/author[1]/name", doc)
218 preferred_username = XML.string_from_xpath("/feed/author[1]/poco:preferredUsername", doc)
219 display_name = XML.string_from_xpath("/feed/author[1]/poco:displayName", doc)
220 avatar = OStatus.make_avatar_object(doc)
221 bio = XML.string_from_xpath("/feed/author[1]/summary", doc)
222
223 {:ok,
224 %{
225 "uri" => uri,
226 "hub" => hub,
227 "nickname" => preferred_username || name,
228 "name" => display_name || name,
229 "host" => URI.parse(uri).host,
230 "avatar" => avatar,
231 "bio" => bio
232 }}
233 else
234 e ->
235 {:error, e}
236 end
237 end
238
239 def request_subscription(websub, poster \\ &@httpoison.post/3, timeout \\ 10_000) do
240 data = [
241 "hub.mode": "subscribe",
242 "hub.topic": websub.topic,
243 "hub.secret": websub.secret,
244 "hub.callback": Helpers.websub_url(Endpoint, :websub_subscription_confirmation, websub.id)
245 ]
246
247 # This checks once a second if we are confirmed yet
248 websub_checker = fn ->
249 helper = fn helper ->
250 :timer.sleep(1000)
251 websub = Repo.get_by(WebsubClientSubscription, id: websub.id, state: "accepted")
252 if websub, do: websub, else: helper.(helper)
253 end
254
255 helper.(helper)
256 end
257
258 task = Task.async(websub_checker)
259
260 with {:ok, %{status: 202}} <-
261 poster.(websub.hub, {:form, data}, "Content-type": "application/x-www-form-urlencoded"),
262 {:ok, websub} <- Task.yield(task, timeout) do
263 {:ok, websub}
264 else
265 e ->
266 Task.shutdown(task)
267
268 change = Ecto.Changeset.change(websub, %{state: "rejected"})
269 {:ok, websub} = Repo.update(change)
270
271 Logger.debug(fn -> "Couldn't confirm subscription: #{inspect(websub)}" end)
272 Logger.debug(fn -> "error: #{inspect(e)}" end)
273
274 {:error, websub}
275 end
276 end
277
278 def refresh_subscriptions(delta \\ 60 * 60 * 24) do
279 Logger.debug("Refreshing subscriptions")
280
281 cut_off = NaiveDateTime.add(NaiveDateTime.utc_now(), delta)
282
283 query = from(sub in WebsubClientSubscription, where: sub.valid_until < ^cut_off)
284
285 subs = Repo.all(query)
286
287 Enum.each(subs, fn sub ->
288 Federator.request_subscription(sub)
289 end)
290 end
291
292 def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret} = params) do
293 signature = sign(secret || "", xml)
294 Logger.info(fn -> "Pushing #{topic} to #{callback}" end)
295
296 with {:ok, %{status: code}} when code in 200..299 <-
297 @httpoison.post(
298 callback,
299 xml,
300 [
301 {"Content-Type", "application/atom+xml"},
302 {"X-Hub-Signature", "sha1=#{signature}"}
303 ]
304 ) do
305 if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
306 do: Instances.set_reachable(callback)
307
308 Logger.info(fn -> "Pushed to #{callback}, code #{code}" end)
309 {:ok, code}
310 else
311 {_post_result, response} ->
312 unless params[:unreachable_since], do: Instances.set_reachable(callback)
313 Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(response)}" end)
314 {:error, response}
315 end
316 end
317
318 def gather_webfinger_links(%User{} = user) do
319 [
320 %{
321 "rel" => "http://schemas.google.com/g/2010#updates-from",
322 "type" => "application/atom+xml",
323 "href" => OStatus.feed_path(user)
324 },
325 %{
326 "rel" => "http://ostatus.org/schema/1.0/subscribe",
327 "template" => OStatus.remote_follow_path()
328 }
329 ]
330 end
331
332 def gather_nodeinfo_protocol_names, do: ["ostatus"]
333 end