X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Factivity_pub%2Factivity_pub.ex;h=9a137d8de8bb8abb2ee888565db7d228b1a6e669;hb=5e2b491276d5cd8d90fddf219f7653d1c9b31ef3;hp=f217e7bac35c9271f633c3765ddb9477769d1e31;hpb=a9f805c87100fd2ee1d8426460b81af4a235d574;p=akkoma diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index f217e7bac..9a137d8de 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -4,16 +4,16 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.Activity - alias Pleroma.Instances + alias Pleroma.Conversation alias Pleroma.Notification alias Pleroma.Object + alias Pleroma.Object.Fetcher + alias Pleroma.Pagination alias Pleroma.Repo alias Pleroma.Upload alias Pleroma.User alias Pleroma.Web.ActivityPub.MRF alias Pleroma.Web.ActivityPub.Transmogrifier - alias Pleroma.Web.Federator - alias Pleroma.Web.OStatus alias Pleroma.Web.WebFinger import Ecto.Query @@ -22,8 +22,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do require Logger - @httpoison Application.get_env(:pleroma, :httpoison) - # For Announce activities, we filter the recipients based on following status for any actors # that match actual users. See issue #164 for more information about why this is necessary. defp get_recipients(%{"type" => "Announce"} = data) do @@ -90,12 +88,10 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end def increase_replies_count_if_reply(%{ - "object" => - %{"inReplyTo" => reply_ap_id, "inReplyToStatusId" => reply_status_id} = object, + "object" => %{"inReplyTo" => reply_ap_id} = object, "type" => "Create" }) do if is_public?(object) do - Activity.increase_replies_count(reply_status_id) Object.increase_replies_count(reply_ap_id) end end @@ -103,10 +99,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do def increase_replies_count_if_reply(_create_data), do: :noop def decrease_replies_count_if_reply(%Object{ - data: %{"inReplyTo" => reply_ap_id, "inReplyToStatusId" => reply_status_id} = object + data: %{"inReplyTo" => reply_ap_id} = object }) do if is_public?(object) do - Activity.decrease_replies_count(reply_status_id) Object.decrease_replies_count(reply_ap_id) end end @@ -121,7 +116,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do {:ok, map} <- MRF.filter(map), {recipients, _, _} = get_recipients(map), {:fake, false, map, recipients} <- {:fake, fake, map, recipients}, - {:ok, object} <- insert_full_object(map) do + {:ok, map, object} <- insert_full_object(map) do {:ok, activity} = Repo.insert(%Activity{ data: map, @@ -143,7 +138,14 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end) Notification.create_notifications(activity) + + participations = + activity + |> Conversation.create_or_bump_for() + |> get_participations() + stream_out(activity) + stream_out_participations(participations) {:ok, activity} else %Activity{} = activity -> @@ -166,6 +168,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end end + defp get_participations({:ok, %{participations: participations}}), do: participations + defp get_participations(_), do: [] + + def stream_out_participations(participations) do + participations = + participations + |> Repo.preload(:user) + + Enum.each(participations, fn participation -> + Pleroma.Web.Streamer.stream("participation", participation) + end) + end + def stream_out(activity) do public = "https://www.w3.org/ns/activitystreams#Public" @@ -181,12 +196,14 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end if activity.data["type"] in ["Create"] do - activity.data["object"] + object = Object.normalize(activity) + + object.data |> Map.get("tag", []) |> Enum.filter(fn tag -> is_bitstring(tag) end) |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end) - if activity.data["object"]["attachment"] != [] do + if object.data["attachment"] != [] do Pleroma.Web.Streamer.stream("public:media", activity) if activity.local do @@ -195,10 +212,11 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end end else + # TODO: Write test, replace with visibility test if !Enum.member?(activity.data["cc"] || [], public) && !Enum.member?( activity.data["to"], - User.get_by_ap_id(activity.data["actor"]).follower_address + User.get_cached_by_ap_id(activity.data["actor"]).follower_address ), do: Pleroma.Web.Streamer.stream("direct", activity) end @@ -449,43 +467,52 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do :ok <- maybe_federate(activity) do Enum.each(User.all_superusers(), fn superuser -> superuser - |> Pleroma.AdminEmail.report(actor, account, statuses, content) - |> Pleroma.Mailer.deliver_async() + |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content) + |> Pleroma.Emails.Mailer.deliver_async() end) {:ok, activity} end end - def fetch_activities_for_context(context, opts \\ %{}) do + defp fetch_activities_for_context_query(context, opts) do public = ["https://www.w3.org/ns/activitystreams#Public"] recipients = if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public - query = from(activity in Activity) - - query = - query - |> restrict_blocked(opts) - |> restrict_recipients(recipients, opts["user"]) - - query = - from( - activity in query, - where: - fragment( - "?->>'type' = ? and ?->>'context' = ?", - activity.data, - "Create", - activity.data, - ^context - ), - order_by: [desc: :id] + from(activity in Activity) + |> restrict_blocked(opts) + |> restrict_recipients(recipients, opts["user"]) + |> where( + [activity], + fragment( + "?->>'type' = ? and ?->>'context' = ?", + activity.data, + "Create", + activity.data, + ^context ) - |> Activity.with_preloaded_object() + ) + |> order_by([activity], desc: activity.id) + end + + @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()] + def fetch_activities_for_context(context, opts \\ %{}) do + context + |> fetch_activities_for_context_query(opts) + |> Activity.with_preloaded_object() + |> Repo.all() + end - Repo.all(query) + @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) :: + Pleroma.FlakeId.t() | nil + def fetch_latest_activity_id_for_context(context, opts \\ %{}) do + context + |> fetch_activities_for_context_query(opts) + |> limit(1) + |> select([a], a.id) + |> Repo.one() end def fetch_public_activities(opts \\ %{}) do @@ -493,7 +520,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do q |> restrict_unlisted() - |> Repo.all() + |> Pagination.fetch_paginated(opts) |> Enum.reverse() end @@ -572,37 +599,49 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_since(query, _), do: query + defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do + raise "Can't use the child object without preloading!" + end + defp restrict_tag_reject(query, %{"tag_reject" => tag_reject}) when is_list(tag_reject) and tag_reject != [] do from( - activity in query, - where: fragment(~s(\(not \(? #> '{"object","tag"}'\) \\?| ?\)), activity.data, ^tag_reject) + [_activity, object] in query, + where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject) ) end defp restrict_tag_reject(query, _), do: query + defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do + raise "Can't use the child object without preloading!" + end + defp restrict_tag_all(query, %{"tag_all" => tag_all}) when is_list(tag_all) and tag_all != [] do from( - activity in query, - where: fragment(~s(\(? #> '{"object","tag"}'\) \\?& ?), activity.data, ^tag_all) + [_activity, object] in query, + where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all) ) end defp restrict_tag_all(query, _), do: query + defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do + raise "Can't use the child object without preloading!" + end + defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do from( - activity in query, - where: fragment(~s(\(? #> '{"object","tag"}'\) \\?| ?), activity.data, ^tag) + [_activity, object] in query, + where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag) ) end defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do from( - activity in query, - where: fragment(~s(? <@ (? #> '{"object","tag"}'\)), ^tag, activity.data) + [_activity, object] in query, + where: fragment("(?)->'tag' \\? (?)", object.data, ^tag) ) end @@ -636,26 +675,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do ) end - defp restrict_limit(query, %{"limit" => limit}) do - from(activity in query, limit: ^limit) - end - - defp restrict_limit(query, _), do: query - defp restrict_local(query, %{"local_only" => true}) do from(activity in query, where: activity.local == true) end defp restrict_local(query, _), do: query - defp restrict_max(query, %{"max_id" => ""}), do: query - - defp restrict_max(query, %{"max_id" => max_id}) do - from(activity in query, where: activity.id < ^max_id) - end - - defp restrict_max(query, _), do: query - defp restrict_actor(query, %{"actor_id" => actor_id}) do from(activity in query, where: activity.actor == ^actor_id) end @@ -681,10 +706,14 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_favorited_by(query, _), do: query + defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do + raise "Can't use the child object without preloading!" + end + defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do from( - activity in query, - where: fragment(~s(not (? #> '{"object","attachment"}' = ?\)), activity.data, ^[]) + [_activity, object] in query, + where: fragment("not (?)->'attachment' = (?)", object.data, ^[]) ) end @@ -726,7 +755,14 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do from( activity in query, where: fragment("not (? = ANY(?))", activity.actor, ^blocks), - where: fragment("not (?->'to' \\?| ?)", activity.data, ^blocks), + where: fragment("not (? && ?)", activity.recipients, ^blocks), + where: + fragment( + "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)", + activity.data, + activity.data, + ^blocks + ), where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks) ) end @@ -775,24 +811,38 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do |> Activity.with_preloaded_object() end + defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query + + defp maybe_preload_bookmarks(query, opts) do + query + |> Activity.with_preloaded_bookmark(opts["user"]) + end + + defp maybe_order(query, %{order: :desc}) do + query + |> order_by(desc: :id) + end + + defp maybe_order(query, %{order: :asc}) do + query + |> order_by(asc: :id) + end + + defp maybe_order(query, _), do: query + def fetch_activities_query(recipients, opts \\ %{}) do - base_query = - from( - activity in Activity, - limit: 20, - order_by: [fragment("? desc nulls last", activity.id)] - ) + base_query = from(activity in Activity) base_query |> maybe_preload_objects(opts) + |> maybe_preload_bookmarks(opts) + |> maybe_order(opts) |> restrict_recipients(recipients, opts["user"]) |> restrict_tag(opts) |> restrict_tag_reject(opts) |> restrict_tag_all(opts) |> restrict_since(opts) |> restrict_local(opts) - |> restrict_limit(opts) - |> restrict_max(opts) |> restrict_actor(opts) |> restrict_type(opts) |> restrict_favorited_by(opts) @@ -804,18 +854,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do |> restrict_reblogs(opts) |> restrict_pinned(opts) |> restrict_muted_reblogs(opts) + |> Activity.restrict_deactivated_users() end def fetch_activities(recipients, opts \\ %{}) do fetch_activities_query(recipients, opts) - |> Repo.all() + |> Pagination.fetch_paginated(opts) |> Enum.reverse() end def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do fetch_activities_query([], opts) |> restrict_to_cc(recipients_to, recipients_cc) - |> Repo.all() + |> Pagination.fetch_paginated(opts) |> Enum.reverse() end @@ -880,7 +931,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end def fetch_and_prepare_user_from_ap_id(ap_id) do - with {:ok, data} <- fetch_and_contain_remote_object_from_id(ap_id) do + with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do user_data_from_user_object(data) else e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}") @@ -888,7 +939,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end def make_user_from_ap_id(ap_id) do - if _user = User.get_by_ap_id(ap_id) do + if _user = User.get_cached_by_ap_id(ap_id) do Transmogrifier.upgrade_user_from_ap_id(ap_id) else with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do @@ -907,143 +958,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end end - def should_federate?(inbox, public) do - if public do - true - else - inbox_info = URI.parse(inbox) - !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host) - end - end - - def publish(actor, activity) do - remote_followers = - if actor.follower_address in activity.recipients do - {:ok, followers} = User.get_followers(actor) - followers |> Enum.filter(&(!&1.local)) - else - [] - end - - public = is_public?(activity) - - {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) - json = Jason.encode!(data) - - (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers) - |> Enum.filter(fn user -> User.ap_enabled?(user) end) - |> Enum.map(fn %{info: %{source_data: data}} -> - (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"] - end) - |> Enum.uniq() - |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) - |> Instances.filter_reachable() - |> Enum.each(fn {inbox, unreachable_since} -> - Federator.publish_single_ap(%{ - inbox: inbox, - json: json, - actor: actor, - id: activity.data["id"], - unreachable_since: unreachable_since - }) - end) - end - - def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do - Logger.info("Federating #{id} to #{inbox}") - host = URI.parse(inbox).host - - digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64()) - - date = - NaiveDateTime.utc_now() - |> Timex.format!("{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT") - - signature = - Pleroma.Web.HTTPSignatures.sign(actor, %{ - host: host, - "content-length": byte_size(json), - digest: digest, - date: date - }) - - with {:ok, %{status: code}} when code in 200..299 <- - result = - @httpoison.post( - inbox, - json, - [ - {"Content-Type", "application/activity+json"}, - {"Date", date}, - {"signature", signature}, - {"digest", digest} - ] - ) do - if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since], - do: Instances.set_reachable(inbox) - - result - else - {_post_result, response} -> - unless params[:unreachable_since], do: Instances.set_unreachable(inbox) - {:error, response} - end - end - - # TODO: - # This will create a Create activity, which we need internally at the moment. - def fetch_object_from_id(id) do - if object = Object.get_cached_by_ap_id(id) do - {:ok, object} - else - with {:ok, data} <- fetch_and_contain_remote_object_from_id(id), - nil <- Object.normalize(data), - params <- %{ - "type" => "Create", - "to" => data["to"], - "cc" => data["cc"], - "actor" => data["actor"] || data["attributedTo"], - "object" => data - }, - :ok <- Transmogrifier.contain_origin(id, params), - {:ok, activity} <- Transmogrifier.handle_incoming(params) do - {:ok, Object.normalize(activity)} - else - {:error, {:reject, nil}} -> - {:reject, nil} - - object = %Object{} -> - {:ok, object} - - _e -> - Logger.info("Couldn't get object via AP, trying out OStatus fetching...") - - case OStatus.fetch_activity_from_url(id) do - {:ok, [activity | _]} -> {:ok, Object.normalize(activity)} - e -> e - end - end - end - end - - def fetch_and_contain_remote_object_from_id(id) do - Logger.info("Fetching object #{id} via AP") - - with true <- String.starts_with?(id, "http"), - {:ok, %{body: body, status: code}} when code in 200..299 <- - @httpoison.get( - id, - [{:Accept, "application/activity+json"}] - ), - {:ok, data} <- Jason.decode(body), - :ok <- Transmogrifier.contain_origin_from_id(id, data) do - {:ok, data} - else - e -> - {:error, e} - end - end - # filter out broken threads def contain_broken_threads(%Activity{} = activity, %User{} = user) do entire_thread_visible_for_user?(activity, user)