Track object/create activity fetches
authorrinpatch <rinpatch@sdf.org>
Thu, 12 Sep 2019 18:37:36 +0000 (21:37 +0300)
committerrinpatch <rinpatch@sdf.org>
Thu, 12 Sep 2019 18:37:36 +0000 (21:37 +0300)
lib/pleroma/delivery.ex [new file with mode: 0644]
lib/pleroma/plugs/cache.ex
lib/pleroma/user.ex
lib/pleroma/web/activity_pub/activity_pub_controller.ex
priv/repo/migrations/20190912065617_create_deliveries.exs [new file with mode: 0644]
test/web/activity_pub/activity_pub_controller_test.exs

diff --git a/lib/pleroma/delivery.ex b/lib/pleroma/delivery.ex
new file mode 100644 (file)
index 0000000..f9a9e35
--- /dev/null
@@ -0,0 +1,58 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Delivery do
+  use Ecto.Schema
+
+  alias Pleroma.Delivery
+  alias Pleroma.FlakeId
+  alias Pleroma.User
+  alias Pleroma.Repo
+  alias Pleroma.Object
+  alias Pleroma.User
+
+  import Ecto.Changeset
+  import Ecto.Query
+
+  schema "deliveries" do
+    belongs_to(:user, User, type: FlakeId)
+    belongs_to(:object, Object)
+  end
+
+  def changeset(delivery, params \\ %{}) do
+    delivery
+    |> cast(params, [:user_id, :object_id])
+    |> foreign_key_constraint(:object_id)
+    |> foreign_key_constraint(:user_id)
+    |> unique_constraint(:user_id, name: :deliveries_user_id_object_id_index)
+  end
+
+  def create(object_id, user_id) do
+    %Delivery{}
+    |> changeset(%{user_id: user_id, object_id: object_id})
+    |> Repo.insert()
+  end
+
+  def get(object_id, user_id) do
+    from(d in Delivery, where: d.user_id == ^user_id and d.object_id == ^object_id)
+    |> Repo.one()
+  end
+
+  def get_or_create(object_id, user_id) do
+    case get(object_id, user_id) do
+      %Delivery{} = delivery -> {:ok, delivery}
+      nil -> create(object_id, user_id)
+    end
+  end
+
+  def delete_all_by_object_id(object_id) do
+    from(d in Delivery, where: d.object_id == ^object_id)
+    |> Repo.delete_all()
+  end
+
+  def get_all_by_object_id(object_id) do
+    from(d in Delivery, where: d.object_id == ^object_id)
+    |> Repo.all()
+  end
+end
index a81a861d03202226061111886ee199f4100da533..42d77fc1f9b43d98b5ab446d18bda804590d04ba 100644 (file)
@@ -20,6 +20,7 @@ defmodule Pleroma.Plugs.Cache do
 
   - `ttl`:  An expiration time (time-to-live). This value should be in milliseconds or `nil` to disable expiration. Defaults to `nil`.
   - `query_params`: Take URL query string into account (`true`), ignore it (`false`) or limit to specific params only (list). Defaults to `true`.
+  - `tracking_fun`: A function that is called on successfull responses, no matter if the request is cached or not. It should accept a conn as the first argument and the value assigned to `tracking_fun_data` as the second.
 
   Additionally, you can overwrite the TTL inside a controller action by assigning `cache_ttl` to the connection struct:
 
@@ -56,6 +57,10 @@ defmodule Pleroma.Plugs.Cache do
       {:ok, nil} ->
         cache_resp(conn, opts)
 
+      {:ok, {content_type, body, tracking_fun_data}} ->
+        conn = opts.tracking_fun(conn, tracking_fun_data)
+        send_cached(conn, {content_type, body})
+
       {:ok, record} ->
         send_cached(conn, record)
 
@@ -90,7 +95,16 @@ defmodule Pleroma.Plugs.Cache do
         content_type = content_type(conn)
         record = {content_type, body}
 
