Fix oban tags
[akkoma] / benchmarks / load_testing / activities.ex
1 defmodule Pleroma.LoadTesting.Activities do
2 @moduledoc """
3 Module for generating different activities.
4 """
5 import Ecto.Query
6 import Pleroma.LoadTesting.Helper, only: [to_sec: 1]
7
8 alias Ecto.UUID
9 alias Pleroma.Constants
10 alias Pleroma.LoadTesting.Users
11 alias Pleroma.Repo
12 alias Pleroma.Web.CommonAPI
13
14 require Constants
15
16 @defaults [
17 iterations: 170,
18 friends_used: 20,
19 non_friends_used: 20
20 ]
21
22 @max_concurrency 10
23
24 @visibility ~w(public private direct unlisted)
25 @types [
26 :simple,
27 :simple_filtered,
28 :emoji,
29 :mentions,
30 :hell_thread,
31 :attachment,
32 :tag,
33 :like,
34 :reblog,
35 :simple_thread
36 ]
37 @groups [:friends_local, :friends_remote, :non_friends_local, :non_friends_local]
38 @remote_groups [:friends_remote, :non_friends_remote]
39 @friends_groups [:friends_local, :friends_remote]
40 @non_friends_groups [:non_friends_local, :non_friends_remote]
41
42 @spec generate(User.t(), keyword()) :: :ok
43 def generate(user, opts \\ []) do
44 {:ok, _} =
45 Agent.start_link(fn -> %{} end,
46 name: :benchmark_state
47 )
48
49 opts = Keyword.merge(@defaults, opts)
50
51 users = Users.prepare_users(user, opts)
52
53 {:ok, _} = Agent.start_link(fn -> users[:non_friends_remote] end, name: :non_friends_remote)
54
55 task_data =
56 for visibility <- @visibility,
57 type <- @types,
58 group <- [:user | @groups],
59 do: {visibility, type, group}
60
61 IO.puts("Starting generating #{opts[:iterations]} iterations of activities...")
62
63 public_long_thread = fn ->
64 generate_long_thread("public", users, opts)
65 end
66
67 private_long_thread = fn ->
68 generate_long_thread("private", users, opts)
69 end
70
71 iterations = opts[:iterations]
72
73 {time, _} =
74 :timer.tc(fn ->
75 Enum.each(
76 1..iterations,
77 fn
78 i when i == iterations - 2 ->
79 spawn(public_long_thread)
80 spawn(private_long_thread)
81 generate_activities(users, Enum.shuffle(task_data), opts)
82
83 _ ->
84 generate_activities(users, Enum.shuffle(task_data), opts)
85 end
86 )
87 end)
88
89 IO.puts("Generating iterations of activities took #{to_sec(time)} sec.\n")
90 :ok
91 end
92
93 def generate_power_intervals(opts \\ []) do
94 count = Keyword.get(opts, :count, 20)
95 power = Keyword.get(opts, :power, 2)
96 IO.puts("Generating #{count} intervals for a power #{power} series...")
97 counts = Enum.map(1..count, fn n -> :math.pow(n, power) end)
98 sum = Enum.sum(counts)
99
100 densities =
101 Enum.map(counts, fn c ->
102 c / sum
103 end)
104
105 densities
106 |> Enum.reduce(0, fn density, acc ->
107 if acc == 0 do
108 [{0, density}]
109 else
110 [{_, lower} | _] = acc
111 [{lower, lower + density} | acc]
112 end
113 end)
114 |> Enum.reverse()
115 end
116
117 def generate_tagged_activities(opts \\ []) do
118 tag_count = Keyword.get(opts, :tag_count, 20)
119 users = Keyword.get(opts, :users, Repo.all(Pleroma.User))
120 activity_count = Keyword.get(opts, :count, 200_000)
121
122 intervals = generate_power_intervals(count: tag_count)
123
124 IO.puts(
125 "Generating #{activity_count} activities using #{tag_count} different tags of format `tag_n`, starting at tag_0"
126 )
127
128 Enum.each(1..activity_count, fn _ ->
129 random = :rand.uniform()
130 i = Enum.find_index(intervals, fn {lower, upper} -> lower <= random && upper > random end)
131 CommonAPI.post(Enum.random(users), %{status: "a post with the tag #tag_#{i}"})
132 end)
133 end
134
135 defp generate_long_thread(visibility, users, _opts) do
136 group =
137 if visibility == "public",
138 do: :friends_local,
139 else: :user
140
141 tasks = get_reply_tasks(visibility, group) |> Stream.cycle() |> Enum.take(50)
142
143 {:ok, activity} =
144 CommonAPI.post(users[:user], %{
145 status: "Start of #{visibility} long thread",
146 visibility: visibility
147 })
148
149 Agent.update(:benchmark_state, fn state ->
150 key =
151 if visibility == "public",
152 do: :public_thread,
153 else: :private_thread
154
155 Map.put(state, key, activity)
156 end)
157
158 acc = {activity.id, ["@" <> users[:user].nickname, "reply to long thread"]}
159 insert_replies_for_long_thread(tasks, visibility, users, acc)
160 IO.puts("Generating #{visibility} long thread ended\n")
161 end
162
163 defp insert_replies_for_long_thread(tasks, visibility, users, acc) do
164 Enum.reduce(tasks, acc, fn
165 :user, {id, data} ->
166 user = users[:user]
167 insert_reply(user, List.delete(data, "@" <> user.nickname), id, visibility)
168
169 group, {id, data} ->
170 replier = Enum.random(users[group])
171 insert_reply(replier, List.delete(data, "@" <> replier.nickname), id, visibility)
172 end)
173 end
174
175 defp generate_activities(users, task_data, opts) do
176 Task.async_stream(
177 task_data,
178 fn {visibility, type, group} ->
179 insert_activity(type, visibility, group, users, opts)
180 end,
181 max_concurrency: @max_concurrency,
182 timeout: 30_000
183 )
184 |> Stream.run()
185 end
186
187 defp insert_local_activity(visibility, group, users, status) do
188 {:ok, _} =
189 group
190 |> get_actor(users)
191 |> CommonAPI.post(%{status: status, visibility: visibility})
192 end
193
194 defp insert_remote_activity(visibility, group, users, status) do
195 actor = get_actor(group, users)
196 {act_data, obj_data} = prepare_activity_data(actor, visibility, users[:user])
197 {activity_data, object_data} = other_data(actor, status)
198
199 activity_data
200 |> Map.merge(act_data)
201 |> Map.put("object", Map.merge(object_data, obj_data))
202 |> Pleroma.Web.ActivityPub.ActivityPub.insert(false)
203 end
204
205 defp user_mentions(users) do
206 user_mentions =
207 Enum.reduce(
208 @groups,
209 [],
210 fn group, acc ->
211 acc ++ get_random_mentions(users[group], Enum.random(0..2))
212 end
213 )
214
215 if Enum.random([true, false]),
216 do: ["@" <> users[:user].nickname | user_mentions],
217 else: user_mentions
218 end
219
220 defp hell_thread_mentions(users) do
221 with {:ok, nil} <- Cachex.get(:user_cache, "hell_thread_mentions") do
222 cached =
223 @groups
224 |> Enum.reduce([users[:user]], fn group, acc ->
225 acc ++ Enum.take(users[group], 5)
226 end)
227 |> Enum.map(&"@#{&1.nickname}")
228 |> Enum.join(", ")
229
230 Cachex.put(:user_cache, "hell_thread_mentions", cached)
231 cached
232 else
233 {:ok, cached} -> cached
234 end
235 end
236
237 defp insert_activity(:simple, visibility, group, users, _opts)
238 when group in @remote_groups do
239 insert_remote_activity(visibility, group, users, "Remote status")
240 end
241
242 defp insert_activity(:simple, visibility, group, users, _opts) do
243 insert_local_activity(visibility, group, users, "Simple status")
244 end
245
246 defp insert_activity(:simple_filtered, visibility, group, users, _opts)
247 when group in @remote_groups do
248 insert_remote_activity(visibility, group, users, "Remote status which must be filtered")
249 end
250
251 defp insert_activity(:simple_filtered, visibility, group, users, _opts) do
252 insert_local_activity(visibility, group, users, "Simple status which must be filtered")
253 end
254
255 defp insert_activity(:emoji, visibility, group, users, _opts)
256 when group in @remote_groups do
257 insert_remote_activity(visibility, group, users, "Remote status with emoji :firefox:")
258 end
259
260 defp insert_activity(:emoji, visibility, group, users, _opts) do
261 insert_local_activity(visibility, group, users, "Simple status with emoji :firefox:")
262 end
263
264 defp insert_activity(:mentions, visibility, group, users, _opts)
265 when group in @remote_groups do
266 mentions = user_mentions(users)
267
268 status = Enum.join(mentions, ", ") <> " remote status with mentions"
269
270 insert_remote_activity(visibility, group, users, status)
271 end
272
273 defp insert_activity(:mentions, visibility, group, users, _opts) do
274 mentions = user_mentions(users)
275
276 status = Enum.join(mentions, ", ") <> " simple status with mentions"
277 insert_remote_activity(visibility, group, users, status)
278 end
279
280 defp insert_activity(:hell_thread, visibility, group, users, _)
281 when group in @remote_groups do
282 mentions = hell_thread_mentions(users)
283 insert_remote_activity(visibility, group, users, mentions <> " remote hell thread status")
284 end
285
286 defp insert_activity(:hell_thread, visibility, group, users, _opts) do
287 mentions = hell_thread_mentions(users)
288
289 insert_local_activity(visibility, group, users, mentions <> " hell thread status")
290 end
291
292 defp insert_activity(:attachment, visibility, group, users, _opts) do
293 actor = get_actor(group, users)
294
295 obj_data = %{
296 "actor" => actor.ap_id,
297 "name" => "4467-11.jpg",
298 "type" => "Document",
299 "url" => [
300 %{
301 "href" =>
302 "#{Pleroma.Web.Endpoint.url()}/media/b1b873552422a07bf53af01f3c231c841db4dfc42c35efde681abaf0f2a4eab7.jpg",
303 "mediaType" => "image/jpeg",
304 "type" => "Link"
305 }
306 ]
307 }
308
309 object = Repo.insert!(%Pleroma.Object{data: obj_data})
310
311 {:ok, _activity} =
312 CommonAPI.post(actor, %{
313 status: "Post with attachment",
314 visibility: visibility,
315 media_ids: [object.id]
316 })
317 end
318
319 defp insert_activity(:tag, visibility, group, users, _opts) do
320 insert_local_activity(visibility, group, users, "Status with #tag")
321 end
322
323 defp insert_activity(:like, visibility, group, users, opts) do
324 actor = get_actor(group, users)
325
326 with activity_id when not is_nil(activity_id) <- get_random_create_activity_id(),
327 {:ok, _activity} <- CommonAPI.favorite(actor, activity_id) do
328 :ok
329 else
330 {:error, _} ->
331 insert_activity(:like, visibility, group, users, opts)
332
333 nil ->
334 Process.sleep(15)
335 insert_activity(:like, visibility, group, users, opts)
336 end
337 end
338
339 defp insert_activity(:reblog, visibility, group, users, opts) do
340 actor = get_actor(group, users)
341
342 with activity_id when not is_nil(activity_id) <- get_random_create_activity_id(),
343 {:ok, _activity} <- CommonAPI.repeat(activity_id, actor) do
344 :ok
345 else
346 {:error, _} ->
347 insert_activity(:reblog, visibility, group, users, opts)
348
349 nil ->
350 Process.sleep(15)
351 insert_activity(:reblog, visibility, group, users, opts)
352 end
353 end
354
355 defp insert_activity(:simple_thread, "direct", group, users, _opts) do
356 actor = get_actor(group, users)
357 tasks = get_reply_tasks("direct", group)
358
359 list =
360 case group do
361 :user ->
362 group = Enum.random(@friends_groups)
363 Enum.take(users[group], 3)
364
365 _ ->
366 Enum.take(users[group], 3)
367 end
368
369 data = Enum.map(list, &("@" <> &1.nickname))
370
371 {:ok, activity} =
372 CommonAPI.post(actor, %{
373 status: Enum.join(data, ", ") <> "simple status",
374 visibility: "direct"
375 })
376
377 acc = {activity.id, ["@" <> users[:user].nickname | data] ++ ["reply to status"]}
378 insert_direct_replies(tasks, users[:user], list, acc)
379 end
380
381 defp insert_activity(:simple_thread, visibility, group, users, _opts) do
382 actor = get_actor(group, users)
383 tasks = get_reply_tasks(visibility, group)
384
385 {:ok, activity} =
386 CommonAPI.post(users[:user], %{status: "Simple status", visibility: visibility})
387
388 acc = {activity.id, ["@" <> actor.nickname, "reply to status"]}
389 insert_replies(tasks, visibility, users, acc)
390 end
391
392 defp get_actor(:user, %{user: user}), do: user
393 defp get_actor(group, users), do: Enum.random(users[group])
394
395 defp other_data(actor, content) do
396 %{host: host} = URI.parse(actor.ap_id)
397 datetime = DateTime.utc_now() |> to_string()
398 context_id = "https://#{host}/contexts/#{UUID.generate()}"
399 activity_id = "https://#{host}/activities/#{UUID.generate()}"
400 object_id = "https://#{host}/objects/#{UUID.generate()}"
401
402 activity_data = %{
403 "actor" => actor.ap_id,
404 "context" => context_id,
405 "id" => activity_id,
406 "published" => datetime,
407 "type" => "Create",
408 "directMessage" => false
409 }
410
411 object_data = %{
412 "actor" => actor.ap_id,
413 "attachment" => [],
414 "attributedTo" => actor.ap_id,
415 "bcc" => [],
416 "bto" => [],
417 "content" => content,
418 "context" => context_id,
419 "conversation" => context_id,
420 "emoji" => %{},
421 "id" => object_id,
422 "published" => datetime,
423 "sensitive" => false,
424 "summary" => "",
425 "tag" => [],
426 "to" => ["https://www.w3.org/ns/activitystreams#Public"],
427 "type" => "Note"
428 }
429
430 {activity_data, object_data}
431 end
432
433 defp prepare_activity_data(actor, "public", _mention) do
434 obj_data = %{
435 "cc" => [actor.follower_address],
436 "to" => [Constants.as_public()]
437 }
438
439 act_data = %{
440 "cc" => [actor.follower_address],
441 "to" => [Constants.as_public()]
442 }
443
444 {act_data, obj_data}
445 end
446
447 defp prepare_activity_data(actor, "private", _mention) do
448 obj_data = %{
449 "cc" => [],
450 "to" => [actor.follower_address]
451 }
452
453 act_data = %{
454 "cc" => [],
455 "to" => [actor.follower_address]
456 }
457
458 {act_data, obj_data}
459 end
460
461 defp prepare_activity_data(actor, "unlisted", _mention) do
462 obj_data = %{
463 "cc" => [Constants.as_public()],
464 "to" => [actor.follower_address]
465 }
466
467 act_data = %{
468 "cc" => [Constants.as_public()],
469 "to" => [actor.follower_address]
470 }
471
472 {act_data, obj_data}
473 end
474
475 defp prepare_activity_data(_actor, "direct", mention) do
476 %{host: mentioned_host} = URI.parse(mention.ap_id)
477
478 obj_data = %{
479 "cc" => [],
480 "content" =>
481 "<span class=\"h-card\"><a class=\"u-url mention\" href=\"#{mention.ap_id}\" rel=\"ugc\">@<span>#{
482 mention.nickname
483 }</span></a></span> direct message",
484 "tag" => [
485 %{
486 "href" => mention.ap_id,
487 "name" => "@#{mention.nickname}@#{mentioned_host}",
488 "type" => "Mention"
489 }
490 ],
491 "to" => [mention.ap_id]
492 }
493
494 act_data = %{
495 "cc" => [],
496 "directMessage" => true,
497 "to" => [mention.ap_id]
498 }
499
500 {act_data, obj_data}
501 end
502
503 defp get_reply_tasks("public", :user) do
504 [:friends_local, :friends_remote, :non_friends_local, :non_friends_remote, :user]
505 end
506
507 defp get_reply_tasks("public", group) when group in @friends_groups do
508 [:non_friends_local, :non_friends_remote, :user, :friends_local, :friends_remote]
509 end
510
511 defp get_reply_tasks("public", group) when group in @non_friends_groups do
512 [:user, :friends_local, :friends_remote, :non_friends_local, :non_friends_remote]
513 end
514
515 defp get_reply_tasks(visibility, :user) when visibility in ["unlisted", "private"] do
516 [:friends_local, :friends_remote, :user, :friends_local, :friends_remote]
517 end
518
519 defp get_reply_tasks(visibility, group)
520 when visibility in ["unlisted", "private"] and group in @friends_groups do
521 [:user, :friends_remote, :friends_local, :user]
522 end
523
524 defp get_reply_tasks(visibility, group)
525 when visibility in ["unlisted", "private"] and
526 group in @non_friends_groups,
527 do: []
528
529 defp get_reply_tasks("direct", :user), do: [:friends_local, :user, :friends_remote]
530
531 defp get_reply_tasks("direct", group) when group in @friends_groups,
532 do: [:user, group, :user]
533
534 defp get_reply_tasks("direct", group) when group in @non_friends_groups do
535 [:user, :non_friends_remote, :user, :non_friends_local]
536 end
537
538 defp insert_replies(tasks, visibility, users, acc) do
539 Enum.reduce(tasks, acc, fn
540 :user, {id, data} ->
541 insert_reply(users[:user], data, id, visibility)
542
543 group, {id, data} ->
544 replier = Enum.random(users[group])
545 insert_reply(replier, data, id, visibility)
546 end)
547 end
548
549 defp insert_direct_replies(tasks, user, list, acc) do
550 Enum.reduce(tasks, acc, fn
551 :user, {id, data} ->
552 {reply_id, _} = insert_reply(user, List.delete(data, "@" <> user.nickname), id, "direct")
553 {reply_id, data}
554
555 _, {id, data} ->
556 actor = Enum.random(list)
557
558 {reply_id, _} =
559 insert_reply(actor, List.delete(data, "@" <> actor.nickname), id, "direct")
560
561 {reply_id, data}
562 end)
563 end
564
565 defp insert_reply(actor, data, activity_id, visibility) do
566 {:ok, reply} =
567 CommonAPI.post(actor, %{
568 status: Enum.join(data, ", "),
569 visibility: visibility,
570 in_reply_to_status_id: activity_id
571 })
572
573 {reply.id, ["@" <> actor.nickname | data]}
574 end
575
576 defp get_random_mentions(_users, count) when count == 0, do: []
577
578 defp get_random_mentions(users, count) do
579 users
580 |> Enum.shuffle()
581 |> Enum.take(count)
582 |> Enum.map(&"@#{&1.nickname}")
583 end
584
585 defp get_random_create_activity_id do
586 Repo.one(
587 from(a in Pleroma.Activity,
588 where: fragment("(?)->>'type' = ?", a.data, ^"Create"),
589 order_by: fragment("RANDOM()"),
590 limit: 1,
591 select: a.id
592 )
593 )
594 end
595 end