Add Pipeline module, test for federation.
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Conversation
10 alias Pleroma.Notification
11 alias Pleroma.Object
12 alias Pleroma.Object.Containment
13 alias Pleroma.Object.Fetcher
14 alias Pleroma.Pagination
15 alias Pleroma.Repo
16 alias Pleroma.Upload
17 alias Pleroma.User
18 alias Pleroma.Web.ActivityPub.MRF
19 alias Pleroma.Web.ActivityPub.Transmogrifier
20 alias Pleroma.Web.ActivityPub.Utils
21 alias Pleroma.Web.Streamer
22 alias Pleroma.Web.WebFinger
23 alias Pleroma.Workers.BackgroundWorker
24
25 import Ecto.Query
26 import Pleroma.Web.ActivityPub.Utils
27 import Pleroma.Web.ActivityPub.Visibility
28
29 require Logger
30 require Pleroma.Constants
31
# Builds `{recipients, to, cc}` from an activity map.
#
# For Announce activities, every addressed AP id that resolves to a cached user is kept
# only when that user follows the announcing actor; ids that do not resolve to a user are
# kept unconditionally. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  announcer = User.get_cached_by_ap_id(data["actor"])

  recipients =
    [to, cc, bcc]
    |> Enum.concat()
    |> Enum.filter(fn ap_id ->
      case User.get_cached_by_ap_id(ap_id) do
        nil -> true
        known_user -> User.following?(known_user, announcer)
      end
    end)

  {recipients, to, cc}
end

# For Create activities the actor is added to the recipients and duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = Map.get(data, "actor", [])

  {Enum.uniq(Enum.concat([to, cc, bcc, [actor]])), to, cc}
end

# Every other activity type: the plain concatenation of "to", "cc" and "bcc".
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  {Enum.concat([to, cc, bcc]), to, cc}
end
67
# Returns `true` when an activity attributed to `actor` (an AP id or `nil`) may be inserted.
#
# * `nil` actor: nothing to check, so `true`.
# * Otherwise the actor is looked up in the user cache and must be a user whose
#   `info.deactivated` is exactly `false`.
#
# An actor that cannot be resolved to a user yields `false`. Previously this case
# dereferenced `nil.info` and raised instead of rejecting the activity.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) do
  case User.get_cached_by_ap_id(actor) do
    %User{info: %{deactivated: false}} -> true
    _ -> false
  end
end
80
# Checks the embedded object's "content" against the `[:instance, :remote_limit]` setting.
# Activities without an embedded object or with `nil` content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  max_length >= String.length(content)
end

defp check_remote_limit(_), do: true
87
# Increases the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` untouched.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
91
# Decreases the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` untouched.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
95
# For a Create whose embedded object has "inReplyTo", bumps the replies counter of the
# replied-to object when the new object is public (returns `nil` when it is not public).
# Anything else is a `:noop`.
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = reply
    }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

def increase_replies_count_if_reply(_create_data), do: :noop
106
# For an object with "inReplyTo", lowers the replies counter of the replied-to object when
# the object is public (returns `nil` when it is not public). Anything else is a `:noop`.
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = reply}) do
  if is_public?(reply), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
116
# A Create whose embedded object carries both "inReplyTo" and "name" is counted as a vote
# for the option `name` on the replied-to object. Anything else is a `:noop`.
def increase_poll_votes_if_vote(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => poll_ap_id, "name" => option_name}
    }) do
  Object.increase_vote_count(poll_ap_id, option_name)
end

def increase_poll_votes_if_vote(_create_data), do: :noop
125
126 # TODO rewrite in with style
# Stores `object` (an activity map) as an `Activity` row.
#
# `meta` must contain the `:local` key (`Keyword.fetch!/2` raises otherwise). Recipients
# are derived from the activity via `get_recipients/1`. A failed insert raises a
# `MatchError`.
#
# Returns `{:ok, activity, meta}` with `meta` passed through unchanged. The spec
# previously declared a two-element tuple, which did not match the actual return value.
@spec persist(map(), keyword()) :: {:ok, Activity.t(), keyword()}
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _, _} = get_recipients(object)

  {:ok, activity} =
    Repo.insert(%Activity{
      data: object,
      local: local,
      recipients: recipients,
      actor: object["actor"]
    })

  {:ok, activity, meta}