-        Cachex.put(:web_resp_cache, key, record, ttl: ttl)
+        conn =
+          unless opts[:tracking_fun] do
+            Cachex.put(:web_resp_cache, key, {content_type, body}, ttl: ttl)
+            conn
+          else
+            tracking_fun_data = Map.get(conn.assigns, :tracking_fun_data, nil)
+            Cachex.put(:web_resp_cache, {content_type, body, tracking_fun_data}, record, ttl: ttl)
+
+            opts.tracking_fun.(conn, tracking_fun_data)
+          end
 
         put_resp_header(conn, "x-cache", "MISS from Pleroma")
 
index 3aa245f2aa43f60389fc8a8f43023de09a39268e..9614acdaba1c3c94a523a14bd582b761bacec377 100644 (file)
@@ -11,6 +11,7 @@ defmodule Pleroma.User do
   alias Comeonin.Pbkdf2
   alias Ecto.Multi
   alias Pleroma.Activity
+  alias Pleroma.Delivery
   alias Pleroma.Keys
   alias Pleroma.Notification
   alias Pleroma.Object
@@ -61,6 +62,7 @@ defmodule Pleroma.User do
     field(:last_digest_emailed_at, :naive_datetime)
     has_many(:notifications, Notification)
     has_many(:registrations, Registration)
+    has_many(:deliveries, Delivery)
     embeds_one(:info, User.Info)
 
     timestamps()
@@ -1624,4 +1626,12 @@ defmodule Pleroma.User do
   def is_internal_user?(%User{nickname: nil}), do: true
   def is_internal_user?(%User{local: true, nickname: "internal." <> _}), do: true
   def is_internal_user?(_), do: false
+
+  def get_delivered_users_by_object_id(object_id) do
+    from(u in User,
+      inner_join: delivery in assoc(u, :deliveries),
+      where: delivery.object_id == ^object_id
+    )
+    |> Repo.all()
+  end
 end
index 705dbc1c2bc584981564d8545b5071595b21c761..009260d3f2b43d46183f59a56df8944b0d0656eb 100644 (file)
@@ -6,6 +6,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
   use Pleroma.Web, :controller
 
   alias Pleroma.Activity
+  alias Pleroma.Delivery
   alias Pleroma.Object
   alias Pleroma.Object.Fetcher
   alias Pleroma.User
@@ -23,7 +24,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
 
   action_fallback(:errors)
 
-  plug(Pleroma.Plugs.Cache, [query_params: false] when action in [:activity, :object])
+  plug(
+    Pleroma.Plugs.Cache,
+    [
+      query_params: false,
+      tracking_fun: &Pleroma.Web.ActivityPub.ActivityPubController.track_object_fetch/2
+    ]
+    when action in [:activity, :object]
+  )
+
   plug(Pleroma.Web.FederatingPlug when action in [:inbox, :relay])
   plug(:set_requester_reachable when action in [:inbox])
   plug(:relay_active? when action in [:relay])
@@ -54,6 +63,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
          %Object{} = object <- Object.get_cached_by_ap_id(ap_id),
          {_, true} <- {:public?, Visibility.is_public?(object)} do
       conn
+      |> assign(:tracking_fun_data, object.id)
       |> set_cache_ttl_for(object)
       |> put_resp_content_type("application/activity+json")
       |> put_view(ObjectView)
@@ -64,6 +74,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
     end
   end
 
+  def track_object_fetch(conn, object_id) do
+    case conn.assigns[:user] do
+      %User{id: user_id} -> Delivery.create(object_id, user_id)
+      _ -> nil
+    end
+
+    conn
+  end
+
   def object_likes(conn, %{"uuid" => uuid, "page" => page}) do
     with ap_id <- o_status_url(conn, :object, uuid),
          %Object{} = object <- Object.get_cached_by_ap_id(ap_id),
@@ -99,6 +118,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
          %Activity{} = activity <- Activity.normalize(ap_id),
          {_, true} <- {:public?, Visibility.is_public?(activity)} do
       conn
+      |> maybe_set_tracking_data(activity)
       |> set_cache_ttl_for(activity)
       |> put_resp_content_type("application/activity+json")
       |> put_view(ObjectView)
@@ -109,6 +129,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
     end
   end
 
+  defp maybe_set_tracking_data(conn, %Activity{data: %{"type" => "Create"}} = activity) do
+    object_id = Object.normalize(activity).id
+    assign(conn, :tracking_fun_data, object_id)
+  end
+
+  defp maybe_set_tracking_data(conn, _activity), do: assign(conn, :tracking_fun_data, nil)
+
   defp set_cache_ttl_for(conn, %Activity{object: object}) do
     set_cache_ttl_for(conn, object)
   end
