def decrease_replies_count_if_reply(_object), do: :noop
+ def increase_poll_votes_if_vote(%{
+ "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
+ "type" => "Create"
+ }) do
+ Object.increase_vote_count(reply_ap_id, name)
+ end
+
+ def increase_poll_votes_if_vote(_create_data), do: :noop
+
def insert(map, local \\ true, fake \\ false) when is_map(map) do
with nil <- Activity.normalize(map),
map <- lazy_put_activity_defaults(map, fake),
end)
end
+ def stream_out_participations(%Object{data: %{"context" => context}}, user) do
+ with %Conversation{} = conversation <- Conversation.get_for_ap_id(context),
+ conversation = Repo.preload(conversation, :participations),
+ last_activity_id =
+ fetch_latest_activity_id_for_context(conversation.ap_id, %{
+ "user" => user,
+ "blocking_user" => user
+ }) do
+ if last_activity_id do
+ stream_out_participations(conversation.participations)
+ end
+ end
+ end
+
+ def stream_out_participations(_, _), do: :noop
+
def stream_out(activity) do
public = "https://www.w3.org/ns/activitystreams#Public"
if activity.data["type"] in ["Create", "Announce", "Delete"] do
- Pleroma.Web.Streamer.stream("user", activity)
- Pleroma.Web.Streamer.stream("list", activity)
+ object = Object.normalize(activity)
+ # Do not stream out poll replies
+ unless object.data["type"] == "Answer" do
+ Pleroma.Web.Streamer.stream("user", activity)
+ Pleroma.Web.Streamer.stream("list", activity)
- if Enum.member?(activity.data["to"], public) do
- Pleroma.Web.Streamer.stream("public", activity)
+ if Enum.member?(activity.data["to"], public) do
+ Pleroma.Web.Streamer.stream("public", activity)
- if activity.local do
- Pleroma.Web.Streamer.stream("public:local", activity)
- end
-
- if activity.data["type"] in ["Create"] do
- object = Object.normalize(activity)
+ if activity.local do
+ Pleroma.Web.Streamer.stream("public:local", activity)
+ end
- object.data
- |> Map.get("tag", [])
- |> Enum.filter(fn tag -> is_bitstring(tag) end)
- |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
+ if activity.data["type"] in ["Create"] do
+ object.data
+ |> Map.get("tag", [])
+ |> Enum.filter(fn tag -> is_bitstring(tag) end)
+ |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
- if object.data["attachment"] != [] do
- Pleroma.Web.Streamer.stream("public:media", activity)
+ if object.data["attachment"] != [] do
+ Pleroma.Web.Streamer.stream("public:media", activity)
- if activity.local do
- Pleroma.Web.Streamer.stream("public:local:media", activity)
+ if activity.local do
+ Pleroma.Web.Streamer.stream("public:local:media", activity)
+ end
end
end
+ else
+ # TODO: Write test, replace with visibility test
+ if !Enum.member?(activity.data["cc"] || [], public) &&
+ !Enum.member?(
+ activity.data["to"],
+ User.get_cached_by_ap_id(activity.data["actor"]).follower_address
+ ),
+ do: Pleroma.Web.Streamer.stream("direct", activity)
end
- else
- # TODO: Write test, replace with visibility test
- if !Enum.member?(activity.data["cc"] || [], public) &&
- !Enum.member?(
- activity.data["to"],
- User.get_cached_by_ap_id(activity.data["actor"]).follower_address
- ),
- do: Pleroma.Web.Streamer.stream("direct", activity)
end
end
end
{:ok, activity} <- insert(create_data, local, fake),
{:fake, false, activity} <- {:fake, fake, activity},
_ <- increase_replies_count_if_reply(create_data),
+ _ <- increase_poll_votes_if_vote(create_data),
# Changing note count prior to enqueuing federation task in order to avoid
# race conditions on updating user.info
{:ok, _actor} <- increase_note_count_if_public(actor, activity),
"to" => to,
"deleted_activity_id" => activity && activity.id
},
- {:ok, activity} <- insert(data, local),
+ {:ok, activity} <- insert(data, local, false),
+ stream_out_participations(object, user),
_ <- decrease_replies_count_if_reply(object),
# Changing note count prior to enqueuing federation task in order to avoid
# race conditions on updating user.info
if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
from(activity in Activity)
+ |> maybe_preload_objects(opts)
|> restrict_blocked(opts)
|> restrict_recipients(recipients, opts["user"])
|> where(
^context
)
)
+ |> exclude_poll_votes(opts)
|> order_by([activity], desc: activity.id)
end
def fetch_activities_for_context(context, opts \\ %{}) do
context
|> fetch_activities_for_context_query(opts)
- |> Activity.with_preloaded_object()
|> Repo.all()
end
Pleroma.FlakeId.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
context
- |> fetch_activities_for_context_query(opts)
+ |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
|> limit(1)
|> select([a], a.id)
|> Repo.one()
defp restrict_visibility(query, %{visibility: visibility})
when visibility in @valid_visibilities do
- query =
- from(
- a in query,
- where:
- fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
- )
-
- query
+ from(
+ a in query,
+ where:
+ fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
+ )
end
defp restrict_visibility(_query, %{visibility: visibility})
defp restrict_visibility(query, _visibility), do: query
- defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}) do
- query =
- from(
- a in query,
- where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
- )
+ defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
+ do: query
- query
+ defp restrict_thread_visibility(
+ query,
+ %{"user" => %User{info: %{skip_thread_containment: true}}},
+ _
+ ),
+ do: query
+
+ defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do
+ from(
+ a in query,
+ where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
+ )
end
- defp restrict_thread_visibility(query, _), do: query
+ defp restrict_thread_visibility(query, _, _), do: query
def fetch_user_activities(user, reading_user, params \\ %{}) do
params =
defp restrict_muted_reblogs(query, _), do: query
+ defp exclude_poll_votes(query, %{"include_poll_votes" => "true"}), do: query
+
+ defp exclude_poll_votes(query, _) do
+ if has_named_binding?(query, :object) do
+ from([activity, object: o] in query,
+ where: fragment("not(?->>'type' = ?)", o.data, "Answer")
+ )
+ else
+ query
+ end
+ end
+
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _) do
def fetch_activities_query(recipients, opts \\ %{}) do
base_query = from(activity in Activity)
+ config = %{
+ skip_thread_containment: Config.get([:instance, :skip_thread_containment])
+ }
+
base_query
|> maybe_preload_objects(opts)
|> maybe_preload_bookmarks(opts)
|> restrict_muted(opts)
|> restrict_media(opts)
|> restrict_visibility(opts)
- |> restrict_thread_visibility(opts)
+ |> restrict_thread_visibility(opts, config)
|> restrict_replies(opts)
|> restrict_reblogs(opts)
|> restrict_pinned(opts)
|> restrict_muted_reblogs(opts)
|> Activity.restrict_deactivated_users()
+ |> exclude_poll_votes(opts)
end
def fetch_activities(recipients, opts \\ %{}) do