Merge branch 'feature/delivery-tracking' into 'develop'
authorkaniini <ariadne@dereferenced.org>
Mon, 16 Sep 2019 07:39:58 +0000 (07:39 +0000)
committerkaniini <ariadne@dereferenced.org>
Mon, 16 Sep 2019 07:39:58 +0000 (07:39 +0000)
Track signed fetches of objects and use them for delete federation

See merge request pleroma/pleroma!1661

1  2 
lib/pleroma/user.ex
lib/pleroma/web/activity_pub/publisher.ex
test/web/activity_pub/activity_pub_controller_test.exs
test/web/activity_pub/publisher_test.exs

diff --combined lib/pleroma/user.ex
index f0306652c2bc73890dcb1a28b0a7881bee61f06e,e502279155d4a67e2789ce19b22acd48629cfa97..dd2b1c8c436e8a479884317a52164cc5ececb993
@@@ -11,6 -11,7 +11,7 @@@ defmodule Pleroma.User d
    alias Comeonin.Pbkdf2
    alias Ecto.Multi
    alias Pleroma.Activity
+   alias Pleroma.Delivery
    alias Pleroma.Keys
    alias Pleroma.Notification
    alias Pleroma.Object
@@@ -27,7 -28,6 +28,7 @@@
    alias Pleroma.Web.OStatus
    alias Pleroma.Web.RelMe
    alias Pleroma.Web.Websub
 +  alias Pleroma.Workers.BackgroundWorker
  
    require Logger
  
@@@ -62,6 -62,7 +63,7 @@@
      field(:last_digest_emailed_at, :naive_datetime)
      has_many(:notifications, Notification)
      has_many(:registrations, Registration)
+     has_many(:deliveries, Delivery)
      embeds_one(:info, User.Info)
  
      timestamps()
      |> Repo.aggregate(:count, :id)
    end
  
 +  defp truncate_if_exists(params, key, max_length) do
 +    if Map.has_key?(params, key) and is_binary(params[key]) do
 +      {value, _chopped} = String.split_at(params[key], max_length)
 +      Map.put(params, key, value)
 +    else
 +      params
 +    end
 +  end
 +
    def remote_user_creation(params) do
      bio_limit = Pleroma.Config.get([:instance, :user_bio_length], 5000)
      name_limit = Pleroma.Config.get([:instance, :user_name_length], 100)
  
 -    params = Map.put(params, :info, params[:info] || %{})
 +    params =
 +      params
 +      |> Map.put(:info, params[:info] || %{})
 +      |> truncate_if_exists(:name, name_limit)
 +      |> truncate_if_exists(:bio, bio_limit)
 +
      info_cng = User.Info.remote_user_creation(%User.Info{}, params[:info])
  
      changes =
    end
  
    @doc "Fetch some posts when the user has just been federated with"
 -  def fetch_initial_posts(user),
 -    do: PleromaJobQueue.enqueue(:background, __MODULE__, [:fetch_initial_posts, user])
 +  def fetch_initial_posts(user) do
 +    BackgroundWorker.enqueue("fetch_initial_posts", %{"user_id" => user.id})
 +  end
  
    @spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t()
    def get_followers_query(%User{} = user, nil) do
    end
  
    def deactivate_async(user, status \\ true) do
 -    PleromaJobQueue.enqueue(:background, __MODULE__, [:deactivate_async, user, status])
 +    BackgroundWorker.enqueue("deactivate_user", %{"user_id" => user.id, "status" => status})
    end
  
    def deactivate(%User{} = user, status \\ true) do
      |> update_and_set_cache()
    end
  
 -  @spec delete(User.t()) :: :ok
 -  def delete(%User{} = user),
 -    do: PleromaJobQueue.enqueue(:background, __MODULE__, [:delete, user])
 +  def delete(%User{} = user) do
 +    BackgroundWorker.enqueue("delete_user", %{"user_id" => user.id})
 +  end
  
    @spec perform(atom(), User.t()) :: {:ok, User.t()}
    def perform(:delete, %User{} = user) do
      Repo.all(query)
    end
  
 -  def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers),
 -    do:
 -      PleromaJobQueue.enqueue(:background, __MODULE__, [
 -        :blocks_import,
 -        blocker,
 -        blocked_identifiers
 -      ])
 -
 -  def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers),
 -    do:
 -      PleromaJobQueue.enqueue(:background, __MODULE__, [
 -        :follow_import,
 -        follower,
 -        followed_identifiers
 -      ])
 +  def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do
 +    BackgroundWorker.enqueue("blocks_import", %{
 +      "blocker_id" => blocker.id,
 +      "blocked_identifiers" => blocked_identifiers
 +    })
 +  end
 +
 +  def follow_import(%User{} = follower, followed_identifiers)
 +      when is_list(followed_identifiers) do
 +    BackgroundWorker.enqueue("follow_import", %{
 +      "follower_id" => follower.id,
 +      "followed_identifiers" => followed_identifiers
 +    })
 +  end
  
    def delete_user_activities(%User{ap_id: ap_id} = user) do
      ap_id
    def is_internal_user?(%User{local: true, nickname: "internal." <> _}), do: true
    def is_internal_user?(_), do: false
  