diff --git a/priv/repo/migrations/20190912065617_create_deliveries.exs b/priv/repo/migrations/20190912065617_create_deliveries.exs
new file mode 100644 (file)
index 0000000..92ca565
--- /dev/null
@@ -0,0 +1,12 @@
+defmodule Pleroma.Repo.Migrations.CreateDeliveries do
+  use Ecto.Migration
+
+  def change do
+    create_if_not_exists table(:deliveries) do
+      add(:object_id, references(:objects, type: :id))
+      add(:user_id, references(:users, type: :uuid, on_delete: :delete_all))
+    end
+    create_if_not_exists index(:deliveries, :object_id, name: :deliveries_object_id)
+    create_if_not_exists(unique_index(:deliveries, [:user_id, :object_id]))
+  end
+end
index 9698c70997012502c1006a6c47558eb782a48e9c..0bab555b5717c7f6815b2bbefa4461ce73581d1f 100644 (file)
@@ -6,6 +6,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
   use Pleroma.Web.ConnCase
   import Pleroma.Factory
   alias Pleroma.Activity
+  alias Pleroma.Delivery
   alias Pleroma.Instances
   alias Pleroma.Object
   alias Pleroma.User
@@ -885,4 +886,86 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
       assert result["totalItems"] == 15
     end
   end
+
+  describe "delivery tracking" do
+    test "it tracks a signed object fetch", %{conn: conn} do
+      user = insert(:user, local: false)
+      activity = insert(:note_activity)
+      object = Object.normalize(activity)
+
+      object_path = String.trim_leading(object.data["id"], Pleroma.Web.Endpoint.url())
+
+      conn
+      |> put_req_header("accept", "application/activity+json")
+      |> assign(:user, user)
+      |> get(object_path)
+      |> json_response(200)
+
+      assert Delivery.get(object.id, user.id)
+    end
+
+    test "it tracks a signed activity fetch", %{conn: conn} do
+      user = insert(:user, local: false)
+      activity = insert(:note_activity)
+      object = Object.normalize(activity)
+
+      activity_path = String.trim_leading(activity.data["id"], Pleroma.Web.Endpoint.url())
+
+      conn
+      |> put_req_header("accept", "application/activity+json")
+      |> assign(:user, user)
+      |> get(activity_path)
+      |> json_response(200)
+
+      assert Delivery.get(object.id, user.id)
+    end
+
+    test "it tracks a signed object fetch when the json is cached", %{conn: conn} do
+      user = insert(:user, local: false)
+      other_user = insert(:user, local: false)
+      activity = insert(:note_activity)
+      object = Object.normalize(activity)
+
+      object_path = String.trim_leading(object.data["id"], Pleroma.Web.Endpoint.url())
+
+      conn
+      |> put_req_header("accept", "application/activity+json")
+      |> assign(:user, user)
+      |> get(object_path)
+      |> json_response(200)
+
+      build_conn()
+      |> put_req_header("accept", "application/activity+json")
+      |> assign(:user, other_user)
+      |> get(object_path)
+      |> json_response(200)
+
+      assert Delivery.get(object.id, user.id)
+      assert Delivery.get(object.id, other_user.id)
+    end
+
+    test "it tracks a signed activity fetch when the json is cached", %{conn: conn} do
+      user = insert(:user, local: false)
+      other_user = insert(:user, local: false)
+      activity = insert(:note_activity)
+      object = Object.normalize(activity)
+
+      activity_path = String.trim_leading(activity.data["id"], Pleroma.Web.Endpoint.url())
+
+      conn
+      |> put_req_header("accept", "application/activity+json")
+      |> assign(:user, user)
+      |> get(activity_path)
+      |> json_response(200)
+
+      build_conn()
+      |> put_req_header("accept", "application/activity+json")
+      |> assign(:user, other_user)
+      |> get(activity_path)
+      |> json_response(200)
+
+      assert Delivery.get(object.id, user.id)
+      assert Delivery.get(object.id, other_user.id)
+    end
+  end
 end