Merge branch 'issue/2089' into 'develop'
authorlain <lain@soykaf.club>
Wed, 16 Sep 2020 12:22:48 +0000 (12:22 +0000)
committerlain <lain@soykaf.club>
Wed, 16 Sep 2020 12:22:48 +0000 (12:22 +0000)
[#2089] fix notifications

See merge request pleroma/pleroma!3000

13 files changed:
lib/mix/tasks/pleroma/database.ex
lib/mix/tasks/pleroma/user.ex
lib/pleroma/migration_helper/notification_backfill.ex
lib/pleroma/repo.ex
lib/pleroma/repo_streamer.ex [deleted file]
lib/pleroma/user.ex
priv/repo/migrations/20200914105638_delete_notification_without_activity.exs [new file with mode: 0644]
priv/repo/migrations/20200914105800_add_notification_constraints.exs [new file with mode: 0644]
test/marker_test.exs
test/repo_test.exs
test/web/mastodon_api/controllers/account_controller_test.exs
test/web/mastodon_api/controllers/marker_controller_test.exs
test/web/mastodon_api/views/account_view_test.exs

index 7f1108dcfa87cee488266a2cc1343be7824ece75..a01c36ece30754ada443c22dffc5e3d29b9f73cf 100644 (file)
@@ -99,7 +99,7 @@ defmodule Mix.Tasks.Pleroma.Database do
       where: fragment("(?)->>'likes' is not null", object.data),
       select: %{id: object.id, likes: fragment("(?)->>'likes'", object.data)}
     )