+   # A hack because user delete activities have a fake id for whatever reason
+   # TODO: Get rid of this
+   def get_delivered_users_by_object_id("pleroma:fake_object_id"), do: []
+   def get_delivered_users_by_object_id(object_id) do
+     from(u in User,
+       inner_join: delivery in assoc(u, :deliveries),
+       where: delivery.object_id == ^object_id
+     )
+     |> Repo.all()
+   end
    def change_email(user, email) do
      user
      |> cast(%{email: email}, [:email])
index a6322e25a075624eb2669956e7e0b1e193807407,c39e89a6a039a11f7044cd1db7b5483291ca48ef..114251b248626a4fb2ef172ebb06b0ebcbdca65d
@@@ -5,8 -5,10 +5,10 @@@
  defmodule Pleroma.Web.ActivityPub.Publisher do
    alias Pleroma.Activity
    alias Pleroma.Config
+   alias Pleroma.Delivery
    alias Pleroma.HTTP
    alias Pleroma.Instances
+   alias Pleroma.Object
    alias Pleroma.User
    alias Pleroma.Web.ActivityPub.Relay
    alias Pleroma.Web.ActivityPub.Transmogrifier
      end
    end
  
 +  def publish_one(%{actor_id: actor_id} = params) do
 +    actor = User.get_cached_by_id(actor_id)
 +
 +    params
 +    |> Map.delete(:actor_id)
 +    |> Map.put(:actor, actor)
 +    |> publish_one()
 +  end
 +
    defp should_federate?(inbox, public) do
      if public do
        true
          {:ok, []}
        end
  
