bd2544470df295b2fc435d838269c13286080915
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Conversation
8 alias Pleroma.Instances
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Object.Fetcher
12 alias Pleroma.Pagination
13 alias Pleroma.Repo
14 alias Pleroma.Upload
15 alias Pleroma.User
16 alias Pleroma.Web.ActivityPub.MRF
17 alias Pleroma.Web.ActivityPub.Transmogrifier
18 alias Pleroma.Web.Federator
19 alias Pleroma.Web.WebFinger
20
21 import Ecto.Query
22 import Pleroma.Web.ActivityPub.Utils
23 import Pleroma.Web.ActivityPub.Visibility
24
25 require Logger
26
27 @httpoison Application.get_env(:pleroma, :httpoison)
28
29 # For Announce activities, we filter the recipients based on following status for any actors
30 # that match actual users. See issue #164 for more information about why this is necessary.
31 defp get_recipients(%{"type" => "Announce"} = data) do
32 to = data["to"] || []
33 cc = data["cc"] || []
34 actor = User.get_cached_by_ap_id(data["actor"])
35
36 recipients =
37 (to ++ cc)
38 |> Enum.filter(fn recipient ->
39 case User.get_cached_by_ap_id(recipient) do
40 nil ->
41 true
42
43 user ->
44 User.following?(user, actor)
45 end
46 end)
47
48 {recipients, to, cc}
49 end
50
51 defp get_recipients(%{"type" => "Create"} = data) do
52 to = data["to"] || []
53 cc = data["cc"] || []
54 actor = data["actor"] || []
55 recipients = (to ++ cc ++ [actor]) |> Enum.uniq()
56 {recipients, to, cc}
57 end
58
59 defp get_recipients(data) do
60 to = data["to"] || []
61 cc = data["cc"] || []
62 recipients = to ++ cc
63 {recipients, to, cc}
64 end
65
66 defp check_actor_is_active(actor) do
67 if not is_nil(actor) do
68 with user <- User.get_cached_by_ap_id(actor),
69 false <- user.info.deactivated do
70 :ok
71 else
72 _e -> :reject
73 end
74 else
75 :ok
76 end
77 end
78
79 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
80 limit = Pleroma.Config.get([:instance, :remote_limit])
81 String.length(content) <= limit
82 end
83
84 defp check_remote_limit(_), do: true
85
86 def increase_note_count_if_public(actor, object) do
87 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
88 end
89
90 def decrease_note_count_if_public(actor, object) do
91 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
92 end
93
94 def increase_replies_count_if_reply(%{
95 "object" => %{"inReplyTo" => reply_ap_id} = object,
96 "type" => "Create"
97 }) do
98 if is_public?(object) do
99 Object.increase_replies_count(reply_ap_id)
100 end
101 end
102
103 def increase_replies_count_if_reply(_create_data), do: :noop
104
105 def decrease_replies_count_if_reply(%Object{
106 data: %{"inReplyTo" => reply_ap_id} = object
107 }) do
108 if is_public?(object) do
109 Object.decrease_replies_count(reply_ap_id)
110 end
111 end
112
113 def decrease_replies_count_if_reply(_object), do: :noop
114
115 def insert(map, local \\ true, fake \\ false) when is_map(map) do
116 with nil <- Activity.normalize(map),
117 map <- lazy_put_activity_defaults(map, fake),
118 :ok <- check_actor_is_active(map["actor"]),
119 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
120 {:ok, map} <- MRF.filter(map),
121 {recipients, _, _} = get_recipients(map),
122 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
123 {:ok, map, object} <- insert_full_object(map) do
124 {:ok, activity} =
125 Repo.insert(%Activity{
126 data: map,
127 local: local,
128 actor: map["actor"],
129 recipients: recipients
130 })
131
132 # Splice in the child object if we have one.
133 activity =
134 if !is_nil(object) do
135 Map.put(activity, :object, object)
136 else
137 activity
138 end
139
140 activity =
141 if activity.data["type"] in ["Create", "Announce"] do
142 Repo.preload(activity, :bookmarks)
143 else
144 activity
145 end
146
147 Task.start(fn ->
148 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
149 end)
150
151 Notification.create_notifications(activity)
152
153 participations =
154 activity
155 |> Conversation.create_or_bump_for()
156 |> get_participations()
157
158 stream_out(activity)
159 stream_out_participations(participations)
160 {:ok, activity}
161 else
162 %Activity{} = activity ->
163 {:ok, activity}
164
165 {:fake, true, map, recipients} ->
166 activity = %Activity{
167 data: map,
168 local: local,
169 actor: map["actor"],
170 recipients: recipients,
171 id: "pleroma:fakeid"
172 }
173
174 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
175 {:ok, activity}
176
177 error ->
178 {:error, error}
179 end
180 end
181
182 defp get_participations({:ok, %{participations: participations}}), do: participations
183 defp get_participations(_), do: []
184
185 def stream_out_participations(participations) do
186 participations =
187 participations
188 |> Repo.preload(:user)
189
190 Enum.each(participations, fn participation ->
191 Pleroma.Web.Streamer.stream("participation", participation)
192 end)
193 end
194
195 def stream_out(activity) do
196 public = "https://www.w3.org/ns/activitystreams#Public"
197
198 if activity.data["type"] in ["Create", "Announce", "Delete"] do
199 Pleroma.Web.Streamer.stream("user", activity)
200 Pleroma.Web.Streamer.stream("list", activity)
201
202 if Enum.member?(activity.data["to"], public) do
203 Pleroma.Web.Streamer.stream("public", activity)
204
205 if activity.local do
206 Pleroma.Web.Streamer.stream("public:local", activity)
207 end
208
209 if activity.data["type"] in ["Create"] do
210 object = Object.normalize(activity)
211
212 object.data
213 |> Map.get("tag", [])
214 |> Enum.filter(fn tag -> is_bitstring(tag) end)
215 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
216
217 if object.data["attachment"] != [] do
218 Pleroma.Web.Streamer.stream("public:media", activity)
219
220 if activity.local do
221 Pleroma.Web.Streamer.stream("public:local:media", activity)
222 end
223 end
224 end
225 else
226 # TODO: Write test, replace with visibility test
227 if !Enum.member?(activity.data["cc"] || [], public) &&
228 !Enum.member?(
229 activity.data["to"],
230 User.get_cached_by_ap_id(activity.data["actor"]).follower_address
231 ),
232 do: Pleroma.Web.Streamer.stream("direct", activity)
233 end
234 end
235 end
236
237 def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
238 additional = params[:additional] || %{}
239 # only accept false as false value
240 local = !(params[:local] == false)
241 published = params[:published]
242
243 with create_data <-
244 make_create_data(
245 %{to: to, actor: actor, published: published, context: context, object: object},
246 additional
247 ),
248 {:ok, activity} <- insert(create_data, local, fake),
249 {:fake, false, activity} <- {:fake, fake, activity},
250 _ <- increase_replies_count_if_reply(create_data),
251 # Changing note count prior to enqueuing federation task in order to avoid
252 # race conditions on updating user.info
253 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
254 :ok <- maybe_federate(activity) do
255 {:ok, activity}
256 else
257 {:fake, true, activity} ->
258 {:ok, activity}
259 end
260 end
261
262 def accept(%{to: to, actor: actor, object: object} = params) do
263 # only accept false as false value
264 local = !(params[:local] == false)
265
266 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
267 {:ok, activity} <- insert(data, local),
268 :ok <- maybe_federate(activity) do
269 {:ok, activity}
270 end
271 end
272
273 def reject(%{to: to, actor: actor, object: object} = params) do
274 # only accept false as false value
275 local = !(params[:local] == false)
276
277 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
278 {:ok, activity} <- insert(data, local),
279 :ok <- maybe_federate(activity) do
280 {:ok, activity}
281 end
282 end
283
284 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
285 # only accept false as false value
286 local = !(params[:local] == false)
287
288 with data <- %{
289 "to" => to,
290 "cc" => cc,
291 "type" => "Update",
292 "actor" => actor,
293 "object" => object
294 },
295 {:ok, activity} <- insert(data, local),
296 :ok <- maybe_federate(activity) do
297 {:ok, activity}
298 end
299 end
300
301 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
302 def like(
303 %User{ap_id: ap_id} = user,
304 %Object{data: %{"id" => _}} = object,
305 activity_id \\ nil,
306 local \\ true
307 ) do
308 with nil <- get_existing_like(ap_id, object),
309 like_data <- make_like_data(user, object, activity_id),
310 {:ok, activity} <- insert(like_data, local),
311 {:ok, object} <- add_like_to_object(activity, object),
312 :ok <- maybe_federate(activity) do
313 {:ok, activity, object}
314 else
315 %Activity{} = activity -> {:ok, activity, object}
316 error -> {:error, error}
317 end
318 end
319
320 def unlike(
321 %User{} = actor,
322 %Object{} = object,
323 activity_id \\ nil,
324 local \\ true
325 ) do
326 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
327 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
328 {:ok, unlike_activity} <- insert(unlike_data, local),
329 {:ok, _activity} <- Repo.delete(like_activity),
330 {:ok, object} <- remove_like_from_object(like_activity, object),
331 :ok <- maybe_federate(unlike_activity) do
332 {:ok, unlike_activity, like_activity, object}
333 else
334 _e -> {:ok, object}
335 end
336 end
337
338 def announce(
339 %User{ap_id: _} = user,
340 %Object{data: %{"id" => _}} = object,
341 activity_id \\ nil,
342 local \\ true,
343 public \\ true
344 ) do
345 with true <- is_public?(object),
346 announce_data <- make_announce_data(user, object, activity_id, public),
347 {:ok, activity} <- insert(announce_data, local),
348 {:ok, object} <- add_announce_to_object(activity, object),
349 :ok <- maybe_federate(activity) do
350 {:ok, activity, object}
351 else
352 error -> {:error, error}
353 end
354 end
355
356 def unannounce(
357 %User{} = actor,
358 %Object{} = object,
359 activity_id \\ nil,
360 local \\ true
361 ) do
362 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
363 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
364 {:ok, unannounce_activity} <- insert(unannounce_data, local),
365 :ok <- maybe_federate(unannounce_activity),
366 {:ok, _activity} <- Repo.delete(announce_activity),
367 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
368 {:ok, unannounce_activity, object}
369 else
370 _e -> {:ok, object}
371 end
372 end
373
374 def follow(follower, followed, activity_id \\ nil, local \\ true) do
375 with data <- make_follow_data(follower, followed, activity_id),
376 {:ok, activity} <- insert(data, local),
377 :ok <- maybe_federate(activity) do
378 {:ok, activity}
379 end
380 end
381
382 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
383 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
384 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
385 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
386 {:ok, activity} <- insert(unfollow_data, local),
387 :ok <- maybe_federate(activity) do
388 {:ok, activity}
389 end
390 end
391
392 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
393 user = User.get_cached_by_ap_id(actor)
394 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
395
396 with {:ok, object, activity} <- Object.delete(object),
397 data <- %{
398 "type" => "Delete",
399 "actor" => actor,
400 "object" => id,
401 "to" => to,
402 "deleted_activity_id" => activity && activity.id
403 },
404 {:ok, activity} <- insert(data, local),
405 _ <- decrease_replies_count_if_reply(object),
406 # Changing note count prior to enqueuing federation task in order to avoid
407 # race conditions on updating user.info
408 {:ok, _actor} <- decrease_note_count_if_public(user, object),
409 :ok <- maybe_federate(activity) do
410 {:ok, activity}
411 end
412 end
413
414 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
415 ap_config = Application.get_env(:pleroma, :activitypub)
416 unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
417 outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)
418
419 with true <- unfollow_blocked do
420 follow_activity = fetch_latest_follow(blocker, blocked)
421
422 if follow_activity do
423 unfollow(blocker, blocked, nil, local)
424 end
425 end
426
427 with true <- outgoing_blocks,
428 block_data <- make_block_data(blocker, blocked, activity_id),
429 {:ok, activity} <- insert(block_data, local),
430 :ok <- maybe_federate(activity) do
431 {:ok, activity}
432 else
433 _e -> {:ok, nil}
434 end
435 end
436
437 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
438 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
439 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
440 {:ok, activity} <- insert(unblock_data, local),
441 :ok <- maybe_federate(activity) do
442 {:ok, activity}
443 end
444 end
445
446 def flag(
447 %{
448 actor: actor,
449 context: context,
450 account: account,
451 statuses: statuses,
452 content: content
453 } = params
454 ) do
455 # only accept false as false value
456 local = !(params[:local] == false)
457 forward = !(params[:forward] == false)
458
459 additional = params[:additional] || %{}
460
461 params = %{
462 actor: actor,
463 context: context,
464 account: account,
465 statuses: statuses,
466 content: content
467 }
468
469 additional =
470 if forward do
471 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
472 else
473 Map.merge(additional, %{"to" => [], "cc" => []})
474 end
475
476 with flag_data <- make_flag_data(params, additional),
477 {:ok, activity} <- insert(flag_data, local),
478 :ok <- maybe_federate(activity) do
479 Enum.each(User.all_superusers(), fn superuser ->
480 superuser
481 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
482 |> Pleroma.Emails.Mailer.deliver_async()
483 end)
484
485 {:ok, activity}
486 end
487 end
488
489 defp fetch_activities_for_context_query(context, opts) do
490 public = ["https://www.w3.org/ns/activitystreams#Public"]
491
492 recipients =
493 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
494
495 from(activity in Activity)
496 |> restrict_blocked(opts)
497 |> restrict_recipients(recipients, opts["user"])
498 |> where(
499 [activity],
500 fragment(
501 "?->>'type' = ? and ?->>'context' = ?",
502 activity.data,
503 "Create",
504 activity.data,
505 ^context
506 )
507 )
508 |> order_by([activity], desc: activity.id)
509 end
510
511 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
512 def fetch_activities_for_context(context, opts \\ %{}) do
513 context
514 |> fetch_activities_for_context_query(opts)
515 |> Activity.with_preloaded_object()
516 |> Repo.all()
517 end
518
519 @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
520 Pleroma.FlakeId.t() | nil
521 def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
522 context
523 |> fetch_activities_for_context_query(opts)
524 |> limit(1)
525 |> select([a], a.id)
526 |> Repo.one()
527 end
528
529 def fetch_public_activities(opts \\ %{}) do
530 q = fetch_activities_query(["https://www.w3.org/ns/activitystreams#Public"], opts)
531
532 q
533 |> restrict_unlisted()
534 |> Pagination.fetch_paginated(opts)
535 |> Enum.reverse()
536 end
537
538 @valid_visibilities ~w[direct unlisted public private]
539
540 defp restrict_visibility(query, %{visibility: visibility})
541 when is_list(visibility) do
542 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
543 query =
544 from(
545 a in query,
546 where:
547 fragment(
548 "activity_visibility(?, ?, ?) = ANY (?)",
549 a.actor,
550 a.recipients,
551 a.data,
552 ^visibility
553 )
554 )
555
556 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
557
558 query
559 else
560 Logger.error("Could not restrict visibility to #{visibility}")
561 end
562 end
563
564 defp restrict_visibility(query, %{visibility: visibility})
565 when visibility in @valid_visibilities do
566 query =
567 from(
568 a in query,
569 where:
570 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
571 )
572
573 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
574
575 query
576 end
577
578 defp restrict_visibility(_query, %{visibility: visibility})
579 when visibility not in @valid_visibilities do
580 Logger.error("Could not restrict visibility to #{visibility}")
581 end
582
583 defp restrict_visibility(query, _visibility), do: query
584
585 def fetch_user_activities(user, reading_user, params \\ %{}) do
586 params =
587 params
588 |> Map.put("type", ["Create", "Announce"])
589 |> Map.put("actor_id", user.ap_id)
590 |> Map.put("whole_db", true)
591 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
592
593 recipients =
594 if reading_user do
595 ["https://www.w3.org/ns/activitystreams#Public"] ++
596 [reading_user.ap_id | reading_user.following]
597 else
598 ["https://www.w3.org/ns/activitystreams#Public"]
599 end
600
601 fetch_activities(recipients, params)
602 |> Enum.reverse()
603 end
604
605 defp restrict_since(query, %{"since_id" => ""}), do: query
606
607 defp restrict_since(query, %{"since_id" => since_id}) do
608 from(activity in query, where: activity.id > ^since_id)
609 end
610
611 defp restrict_since(query, _), do: query
612
613 defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
614 raise "Can't use the child object without preloading!"
615 end
616
617 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
618 when is_list(tag_reject) and tag_reject != [] do
619 from(
620 [_activity, object] in query,
621 where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
622 )
623 end
624
625 defp restrict_tag_reject(query, _), do: query
626
627 defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
628 raise "Can't use the child object without preloading!"
629 end
630
631 defp restrict_tag_all(query, %{"tag_all" => tag_all})
632 when is_list(tag_all) and tag_all != [] do
633 from(
634 [_activity, object] in query,
635 where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
636 )
637 end
638
639 defp restrict_tag_all(query, _), do: query
640
641 defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
642 raise "Can't use the child object without preloading!"
643 end
644
645 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
646 from(
647 [_activity, object] in query,
648 where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
649 )
650 end
651
652 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
653 from(
654 [_activity, object] in query,
655 where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
656 )
657 end
658
659 defp restrict_tag(query, _), do: query
660
661 defp restrict_to_cc(query, recipients_to, recipients_cc) do
662 from(
663 activity in query,
664 where:
665 fragment(
666 "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
667 activity.data,
668 ^recipients_to,
669 activity.data,
670 ^recipients_cc
671 )
672 )
673 end
674
675 defp restrict_recipients(query, [], _user), do: query
676
677 defp restrict_recipients(query, recipients, nil) do
678 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
679 end
680
681 defp restrict_recipients(query, recipients, user) do
682 from(
683 activity in query,
684 where: fragment("? && ?", ^recipients, activity.recipients),
685 or_where: activity.actor == ^user.ap_id
686 )
687 end
688
689 defp restrict_local(query, %{"local_only" => true}) do
690 from(activity in query, where: activity.local == true)
691 end
692
693 defp restrict_local(query, _), do: query
694
695 defp restrict_actor(query, %{"actor_id" => actor_id}) do
696 from(activity in query, where: activity.actor == ^actor_id)
697 end
698
699 defp restrict_actor(query, _), do: query
700
701 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
702 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
703 end
704
705 defp restrict_type(query, %{"type" => type}) do
706 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
707 end
708
709 defp restrict_type(query, _), do: query
710
711 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
712 from(
713 activity in query,
714 where: fragment(~s(? <@ (? #> '{"object","likes"}'\)), ^ap_id, activity.data)
715 )
716 end
717
718 defp restrict_favorited_by(query, _), do: query
719
720 defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
721 raise "Can't use the child object without preloading!"
722 end
723
724 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
725 from(
726 [_activity, object] in query,
727 where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
728 )
729 end
730
731 defp restrict_media(query, _), do: query
732
733 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
734 from(
735 activity in query,
736 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
737 )
738 end
739
740 defp restrict_replies(query, _), do: query
741
742 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
743 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
744 end
745
746 defp restrict_reblogs(query, _), do: query
747
748 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
749
750 defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
751 mutes = info.mutes
752
753 from(
754 activity in query,
755 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
756 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
757 )
758 end
759
760 defp restrict_muted(query, _), do: query
761
762 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
763 blocks = info.blocks || []
764 domain_blocks = info.domain_blocks || []
765
766 from(
767 activity in query,
768 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
769 where: fragment("not (? && ?)", activity.recipients, ^blocks),
770 where:
771 fragment(
772 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
773 activity.data,
774 activity.data,
775 ^blocks
776 ),
777 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
778 )
779 end
780
781 defp restrict_blocked(query, _), do: query
782
783 defp restrict_unlisted(query) do
784 from(
785 activity in query,
786 where:
787 fragment(
788 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
789 activity.data,
790 ^["https://www.w3.org/ns/activitystreams#Public"]
791 )
792 )
793 end
794
795 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
796 from(activity in query, where: activity.id in ^ids)
797 end
798
799 defp restrict_pinned(query, _), do: query
800
801 defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
802 muted_reblogs = info.muted_reblogs || []
803
804 from(
805 activity in query,
806 where:
807 fragment(
808 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
809 activity.data,
810 activity.actor,
811 ^muted_reblogs
812 )
813 )
814 end
815
816 defp restrict_muted_reblogs(query, _), do: query
817
818 defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
819
820 defp maybe_preload_objects(query, _) do
821 query
822 |> Activity.with_preloaded_object()
823 end
824
825 def fetch_activities_query(recipients, opts \\ %{}) do
826 base_query = from(activity in Activity)
827
828 base_query
829 |> maybe_preload_objects(opts)
830 |> restrict_recipients(recipients, opts["user"])
831 |> restrict_tag(opts)
832 |> restrict_tag_reject(opts)
833 |> restrict_tag_all(opts)
834 |> restrict_since(opts)
835 |> restrict_local(opts)
836 |> restrict_actor(opts)
837 |> restrict_type(opts)
838 |> restrict_favorited_by(opts)
839 |> restrict_blocked(opts)
840 |> restrict_muted(opts)
841 |> restrict_media(opts)
842 |> restrict_visibility(opts)
843 |> restrict_replies(opts)
844 |> restrict_reblogs(opts)
845 |> restrict_pinned(opts)
846 |> restrict_muted_reblogs(opts)
847 end
848
849 def fetch_activities(recipients, opts \\ %{}) do
850 fetch_activities_query(recipients, opts)
851 |> Pagination.fetch_paginated(opts)
852 |> Enum.reverse()
853 end
854
855 def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
856 fetch_activities_query([], opts)
857 |> restrict_to_cc(recipients_to, recipients_cc)
858 |> Pagination.fetch_paginated(opts)
859 |> Enum.reverse()
860 end
861
862 def upload(file, opts \\ []) do
863 with {:ok, data} <- Upload.store(file, opts) do
864 obj_data =
865 if opts[:actor] do
866 Map.put(data, "actor", opts[:actor])
867 else
868 data
869 end
870
871 Repo.insert(%Object{data: obj_data})
872 end
873 end
874
875 def user_data_from_user_object(data) do
876 avatar =
877 data["icon"]["url"] &&
878 %{
879 "type" => "Image",
880 "url" => [%{"href" => data["icon"]["url"]}]
881 }
882
883 banner =
884 data["image"]["url"] &&
885 %{
886 "type" => "Image",
887 "url" => [%{"href" => data["image"]["url"]}]
888 }
889
890 locked = data["manuallyApprovesFollowers"] || false
891 data = Transmogrifier.maybe_fix_user_object(data)
892
893 user_data = %{
894 ap_id: data["id"],
895 info: %{
896 "ap_enabled" => true,
897 "source_data" => data,
898 "banner" => banner,
899 "locked" => locked
900 },
901 avatar: avatar,
902 name: data["name"],
903 follower_address: data["followers"],
904 bio: data["summary"]
905 }
906
907 # nickname can be nil because of virtual actors
908 user_data =
909 if data["preferredUsername"] do
910 Map.put(
911 user_data,
912 :nickname,
913 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
914 )
915 else
916 Map.put(user_data, :nickname, nil)
917 end
918
919 {:ok, user_data}
920 end
921
922 def fetch_and_prepare_user_from_ap_id(ap_id) do
923 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
924 user_data_from_user_object(data)
925 else
926 e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
927 end
928 end
929
930 def make_user_from_ap_id(ap_id) do
931 if _user = User.get_cached_by_ap_id(ap_id) do
932 Transmogrifier.upgrade_user_from_ap_id(ap_id)
933 else
934 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
935 User.insert_or_update_user(data)
936 else
937 e -> {:error, e}
938 end
939 end
940 end
941
942 def make_user_from_nickname(nickname) do
943 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
944 make_user_from_ap_id(ap_id)
945 else
946 _e -> {:error, "No AP id in WebFinger"}
947 end
948 end
949
950 def should_federate?(inbox, public) do
951 if public do
952 true
953 else
954 inbox_info = URI.parse(inbox)
955 !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
956 end
957 end
958
959 def publish(actor, activity) do
960 remote_followers =
961 if actor.follower_address in activity.recipients do
962 {:ok, followers} = User.get_followers(actor)
963 followers |> Enum.filter(&(!&1.local))
964 else
965 []
966 end
967
968 public = is_public?(activity)
969
970 {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
971 json = Jason.encode!(data)
972
973 (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
974 |> Enum.filter(fn user -> User.ap_enabled?(user) end)
975 |> Enum.map(fn %{info: %{source_data: data}} ->
976 (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
977 end)
978 |> Enum.uniq()
979 |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
980 |> Instances.filter_reachable()
981 |> Enum.each(fn {inbox, unreachable_since} ->
982 Federator.publish_single_ap(%{
983 inbox: inbox,
984 json: json,
985 actor: actor,
986 id: activity.data["id"],
987 unreachable_since: unreachable_since
988 })
989 end)
990 end
991
992 def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
993 Logger.info("Federating #{id} to #{inbox}")
994 host = URI.parse(inbox).host
995
996 digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
997
998 date =
999 NaiveDateTime.utc_now()
1000 |> Timex.format!("{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT")
1001
1002 signature =
1003 Pleroma.Web.HTTPSignatures.sign(actor, %{
1004 host: host,
1005 "content-length": byte_size(json),
1006 digest: digest,
1007 date: date
1008 })
1009
1010 with {:ok, %{status: code}} when code in 200..299 <-
1011 result =
1012 @httpoison.post(
1013 inbox,
1014 json,
1015 [
1016 {"Content-Type", "application/activity+json"},
1017 {"Date", date},
1018 {"signature", signature},
1019 {"digest", digest}
1020 ]
1021 ) do
1022 if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
1023 do: Instances.set_reachable(inbox)
1024
1025 result
1026 else
1027 {_post_result, response} ->
1028 unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
1029 {:error, response}
1030 end
1031 end
1032
1033 # filter out broken threads
1034 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
1035 entire_thread_visible_for_user?(activity, user)
1036 end
1037
1038 # do post-processing on a specific activity
1039 def contain_activity(%Activity{} = activity, %User{} = user) do
1040 contain_broken_threads(activity, user)
1041 end
1042
1043 # do post-processing on a timeline
1044 def contain_timeline(timeline, user) do
1045 timeline
1046 |> Enum.filter(fn activity ->
1047 contain_activity(activity, user)
1048 end)
1049 end
1050 end