Merge branch 'fix/streaming-api-for-create-activity' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / side_effects.ex
index 55c99ad0cabf3da38ae35494fb192ba8b035f9a6..701181a1419e2f12bce3932fba3d0014832019da 100644 (file)
@@ -1,5 +1,5 @@
 # Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Web.ActivityPub.SideEffects do
@@ -10,7 +10,6 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   collection, and so on.
   """
   alias Pleroma.Activity
-  alias Pleroma.Activity.Ir.Topics
   alias Pleroma.Chat
   alias Pleroma.Chat.MessageReference
   alias Pleroma.FollowingRelationship
@@ -24,13 +23,17 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   alias Pleroma.Web.ActivityPub.Utils
   alias Pleroma.Web.Push
   alias Pleroma.Web.Streamer
+  alias Pleroma.Workers.PollWorker
 
   require Logger
 
   @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
+  @logger Pleroma.Config.get([:side_effects, :logger], Logger)
 
   @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
 
+  defp ap_streamer, do: Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
+
   @impl true
   def handle(object, meta \\ [])
 
@@ -192,7 +195,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   # - Set up notifications
   @impl true
   def handle(%{data: %{"type" => "Create"}} = activity, meta) do
-    with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
+    with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
          %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
       {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
       {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
@@ -201,6 +204,19 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
         Object.increase_replies_count(in_reply_to)
       end
 
+      reply_depth = (meta[:depth] || 0) + 1
+
+      # FIXME: Force inReplyTo to replies
+      if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
+           object.data["replies"] != nil do
+        for reply_id <- object.data["replies"] do
+          Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
+            "id" => reply_id,
+            "depth" => reply_depth
+          })
+        end
+      end
+
       ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
         Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
       end)
@@ -209,6 +225,8 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
         meta
         |> add_notifications(notifications)
 
+      ap_streamer().stream_out(activity)
+
       {:ok, activity, meta}
     else
       e -> Repo.rollback(e)
@@ -229,9 +247,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
     if !User.is_internal_user?(user) do
       Notification.create_notifications(object)
 
-      object
-      |> Topics.get_activity_topics()
-      |> Streamer.stream(object)
+      ap_streamer().stream_out(object)
     end
 
     {:ok, object, meta}
@@ -268,16 +284,16 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   @impl true
   def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
     deleted_object =
-      Object.normalize(deleted_object, false) ||
+      Object.normalize(deleted_object, fetch: false) ||
         User.get_cached_by_ap_id(deleted_object)
 
     result =
       case deleted_object do
         %Object{} ->
-          with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
+          with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
                {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
                %User{} = user <- User.get_cached_by_ap_id(actor) do
-            User.remove_pinnned_activity(user, activity)
+            User.remove_pinned_object_id(user, deleted_object.data["id"])
 
             {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
 
@@ -287,12 +303,12 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
 
             MessageReference.delete_for_object(deleted_object)
 
-            ActivityPub.stream_out(object)
-            ActivityPub.stream_out_participations(deleted_object, user)
+            ap_streamer().stream_out(object)
+            ap_streamer().stream_out_participations(deleted_object, user)
             :ok
           else
             {:actor, _} ->
-              Logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
+              @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
               :no_object_actor
           end
 
@@ -310,13 +326,70 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
     end
   end
 
+  # Tasks this handles:
+  # - adds pin to user
+  # - removes expiration job for pinned activity, if was set for expiration
+  @impl true
+  def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
+    with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
+         {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
+      # if pinned activity was scheduled for deletion, we remove job
+      if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
+        Oban.cancel_job(expiration.id)
+      end
+
+      {:ok, object, meta}
+    else
+      nil ->
+        {:error, :user_not_found}
+
+      {:error, changeset} ->
+        if changeset.errors[:pinned_objects] do
+          {:error, :pinned_statuses_limit_reached}
+        else
+          changeset.errors
+        end
+    end
+  end
+
+  # Tasks this handles:
+  # - removes pin from user
+  # - removes corresponding Add activity
+  # - if activity had expiration, recreates activity expiration job
+  @impl true
+  def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
+    with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
+         {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
+      data["object"]
+      |> Activity.add_by_params_query(user.ap_id, user.featured_address)
+      |> Repo.delete_all()
+
+      # if pinned activity was scheduled for deletion, we reschedule it for deletion
+      if meta[:expires_at] do
+        # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
+        {:ok, expires_at} =
+          Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
+
+        Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
+          activity_id: meta[:activity_id],
+          expires_at: expires_at
+        })
+      end
+
+      {:ok, object, meta}
+    else
+      nil -> {:error, :user_not_found}
+      error -> error
+    end
+  end
+
   # Nothing to do
   @impl true
   def handle(object, meta) do
     {:ok, object, meta}
   end
 
-  def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
+  def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
     with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
       actor = User.get_cached_by_ap_id(object.data["actor"])
       recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
@@ -351,7 +424,14 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
     end
   end
 
-  def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
+  def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
+    with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
+      PollWorker.schedule_poll_end(activity)
+      {:ok, object, meta}
+    end
+  end
+
+  def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
     with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
       Object.increase_vote_count(
         object.data["inReplyTo"],
@@ -363,15 +443,15 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
     end
   end
 
-  def handle_object_creation(%{"type" => objtype} = object, meta)
-      when objtype in ~w[Audio Video Question Event Article] do
+  def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
+      when objtype in ~w[Audio Video Event Article Note Page] do
     with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
       {:ok, object, meta}
     end
   end
 
   # Nothing to do
-  def handle_object_creation(object, meta) do
+  def handle_object_creation(object, _activity, meta) do
     {:ok, object, meta}
   end