Change query order in fetch_activities_for_context_query to make poll vote exclusion...
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
index cb88ba308c105deb2d9411d257ed1eb9f75c41a0..45feae25a720fba541480753dd9ef97dd95c7e59 100644 (file)
@@ -4,17 +4,16 @@
 
 defmodule Pleroma.Web.ActivityPub.ActivityPub do
   alias Pleroma.Activity
-  alias Pleroma.Instances
+  alias Pleroma.Conversation
   alias Pleroma.Notification
   alias Pleroma.Object
+  alias Pleroma.Object.Fetcher
   alias Pleroma.Pagination
   alias Pleroma.Repo
   alias Pleroma.Upload
   alias Pleroma.User
   alias Pleroma.Web.ActivityPub.MRF
   alias Pleroma.Web.ActivityPub.Transmogrifier
-  alias Pleroma.Web.Federator
-  alias Pleroma.Web.OStatus
   alias Pleroma.Web.WebFinger
 
   import Ecto.Query
@@ -23,8 +22,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
   require Logger
 
-  @httpoison Application.get_env(:pleroma, :httpoison)
-
   # For Announce activities, we filter the recipients based on following status for any actors
   # that match actual users.  See issue #164 for more information about why this is necessary.
   defp get_recipients(%{"type" => "Announce"} = data) do
@@ -95,7 +92,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
         "type" => "Create"
       }) do
     if is_public?(object) do
-      Activity.increase_replies_count(reply_ap_id)
       Object.increase_replies_count(reply_ap_id)
     end
   end
@@ -106,13 +102,21 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
         data: %{"inReplyTo" => reply_ap_id} = object
       }) do
     if is_public?(object) do
-      Activity.decrease_replies_count(reply_ap_id)
       Object.decrease_replies_count(reply_ap_id)
     end
   end
 
   def decrease_replies_count_if_reply(_object), do: :noop
 
+  def increase_poll_votes_if_vote(%{
+        "object" => %{"inReplyTo" => reply_ap_id, "name" => name},
+        "type" => "Create"
+      }) do
+    Object.increase_vote_count(reply_ap_id, name)
+  end
+
+  def increase_poll_votes_if_vote(_create_data), do: :noop
+
   def insert(map, local \\ true, fake \\ false) when is_map(map) do
     with nil <- Activity.normalize(map),
          map <- lazy_put_activity_defaults(map, fake),
