Merge remote-tracking branch 'origin/develop' into benchmark-finishing
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Activity.Ir.Topics
8 alias Pleroma.Config
9 alias Pleroma.Conversation
10 alias Pleroma.Notification
11 alias Pleroma.Object
12 alias Pleroma.Object.Containment
13 alias Pleroma.Object.Fetcher
14 alias Pleroma.Pagination
15 alias Pleroma.Repo
16 alias Pleroma.Upload
17 alias Pleroma.User
18 alias Pleroma.Web.ActivityPub.MRF
19 alias Pleroma.Web.ActivityPub.Transmogrifier
20 alias Pleroma.Web.ActivityPub.Utils
21 alias Pleroma.Web.Streamer
22 alias Pleroma.Web.WebFinger
23 alias Pleroma.Workers.BackgroundWorker
24
25 import Ecto.Query
26 import Pleroma.Web.ActivityPub.Utils
27 import Pleroma.Web.ActivityPub.Visibility
28
29 require Logger
30 require Pleroma.Constants
31
# Builds the recipient list for an activity map.
# Returns `{recipients, to, cc}` where `to` and `cc` are taken from the map
# (defaulting to `[]`) and `recipients` is derived per activity type.
#
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  actor = User.get_cached_by_ap_id(data["actor"])

  recipients =
    [to, cc, bcc]
    |> Enum.concat()
    |> Enum.filter(fn recipient ->
      # Recipients without a cached user are kept as-is; cached users must follow the actor.
      case User.get_cached_by_ap_id(recipient) do
        nil -> true
        user -> User.following?(user, actor)
      end
    end)

  {recipients, to, cc}
end

# Create activities additionally address the actor itself (deduplicated).
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # Only add the actor when one is present. The previous `[]` default was wrapped
  # in a list and ended up as an empty-list element inside `recipients`.
  actor_recipients =
    case Map.get(data, "actor") do
      nil -> []
      actor -> [actor]
    end

  recipients = [to, cc, bcc, actor_recipients] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

# Every other activity type is addressed to the concatenation of to, cc and bcc.
defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
67
# Returns true when no actor is given, or when the cached user for `actor`
# has `info.deactivated == false`. Returns false otherwise.
defp check_actor_is_active(nil), do: true

defp check_actor_is_active(actor) do
  # Match on %User{} so an actor without a cached user yields `false` instead
  # of raising when `.info` is read from `nil`.
  with %User{info: info} <- User.get_cached_by_ap_id(actor),
       false <- info.deactivated do
    true
  else
    _e -> false
  end
end
80
# True when the object's content is no longer than the configured
# `[:instance, :remote_limit]`; activities without object content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])
  max_length >= String.length(content)
end

defp check_remote_limit(_activity), do: true
87
# Bumps the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` unchanged.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
91
# Lowers the actor's note count when `object` is public; otherwise returns
# `{:ok, actor}` unchanged.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
95
# For a Create whose object has an "inReplyTo", increments the replies count of
# the replied-to object when the new object is public (returns nil when it is not).
# Any other input is a no-op.
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = object
    }) do
  if is_public?(object), do: Object.increase_replies_count(parent_ap_id)
end

def increase_replies_count_if_reply(_create_data), do: :noop
106
# For an object with an "inReplyTo", decrements the replies count of the
# replied-to object when the object is public (returns nil when it is not).
# Any other input is a no-op.
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = data}) do
  if is_public?(data), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
116
# A Create whose object carries both "inReplyTo" and "name" is counted as a vote
# for the option `name` on the replied-to object. Anything else is a no-op.
def increase_poll_votes_if_vote(%{"type" => "Create", "object" => vote}) when is_map(vote) do
  case vote do
    %{"inReplyTo" => poll_ap_id, "name" => option} ->
      Object.increase_vote_count(poll_ap_id, option)

    _ ->
      :noop
  end
end

