X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Factivity_pub%2Factivity_pub.ex;h=45feae25a720fba541480753dd9ef97dd95c7e59;hb=5bd41fef8b5aeff53ed6b096e04507d51c93a83a;hp=cd849503568b2c04d3210d6afc0a1dce03389765;hpb=920bd4705526d8dfa8ada516853bbb4e5438cbf1;p=akkoma diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index cd8495035..45feae25a 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -5,7 +5,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.Activity alias Pleroma.Conversation - alias Pleroma.Instances alias Pleroma.Notification alias Pleroma.Object alias Pleroma.Object.Fetcher @@ -15,7 +14,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.User alias Pleroma.Web.ActivityPub.MRF alias Pleroma.Web.ActivityPub.Transmogrifier - alias Pleroma.Web.Federator alias Pleroma.Web.WebFinger import Ecto.Query @@ -24,8 +22,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do require Logger - @httpoison Application.get_env(:pleroma, :httpoison) - # For Announce activities, we filter the recipients based on following status for any actors # that match actual users. See issue #164 for more information about why this is necessary. defp get_recipients(%{"type" => "Announce"} = data) do @@ -112,6 +108,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do def decrease_replies_count_if_reply(_object), do: :noop + def increase_poll_votes_if_vote(%{ + "object" => %{"inReplyTo" => reply_ap_id, "name" => name}, + "type" => "Create" + }) do + Object.increase_vote_count(reply_ap_id, name) + end + + def increase_poll_votes_if_vote(_create_data), do: :noop + def insert(map, local \\ true, fake \\ false) when is_map(map) do with nil <- Activity.normalize(map), map <- lazy_put_activity_defaults(map, fake), @@ -137,9 +142,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do activity end - Task.start(fn -> - Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) - end) + PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity]) Notification.create_notifications(activity) @@ -189,40 +192,42 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do public = "https://www.w3.org/ns/activitystreams#Public" if activity.data["type"] in ["Create", "Announce", "Delete"] do - Pleroma.Web.Streamer.stream("user", activity) - Pleroma.Web.Streamer.stream("list", activity) - - if Enum.member?(activity.data["to"], public) do - Pleroma.Web.Streamer.stream("public", activity) + object = Object.normalize(activity) + # Do not stream out poll replies + unless object.data["type"] == "Answer" do + Pleroma.Web.Streamer.stream("user", activity) + Pleroma.Web.Streamer.stream("list", activity) - if activity.local do - Pleroma.Web.Streamer.stream("public:local", activity) - end + if Enum.member?(activity.data["to"], public) do + Pleroma.Web.Streamer.stream("public", activity) - if activity.data["type"] in ["Create"] do - object = Object.normalize(activity) + if activity.local do + Pleroma.Web.Streamer.stream("public:local", activity) + end - object.data - |> Map.get("tag", []) - |> Enum.filter(fn tag -> is_bitstring(tag) end) - |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end) + if activity.data["type"] in ["Create"] do + object.data + |> Map.get("tag", []) + |> Enum.filter(fn tag -> is_bitstring(tag) end) + |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end) - if object.data["attachment"] != [] do - Pleroma.Web.Streamer.stream("public:media", activity) + if object.data["attachment"] != [] do + Pleroma.Web.Streamer.stream("public:media", activity) - if activity.local do - Pleroma.Web.Streamer.stream("public:local:media", activity) + if activity.local do + Pleroma.Web.Streamer.stream("public:local:media", activity) + end end end + else + # TODO: Write test, replace with visibility test + if !Enum.member?(activity.data["cc"] || [], public) && + !Enum.member?( + activity.data["to"], + User.get_cached_by_ap_id(activity.data["actor"]).follower_address + ), + do: Pleroma.Web.Streamer.stream("direct", activity) end - else - # TODO: Write test, replace with visibility test - if !Enum.member?(activity.data["cc"] || [], public) && - !Enum.member?( - activity.data["to"], - User.get_cached_by_ap_id(activity.data["actor"]).follower_address - ), - do: Pleroma.Web.Streamer.stream("direct", activity) end end end @@ -241,6 +246,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do {:ok, activity} <- insert(create_data, local, fake), {:fake, false, activity} <- {:fake, fake, activity}, _ <- increase_replies_count_if_reply(create_data), + _ <- increase_poll_votes_if_vote(create_data), # Changing note count prior to enqueuing federation task in order to avoid # race conditions on updating user.info {:ok, _actor} <- increase_note_count_if_public(actor, activity), @@ -405,16 +411,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end def block(blocker, blocked, activity_id \\ nil, local \\ true) do - ap_config = Application.get_env(:pleroma, :activitypub) - unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked) - outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks) + outgoing_blocks = Pleroma.Config.get([:activitypub, :outgoing_blocks]) + unfollow_blocked = Pleroma.Config.get([:activitypub, :unfollow_blocked]) - with true <- unfollow_blocked do + if unfollow_blocked do follow_activity = fetch_latest_follow(blocker, blocked) - - if follow_activity do - unfollow(blocker, blocked, nil, local) - end + if follow_activity, do: unfollow(blocker, blocked, nil, local) end with true <- outgoing_blocks, @@ -486,6 +488,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public from(activity in Activity) + |> maybe_preload_objects(opts) |> restrict_blocked(opts) |> restrict_recipients(recipients, opts["user"]) |> where( @@ -498,6 +501,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do ^context ) ) + |> exclude_poll_votes(opts) |> order_by([activity], desc: activity.id) end @@ -505,7 +509,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do def fetch_activities_for_context(context, opts \\ %{}) do context |> fetch_activities_for_context_query(opts) - |> Activity.with_preloaded_object() |> Repo.all() end @@ -513,7 +516,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do Pleroma.FlakeId.t() | nil def fetch_latest_activity_id_for_context(context, opts \\ %{}) do context - |> fetch_activities_for_context_query(opts) + |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts)) |> limit(1) |> select([a], a.id) |> Repo.one() @@ -533,17 +536,20 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_visibility(query, %{visibility: visibility}) when is_list(visibility) do if Enum.all?(visibility, &(&1 in @valid_visibilities)) do - from( - a in query, - where: - fragment( - "activity_visibility(?, ?, ?) = ANY (?)", - a.actor, - a.recipients, - a.data, - ^visibility - ) - ) + query = + from( + a in query, + where: + fragment( + "activity_visibility(?, ?, ?) = ANY (?)", + a.actor, + a.recipients, + a.data, + ^visibility + ) + ) + + query else Logger.error("Could not restrict visibility to #{visibility}") end @@ -551,11 +557,14 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_visibility(query, %{visibility: visibility}) when visibility in @valid_visibilities do - from( - a in query, - where: - fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility) - ) + query = + from( + a in query, + where: + fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility) + ) + + query end defp restrict_visibility(_query, %{visibility: visibility}) @@ -565,6 +574,18 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_visibility(query, _visibility), do: query + defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}) do + query = + from( + a in query, + where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data) + ) + + query + end + + defp restrict_thread_visibility(query, _), do: query + def fetch_user_activities(user, reading_user, params \\ %{}) do params = params @@ -641,20 +662,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_tag(query, _), do: query - defp restrict_to_cc(query, recipients_to, recipients_cc) do - from( - activity in query, - where: - fragment( - "(?->'to' \\?| ?) or (?->'cc' \\?| ?)", - activity.data, - ^recipients_to, - activity.data, - ^recipients_cc - ) - ) - end - defp restrict_recipients(query, [], _user), do: query defp restrict_recipients(query, recipients, nil) do @@ -691,6 +698,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_type(query, _), do: query + defp restrict_state(query, %{"state" => state}) do + from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state)) + end + + defp restrict_state(query, _), do: query + defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do from( activity in query, @@ -746,8 +759,11 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do blocks = info.blocks || [] domain_blocks = info.domain_blocks || [] + query = + if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query) + from( - activity in query, + [activity, object: o] in query, where: fragment("not (? = ANY(?))", activity.actor, ^blocks), where: fragment("not (? && ?)", activity.recipients, ^blocks), where: @@ -757,7 +773,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do activity.data, ^blocks ), - where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks) + where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks), + where: fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks) ) end @@ -798,6 +815,18 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_muted_reblogs(query, _), do: query + defp exclude_poll_votes(query, %{"include_poll_votes" => "true"}), do: query + + defp exclude_poll_votes(query, _) do + if has_named_binding?(query, :object) do + from([activity, object: o] in query, + where: fragment("not(?->>'type' = ?)", o.data, "Answer") + ) + else + query + end + end + defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query defp maybe_preload_objects(query, _) do @@ -812,6 +841,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do |> Activity.with_preloaded_bookmark(opts["user"]) end + defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query + + defp maybe_set_thread_muted_field(query, opts) do + query + |> Activity.with_set_thread_muted_field(opts["user"]) + end + defp maybe_order(query, %{order: :desc}) do query |> order_by(desc: :id) @@ -830,6 +866,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do base_query |> maybe_preload_objects(opts) |> maybe_preload_bookmarks(opts) + |> maybe_set_thread_muted_field(opts) |> maybe_order(opts) |> restrict_recipients(recipients, opts["user"]) |> restrict_tag(opts) @@ -839,15 +876,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do |> restrict_local(opts) |> restrict_actor(opts) |> restrict_type(opts) + |> restrict_state(opts) |> restrict_favorited_by(opts) |> restrict_blocked(opts) |> restrict_muted(opts) |> restrict_media(opts) |> restrict_visibility(opts) + |> restrict_thread_visibility(opts) |> restrict_replies(opts) |> restrict_reblogs(opts) |> restrict_pinned(opts) |> restrict_muted_reblogs(opts) + |> Activity.restrict_deactivated_users() + |> exclude_poll_votes(opts) end def fetch_activities(recipients, opts \\ %{}) do @@ -856,9 +897,18 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do |> Enum.reverse() end - def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do + def fetch_activities_bounded_query(query, recipients, recipients_with_public) do + from(activity in query, + where: + fragment("? && ?", activity.recipients, ^recipients) or + (fragment("? && ?", activity.recipients, ^recipients_with_public) and + "https://www.w3.org/ns/activitystreams#Public" in activity.recipients) + ) + end + + def fetch_activities_bounded(recipients, recipients_with_public, opts \\ %{}) do fetch_activities_query([], opts) - |> restrict_to_cc(recipients_to, recipients_cc) + |> fetch_activities_bounded_query(recipients, recipients_with_public) |> Pagination.fetch_paginated(opts) |> Enum.reverse() end @@ -876,7 +926,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end end - def user_data_from_user_object(data) do + defp object_to_user_data(data) do avatar = data["icon"]["url"] && %{ @@ -923,9 +973,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do {:ok, user_data} end + def user_data_from_user_object(data) do + with {:ok, data} <- MRF.filter(data), + {:ok, data} <- object_to_user_data(data) do + {:ok, data} + else + e -> {:error, e} + end + end + def fetch_and_prepare_user_from_ap_id(ap_id) do - with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do - user_data_from_user_object(data) + with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id), + {:ok, data} <- user_data_from_user_object(data) do + {:ok, data} else e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}") end @@ -951,89 +1011,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end end - def should_federate?(inbox, public) do - if public do - true - else - inbox_info = URI.parse(inbox) - !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host) - end - end - - def publish(actor, activity) do - remote_followers = - if actor.follower_address in activity.recipients do - {:ok, followers} = User.get_followers(actor) - followers |> Enum.filter(&(!&1.local)) - else - [] - end - - public = is_public?(activity) - - {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) - json = Jason.encode!(data) - - (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers) - |> Enum.filter(fn user -> User.ap_enabled?(user) end) - |> Enum.map(fn %{info: %{source_data: data}} -> - (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"] - end) - |> Enum.uniq() - |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) - |> Instances.filter_reachable() - |> Enum.each(fn {inbox, unreachable_since} -> - Federator.publish_single_ap(%{ - inbox: inbox, - json: json, - actor: actor, - id: activity.data["id"], - unreachable_since: unreachable_since - }) - end) - end - - def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do - Logger.info("Federating #{id} to #{inbox}") - host = URI.parse(inbox).host - - digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64()) - - date = - NaiveDateTime.utc_now() - |> Timex.format!("{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT") - - signature = - Pleroma.Web.HTTPSignatures.sign(actor, %{ - host: host, - "content-length": byte_size(json), - digest: digest, - date: date - }) - - with {:ok, %{status: code}} when code in 200..299 <- - result = - @httpoison.post( - inbox, - json, - [ - {"Content-Type", "application/activity+json"}, - {"Date", date}, - {"signature", signature}, - {"digest", digest} - ] - ) do - if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since], - do: Instances.set_reachable(inbox) - - result - else - {_post_result, response} -> - unless params[:unreachable_since], do: Instances.set_unreachable(inbox) - {:error, response} - end - end - # filter out broken threads def contain_broken_threads(%Activity{} = activity, %User{} = user) do entire_thread_visible_for_user?(activity, user) @@ -1044,17 +1021,10 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do contain_broken_threads(activity, user) end - # do post-processing on a timeline - def contain_timeline(timeline, user) do - timeline - |> Enum.filter(fn activity -> - contain_activity(activity, user) - end) - end - - def fetch_direct_messages_query() do + def fetch_direct_messages_query do Activity |> restrict_type(%{"type" => "Create"}) |> restrict_visibility(%{visibility: "direct"}) + |> order_by([activity], asc: activity.id) end end