end
142
# Inserts an activity map into the database and triggers the follow-up work.
#
# Steps, in order: return the already-known activity if `Activity.normalize/1` finds one,
# fill in defaults, check the actor (unless `bypass_actor_check`), check the remote
# content limit, run the MRF filter, compute recipients, bail out early for fake
# activities, run containment on the child object and insert the full object.
#
# On success the activity is stored, a "fetch_data_for_activity" background job is
# enqueued, notifications are created, the conversation is created/bumped and the activity
# and participations are streamed out.
#
# Returns `{:ok, activity}`; with `fake` set, an unsaved activity with the id
# "pleroma:fakeid"; any other failing step is returned as `{:error, step_result}`.
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       prepared <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(prepared["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(prepared)},
       {:ok, filtered} <- MRF.filter(prepared),
       {recipients, _, _} = get_recipients(filtered),
       # Tagged so the `else` branch can recognise the "fake" short-circuit.
       {:fake, false, filtered, recipients} <- {:fake, fake, filtered, recipients},
       :ok <- Containment.contain_child(filtered),
       {:ok, stored_data, object} <- insert_full_object(filtered) do
    {:ok, inserted} =
      Repo.insert(%Activity{
        data: stored_data,
        local: local,
        actor: stored_data["actor"],
        recipients: recipients
      })

    # Splice in the child object if we have one.
    activity = if is_nil(object), do: inserted, else: Map.put(inserted, :object, object)

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    Notification.create_notifications(activity)

    participations = get_participations(Conversation.create_or_bump_for(activity))

    stream_out(activity)
    stream_out_participations(participations)
    {:ok, activity}
  else
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
202
# Extracts the participations from an `{:ok, %{participations: ...}}` result; any other
# value yields an empty list.
defp get_participations(result) do
  case result do
    {:ok, %{participations: participations}} -> participations
    _ -> []
  end
end
205
# Streams the given participations (with their users preloaded) on the "participation"
# topic.
def stream_out_participations(participations) do
  Streamer.stream("participation", Repo.preload(participations, :user))
end

# Streams the participations of the conversation the object's context belongs to, but
# only when the context still has a latest activity for `user` (the lookup passes `user`
# as both "user" and "blocking_user"). Objects without a "context" are a `:noop`.
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  with %Conversation{} = found <- Conversation.get_for_ap_id(context),
       conversation = Repo.preload(found, :participations),
       latest_id =
         fetch_latest_activity_id_for_context(conversation.ap_id, %{
           "user" => user,
           "blocking_user" => user
         }) do
    if latest_id, do: stream_out_participations(conversation.participations)
  end
end

def stream_out_participations(_, _), do: :noop
229
# Streams Create, Announce and Delete activities to the topics computed by
# `Topics.get_activity_topics/1`; every other activity is a `:noop`.
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)

  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
240
# Builds and inserts a Create activity, then updates reply/vote/note counters and
# federates it. With `fake` set, returns right after `insert/3` without those side
# effects.
#
# Optional params: `:additional` (defaults to `%{}`), `:local` (only an explicit `false`
# disables it) and `:published`.
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, inserted} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, inserted},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, fake_activity} ->
      {:ok, fake_activity}

    {:error, message} ->
      {:error, message}
  end
end
269
# Builds a Listen activity from `params`, inserts it and federates it.
#
# Optional params: `:additional` (defaults to `%{}`), `:local` (only an explicit `false`
# disables it) and `:published`.
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]

  listen_data =
    make_listen_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(listen_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, message} ->
      {:error, message}
  end