def increase_poll_votes_if_vote(_create_data), do: :noop
125
# Inserts an activity built from `map`.
#
#   * `local` - stored on the activity's `local` field.
#   * `fake` - when true, nothing is written: a struct with the id
#     "pleroma:fakeid" is built and returned after the checks up to MRF filtering.
#   * `bypass_actor_check` - skips the "actor is active" check.
#
# Returns `{:ok, activity}` (also when `Activity.normalize/1` already finds an
# activity for `map`), or `{:error, reason}` where `reason` is whichever value
# failed to match in the chain below.
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       prepared <- lazy_put_activity_defaults(map, fake),
       true <- bypass_actor_check || check_actor_is_active(prepared["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(prepared)},
       {:ok, filtered} <- MRF.filter(prepared),
       {recipients, _to, _cc} = get_recipients(filtered),
       # Fake inserts leave the chain here and are finished in the `else` branch.
       {:fake, false, _, _} <- {:fake, fake, filtered, recipients},
       :ok <- Containment.contain_child(filtered),
       {:ok, data, object} <- insert_full_object(filtered) do
    {:ok, stored} =
      Repo.insert(%Activity{
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients
      })

    # Splice in the child object if we have one.
    activity =
      case object do
        nil -> stored
        _ -> Map.put(stored, :object, object)
      end

    BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})

    Notification.create_notifications(activity)

    participations = get_participations(Conversation.create_or_bump_for(activity))

    stream_out(activity)
    stream_out_participations(participations)
    {:ok, activity}
  else
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
184
# Extracts the participations from an `{:ok, %{participations: ...}}` result;
# any other value yields an empty list.
defp get_participations(result) do
  case result do
    {:ok, %{participations: participations}} -> participations
    _ -> []
  end
end
187
# Preloads the user of each participation and streams them on the
# "participation" topic.
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
195
# Streams the participations of the conversation matching the object's context,
# but only if a latest activity id can still be fetched for that context with
# `user` as both the viewing and the blocking user. When no conversation is
# found, the lookup result is returned as-is.
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      lookup_opts = %{"user" => user, "blocking_user" => user}

      if fetch_latest_activity_id_for_context(conversation.ap_id, lookup_opts) do
        stream_out_participations(conversation.participations)
      end

    other ->
      other
  end
end

def stream_out_participations(_, _), do: :noop
211
# Streams Create, Announce and Delete activities to the topics computed by
# `Topics.get_activity_topics/1`; every other activity is a no-op.
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
    when data_type in ["Create", "Announce", "Delete"] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

def stream_out(_activity), do: :noop
222
# Builds and inserts a Create activity from `params`, then updates reply and
# poll counters, the actor's note count, and federates the activity.
#
# With `fake` set, or when the `:env` config is `:benchmark`, the chain stops
# early and `{:ok, activity}` is returned without the later steps.
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_poll_votes_if_vote(create_data),
       {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:quick_insert, true, quick_activity} -> {:ok, quick_activity}
    {:fake, true, fake_activity} -> {:ok, fake_activity}
    {:error, message} -> {:error, message}
  end
end
256
# Builds listen data from `params` via `make_listen_data/2`, inserts it and
# federates the resulting activity.
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  listen_data =
    make_listen_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(listen_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, message} -> {:error, message}
  end
end
276
# Inserts and federates an Accept activity from `actor` for `object`.
def accept(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false
  data = %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object}

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
287
# Inserts and federates a Reject activity from `actor` for `object`.
def reject(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false
  data = %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object}

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
298
# Inserts and federates an Update activity. `params[:activity_id]`, when given,
# is passed to `Utils.maybe_put/3` as the activity's "id".
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  local = params[:local] != false

  data =
    Utils.maybe_put(
      %{"to" => to, "cc" => cc, "type" => "Update", "actor" => actor, "object" => object},
      "id",
      params[:activity_id]
    )

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
316
# Likes `object` as `user`. If a like already exists, it is returned together
# with the unchanged object instead of inserting a new one.
#
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       like_data = make_like_data(user, object, activity_id),
       {:ok, like_activity} <- insert(like_data, local),
       {:ok, liked_object} <- add_like_to_object(like_activity, object),
       :ok <- maybe_federate(like_activity) do
    {:ok, like_activity, liked_object}
  else
    %Activity{} = existing_like -> {:ok, existing_like, object}
    error -> {:error, error}
  end
end
335
# Undoes `actor`'s like of `object`: inserts the unlike activity, deletes the
# like, updates the object and federates. If any step does not match (including
# "no existing like"), `{:ok, object}` is returned with the original object.
def unlike(%User{} = actor, %Object{} = object, activity_id \\ nil, local \\ true) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       unlike_data = make_unlike_data(actor, like_activity, activity_id),
       {:ok, unlike_activity} <- insert(unlike_data, local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, updated_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, updated_object}
  else
    _e -> {:ok, object}
  end
