- def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity) do
+ def create_notifications(activity, options \\ [])
+ def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity, options) do
object = Object.normalize(activity, false)
if object && object.data["type"] == "Answer" do
{:ok, []}
- do_create_notifications(activity)
+ do_create_notifications(activity, options)
- def create_notifications(%Activity{data: %{"type" => type}} = activity)
+ def create_notifications(%Activity{data: %{"type" => type}} = activity, options)
when type in ["Follow", "Like", "Announce", "Move", "EmojiReact"] do
- do_create_notifications(activity)
+ do_create_notifications(activity, options)
- def create_notifications(_), do: {:ok, []}
+ def create_notifications(_, _), do: {:ok, []}
+ defp do_create_notifications(%Activity{} = activity, options) do
+ do_send = Keyword.get(options, :do_send, true)
- defp do_create_notifications(%Activity{} = activity) do
{enabled_receivers, disabled_receivers} = get_notified_from_activity(activity)
potential_receivers = enabled_receivers ++ disabled_receivers
notifications =
Enum.map(potential_receivers, fn user ->
- do_send = user in enabled_receivers
+ do_send = do_send && user in enabled_receivers
create_notification(activity, user, do_send)
def skip?(_, _, _), do: false
+ def for_user_and_activity(user, activity) do
+ from(n in __MODULE__,
+ where: n.user_id == ^user.id,
+ where: n.activity_id == ^activity.id
+ )
+ |> Repo.one()
+ end
{:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
def common_pipeline(object, meta) do
case Repo.transaction(fn -> do_common_pipeline(object, meta) end) do
+ {:ok, {:ok, activity, meta}} ->
+ SideEffects.handle_after_transaction(meta)
+ {:ok, activity, meta}
{:ok, value} ->
alias Pleroma.Web.ActivityPub.Pipeline
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Streamer
+ alias Pleroma.Web.Push
def handle(object, meta \\ [])
# - Set up notifications
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
with {:ok, _object, _meta} <- handle_object_creation(meta[:object_data], meta) do
- Notification.create_notifications(activity)
+ {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
+ meta =
+ meta
+ |> add_notifications(notifications)
{:ok, activity, meta}
e -> Repo.rollback(e)
def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
+ defp send_notifications(meta) do
+ Keyword.get(meta, :created_notifications, [])
+ |> Enum.each(fn notification ->
+ Streamer.stream(["user", "user:notification"], notification)
+ Push.send(notification)
+ end)
+ meta
+ end
+ defp add_notifications(meta, notifications) do
+ existing = Keyword.get(meta, :created_notifications, [])
+ meta
+ |> Keyword.put(:created_notifications, notifications ++ existing)
+ end
+ def handle_after_transaction(meta) do
+ meta
+ |> send_notifications()
+ end
- [handle: fn o, m -> {:ok, o, m} end]
+ [
+ handle: fn o, m -> {:ok, o, m} end,
+ handle_after_transaction: fn m -> m end
+ ]
- [handle: fn o, m -> {:ok, o, m} end]
+ [handle: fn o, m -> {:ok, o, m} end, handle_after_transaction: fn m -> m end]
- [handle: fn o, m -> {:ok, o, m} end]
+ [handle: fn o, m -> {:ok, o, m} end, handle_after_transaction: fn m -> m end]
import Pleroma.Factory
import Mock
+ describe "handle_after_transaction" do
+ test "it streams out notifications" do
+ author = insert(:user, local: true)
+ recipient = insert(:user, local: true)
+ {:ok, chat_message_data, _meta} = Builder.chat_message(author, recipient.ap_id, "hey")
+ {:ok, create_activity_data, _meta} =
+ Builder.create(author, chat_message_data["id"], [recipient.ap_id])
+ {:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false)
+ {:ok, _create_activity, meta} =
+ SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
+ assert [notification] = meta[:created_notifications]
+ with_mocks([
+ {
+ Pleroma.Web.Streamer,
+ [],
+ [
+ stream: fn _, _ -> nil end
+ ]
+ },
+ {
+ Pleroma.Web.Push,
+ [],
+ [
+ send: fn _ -> nil end
+ ]
+ }
+ ]) do
+ SideEffects.handle_after_transaction(meta)
+ assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
+ assert called(Pleroma.Web.Push.send(notification))
+ end
+ end
+ end
describe "delete objects" do
setup do
user = insert(:user)
{:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false)
- {:ok, _create_activity, _meta} =
- SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
+ with_mocks([
+ {
+ Pleroma.Web.Streamer,
+ [],
+ [
+ stream: fn _, _ -> nil end
+ ]
+ },
+ {
+ Pleroma.Web.Push,
+ [],
+ [
+ send: fn _ -> nil end
+ ]
+ }
+ ]) do
+ {:ok, _create_activity, meta} =
+ SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
- chat = Chat.get(author.id, recipient.ap_id)
+ # The notification gets created
+ assert [notification] = meta[:created_notifications]
+ assert notification.activity_id == create_activity.id
- [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
+ # But it is not sent out
+ refute called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
+ refute called(Pleroma.Web.Push.send(notification))
- assert cm_ref.object.data["content"] == "hey"
- assert cm_ref.unread == false
+ chat = Chat.get(author.id, recipient.ap_id)
- chat = Chat.get(recipient.id, author.ap_id)
+ [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
- [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
+ assert cm_ref.object.data["content"] == "hey"
+ assert cm_ref.unread == false
- assert cm_ref.object.data["content"] == "hey"
- assert cm_ref.unread == true
+ chat = Chat.get(recipient.id, author.ap_id)
+ [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
+ assert cm_ref.object.data["content"] == "hey"
+ assert cm_ref.unread == true
+ end
test "it creates a Chat for the local users and bumps the unread count" do
alias Pleroma.Activity
alias Pleroma.Chat
alias Pleroma.Conversation.Participation
+ alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
{:ok, upload} = ActivityPub.upload(file, actor: author.ap_id)
- {:ok, activity} =
- CommonAPI.post_chat_message(
- author,
- recipient,
- nil,
- media_id: upload.id
- )
- assert activity
+ with_mocks([
+ {
+ Pleroma.Web.Streamer,
+ [],
+ [
+ stream: fn _, _ ->
+ nil
+ end
+ ]
+ },
+ {
+ Pleroma.Web.Push,
+ [],
+ [
+ send: fn _ -> nil end
+ ]
+ }
+ ]) do
+ {:ok, activity} =
+ CommonAPI.post_chat_message(
+ author,
+ recipient,
+ nil,
+ media_id: upload.id
+ )
+ notification =
+ Notification.for_user_and_activity(recipient, activity)
+ |> Repo.preload(:activity)
+ assert called(Pleroma.Web.Push.send(notification))
+ assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
+ assert activity
+ end
test "it adds html newlines" do