end
289
# Creates an "Accept" activity; see `accept_or_reject/2` for the expected params.
def accept(params), do: accept_or_reject("Accept", params)
293
# Creates a "Reject" activity; see `accept_or_reject/2` for the expected params.
def reject(params), do: accept_or_reject("Reject", params)
297
# Inserts and federates an activity of the given `type` addressed to `to`, attributed to
# `actor.ap_id` and referring to `object`.
#
# Optional params: `:local` (default `true`) and `:activity_id`, which is handed to
# `Utils.maybe_put/3` under the "id" key.
def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
  local = Map.get(params, :local, true)
  activity_id = Map.get(params, :activity_id, nil)

  data =
    Utils.maybe_put(
      %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object},
      "id",
      activity_id
    )

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
310
# Inserts and federates an "Update" activity for `object`.
#
# Optional params: `:local` (only an explicit `false` disables it) and `:activity_id`,
# which is handed to `Utils.maybe_put/3` under the "id" key.
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  local = params[:local] != false

  data =
    Utils.maybe_put(
      %{
        "to" => to,
        "cc" => cc,
        "type" => "Update",
        "actor" => actor,
        "object" => object
      },
      "id",
      params[:activity_id]
    )

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
328
329 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
# Likes `object` as `user`.
#
# When the user already has a like for the object, that activity is returned together
# with the object as passed in. Otherwise a like activity is inserted, the like is added
# to the object and the activity is federated. Any other failing step is returned as
# `{:error, step_result}`.
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       like_data <- make_like_data(user, object, activity_id),
       {:ok, activity} <- insert(like_data, local),
       {:ok, liked_object} <- add_like_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, liked_object}
  else
    %Activity{} = existing_like -> {:ok, existing_like, object}
    error -> {:error, error}
  end
end
347
# Removes `actor`'s like from `object`.
#
# On success returns `{:ok, unlike_activity, like_activity, updated_object}`. When there
# is no existing like, or any step fails, the result is `{:ok, object}` with the object as
# passed in.
def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       unlike_data <- make_unlike_data(actor, like_activity, activity_id),
       {:ok, unlike_activity} <- insert(unlike_data, local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, updated_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, updated_object}
  else
    _ -> {:ok, object}
  end
end
360
# Announces `object` as `user`.
#
# The object must pass `is_announceable?/3`. The announce activity is then inserted, added
# to the object and federated. Returns `{:ok, activity, updated_object}`; any failing
# step is returned as `{:error, step_result}`.
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_announceable?(object, user, public),
       announce_data <- make_announce_data(user, object, activity_id, public),
       {:ok, activity} <- insert(announce_data, local),
       {:ok, announced_object} <- add_announce_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, announced_object}
  else
    error -> {:error, error}
  end
end
378
# Undoes `actor`'s announce of `object`.
#
# On success returns `{:ok, unannounce_activity, updated_object}`. When there is no
# existing announce, or any step fails, the result is `{:ok, object}` with the object as
# passed in.
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
       {:ok, unannounce_activity} <- insert(unannounce_data, local),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, updated_object}
  else
    _ -> {:ok, object}
  end
end
396
# Inserts and federates a follow activity from `follower` to `followed`, then stores the
# activity's "state" in the follow-state cache.
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       :ok <- maybe_federate(activity) do
    User.set_follow_state_cache(follower.ap_id, followed.ap_id, activity.data["state"])
    {:ok, activity}
  end
end
405
# Marks the latest follow activity between the two users as "cancelled", then inserts and
# federates the matching unfollow activity. A step that does not match is returned as-is.
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
415
# Sends a "Delete" activity for the user's own actor ("Person") to their follower
# address. `insert/4` is called with `local`, `fake` and `bypass_actor_check` all `true`.
# Returns `{:ok, user}`.
def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
  data = %{
    "to" => [follower_address],
    "type" => "Delete",
    "actor" => ap_id,
    "object" => %{"type" => "Person", "id" => ap_id}
  }

  with {:ok, activity} <- insert(data, true, true, true),
       :ok <- maybe_federate(activity) do
    {:ok, user}
  end
end
428
# Deletes `object` and publishes a "Delete" activity addressed to the object's former
# "to" and "cc" recipients.
#
# Options: `:local` (default `true`), `:activity_id` (default `nil`) and `:actor`, which
# defaults to the object's own actor.
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, options \\ []) do
  local = Keyword.get(options, :local, true)
  activity_id = Keyword.get(options, :activity_id, nil)
  actor = Keyword.get(options, :actor, actor)

  user = User.get_cached_by_ap_id(actor)
  to = (object.data["to"] || []) ++ (object.data["cc"] || [])

  with {:ok, deleted_object, deleted_activity} <- Object.delete(object),
       data <-
         maybe_put(
           %{
             "type" => "Delete",
             "actor" => actor,
             "object" => id,
             "to" => to,
             "deleted_activity_id" => deleted_activity && deleted_activity.id
           },
           "id",
           activity_id
         ),
       {:ok, activity} <- insert(data, local, false),
       stream_out_participations(deleted_object, user),
       _ <- decrease_replies_count_if_reply(deleted_object),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