end
348
# Announces `object` as `user` if `is_announceable?/3` allows it, records the
# announce on the object and federates. Any non-matching step is returned
# wrapped as `{:error, value}`.
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_announceable?(object, user, public),
       announce_data = make_announce_data(user, object, activity_id, public),
       {:ok, announce_activity} <- insert(announce_data, local),
       {:ok, announced_object} <- add_announce_to_object(announce_activity, object),
       :ok <- maybe_federate(announce_activity) do
    {:ok, announce_activity, announced_object}
  else
    error -> {:error, error}
  end
end
366
# Undoes `actor`'s announce of `object`: inserts and federates the unannounce,
# deletes the announce activity and updates the object. If any step does not
# match, `{:ok, object}` is returned with the original object.
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       unannounce_data = make_unannounce_data(actor, announce_activity, activity_id),
       {:ok, unannounce_activity} <- insert(unannounce_data, local),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, updated_object}
  else
    _e -> {:ok, object}
  end
end
384
# Inserts and federates a follow activity, then caches the follow state taken
# from the activity's "state" field.
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    User.set_follow_state_cache(follower.ap_id, followed.ap_id, activity.data["state"])
    {:ok, activity}
  end
end
393
# Marks the latest follow activity between the two users as "cancelled", then
# inserts and federates the matching unfollow activity.
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data = make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
403
# Announces the deletion of a user to their followers and returns `{:ok, user}`.
# NOTE(review): `insert/4` is called with local, fake and bypass_actor_check all
# set to true, so the activity takes insert's "fake" path.
def delete(%User{ap_id: ap_id, follower_address: follower_address} = user) do
  data = %{
    "to" => [follower_address],
    "type" => "Delete",
    "actor" => ap_id,
    "object" => %{"type" => "Person", "id" => ap_id}
  }

  with {:ok, activity} <- insert(data, true, true, true),
       :ok <- maybe_federate(activity) do
    {:ok, user}
  end
end
416
# Deletes an object and publishes the matching Delete activity, addressed to the
# object's "to" and "cc". Also streams conversation participations and adjusts
# the reply and note counters before federating.
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
  user = User.get_cached_by_ap_id(actor)
  to = (object.data["to"] || []) ++ (object.data["cc"] || [])

  with {:ok, deleted_object, deleted_activity} <- Object.delete(object),
       data = %{
         "type" => "Delete",
         "actor" => actor,
         "object" => id,
         "to" => to,
         "deleted_activity_id" => deleted_activity && deleted_activity.id
       },
       {:ok, delete_activity} <- insert(data, local, false),
       stream_out_participations(deleted_object, user),
       _ <- decrease_replies_count_if_reply(deleted_object),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(delete_activity) do
    {:ok, delete_activity}
  end
