unless skip?(activity, user) do
notification = %Notification{user_id: user.id, activity: activity}
{:ok, notification} = Repo.insert(notification)
- Streamer.stream("user", notification)
- Streamer.stream("user:notification", notification)
+
+ ["user", "user:notification"]
+ |> Streamer.stream(notification)
+
Push.send(notification)
notification
end
[]
|> Utils.maybe_notify_to_recipients(activity)
|> Utils.maybe_notify_mentioned_recipients(activity)
- |> Utils.maybe_notify_subscribers(activity)
|> Enum.uniq()
User.get_users_from_set(recipients, local_only)
defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Activity
+ alias Pleroma.Activity.Ir.Topics
alias Pleroma.Config
alias Pleroma.Conversation
alias Pleroma.Notification
alias Pleroma.Object.Fetcher
alias Pleroma.Pagination
alias Pleroma.Repo
+ alias Pleroma.SubscriptionNotification
alias Pleroma.Upload
alias Pleroma.User
alias Pleroma.Web.ActivityPub.MRF
alias Pleroma.Web.ActivityPub.Transmogrifier
+ alias Pleroma.Web.Streamer
alias Pleroma.Web.WebFinger
+ alias Pleroma.Workers.BackgroundWorker
import Ecto.Query
import Pleroma.Web.ActivityPub.Utils
activity
end
- PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])
+ BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
Notification.create_notifications(activity)
+ SubscriptionNotification.create_notifications(activity)
participations =
activity
participations
|> Repo.preload(:user)
- Enum.each(participations, fn participation ->
- Pleroma.Web.Streamer.stream("participation", participation)
- end)
+ Streamer.stream("participation", participations)
end
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
def stream_out_participations(_, _), do: :noop
- def stream_out(activity) do
- if activity.data["type"] in ["Create", "Announce", "Delete"] do
- object = Object.normalize(activity)
- # Do not stream out poll replies
- unless object.data["type"] == "Answer" do
- Pleroma.Web.Streamer.stream("user", activity)
- Pleroma.Web.Streamer.stream("list", activity)
-
- if get_visibility(activity) == "public" do
- Pleroma.Web.Streamer.stream("public", activity)
-
- if activity.local do
- Pleroma.Web.Streamer.stream("public:local", activity)
- end
-
- if activity.data["type"] in ["Create"] do
- object.data
- |> Map.get("tag", [])
- |> Enum.filter(fn tag -> is_bitstring(tag) end)
- |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
-
- if object.data["attachment"] != [] do
- Pleroma.Web.Streamer.stream("public:media", activity)
-
- if activity.local do
- Pleroma.Web.Streamer.stream("public:local:media", activity)
- end
- end
- end
- else
- if get_visibility(activity) == "direct",
- do: Pleroma.Web.Streamer.stream("direct", activity)
- end
- end
- end
+ def stream_out(%Activity{data: %{"type" => data_type}} = activity)
+ when data_type in ["Create", "Announce", "Delete"] do
+ activity
+ |> Topics.get_activity_topics()
+ |> Streamer.stream(activity)
+ end
+
+ def stream_out(_activity) do
+ :noop
end
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
pipeline :http_signature do
plug(Pleroma.Web.Plugs.HTTPSignaturePlug)
+ plug(Pleroma.Web.Plugs.MappedSignatureToIdentityPlug)
end
scope "/api/pleroma", Pleroma.Web.TwitterAPI do
get("/bookmarks", MastodonAPIController, :bookmarks)
post("/notifications/clear", MastodonAPIController, :clear_notifications)
+
+ post(
+ "/notifications/subscription/clear",
+ MastodonAPIController,
+ :clear_subscription_notifications
+ )
+
post("/notifications/dismiss", MastodonAPIController, :dismiss_notification)
+
+ post(
+ "/notifications/subscription/dismiss",
+ MastodonAPIController,
+ :dismiss_subscription_notification
+ )
+
get("/notifications", MastodonAPIController, :notifications)
+ get("/notifications/subscription", MastodonAPIController, :subscription_notifications)
get("/notifications/:id", MastodonAPIController, :get_notification)
+
+ get(
+ "/notifications/subscription/:id",
+ MastodonAPIController,
+ :get_subscription_notification
+ )
+
delete("/notifications/destroy_multiple", MastodonAPIController, :destroy_multiple)
+ delete(
+ "/notifications/subscription/destroy_multiple",
+ MastodonAPIController,
+ :destroy_multiple_subscription_notifications
+ )
+
get("/scheduled_statuses", MastodonAPIController, :scheduled_statuses)
get("/scheduled_statuses/:id", MastodonAPIController, :show_scheduled_status)
scope "/", Pleroma.Web do
pipe_through(:ostatus)
+ pipe_through(:http_signature)
get("/objects/:uuid", OStatus.OStatusController, :object)
get("/activities/:uuid", OStatus.OStatusController, :activity)
import Pleroma.Factory
alias Pleroma.Notification
+ alias Pleroma.Tests.ObanHelpers
alias Pleroma.User
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.CommonAPI
assert other_notification.activity_id == activity.id
end
- test "it creates a notification for subscribed users" do
+ test "it does not create a notification for subscribed users" do
user = insert(:user)
subscriber = insert(:user)
User.subscribe(subscriber, user)
{:ok, status} = CommonAPI.post(user, %{"status" => "Akariiiin"})
- {:ok, [notification]} = Notification.create_notifications(status)
+ {:ok, notifications} = Notification.create_notifications(status)
- assert notification.user_id == subscriber.id
+ assert notifications == []
end
test "does not create a notification for subscribed users if status is a reply" do
end
describe "create_notification" do
- setup do
- GenServer.start(Streamer, %{}, name: Streamer)
-
- on_exit(fn ->
- if pid = Process.whereis(Streamer) do
- Process.exit(pid, :kill)
- end
- end)
- end
-
+ @tag needs_streamer: true
test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do
user = insert(:user)
task = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
refute Notification.create_notification(activity_dupe, followed_user)
end
- test "it doesn't create duplicate notifications for follow+subscribed users" do
+ test "it doesn't create notifications for follow+subscribed users" do
user = insert(:user)
subscriber = insert(:user)
{:ok, _, _, _} = CommonAPI.follow(subscriber, user)
User.subscribe(subscriber, user)
{:ok, status} = CommonAPI.post(user, %{"status" => "Akariiiin"})
- {:ok, [_notif]} = Notification.create_notifications(status)
+ {:ok, notifications} = Notification.create_notifications(status)
+
+ assert notifications == []
end
test "it doesn't create subscription notifications if the recipient cannot see the status" do
refute Enum.empty?(Notification.for_user(other_user))
- User.delete(user)
+ {:ok, job} = User.delete(user)
+ ObanHelpers.perform(job)
assert Enum.empty?(Notification.for_user(other_user))
end
}
{:ok, _delete_activity} = Transmogrifier.handle_incoming(delete_user_message)
+ ObanHelpers.perform_all()
assert Enum.empty?(Notification.for_user(local_user))
end
alias Pleroma.Object
alias Pleroma.Repo
alias Pleroma.ScheduledActivity
+ alias Pleroma.SubscriptionNotification
+ alias Pleroma.Tests.ObanHelpers
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.CommonAPI
query_string = "ids[]=#{id1}&ids[]=#{id2}"
conn = get(conn, "/api/v1/statuses/?#{query_string}")
- assert [%{"id" => ^id1}, %{"id" => ^id2}] = json_response(conn, :ok)
+ assert [%{"id" => ^id1}, %{"id" => ^id2}] = Enum.sort_by(json_response(conn, :ok), & &1["id"])
end
describe "deleting a status" do
end
end
+ describe "subscription_notifications" do
+ setup do
+ user = insert(:user)
+ subscriber = insert(:user)
+
+ User.subscribe(subscriber, user)
+
+ {:ok, %{user: user, subscriber: subscriber}}
+ end
+
+ test "list of notifications", %{conn: conn, user: user, subscriber: subscriber} do
+ status_text = "Hello"
+ {:ok, _activity} = CommonAPI.post(user, %{"status" => status_text})
+
+ conn =
+ conn
+ |> assign(:user, subscriber)
+ |> get("/api/v1/notifications/subscription")
+
+ assert [%{"status" => %{"content" => response}} | _rest] = json_response(conn, 200)
+ assert response == status_text
+ end
+
+ test "getting a single notification", %{conn: conn, user: user, subscriber: subscriber} do
+ status_text = "Hello"
+
+ {:ok, _activity} = CommonAPI.post(user, %{"status" => status_text})
+ [notification] = Repo.all(SubscriptionNotification)
+
+ conn =
+ conn
+ |> assign(:user, subscriber)
+ |> get("/api/v1/notifications/subscription/#{notification.id}")
+
+ assert %{"status" => %{"content" => response}} = json_response(conn, 200)
+ assert response == status_text
+ end
+
+ test "dismissing a single notification also deletes it", %{
+ conn: conn,
+ user: user,
+ subscriber: subscriber
+ } do
+ status_text = "Hello"
+ {:ok, _activity} = CommonAPI.post(user, %{"status" => status_text})
+
+ [notification] = Repo.all(SubscriptionNotification)
+
+ conn =
+ conn
+ |> assign(:user, subscriber)
+ |> post("/api/v1/notifications/subscription/dismiss", %{"id" => notification.id})
+
+ assert %{} = json_response(conn, 200)
+
+ assert Repo.all(SubscriptionNotification) == []
+ end
+
+ test "clearing all notifications also deletes them", %{
+ conn: conn,
+ user: user,
+ subscriber: subscriber
+ } do
+ status_text1 = "Hello"
+ status_text2 = "Hello again"
+ {:ok, _activity1} = CommonAPI.post(user, %{"status" => status_text1})
+ {:ok, _activity2} = CommonAPI.post(user, %{"status" => status_text2})
+
+ conn =
+ conn
+ |> assign(:user, subscriber)
+ |> post("/api/v1/notifications/subscription/clear")
+
+ assert %{} = json_response(conn, 200)
+
+ conn =
+ build_conn()
+ |> assign(:user, subscriber)
+ |> get("/api/v1/notifications/subscription")
+
+ assert json_response(conn, 200) == []
+
+ assert Repo.all(SubscriptionNotification) == []
+ end
+
+ test "paginates notifications using min_id, since_id, max_id, and limit", %{
+ conn: conn,
+ user: user,
+ subscriber: subscriber
+ } do
+ {:ok, activity1} = CommonAPI.post(user, %{"status" => "Hello 1"})
+ {:ok, activity2} = CommonAPI.post(user, %{"status" => "Hello 2"})
+ {:ok, activity3} = CommonAPI.post(user, %{"status" => "Hello 3"})
+ {:ok, activity4} = CommonAPI.post(user, %{"status" => "Hello 4"})
+
+ notification1_id =
+ Repo.get_by(SubscriptionNotification, activity_id: activity1.id).id |> to_string()
+
+ notification2_id =
+ Repo.get_by(SubscriptionNotification, activity_id: activity2.id).id |> to_string()
+
+ notification3_id =
+ Repo.get_by(SubscriptionNotification, activity_id: activity3.id).id |> to_string()
+
+ notification4_id =
+ Repo.get_by(SubscriptionNotification, activity_id: activity4.id).id |> to_string()
+
+ conn = assign(conn, :user, subscriber)
+
+ # min_id
+ conn_res =
+ get(conn, "/api/v1/notifications/subscription?limit=2&min_id=#{notification1_id}")
+
+ result = json_response(conn_res, 200)
+ assert [%{"id" => ^notification3_id}, %{"id" => ^notification2_id}] = result
+
+ # since_id
+ conn_res =
+ get(conn, "/api/v1/notifications/subscription?limit=2&since_id=#{notification1_id}")
+
+ result = json_response(conn_res, 200)
+ assert [%{"id" => ^notification4_id}, %{"id" => ^notification3_id}] = result
+
+ # max_id
+ conn_res =
+ get(conn, "/api/v1/notifications/subscription?limit=2&max_id=#{notification4_id}")
+
+ result = json_response(conn_res, 200)
+ assert [%{"id" => ^notification3_id}, %{"id" => ^notification2_id}] = result
+ end
+
+ test "destroy multiple", %{conn: conn, user: user1, subscriber: user2} do
+ # mutual subscription
+ User.subscribe(user1, user2)
+
+ {:ok, activity1} = CommonAPI.post(user1, %{"status" => "Hello 1"})
+ {:ok, activity2} = CommonAPI.post(user1, %{"status" => "World 1"})
+ {:ok, activity3} = CommonAPI.post(user2, %{"status" => "Hello 2"})
+ {:ok, activity4} = CommonAPI.post(user2, %{"status" => "World 2"})
+
+ notification1_id =
+ Repo.get_by(SubscriptionNotification, activity_id: activity1.id).id |> to_string()
+
+ notification2_id =
+ Repo.get_by(SubscriptionNotification, activity_id: activity2.id).id |> to_string()
+
+ notification3_id =
+ Repo.get_by(SubscriptionNotification, activity_id: activity3.id).id |> to_string()
+
+ notification4_id =
+ Repo.get_by(SubscriptionNotification, activity_id: activity4.id).id |> to_string()
+
+ conn = assign(conn, :user, user1)
+
+ conn_res = get(conn, "/api/v1/notifications/subscription")
+
+ result = json_response(conn_res, 200)
+
+ Enum.each(result, fn %{"id" => id} ->
+ assert id in [notification3_id, notification4_id]
+ end)
+
+ conn2 = assign(conn, :user, user2)
+
+ conn_res = get(conn2, "/api/v1/notifications/subscription")
+
+ result = json_response(conn_res, 200)
+
+ Enum.each(result, fn %{"id" => id} ->
+ assert id in [notification1_id, notification2_id]
+ end)
+
+ conn_destroy =
+ delete(conn, "/api/v1/notifications/subscription/destroy_multiple", %{
+ "ids" => [notification3_id, notification4_id]
+ })
+
+ assert json_response(conn_destroy, 200) == %{}
+
+ conn_res = get(conn2, "/api/v1/notifications/subscription")
+
+ result = json_response(conn_res, 200)
+
+ Enum.each(result, fn %{"id" => id} ->
+ assert id in [notification1_id, notification2_id]
+ end)
+
+ assert length(Repo.all(SubscriptionNotification)) == 2
+ end
+ end
+
describe "reblogging" do
test "reblogs and returns the reblogged status", %{conn: conn} do
activity = insert(:note_activity)
end
test "it sends an email to user", %{user: user} do
+ ObanHelpers.perform_all()
token_record = Repo.get_by(Pleroma.PasswordResetToken, user_id: user.id)
email = Pleroma.Emails.UserEmail.password_reset_email(user, token_record.token)
|> post("/api/v1/pleroma/accounts/confirmation_resend?email=#{user.email}")
|> json_response(:no_content)
+ ObanHelpers.perform_all()
+
email = Pleroma.Emails.UserEmail.account_confirmation_email(user)
notify_email = Config.get([:instance, :notify_email])
instance_name = Config.get([:instance, :name])