@spec get_recipients_from_activity(Activity.t()) :: [User.t()]
- def get_recipients_from_activity(%Activity{recipients: to}) do
+ def get_recipients_from_activity(%Activity{recipients: to, actor: actor}) do
+ to = [actor | to]
User.Query.build(%{recipients_from_activity: to, local: true, deactivated: false})
|> Repo.all()
should_send?(user, activity)
- def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
- Enum.each(topics[topic] || [], fn %StreamerSocket{
- transport_pid: transport_pid,
- user: socket_user
- } ->
- # Get the current user so we have up-to-date blocks etc.
- if socket_user do
- user = User.get_cached_by_ap_id(socket_user.ap_id)
- if should_send?(user, item) do
- send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
- end
- else
- send(transport_pid, {:text, StreamerView.render("update.json", item)})
- end
- end)
- end
def push_to_socket(topics, topic, %Participation{} = participation) do
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
describe "get_recipients_from_activity" do
+ test "works for announces" do
+ actor = insert(:user)
+ user = insert(:user, local: true)
+ {:ok, activity} = CommonAPI.post(actor, %{"status" => "hello"})
+ {:ok, announce, _} = CommonAPI.repeat(activity.id, user)
+ recipients = User.get_recipients_from_activity(announce)
+ assert user in recipients
+ end
test "get recipients" do
actor = insert(:user)
user = insert(:user, local: true)
{:ok, %{user: user, notify: notify}}
+ test "it streams the user's post in the 'user' stream", %{user: user} do
+ task =
+ Task.async(fn ->
+ assert_receive {:text, _}, @streamer_timeout
+ end)
+ Streamer.add_socket(
+ "user",
+ %{transport_pid: task.pid, assigns: %{user: user}}
+ )
+ {:ok, activity} = CommonAPI.post(user, %{"status" => "hey"})
+ Streamer.stream("user", activity)
+ Task.await(task)
+ end
+ test "it streams boosts of the user in the 'user' stream", %{user: user} do
+ task =
+ Task.async(fn ->
+ assert_receive {:text, _}, @streamer_timeout
+ end)
+ Streamer.add_socket(
+ "user",
+ %{transport_pid: task.pid, assigns: %{user: user}}
+ )
+ other_user = insert(:user)
+ {:ok, activity} = CommonAPI.post(other_user, %{"status" => "hey"})
+ {:ok, announce, _} = CommonAPI.repeat(activity.id, user)
+ Streamer.stream("user", announce)
+ Task.await(task)
+ end
test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do
task =
Task.async(fn ->