end
439
# Blocks `blocked` on behalf of `blocker`.
#
# With `[:activitypub, :unfollow_blocked]` set, an existing follow from blocker
# to blocked is undone first. A Block activity is only inserted and federated
# when `[:activitypub, :outgoing_blocks]` is `true`; in every other case
# (including failures in the chain) the result is `{:ok, nil}`.
@spec block(User.t(), User.t(), String.t() | nil, boolean) :: {:ok, Activity.t() | nil}
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  outgoing_blocks = Config.get([:activitypub, :outgoing_blocks])
  unfollow_blocked = Config.get([:activitypub, :unfollow_blocked])

  if unfollow_blocked && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       block_data = make_block_data(blocker, blocked, activity_id),
       {:ok, activity} <- insert(block_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    _e -> {:ok, nil}
  end
end
459
# Inserts and federates the activity undoing the latest block from `blocker`
# to `blocked`. Without such a block, the lookup result is returned as-is.
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       unblock_data = make_unblock_data(blocker, blocked, latest_block, activity_id),
       {:ok, activity} <- insert(unblock_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
468
# Files a report (flag activity). The reported account is put in "cc" unless
# `params[:forward]` is explicitly `false`. After a successful insert and
# federation, a report email is queued for every superuser.
@spec flag(map()) :: {:ok, Activity.t()} | any
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data = make_flag_data(params, additional),
       {:ok, activity} <- insert(flag_data, local),
       :ok <- maybe_federate(activity) do
    for superuser <- User.all_superusers() do
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end

    {:ok, activity}
  end
end
504
# Query for the Create activities of `context`, newest first, visible to the
# public and (when `opts["user"]` is set) to that user and the ids in its
# `following` list.
defp fetch_activities_for_context_query(context, opts) do
  user = opts["user"]
  public = [Pleroma.Constants.as_public()]

  recipients =
    if user do
      [user.ap_id | user.following] ++ public
    else
      public
    end

  from(activity in Activity)
  |> maybe_preload_objects(opts)
  |> maybe_preload_bookmarks(opts)
  |> maybe_set_thread_muted_field(opts)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, user)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
531
# Runs the context query and returns all matching activities.
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
538
# Returns the id of the first activity of the context query, or nil when there
# is none. "skip_preload" defaults to true unless `opts` sets it.
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        FlakeId.Ecto.CompatType.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  query_opts = Map.put_new(opts, "skip_preload", true)

  context
  |> fetch_activities_for_context_query(query_opts)
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
548
# Fetches a page of activities addressed to the public collection, excluding
# unlisted ones. The "user" option is dropped before querying.
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  anonymous_opts = Map.delete(opts, "user")
  public = [Pleroma.Constants.as_public()]

  public
  |> fetch_activities_query(anonymous_opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(anonymous_opts, pagination)
  |> Enum.reverse()
end
558
@valid_visibilities ~w[direct unlisted public private]

# Restricts the query to activities whose `activity_visibility(...)` matches the
# `:visibility` option, given either as one value or as a list of values.
# Without a `:visibility` option the query is returned untouched.
#
# NOTE(review): for invalid visibilities the clauses below return the result of
# `Logger.error/1`, not a query - confirm callers expect that.
defp restrict_visibility(query, %{visibility: visibility}) when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    where(
      query,
      [a],
      fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(_query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{visibility}")
end

defp restrict_visibility(query, _visibility), do: query
598
# Applies the `thread_visibility(...)` SQL check for the viewing user, unless
# thread containment is skipped by the passed config or by the user's own
# `skip_thread_containment` setting. Without a user nothing is restricted.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(
       query,
       %{"user" => %User{info: %{skip_thread_containment: true}}},
       _config
     ),
     do: query

defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _config) do
  where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
end

defp restrict_thread_visibility(query, _opts, _config), do: query
617
# Fetches activities of any type authored by `user`, as seen by `reading_user`.
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  query_params =
    Map.merge(params, %{
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "whole_db" => true
    })

  %{"godmode" => query_params["godmode"], "reading_user" => reading_user}
  |> user_activities_recipients()
  |> fetch_activities(query_params)
  |> Enum.reverse()
end
634
# Fetches the Create and Announce activities authored by `user`, as seen by
# `reading_user`. The user's pinned activity ids are passed along as
# "pinned_activity_ids".
def fetch_user_activities(user, reading_user, params \\ %{}) do
  query_params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "user" => reading_user,
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.info.pinned_activities
    })

  %{"godmode" => query_params["godmode"], "reading_user" => reading_user}
  |> user_activities_recipients()
  |> fetch_activities(query_params)
  |> Enum.reverse()
end
653
# Recipient list used when listing a user's activities: empty in "godmode",
# otherwise the public collection plus, for a logged-in reader, the reader's
# own ap_id and the entries of its `following` list.
defp user_activities_recipients(%{"godmode" => true}), do: []

defp user_activities_recipients(%{"reading_user" => reading_user}) do
  public = Pleroma.Constants.as_public()

  if reading_user do
    [public, reading_user.ap_id | reading_user.following]
  else
    [public]
  end
end
665
# Keeps only activities with an id greater than "since_id"; an empty or absent
# "since_id" leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
673
# Excludes activities whose object carries any of the "tag_reject" tags.
# Needs the joined object, so it raises when combined with "skip_preload".
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
     when is_list(tag_reject) and tag_reject != [] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _opts), do: query
687
# Keeps only activities whose object carries all of the "tag_all" tags.
# Needs the joined object, so it raises when combined with "skip_preload".
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => tag_all})
     when is_list(tag_all) and tag_all != [] do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_tag_all(query, _opts), do: query