457
# Blocks `blocked` on behalf of `blocker`.
#
# With `[:activitypub, :unfollow_blocked]` set, an existing follow is undone first. A
# block activity is only inserted and federated when `[:activitypub, :outgoing_blocks]`
# is `true`; in every other case (including failures) the result is `{:ok, nil}`.
@spec block(User.t(), User.t(), String.t() | nil, boolean) :: {:ok, Activity.t() | nil}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       block_data <- make_block_data(blocker, blocked, activity_id),
       {:ok, activity} <- insert(block_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    _ -> {:ok, nil}
  end
end
477
# Inserts and federates the activity that undoes the latest block of `blocked` by
# `blocker`. A step that does not match (e.g. no block found) is returned as-is.
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       unblock_data <- make_unblock_data(blocker, blocked, latest_block, activity_id),
       {:ok, activity} <- insert(unblock_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
486
# Files a report (Flag activity) about `account`.
#
# `:local` and `:forward` are treated as true unless explicitly `false`. When forwarding,
# the reported account's AP id is put into "cc"; otherwise both "to" and "cc" are empty.
# After the activity is inserted and federated, a report e-mail is queued for every
# superuser.
@spec flag(map()) :: {:ok, Activity.t()} | any
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data <- make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       :ok <- maybe_federate(activity) do
    for superuser <- User.all_superusers() do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  end
end
522
# Query for the Create activities of a context (thread), newest first.
#
# Visible recipients are the public address plus, when `opts["user"]` is set, that user's
# own AP id and everyone they follow.
defp fetch_activities_for_context_query(context, opts) do
  public = [Pleroma.Constants.as_public()]

  recipients =
    if opts["user"] do
      [opts["user"].ap_id | opts["user"].following] ++ public
    else
      public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, opts["user"])
  |> where(
    [a],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      a.data,
      "Create",
      a.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([a], desc: a.id)
end
549
# Returns every activity matched by the context query for `context`.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  Repo.all(fetch_activities_for_context_query(context, opts))
end
556
# Returns the id of the newest activity in `context`, or `nil` when there is none.
# "skip_preload" defaults to `true` here unless the caller sets it in `opts`.
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.put_new(opts, "skip_preload", true)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
566
# Fetches one page of publicly addressed activities, excluding unlisted ones. Any "user"
# entry in `opts` is dropped before the query is built. The page returned by
# `Pagination.fetch_paginated/3` is reversed.
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  opts = Map.delete(opts, "user")
  public = [Pleroma.Constants.as_public()]

  public
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
576
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities whose `activity_visibility(...)` SQL value matches
# `opts[:visibility]`, which may be a single visibility or a list of them. Without a
# `:visibility` key the query is returned untouched.
#
# NOTE(review): when the requested visibility is not in @valid_visibilities, the return
# value is the result of `Logger.error/1`, not a query. Callers piping the result onward
# should confirm this is intended.
defp restrict_visibility(query, %{visibility: visibility}) when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    where(
      query,
      [a],
      fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(_query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{visibility}")
end

defp restrict_visibility(query, _visibility), do: query
616
# Applies the `thread_visibility(ap_id, activity_id)` SQL check for the reading user.
# Skipped when thread containment is disabled in the passed config or in the user's info,
# and when `opts` carries no `%User{}` under "user".
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(
       query,
       %{"user" => %User{info: %{skip_thread_containment: true}}},
       _config
     ),
     do: query

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
635
# Fetches activities of any type authored by `user`, as visible to `reading_user`.
# "user", "actor_id" and "whole_db" in `params` are overwritten.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "whole_db" => true
    })

  recipients =
    user_activities_recipients(%{
      "godmode" => params["godmode"],
      "reading_user" => reading_user
    })

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
652
# Fetches the Create and Announce activities authored by `user`, as visible to
# `reading_user`. "type", "user", "actor_id", "whole_db" and "pinned_activity_ids" in
# `params` are overwritten.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.info.pinned_activities
    })

  recipients =
    user_activities_recipients(%{
      "godmode" => params["godmode"],
      "reading_user" => reading_user
    })

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
671
# Recipient list used when reading another user's activities.
# With "godmode" set to `true` the list is empty; otherwise it is the public address
# plus, for a present reading user, their own AP id and everyone they follow.
defp user_activities_recipients(%{"godmode" => true}), do: []

