ActivityPub Utils: Greatly speed up the follow / block activity fetching.
[akkoma] / lib / pleroma / web / activity_pub / utils.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.Utils do
6 alias Ecto.Changeset
7 alias Ecto.UUID
8 alias Pleroma.Activity
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Repo
12 alias Pleroma.User
13 alias Pleroma.Web
14 alias Pleroma.Web.ActivityPub.Visibility
15 alias Pleroma.Web.Endpoint
16 alias Pleroma.Web.Router.Helpers
17
18 import Ecto.Query
19
20 require Logger
21
22 @supported_object_types ["Article", "Note", "Video", "Page"]
23
24 # Some implementations send the actor URI as the actor field, others send the entire actor object,
25 # so figure out what the actor's URI is based on what we have.
26 def get_ap_id(object) do
27 case object do
28 %{"id" => id} -> id
29 id -> id
30 end
31 end
32
33 def normalize_params(params) do
34 Map.put(params, "actor", get_ap_id(params["actor"]))
35 end
36
37 def determine_explicit_mentions(%{"tag" => tag} = _object) when is_list(tag) do
38 tag
39 |> Enum.filter(fn x -> is_map(x) end)
40 |> Enum.filter(fn x -> x["type"] == "Mention" end)
41 |> Enum.map(fn x -> x["href"] end)
42 end
43
44 def determine_explicit_mentions(%{"tag" => tag} = object) when is_map(tag) do
45 Map.put(object, "tag", [tag])
46 |> determine_explicit_mentions()
47 end
48
49 def determine_explicit_mentions(_), do: []
50
51 defp recipient_in_collection(ap_id, coll) when is_binary(coll), do: ap_id == coll
52 defp recipient_in_collection(ap_id, coll) when is_list(coll), do: ap_id in coll
53 defp recipient_in_collection(_, _), do: false
54
55 def recipient_in_message(ap_id, params) do
56 cond do
57 recipient_in_collection(ap_id, params["to"]) ->
58 true
59
60 recipient_in_collection(ap_id, params["cc"]) ->
61 true
62
63 recipient_in_collection(ap_id, params["bto"]) ->
64 true
65
66 recipient_in_collection(ap_id, params["bcc"]) ->
67 true
68
69 # if the message is unaddressed at all, then assume it is directly addressed
70 # to the recipient
71 !params["to"] && !params["cc"] && !params["bto"] && !params["bcc"] ->
72 true
73
74 true ->
75 false
76 end
77 end
78
79 defp extract_list(target) when is_binary(target), do: [target]
80 defp extract_list(lst) when is_list(lst), do: lst
81 defp extract_list(_), do: []
82
83 def maybe_splice_recipient(ap_id, params) do
84 need_splice =
85 !recipient_in_collection(ap_id, params["to"]) &&
86 !recipient_in_collection(ap_id, params["cc"])
87
88 cc_list = extract_list(params["cc"])
89
90 if need_splice do
91 params
92 |> Map.put("cc", [ap_id | cc_list])
93 else
94 params
95 end
96 end
97
98 def make_json_ld_header do
99 %{
100 "@context" => [
101 "https://www.w3.org/ns/activitystreams",
102 "#{Web.base_url()}/schemas/litepub-0.1.jsonld"
103 ]
104 }
105 end
106
107 def make_date do
108 DateTime.utc_now() |> DateTime.to_iso8601()
109 end
110
111 def generate_activity_id do
112 generate_id("activities")
113 end
114
115 def generate_context_id do
116 generate_id("contexts")
117 end
118
119 def generate_object_id do
120 Helpers.o_status_url(Endpoint, :object, UUID.generate())
121 end
122
123 def generate_id(type) do
124 "#{Web.base_url()}/#{type}/#{UUID.generate()}"
125 end
126
127 def get_notified_from_object(%{"type" => type} = object) when type in @supported_object_types do
128 fake_create_activity = %{
129 "to" => object["to"],
130 "cc" => object["cc"],
131 "type" => "Create",
132 "object" => object
133 }
134
135 Notification.get_notified_from_activity(%Activity{data: fake_create_activity}, false)
136 end
137
138 def get_notified_from_object(object) do
139 Notification.get_notified_from_activity(%Activity{data: object}, false)
140 end
141
142 def create_context(context) do
143 context = context || generate_id("contexts")
144 changeset = Object.context_mapping(context)
145
146 case Repo.insert(changeset) do
147 {:ok, object} ->
148 object
149
150 # This should be solved by an upsert, but it seems ecto
151 # has problems accessing the constraint inside the jsonb.
152 {:error, _} ->
153 Object.get_cached_by_ap_id(context)
154 end
155 end
156
157 @doc """
158 Enqueues an activity for federation if it's local
159 """
160 def maybe_federate(%Activity{local: true} = activity) do
161 priority =
162 case activity.data["type"] do
163 "Delete" -> 10
164 "Create" -> 1
165 _ -> 5
166 end
167
168 Pleroma.Web.Federator.publish(activity, priority)
169 :ok
170 end
171
172 def maybe_federate(_), do: :ok
173
174 @doc """
175 Adds an id and a published data if they aren't there,
176 also adds it to an included object
177 """
178 def lazy_put_activity_defaults(map) do
179 %{data: %{"id" => context}, id: context_id} = create_context(map["context"])
180
181 map =
182 map
183 |> Map.put_new_lazy("id", &generate_activity_id/0)
184 |> Map.put_new_lazy("published", &make_date/0)
185 |> Map.put_new("context", context)
186 |> Map.put_new("context_id", context_id)
187
188 if is_map(map["object"]) do
189 object = lazy_put_object_defaults(map["object"], map)
190 %{map | "object" => object}
191 else
192 map
193 end
194 end
195
196 @doc """
197 Adds an id and published date if they aren't there.
198 """
199 def lazy_put_object_defaults(map, activity \\ %{}) do
200 map
201 |> Map.put_new_lazy("id", &generate_object_id/0)
202 |> Map.put_new_lazy("published", &make_date/0)
203 |> Map.put_new("context", activity["context"])
204 |> Map.put_new("context_id", activity["context_id"])
205 end
206
207 @doc """
208 Inserts a full object if it is contained in an activity.
209 """
210 def insert_full_object(%{"object" => %{"type" => type} = object_data})
211 when is_map(object_data) and type in @supported_object_types do
212 with {:ok, object} <- Object.create(object_data) do
213 {:ok, object}
214 end
215 end
216
217 def insert_full_object(_), do: {:ok, nil}
218
219 def update_object_in_activities(%{data: %{"id" => id}} = object) do
220 # TODO
221 # Update activities that already had this. Could be done in a seperate process.
222 # Alternatively, just don't do this and fetch the current object each time. Most
223 # could probably be taken from cache.
224 relevant_activities = Activity.get_all_create_by_object_ap_id(id)
225
226 Enum.map(relevant_activities, fn activity ->
227 new_activity_data = activity.data |> Map.put("object", object.data)
228 changeset = Changeset.change(activity, data: new_activity_data)
229 Repo.update(changeset)
230 end)
231 end
232
233 #### Like-related helpers
234
235 @doc """
236 Returns an existing like if a user already liked an object
237 """
238 def get_existing_like(actor, %{data: %{"id" => id}}) do
239 query =
240 from(
241 activity in Activity,
242 where: fragment("(?)->>'actor' = ?", activity.data, ^actor),
243 # this is to use the index
244 where:
245 fragment(
246 "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
247 activity.data,
248 activity.data,
249 ^id
250 ),
251 where: fragment("(?)->>'type' = 'Like'", activity.data)
252 )
253
254 Repo.one(query)
255 end
256
257 @doc """
258 Returns like activities targeting an object
259 """
260 def get_object_likes(%{data: %{"id" => id}}) do
261 query =
262 from(
263 activity in Activity,
264 # this is to use the index
265 where:
266 fragment(
267 "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
268 activity.data,
269 activity.data,
270 ^id
271 ),
272 where: fragment("(?)->>'type' = 'Like'", activity.data)
273 )
274
275 Repo.all(query)
276 end
277
278 def make_like_data(
279 %User{ap_id: ap_id} = actor,
280 %{data: %{"actor" => object_actor_id, "id" => id}} = object,
281 activity_id
282 ) do
283 object_actor = User.get_cached_by_ap_id(object_actor_id)
284
285 to =
286 if Visibility.is_public?(object) do
287 [actor.follower_address, object.data["actor"]]
288 else
289 [object.data["actor"]]
290 end
291
292 cc =
293 (object.data["to"] ++ (object.data["cc"] || []))
294 |> List.delete(actor.ap_id)
295 |> List.delete(object_actor.follower_address)
296
297 data = %{
298 "type" => "Like",
299 "actor" => ap_id,
300 "object" => id,
301 "to" => to,
302 "cc" => cc,
303 "context" => object.data["context"]
304 }
305
306 if activity_id, do: Map.put(data, "id", activity_id), else: data
307 end
308
309 def update_element_in_object(property, element, object) do
310 with new_data <-
311 object.data
312 |> Map.put("#{property}_count", length(element))
313 |> Map.put("#{property}s", element),
314 changeset <- Changeset.change(object, data: new_data),
315 {:ok, object} <- Object.update_and_set_cache(changeset),
316 _ <- update_object_in_activities(object) do
317 {:ok, object}
318 end
319 end
320
321 def update_likes_in_object(likes, object) do
322 update_element_in_object("like", likes, object)
323 end
324
325 def add_like_to_object(%Activity{data: %{"actor" => actor}}, object) do
326 likes = if is_list(object.data["likes"]), do: object.data["likes"], else: []
327
328 with likes <- [actor | likes] |> Enum.uniq() do
329 update_likes_in_object(likes, object)
330 end
331 end
332
333 def remove_like_from_object(%Activity{data: %{"actor" => actor}}, object) do
334 likes = if is_list(object.data["likes"]), do: object.data["likes"], else: []
335
336 with likes <- likes |> List.delete(actor) do
337 update_likes_in_object(likes, object)
338 end
339 end
340
341 #### Follow-related helpers
342
343 @doc """
344 Updates a follow activity's state (for locked accounts).
345 """
346 def update_follow_state(
347 %Activity{data: %{"actor" => actor, "object" => object, "state" => "pending"}} = activity,
348 state
349 ) do
350 try do
351 Ecto.Adapters.SQL.query!(
352 Repo,
353 "UPDATE activities SET data = jsonb_set(data, '{state}', $1) WHERE data->>'type' = 'Follow' AND data->>'actor' = $2 AND data->>'object' = $3 AND data->>'state' = 'pending'",
354 [state, actor, object]
355 )
356
357 activity = Repo.get(Activity, activity.id)
358 {:ok, activity}
359 rescue
360 e ->
361 {:error, e}
362 end
363 end
364
365 def update_follow_state(%Activity{} = activity, state) do
366 with new_data <-
367 activity.data
368 |> Map.put("state", state),
369 changeset <- Changeset.change(activity, data: new_data),
370 {:ok, activity} <- Repo.update(changeset) do
371 {:ok, activity}
372 end
373 end
374
375 @doc """
376 Makes a follow activity data for the given follower and followed
377 """
378 def make_follow_data(
379 %User{ap_id: follower_id},
380 %User{ap_id: followed_id} = _followed,
381 activity_id
382 ) do
383 data = %{
384 "type" => "Follow",
385 "actor" => follower_id,
386 "to" => [followed_id],
387 "cc" => ["https://www.w3.org/ns/activitystreams#Public"],
388 "object" => followed_id,
389 "state" => "pending"
390 }
391
392 data = if activity_id, do: Map.put(data, "id", activity_id), else: data
393
394 data
395 end
396
397 def fetch_latest_follow(%User{ap_id: follower_id}, %User{ap_id: followed_id}) do
398 query =
399 from(
400 activity in Activity,
401 where:
402 fragment(
403 "? ->> 'type' = 'Follow'",
404 activity.data
405 ),
406 where: activity.actor == ^follower_id,
407 # this is to use the index
408 where:
409 fragment(
410 "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
411 activity.data,
412 activity.data,
413 ^followed_id
414 ),
415 order_by: [fragment("? desc nulls last", activity.id)],
416 limit: 1
417 )
418
419 Repo.one(query)
420 end
421
422 #### Announce-related helpers
423
424 @doc """
425 Retruns an existing announce activity if the notice has already been announced
426 """
427 def get_existing_announce(actor, %{data: %{"id" => id}}) do
428 query =
429 from(
430 activity in Activity,
431 where: activity.actor == ^actor,
432 # this is to use the index
433 where:
434 fragment(
435 "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
436 activity.data,
437 activity.data,
438 ^id
439 ),
440 where: fragment("(?)->>'type' = 'Announce'", activity.data)
441 )
442
443 Repo.one(query)
444 end
445
446 @doc """
447 Make announce activity data for the given actor and object
448 """
449 # for relayed messages, we only want to send to subscribers
450 def make_announce_data(
451 %User{ap_id: ap_id} = user,
452 %Object{data: %{"id" => id}} = object,
453 activity_id,
454 false
455 ) do
456 data = %{
457 "type" => "Announce",
458 "actor" => ap_id,
459 "object" => id,
460 "to" => [user.follower_address],
461 "cc" => [],
462 "context" => object.data["context"]
463 }
464
465 if activity_id, do: Map.put(data, "id", activity_id), else: data
466 end
467
468 def make_announce_data(
469 %User{ap_id: ap_id} = user,
470 %Object{data: %{"id" => id}} = object,
471 activity_id,
472 true
473 ) do
474 data = %{
475 "type" => "Announce",
476 "actor" => ap_id,
477 "object" => id,
478 "to" => [user.follower_address, object.data["actor"]],
479 "cc" => ["https://www.w3.org/ns/activitystreams#Public"],
480 "context" => object.data["context"]
481 }
482
483 if activity_id, do: Map.put(data, "id", activity_id), else: data
484 end
485
486 @doc """
487 Make unannounce activity data for the given actor and object
488 """
489 def make_unannounce_data(
490 %User{ap_id: ap_id} = user,
491 %Activity{data: %{"context" => context}} = activity,
492 activity_id
493 ) do
494 data = %{
495 "type" => "Undo",
496 "actor" => ap_id,
497 "object" => activity.data,
498 "to" => [user.follower_address, activity.data["actor"]],
499 "cc" => ["https://www.w3.org/ns/activitystreams#Public"],
500 "context" => context
501 }
502
503 if activity_id, do: Map.put(data, "id", activity_id), else: data
504 end
505
506 def make_unlike_data(
507 %User{ap_id: ap_id} = user,
508 %Activity{data: %{"context" => context}} = activity,
509 activity_id
510 ) do
511 data = %{
512 "type" => "Undo",
513 "actor" => ap_id,
514 "object" => activity.data,
515 "to" => [user.follower_address, activity.data["actor"]],
516 "cc" => ["https://www.w3.org/ns/activitystreams#Public"],
517 "context" => context
518 }
519
520 if activity_id, do: Map.put(data, "id", activity_id), else: data
521 end
522
523 def add_announce_to_object(
524 %Activity{
525 data: %{"actor" => actor, "cc" => ["https://www.w3.org/ns/activitystreams#Public"]}
526 },
527 object
528 ) do
529 announcements =
530 if is_list(object.data["announcements"]), do: object.data["announcements"], else: []
531
532 with announcements <- [actor | announcements] |> Enum.uniq() do
533 update_element_in_object("announcement", announcements, object)
534 end
535 end
536
537 def add_announce_to_object(_, object), do: {:ok, object}
538
539 def remove_announce_from_object(%Activity{data: %{"actor" => actor}}, object) do
540 announcements =
541 if is_list(object.data["announcements"]), do: object.data["announcements"], else: []
542
543 with announcements <- announcements |> List.delete(actor) do
544 update_element_in_object("announcement", announcements, object)
545 end
546 end
547
548 #### Unfollow-related helpers
549
550 def make_unfollow_data(follower, followed, follow_activity, activity_id) do
551 data = %{
552 "type" => "Undo",
553 "actor" => follower.ap_id,
554 "to" => [followed.ap_id],
555 "object" => follow_activity.data
556 }
557
558 if activity_id, do: Map.put(data, "id", activity_id), else: data
559 end
560
561 #### Block-related helpers
562 def fetch_latest_block(%User{ap_id: blocker_id}, %User{ap_id: blocked_id}) do
563 query =
564 from(
565 activity in Activity,
566 where:
567 fragment(
568 "? ->> 'type' = 'Block'",
569 activity.data
570 ),
571 where: activity.actor == ^blocker_id,
572 # this is to use the index
573 where:
574 fragment(
575 "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
576 activity.data,
577 activity.data,
578 ^blocked_id
579 ),
580 order_by: [fragment("? desc nulls last", activity.id)],
581 limit: 1
582 )
583
584 Repo.one(query)
585 end
586
587 def make_block_data(blocker, blocked, activity_id) do
588 data = %{
589 "type" => "Block",
590 "actor" => blocker.ap_id,
591 "to" => [blocked.ap_id],
592 "object" => blocked.ap_id
593 }
594
595 if activity_id, do: Map.put(data, "id", activity_id), else: data
596 end
597
598 def make_unblock_data(blocker, blocked, block_activity, activity_id) do
599 data = %{
600 "type" => "Undo",
601 "actor" => blocker.ap_id,
602 "to" => [blocked.ap_id],
603 "object" => block_activity.data
604 }
605
606 if activity_id, do: Map.put(data, "id", activity_id), else: data
607 end
608
609 #### Create-related helpers
610
611 def make_create_data(params, additional) do
612 published = params.published || make_date()
613
614 %{
615 "type" => "Create",
616 "to" => params.to |> Enum.uniq(),
617 "actor" => params.actor.ap_id,
618 "object" => params.object,
619 "published" => published,
620 "context" => params.context
621 }
622 |> Map.merge(additional)
623 end
624
625 #### Flag-related helpers
626
627 def make_flag_data(params, additional) do
628 status_ap_ids =
629 Enum.map(params.statuses || [], fn
630 %Activity{} = act -> act.data["id"]
631 act when is_map(act) -> act["id"]
632 act when is_binary(act) -> act
633 end)
634
635 object = [params.account.ap_id] ++ status_ap_ids
636
637 %{
638 "type" => "Flag",
639 "actor" => params.actor.ap_id,
640 "content" => params.content,
641 "object" => object,
642 "context" => params.context
643 }
644 |> Map.merge(additional)
645 end
646
647 @doc """
648 Fetches the OrderedCollection/OrderedCollectionPage from `from`, limiting the amount of pages fetched after
649 the first one to `pages_left` pages.
650 If the amount of pages is higher than the collection has, it returns whatever was there.
651 """
652 def fetch_ordered_collection(from, pages_left, acc \\ []) do
653 with {:ok, response} <- Tesla.get(from),
654 {:ok, collection} <- Poison.decode(response.body) do
655 case collection["type"] do
656 "OrderedCollection" ->
657 # If we've encountered the OrderedCollection and not the page,
658 # just call the same function on the page address
659 fetch_ordered_collection(collection["first"], pages_left)
660
661 "OrderedCollectionPage" ->
662 if pages_left > 0 do
663 # There are still more pages
664 if Map.has_key?(collection, "next") do
665 # There are still more pages, go deeper saving what we have into the accumulator
666 fetch_ordered_collection(
667 collection["next"],
668 pages_left - 1,
669 acc ++ collection["orderedItems"]
670 )
671 else
672 # No more pages left, just return whatever we already have
673 acc ++ collection["orderedItems"]
674 end
675 else
676 # Got the amount of pages needed, add them all to the accumulator
677 acc ++ collection["orderedItems"]
678 end
679
680 _ ->
681 {:error, "Not an OrderedCollection or OrderedCollectionPage"}
682 end
683 end
684 end
685 end