701
# Keeps only activities whose object carries the given tag (binary) or any of
# the given tags (list). Needs the joined object, so it raises when combined
# with "skip_preload".
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag))
end

defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _opts), do: query
721
# Keeps activities whose recipients overlap with `recipients`; when a user is
# given, activities authored by that user are included through an OR clause.
# An empty recipient list applies no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
735
# With "local_only" => true, keeps only activities flagged as local.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
741
# With an "actor_id" option, keeps only activities by that actor.
defp restrict_actor(query, %{"actor_id" => actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
747
# Keeps activities whose "type" equals the given binary, or is any of the given
# values when "type" is not a binary.
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
757
# With a "state" option, keeps only activities whose "state" field equals it.
defp restrict_state(query, %{"state" => state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _opts), do: query
763
# With a "favorited_by" option, keeps only activities whose object lists that
# ap_id under "likes".
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _opts), do: query
772
# With "only_media" set to "true" or "1", keeps only activities whose object's
# "attachment" is not equal to an empty list. Needs the joined object, so it
# raises when combined with "skip_preload".
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in ["true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _opts), do: query
785
# With "exclude_replies" set to "true" or "1", keeps only activities whose
# object has no "inReplyTo".
defp restrict_replies(query, %{"exclude_replies" => val}) when val in ["true", "1"] do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, _opts), do: query
794
# With "exclude_reblogs" set to "true" or "1", drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
800
# Hides activities authored by, or addressed ("to") to, users muted by
# "muting_user". Unless "skip_preload" is set, rows with a thread mute entry are
# dropped as well. "with_muted" (true, "true" or "1") disables the filter.
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{info: info}} = opts) do
  mutes = info.mutes

  without_muted =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^mutes))

  if opts["skip_preload"] do
    without_muted
  else
    where(without_muted, [thread_mute: tm], is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _opts), do: query
820
# Hides activities involving users or domains blocked by "blocking_user":
# blocked actors, blocked recipients, Announces addressed to blocked users, and
# activities or objects whose actor host is a blocked domain. Joins the object
# first when the query has no `:object` binding yet.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocks = info.blocks || []
  domain_blocks = info.domain_blocks || []

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocks))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocks))
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocks
    )
  )
  |> where(
    [activity],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
  )
  |> where(
    [object: o],
    fragment("not (split_part(?->>'actor', '/', 3) = ANY(?))", o.data, ^domain_blocks)
  )
end

defp restrict_blocked(query, _opts), do: query
845
# Drops activities that have the public collection in their "cc".
defp restrict_unlisted(query) do
  public = [Pleroma.Constants.as_public()]

  where(
    query,
    [activity],
    fragment("not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)", activity.data, ^public)
  )
end
857
# With "pinned" => "true", keeps only activities whose id is listed in
# "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
  where(query, [activity], activity.id in ^ids)
end

defp restrict_pinned(query, _opts), do: query
863
# Drops Announce activities made by actors listed in the muting user's
# `muted_reblogs`.
defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
  muted_reblogs = info.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
880
# Drops activities whose object is of type "Answer", unless
# "include_poll_votes" is true. Only applied when the query has an `:object`
# binding.
defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query

defp exclude_poll_votes(query, _opts) do
  if has_named_binding?(query, :object) do
    where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
  else
    query
  end
end
892
# With a binary "exclude_id" option, drops the activity with that id.
defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _opts), do: query
898
# Adds the object preload to the query unless "skip_preload" is set.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
905
# Adds the bookmark preload for `opts["user"]` unless "skip_preload" is set.
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts["user"])
end
912
# Sets the thread-muted field for the muting user (falling back to
# `opts["user"]`) unless "skip_preload" is set.
defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  viewer = opts["muting_user"] || opts["user"]
  Activity.with_set_thread_muted_field(query, viewer)