defp user_activities_recipients(%{"reading_user" => reading_user}) do
  public = Pleroma.Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | reading_user.following]
  else
    [public]
  end
end
683
# Keeps only activities with an id greater than "since_id". An empty or missing
# "since_id" leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
691
# Excludes activities whose object carries any tag from the non-empty "tag_reject" list.
# Raises when combined with "skip_preload", because the second query binding (the object)
# is needed.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _), do: query
705
# Keeps only activities whose object carries every tag from the non-empty "tag_all" list.
# Raises when combined with "skip_preload", because the second query binding (the object)
# is needed.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => [_ | _] = tag_all}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_tag_all(query, _), do: query
719
# Keeps only activities whose object carries the given tag (binary) or any of the given
# tags (list). Raises when combined with "skip_preload", because the second query binding
# (the object) is needed.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{"tag" => single_tag}) when is_binary(single_tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^single_tag))
end

defp restrict_tag(query, _), do: query
739
# Keeps activities whose recipients overlap with `recipients`. With a user given,
# activities authored by that user are matched as well (via `or_where`). An empty
# recipient list disables the restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
753
# With "local_only" set to `true`, keeps only activities flagged as local.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
759
# With "actor_id" given, keeps only activities whose actor equals it.
defp restrict_actor(query, %{"actor_id" => actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
765
# Restricts by the activity's "type": a binary is compared for equality, any other value
# is passed to `= ANY(?)`.
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
775
# With "state" given, keeps only activities whose data "state" equals it.
defp restrict_state(query, %{"state" => state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
781
# With "favorited_by" given, keeps only activities whose object lists that AP id in its
# "likes". Uses the second query binding (the object).
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
790
# With "only_media" set to "true" or "1", keeps only activities whose object's
# "attachment" is not equal to the empty list. Raises when combined with "skip_preload",
# because the second query binding (the object) is needed.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in ["true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _), do: query
803
# With "exclude_replies" set to "true" or "1", keeps only activities whose object has no
# "inReplyTo". Uses the second query binding (the object).
defp restrict_replies(query, %{"exclude_replies" => val}) when val in ["true", "1"] do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, _), do: query
812
# With "exclude_reblogs" set to "true" or "1", drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
818
# Hides activities authored by, or addressed ("to") to, anyone the "muting_user" has
# muted. Unless "skip_preload" is set, rows with a thread mute (named binding
# `:thread_mute`) are hidden as well. "with_muted" (true, "true" or "1") disables the
# restriction.
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{info: info}} = opts) do
  mutes = info.mutes

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^mutes))

  if opts["skip_preload"] do
    without_muted
  else
    where(without_muted, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _), do: query
838
# Hides activities involving accounts or domains blocked by the "blocking_user":
# activities authored by or addressed to a blocked account, Announces whose "to" contains
# a blocked account, and activities whose actor (or whose object's actor) lives on a
# blocked domain. Joins the object when the query has no `:object` binding yet.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocks = info.blocks || []
  domain_blocks = info.domain_blocks || []

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocks))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocks))
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocks
    )
  )
  |> where(
    [activity],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
  )
  |> where(
    [object: o],
    fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
  )
end

defp restrict_blocked(query, _), do: query
863
# Drops activities that carry the public address in their "cc" (a missing "cc" is
# coalesced to an empty value in SQL).
defp restrict_unlisted(query) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Pleroma.Constants.as_public()]
    )
  )
end
875
# With "pinned" set to "true", keeps only activities whose id is in "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _), do: query
881
# Drops Announce activities authored by anyone in the muting user's `muted_reblogs`
# list (a `nil` list is treated as empty).
defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
  muted_reblogs = info.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _), do: query
898
# Drops activities whose object is of type "Answer", unless "include_poll_votes" is
# `true`. The filter is only applied when the query already has an `:object` binding.
defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query

defp exclude_poll_votes(query, _) do
  if has_named_binding?(query, :object) do
    where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
  else
    query
  end