@@ -121,7 +125,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
          {:ok, map} <- MRF.filter(map),
          {recipients, _, _} = get_recipients(map),
          {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
-         {:ok, object} <- insert_full_object(map) do
+         {:ok, map, object} <- insert_full_object(map) do
       {:ok, activity} =
         Repo.insert(%Activity{
           data: map,
@@ -138,12 +142,17 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
           activity
         end
 
-      Task.start(fn ->
-        Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
-      end)
+      PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])
 
       Notification.create_notifications(activity)
+
+      participations =
+        activity
+        |> Conversation.create_or_bump_for()
+        |> get_participations()
+
       stream_out(activity)
+      stream_out_participations(participations)
       {:ok, activity}
     else
       %Activity{} = activity ->
@@ -166,41 +175,59 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     end
   end
 
+  defp get_participations({:ok, %{participations: participations}}), do: participations
+  defp get_participations(_), do: []
+
+  def stream_out_participations(participations) do
+    participations =
+      participations
+      |> Repo.preload(:user)
+
+    Enum.each(participations, fn participation ->
+      Pleroma.Web.Streamer.stream("participation", participation)
+    end)
+  end
+
   def stream_out(activity) do
     public = "https://www.w3.org/ns/activitystreams#Public"
 
     if activity.data["type"] in ["Create", "Announce", "Delete"] do
-      Pleroma.Web.Streamer.stream("user", activity)
-      Pleroma.Web.Streamer.stream("list", activity)
+      object = Object.normalize(activity)
+      # Do not stream out poll replies
+      unless object.data["type"] == "Answer" do
+        Pleroma.Web.Streamer.stream("user", activity)
+        Pleroma.Web.Streamer.stream("list", activity)
 
-      if Enum.member?(activity.data["to"], public) do
-        Pleroma.Web.Streamer.stream("public", activity)
+        if Enum.member?(activity.data["to"], public) do
+          Pleroma.Web.Streamer.stream("public", activity)
 
-        if activity.local do
-          Pleroma.Web.Streamer.stream("public:local", activity)
-        end
+          if activity.local do
+            Pleroma.Web.Streamer.stream("public:local", activity)
+          end
 
-        if activity.data["type"] in ["Create"] do
-          activity.data["object"]
-          |> Map.get("tag", [])
-          |> Enum.filter(fn tag -> is_bitstring(tag) end)
-          |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
+          if activity.data["type"] in ["Create"] do
+            object.data
+            |> Map.get("tag", [])
+            |> Enum.filter(fn tag -> is_bitstring(tag) end)
+            |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
 
-          if activity.data["object"]["attachment"] != [] do
-            Pleroma.Web.Streamer.stream("public:media", activity)
+            if object.data["attachment"] != [] do
+              Pleroma.Web.Streamer.stream("public:media", activity)
 
-            if activity.local do
-              Pleroma.Web.Streamer.stream("public:local:media", activity)
+              if activity.local do
+                Pleroma.Web.Streamer.stream("public:local:media", activity)
+              end
             end
           end
+        else
+          # TODO: Write test, replace with visibility test
+          if !Enum.member?(activity.data["cc"] || [], public) &&
+               !Enum.member?(
+                 activity.data["to"],
+                 User.get_cached_by_ap_id(activity.data["actor"]).follower_address
+               ),
+             do: Pleroma.Web.Streamer.stream("direct", activity)
         end
-      else
-        if !Enum.member?(activity.data["cc"] || [], public) &&
-             !Enum.member?(
-               activity.data["to"],
-               User.get_by_ap_id(activity.data["actor"]).follower_address
-             ),
-           do: Pleroma.Web.Streamer.stream("direct", activity)
       end
     end
   end
@@ -219,6 +246,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
          {:ok, activity} <- insert(create_data, local, fake),
          {:fake, false, activity} <- {:fake, fake, activity},
          _ <- increase_replies_count_if_reply(create_data),
+         _ <- increase_poll_votes_if_vote(create_data),
          # Changing note count prior to enqueuing federation task in order to avoid
          # race conditions on updating user.info
          {:ok, _actor} <- increase_note_count_if_public(actor, activity),
@@ -383,16 +411,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
   end
 
   def block(blocker, blocked, activity_id \\ nil, local \\ true) do
-    ap_config = Application.get_env(:pleroma, :activitypub)
-    unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
-    outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)
+    outgoing_blocks = Pleroma.Config.get([:activitypub, :outgoing_blocks])
+    unfollow_blocked = Pleroma.Config.get([:activitypub, :unfollow_blocked])
 
-    with true <- unfollow_blocked do
+    if unfollow_blocked do
       follow_activity = fetch_latest_follow(blocker, blocked)
-
-      if follow_activity do
-        unfollow(blocker, blocked, nil, local)
-      end
+      if follow_activity, do: unfollow(blocker, blocked, nil, local)
     end
 
     with true <- outgoing_blocks,
@@ -457,35 +481,45 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     end
   end
 
-  def fetch_activities_for_context(context, opts \\ %{}) do
+  defp fetch_activities_for_context_query(context, opts) do
     public = ["https://www.w3.org/ns/activitystreams#Public"]
 
     recipients =
       if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
 
-    query = from(activity in Activity)
-
-    query =
-      query
-      |> restrict_blocked(opts)
-      |> restrict_recipients(recipients, opts["user"])
-
-    query =
-      from(
-        activity in query,
-        where:
-          fragment(
-            "?->>'type' = ? and ?->>'context' = ?",
-            activity.data,
-            "Create",
-            activity.data,
-            ^context
-          ),
-        order_by: [desc: :id]
+    from(activity in Activity)
+    |> maybe_preload_objects(opts)
+    |> restrict_blocked(opts)
+    |> restrict_recipients(recipients, opts["user"])
+    |> where(
+      [activity],
+      fragment(
+        "?->>'type' = ? and ?->>'context' = ?",
+        activity.data,
+        "Create",
+        activity.data,
+        ^context
       )
-      |> Activity.with_preloaded_object()
+    )
+    |> exclude_poll_votes(opts)
+    |> order_by([activity], desc: activity.id)
+  end
+
+  @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
+  def fetch_activities_for_context(context, opts \\ %{}) do
+    context
+    |> fetch_activities_for_context_query(opts)
+    |> Repo.all()
+  end
 
-    Repo.all(query)
+  @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
+          Pleroma.FlakeId.t() | nil
+  def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
+    context
+    |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts))
+    |> limit(1)
+    |> select([a], a.id)
+    |> Repo.one()
   end
 
   def fetch_public_activities(opts \\ %{}) do