end
919
# Orders by id when an `:order` of `:desc` or `:asc` is requested; otherwise the
# query is left unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
931
# Builds the activity query for `recipients`: first the preloads and ordering,
# then every `restrict_*` / `exclude_*` filter driven by `opts`.
def fetch_activities_query(recipients, opts \\ %{}) do
  containment_config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  base_query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)

  base_query
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_state(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(opts)
  |> restrict_muted(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_thread_visibility(opts, containment_config)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(opts)
  |> Activity.restrict_deactivated_users()
  |> exclude_poll_votes(opts)
end
964
# Fetches a page of activities for `recipients` plus the list memberships of
# `opts["user"]`, reverses the page, and passes it through `maybe_update_cc/3`.
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts["user"]
  list_memberships = Pleroma.List.memberships(user)

  (recipients ++ list_memberships)
  |> fetch_activities_query(opts)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, user)
end
973
# For a user with list memberships, prepends the user's ap_id to the "cc" of
# every activity whose non-empty "bcc" contains one of those memberships.
# Without memberships or without a user, activities are returned unchanged.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # List.wrap/1 keeps "cc" a proper list even when it is missing (nil) or
        # holds a single non-list value.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
990
# Keeps activities addressed to any of `recipients`, or addressed both to any of
# `recipients_with_public` and to the public collection.
def fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Pleroma.Constants.as_public() in activity.recipients)
  )
end
999
# Fetches a page of activities using the bounded recipient restriction instead
# of the regular recipient filter (the base query gets an empty recipient list).
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  []
  |> fetch_activities_query(opts)
  |> fetch_activities_bounded_query(recipients, recipients_with_public)
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
1011
# Stores `file` through `Upload.store/2` and inserts an object with the returned
# data; `opts[:actor]`, when truthy, is recorded as the object's "actor".
# A failed store is returned as-is.
def upload(file, opts \\ []) do
  with {:ok, data} <- Upload.store(file, opts) do
    obj_data =
      case opts[:actor] do
        actor when actor in [nil, false] -> data
        actor -> Map.put(data, "actor", actor)
      end

    Repo.insert(%Object{data: obj_data})
  end
end
1024
# Converts a remote actor document into the attribute map used to build a user.
# Always returns `{:ok, user_data}`.
defp object_to_user_data(data) do
  avatar = image_from_url(data["icon"]["url"])
  banner = image_from_url(data["image"]["url"])

  # "attachment" may be absent, nil, a single object or a list, and individual
  # entries may lack a "type". Only well-formed PropertyValue entries are kept;
  # anything else is skipped instead of raising.
  fields =
    data
    |> Map.get("attachment")
    |> List.wrap()
    |> Enum.filter(&match?(%{"type" => "PropertyValue"}, &1))
    |> Enum.map(&Map.take(&1, ["name", "value"]))

  locked = data["manuallyApprovesFollowers"] || false
  data = Transmogrifier.maybe_fix_user_object(data)
  discoverable = data["discoverable"] || false

  # nickname can be nil because of virtual actors
  nickname =
    if username = data["preferredUsername"] do
      "#{username}@#{URI.parse(data["id"]).host}"
    end

  user_data = %{
    ap_id: data["id"],
    info: %{
      ap_enabled: true,
      source_data: data,
      banner: banner,
      fields: fields,
      locked: locked,
      discoverable: discoverable
    },
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    bio: data["summary"],
    nickname: nickname
  }

  {:ok, user_data}
end

# Wraps a truthy URL into an Image map; passes nil/false through unchanged.
defp image_from_url(url) do
  url && %{"type" => "Image", "url" => [%{"href" => url}]}
end
1081
# Fetches the remote collections at `user.following_address` and
# `user.follower_address` and extracts counters and privacy flags from them.
#
# Returns `{:ok, %{hide_follows:, follower_count:, following_count:, hide_followers:}}`
# when both collections are fetched, both "totalItems" values are integers and
# `collection_private/1` succeeds for both. Otherwise returns an `{:error, _}`
# tuple: an existing error tuple is passed through, any other failing value is
# wrapped as `{:error, value}`.
def fetch_follow_information_for_user(user) do
  fetch = &Fetcher.fetch_and_contain_remote_object_from_id/1

  with {:ok, following} <- fetch.(user.following_address),
       n_following when is_integer(n_following) <- following["totalItems"],
       {:ok, follows_hidden} <- collection_private(following),
       {:ok, followers} <- fetch.(user.follower_address),
       n_followers when is_integer(n_followers) <- followers["totalItems"],
       {:ok, followers_hidden} <- collection_private(followers) do
    follow_info = %{
      hide_follows: follows_hidden,
      follower_count: n_followers,
      following_count: n_following,
      hide_followers: followers_hidden
    }

    {:ok, follow_info}
  else
    {:error, _reason} = failure -> failure
    unexpected -> {:error, unexpected}
  end