-    |> Pleroma.RepoStreamer.chunk_stream(100)
+    |> Pleroma.Repo.chunk_stream(100, :batches)
     |> Stream.each(fn objects ->
       ids =
         objects
@@ -145,7 +145,7 @@ defmodule Mix.Tasks.Pleroma.Database do
     |> where(local: true)
     |> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
     |> where([_a, o], fragment("?->>'type' = 'Note'", o.data))
-    |> Pleroma.RepoStreamer.chunk_stream(100)
+    |> Pleroma.Repo.chunk_stream(100, :batches)
     |> Stream.each(fn activities ->
       Enum.each(activities, fn activity ->
         expires_at =
index 01824aa18b0f45dda65716d6189856f3394f63c1..b20c49d89538db746214e7b6b17256995d3879c8 100644 (file)
@@ -179,7 +179,7 @@ defmodule Mix.Tasks.Pleroma.User do
     start_pleroma()
 
     Pleroma.User.Query.build(%{nickname: "@#{instance}"})
-    |> Pleroma.RepoStreamer.chunk_stream(500)
+    |> Pleroma.Repo.chunk_stream(500, :batches)
     |> Stream.each(fn users ->
       users
       |> Enum.each(fn user ->
@@ -370,7 +370,7 @@ defmodule Mix.Tasks.Pleroma.User do
     start_pleroma()
 
     Pleroma.User.Query.build(%{local: true})
-    |> Pleroma.RepoStreamer.chunk_stream(500)
+    |> Pleroma.Repo.chunk_stream(500, :batches)
     |> Stream.each(fn users ->
       users
       |> Enum.each(fn user ->
index d260e62cacde990436e314027b3d308619013354..24f4733fe796c47905c14f014bb5ff36a2cd21a2 100644 (file)
@@ -19,13 +19,13 @@ defmodule Pleroma.MigrationHelper.NotificationBackfill do
     query
     |> Repo.chunk_stream(100)
     |> Enum.each(fn notification ->
-      type =
-        notification.activity
-        |> type_from_activity()
+      if notification.activity do
+        type = type_from_activity(notification.activity)
 
-      notification
-      |> Ecto.Changeset.change(%{type: type})
-      |> Repo.update()
+        notification
+        |> Ecto.Changeset.change(%{type: type})
+        |> Repo.update()
+      end
     end)
   end
 
@@ -72,8 +72,7 @@ defmodule Pleroma.MigrationHelper.NotificationBackfill do
         "pleroma:emoji_reaction"
 
       "Create" ->
-        activity
-        |> type_from_activity_object()
+        type_from_activity_object(activity)
 
       t ->
         raise "No notification type for activity type #{t}"
index f317e4d582e6cea6fee974190e30015797cffb8a..4524bd5e2c766636382ba2c0c34d409d10c289b4 100644 (file)
@@ -49,7 +49,21 @@ defmodule Pleroma.Repo do
     end
   end
 
-  def chunk_stream(query, chunk_size) do
+  @doc """
+  Returns a lazy enumerable that emits all entries from the data store matching the given query.
+
+  `returns_as` use to group records. use the `batches` option to fetch records in bulk.
+
+  ## Examples
+
+  # fetch records one-by-one
+  iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500)
+
+  # fetch records in bulk
+  iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500, :batches)
+  """
+  @spec chunk_stream(Ecto.Query.t(), integer(), atom()) :: Enumerable.t()
+  def chunk_stream(query, chunk_size, returns_as \\ :one) do
     # We don't actually need start and end funcitons of resource streaming,
     # but it seems to be the only way to not fetch records one-by-one and
     # have individual records be the elements of the stream, instead of
@@ -69,7 +83,12 @@ defmodule Pleroma.Repo do
 
             records ->
               last_id = List.last(records).id
-              {records, last_id}
+
+              if returns_as == :one do
+                {records, last_id}
+              else
+                {[records], last_id}
+              end
           end
       end,
       fn _ -> :ok end
diff --git a/lib/pleroma/repo_streamer.ex b/lib/pleroma/repo_streamer.ex
deleted file mode 100644 (file)
index cb4d7bb..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.RepoStreamer do
-  alias Pleroma.Repo
-  import Ecto.Query
-
-  def chunk_stream(query, chunk_size) do
-    Stream.unfold(0, fn
-      :halt ->
-        {[], :halt}
-
-      last_id ->
-        query
-        |> order_by(asc: :id)
-        |> where([r], r.id > ^last_id)
-        |> limit(^chunk_size)
-        |> Repo.all()
-        |> case do
-          [] ->
-            {[], :halt}
-
-          records ->
-            last_id = List.last(records).id
-            {records, last_id}
-        end
-    end)
-    |> Stream.take_while(fn
-      [] -> false
-      _ -> true
-    end)
-  end
-end
index e73d199648f616d0a57a1c9ad2014cc83fc77ac9..57497eb83e66da2f9a799346703df4c7d997f11d 100644 (file)
@@ -25,7 +25,6 @@ defmodule Pleroma.User do
   alias Pleroma.Object
   alias Pleroma.Registration
   alias Pleroma.Repo
-  alias Pleroma.RepoStreamer
   alias Pleroma.User
   alias Pleroma.UserRelationship
   alias Pleroma.Web
@@ -1775,7 +1774,7 @@ defmodule Pleroma.User do
   def delete_user_activities(%User{ap_id: ap_id} = user) do
     ap_id
     |> Activity.Queries.by_actor()
-    |> RepoStreamer.chunk_stream(50)
+    |> Repo.chunk_stream(50, :batches)
     |> Stream.each(fn activities ->
       Enum.each(activities, fn activity -> delete_activity(activity, user) end)
     end)
diff --git a/priv/repo/migrations/20200914105638_delete_notification_without_activity.exs b/priv/repo/migrations/20200914105638_delete_notification_without_activity.exs
new file mode 100644 (file)
index 0000000..9333fc5
--- /dev/null
@@ -0,0 +1,30 @@
+defmodule Pleroma.Repo.Migrations.DeleteNotificationWithoutActivity do
+  use Ecto.Migration
+
+  import Ecto.Query
+  alias Pleroma.Repo
+
+  def up do
+    from(
+      q in Pleroma.Notification,
+      left_join: c in assoc(q, :activity),
+      select: %{id: type(q.id, :integer)},
+      where: is_nil(c.id)
+    )
+    |> Repo.chunk_stream(1_000, :batches)
+    |> Stream.each(fn records ->
+      notification_ids = Enum.map(records, fn %{id: id} -> id end)
+
+      Repo.delete_all(
+        from(n in "notifications",
+          where: n.id in ^notification_ids
+        )
+      )
+    end)
+    |> Stream.run()
+  end
+
+  def down do
+    :ok
+  end
+end
diff --git a/priv/repo/migrations/20200914105800_add_notification_constraints.exs b/priv/repo/migrations/20200914105800_add_notification_constraints.exs
new file mode 100644 (file)
index 0000000..a65c35f
--- /dev/null
@@ -0,0 +1,23 @@
+defmodule Pleroma.Repo.Migrations.AddNotificationConstraints do
+  use Ecto.Migration
+
+  def up do
+    drop(constraint(:notifications, "notifications_activity_id_fkey"))
+
+    alter table(:notifications) do
+      modify(:activity_id, references(:activities, type: :uuid, on_delete: :delete_all),
+        null: false
+      )
+    end
+  end
+
+  def down do
+    drop(constraint(:notifications, "notifications_activity_id_fkey"))
+
+    alter table(:notifications) do
+      modify(:activity_id, references(:activities, type: :uuid, on_delete: :delete_all),
+        null: true
+      )
+    end
+  end
+end
index 5b6d0b4a4729b4af0886fefb25f013222a7eefe7..7b3943c7be5ca907b3c990daa46c72abc7d2a104 100644 (file)
@@ -33,8 +33,8 @@ defmodule Pleroma.MarkerTest do
     test "returns user markers" do
       user = insert(:user)
       marker = insert(:marker, user: user)
-      insert(:notification, user: user)
-      insert(:notification, user: user)
+      insert(:notification, user: user, activity: insert(:note_activity))
+      insert(:notification, user: user, activity: insert(:note_activity))
       insert(:marker, timeline: "home", user: user)
 
       assert Marker.get_markers(
index 92e827c950fb68401b38d2327f8fdc00d963fe85..155791be22fd1fb2905ca0c82d97d9d1f14d4f01 100644 (file)
@@ -37,7 +37,9 @@ defmodule Pleroma.RepoTest do
 
     test "get one-to-many assoc from repo" do
       user = insert(:user)
-      notification = refresh_record(insert(:notification, user: user))
+
+      notification =
+        refresh_record(insert(:notification, user: user, activity: insert(:note_activity)))
 
       assert Repo.get_assoc(user, :notifications) == {:ok, [notification]}
     end
@@ -47,4 +49,32 @@ defmodule Pleroma.RepoTest do
       assert Repo.get_assoc(token, :user) == {:error, :not_found}
     end
   end
+
+  describe "chunk_stream/3" do
+    test "fetch records one-by-one" do
+      users = insert_list(50, :user)
+
+      {fetch_users, 50} =
+        from(t in User)
+        |> Repo.chunk_stream(5)
+        |> Enum.reduce({[], 0}, fn %User{} = user, {acc, count} ->
+          {acc ++ [user], count + 1}
+        end)
+
+      assert users == fetch_users
+    end
+
+    test "fetch records in bulk" do
+      users = insert_list(50, :user)
+
+      {fetch_users, 10} =
+        from(t in User)
+        |> Repo.chunk_stream(5, :batches)
+        |> Enum.reduce({[], 0}, fn users, {acc, count} ->
+          {acc ++ users, count + 1}
+        end)
+
+      assert users == fetch_users
+    end
+  end
 end
index 17a1e7d661f9234102c28807e54ee0fa7ea8953a..f7f1369e48e35d70a19d925a2a3c2d2f66a05b00 100644 (file)
@@ -1442,7 +1442,10 @@ defmodule Pleroma.Web.MastodonAPI.AccountControllerTest do
   describe "verify_credentials" do
     test "verify_credentials" do
       %{user: user, conn: conn} = oauth_access(["read:accounts"])
-      [notification | _] = insert_list(7, :notification, user: user)
+
+      [notification | _] =
+        insert_list(7, :notification, user: user, activity: insert(:note_activity))
+
       Pleroma.Notification.set_read_up_to(user, notification.id)
       conn = get(conn, "/api/v1/accounts/verify_credentials")
 
index 6dd40fb4a9f741f3b4e613ad3cbd6f20ab029b01..9f0481120df495e61ab4f4812b9ac50beb8c9e4f 100644 (file)
@@ -11,7 +11,7 @@ defmodule Pleroma.Web.MastodonAPI.MarkerControllerTest do
     test "gets markers with correct scopes", %{conn: conn} do
       user = insert(:user)
       token = insert(:oauth_token, user: user, scopes: ["read:statuses"])
-      insert_list(7, :notification, user: user)
+      insert_list(7, :notification, user: user, activity: insert(:note_activity))
 
       {:ok, %{"notifications" => marker}} =
         Pleroma.Marker.upsert(
index 9f22f9dcf99e7a2a2445c8739583098cd595d4d1..c5f491d6bd69da77ab2a7e5584e3bacf14739351 100644 (file)
@@ -448,7 +448,7 @@ defmodule Pleroma.Web.MastodonAPI.AccountViewTest do
 
     test "shows unread_count only to the account owner" do
       user = insert(:user)
-      insert_list(7, :notification, user: user)
+      insert_list(7, :notification, user: user, activity: insert(:note_activity))
       other_user = insert(:user)
 
       user = User.get_cached_by_ap_id(user.ap_id)