from([a] in query,
left_join: tm in ThreadMute,
on: tm.user_id == ^user.id and tm.context == fragment("?->>'context'", a.data),
+ as: :thread_mute,
select: %Activity{a | thread_muted?: not is_nil(tm.id)}
)
end
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
- defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
+ defp restrict_muted(query, %{"muting_user" => %User{info: info}} = opts) do
mutes = info.mutes
- from(
- activity in query,
- where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
- where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
- )
+ query =
+ from([activity] in query,
+ where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
+ where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
+ )
+
+ unless opts["skip_preload"] do
+ from([thread_mute: tm] in query, where: is_nil(tm))
+ else
+ query
+ end
end
defp restrict_muted(query, _), do: query
defp maybe_set_thread_muted_field(query, opts) do
query
- |> Activity.with_set_thread_muted_field(opts["user"])
+ |> Activity.with_set_thread_muted_field(opts["muting_user"] || opts["user"])
end
defp maybe_order(query, %{order: :desc}) do
|> Map.get("#{topic}:#{item.user_id}", [])
|> Enum.each(fn socket ->
with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id),
- true <- should_send?(user, item),
- false <- CommonAPI.thread_muted?(user, item.activity) do
+ true <- should_send?(user, item) do
send(
socket.transport_pid,
{:text, represent_notification(socket.assigns[:user], item)}
%{host: parent_host} <- URI.parse(parent.data["actor"]),
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
- true <- thread_containment(item, user) do
+ true <- thread_containment(item, user),
+ false <- CommonAPI.thread_muted?(user, item) do
true
else
_ -> false
assert Enum.member?(activities, activity_one)
end
+ test "doesn't return thread muted activities" do
+ user = insert(:user)
+ activity_one = insert(:note_activity)
+ note_two = insert(:note, data: %{"context" => "suya.."})
+ activity_two = insert(:note_activity, note: note_two)
+
+ {:ok, _activity_two} = CommonAPI.add_mute(user, activity_two)
+
+ assert [activity_one] = ActivityPub.fetch_activities([], %{"muting_user" => user})
+ end
+
+ test "returns thread muted activities when with_muted is set" do
+ user = insert(:user)
+ activity_one = insert(:note_activity)
+ note_two = insert(:note, data: %{"context" => "suya.."})
+ activity_two = insert(:note_activity, note: note_two)
+
+ {:ok, activity_two} = CommonAPI.add_mute(user, activity_two)
+
+ assert [activity_two, activity_one] =
+ ActivityPub.fetch_activities([], %{"muting_user" => user, "with_muted" => true})
+ end
+
test "does include announces on request" do
activity_three = insert(:note_activity)
user = insert(:user)
Task.await(task)
end
+ test "it doesn't send posts from muted threads" do
+ user = insert(:user)
+ user2 = insert(:user)
+ {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)
+
+ {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
+
+ {:ok, activity} = CommonAPI.add_mute(user2, activity)
+
+ task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
+
+ Streamer.add_socket(
+ "user",
+ %{transport_pid: task.pid, assigns: %{user: user2}}
+ )
+
+ Streamer.stream("user", activity)
+ Task.await(task)
+ end
+
describe "direct streams" do
setup do
GenServer.start(Streamer, %{}, name: Streamer)