Merge branch 'develop' of git.pleroma.social:pleroma/pleroma into feature/jobs
[akkoma] / lib / pleroma / web / websub / websub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Websub do
6 alias Ecto.Changeset
7 alias Pleroma.Instances
8 alias Pleroma.Repo
9 alias Pleroma.Web.Websub.WebsubServerSubscription
10 alias Pleroma.Web.Websub.WebsubClientSubscription
11 alias Pleroma.Web.OStatus.FeedRepresenter
12 alias Pleroma.Web.XML
13 alias Pleroma.Web.Endpoint
14 alias Pleroma.Web.OStatus
15 alias Pleroma.Web.Router.Helpers
16 alias Pleroma.Web.Federator
17 require Logger
18
19 import Ecto.Query
20
21 @httpoison Application.get_env(:pleroma, :httpoison)
22
@doc """
Verifies a server-side subscription by sending a challenge to its callback.

A random challenge, the lease duration (`valid_until - updated_at`, in
seconds), the topic and `hub.mode=subscribe` are merged with any query
parameters already present on the callback URL, and the request is made as
`getter.(url, [], params: params)`.

Returns the result of `Repo.update/1` (state set to `"active"`) when the
response body equals the challenge; otherwise logs the failure and returns
`{:error, subscription}`.
"""
def verify(subscription, getter \\ &@httpoison.get/3) do
  challenge = 8 |> :crypto.strong_rand_bytes() |> Base.encode16()

  lease_seconds =
    subscription.valid_until
    |> NaiveDateTime.diff(subscription.updated_at)
    |> to_string()

  # Everything before the first "?" is the bare callback URL; its own query
  # string is decoded and sent along with the hub.* parameters.
  [url | _] = String.split(subscription.callback, "?")
  callback_query = URI.parse(subscription.callback).query || ""

  params =
    Map.merge(
      %{
        "hub.challenge": challenge,
        "hub.lease_seconds": lease_seconds,
        "hub.topic": subscription.topic,
        "hub.mode": "subscribe"
      },
      URI.decode_query(callback_query)
    )

  with {:ok, response} <- getter.(url, [], params: params),
       ^challenge <- response.body do
    subscription
    |> Changeset.change(%{state: "active"})
    |> Repo.update()
  else
    failure ->
      Logger.debug("Couldn't verify subscription")
      Logger.debug(inspect(failure))
      {:error, subscription}
  end
end
50
# Activity types that are pushed to WebSub subscribers.
@supported_activities [
  "Create",
  "Follow",
  "Like",
  "Announce",
  "Undo",
  "Delete"
]

@doc """
Queues a push of `activity` to the subscribers of `topic`.

For an activity whose `"type"` is one of the supported types, the activity is
rendered as a feed for `user`, and one `Federator.publish_single_websub/1`
call is made per subscription that is active, not yet expired, and whose
callback is kept by `Instances.filter_reachable/1`. Returns `:ok`.

Any other input is ignored and `""` is returned.
"""
def publish(topic, user, %{data: %{"type" => type}} = activity)
    when type in @supported_activities do
  feed_xml =
    user
    |> FeedRepresenter.to_simple_form([activity], [user])
    |> :xmerl.export_simple(:xmerl_xml)
    |> to_string()

  query =
    from(
      sub in WebsubServerSubscription,
      where: sub.topic == ^topic and sub.state == "active",
      where: fragment("? > (NOW() at time zone 'UTC')", sub.valid_until)
    )

  subscriptions = Repo.all(query)

  # Map of callback => reachability metadata for the callbacks worth trying.
  reachable_callbacks_metadata =
    subscriptions
    |> Enum.map(& &1.callback)
    |> Instances.filter_reachable()

  subscriptions
  # Direct map lookup instead of scanning a key list for every subscription.
  |> Enum.filter(&Map.has_key?(reachable_callbacks_metadata, &1.callback))
  |> Enum.each(fn sub ->
    Federator.publish_single_websub(%{
      xml: feed_xml,
      topic: topic,
      callback: sub.callback,
      secret: sub.secret,
      unreachable_since: reachable_callbacks_metadata[sub.callback]
    })
  end)
end

def publish(_, _, _), do: ""
96
@doc """
Computes the HMAC-SHA1 of `doc` keyed with `secret`.

`doc` is converted with `to_string/1` first. The digest is returned as a
lowercase hexadecimal string.
"""
def sign(secret, doc) do
  payload = to_string(doc)

  # :crypto.hmac/3 was removed in OTP 24; prefer :crypto.mac/4 when the
  # runtime provides it and only fall back to the legacy call otherwise.
  digest =
    if Code.ensure_loaded?(:crypto) and function_exported?(:crypto, :mac, 4) do
      :crypto.mac(:hmac, :sha, secret, payload)
    else
      apply(:crypto, :hmac, [:sha, secret, payload])
    end

  Base.encode16(digest, case: :lower)
