activities generation tasks
authorAlexander Strizhakov <alex.strizhakov@gmail.com>
Tue, 14 Jan 2020 11:42:30 +0000 (14:42 +0300)
committerAlexander Strizhakov <alex.strizhakov@gmail.com>
Mon, 30 Mar 2020 08:42:25 +0000 (11:42 +0300)
benchmarks/load_testing/activities.ex [new file with mode: 0644]
benchmarks/load_testing/fetcher.ex
benchmarks/load_testing/generator.ex [deleted file]
benchmarks/load_testing/helper.ex
benchmarks/load_testing/users.ex [new file with mode: 0644]
benchmarks/mix/tasks/pleroma/load_testing.ex
config/benchmark.exs
lib/pleroma/application.ex

diff --git a/benchmarks/load_testing/activities.ex b/benchmarks/load_testing/activities.ex
new file mode 100644 (file)
index 0000000..db0e5a6
--- /dev/null
@@ -0,0 +1,515 @@
+defmodule Pleroma.LoadTesting.Activities do
+  @moduledoc """
+  Module for generating different activities.
+  """
+  import Ecto.Query
+  import Pleroma.LoadTesting.Helper, only: [to_sec: 1]
+
+  alias Ecto.UUID
+  alias Pleroma.Constants
+  alias Pleroma.LoadTesting.Users
+  alias Pleroma.Repo
+  alias Pleroma.Web.CommonAPI
+
+  require Constants
+
+  @defaults [
+    iterations: 170,
+    friends_used: 20,
+    non_friends_used: 20
+  ]
+
+  @max_concurrency 30
+
+  @visibility ~w(public private direct unlisted)
+  @types ~w(simple emoji mentions hell_thread attachment tag like reblog simple_thread remote)
+  @groups ~w(user friends non_friends)
+
+  @spec generate(User.t(), keyword()) :: :ok
+  def generate(user, opts \\ []) do
+    {:ok, _} =
+      Agent.start_link(fn -> %{} end,
+        name: :benchmark_state
+      )
+
+    opts = Keyword.merge(@defaults, opts)
+
+    friends =
+      user
+      |> Users.get_users(limit: opts[:friends_used], local: :local, friends?: true)
+      |> Enum.shuffle()
+
+    non_friends =
+      user
+      |> Users.get_users(limit: opts[:non_friends_used], local: :local, friends?: false)
+      |> Enum.shuffle()
+
+    task_data =
+      for visibility <- @visibility,
+          type <- @types,
+          group <- @groups,
+          do: {visibility, type, group}
+
+    IO.puts("Starting generating #{opts[:iterations]} iterations of activities...")
+
+    friends_thread = Enum.take(friends, 5)
+    non_friends_thread = Enum.take(friends, 5)
+
+    public_long_thread = fn ->
+      generate_long_thread("public", user, friends_thread, non_friends_thread, opts)
+    end
+
+    private_long_thread = fn ->
+      generate_long_thread("private", user, friends_thread, non_friends_thread, opts)
+    end
+
+    iterations = opts[:iterations]
+
+    {time, _} =
+      :timer.tc(fn ->
+        Enum.each(
+          1..iterations,
+          fn
+            i when i == iterations - 2 ->
+              spawn(public_long_thread)
+              spawn(private_long_thread)
+              generate_activities(user, friends, non_friends, Enum.shuffle(task_data), opts)
+
+            _ ->
+              generate_activities(user, friends, non_friends, Enum.shuffle(task_data), opts)
+          end
+        )
+      end)
+
+    IO.puts("Generating iterations activities take #{to_sec(time)} sec.\n")
+    :ok
+  end
+
+  defp generate_long_thread(visibility, user, friends, non_friends, _opts) do
+    group =
+      if visibility == "public",
+        do: "friends",
+        else: "user"
+
+    tasks = get_reply_tasks(visibility, group) |> Stream.cycle() |> Enum.take(50)
+
+    {:ok, activity} =
+      CommonAPI.post(user, %{
+        "status" => "Start of #{visibility} long thread",
+        "visibility" => visibility
+      })
+
+    Agent.update(:benchmark_state, fn state ->
+      key =
+        if visibility == "public",
+          do: :public_thread,
+          else: :private_thread
+
+      Map.put(state, key, activity)
+    end)
+
+    acc = {activity.id, ["@" <> user.nickname, "reply to long thread"]}
+    insert_replies_for_long_thread(tasks, visibility, user, friends, non_friends, acc)
+    IO.puts("Generating #{visibility} long thread ended\n")
+  end
+
+  defp insert_replies_for_long_thread(tasks, visibility, user, friends, non_friends, acc) do
+    Enum.reduce(tasks, acc, fn
+      "friend", {id, data} ->
+        friend = Enum.random(friends)
+        insert_reply(friend, List.delete(data, "@" <> friend.nickname), id, visibility)
+
+      "non_friend", {id, data} ->
+        non_friend = Enum.random(non_friends)
+        insert_reply(non_friend, List.delete(data, "@" <> non_friend.nickname), id, visibility)
+
+      "user", {id, data} ->
+        insert_reply(user, List.delete(data, "@" <> user.nickname), id, visibility)
+    end)
+  end
+
+  defp generate_activities(user, friends, non_friends, task_data, opts) do
+    Task.async_stream(
+      task_data,
+      fn {visibility, type, group} ->
+        insert_activity(type, visibility, group, user, friends, non_friends, opts)
+      end,
+      max_concurrency: @max_concurrency,
+      timeout: 30_000
+    )
+    |> Stream.run()
+  end
+
+  defp insert_activity("simple", visibility, group, user, friends, non_friends, _opts) do
+    {:ok, _activity} =
+      group
+      |> get_actor(user, friends, non_friends)
+      |> CommonAPI.post(%{"status" => "Simple status", "visibility" => visibility})
+  end
+
+  defp insert_activity("emoji", visibility, group, user, friends, non_friends, _opts) do
+    {:ok, _activity} =
+      group
+      |> get_actor(user, friends, non_friends)
+      |> CommonAPI.post(%{
+        "status" => "Simple status with emoji :firefox:",
+        "visibility" => visibility
+      })
+  end
+
+  defp insert_activity("mentions", visibility, group, user, friends, non_friends, _opts) do
+    user_mentions =
+      get_random_mentions(friends, Enum.random(0..3)) ++
+        get_random_mentions(non_friends, Enum.random(0..3))
+
+    user_mentions =
+      if Enum.random([true, false]),
+        do: ["@" <> user.nickname | user_mentions],
+        else: user_mentions
+
+    {:ok, _activity} =
+      group
+      |> get_actor(user, friends, non_friends)
+      |> CommonAPI.post(%{
+        "status" => Enum.join(user_mentions, ", ") <> " simple status with mentions",
+        "visibility" => visibility
+      })
+  end
+
+  defp insert_activity("hell_thread", visibility, group, user, friends, non_friends, _opts) do
+    mentions =
+      with {:ok, nil} <- Cachex.get(:user_cache, "hell_thread_mentions") do
+        cached =
+          ([user | Enum.take(friends, 10)] ++ Enum.take(non_friends, 10))
+          |> Enum.map(&"@#{&1.nickname}")
+          |> Enum.join(", ")
+
+        Cachex.put(:user_cache, "hell_thread_mentions", cached)
+        cached
+      else
+        {:ok, cached} -> cached
+      end
+
+    {:ok, _activity} =
+      group
+      |> get_actor(user, friends, non_friends)
+      |> CommonAPI.post(%{
+        "status" => mentions <> " hell thread status",
+        "visibility" => visibility
+      })
+  end
+
+  defp insert_activity("attachment", visibility, group, user, friends, non_friends, _opts) do
+    actor = get_actor(group, user, friends, non_friends)
+
+    obj_data = %{
+      "actor" => actor.ap_id,
+      "name" => "4467-11.jpg",
+      "type" => "Document",
+      "url" => [
+        %{
+          "href" =>
+            "#{Pleroma.Web.base_url()}/media/b1b873552422a07bf53af01f3c231c841db4dfc42c35efde681abaf0f2a4eab7.jpg",
+          "mediaType" => "image/jpeg",
+          "type" => "Link"
+        }
+      ]
+    }
+
+    object = Repo.insert!(%Pleroma.Object{data: obj_data})
+
+    {:ok, _activity} =
+      CommonAPI.post(actor, %{
+        "status" => "Post with attachment",
+        "visibility" => visibility,
+        "media_ids" => [object.id]
+      })
+  end
+
+  defp insert_activity("tag", visibility, group, user, friends, non_friends, _opts) do
+    {:ok, _activity} =
+      group
+      |> get_actor(user, friends, non_friends)
+      |> CommonAPI.post(%{"status" => "Status with #tag", "visibility" => visibility})
+  end
+
+  defp insert_activity("like", visibility, group, user, friends, non_friends, opts) do
+    actor = get_actor(group, user, friends, non_friends)
+
+    with activity_id when not is_nil(activity_id) <- get_random_create_activity_id(),
+         {:ok, _activity, _object} <- CommonAPI.favorite(activity_id, actor) do
+      :ok
+    else
+      {:error, _} ->
+        insert_activity("like", visibility, group, user, friends, non_friends, opts)
+
+      nil ->
+        Process.sleep(15)
+        insert_activity("like", visibility, group, user, friends, non_friends, opts)
+    end
+  end
+
+  defp insert_activity("reblog", visibility, group, user, friends, non_friends, opts) do
+    actor = get_actor(group, user, friends, non_friends)
+
+    with activity_id when not is_nil(activity_id) <- get_random_create_activity_id(),
+         {:ok, _activity, _object} <- CommonAPI.repeat(activity_id, actor) do
+      :ok
+    else
+      {:error, _} ->
+        insert_activity("reblog", visibility, group, user, friends, non_friends, opts)
+
+      nil ->
+        Process.sleep(15)
+        insert_activity("reblog", visibility, group, user, friends, non_friends, opts)
+    end
+  end
+
+  defp insert_activity("simple_thread", visibility, group, user, friends, non_friends, _opts)
+       when visibility in ["public", "unlisted", "private"] do
+    actor = get_actor(group, user, friends, non_friends)
+    tasks = get_reply_tasks(visibility, group)
+
+    {:ok, activity} =
+      CommonAPI.post(user, %{"status" => "Simple status", "visibility" => "unlisted"})
+
+    acc = {activity.id, ["@" <> actor.nickname, "reply to status"]}
+    insert_replies(tasks, visibility, user, friends, non_friends, acc)
+  end
+
+  defp insert_activity("simple_thread", "direct", group, user, friends, non_friends, _opts) do
+    actor = get_actor(group, user, friends, non_friends)
+    tasks = get_reply_tasks("direct", group)
+
+    list =
+      case group do
+        "non_friends" ->
+          Enum.take(non_friends, 3)
+
+        _ ->
+          Enum.take(friends, 3)
+      end
+
+    data = Enum.map(list, &("@" <> &1.nickname))
+
+    {:ok, activity} =
+      CommonAPI.post(actor, %{
+        "status" => Enum.join(data, ", ") <> "simple status",
+        "visibility" => "direct"
+      })
+
+    acc = {activity.id, ["@" <> user.nickname | data] ++ ["reply to status"]}
+    insert_direct_replies(tasks, user, list, acc)
+  end
+
+  defp insert_activity("remote", _, "user", _, _, _, _), do: :ok
+
+  defp insert_activity("remote", visibility, group, user, _friends, _non_friends, opts) do
+    remote_friends =
+      Users.get_users(user, limit: opts[:friends_used], local: :external, friends?: true)
+
+    remote_non_friends =
+      Users.get_users(user, limit: opts[:non_friends_used], local: :external, friends?: false)
+
+    actor = get_actor(group, user, remote_friends, remote_non_friends)
+
+    {act_data, obj_data} = prepare_activity_data(actor, visibility, user)
+    {activity_data, object_data} = other_data(actor)
+
+    activity_data
+    |> Map.merge(act_data)
+    |> Map.put("object", Map.merge(object_data, obj_data))
+    |> Pleroma.Web.ActivityPub.ActivityPub.insert(false)
+  end
+
+  defp get_actor("user", user, _friends, _non_friends), do: user
+  defp get_actor("friends", _user, friends, _non_friends), do: Enum.random(friends)
+  defp get_actor("non_friends", _user, _friends, non_friends), do: Enum.random(non_friends)
+
+  defp other_data(actor) do
+    %{host: host} = URI.parse(actor.ap_id)
+    datetime = DateTime.utc_now()
+    context_id = "http://#{host}:4000/contexts/#{UUID.generate()}"
+    activity_id = "http://#{host}:4000/activities/#{UUID.generate()}"
+    object_id = "http://#{host}:4000/objects/#{UUID.generate()}"
+
+    activity_data = %{
+      "actor" => actor.ap_id,
+      "context" => context_id,
+      "id" => activity_id,
+      "published" => datetime,
+      "type" => "Create",
+      "directMessage" => false
+    }
+
+    object_data = %{
+      "actor" => actor.ap_id,
+      "attachment" => [],
+      "attributedTo" => actor.ap_id,
+      "bcc" => [],
+      "bto" => [],
+      "content" => "Remote post",
+      "context" => context_id,
+      "conversation" => context_id,
+      "emoji" => %{},
+      "id" => object_id,
+      "published" => datetime,
+      "sensitive" => false,
+      "summary" => "",
+      "tag" => [],
+      "to" => ["https://www.w3.org/ns/activitystreams#Public"],
+      "type" => "Note"
+    }
+
+    {activity_data, object_data}
+  end
+
+  defp prepare_activity_data(actor, "public", _mention) do
+    obj_data = %{
+      "cc" => [actor.follower_address],
+      "to" => [Constants.as_public()]
+    }
+
+    act_data = %{
+      "cc" => [actor.follower_address],
+      "to" => [Constants.as_public()]
+    }
+
+    {act_data, obj_data}
+  end
+
+  defp prepare_activity_data(actor, "private", _mention) do
+    obj_data = %{
+      "cc" => [],
+      "to" => [actor.follower_address]
+    }
+
+    act_data = %{
+      "cc" => [],
+      "to" => [actor.follower_address]
+    }
+
+    {act_data, obj_data}
+  end
+
+  defp prepare_activity_data(actor, "unlisted", _mention) do
+    obj_data = %{
+      "cc" => [Constants.as_public()],
+      "to" => [actor.follower_address]
+    }
+
+    act_data = %{
+      "cc" => [Constants.as_public()],
+      "to" => [actor.follower_address]
+    }
+
+    {act_data, obj_data}
+  end
+
+  defp prepare_activity_data(_actor, "direct", mention) do
+    %{host: mentioned_host} = URI.parse(mention.ap_id)
+
+    obj_data = %{
+      "cc" => [],
+      "content" =>
+        "<span class=\"h-card\"><a class=\"u-url mention\" href=\"#{mention.ap_id}\" rel=\"ugc\">@<span>#{
+          mention.nickname
+        }</span></a></span> direct message",
+      "tag" => [
+        %{
+          "href" => mention.ap_id,
+          "name" => "@#{mention.nickname}@#{mentioned_host}",
+          "type" => "Mention"
+        }
+      ],
+      "to" => [mention.ap_id]
+    }
+
+    act_data = %{
+      "cc" => [],
+      "directMessage" => true,
+      "to" => [mention.ap_id]
+    }
+
+    {act_data, obj_data}
+  end
+
+  defp get_reply_tasks("public", "user"), do: ~w(friend non_friend user)
+  defp get_reply_tasks("public", "friends"), do: ~w(non_friend user friend)
+  defp get_reply_tasks("public", "non_friends"), do: ~w(user friend non_friend)
+
+  defp get_reply_tasks(visibility, "user") when visibility in ["unlisted", "private"],
+    do: ~w(friend user friend)
+
+  defp get_reply_tasks(visibility, "friends") when visibility in ["unlisted", "private"],
+    do: ~w(user friend user)
+
+  defp get_reply_tasks(visibility, "non_friends") when visibility in ["unlisted", "private"],
+    do: []
+
+  defp get_reply_tasks("direct", "user"), do: ~w(friend user friend)
+  defp get_reply_tasks("direct", "friends"), do: ~w(user friend user)
+  defp get_reply_tasks("direct", "non_friends"), do: ~w(user non_friend user)
+
+  defp insert_replies(tasks, visibility, user, friends, non_friends, acc) do
+    Enum.reduce(tasks, acc, fn
+      "friend", {id, data} ->
+        friend = Enum.random(friends)
+        insert_reply(friend, data, id, visibility)
+
+      "non_friend", {id, data} ->
+        non_friend = Enum.random(non_friends)
+        insert_reply(non_friend, data, id, visibility)
+
+      "user", {id, data} ->
+        insert_reply(user, data, id, visibility)
+    end)
+  end
+
+  defp insert_direct_replies(tasks, user, list, acc) do
+    Enum.reduce(tasks, acc, fn
+      group, {id, data} when group in ["friend", "non_friend"] ->
+        actor = Enum.random(list)
+
+        {reply_id, _} =
+          insert_reply(actor, List.delete(data, "@" <> actor.nickname), id, "direct")
+
+        {reply_id, data}
+
+      "user", {id, data} ->
+        {reply_id, _} = insert_reply(user, List.delete(data, "@" <> user.nickname), id, "direct")
+        {reply_id, data}
+    end)
+  end
+
+  defp insert_reply(actor, data, activity_id, visibility) do
+    {:ok, reply} =
+      CommonAPI.post(actor, %{
+        "status" => Enum.join(data, ", "),
+        "visibility" => visibility,
+        "in_reply_to_status_id" => activity_id
+      })
+
+    {reply.id, ["@" <> actor.nickname | data]}
+  end
+
+  defp get_random_mentions(_users, count) when count == 0, do: []
+
+  defp get_random_mentions(users, count) do
+    users
+    |> Enum.shuffle()
+    |> Enum.take(count)
+    |> Enum.map(&"@#{&1.nickname}")
+  end
+
+  defp get_random_create_activity_id do
+    Repo.one(
+      from(a in Pleroma.Activity,
+        where: fragment("(?)->>'type' = ?", a.data, ^"Create"),
+        order_by: fragment("RANDOM()"),
+        limit: 1,
+        select: a.id
+      )
+    )
+  end
+end
index a45a71d4a4b706fc80e6fd0bcc2a5e0c0b6a482d..bd65ac84fcbc548ac8a880da07222ec040d10091 100644 (file)
 defmodule Pleroma.LoadTesting.Fetcher do
