defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.{Activity, Repo, Object, Upload, User, Notification}
+ alias Pleroma.Object.Fetcher
alias Pleroma.Web.ActivityPub.{Transmogrifier, MRF}
alias Pleroma.Web.WebFinger
alias Pleroma.Web.Federator
@httpoison Application.get_env(:pleroma, :httpoison)
- @instance Application.get_env(:pleroma, :instance)
-
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
defp check_actor_is_active(actor) do
if not is_nil(actor) do
with user <- User.get_cached_by_ap_id(actor),
- nil <- user.info["deactivated"] do
+ false <- user.info.deactivated do
:ok
else
_e -> :reject
map <- lazy_put_activity_defaults(map),
:ok <- check_actor_is_active(map["actor"]),
{:ok, map} <- MRF.filter(map),
- :ok <- insert_full_object(map) do
+ {:ok, map} <- insert_full_object(map) do
{recipients, _, _} = get_recipients(map)
{:ok, activity} =
public = "https://www.w3.org/ns/activitystreams#Public"
if activity.data["type"] in ["Create", "Announce"] do
+ object = Object.normalize(activity.data["object"])
+
Pleroma.Web.Streamer.stream("user", activity)
Pleroma.Web.Streamer.stream("list", activity)
Pleroma.Web.Streamer.stream("public:local", activity)
end
- activity.data["object"]
+ object.data
|> Map.get("tag", [])
|> Enum.filter(fn tag -> is_bitstring(tag) end)
|> Enum.map(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
- if activity.data["object"]["attachment"] != [] do
+ if object.data["attachment"] != [] do
Pleroma.Web.Streamer.stream("public:media", activity)
if activity.local do
"to" => [user.follower_address, "https://www.w3.org/ns/activitystreams#Public"]
}
- with Repo.delete(object),
- Repo.delete_all(Activity.all_non_create_by_object_ap_id_q(id)),
+ with {:ok, _} <- Object.delete(object),
{:ok, activity} <- insert(data, local),
:ok <- maybe_federate(activity),
{:ok, _actor} <- User.decrease_note_count(user) do
end
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
- blocks = info["blocks"] || []
- domain_blocks = info["domain_blocks"] || []
+ blocks = info.blocks || []
+ domain_blocks = info.domain_blocks || []
from(
activity in query,
|> Enum.reverse()
end
- def upload(file) do
- data = Upload.store(file, Application.get_env(:pleroma, :instance)[:dedupe_media])
- Repo.insert(%Object{data: data})
+ def upload(file, opts \\ []) do
+ with {:ok, data} <- Upload.store(file, opts) do
+ Repo.insert(%Object{data: data})
+ end
end
def user_data_from_user_object(data) do
end
def fetch_and_prepare_user_from_ap_id(ap_id) do
- with {:ok, %{status_code: 200, body: body}} <-
- @httpoison.get(ap_id, [Accept: "application/activity+json"], follow_redirect: true),
- {:ok, data} <- Jason.decode(body) do
+ with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
user_data_from_user_object(data)
else
e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
end
end
- @quarantined_instances Keyword.get(@instance, :quarantined_instances, [])
-
def should_federate?(inbox, public) do
if public do
true
else
inbox_info = URI.parse(inbox)
- inbox_info.host not in @quarantined_instances
+ !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
end
end
remote_inboxes =
(Pleroma.Web.Salmon.remote_users(activity) ++ followers)
|> Enum.filter(fn user -> User.ap_enabled?(user) end)
- |> Enum.map(fn %{info: %{"source_data" => data}} ->
+ |> Enum.map(fn %{info: %{source_data: data}} ->
(is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
end)
|> Enum.uniq()
)
end
- # TODO:
- # This will create a Create activity, which we need internally at the moment.
- def fetch_object_from_id(id) do
- if object = Object.get_cached_by_ap_id(id) do
- {:ok, object}
- else
- Logger.info("Fetching #{id} via AP")
-
- with true <- String.starts_with?(id, "http"),
- {:ok, %{body: body, status_code: code}} when code in 200..299 <-
- @httpoison.get(
- id,
- [Accept: "application/activity+json"],
- follow_redirect: true,
- timeout: 10000,
- recv_timeout: 20000
- ),
- {:ok, data} <- Jason.decode(body),
- nil <- Object.normalize(data),
- params <- %{
- "type" => "Create",
- "to" => data["to"],
- "cc" => data["cc"],
- "actor" => data["attributedTo"],
- "object" => data
- },
- :ok <- Transmogrifier.contain_origin(id, params),
- {:ok, activity} <- Transmogrifier.handle_incoming(params) do
- {:ok, Object.normalize(activity.data["object"])}
- else
- {:error, {:reject, nil}} ->
- {:reject, nil}
-
- object = %Object{} ->
- {:ok, object}
-
- _e ->
- Logger.info("Couldn't get object via AP, trying out OStatus fetching...")
-
- case OStatus.fetch_activity_from_url(id) do
- {:ok, [activity | _]} -> {:ok, Object.normalize(activity.data["object"])}
- e -> e
- end
- end
- end
- end
-
def is_public?(activity) do
"https://www.w3.org/ns/activitystreams#Public" in (activity.data["to"] ++
(activity.data["cc"] || []))
y = activity.data["to"] ++ (activity.data["cc"] || [])
visible_for_user?(activity, nil) || Enum.any?(x, &(&1 in y))
end
+
+ # guard
+ def entire_thread_visible_for_user?(nil, user), do: false
+
+ # child / root
+ def entire_thread_visible_for_user?(
+ %Activity{data: %{"object" => object_id}} = tail,
+ user
+ ) do
+ parent = Activity.get_in_reply_to_activity(tail)
+
+ cond do
+ !is_nil(parent) ->
+ visible_for_user?(tail, user) && entire_thread_visible_for_user?(parent, user)
+
+ true ->
+ visible_for_user?(tail, user)
+ end
+ end
+
+ # filter out broken threads
+ def contain_broken_threads(%Activity{} = activity, %User{} = user) do
+ entire_thread_visible_for_user?(activity, user)
+ end
+
+ # do post-processing on a specific activity
+ def contain_activity(%Activity{} = activity, %User{} = user) do
+ contain_broken_threads(activity, user)
+ end
+
+ # do post-processing on a timeline
+ def contain_timeline(timeline, user) do
+ timeline
+ |> Enum.filter(fn activity ->
+ contain_activity(activity, user)
+ end)
+ end
end