--- /dev/null
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Delivery do
+ use Ecto.Schema
+
+ alias Pleroma.Delivery
+ alias Pleroma.FlakeId
+ alias Pleroma.User
+ alias Pleroma.Repo
+ alias Pleroma.Object
+ alias Pleroma.User
+
+ import Ecto.Changeset
+ import Ecto.Query
+
+ schema "deliveries" do
+ belongs_to(:user, User, type: FlakeId)
+ belongs_to(:object, Object)
+ end
+
+ def changeset(delivery, params \\ %{}) do
+ delivery
+ |> cast(params, [:user_id, :object_id])
+ |> foreign_key_constraint(:object_id)
+ |> foreign_key_constraint(:user_id)
+ |> unique_constraint(:user_id, name: :deliveries_user_id_object_id_index)
+ end
+
+ def create(object_id, user_id) do
+ %Delivery{}
+ |> changeset(%{user_id: user_id, object_id: object_id})
+ |> Repo.insert()
+ end
+
+ def get(object_id, user_id) do
+ from(d in Delivery, where: d.user_id == ^user_id and d.object_id == ^object_id)
+ |> Repo.one()
+ end
+
+ def get_or_create(object_id, user_id) do
+ case get(object_id, user_id) do
+ %Delivery{} = delivery -> {:ok, delivery}
+ nil -> create(object_id, user_id)
+ end
+ end
+
+ def delete_all_by_object_id(object_id) do
+ from(d in Delivery, where: d.object_id == ^object_id)
+ |> Repo.delete_all()
+ end
+
+ def get_all_by_object_id(object_id) do
+ from(d in Delivery, where: d.object_id == ^object_id)
+ |> Repo.all()
+ end
+end
- `ttl`: An expiration time (time-to-live). This value should be in milliseconds or `nil` to disable expiration. Defaults to `nil`.
- `query_params`: Take URL query string into account (`true`), ignore it (`false`) or limit to specific params only (list). Defaults to `true`.
+ - `tracking_fun`: A function that is called on successfull responses, no matter if the request is cached or not. It should accept a conn as the first argument and the value assigned to `tracking_fun_data` as the second.
Additionally, you can overwrite the TTL inside a controller action by assigning `cache_ttl` to the connection struct:
{:ok, nil} ->
cache_resp(conn, opts)
+ {:ok, {content_type, body, tracking_fun_data}} ->
+ conn = opts.tracking_fun(conn, tracking_fun_data)
+ send_cached(conn, {content_type, body})
+
{:ok, record} ->
send_cached(conn, record)
content_type = content_type(conn)
record = {content_type, body}
- Cachex.put(:web_resp_cache, key, record, ttl: ttl)
+ conn =
+ unless opts[:tracking_fun] do
+ Cachex.put(:web_resp_cache, key, {content_type, body}, ttl: ttl)
+ conn
+ else
+ tracking_fun_data = Map.get(conn.assigns, :tracking_fun_data, nil)
+ Cachex.put(:web_resp_cache, {content_type, body, tracking_fun_data}, record, ttl: ttl)
+
+ opts.tracking_fun.(conn, tracking_fun_data)
+ end
put_resp_header(conn, "x-cache", "MISS from Pleroma")
alias Comeonin.Pbkdf2
alias Ecto.Multi
alias Pleroma.Activity
+ alias Pleroma.Delivery
alias Pleroma.Keys
alias Pleroma.Notification
alias Pleroma.Object
field(:last_digest_emailed_at, :naive_datetime)
has_many(:notifications, Notification)
has_many(:registrations, Registration)
+ has_many(:deliveries, Delivery)
embeds_one(:info, User.Info)
timestamps()
def is_internal_user?(%User{nickname: nil}), do: true
def is_internal_user?(%User{local: true, nickname: "internal." <> _}), do: true
def is_internal_user?(_), do: false
+
+ def get_delivered_users_by_object_id(object_id) do
+ from(u in User,
+ inner_join: delivery in assoc(u, :deliveries),
+ where: delivery.object_id == ^object_id
+ )
+ |> Repo.all()
+ end
end
use Pleroma.Web, :controller
alias Pleroma.Activity
+ alias Pleroma.Delivery
alias Pleroma.Object
alias Pleroma.Object.Fetcher
alias Pleroma.User
action_fallback(:errors)
- plug(Pleroma.Plugs.Cache, [query_params: false] when action in [:activity, :object])
+ plug(
+ Pleroma.Plugs.Cache,
+ [
+ query_params: false,
+ tracking_fun: &Pleroma.Web.ActivityPub.ActivityPubController.track_object_fetch/2
+ ]
+ when action in [:activity, :object]
+ )
+
plug(Pleroma.Web.FederatingPlug when action in [:inbox, :relay])
plug(:set_requester_reachable when action in [:inbox])
plug(:relay_active? when action in [:relay])
%Object{} = object <- Object.get_cached_by_ap_id(ap_id),
{_, true} <- {:public?, Visibility.is_public?(object)} do
conn
+ |> assign(:tracking_fun_data, object.id)
|> set_cache_ttl_for(object)
|> put_resp_content_type("application/activity+json")
|> put_view(ObjectView)
end
end
+ def track_object_fetch(conn, object_id) do
+ case conn.assigns[:user] do
+ %User{id: user_id} -> Delivery.create(object_id, user_id)
+ _ -> nil
+ end
+
+ conn
+ end
+
def object_likes(conn, %{"uuid" => uuid, "page" => page}) do
with ap_id <- o_status_url(conn, :object, uuid),
%Object{} = object <- Object.get_cached_by_ap_id(ap_id),
%Activity{} = activity <- Activity.normalize(ap_id),
{_, true} <- {:public?, Visibility.is_public?(activity)} do
conn
+ |> maybe_set_tracking_data(activity)
|> set_cache_ttl_for(activity)
|> put_resp_content_type("application/activity+json")
|> put_view(ObjectView)
end
end
+ defp maybe_set_tracking_data(conn, %Activity{data: %{"type" => "Create"}} = activity) do
+ object_id = Object.normalize(activity).id
+ assign(conn, :tracking_fun_data, object_id)
+ end
+
+ defp maybe_set_tracking_data(conn, _activity), do: assign(conn, :tracking_fun_data, nil)
+
defp set_cache_ttl_for(conn, %Activity{object: object}) do
set_cache_ttl_for(conn, object)
end
--- /dev/null
+defmodule Pleroma.Repo.Migrations.CreateDeliveries do
+ use Ecto.Migration
+
+ def change do
+ create_if_not_exists table(:deliveries) do
+ add(:object_id, references(:objects, type: :id))
+ add(:user_id, references(:users, type: :uuid, on_delete: :delete_all))
+ end
+ create_if_not_exists index(:deliveries, :object_id, name: :deliveries_object_id)
+ create_if_not_exists(unique_index(:deliveries, [:user_id, :object_id]))
+ end
+end
use Pleroma.Web.ConnCase
import Pleroma.Factory
alias Pleroma.Activity
+ alias Pleroma.Delivery
alias Pleroma.Instances
alias Pleroma.Object
alias Pleroma.User
assert result["totalItems"] == 15
end
end
+
+ describe "delivery tracking" do
+ test "it tracks a signed object fetch", %{conn: conn} do
+ user = insert(:user, local: false)
+ activity = insert(:note_activity)
+ object = Object.normalize(activity)
+
+ object_path = String.trim_leading(object.data["id"], Pleroma.Web.Endpoint.url())
+
+ conn
+ |> put_req_header("accept", "application/activity+json")
+ |> assign(:user, user)
+ |> get(object_path)
+ |> json_response(200)
+
+ assert Delivery.get(object.id, user.id)
+ end
+
+ test "it tracks a signed activity fetch", %{conn: conn} do
+ user = insert(:user, local: false)
+ activity = insert(:note_activity)
+ object = Object.normalize(activity)
+
+ activity_path = String.trim_leading(activity.data["id"], Pleroma.Web.Endpoint.url())
+
+ conn
+ |> put_req_header("accept", "application/activity+json")
+ |> assign(:user, user)
+ |> get(activity_path)
+ |> json_response(200)
+
+ assert Delivery.get(object.id, user.id)
+ end
+
+ test "it tracks a signed object fetch when the json is cached", %{conn: conn} do
+ user = insert(:user, local: false)
+ other_user = insert(:user, local: false)
+ activity = insert(:note_activity)
+ object = Object.normalize(activity)
+
+ object_path = String.trim_leading(object.data["id"], Pleroma.Web.Endpoint.url())
+
+ conn
+ |> put_req_header("accept", "application/activity+json")
+ |> assign(:user, user)
+ |> get(object_path)
+ |> json_response(200)
+
+ build_conn()
+ |> put_req_header("accept", "application/activity+json")
+ |> assign(:user, other_user)
+ |> get(object_path)
+ |> json_response(200)
+
+ assert Delivery.get(object.id, user.id)
+ assert Delivery.get(object.id, other_user.id)
+ end
+
+ test "it tracks a signed activity fetch when the json is cached", %{conn: conn} do
+ user = insert(:user, local: false)
+ other_user = insert(:user, local: false)
+ activity = insert(:note_activity)
+ object = Object.normalize(activity)
+
+ activity_path = String.trim_leading(activity.data["id"], Pleroma.Web.Endpoint.url())
+
+ conn
+ |> put_req_header("accept", "application/activity+json")
+ |> assign(:user, user)
+ |> get(activity_path)
+ |> json_response(200)
+
+ build_conn()
+ |> put_req_header("accept", "application/activity+json")
+ |> assign(:user, other_user)
+ |> get(activity_path)
+ |> json_response(200)
+
+ assert Delivery.get(object.id, user.id)
+ assert Delivery.get(object.id, other_user.id)
+ end
+ end
end