end
910
# With a binary "exclude_id", drops the activity with that id.
defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _), do: query
916
# Adds the object preload via `Activity.with_preloaded_object/1` unless "skip_preload"
# is `true`.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
923
# Adds the bookmark preload for `opts["user"]` via `Activity.with_preloaded_bookmark/2`
# unless "skip_preload" is `true`.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts["user"])
end
930
# Calls `Activity.with_set_thread_muted_field/2` for the "muting_user" (falling back to
# "user") unless "skip_preload" is `true`.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  Activity.with_set_thread_muted_field(query, opts["muting_user"] || opts["user"])
end
937
# Orders by id according to `opts[:order]` (`:desc` or `:asc`); otherwise the query is
# left unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
949
# Builds the activity query for `recipients`, applying every preload and restriction
# driven by `opts` in a fixed order. The instance-wide `skip_thread_containment` setting
# is read once and handed to the thread-visibility restriction.
def fetch_activities_query(recipients, opts \\ %{}) do
  containment_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  Activity
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> maybe_order(opts)
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(opts)
  |> restrict_muted(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, containment_config)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
end
982
# Fetches one page of activities for `recipients` plus the list memberships of
# `opts["user"]`. The page is reversed and then post-processed by `maybe_update_cc/3`.
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  list_memberships = Pleroma.List.memberships(opts["user"])

  (recipients ++ list_memberships)
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, opts["user"])
end
991
# Adds the reading user's AP id to the "cc" of every activity whose non-empty "bcc"
# contains one of the user's list memberships.
#
# Activities are returned unchanged when there are no list memberships, when no `%User{}`
# is given, or when an activity has no matching "bcc" entry.
#
# The existing "cc" is wrapped with `List.wrap/1`, so an activity without a "cc" key gets
# `[user_ap_id]` rather than the improper list `[user_ap_id | nil]`.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
1008
# Keeps activities that are addressed to one of `recipients`, or that are both addressed
# to one of `recipients_with_public` and addressed to the public address.
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Pleroma.Constants.as_public() in activity.recipients)
  )
end
1017
# Fetches one page of activities using the bounded recipient rule of
# `fetch_activities_bounded_query/3`. The base query is built with an empty recipient
# list, and the page is reversed.
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  []
  |> fetch_activities_query(opts)
  |> fetch_activities_bounded_query(recipients, recipients_with_public)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1029
# Stores `file` through `Upload.store/2` and inserts the resulting data as an `Object`.
# When `opts[:actor]` is set, it is recorded under the object's "actor" key. A failed
# store is returned as-is.
def upload(file, opts \\ []) do
  with {:ok, stored} <- Upload.store(file, opts) do
    object_data =
      if opts[:actor], do: Map.put(stored, "actor", opts[:actor]), else: stored

    Repo.insert(%Object{data: object_data})
  end
end
1042
# Converts a remote actor document (`data`) into the attribute map used to build a user.
# Always returns `{:ok, user_data}`.
#
# The document comes from remote servers, so its optional parts are read defensively:
#
# * "icon" / "image" become an avatar / banner only when they are maps with a truthy
#   "url". Any other shape (missing, a list, a plain string) yields `nil` instead of
#   raising.
# * "attachment" may be missing, `nil`, a single map or a list. Only entries that are
#   maps with "type" equal to "PropertyValue" are kept as profile fields; malformed
#   entries are skipped instead of raising.
defp object_to_user_data(data) do
  to_image = fn
    %{"url" => url} when url not in [nil, false] ->
      %{"type" => "Image", "url" => [%{"href" => url}]}

    _ ->
      nil
  end

  avatar = to_image.(data["icon"])
  banner = to_image.(data["image"])

  fields =
    data
    |> Map.get("attachment", [])
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  # `locked` is read from the document as received; `discoverable` is read after
  # `Transmogrifier.maybe_fix_user_object/1` has been applied.
  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false

  user_data = %{
    ap_id: data["id"],
    info: %{
      ap_enabled: true,
      source_data: data,
      banner: banner,
      fields: fields,
      locked: locked,
      discoverable: discoverable
    },
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"]
  }

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  {:ok, Map.put(user_data, :nickname, nickname)}
end
1099
# Fetches the following and follower collections of `user` and derives the
# counters and privacy flags from them.
#
# Returns `{:ok, %{hide_follows, follower_count, following_count,
# hide_followers}}` when both collections can be fetched and carry an
# integer "totalItems". Error tuples from any step are returned as-is; any
# other unexpected value is wrapped as `{:error, value}`.
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       n_following when is_integer(n_following) <- following["totalItems"],
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       n_followers when is_integer(n_followers) <- followers["totalItems"],
       {:ok, followers_hidden} <- collection_private(followers) do
    counters = %{
      hide_follows: follows_hidden,
      follower_count: n_followers,
      following_count: n_following,
      hide_followers: followers_hidden
    }

    {:ok, counters}
  else
    failure ->
      case failure do
        {:error, _} -> failure
        _ -> {:error, failure}
      end
  end