end
100
@doc """
Handles a subscription request made to this hub for `user`'s feed.

For `hub.mode=subscribe`, the topic and lease time are validated, the
subscription for `{topic, callback}` is inserted or updated (keeping an
existing state, defaulting to `"requested"`), its `valid_until` is set to
`updated_at + lease_time`, and verification is handed to
`Federator.verify_websub/1`. Returns `{:ok, subscription}`, or
`{:error, reason}` when validation fails.

Any other request is logged and answered with
`{:error, "Invalid WebSub request"}`.
"""
def incoming_subscription_request(user, %{"hub.mode" => "subscribe"} = params) do
  with {:ok, topic} <- valid_topic(params, user),
       {:ok, lease_time} <- lease_time(params) do
    callback = params["hub.callback"]
    existing = get_subscription(topic, callback)

    stored =
      existing
      |> Changeset.change(%{
        state: existing.state || "requested",
        topic: topic,
        secret: params["hub.secret"],
        callback: callback
      })
      |> Repo.insert_or_update!()

    # The lease is counted from the timestamp written by the save above.
    expires_at = NaiveDateTime.add(stored.updated_at, lease_time)

    websub =
      stored
      |> Changeset.change(%{valid_until: expires_at})
      |> Repo.update!()

    Federator.verify_websub(websub)

    {:ok, websub}
  else
    {:error, reason} ->
      Logger.debug("Couldn't create subscription")
      Logger.debug(inspect(reason))

      {:error, reason}
  end
end

def incoming_subscription_request(user, params) do
  Logger.info("Unhandled WebSub request for #{user.nickname}: #{inspect(params)}")

  {:error, "Invalid WebSub request"}
end
140
# Returns the stored server subscription for the topic/callback pair, or a
# blank, unsaved `%WebsubServerSubscription{}` when none exists.
defp get_subscription(topic, callback) do
  case Repo.get_by(WebsubServerSubscription, topic: topic, callback: callback) do
    nil -> %WebsubServerSubscription{}
    found -> found
  end
end
145
# Lease applied when the request does not specify one: three days, in seconds.
@default_lease_seconds 60 * 60 * 24 * 3

# Extracts the requested lease duration (in seconds) from the request params.
#
# Returns `{:ok, seconds}`; a missing or empty "hub.lease_seconds" yields the
# three-day default. A value that is not a plain integer string returns
# `{:error, reason}` instead of raising, since the value comes from a remote
# request.

# Temp hack for mastodon.
defp lease_time(%{"hub.lease_seconds" => ""}), do: {:ok, @default_lease_seconds}

defp lease_time(%{"hub.lease_seconds" => lease_seconds}) when is_binary(lease_seconds) do
  case Integer.parse(lease_seconds) do
    # Only accept the value when the whole string was consumed.
    {seconds, ""} -> {:ok, seconds}
    _ -> {:error, "Invalid hub.lease_seconds: #{inspect(lease_seconds)}"}
  end
end

defp lease_time(%{"hub.lease_seconds" => lease_seconds}) do
  {:error, "Invalid hub.lease_seconds: #{inspect(lease_seconds)}"}
end

defp lease_time(_), do: {:ok, @default_lease_seconds}
160
# Checks that the requested "hub.topic" is exactly the feed path of `user`.
#
# Returns `{:ok, topic}` on a match and `{:error, reason}` (a string) when the
# topic differs or the parameter is missing from the request.
defp valid_topic(%{"hub.topic" => topic}, user) do
  expected = OStatus.feed_path(user)

  if topic == expected do
    {:ok, expected}
  else
    {:error, "Wrong topic requested, expected #{expected}, got #{topic}"}
  end
end

# A request without "hub.topic" is a client error, not a crash.
defp valid_topic(_params, _user), do: {:error, "Missing hub.topic parameter"}
168
@doc """
Registers `subscriber` as interested in the feed of `subscribed`.

When a client subscription for the topic already exists, the subscriber's
`ap_id` is added to its (de-duplicated) subscriber list; otherwise a new
subscription in state `"requested"` with a random secret is inserted. The
saved subscription is then passed to `requester`, whose result is returned.
"""
def subscribe(subscriber, subscribed, requester \\ &request_subscription/1) do
  topic = subscribed.info.topic

  # FIXME: Race condition, use transactions
  saved =
    case Repo.get_by(WebsubClientSubscription, topic: topic) do
      nil ->
        Repo.insert(%WebsubClientSubscription{
          topic: topic,
          hub: subscribed.info.hub,
          subscribers: [subscriber.ap_id],
          state: "requested",
          secret: :crypto.strong_rand_bytes(8) |> Base.url_encode64(),
          user: subscribed
        })

      existing ->
        subscribers = Enum.uniq([subscriber.ap_id | existing.subscribers])

        existing
        |> Ecto.Changeset.change(%{subscribers: subscribers})
        |> Repo.update()
    end

  {:ok, subscription} = saved

  requester.(subscription)
