+ def stream_out_participations(%Object{data: %{"context" => context}}, user) do
+ with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
+ conversation = Repo.preload(conversation, :participations),
+ last_activity_id =
+ fetch_latest_activity_id_for_context(conversation.ap_id, %{
+ "user" => user,
+ "blocking_user" => user
+ }) do
+ if last_activity_id do
+ stream_out_participations(conversation.participations)
+ end
+ end
+ end
+ def stream_out_participations(_, _), do: :noop
def stream_out(activity) do
public = "https://www.w3.org/ns/activitystreams#Public"
"to" => to,
"deleted_activity_id" => activity && activity.id
- {:ok, activity} <- insert(data, local),
+ {:ok, activity} <- insert(data, local, false),
+ stream_out_participations(object, user),
_ <- decrease_replies_count_if_reply(object),
# Changing note count prior to enqueuing federation task in order to avoid
# race conditions on updating user.info
+ describe "direct streams" do
+ setup do
+ GenServer.start(Streamer, %{}, name: Streamer)
+ on_exit(fn ->
+ if pid = Process.whereis(Streamer) do
+ Process.exit(pid, :kill)
+ end
+ end)
+ :ok
+ end
+ test "it sends conversation update to the 'direct' stream", %{} do
+ user = insert(:user)
+ another_user = insert(:user)
+ task =
+ Task.async(fn ->
+ assert_receive {:text, _received_event}, 4_000
+ end)
+ Streamer.add_socket(
+ "direct",
+ %{transport_pid: task.pid, assigns: %{user: user}}
+ )
+ {:ok, _create_activity} =
+ CommonAPI.post(another_user, %{
+ "status" => "hey @#{user.nickname}",
+ "visibility" => "direct"
+ })
+ Task.await(task)
+ end
+ test "it doesn't send conversation update to the 'direct' streamj when the last message in the conversation is deleted" do
+ user = insert(:user)
+ another_user = insert(:user)
+ {:ok, create_activity} =
+ CommonAPI.post(another_user, %{
+ "status" => "hi @#{user.nickname}",
+ "visibility" => "direct"
+ })
+ task =
+ Task.async(fn ->
+ assert_receive {:text, received_event}, 4_000
+ assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
+ refute_receive {:text, _}, 4_000
+ end)
+ Streamer.add_socket(
+ "direct",
+ %{transport_pid: task.pid, assigns: %{user: user}}
+ )
+ {:ok, _} = CommonAPI.delete(create_activity.id, another_user)
+ Task.await(task)
+ end
+ test "it sends conversation update to the 'direct' stream when a message is deleted" do
+ user = insert(:user)
+ another_user = insert(:user)
+ {:ok, create_activity} =
+ CommonAPI.post(another_user, %{
+ "status" => "hi @#{user.nickname}",
+ "visibility" => "direct"
+ })
+ {:ok, create_activity2} =
+ CommonAPI.post(another_user, %{
+ "status" => "hi @#{user.nickname}",
+ "in_reply_to_status_id" => create_activity.id,
+ "visibility" => "direct"
+ })
+ task =
+ Task.async(fn ->
+ assert_receive {:text, received_event}, 4_000
+ assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
+ assert_receive {:text, received_event}, 4_000
+ assert %{"event" => "conversation", "payload" => received_payload} =
+ Jason.decode!(received_event)
+ assert %{"last_status" => last_status} = Jason.decode!(received_payload)
+ assert last_status["id"] == to_string(create_activity.id)
+ end)
+ Streamer.add_socket(
+ "direct",
+ %{transport_pid: task.pid, assigns: %{user: user}}
+ )
+ {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)
+ Task.await(task)
+ end
+ end