X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;ds=inline;f=lib%2Fpleroma%2Fweb%2Factivity_pub%2Fside_effects.ex;h=34617a2184fe29da361eaee46604bb4d54c7267a;hb=a079ec3a3cdfd42d2cbd51c7698c2c87828e5778;hp=55c99ad0cabf3da38ae35494fb192ba8b035f9a6;hpb=4134abef63e1165f5701741c1012e64cb908654c;p=akkoma
diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex
index 55c99ad0c..34617a218 100644
--- a/lib/pleroma/web/activity_pub/side_effects.ex
+++ b/lib/pleroma/web/activity_pub/side_effects.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors
+# Copyright © 2017-2022 Pleroma Authors
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.ActivityPub.SideEffects do
@@ -10,9 +10,6 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
collection, and so on.
"""
alias Pleroma.Activity
- alias Pleroma.Activity.Ir.Topics
- alias Pleroma.Chat
- alias Pleroma.Chat.MessageReference
alias Pleroma.FollowingRelationship
alias Pleroma.Notification
alias Pleroma.Object
@@ -24,13 +21,17 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Push
alias Pleroma.Web.Streamer
+ alias Pleroma.Workers.PollWorker
+ require Pleroma.Constants
require Logger
- @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
+ @logger Pleroma.Config.get([:side_effects, :logger], Logger)
@behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
+ defp ap_streamer, do: Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
+
@impl true
def handle(object, meta \\ [])
@@ -150,23 +151,26 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
# Tasks this handles:
# - Update the user
+ # - Update a non-user object (Note, Question, etc.)
#
# For a local user, we also get a changeset with the full information, so we
# can update non-federating, non-activitypub settings as well.
@impl true
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
- if changeset = Keyword.get(meta, :user_update_changeset) do
- changeset
- |> User.update_and_set_cache()
+ updated_object_id = updated_object["id"]
+
+ with {_, true} <- {:has_id, is_binary(updated_object_id)},
+ %{"type" => type} <- updated_object,
+ {_, is_user} <- {:is_user, type in Pleroma.Constants.actor_types()} do
+ if is_user do
+ handle_update_user(object, meta)
+ else
+ handle_update_object(object, meta)
+ end
else
- {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
-
- User.get_by_ap_id(updated_object["id"])
- |> User.remote_user_changeset(new_user_data)
- |> User.update_and_set_cache()
+ _ ->
+ {:ok, object, meta}
end
-
- {:ok, object, meta}
end
# Tasks this handles:
@@ -188,30 +192,56 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
# - Increase the user note count
# - Increase the reply count
# - Increase replies count
+ # - Ask for scraping of nodeinfo
# - Set up ActivityExpiration
# - Set up notifications
+ # - Index incoming posts for search (if needed)
@impl true
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
- with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
+ with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
%User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
{:ok, notifications} = Notification.create_notifications(activity, do_send: false)
{:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
+ {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
- if in_reply_to = object.data["inReplyTo"] && object.data["type"] != "Answer" do
+ if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
Object.increase_replies_count(in_reply_to)
end
+ reply_depth = (meta[:depth] || 0) + 1
+
+ Pleroma.Workers.NodeInfoFetcherWorker.enqueue("process", %{
+ "source_url" => activity.data["actor"]
+ })
+
+ # FIXME: Force inReplyTo to replies
+ if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
+ object.data["replies"] != nil do
+ for reply_id <- object.data["replies"] do
+ Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
+ "id" => reply_id,
+ "depth" => reply_depth
+ })
+ end
+ end
+
ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
end)
+ Pleroma.Search.add_to_index(Map.put(activity, :object, object))
+
meta =
meta
|> add_notifications(notifications)
+ ap_streamer().stream_out(activity)
+
{:ok, activity, meta}
else
- e -> Repo.rollback(e)
+ e ->
+ Logger.error(inspect(e))
+ Repo.rollback(e)
end
end
@@ -229,9 +259,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
if !User.is_internal_user?(user) do
Notification.create_notifications(object)
- object
- |> Topics.get_activity_topics()
- |> Streamer.stream(object)
+ ap_streamer().stream_out(object)
end
{:ok, object, meta}
@@ -260,24 +288,24 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
# Tasks this handles:
# - Delete and unpins the create activity
- # - Replace object with Tombstone
# - Set up notification
# - Reduce the user note count
# - Reduce the reply count
# - Stream out the activity
+ # - Removes posts from search index (if needed)
@impl true
def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
deleted_object =
- Object.normalize(deleted_object, false) ||
+ Object.normalize(deleted_object, fetch: false) ||
User.get_cached_by_ap_id(deleted_object)
result =
case deleted_object do
%Object{} ->
- with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
+ with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
{_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
%User{} = user <- User.get_cached_by_ap_id(actor) do
- User.remove_pinnned_activity(user, activity)
+ User.remove_pinned_object_id(user, deleted_object.data["id"])
{:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
@@ -285,14 +313,12 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
Object.decrease_replies_count(in_reply_to)
end
- MessageReference.delete_for_object(deleted_object)
-
- ActivityPub.stream_out(object)
- ActivityPub.stream_out_participations(deleted_object, user)
+ ap_streamer().stream_out(object)
+ ap_streamer().stream_out_participations(deleted_object, user)
:ok
else
{:actor, _} ->
- Logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
+ @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
:no_object_actor
end
@@ -303,55 +329,161 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
end
if result == :ok do
- Notification.create_notifications(object)
+ # Only remove from index when deleting actual objects, not users or anything else
+ with %Pleroma.Object{} <- deleted_object do
+ Pleroma.Search.remove_from_index(deleted_object)
+ end
+
{:ok, object, meta}
else
{:error, result}
end
end
+ # Tasks this handles:
+ # - adds pin to user
+ # - removes expiration job for pinned activity, if was set for expiration
+ @impl true
+ def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
+ with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
+ {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
+ # if pinned activity was scheduled for deletion, we remove job
+ if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
+ Oban.cancel_job(expiration.id)
+ end
+
+ {:ok, object, meta}
+ else
+ nil ->
+ {:error, :user_not_found}
+
+ {:error, changeset} ->
+ if changeset.errors[:pinned_objects] do
+ {:error, :pinned_statuses_limit_reached}
+ else
+ changeset.errors
+ end
+ end
+ end
+
+ # Tasks this handles:
+ # - removes pin from user
+ # - removes corresponding Add activity
+ # - if activity had expiration, recreates activity expiration job
+ @impl true
+ def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
+ with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
+ {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
+ data["object"]
+ |> Activity.add_by_params_query(user.ap_id, user.featured_address)
+ |> Repo.delete_all()
+
+ # if pinned activity was scheduled for deletion, we reschedule it for deletion
+ if meta[:expires_at] do
+ # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
+ {:ok, expires_at} =
+ Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
+
+ Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
+ activity_id: meta[:activity_id],
+ expires_at: expires_at
+ })
+ end
+
+ {:ok, object, meta}
+ else
+ nil -> {:error, :user_not_found}
+ error -> error
+ end
+ end
+
# Nothing to do
@impl true
def handle(object, meta) do
{:ok, object, meta}
end
- def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
- with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
- actor = User.get_cached_by_ap_id(object.data["actor"])
- recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
-
- streamables =
- [[actor, recipient], [recipient, actor]]
- |> Enum.uniq()
- |> Enum.map(fn [user, other_user] ->
- if user.local do
- {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
- {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
-
- @cachex.put(
- :chat_message_id_idempotency_key_cache,
- cm_ref.id,
- meta[:idempotency_key]
- )
-
- {
- ["user", "user:pleroma_chat"],
- {user, %{cm_ref | chat: chat, object: object}}
- }
+ defp handle_update_user(
+ %{data: %{"type" => "Update", "object" => updated_object}} = object,
+ meta
+ ) do
+ if changeset = Keyword.get(meta, :user_update_changeset) do
+ changeset
+ |> User.update_and_set_cache()
+ else
+ {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
+
+ User.get_by_ap_id(updated_object["id"])
+ |> User.remote_user_changeset(new_user_data)
+ |> User.update_and_set_cache()
+ end
+
+ {:ok, object, meta}
+ end
+
+ defp handle_update_object(
+ %{data: %{"type" => "Update", "object" => updated_object}} = object,
+ meta
+ ) do
+ orig_object_ap_id = updated_object["id"]
+ orig_object = Object.get_by_ap_id(orig_object_ap_id)
+ orig_object_data = orig_object.data
+
+ updated_object =
+ if meta[:local] do
+ # If this is a local Update, we don't process it by transmogrifier,
+ # so we use the embedded object as-is.
+ updated_object
+ else
+ meta[:object_data]
+ end
+
+ if orig_object_data["type"] in Pleroma.Constants.updatable_object_types() do
+ %{
+ updated_data: updated_object_data,
+ updated: updated,
+ used_history_in_new_object?: used_history_in_new_object?
+ } = Object.Updater.make_new_object_data_from_update_object(orig_object_data, updated_object)
+
+ changeset =
+ orig_object
+ |> Repo.preload(:hashtags)
+ |> Object.change(%{data: updated_object_data})
+
+ with {:ok, new_object} <- Repo.update(changeset),
+ {:ok, _} <- Object.invalid_object_cache(new_object),
+ {:ok, _} <- Object.set_cache(new_object),
+ # The metadata/utils.ex uses the object id for the cache.
+ {:ok, _} <- Pleroma.Activity.HTML.invalidate_cache_for(new_object.id) do
+ if used_history_in_new_object? do
+ with create_activity when not is_nil(create_activity) <-
+ Pleroma.Activity.get_create_by_object_ap_id(orig_object_ap_id),
+ {:ok, _} <- Pleroma.Activity.HTML.invalidate_cache_for(create_activity.id) do
+ nil
+ else
+ _ -> nil
end
- end)
- |> Enum.filter(& &1)
+ end
- meta =
- meta
- |> add_streamables(streamables)
+ if updated do
+ object
+ |> Activity.normalize()
+ |> ActivityPub.notify_and_stream()
+ end
+ end
+ end
+ {:ok, object, meta}
+ end
+
+ def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
+ with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
+ PollWorker.schedule_poll_end(activity)
{:ok, object, meta}
end
end
- def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
+ def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
Object.increase_vote_count(
object.data["inReplyTo"],
@@ -363,15 +495,15 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
end
end
- def handle_object_creation(%{"type" => objtype} = object, meta)
- when objtype in ~w[Audio Video Question Event Article] do
+ def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
+ when objtype in ~w[Audio Video Event Article Note Page] do
with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
{:ok, object, meta}
end
end
# Nothing to do
- def handle_object_creation(object, meta) do
+ def handle_object_creation(object, _activity, meta) do
{:ok, object, meta}
end
@@ -442,13 +574,6 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
meta
end
- defp add_streamables(meta, streamables) do
- existing = Keyword.get(meta, :streamables, [])
-
- meta
- |> Keyword.put(:streamables, streamables ++ existing)
- end
-
defp add_notifications(meta, notifications) do
existing = Keyword.get(meta, :notifications, [])