+ def request_subscription(websub, poster \\ &@httpoison.post/3, timeout \\ 10_000) do
+ data = [
+ "hub.mode": "subscribe",
+ "hub.topic": websub.topic,
+ "hub.secret": websub.secret,
+ "hub.callback": Helpers.websub_url(Endpoint, :websub_subscription_confirmation, websub.id)
+ ]
+
+ # This checks once a second if we are confirmed yet
+ websub_checker = fn ->
+ helper = fn (helper) ->
+ :timer.sleep(1000)
+ websub = Repo.get_by(WebsubClientSubscription, id: websub.id, state: "accepted")
+ if websub, do: websub, else: helper.(helper)
+ end
+ helper.(helper)
+ end
+
+ task = Task.async(websub_checker)
+
+ with {:ok, %{status_code: 202}} <- poster.(websub.hub, {:form, data}, ["Content-type": "application/x-www-form-urlencoded"]),
+ {:ok, websub} <- Task.yield(task, timeout) do
+ {:ok, websub}
+ else e ->
+ Task.shutdown(task)
+
+ change = Ecto.Changeset.change(websub, %{state: "rejected"})
+ {:ok, websub} = Repo.update(change)
+
+ Logger.debug(fn -> "Couldn't confirm subscription: #{inspect(websub)}" end)
+ Logger.debug(fn -> "error: #{inspect(e)}" end)
+
+ {:error, websub}
+ end
+ end
+
+ def refresh_subscriptions(delta \\ 60 * 60 * 24) do
+ Logger.debug("Refreshing subscriptions")
+
+ cut_off = NaiveDateTime.add(NaiveDateTime.utc_now, delta)
+
+ query = from sub in WebsubClientSubscription,
+ where: sub.valid_until < ^cut_off
+
+ subs = Repo.all(query)
+
+ Enum.each(subs, fn (sub) ->
+ Pleroma.Web.Federator.enqueue(:request_subscription, sub)
+ end)