-     Pleroma.Web.Salmon.remote_users(actor, activity) ++ followers
+     fetchers =
+       with %Activity{data: %{"type" => "Delete"}} <- activity,
+            %Object{id: object_id} <- Object.normalize(activity),
+            fetchers <- User.get_delivered_users_by_object_id(object_id),
+            _ <- Delivery.delete_all_by_object_id(object_id) do
+         fetchers
+       else
+         _ ->
+           []
+       end
+     Pleroma.Web.Salmon.remote_users(actor, activity) ++ followers ++ fetchers
    end
  
    defp get_cc_ap_ids(ap_id, recipients) do
    Publishes an activity with BCC to all relevant peers.
    """
  
 -  def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do
 +  def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
 +      when is_list(bcc) and bcc != [] do
      public = is_public?(activity)
      {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
  
        Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{
          inbox: inbox,
          json: json,
 -        actor: actor,
 +        actor_id: actor.id,
          id: activity.data["id"],
          unreachable_since: unreachable_since
        })
          %{
            inbox: inbox,
            json: json,
 -          actor: actor,
 +          actor_id: actor.id,
            id: activity.data["id"],
            unreachable_since: unreachable_since
          }
index 9b78fb72d9fd79076af2f3e6c8aaf62951f6b2df,0bab555b5717c7f6815b2bbefa4461ce73581d1f..f83b14452d87b28745d788a3911c88ad5968699f
@@@ -4,20 -4,17 +4,21 @@@
  
  defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
    use Pleroma.Web.ConnCase
 +  use Oban.Testing, repo: Pleroma.Repo
 +
    import Pleroma.Factory
    alias Pleroma.Activity
+   alias Pleroma.Delivery
    alias Pleroma.Instances
    alias Pleroma.Object
 +  alias Pleroma.Tests.ObanHelpers
    alias Pleroma.User
    alias Pleroma.Web.ActivityPub.ObjectView
    alias Pleroma.Web.ActivityPub.Relay
    alias Pleroma.Web.ActivityPub.UserView
    alias Pleroma.Web.ActivityPub.Utils
    alias Pleroma.Web.CommonAPI
 +  alias Pleroma.Workers.ReceiverWorker
  
    setup_all do
      Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
          |> post("/inbox", data)
  
        assert "ok" == json_response(conn, 200)
 -      :timer.sleep(500)
 +
 +      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
        assert Activity.get_by_ap_id(data["id"])
      end
  
          |> post("/users/#{user.nickname}/inbox", data)
  
        assert "ok" == json_response(conn, 200)
 -      :timer.sleep(500)
 +      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
        assert Activity.get_by_ap_id(data["id"])
      end
  
          |> post("/users/#{recipient.nickname}/inbox", data)
  
        assert "ok" == json_response(conn, 200)
 -      :timer.sleep(500)
 +      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
        assert Activity.get_by_ap_id(data["id"])
      end
  
        |> post("/users/#{recipient.nickname}/inbox", data)
        |> json_response(200)
  
 +      ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
 +
        activity = Activity.get_by_ap_id(data["id"])
  
        assert activity.id
          |> post("/users/#{user.nickname}/outbox", data)
  
        result = json_response(conn, 201)
 +
        assert Activity.get_by_ap_id(result["id"])
      end
  
        assert result["totalItems"] == 15
      end
    end
+   describe "delivery tracking" do
+     test "it tracks a signed object fetch", %{conn: conn} do
+       user = insert(:user, local: false)
+       activity = insert(:note_activity)
+       object = Object.normalize(activity)
+       object_path = String.trim_leading(object.data["id"], Pleroma.Web.Endpoint.url())
+       conn
+       |> put_req_header("accept", "application/activity+json")
+       |> assign(:user, user)
+       |> get(object_path)
+       |> json_response(200)
+       assert Delivery.get(object.id, user.id)
+     end
+     test "it tracks a signed activity fetch", %{conn: conn} do
+       user = insert(:user, local: false)
+       activity = insert(:note_activity)
+       object = Object.normalize(activity)
+       activity_path = String.trim_leading(activity.data["id"], Pleroma.Web.Endpoint.url())
+       conn
+       |> put_req_header("accept", "application/activity+json")
+       |> assign(:user, user)
+       |> get(activity_path)
+       |> json_response(200)
+       assert Delivery.get(object.id, user.id)
+     end
+     test "it tracks a signed object fetch when the json is cached", %{conn: conn} do
+       user = insert(:user, local: false)
+       other_user = insert(:user, local: false)
+       activity = insert(:note_activity)
+       object = Object.normalize(activity)
+       object_path = String.trim_leading(object.data["id"], Pleroma.Web.Endpoint.url())
+       conn
+       |> put_req_header("accept", "application/activity+json")
+       |> assign(:user, user)
+       |> get(object_path)
+       |> json_response(200)
+       build_conn()
+       |> put_req_header("accept", "application/activity+json")
+       |> assign(:user, other_user)
+       |> get(object_path)
+       |> json_response(200)
+       assert Delivery.get(object.id, user.id)
+       assert Delivery.get(object.id, other_user.id)
+     end
+     test "it tracks a signed activity fetch when the json is cached", %{conn: conn} do
+       user = insert(:user, local: false)
+       other_user = insert(:user, local: false)
+       activity = insert(:note_activity)
+       object = Object.normalize(activity)
+       activity_path = String.trim_leading(activity.data["id"], Pleroma.Web.Endpoint.url())
+       conn
+       |> put_req_header("accept", "application/activity+json")
+       |> assign(:user, user)
+       |> get(activity_path)
+       |> json_response(200)
+       build_conn()
+       |> put_req_header("accept", "application/activity+json")
+       |> assign(:user, other_user)
+       |> get(activity_path)
+       |> json_response(200)
+       assert Delivery.get(object.id, user.id)
+       assert Delivery.get(object.id, other_user.id)
+     end
+   end
  end
index c7d0dc3a53c9ea2f714a34e77077e966f802af79,0ef97464e6147480579bc764b52c2363d63ad90c..c7d1d05aaa1a2eafd134525cbfad7ea81c0b2267
@@@ -3,16 -3,17 +3,18 @@@
  # SPDX-License-Identifier: AGPL-3.0-only
  
  defmodule Pleroma.Web.ActivityPub.PublisherTest do
-   use Pleroma.DataCase
+   use Pleroma.Web.ConnCase
  
 +  import ExUnit.CaptureLog
    import Pleroma.Factory
    import Tesla.Mock
    import Mock
  
    alias Pleroma.Activity
    alias Pleroma.Instances
+   alias Pleroma.Object
    alias Pleroma.Web.ActivityPub.Publisher
+   alias Pleroma.Web.CommonAPI
  
    @as_public "https://www.w3.org/ns/activitystreams#Public"
  
        actor = insert(:user)
        inbox = "http://connrefused.site/users/nick1/inbox"
  
 -      assert {:error, _} = Publisher.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1})
 +      assert capture_log(fn ->
 +               assert {:error, _} =
 +                        Publisher.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1})
 +             end) =~ "connrefused"
  
        assert called(Instances.set_unreachable(inbox))
      end
        actor = insert(:user)
        inbox = "http://connrefused.site/users/nick1/inbox"
  
 -      assert {:error, _} =
 -               Publisher.publish_one(%{
 -                 inbox: inbox,
 -                 json: "{}",
 -                 actor: actor,
 -                 id: 1,
 -                 unreachable_since: NaiveDateTime.utc_now()
 -               })
 +      assert capture_log(fn ->
 +               assert {:error, _} =
 +                        Publisher.publish_one(%{
 +                          inbox: inbox,
 +                          json: "{}",
 +                          actor: actor,
 +                          id: 1,
 +                          unreachable_since: NaiveDateTime.utc_now()
 +                        })
 +             end) =~ "connrefused"
  
        refute called(Instances.set_unreachable(inbox))
      end
        assert called(
                 Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{
                   inbox: "https://domain.com/users/nick1/inbox",
 -                 actor: actor,
 +                 actor_id: actor.id,
                   id: note_activity.data["id"]
                 })
               )
      end
+     test_with_mock "publishes a delete activity to peers who signed fetch requests to the create acitvity/object.",
+                    Pleroma.Web.Federator.Publisher,
+                    [:passthrough],
+                    [] do
+       fetcher =
+         insert(:user,
+           local: false,
+           info: %{
+             ap_enabled: true,
+             source_data: %{"inbox" => "https://domain.com/users/nick1/inbox"}
+           }
+         )
+       another_fetcher =
+         insert(:user,
+           local: false,
+           info: %{
+             ap_enabled: true,
+             source_data: %{"inbox" => "https://domain2.com/users/nick1/inbox"}
+           }
+         )
+       actor = insert(:user)
+       note_activity = insert(:note_activity, user: actor)
+       object = Object.normalize(note_activity)
+       activity_path = String.trim_leading(note_activity.data["id"], Pleroma.Web.Endpoint.url())
+       object_path = String.trim_leading(object.data["id"], Pleroma.Web.Endpoint.url())
+       build_conn()
+       |> put_req_header("accept", "application/activity+json")
+       |> assign(:user, fetcher)
+       |> get(object_path)
+       |> json_response(200)
+       build_conn()
+       |> put_req_header("accept", "application/activity+json")
+       |> assign(:user, another_fetcher)
+       |> get(activity_path)
+       |> json_response(200)
+       {:ok, delete} = CommonAPI.delete(note_activity.id, actor)
+       res = Publisher.publish(actor, delete)
+       assert res == :ok
+       assert called(
+                Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{
+                  inbox: "https://domain.com/users/nick1/inbox",
+                  actor: actor,
+                  id: delete.data["id"]
+                })
+              )
+       assert called(
+                Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{
+                  inbox: "https://domain2.com/users/nick1/inbox",
+                  actor: actor,
+                  id: delete.data["id"]
+                })
+              )
+     end
    end
  end