-  use Pleroma.LoadTesting.Helper
-
-  def fetch_user(user) do
-    Benchee.run(%{
-      "By id" => fn -> Repo.get_by(User, id: user.id) end,
-      "By ap_id" => fn -> Repo.get_by(User, ap_id: user.ap_id) end,
-      "By email" => fn -> Repo.get_by(User, email: user.email) end,
-      "By nickname" => fn -> Repo.get_by(User, nickname: user.nickname) end
-    })
+  alias Pleroma.Activity
+  alias Pleroma.Pagination
+  alias Pleroma.Repo
+  alias Pleroma.User
+  alias Pleroma.Web.ActivityPub.ActivityPub
+  alias Pleroma.Web.MastodonAPI.MastodonAPI
+  alias Pleroma.Web.MastodonAPI.StatusView
+
+  @spec run_benchmarks(User.t()) :: any()
+  def run_benchmarks(user) do
+    fetch_user(user)
+    fetch_timelines(user)
+    render_views(user)
   end
 
-  def query_timelines(user) do
-    home_timeline_params = %{
-      "count" => 20,
-      "with_muted" => true,
-      "type" => ["Create", "Announce"],
+  defp formatters do
+    [
+      Benchee.Formatters.Console
+    ]
+  end
+
+  defp fetch_user(user) do
+    Benchee.run(
+      %{
+        "By id" => fn -> Repo.get_by(User, id: user.id) end,
+        "By ap_id" => fn -> Repo.get_by(User, ap_id: user.ap_id) end,
+        "By email" => fn -> Repo.get_by(User, email: user.email) end,
+        "By nickname" => fn -> Repo.get_by(User, nickname: user.nickname) end
+      },
+      formatters: formatters()
+    )
+  end
+
+  defp fetch_timelines(user) do
+    fetch_home_timeline(user)
+    fetch_direct_timeline(user)
+    fetch_public_timeline(user)
+    fetch_public_timeline(user, :local)
+    fetch_public_timeline(user, :tag)
+    fetch_notifications(user)
+    fetch_favourites(user)
+    fetch_long_thread(user)
+  end
+
+  defp render_views(user) do
+    render_timelines(user)
+    render_long_thread(user)
+  end
+
+  defp opts_for_home_timeline(user) do
+    %{
       "blocking_user" => user,
+      "count" => "20",
       "muting_user" => user,
-      "user" => user
+      "type" => ["Create", "Announce"],
+      "user" => user,
+      "with_muted" => "true"
     }
+  end
 
-    mastodon_public_timeline_params = %{
-      "count" => 20,
-      "local_only" => true,
-      "only_media" => "false",
+  defp fetch_home_timeline(user) do
+    opts = opts_for_home_timeline(user)
+
+    recipients = [user.ap_id | User.following(user)]
+
+    first_page_last =
+      ActivityPub.fetch_activities(recipients, opts) |> Enum.reverse() |> List.last()
+
+    second_page_last =
+      ActivityPub.fetch_activities(recipients, Map.put(opts, "max_id", first_page_last.id))
+      |> Enum.reverse()
+      |> List.last()
+
+    third_page_last =
+      ActivityPub.fetch_activities(recipients, Map.put(opts, "max_id", second_page_last.id))
+      |> Enum.reverse()
+      |> List.last()
+
+    forth_page_last =
+      ActivityPub.fetch_activities(recipients, Map.put(opts, "max_id", third_page_last.id))
+      |> Enum.reverse()
+      |> List.last()
+
+    Benchee.run(
+      %{
+        "home timeline" => fn opts -> ActivityPub.fetch_activities(recipients, opts) end
+      },
+      inputs: %{
+        "1 page" => opts,
+        "2 page" => Map.put(opts, "max_id", first_page_last.id),
+        "3 page" => Map.put(opts, "max_id", second_page_last.id),
+        "4 page" => Map.put(opts, "max_id", third_page_last.id),
+        "5 page" => Map.put(opts, "max_id", forth_page_last.id),
+        "1 page only media" => Map.put(opts, "only_media", "true"),
+        "2 page only media" =>
+          Map.put(opts, "max_id", first_page_last.id) |> Map.put("only_media", "true"),
+        "3 page only media" =>
+          Map.put(opts, "max_id", second_page_last.id) |> Map.put("only_media", "true"),
+        "4 page only media" =>
+          Map.put(opts, "max_id", third_page_last.id) |> Map.put("only_media", "true"),
+        "5 page only media" =>
+          Map.put(opts, "max_id", forth_page_last.id) |> Map.put("only_media", "true")
+      },
+      formatters: formatters()
+    )
+  end
+
+  defp opts_for_direct_timeline(user) do
+    %{
+      :visibility => "direct",
+      "blocking_user" => user,
+      "count" => "20",
+      "type" => "Create",
+      "user" => user,
+      "with_muted" => "true"
+    }
+  end
+
+  defp fetch_direct_timeline(user) do
+    recipients = [user.ap_id]
+
+    opts = opts_for_direct_timeline(user)
+
+    first_page_last =
+      recipients
+      |> ActivityPub.fetch_activities_query(opts)
+      |> Pagination.fetch_paginated(opts)
+      |> List.last()
+
+    opts2 = Map.put(opts, "max_id", first_page_last.id)
+
+    second_page_last =
+      recipients
+      |> ActivityPub.fetch_activities_query(opts2)
+      |> Pagination.fetch_paginated(opts2)
+      |> List.last()
+
+    opts3 = Map.put(opts, "max_id", second_page_last.id)
+
+    third_page_last =
+      recipients
+      |> ActivityPub.fetch_activities_query(opts3)
+      |> Pagination.fetch_paginated(opts3)
+      |> List.last()
+
+    opts4 = Map.put(opts, "max_id", third_page_last.id)
+
+    forth_page_last =
+      recipients
+      |> ActivityPub.fetch_activities_query(opts4)
+      |> Pagination.fetch_paginated(opts4)
+      |> List.last()
+
+    Benchee.run(
+      %{
+        "direct timeline" => fn opts ->
+          ActivityPub.fetch_activities_query(recipients, opts) |> Pagination.fetch_paginated(opts)
+        end
+      },
+      inputs: %{
+        "1 page" => opts,
+        "2 page" => opts2,
+        "3 page" => opts3,
+        "4 page" => opts4,
+        "5 page" => Map.put(opts4, "max_id", forth_page_last.id)
+      },
+      formatters: formatters()
+    )
+  end
+
+  defp opts_for_public_timeline(user) do
+    %{
       "type" => ["Create", "Announce"],
-      "with_muted" => "true",
+      "local_only" => false,
       "blocking_user" => user,
       "muting_user" => user
     }
+  end
 
-    mastodon_federated_timeline_params = %{
-      "count" => 20,
-      "only_media" => "false",
+  defp opts_for_public_timeline(user, :local) do
+    %{
       "type" => ["Create", "Announce"],
-      "with_muted" => "true",
+      "local_only" => true,
       "blocking_user" => user,
       "muting_user" => user
     }
+  end
 
-    following = User.following(user)
-
-    Benchee.run(%{
-      "User home timeline" => fn ->
-        Pleroma.Web.ActivityPub.ActivityPub.fetch_activities(
-          following,
-          home_timeline_params
-        )
-      end,
-      "User mastodon public timeline" => fn ->
-        Pleroma.Web.ActivityPub.ActivityPub.fetch_public_activities(
-          mastodon_public_timeline_params
-        )
-      end,
-      "User mastodon federated public timeline" => fn ->
-        Pleroma.Web.ActivityPub.ActivityPub.fetch_public_activities(
-          mastodon_federated_timeline_params
-        )
-      end
-    })
-
-    home_activities =
-      Pleroma.Web.ActivityPub.ActivityPub.fetch_activities(
-        following,
-        home_timeline_params
-      )
+  defp opts_for_public_timeline(user, :tag) do
+    %{
+      "blocking_user" => user,
+      "count" => "20",
+      "local_only" => nil,
+      "muting_user" => user,
+      "tag" => ["tag"],
+      "tag_all" => [],
+      "tag_reject" => [],
+      "type" => "Create",
+      "user" => user,
+      "with_muted" => "true"
+    }
+  end
 
-    public_activities =
-      Pleroma.Web.ActivityPub.ActivityPub.fetch_public_activities(mastodon_public_timeline_params)
+  defp fetch_public_timeline(user) do
+    opts = opts_for_public_timeline(user)
 
-    public_federated_activities =
-      Pleroma.Web.ActivityPub.ActivityPub.fetch_public_activities(
-        mastodon_federated_timeline_params
-      )
+    fetch_public_timeline(opts, "public timeline")
+  end
+
+  defp fetch_public_timeline(user, :local) do
+    opts = opts_for_public_timeline(user, :local)
 
-    Benchee.run(%{
-      "Rendering home timeline" => fn ->
-        Pleroma.Web.MastodonAPI.StatusView.render("index.json", %{
-          activities: home_activities,
-          for: user,
-          as: :activity
-        })
-      end,
-      "Rendering public timeline" => fn ->
-        Pleroma.Web.MastodonAPI.StatusView.render("index.json", %{
-          activities: public_activities,
-          for: user,
-          as: :activity
-        })
-      end,
-      "Rendering public federated timeline" => fn ->
-        Pleroma.Web.MastodonAPI.StatusView.render("index.json", %{
-          activities: public_federated_activities,
-          for: user,
-          as: :activity
-        })
-      end,
-      "Rendering favorites timeline" => fn ->
-        conn = Phoenix.ConnTest.build_conn(:get, "http://localhost:4001/api/v1/favourites", nil)
-        Pleroma.Web.MastodonAPI.StatusController.favourites(
-          %Plug.Conn{conn |
-                     assigns: %{user: user},
-                     query_params:  %{"limit" => "0"},
-                     body_params: %{},
-                     cookies: %{},
-                     params: %{},
-                     path_params: %{},
-                     private: %{
-                       Pleroma.Web.Router => {[], %{}},
-                       phoenix_router: Pleroma.Web.Router,
-                       phoenix_action: :favourites,
-                       phoenix_controller: Pleroma.Web.MastodonAPI.StatusController,
-                       phoenix_endpoint: Pleroma.Web.Endpoint,
-                       phoenix_format: "json",
-                       phoenix_layout: {Pleroma.Web.LayoutView, "app.html"},
-                       phoenix_recycled: true,
-
-                       phoenix_view: Pleroma.Web.MastodonAPI.StatusView,
-                       plug_session: %{"user_id" => user.id},
-                       plug_session_fetch: :done,
-                       plug_session_info: :write,
-                       plug_skip_csrf_protection: true
-                     }
-          },
-          %{})
-      end,
-    })
+    fetch_public_timeline(opts, "public timeline only local")
   end
 
-  def query_notifications(user) do
-    without_muted_params = %{"count" => "20", "with_muted" => "false"}
-    with_muted_params = %{"count" => "20", "with_muted" => "true"}
-
-    Benchee.run(%{
-      "Notifications without muted" => fn ->
-        Pleroma.Web.MastodonAPI.MastodonAPI.get_notifications(user, without_muted_params)
-      end,
-      "Notifications with muted" => fn ->
-        Pleroma.Web.MastodonAPI.MastodonAPI.get_notifications(user, with_muted_params)
-      end
-    })
-
-    without_muted_notifications =
-      Pleroma.Web.MastodonAPI.MastodonAPI.get_notifications(user, without_muted_params)
-
-    with_muted_notifications =
-      Pleroma.Web.MastodonAPI.MastodonAPI.get_notifications(user, with_muted_params)
-
-    Benchee.run(%{
-      "Render notifications without muted" => fn ->
-        Pleroma.Web.MastodonAPI.NotificationView.render("index.json", %{
-          notifications: without_muted_notifications,
-          for: user
-        })
-      end,
-      "Render notifications with muted" => fn ->
-        Pleroma.Web.MastodonAPI.NotificationView.render("index.json", %{
-          notifications: with_muted_notifications,
-          for: user
-        })
-      end
-    })
+  defp fetch_public_timeline(user, :tag) do
+    opts = opts_for_public_timeline(user, :tag)
+
+    fetch_public_timeline(opts, "hashtag timeline")
   end
 
-  def query_dms(user) do
-    params = %{
-      "count" => "20",
-      "with_muted" => "true",
-      "type" => "Create",
+  defp fetch_public_timeline(user, :only_media) do
+    opts = opts_for_public_timeline(user) |> Map.put("only_media", "true")
+
+    fetch_public_timeline(opts, "public timeline only media")
+  end
+
+  defp fetch_public_timeline(opts, title) when is_binary(title) do
+    first_page_last = ActivityPub.fetch_public_activities(opts) |> List.last()
+
+    second_page_last =
+      ActivityPub.fetch_public_activities(Map.put(opts, "max_id", first_page_last.id))
+      |> List.last()
+
+    third_page_last =
+      ActivityPub.fetch_public_activities(Map.put(opts, "max_id", second_page_last.id))
+      |> List.last()
+
+    forth_page_last =
+      ActivityPub.fetch_public_activities(Map.put(opts, "max_id", third_page_last.id))
+      |> List.last()
+
+    Benchee.run(
+      %{
+        title => fn opts ->
+          ActivityPub.fetch_public_activities(opts)
+        end
+      },
+      inputs: %{
+        "1 page" => opts,
+        "2 page" => Map.put(opts, "max_id", first_page_last.id),
+        "3 page" => Map.put(opts, "max_id", second_page_last.id),
+        "4 page" => Map.put(opts, "max_id", third_page_last.id),
+        "5 page" => Map.put(opts, "max_id", forth_page_last.id)
+      },
+      formatters: formatters()
+    )
+  end
+
+  defp opts_for_notifications do
+    %{"count" => "20", "with_muted" => "true"}
+  end
+
+  defp fetch_notifications(user) do
+    opts = opts_for_notifications()
+
+    first_page_last = MastodonAPI.get_notifications(user, opts) |> List.last()
+
+    second_page_last =
+      MastodonAPI.get_notifications(user, Map.put(opts, "max_id", first_page_last.id))
+      |> List.last()
+
+    third_page_last =
+      MastodonAPI.get_notifications(user, Map.put(opts, "max_id", second_page_last.id))
+      |> List.last()
+
+    forth_page_last =
+      MastodonAPI.get_notifications(user, Map.put(opts, "max_id", third_page_last.id))
+      |> List.last()
+
+    Benchee.run(
+      %{
+        "Notifications" => fn opts ->
+          MastodonAPI.get_notifications(user, opts)
+        end
+      },
+      inputs: %{
+        "1 page" => opts,
+        "2 page" => Map.put(opts, "max_id", first_page_last.id),
+        "3 page" => Map.put(opts, "max_id", second_page_last.id),
+        "4 page" => Map.put(opts, "max_id", third_page_last.id),
+        "5 page" => Map.put(opts, "max_id", forth_page_last.id)
+      },
+      formatters: formatters()
+    )
+  end
+
+  defp fetch_favourites(user) do
+    first_page_last = ActivityPub.fetch_favourites(user) |> List.last()
+
+    second_page_last =
+      ActivityPub.fetch_favourites(user, %{"max_id" => first_page_last.id}) |> List.last()
+
+    third_page_last =
+      ActivityPub.fetch_favourites(user, %{"max_id" => second_page_last.id}) |> List.last()
+
+    forth_page_last =
+      ActivityPub.fetch_favourites(user, %{"max_id" => third_page_last.id}) |> List.last()
+
+    Benchee.run(
+      %{
+        "Favourites" => fn opts ->
+          ActivityPub.fetch_favourites(user, opts)
+        end
+      },
+      inputs: %{
+        "1 page" => %{},
+        "2 page" => %{"max_id" => first_page_last.id},
+        "3 page" => %{"max_id" => second_page_last.id},
+        "4 page" => %{"max_id" => third_page_last.id},
+        "5 page" => %{"max_id" => forth_page_last.id}
+      },
+      formatters: formatters()
+    )
+  end
+
+  defp opts_for_long_thread(user) do
+    %{
       "blocking_user" => user,
-      "user" => user,
-      visibility: "direct"
+      "user" => user
     }
+  end
+
+  defp fetch_long_thread(user) do
+    %{public_thread: public, private_thread: private} =
+      Agent.get(:benchmark_state, fn state -> state end)
+
+    opts = opts_for_long_thread(user)
+
+    private_input = {private.data["context"], Map.put(opts, "exclude_id", private.id)}
 
-    Benchee.run(%{
-      "Direct messages with muted" => fn ->
-        Pleroma.Web.ActivityPub.ActivityPub.fetch_activities_query([user.ap_id], params)
-        |> Pleroma.Pagination.fetch_paginated(params)
-      end,
-      "Direct messages without muted" => fn ->
-        Pleroma.Web.ActivityPub.ActivityPub.fetch_activities_query([user.ap_id], params)
-        |> Pleroma.Pagination.fetch_paginated(Map.put(params, "with_muted", false))
-      end
-    })
-
-    dms_with_muted =
-      Pleroma.Web.ActivityPub.ActivityPub.fetch_activities_query([user.ap_id], params)
-      |> Pleroma.Pagination.fetch_paginated(params)
-
-    dms_without_muted =
-      Pleroma.Web.ActivityPub.ActivityPub.fetch_activities_query([user.ap_id], params)
-      |> Pleroma.Pagination.fetch_paginated(Map.put(params, "with_muted", false))
-
-    Benchee.run(%{
-      "Rendering dms with muted" => fn ->
-        Pleroma.Web.MastodonAPI.StatusView.render("index.json", %{
-          activities: dms_with_muted,
-          for: user,
-          as: :activity
-        })
-      end,
-      "Rendering dms without muted" => fn ->
-        Pleroma.Web.MastodonAPI.StatusView.render("index.json", %{
-          activities: dms_without_muted,
-          for: user,
-          as: :activity
-        })
-      end
-    })
+    public_input = {public.data["context"], Map.put(opts, "exclude_id", public.id)}
+
+    Benchee.run(
+      %{
+        "fetch context" => fn {context, opts} ->
+          ActivityPub.fetch_activities_for_context(context, opts)
+        end
+      },
+      inputs: %{
+        "Private long thread" => private_input,
+        "Public long thread" => public_input
+      },
+      formatters: formatters()
+    )
   end
 
-  def query_long_thread(user, activity) do
-    Benchee.run(%{
-      "Fetch main post" => fn ->
-        Pleroma.Activity.get_by_id_with_object(activity.id)
-      end,
-      "Fetch context of main post" => fn ->
-        Pleroma.Web.ActivityPub.ActivityPub.fetch_activities_for_context(
-          activity.data["context"],
-          %{
-            "blocking_user" => user,
-            "user" => user,
-            "exclude_id" => activity.id
-          }
-        )
-      end
-    })
-
-    activity = Pleroma.Activity.get_by_id_with_object(activity.id)
-
-    context =
-      Pleroma.Web.ActivityPub.ActivityPub.fetch_activities_for_context(
-        activity.data["context"],
-        %{
-          "blocking_user" => user,
-          "user" => user,
-          "exclude_id" => activity.id
-        }
+  defp render_timelines(user) do
+    opts = opts_for_home_timeline(user)
+
+    recipients = [user.ap_id | User.following(user)]
+
+    home_activities = ActivityPub.fetch_activities(recipients, opts) |> Enum.reverse()
+
+    recipients = [user.ap_id]
+
+    opts = opts_for_direct_timeline(user)
+
+    direct_activities =
+      recipients
+      |> ActivityPub.fetch_activities_query(opts)
+      |> Pagination.fetch_paginated(opts)
+
+    opts = opts_for_public_timeline(user)
+
+    public_activities = ActivityPub.fetch_public_activities(opts)
+
+    opts = opts_for_public_timeline(user, :tag)
+
+    tag_activities = ActivityPub.fetch_public_activities(opts)
+
+    opts = opts_for_notifications()
+
+    notifications = MastodonAPI.get_notifications(user, opts)
+
+    favourites = ActivityPub.fetch_favourites(user)
+
+    Benchee.run(
+      %{
+        "Rendering home timeline" => fn ->
+          StatusView.render("index.json", %{
+            activities: home_activities,
+            for: user,
+            as: :activity
+          })
+        end,
+        "Rendering direct timeline" => fn ->
+          StatusView.render("index.json", %{
+            activities: direct_activities,
+            for: user,
+            as: :activity
+          })
+        end,
+        "Rendering public timeline" => fn ->
+          StatusView.render("index.json", %{
+            activities: public_activities,
+            for: user,
+            as: :activity
+          })
+        end,
+        "Rendering tag timeline" => fn ->
+          StatusView.render("index.json", %{
+            activities: tag_activities,
+            for: user,
+            as: :activity
+          })
+        end,
+        "Rendering notifications" => fn ->
+          Pleroma.Web.MastodonAPI.NotificationView.render("index.json", %{
+            notifications: notifications,
+            for: user
+          })
+        end,
+        "Rendering favourites timeline" => fn ->
+          StatusView.render("index.json", %{
+            activities: favourites,
+            for: user,
+            as: :activity
+          })
+        end
+      },
+      formatters: formatters()
+    )
+  end
+
+  defp render_long_thread(user) do
+    %{public_thread: public, private_thread: private} =
+      Agent.get(:benchmark_state, fn state -> state end)
+
+    opts = %{for: user}
+    public_activity = Activity.get_by_id_with_object(public.id)
+    private_activity = Activity.get_by_id_with_object(private.id)
+
+    Benchee.run(
+      %{
+        "render" => fn opts ->
+          StatusView.render("show.json", opts)
+        end
+      },
+      inputs: %{
+        "Public root" => Map.put(opts, :activity, public_activity),
+        "Private root" => Map.put(opts, :activity, private_activity)
+      },
+      formatters: formatters()
+    )
+
+    fetch_opts = opts_for_long_thread(user)
+
+    public_context =
+      ActivityPub.fetch_activities_for_context(
+        public.data["context"],
+        Map.put(fetch_opts, "exclude_id", public.id)
       )
 
-    Benchee.run(%{
-      "Render status" => fn ->
-        Pleroma.Web.MastodonAPI.StatusView.render("show.json", %{
-          activity: activity,
-          for: user
-        })
-      end,
-      "Render context" => fn ->
-        Pleroma.Web.MastodonAPI.StatusView.render(
-          "index.json",
-          for: user,
-          activities: context,
-          as: :activity
-        )
-        |> Enum.reverse()
-      end
-    })
+    private_context =
+      ActivityPub.fetch_activities_for_context(
+        private.data["context"],
+        Map.put(fetch_opts, "exclude_id", private.id)
+      )
+
+    Benchee.run(
+      %{
+        "render" => fn opts ->
+          StatusView.render("context.json", opts)
+        end
+      },
+      inputs: %{
+        "Public context" => %{user: user, activity: public_activity, activities: public_context},
+        "Private context" => %{
+          user: user,
+          activity: private_activity,
+          activities: private_context
+        }
+      },
+      formatters: formatters()
+    )
   end
 end
diff --git a/benchmarks/load_testing/generator.ex b/benchmarks/load_testing/generator.ex
deleted file mode 100644 (file)
index e467375..0000000
+++ /dev/null
@@ -1,410 +0,0 @@
-defmodule Pleroma.LoadTesting.Generator do
-  use Pleroma.LoadTesting.Helper
-  alias Pleroma.Web.CommonAPI
-
-  def generate_like_activities(user, posts) do
-    count_likes = Kernel.trunc(length(posts) / 4)
-    IO.puts("Starting generating #{count_likes} like activities...")
-
-    {time, _} =
-      :timer.tc(fn ->
-        Task.async_stream(
-          Enum.take_random(posts, count_likes),
-          fn post -> {:ok, _, _} = CommonAPI.favorite(post.id, user) end,
-          max_concurrency: 10,
-          timeout: 30_000
-        )
-        |> Stream.run()
-      end)
-
-    IO.puts("Inserting like activities take #{to_sec(time)} sec.\n")
-  end
-
-  def generate_users(opts) do
-    IO.puts("Starting generating #{opts[:users_max]} users...")
-    {time, users} = :timer.tc(fn -> do_generate_users(opts) end)
-
-    IO.puts("Inserting users took #{to_sec(time)} sec.\n")
-    users
-  end
-
-  defp do_generate_users(opts) do
-    max = Keyword.get(opts, :users_max)
-
-    Task.async_stream(
-      1..max,
-      &generate_user_data(&1),
-      max_concurrency: 10,
-      timeout: 30_000
-    )
-    |> Enum.to_list()
-  end
-
-  defp generate_user_data(i) do
-    remote = Enum.random([true, false])
-
-    user = %User{
-      name: "Test ใƒ†ใ‚นใƒˆ User #{i}",
-      email: "user#{i}@example.com",
-      nickname: "nick#{i}",
-      password_hash:
-        "$pbkdf2-sha512$160000$bU.OSFI7H/yqWb5DPEqyjw$uKp/2rmXw12QqnRRTqTtuk2DTwZfF8VR4MYW2xMeIlqPR/UX1nT1CEKVUx2CowFMZ5JON8aDvURrZpJjSgqXrg",
-      bio: "Tester Number #{i}",
-      local: remote
-    }
-
-    user_urls =
-      if remote do
-        base_url =
-          Enum.random(["https://domain1.com", "https://domain2.com", "https://domain3.com"])
-
-        ap_id = "#{base_url}/users/#{user.nickname}"
-
-        %{
-          ap_id: ap_id,
-          follower_address: ap_id <> "/followers",
-          following_address: ap_id <> "/following"
-        }
-      else
-        %{
-          ap_id: User.ap_id(user),
-          follower_address: User.ap_followers(user),
-          following_address: User.ap_following(user)
-        }
-      end
-
-    user = Map.merge(user, user_urls)
-
-    Repo.insert!(user)
-  end
-
-  def generate_activities(user, users) do
-    do_generate_activities(user, users)
-  end
-
-  defp do_generate_activities(user, users) do
-    IO.puts("Starting generating 20000 common activities...")
-
-    {time, _} =
-      :timer.tc(fn ->
-        Task.async_stream(
-          1..20_000,
-          fn _ ->
-            do_generate_activity([user | users])
-          end,
-          max_concurrency: 10,
-          timeout: 30_000
-        )
-        |> Stream.run()
-      end)
-
-    IO.puts("Inserting common activities take #{to_sec(time)} sec.\n")
-
-    IO.puts("Starting generating 20000 activities with mentions...")
-
-    {time, _} =
-      :timer.tc(fn ->
-        Task.async_stream(
-          1..20_000,
-          fn _ ->
-            do_generate_activity_with_mention(user, users)
-          end,
-          max_concurrency: 10,
-          timeout: 30_000
-        )
-        |> Stream.run()
-      end)
-
-    IO.puts("Inserting activities with menthions take #{to_sec(time)} sec.\n")
-
-    IO.puts("Starting generating 10000 activities with threads...")
-
-    {time, _} =
-      :timer.tc(fn ->
-        Task.async_stream(
-          1..10_000,
-          fn _ ->
-            do_generate_threads([user | users])
-          end,
-          max_concurrency: 10,
-          timeout: 30_000
-        )
-        |> Stream.run()
-      end)
-
-    IO.puts("Inserting activities with threads take #{to_sec(time)} sec.\n")
-  end
-
-  defp do_generate_activity(users) do
-    post = %{
-      "status" => "Some status without mention with random user"
-    }
-
-    CommonAPI.post(Enum.random(users), post)
-  end
-
-  def generate_power_intervals(opts \\ []) do
-    count = Keyword.get(opts, :count, 20)
-    power = Keyword.get(opts, :power, 2)
-    IO.puts("Generating #{count} intervals for a power #{power} series...")
-    counts = Enum.map(1..count, fn n -> :math.pow(n, power) end)
-    sum = Enum.sum(counts)
-
-    densities =
-      Enum.map(counts, fn c ->
-        c / sum
-      end)
-
-    densities
-    |> Enum.reduce(0, fn density, acc ->
-      if acc == 0 do
-        [{0, density}]
-      else
-        [{_, lower} | _] = acc
-        [{lower, lower + density} | acc]
-      end
-    end)
-    |> Enum.reverse()
-  end
-
-  def generate_tagged_activities(opts \\ []) do
-    tag_count = Keyword.get(opts, :tag_count, 20)
-    users = Keyword.get(opts, :users, Repo.all(User))
-    activity_count = Keyword.get(opts, :count, 200_000)
-
-    intervals = generate_power_intervals(count: tag_count)
-
-    IO.puts(
-      "Generating #{activity_count} activities using #{tag_count} different tags of format `tag_n`, starting at tag_0"
-    )
-
-    Enum.each(1..activity_count, fn _ ->
-      random = :rand.uniform()
-      i = Enum.find_index(intervals, fn {lower, upper} -> lower <= random && upper > random end)
-      CommonAPI.post(Enum.random(users), %{"status" => "a post with the tag #tag_#{i}"})
-    end)
-  end
-
-  defp do_generate_activity_with_mention(user, users) do
-    mentions_cnt = Enum.random([2, 3, 4, 5])
-    with_user = Enum.random([true, false])
-    users = Enum.shuffle(users)
-    mentions_users = Enum.take(users, mentions_cnt)
-    mentions_users = if with_user, do: [user | mentions_users], else: mentions_users
-
-    mentions_str =
-      Enum.map(mentions_users, fn user -> "@" <> user.nickname end) |> Enum.join(", ")
-
-    post = %{
-      "status" => mentions_str <> "some status with mentions random users"
-    }
-
-    CommonAPI.post(Enum.random(users), post)
-  end
-
-  defp do_generate_threads(users) do
-    thread_length = Enum.random([2, 3, 4, 5])
-    actor = Enum.random(users)
-
-    post = %{
-      "status" => "Start of the thread"
-    }
-
-    {:ok, activity} = CommonAPI.post(actor, post)
-
-    Enum.each(1..thread_length, fn _ ->
-      user = Enum.random(users)
-
-      post = %{
-        "status" => "@#{actor.nickname} reply to thread",
-        "in_reply_to_status_id" => activity.id
-      }
-
-      CommonAPI.post(user, post)
-    end)
-  end
-
-  def generate_remote_activities(user, users) do
-    do_generate_remote_activities(user, users)
-  end
-
-  defp do_generate_remote_activities(user, users) do
-    IO.puts("Starting generating 10000 remote activities...")
-
-    {time, _} =
-      :timer.tc(fn ->
-        Task.async_stream(
-          1..10_000,
-          fn i ->
-            do_generate_remote_activity(i, user, users)
-          end,
-          max_concurrency: 10,
-          timeout: 30_000
-        )
-        |> Stream.run()
-      end)
-
-    IO.puts("Inserting remote activities take #{to_sec(time)} sec.\n")
-  end
-
-  defp do_generate_remote_activity(i, user, users) do
-    actor = Enum.random(users)
-    %{host: host} = URI.parse(actor.ap_id)
-    date = Date.utc_today()
-    datetime = DateTime.utc_now()
-
-    map = %{
-      "actor" => actor.ap_id,
-      "cc" => [actor.follower_address, user.ap_id],
-      "context" => "tag:mastodon.example.org,#{date}:objectId=#{i}:objectType=Conversation",
-      "id" => actor.ap_id <> "/statuses/#{i}/activity",
-      "object" => %{
-        "actor" => actor.ap_id,
-        "atomUri" => actor.ap_id <> "/statuses/#{i}",
-        "attachment" => [],
-        "attributedTo" => actor.ap_id,
-        "bcc" => [],
-        "bto" => [],
-        "cc" => [actor.follower_address, user.ap_id],
-        "content" =>
-          "<p><span class=\"h-card\"><a href=\"" <>
-            user.ap_id <>
-            "\" class=\"u-url mention\">@<span>" <> user.nickname <> "</span></a></span></p>",
-        "context" => "tag:mastodon.example.org,#{date}:objectId=#{i}:objectType=Conversation",
-        "conversation" =>
-          "tag:mastodon.example.org,#{date}:objectId=#{i}:objectType=Conversation",
-        "emoji" => %{},
-        "id" => actor.ap_id <> "/statuses/#{i}",
-        "inReplyTo" => nil,
-        "inReplyToAtomUri" => nil,
-        "published" => datetime,
-        "sensitive" => true,
-        "summary" => "cw",
-        "tag" => [
-          %{
-            "href" => user.ap_id,
-            "name" => "@#{user.nickname}@#{host}",
-            "type" => "Mention"
-          }
-        ],
-        "to" => ["https://www.w3.org/ns/activitystreams#Public"],
-        "type" => "Note",
-        "url" => "http://#{host}/@#{actor.nickname}/#{i}"
-      },
-      "published" => datetime,
-      "to" => ["https://www.w3.org/ns/activitystreams#Public"],
-      "type" => "Create"
-    }
-
-    Pleroma.Web.ActivityPub.ActivityPub.insert(map, false)
-  end
-
-  def generate_dms(user, users, opts) do
-    IO.puts("Starting generating #{opts[:dms_max]} DMs")
-    {time, _} = :timer.tc(fn -> do_generate_dms(user, users, opts) end)
-    IO.puts("Inserting dms take #{to_sec(time)} sec.\n")
-  end
-
-  defp do_generate_dms(user, users, opts) do
-    Task.async_stream(
-      1..opts[:dms_max],
-      fn _ ->
-        do_generate_dm(user, users)
-      end,
-      max_concurrency: 10,
-      timeout: 30_000
-    )
-    |> Stream.run()
-  end
-
-  defp do_generate_dm(user, users) do
-    post = %{
-      "status" => "@#{user.nickname} some direct message",
-      "visibility" => "direct"
-    }
-
-    CommonAPI.post(Enum.random(users), post)
-  end
-
-  def generate_long_thread(user, users, opts) do
-    IO.puts("Starting generating long thread with #{opts[:thread_length]} replies")
-    {time, activity} = :timer.tc(fn -> do_generate_long_thread(user, users, opts) end)
-    IO.puts("Inserting long thread replies take #{to_sec(time)} sec.\n")
-    {:ok, activity}
-  end
-
-  defp do_generate_long_thread(user, users, opts) do
-    {:ok, %{id: id} = activity} = CommonAPI.post(user, %{"status" => "Start of long thread"})
-
-    Task.async_stream(
-      1..opts[:thread_length],
-      fn _ -> do_generate_thread(users, id) end,
-      max_concurrency: 10,
-      timeout: 30_000
-    )
-    |> Stream.run()
-
-    activity
-  end
-
-  defp do_generate_thread(users, activity_id) do
-    CommonAPI.post(Enum.random(users), %{
-      "status" => "reply to main post",
-      "in_reply_to_status_id" => activity_id
-    })
-  end
-
-  def generate_non_visible_message(user, users) do
-    IO.puts("Starting generating 1000 non visible posts")
-
-    {time, _} =
-      :timer.tc(fn ->
-        do_generate_non_visible_posts(user, users)
-      end)
-
-    IO.puts("Inserting non visible posts take #{to_sec(time)} sec.\n")
-  end
-
-  defp do_generate_non_visible_posts(user, users) do
-    [not_friend | users] = users
-
-    make_friends(user, users)
-
-    Task.async_stream(1..1000, fn _ -> do_generate_non_visible_post(not_friend, users) end,
-      max_concurrency: 10,
-      timeout: 30_000
-    )
-    |> Stream.run()
-  end
-
-  defp make_friends(_user, []), do: nil
-
-  defp make_friends(user, [friend | users]) do
-    {:ok, _} = User.follow(user, friend)
-    {:ok, _} = User.follow(friend, user)
-    make_friends(user, users)
-  end
-
-  defp do_generate_non_visible_post(not_friend, users) do
-    post = %{
-      "status" => "some non visible post",
-      "visibility" => "private"
-    }
-
-    {:ok, activity} = CommonAPI.post(not_friend, post)
-
-    thread_length = Enum.random([2, 3, 4, 5])
-
-    Enum.each(1..thread_length, fn _ ->
-      user = Enum.random(users)
-
-      post = %{
-        "status" => "@#{not_friend.nickname} reply to non visible post",
-        "in_reply_to_status_id" => activity.id,
-        "visibility" => "private"
-      }
-
-      CommonAPI.post(user, post)
-    end)
-  end
-end
index 47b25c65fb7fb499e4b8eb2073779473a3b4a412..23bbb1cec22f1288f6ce6555971e01846742364f 100644 (file)
@@ -1,11 +1,3 @@
 defmodule Pleroma.LoadTesting.Helper do
-  defmacro __using__(_) do
-    quote do
-      import Ecto.Query
-      alias Pleroma.Repo
-      alias Pleroma.User
-
-      defp to_sec(microseconds), do: microseconds / 1_000_000
-    end
-  end
+  def to_sec(microseconds), do: microseconds / 1_000_000
 end
diff --git a/benchmarks/load_testing/users.ex b/benchmarks/load_testing/users.ex
new file mode 100644 (file)
index 0000000..951b30d
--- /dev/null
@@ -0,0 +1,161 @@
+defmodule Pleroma.LoadTesting.Users do
+  @moduledoc """
+  Module for generating users with friends.
+  """
+  import Ecto.Query
+  import Pleroma.LoadTesting.Helper, only: [to_sec: 1]
+
+  alias Pleroma.Repo
+  alias Pleroma.User
+  alias Pleroma.User.Query
+
+  @defaults [
+    users: 20_000,
+    friends: 100
+  ]
+
+  @max_concurrency 30
+
+  @spec generate(keyword()) :: User.t()
+  def generate(opts \\ []) do
+    opts = Keyword.merge(@defaults, opts)
+
+    IO.puts("Starting generating #{opts[:users]} users...")
+
+    {time, _} = :timer.tc(fn -> generate_users(opts[:users]) end)
+
+    IO.puts("Generating users take #{to_sec(time)} sec.\n")
+
+    main_user =
+      Repo.one(from(u in User, where: u.local == true, order_by: fragment("RANDOM()"), limit: 1))
+
+    IO.puts("Starting making friends for #{opts[:friends]} users...")
+    {time, _} = :timer.tc(fn -> make_friends(main_user, opts[:friends]) end)
+
+    IO.puts("Making friends take #{to_sec(time)} sec.\n")
+
+    Repo.get(User, main_user.id)
+  end
+
+  defp generate_users(max) do
+    Task.async_stream(
+      1..max,
+      &generate_user(&1),
+      max_concurrency: @max_concurrency,
+      timeout: 30_000
+    )
+    |> Stream.run()
+  end
+
+  defp generate_user(i) do
+    remote = Enum.random([true, false])
+
+    %User{
+      name: "Test ใƒ†ใ‚นใƒˆ User #{i}",
+      email: "user#{i}@example.com",
+      nickname: "nick#{i}",
+      password_hash: Comeonin.Pbkdf2.hashpwsalt("test"),
+      bio: "Tester Number #{i}",
+      local: !remote
+    }
+    |> user_urls()
+    |> Repo.insert!()
+  end
+
+  defp user_urls(%{local: true} = user) do
+    urls = %{
+      ap_id: User.ap_id(user),
+      follower_address: User.ap_followers(user),
+      following_address: User.ap_following(user)
+    }
+
+    Map.merge(user, urls)
+  end
+
+  defp user_urls(%{local: false} = user) do
+    base_domain = Enum.random(["domain1.com", "domain2.com", "domain3.com"])
+
+    ap_id = "https://#{base_domain}/users/#{user.nickname}"
+
+    urls = %{
+      ap_id: ap_id,
+      follower_address: ap_id <> "/followers",
+      following_address: ap_id <> "/following"
+    }
+
+    Map.merge(user, urls)
+  end
+
+  defp make_friends(main_user, max) when is_integer(max) do
+    number_of_users =
+      (max / 2)
+      |> Kernel.trunc()
+
+    main_user
+    |> get_users(%{limit: number_of_users, local: :local})
+    |> run_stream(main_user)
+
+    main_user
+    |> get_users(%{limit: number_of_users, local: :external})
+    |> run_stream(main_user)
+  end
+
+  defp make_friends(%User{} = main_user, %User{} = user) do
+    {:ok, _} = User.follow(main_user, user)
+    {:ok, _} = User.follow(user, main_user)
+  end
+
+  @spec get_users(User.t(), keyword()) :: [User.t()]
+  def get_users(user, opts) do
+    criteria = %{limit: opts[:limit]}
+
+    criteria =
+      if opts[:local] do
+        Map.put(criteria, opts[:local], true)
+      else
+        criteria
+      end
+
+    criteria =
+      if opts[:friends?] do
+        Map.put(criteria, :friends, user)
+      else
+        criteria
+      end
+
+    query =
+      criteria
+      |> Query.build()
+      |> random_without_user(user)
+
+    query =
+      if opts[:friends?] == false do
+        friends_ids =
+          %{friends: user}
+          |> Query.build()
+          |> Repo.all()
+          |> Enum.map(& &1.id)
+
+        from(u in query, where: u.id not in ^friends_ids)
+      else
+        query
+      end
+
+    Repo.all(query)
+  end
+
+  defp random_without_user(query, user) do
+    from(u in query,
+      where: u.id != ^user.id,
+      order_by: fragment("RANDOM()")
+    )
+  end
+
+  defp run_stream(users, main_user) do
+    Task.async_stream(users, &make_friends(main_user, &1),
+      max_concurrency: @max_concurrency,
+      timeout: 30_000
+    )
+    |> Stream.run()
+  end
+end
index 0a751adac02ab3a9088382ca6f8d7c899208ae50..2623009906892f4d58d49f35792fc7bc70a2d102 100644 (file)
 defmodule Mix.Tasks.Pleroma.LoadTesting do
   use Mix.Task
-  use Pleroma.LoadTesting.Helper
-  import Mix.Pleroma
-  import Pleroma.LoadTesting.Generator
-  import Pleroma.LoadTesting.Fetcher
+  import Ecto.Query
+
+  alias Ecto.Adapters.SQL
+  alias Pleroma.Repo
+  alias Pleroma.User
 
   @shortdoc "Factory for generation data"
   @moduledoc """
   Generates data like:
   - local/remote users
-  - local/remote activities with notifications
-  - direct messages
-  - long thread
-  - non visible posts
+  - local/remote activities with differrent visibility:
+    - simple activiities
+    - with emoji
+    - with mentions
+    - hellthreads
+    - with attachments
+    - with tags
+    - likes
+    - reblogs
+    - simple threads
+    - long threads
 
   ## Generate data
-      MIX_ENV=benchmark mix pleroma.load_testing --users 20000 --dms 20000 --thread_length 2000
-      MIX_ENV=benchmark mix pleroma.load_testing -u 20000 -d 20000 -t 2000
+      MIX_ENV=benchmark mix pleroma.load_testing --users 20000 --friends 1000 --iterations 170 --friends_used 20 --non_friends_used 20
+      MIX_ENV=benchmark mix pleroma.load_testing -u 20000 -f 1000 -i 170 -fu 20 -nfu 20
 
   Options:
   - `--users NUMBER` - number of users to generate. Defaults to: 20000. Alias: `-u`
-  - `--dms NUMBER` - number of direct messages to generate. Defaults to: 20000. Alias `-d`
-  - `--thread_length` - number of messages in thread. Defaults to: 2000. ALias `-t`
+  - `--friends NUMBER` - number of friends for main user. Defaults to: 1000. Alias: `-f`
+  - `--iterations NUMBER` - number of iterations to generate activities. For each iteration in database is inserted about 120+ activities with different visibility, actors and types.Defaults to: 170. Alias: `-i`
+  - `--friends_used NUMBER` - number of main user friends used in activity generation. Defaults to: 20. Alias: `-fu`
+  - `--non_friends_used NUMBER` - number of non friends used in activity generation. Defaults to: 20. Alias: `-nfu`
   """
 
-  @aliases [u: :users, d: :dms, t: :thread_length]
+  @aliases [u: :users, f: :friends, i: :iterations, fu: :friends_used, nfu: :non_friends_used]
   @switches [
     users: :integer,
-    dms: :integer,
-    thread_length: :integer
+    friends: :integer,
+    iterations: :integer,
+    friends_used: :integer,
+    non_friends_used: :integer
   ]
-  @users_default 20_000
-  @dms_default 1_000
-  @thread_length_default 2_000
 
   def run(args) do
-    start_pleroma()
-    Pleroma.Config.put([:instance, :skip_thread_containment], true)
-    {opts, _} = OptionParser.parse!(args, strict: @switches, aliases: @aliases)
-
-    users_max = Keyword.get(opts, :users, @users_default)
-    dms_max = Keyword.get(opts, :dms, @dms_default)
-    thread_length = Keyword.get(opts, :thread_length, @thread_length_default)
-
+    Mix.Pleroma.start_pleroma()
     clean_tables()
+    {opts, _} = OptionParser.parse!(args, strict: @switches, aliases: @aliases)
 
-    opts =
-      Keyword.put(opts, :users_max, users_max)
-      |> Keyword.put(:dms_max, dms_max)
-      |> Keyword.put(:thread_length, thread_length)
-
-    generate_users(opts)
-
-    # main user for queries
-    IO.puts("Fetching local main user...")
-
-    {time, user} =
-      :timer.tc(fn ->
-        Repo.one(
-          from(u in User, where: u.local == true, order_by: fragment("RANDOM()"), limit: 1)
-        )
-      end)
-
-    IO.puts("Fetching main user take #{to_sec(time)} sec.\n")
-
-    IO.puts("Fetching local users...")
-
-    {time, users} =
-      :timer.tc(fn ->
-        Repo.all(
-          from(u in User,
-            where: u.id != ^user.id,
-            where: u.local == true,
-            order_by: fragment("RANDOM()"),
-            limit: 10
-          )
-        )
-      end)
-
-    IO.puts("Fetching local users take #{to_sec(time)} sec.\n")
-
-    IO.puts("Fetching remote users...")
-
-    {time, remote_users} =
-      :timer.tc(fn ->
-        Repo.all(
-          from(u in User,
-            where: u.id != ^user.id,
-            where: u.local == false,
-            order_by: fragment("RANDOM()"),
-            limit: 10
-          )
-        )
-      end)
-
-    IO.puts("Fetching remote users take #{to_sec(time)} sec.\n")
-
-    generate_activities(user, users)
-
-    generate_remote_activities(user, remote_users)
-
-    generate_like_activities(
-      user, Pleroma.Repo.all(Pleroma.Activity.Queries.by_type("Create"))
-    )
-
-    generate_dms(user, users, opts)
-
-    {:ok, activity} = generate_long_thread(user, users, opts)
-
-    generate_non_visible_message(user, users)
+    user = Pleroma.LoadTesting.Users.generate(opts)
+    Pleroma.LoadTesting.Activities.generate(user, opts)
 
     IO.puts("Users in DB: #{Repo.aggregate(from(u in User), :count, :id)}")
 
@@ -120,19 +61,14 @@ defmodule Mix.Tasks.Pleroma.LoadTesting do
       "Notifications in DB: #{Repo.aggregate(from(n in Pleroma.Notification), :count, :id)}"
     )
 
-    fetch_user(user)
-    query_timelines(user)
-    query_notifications(user)
-    query_dms(user)
-    query_long_thread(user, activity)
-    Pleroma.Config.put([:instance, :skip_thread_containment], false)
-    query_timelines(user)
+    Pleroma.LoadTesting.Fetcher.run_benchmarks(user)
   end
 
   defp clean_tables do
     IO.puts("Deleting old data...\n")
-    Ecto.Adapters.SQL.query!(Repo, "TRUNCATE users CASCADE;")
-    Ecto.Adapters.SQL.query!(Repo, "TRUNCATE activities CASCADE;")
-    Ecto.Adapters.SQL.query!(Repo, "TRUNCATE objects CASCADE;")
+    SQL.query!(Repo, "TRUNCATE users CASCADE;")
+    SQL.query!(Repo, "TRUNCATE activities CASCADE;")
+    SQL.query!(Repo, "TRUNCATE objects CASCADE;")
+    SQL.query!(Repo, "TRUNCATE oban_jobs CASCADE;")
   end
 end
index ff59395cf62239a120e0bd9a9a2b9dca987d9094..e867253ebd021925f4bb11eec6d34a1a9f7c27e1 100644 (file)
@@ -39,7 +39,7 @@ config :pleroma, Pleroma.Repo,
   adapter: Ecto.Adapters.Postgres,
   username: "postgres",
   password: "postgres",
-  database: "pleroma_test",
+  database: "pleroma_benchmark",
   hostname: System.get_env("DB_HOST") || "localhost",
   pool_size: 10
 
index 33f1705dfc85e978320f5508e782dd91f37bbb8b..51850abb550813f8c02b2760cab54d19d6d45205 100644 (file)
@@ -157,7 +157,7 @@ defmodule Pleroma.Application do
 
   defp chat_enabled?, do: Pleroma.Config.get([:chat, :enabled])
 
-  defp streamer_child(:test), do: []
+  defp streamer_child(env) when env in [:test, :benchmark], do: []
 
   defp streamer_child(_) do
     [Pleroma.Web.Streamer.supervisor()]