end
1106
# Merges freshly fetched follow counters into `data.info` when the
# `[:instance, :external_user_synchronization]` setting is `true`.
#
# Always returns a user-data map: the updated one on success, the untouched
# `data` when the setting is `false`, and the untouched `data` (after logging an
# error) for any other outcome.
defp maybe_update_follow_information(data) do
  sync_enabled = Config.get([:instance, :external_user_synchronization])

  with {:enabled, true} <- {:enabled, sync_enabled},
       {:ok, follow_info} <- fetch_follow_information_for_user(data) do
    %{data | info: Map.merge(data.info, follow_info)}
  else
    {:enabled, false} ->
      data

    failure ->
      Logger.error(
        "Follower/Following counter update for #{data.ap_id} failed.\n" <> inspect(failure)
      )

      data
  end
end
1125
# Decides whether the collection `data` is hidden, judging by its "first" page.
#
#   * `{:ok, false}` - "first" is an embedded (Ordered)CollectionPage map, or
#     fetching "first" yields an (Ordered)CollectionPage
#   * `{:ok, true}`  - fetching "first" fails with an HTTP 401 or 403 status
#   * `{:error, _}`  - anything else (existing error tuples are passed through,
#     other values are wrapped)
defp collection_private(data) do
  first_page = data["first"]

  if is_map(first_page) and
       first_page["type"] in ["CollectionPage", "OrderedCollectionPage"] do
    {:ok, false}
  else
    case Fetcher.fetch_and_contain_remote_object_from_id(first_page) do
      {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
        {:ok, false}

      {:error, {:ok, %{status: code}}} when code in [401, 403] ->
        {:ok, true}

      {:error, _reason} = failure ->
        failure

      unexpected ->
        {:error, unexpected}
    end
  end
end
1146
# Runs the actor document through `MRF.filter/1` and converts the filtered
# result with `object_to_user_data/1`.
#
# Returns `{:ok, user_data}`; whatever value made either step fail is wrapped as
# `{:error, value}`.
def user_data_from_user_object(data) do
  with {:ok, filtered} <- MRF.filter(data),
       {:ok, _user_data} = converted <- object_to_user_data(filtered) do
    converted
  else
    failure -> {:error, failure}
  end
end
1155
# Fetches the actor document at `ap_id`, converts it to user data and, where
# enabled, enriches it with follow counters.
#
# Returns `{:ok, user_data}` on success. On failure the problem is logged and an
# `{:error, reason}` tuple is returned: an existing error tuple is passed
# through as is, any other value is wrapped. Previously the failure branch
# returned the result of `Logger.error/1`, which hid the reason from callers.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(object) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    failure ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")

      case failure do
        {:error, _reason} -> failure
        _ -> {:error, failure}
      end
  end
end
1165
# Produces a user for `ap_id`.
#
# If a user with that AP id is already cached, the work is delegated to
# `Transmogrifier.upgrade_user_from_ap_id/1`. Otherwise the remote actor is
# fetched and passed to `User.insert_or_update_user/1`; a failed fetch is
# returned wrapped as `{:error, value}`.
def make_user_from_ap_id(ap_id) do
  case User.get_cached_by_ap_id(ap_id) do
    absent when absent in [nil, false] ->
      case fetch_and_prepare_user_from_ap_id(ap_id) do
        {:ok, user_data} -> User.insert_or_update_user(user_data)
        failure -> {:error, failure}
      end

    _cached_user ->
      Transmogrifier.upgrade_user_from_ap_id(ap_id)
  end
end
1177
# Resolves `nickname` through WebFinger and builds the user from the AP id found
# there. Returns `{:error, "No AP id in WebFinger"}` when the lookup does not
# yield a non-nil "ap_id".
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
1185
# Filters out broken threads: returns the result of
# `entire_thread_visible_for_user?/2` for the given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1190
# Post-processing hook for a single activity; currently this only applies the
# broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1195
# Builds (without executing) a query over `Activity` restricted to type "Create"
# and visibility "direct", ordered by ascending activity id.
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{"type" => "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  from(activity in direct_creates, order_by: [asc: activity.id])
end
1202 end