defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Activity
alias Pleroma.Conversation
- alias Pleroma.Instances
alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.Object.Fetcher
alias Pleroma.User
alias Pleroma.Web.ActivityPub.MRF
alias Pleroma.Web.ActivityPub.Transmogrifier
- alias Pleroma.Web.Federator
alias Pleroma.Web.WebFinger
import Ecto.Query
require Logger
- @httpoison Application.get_env(:pleroma, :httpoison)
-
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
end)
Notification.create_notifications(activity)
- Conversation.create_or_bump_for(activity)
+
+ participations =
+ activity
+ |> Conversation.create_or_bump_for()
+ |> get_participations()
+
stream_out(activity)
+ stream_out_participations(participations)
{:ok, activity}
else
%Activity{} = activity ->
end
end
+ defp get_participations({:ok, %{participations: participations}}), do: participations
+ defp get_participations(_), do: []
+
+ def stream_out_participations(participations) do
+ participations =
+ participations
+ |> Repo.preload(:user)
+
+ Enum.each(participations, fn participation ->
+ Pleroma.Web.Streamer.stream("participation", participation)
+ end)
+ end
+
def stream_out(activity) do
public = "https://www.w3.org/ns/activitystreams#Public"
end
end
else
+ # TODO: Write test, replace with visibility test
if !Enum.member?(activity.data["cc"] || [], public) &&
!Enum.member?(
activity.data["to"],
|> Activity.with_preloaded_object()
end
+ defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query
+
+ defp maybe_preload_bookmarks(query, opts) do
+ query
+ |> Activity.with_preloaded_bookmark(opts["user"])
+ end
+
+ defp maybe_order(query, %{order: :desc}) do
+ query
+ |> order_by(desc: :id)
+ end
+
+ defp maybe_order(query, %{order: :asc}) do
+ query
+ |> order_by(asc: :id)
+ end
+
+ defp maybe_order(query, _), do: query
+
def fetch_activities_query(recipients, opts \\ %{}) do
base_query = from(activity in Activity)
base_query
|> maybe_preload_objects(opts)
+ |> maybe_preload_bookmarks(opts)
+ |> maybe_order(opts)
|> restrict_recipients(recipients, opts["user"])
|> restrict_tag(opts)
|> restrict_tag_reject(opts)
|> restrict_reblogs(opts)
|> restrict_pinned(opts)
|> restrict_muted_reblogs(opts)
+ |> Activity.restrict_deactivated_users()
end
def fetch_activities(recipients, opts \\ %{}) do
end
end
- def should_federate?(inbox, public) do
- if public do
- true
- else
- inbox_info = URI.parse(inbox)
- !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
- end
- end
-
- def publish(actor, activity) do
- remote_followers =
- if actor.follower_address in activity.recipients do
- {:ok, followers} = User.get_followers(actor)
- followers |> Enum.filter(&(!&1.local))
- else
- []
- end
-
- public = is_public?(activity)
-
- {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
- json = Jason.encode!(data)
-
- (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
- |> Enum.filter(fn user -> User.ap_enabled?(user) end)
- |> Enum.map(fn %{info: %{source_data: data}} ->
- (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
- end)
- |> Enum.uniq()
- |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
- |> Instances.filter_reachable()
- |> Enum.each(fn {inbox, unreachable_since} ->
- Federator.publish_single_ap(%{
- inbox: inbox,
- json: json,
- actor: actor,
- id: activity.data["id"],
- unreachable_since: unreachable_since
- })
- end)
- end
-
- def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
- Logger.info("Federating #{id} to #{inbox}")
- host = URI.parse(inbox).host
-
- digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
-
- date =
- NaiveDateTime.utc_now()
- |> Timex.format!("{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT")
-
- signature =
- Pleroma.Web.HTTPSignatures.sign(actor, %{
- host: host,
- "content-length": byte_size(json),
- digest: digest,
- date: date
- })
-
- with {:ok, %{status: code}} when code in 200..299 <-
- result =
- @httpoison.post(
- inbox,
- json,
- [
- {"Content-Type", "application/activity+json"},
- {"Date", date},
- {"signature", signature},
- {"digest", digest}
- ]
- ) do
- if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
- do: Instances.set_reachable(inbox)
-
- result
- else
- {_post_result, response} ->
- unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
- {:error, response}
- end
- end
-
# filter out broken threads
def contain_broken_threads(%Activity{} = activity, %User{} = user) do
entire_thread_visible_for_user?(activity, user)