X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fpleroma%2Fweb%2Factivity_pub%2Factivity_pub.ex;h=9a137d8de8bb8abb2ee888565db7d228b1a6e669;hb=5e2b491276d5cd8d90fddf219f7653d1c9b31ef3;hp=d06bc64eac30d515f965fe87eb9b466e8150a23a;hpb=1557b99beb3b406572ef2d3baaabed1c9baeca1c;p=akkoma diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index d06bc64ea..9a137d8de 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -4,7 +4,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.Activity - alias Pleroma.Instances + alias Pleroma.Conversation alias Pleroma.Notification alias Pleroma.Object alias Pleroma.Object.Fetcher @@ -14,7 +14,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.User alias Pleroma.Web.ActivityPub.MRF alias Pleroma.Web.ActivityPub.Transmogrifier - alias Pleroma.Web.Federator alias Pleroma.Web.WebFinger import Ecto.Query @@ -23,8 +22,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do require Logger - @httpoison Application.get_env(:pleroma, :httpoison) - # For Announce activities, we filter the recipients based on following status for any actors # that match actual users. See issue #164 for more information about why this is necessary. defp get_recipients(%{"type" => "Announce"} = data) do @@ -141,7 +138,14 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end) Notification.create_notifications(activity) + + participations = + activity + |> Conversation.create_or_bump_for() + |> get_participations() + stream_out(activity) + stream_out_participations(participations) {:ok, activity} else %Activity{} = activity -> @@ -164,6 +168,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end end + defp get_participations({:ok, %{participations: participations}}), do: participations + defp get_participations(_), do: [] + + def stream_out_participations(participations) do + participations = + participations + |> Repo.preload(:user) + + Enum.each(participations, fn participation -> + Pleroma.Web.Streamer.stream("participation", participation) + end) + end + def stream_out(activity) do public = "https://www.w3.org/ns/activitystreams#Public" @@ -195,6 +212,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end end else + # TODO: Write test, replace with visibility test if !Enum.member?(activity.data["cc"] || [], public) && !Enum.member?( activity.data["to"], @@ -457,35 +475,44 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end end - def fetch_activities_for_context(context, opts \\ %{}) do + defp fetch_activities_for_context_query(context, opts) do public = ["https://www.w3.org/ns/activitystreams#Public"] recipients = if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public - query = from(activity in Activity) - - query = - query - |> restrict_blocked(opts) - |> restrict_recipients(recipients, opts["user"]) - - query = - from( - activity in query, - where: - fragment( - "?->>'type' = ? and ?->>'context' = ?", - activity.data, - "Create", - activity.data, - ^context - ), - order_by: [desc: :id] + from(activity in Activity) + |> restrict_blocked(opts) + |> restrict_recipients(recipients, opts["user"]) + |> where( + [activity], + fragment( + "?->>'type' = ? and ?->>'context' = ?", + activity.data, + "Create", + activity.data, + ^context ) - |> Activity.with_preloaded_object() + ) + |> order_by([activity], desc: activity.id) + end - Repo.all(query) + @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()] + def fetch_activities_for_context(context, opts \\ %{}) do + context + |> fetch_activities_for_context_query(opts) + |> Activity.with_preloaded_object() + |> Repo.all() + end + + @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) :: + Pleroma.FlakeId.t() | nil + def fetch_latest_activity_id_for_context(context, opts \\ %{}) do + context + |> fetch_activities_for_context_query(opts) + |> limit(1) + |> select([a], a.id) + |> Repo.one() end def fetch_public_activities(opts \\ %{}) do @@ -784,11 +811,32 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do |> Activity.with_preloaded_object() end + defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query + + defp maybe_preload_bookmarks(query, opts) do + query + |> Activity.with_preloaded_bookmark(opts["user"]) + end + + defp maybe_order(query, %{order: :desc}) do + query + |> order_by(desc: :id) + end + + defp maybe_order(query, %{order: :asc}) do + query + |> order_by(asc: :id) + end + + defp maybe_order(query, _), do: query + def fetch_activities_query(recipients, opts \\ %{}) do base_query = from(activity in Activity) base_query |> maybe_preload_objects(opts) + |> maybe_preload_bookmarks(opts) + |> maybe_order(opts) |> restrict_recipients(recipients, opts["user"]) |> restrict_tag(opts) |> restrict_tag_reject(opts) @@ -910,89 +958,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end end - def should_federate?(inbox, public) do - if public do - true - else - inbox_info = URI.parse(inbox) - !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host) - end - end - - def publish(actor, activity) do - remote_followers = - if actor.follower_address in activity.recipients do - {:ok, followers} = User.get_followers(actor) - followers |> Enum.filter(&(!&1.local)) - else - [] - end - - public = is_public?(activity) - - {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) - json = Jason.encode!(data) - - (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers) - |> Enum.filter(fn user -> User.ap_enabled?(user) end) - |> Enum.map(fn %{info: %{source_data: data}} -> - (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"] - end) - |> Enum.uniq() - |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) - |> Instances.filter_reachable() - |> Enum.each(fn {inbox, unreachable_since} -> - Federator.publish_single_ap(%{ - inbox: inbox, - json: json, - actor: actor, - id: activity.data["id"], - unreachable_since: unreachable_since - }) - end) - end - - def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do - Logger.info("Federating #{id} to #{inbox}") - host = URI.parse(inbox).host - - digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64()) - - date = - NaiveDateTime.utc_now() - |> Timex.format!("{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT") - - signature = - Pleroma.Web.HTTPSignatures.sign(actor, %{ - host: host, - "content-length": byte_size(json), - digest: digest, - date: date - }) - - with {:ok, %{status: code}} when code in 200..299 <- - result = - @httpoison.post( - inbox, - json, - [ - {"Content-Type", "application/activity+json"}, - {"Date", date}, - {"signature", signature}, - {"digest", digest} - ] - ) do - if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since], - do: Instances.set_reachable(inbox) - - result - else - {_post_result, response} -> - unless params[:unreachable_since], do: Instances.set_unreachable(inbox) - {:error, response} - end - end - # filter out broken threads def contain_broken_threads(%Activity{} = activity, %User{} = user) do entire_thread_visible_for_user?(activity, user)