@@ -515,8 +549,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
             )
         )
 
-      Ecto.Adapters.SQL.to_sql(:all, Repo, query)
-
       query
     else
       Logger.error("Could not restrict visibility to #{visibility}")
@@ -532,8 +564,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
           fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
       )
 
-    Ecto.Adapters.SQL.to_sql(:all, Repo, query)
-
     query
   end
 
@@ -544,6 +574,18 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
   defp restrict_visibility(query, _visibility), do: query
 
+  defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}) do
+    query =
+      from(
+        a in query,
+        where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
+      )
+
+    query
+  end
+
+  defp restrict_thread_visibility(query, _), do: query
+
   def fetch_user_activities(user, reading_user, params \\ %{}) do
     params =
       params
@@ -572,56 +614,54 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
   defp restrict_since(query, _), do: query
 
+  defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
+    raise "Can't use the child object without preloading!"
+  end
+
   defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
        when is_list(tag_reject) and tag_reject != [] do
     from(
-      activity in query,
-      where: fragment(~s(\(not \(? #> '{"object","tag"}'\) \\?| ?\)), activity.data, ^tag_reject)
+      [_activity, object] in query,
+      where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
     )
   end
 
   defp restrict_tag_reject(query, _), do: query
 
+  defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
+    raise "Can't use the child object without preloading!"
+  end
+
   defp restrict_tag_all(query, %{"tag_all" => tag_all})
        when is_list(tag_all) and tag_all != [] do
     from(
-      activity in query,
-      where: fragment(~s(\(? #> '{"object","tag"}'\) \\?& ?), activity.data, ^tag_all)
+      [_activity, object] in query,
+      where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
     )
   end
 
   defp restrict_tag_all(query, _), do: query
 
+  defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
+    raise "Can't use the child object without preloading!"
+  end
+
   defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
     from(
-      activity in query,
-      where: fragment(~s(\(? #> '{"object","tag"}'\) \\?| ?), activity.data, ^tag)
+      [_activity, object] in query,
+      where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
     )
   end
 
   defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
     from(
-      activity in query,
-      where: fragment(~s(? <@ (? #> '{"object","tag"}'\)), ^tag, activity.data)
+      [_activity, object] in query,
+      where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
     )
   end
 
   defp restrict_tag(query, _), do: query
 
-  defp restrict_to_cc(query, recipients_to, recipients_cc) do
-    from(
-      activity in query,
-      where:
-        fragment(
-          "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
-          activity.data,
-          ^recipients_to,
-          activity.data,
-          ^recipients_cc
-        )
-    )
-  end
-
   defp restrict_recipients(query, [], _user), do: query
 
   defp restrict_recipients(query, recipients, nil) do
@@ -658,6 +698,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
   defp restrict_type(query, _), do: query
 
+  defp restrict_state(query, %{"state" => state}) do
+    from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
+  end
+
+  defp restrict_state(query, _), do: query
+
   defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
     from(
       activity in query,
@@ -667,10 +713,14 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
   defp restrict_favorited_by(query, _), do: query
 
+  defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
+    raise "Can't use the child object without preloading!"
+  end
+
   defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
     from(
-      activity in query,
-      where: fragment(~s(not (? #> '{"object","attachment"}' = ?\)), activity.data, ^[])
+      [_activity, object] in query,
+      where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
     )
   end
 
@@ -709,8 +759,11 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     blocks = info.blocks || []
     domain_blocks = info.domain_blocks || []
 
+    query =
+      if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
+
     from(
-      activity in query,
+      [activity, object: o] in query,
       where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
       where: fragment("not (? && ?)", activity.recipients, ^blocks),
       where:
@@ -720,7 +773,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
           activity.data,
           ^blocks
         ),
-      where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
+      where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks),
+      where: fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
     )
   end
 
@@ -761,6 +815,18 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
   defp restrict_muted_reblogs(query, _), do: query
 
+  defp exclude_poll_votes(query, %{"include_poll_votes" => "true"}), do: query
+
+  defp exclude_poll_votes(query, _) do
+    if has_named_binding?(query, :object) do
+      from([activity, object: o] in query,
+        where: fragment("not(?->>'type' = ?)", o.data, "Answer")
+      )
+    else
+      query
+    end
+  end
+
   defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
 
   defp maybe_preload_objects(query, _) do
@@ -768,11 +834,40 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     |> Activity.with_preloaded_object()
   end
 
+  defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query
+
+  defp maybe_preload_bookmarks(query, opts) do
+    query
+    |> Activity.with_preloaded_bookmark(opts["user"])
+  end
+
+  defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query
+
+  defp maybe_set_thread_muted_field(query, opts) do
+    query
+    |> Activity.with_set_thread_muted_field(opts["user"])
+  end
+
+  defp maybe_order(query, %{order: :desc}) do
+    query
+    |> order_by(desc: :id)
+  end
+
+  defp maybe_order(query, %{order: :asc}) do
+    query
+    |> order_by(asc: :id)
+  end
+
+  defp maybe_order(query, _), do: query
+
   def fetch_activities_query(recipients, opts \\ %{}) do
     base_query = from(activity in Activity)
 
     base_query
     |> maybe_preload_objects(opts)
+    |> maybe_preload_bookmarks(opts)
+    |> maybe_set_thread_muted_field(opts)
+    |> maybe_order(opts)
     |> restrict_recipients(recipients, opts["user"])
     |> restrict_tag(opts)
     |> restrict_tag_reject(opts)
@@ -781,15 +876,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     |> restrict_local(opts)
     |> restrict_actor(opts)
     |> restrict_type(opts)
+    |> restrict_state(opts)
     |> restrict_favorited_by(opts)
     |> restrict_blocked(opts)
     |> restrict_muted(opts)
     |> restrict_media(opts)
     |> restrict_visibility(opts)
+    |> restrict_thread_visibility(opts)
     |> restrict_replies(opts)
     |> restrict_reblogs(opts)
     |> restrict_pinned(opts)
     |> restrict_muted_reblogs(opts)
+    |> Activity.restrict_deactivated_users()
+    |> exclude_poll_votes(opts)
   end
 
   def fetch_activities(recipients, opts \\ %{}) do
@@ -798,9 +897,18 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     |> Enum.reverse()
   end
 
-  def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
+  def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
+    from(activity in query,
+      where:
+        fragment("? && ?", activity.recipients, ^recipients) or
+          (fragment("? && ?", activity.recipients, ^recipients_with_public) and
+             "https://www.w3.org/ns/activitystreams#Public" in activity.recipients)
+    )
+  end
+
+  def fetch_activities_bounded(recipients, recipients_with_public, opts \\ %{}) do
     fetch_activities_query([], opts)
-    |> restrict_to_cc(recipients_to, recipients_cc)
+    |> fetch_activities_bounded_query(recipients, recipients_with_public)
     |> Pagination.fetch_paginated(opts)
     |> Enum.reverse()
   end
@@ -818,7 +926,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     end
   end
 
-  def user_data_from_user_object(data) do
+  defp object_to_user_data(data) do
     avatar =
       data["icon"]["url"] &&
         %{
@@ -865,16 +973,26 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     {:ok, user_data}
   end
 
+  def user_data_from_user_object(data) do
+    with {:ok, data} <- MRF.filter(data),
+         {:ok, data} <- object_to_user_data(data) do
+      {:ok, data}
+    else
+      e -> {:error, e}
+    end
+  end
+
   def fetch_and_prepare_user_from_ap_id(ap_id) do
-    with {:ok, data} <- fetch_and_contain_remote_object_from_id(ap_id) do
-      user_data_from_user_object(data)
+    with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
+         {:ok, data} <- user_data_from_user_object(data) do
+      {:ok, data}
     else
       e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
     end
   end
 
   def make_user_from_ap_id(ap_id) do
-    if _user = User.get_by_ap_id(ap_id) do
+    if _user = User.get_cached_by_ap_id(ap_id) do
       Transmogrifier.upgrade_user_from_ap_id(ap_id)
     else
       with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
@@ -893,143 +1011,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     end
   end
 
-  def should_federate?(inbox, public) do
-    if public do
-      true
-    else
-      inbox_info = URI.parse(inbox)
-      !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
-    end
-  end
-
-  def publish(actor, activity) do
-    remote_followers =
-      if actor.follower_address in activity.recipients do
-        {:ok, followers} = User.get_followers(actor)
-        followers |> Enum.filter(&(!&1.local))
-      else
-        []
-      end
-
-    public = is_public?(activity)
-
-    {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
-    json = Jason.encode!(data)
-
-    (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
-    |> Enum.filter(fn user -> User.ap_enabled?(user) end)
-    |> Enum.map(fn %{info: %{source_data: data}} ->
-      (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
-    end)
-    |> Enum.uniq()
-    |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
-    |> Instances.filter_reachable()
-    |> Enum.each(fn {inbox, unreachable_since} ->
-      Federator.publish_single_ap(%{
-        inbox: inbox,
-        json: json,
-        actor: actor,
-        id: activity.data["id"],
-        unreachable_since: unreachable_since
-      })
-    end)
-  end
-
-  def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
-    Logger.info("Federating #{id} to #{inbox}")
-    host = URI.parse(inbox).host
-
-    digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
-
-    date =
-      NaiveDateTime.utc_now()
-      |> Timex.format!("{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT")
-
-    signature =
-      Pleroma.Web.HTTPSignatures.sign(actor, %{
-        host: host,
-        "content-length": byte_size(json),
-        digest: digest,
-        date: date
-      })
-
-    with {:ok, %{status: code}} when code in 200..299 <-
-           result =
-             @httpoison.post(
-               inbox,
-               json,
-               [
-                 {"Content-Type", "application/activity+json"},
-                 {"Date", date},
-                 {"signature", signature},
-                 {"digest", digest}
-               ]
-             ) do
-      if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
-        do: Instances.set_reachable(inbox)
-
-      result
-    else
-      {_post_result, response} ->
-        unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
-        {:error, response}
-    end
-  end
-
-  # TODO:
-  # This will create a Create activity, which we need internally at the moment.
-  def fetch_object_from_id(id) do
-    if object = Object.get_cached_by_ap_id(id) do
-      {:ok, object}
-    else
-      with {:ok, data} <- fetch_and_contain_remote_object_from_id(id),
-           nil <- Object.normalize(data),
-           params <- %{
-             "type" => "Create",
-             "to" => data["to"],
-             "cc" => data["cc"],
-             "actor" => data["actor"] || data["attributedTo"],
-             "object" => data
-           },
-           :ok <- Transmogrifier.contain_origin(id, params),
-           {:ok, activity} <- Transmogrifier.handle_incoming(params) do
-        {:ok, Object.normalize(activity)}
-      else
-        {:error, {:reject, nil}} ->
-          {:reject, nil}
-
-        object = %Object{} ->
-          {:ok, object}
-
-        _e ->
-          Logger.info("Couldn't get object via AP, trying out OStatus fetching...")
-
-          case OStatus.fetch_activity_from_url(id) do
-            {:ok, [activity | _]} -> {:ok, Object.normalize(activity)}
-            e -> e
-          end
-      end
-    end
-  end
-
-  def fetch_and_contain_remote_object_from_id(id) do
-    Logger.info("Fetching object #{id} via AP")
-
-    with true <- String.starts_with?(id, "http"),
-         {:ok, %{body: body, status: code}} when code in 200..299 <-
-           @httpoison.get(
-             id,
-             [{:Accept, "application/activity+json"}]
-           ),
-         {:ok, data} <- Jason.decode(body),
-         :ok <- Transmogrifier.contain_origin_from_id(id, data) do
-      {:ok, data}
-    else
-      e ->
-        {:error, e}
-    end
-  end
-
   # filter out broken threads
   def contain_broken_threads(%Activity{} = activity, %User{} = user) do
     entire_thread_visible_for_user?(activity, user)
@@ -1040,11 +1021,10 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     contain_broken_threads(activity, user)
   end
 
-  # do post-processing on a timeline
-  def contain_timeline(timeline, user) do
-    timeline
-    |> Enum.filter(fn activity ->
-      contain_activity(activity, user)
-    end)
+  def fetch_direct_messages_query do
+    Activity
+    |> restrict_type(%{"type" => "Create"})
+    |> restrict_visibility(%{visibility: "direct"})
+    |> order_by([activity], asc: activity.id)
   end
 end