+
+ defp user_upgrade_task(user) do
+ old_follower_address = User.ap_followers(user)
+
+ q =
+ from(
+ u in User,
+ where: ^old_follower_address in u.following,
+ update: [
+ set: [
+ following:
+ fragment(
+ "array_replace(?,?,?)",
+ u.following,
+ ^old_follower_address,
+ ^user.follower_address
+ )
+ ]
+ ]
+ )
+
+ Repo.update_all(q, [])
+
+ maybe_retire_websub(user.ap_id)
+
+ # Only do this for recent activties, don't go through the whole db.
+ # Only look at the last 1000 activities.
+ since = (Repo.aggregate(Activity, :max, :id) || 0) - 1_000
+
+ q =
+ from(
+ a in Activity,
+ where: ^old_follower_address in a.recipients,
+ where: a.id > ^since,
+ update: [
+ set: [
+ recipients:
+ fragment(
+ "array_replace(?,?,?)",
+ a.recipients,
+ ^old_follower_address,
+ ^user.follower_address
+ )
+ ]
+ ]
+ )
+
+ Repo.update_all(q, [])
+ end
+
+ def upgrade_user_from_ap_id(ap_id, async \\ true) do
+ with %User{local: false} = user <- User.get_by_ap_id(ap_id),
+ {:ok, data} <- ActivityPub.fetch_and_prepare_user_from_ap_id(ap_id) do
+ data =
+ data
+ |> Map.put(:info, Map.merge(user.info, data[:info]))
+
+ already_ap = User.ap_enabled?(user)
+
+ {:ok, user} =
+ User.upgrade_changeset(user, data)
+ |> Repo.update()
+
+ if !already_ap do
+ # This could potentially take a long time, do it in the background
+ if async do
+ Task.start(fn ->
+ user_upgrade_task(user)
+ end)
+ else
+ user_upgrade_task(user)
+ end
+ end
+
+ {:ok, user}
+ else
+ e -> e
+ end
+ end
+
+ def maybe_retire_websub(ap_id) do
+ # some sanity checks
+ if is_binary(ap_id) && String.length(ap_id) > 8 do
+ q =
+ from(
+ ws in Pleroma.Web.Websub.WebsubClientSubscription,
+ where: fragment("? like ?", ws.topic, ^"#{ap_id}%")
+ )
+
+ Repo.delete_all(q)
+ end
+ end