end
194
@doc """
Fetches the feed at `topic` and extracts the author's basic data from it.

Requires a 2xx response, an author URI and a hub link in the document.
Returns `{:ok, map}` with the string keys `"uri"`, `"hub"`, `"nickname"`,
`"name"`, `"host"`, `"avatar"` and `"bio"`; otherwise `{:error, value}`
wrapping whichever step did not match.
"""
def gather_feed_data(topic, getter \\ &@httpoison.get/1) do
  with {:ok, response} <- getter.(topic),
       status when status in 200..299 <- response.status,
       doc = XML.parse_document(response.body),
       uri when not is_nil(uri) <- XML.string_from_xpath("/feed/author[1]/uri", doc),
       hub when not is_nil(hub) <- XML.string_from_xpath(~S{/feed/link[@rel="hub"]/@href}, doc) do
    author_name = XML.string_from_xpath("/feed/author[1]/name", doc)
    preferred_username = XML.string_from_xpath("/feed/author[1]/poco:preferredUsername", doc)
    display_name = XML.string_from_xpath("/feed/author[1]/poco:displayName", doc)
    avatar = OStatus.make_avatar_object(doc)
    bio = XML.string_from_xpath("/feed/author[1]/summary", doc)

    feed_data = %{
      "uri" => uri,
      "hub" => hub,
      # The plain author name is the fallback for both nickname and name.
      "nickname" => preferred_username || author_name,
      "name" => display_name || author_name,
      "host" => URI.parse(uri).host,
      "avatar" => avatar,
      "bio" => bio
    }

    {:ok, feed_data}
  else
    failure -> {:error, failure}
  end
end
223
@doc """
Asks the hub of `websub` to subscribe us and waits for the confirmation.

A form-encoded subscribe request is sent through `poster`, while a task polls
the database once a second for the subscription to reach state `"accepted"`.
Returns `{:ok, subscription}` when the hub answers 202 and the accepted
record shows up within `timeout` milliseconds. Otherwise the task is shut
down, the subscription is saved as `"rejected"` and `{:error, subscription}`
is returned.
"""
def request_subscription(websub, poster \\ &@httpoison.post/3, timeout \\ 10_000) do
  data = [
    "hub.mode": "subscribe",
    "hub.topic": websub.topic,
    "hub.secret": websub.secret,
    "hub.callback": Helpers.websub_url(Endpoint, :websub_subscription_confirmation, websub.id)
  ]

  # The poller is started before the request goes out.
  task = Task.async(fn -> await_accepted(websub.id) end)

  with {:ok, %{status: 202}} <-
         poster.(websub.hub, {:form, data}, "Content-type": "application/x-www-form-urlencoded"),
       {:ok, accepted} <- Task.yield(task, timeout) do
    {:ok, accepted}
  else
    failure ->
      Task.shutdown(task)

      {:ok, rejected} =
        websub
        |> Ecto.Changeset.change(%{state: "rejected"})
        |> Repo.update()

      Logger.debug(fn -> "Couldn't confirm subscription: #{inspect(rejected)}" end)
      Logger.debug(fn -> "error: #{inspect(failure)}" end)

      {:error, rejected}
  end
end

# This checks once a second if we are confirmed yet, and returns the accepted
# record once it exists.
defp await_accepted(id) do
  :timer.sleep(1000)

  case Repo.get_by(WebsubClientSubscription, id: id, state: "accepted") do
    nil -> await_accepted(id)
    accepted -> accepted
  end
end
262
@doc """
Re-requests every client subscription that expires within `delta` seconds.

`delta` defaults to one day. Each matching subscription is handed to
`Federator.request_subscription/1`. Returns `:ok`.
"""
def refresh_subscriptions(delta \\ 60 * 60 * 24) do
  Logger.debug("Refreshing subscriptions")

  cut_off = NaiveDateTime.utc_now() |> NaiveDateTime.add(delta)

  WebsubClientSubscription
  |> where([sub], sub.valid_until < ^cut_off)
  |> Repo.all()
  |> Enum.each(&Federator.request_subscription/1)
end
276
@doc """
Pushes one signed feed document to a single subscriber callback.

`params` must contain `:xml`, `:topic`, `:callback` and `:secret`, and may
contain `:unreachable_since`. The XML is POSTed with an `X-Hub-Signature`
header built from `sign/2` (a `nil` secret is treated as `""`).

Returns `{:ok, status_code}` for a 2xx response and `{:error, response}`
otherwise. On success the callback is marked reachable when it was previously
flagged unreachable or when no reachability information was supplied. On
failure it is marked unreachable unless it already was.
"""
def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret} = params) do
  signature = sign(secret || "", xml)
  Logger.info(fn -> "Pushing #{topic} to #{callback}" end)

  headers = [
    {"Content-Type", "application/atom+xml"},
    {"X-Hub-Signature", "sha1=#{signature}"}
  ]

  with {:ok, %{status: code}} when code in 200..299 <-
         @httpoison.post(callback, xml, headers) do
    if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
      do: Instances.set_reachable(callback)

    Logger.info(fn -> "Pushed to #{callback}, code #{code}" end)
    {:ok, code}
  else
    {_post_result, response} ->
      # A failed delivery must flag the callback as unreachable; the previous
      # code marked it reachable here.
      # NOTE(review): assumes Instances.set_unreachable/1 exists alongside
      # set_reachable/1; confirm against Pleroma.Instances.
      if !params[:unreachable_since], do: Instances.set_unreachable(callback)
      Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(response)}" end)
      {:error, response}
  end
end
302 end