end
1124
# Merges freshly fetched follow counters into `data.info` when the
# `[:instance, :external_user_synchronization]` setting is `true`.
#
# When the setting is `false` the data is returned untouched. Any other
# outcome (a failed fetch, or a setting that is neither `true` nor `false`)
# is logged as an error and the data is returned untouched as well.
defp maybe_update_follow_information(data) do
  outcome =
    case Pleroma.Config.get([:instance, :external_user_synchronization]) do
      true -> fetch_follow_information_for_user(data)
      flag -> {:enabled, flag}
    end

  case outcome do
    {:ok, info} ->
      Map.put(data, :info, Map.merge(data.info, info))

    {:enabled, false} ->
      data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(failure)
      )

      data
  end
end
1143
# Decides whether a collection hides its members.
#
# Returns `{:ok, false}` when the "first" page is embedded as a
# (Ordered)CollectionPage map, or when fetching "first" yields such a page.
# Returns `{:ok, true}` when that fetch is answered with HTTP 401 or 403.
# Other error tuples are returned as-is; anything else is wrapped as
# `{:error, value}`.
defp collection_private(data) do
  case data["first"] do
    %{"type" => type} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    first ->
      case Fetcher.fetch_and_contain_remote_object_from_id(first) do
        {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
          {:ok, false}

        {:error, {:ok, %{status: code}}} when code in [401, 403] ->
          {:ok, true}

        {:error, _} = error ->
          error

        other ->
          {:error, other}
      end
  end
end
1164
# Runs the actor object through `MRF.filter/1` and converts the filtered
# object into user attributes.
#
# Returns `{:ok, user_data}` on success; whatever a failing step returned is
# wrapped as `{:error, value}`.
def user_data_from_user_object(data) do
  case MRF.filter(data) do
    {:ok, filtered} ->
      case object_to_user_data(filtered) do
        {:ok, _user_data} = success -> success
        failure -> {:error, failure}
      end

    rejection ->
      {:error, rejection}
  end
end
1173
# Fetches the remote actor at `ap_id` and turns it into user attributes,
# including follow counters when synchronization is enabled.
#
# Returns `{:ok, user_data}` on success. On failure the problem is logged
# and an error tuple is returned: error tuples from the failing step are
# passed through, any other value is wrapped as `{:error, value}`.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, data} <- user_data_from_user_object(data) do
    {:ok, maybe_update_follow_information(data)}
  else
    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")

      # Logger.error/1 returns :ok, so the failure must be returned
      # explicitly; otherwise callers would receive a bare :ok on error.
      case e do
        {:error, _} -> e
        _ -> {:error, e}
      end
  end
end
1183
# Creates or refreshes the user identified by `ap_id`.
#
# When a user with this AP id is already cached, it is upgraded through the
# Transmogrifier. Otherwise the actor is fetched and inserted or updated; a
# failed fetch is wrapped as `{:error, value}`.
def make_user_from_ap_id(ap_id) do
  case User.get_cached_by_ap_id(ap_id) do
    missing when missing in [nil, false] ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, data} -> User.insert_or_update_user(data)
        failure -> {:error, failure}
      end

    _cached ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)
  end
end
1195
# Resolves `nickname` through WebFinger and builds the user from the AP id
# found there. Returns `{:error, "No AP id in WebFinger"}` when the lookup
# fails or carries no AP id.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1203
# Filters out broken threads: delegates to
# `entire_thread_visible_for_user?/2` to decide whether `user` can see the
# whole thread `activity` belongs to.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1208
# Post-processing hook for a single activity; currently this only applies
# the broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1213
# Builds a query for "Create" activities with "direct" visibility, ordered
# by ascending activity id.
def fetch_direct_messages_query do
  direct_creates =
    restrict_visibility(
      restrict_type(Activity, %{"type" => "Create"}),
      %{visibility: "direct"}
    )

  order_by(direct_creates, [activity], asc: activity.id)
end
1220 end