Merge remote-tracking branch 'origin/develop' into conversations_three
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Conversation
8 alias Pleroma.Instances
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Object.Fetcher
12 alias Pleroma.Pagination
13 alias Pleroma.Repo
14 alias Pleroma.Upload
15 alias Pleroma.User
16 alias Pleroma.Web.ActivityPub.MRF
17 alias Pleroma.Web.ActivityPub.Transmogrifier
18 alias Pleroma.Web.Federator
19 alias Pleroma.Web.WebFinger
20
21 import Ecto.Query
22 import Pleroma.Web.ActivityPub.Utils
23 import Pleroma.Web.ActivityPub.Visibility
24
25 require Logger
26
27 @httpoison Application.get_env(:pleroma, :httpoison)
28
# Computes the recipients of an activity map.
#
# Returns `{recipients, to, cc}` where `to` and `cc` are the activity's own
# addressing fields (each defaulting to `[]` when absent).

# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []
  actor = User.get_cached_by_ap_id(data["actor"])

  recipients =
    Enum.filter(to ++ cc, fn recipient ->
      case User.get_cached_by_ap_id(recipient) do
        # Recipients with no cached user are kept as-is.
        nil -> true
        user -> User.following?(user, actor)
      end
    end)

  {recipients, to, cc}
end

# Create activities additionally address their own actor; duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []
  # List.wrap/1 maps a missing actor to [] so no bogus `[]` entry ends up in
  # the recipients (the previous `[data["actor"] || []]` produced `[[]]`).
  actor = List.wrap(data["actor"])
  recipients = Enum.uniq(to ++ cc ++ actor)
  {recipients, to, cc}
end

# Every other activity type is addressed to exactly `to ++ cc`.
defp get_recipients(data) do
  to = data["to"] || []
  cc = data["cc"] || []
  {to ++ cc, to, cc}
end
65
# Decides whether an activity from `actor` (an AP id, or nil) may be inserted.
#
# Returns `:ok` when no actor is given or when the cached user exists and
# `user.info.deactivated` is exactly `false`; returns `:reject` otherwise.
defp check_actor_is_active(nil), do: :ok

defp check_actor_is_active(actor) do
  # Matching on %User{} turns an unknown actor into :reject instead of
  # crashing on `nil.info`.
  with %User{} = user <- User.get_cached_by_ap_id(actor),
       false <- user.info.deactivated do
    :ok
  else
    _e -> :reject
  end
end
78
# True when the embedded object's content is no longer than the configured
# `[:instance, :remote_limit]`. Activities without non-nil content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Pleroma.Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_activity), do: true
85
@doc """
Increases the note count of `actor` when `object` is public.

Returns the result of `User.increase_note_count/1`, or `{:ok, actor}` unchanged
when the object is not public.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end

@doc """
Decreases the note count of `actor` when `object` is public.

Returns the result of `User.decrease_note_count/1`, or `{:ok, actor}` unchanged
when the object is not public.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
93
@doc """
Bumps the replies counter of the replied-to object for a public reply.

Only applies to `Create` data whose embedded object has an `"inReplyTo"` key.
Returns `nil` when that object is not public and `:noop` for any other input.
"""
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => parent_ap_id} = reply
    }) do
  if is_public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

def increase_replies_count_if_reply(_create_data), do: :noop
104
@doc """
Lowers the replies counter of the replied-to object for a public reply.

Only applies to an `Object` whose data has an `"inReplyTo"` key. Returns `nil`
when that data is not public and `:noop` for any other input.
"""
def decrease_replies_count_if_reply(%Object{data: %{"inReplyTo" => parent_ap_id} = data}) do
  if is_public?(data), do: Object.decrease_replies_count(parent_ap_id)
end

def decrease_replies_count_if_reply(_object), do: :noop
114
@doc """
Inserts an activity described by `map`.

Steps, in order: bail out with the existing activity if `Activity.normalize/1`
finds one, fill in defaults, check the actor, check the remote content limit,
run the MRF filter, compute recipients, then store the object and the activity.
After storing, rich-media fetching is started in a task, notifications and
conversations are created and the activity is streamed out.

With `fake` set to `true` nothing is stored; an `Activity` struct with the id
`"pleroma:fakeid"` is returned instead.

Returns `{:ok, activity}` or `{:error, reason}`, where `reason` is whatever
value failed to match one of the steps.
"""
def insert(map, local \\ true, fake \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       map = lazy_put_activity_defaults(map, fake),
       :ok <- check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {recipients, _, _} = get_recipients(map),
       {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
       {:ok, map, object} <- insert_full_object(map) do
    {:ok, stored} =
      Repo.insert(%Activity{
        data: map,
        local: local,
        actor: map["actor"],
        recipients: recipients
      })

    # Splice in the child object if we have one.
    activity = if is_nil(object), do: stored, else: Map.put(stored, :object, object)

    Task.start(fn ->
      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
    end)

    Notification.create_notifications(activity)
    Conversation.create_or_bump_for(activity)
    stream_out(activity)
    {:ok, activity}
  else
    # The activity already exists: hand it back untouched.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake insert: build the struct in memory only.
    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
168
@doc """
Pushes a `Create`, `Announce` or `Delete` activity to the streamer topics.

Every such activity goes to the `"user"` and `"list"` topics. Activities
addressed to the public collection in `"to"` also go to `"public"` (plus
`"public:local"` when local); public `Create` activities additionally go to one
`"hashtag:<tag>"` topic per string tag and to the media topics. Activities that
are neither public in `"to"`/`"cc"` nor addressed to the actor's follower
address go to `"direct"`. Other activity types are ignored.
"""
def stream_out(activity) do
  public = "https://www.w3.org/ns/activitystreams#Public"
  # Default missing addressing to [] so a nil "to" cannot crash Enum.member?/2;
  # "cc" was already defaulted this way.
  to = activity.data["to"] || []
  cc = activity.data["cc"] || []

  if activity.data["type"] in ["Create", "Announce", "Delete"] do
    Pleroma.Web.Streamer.stream("user", activity)
    Pleroma.Web.Streamer.stream("list", activity)

    if Enum.member?(to, public) do
      Pleroma.Web.Streamer.stream("public", activity)

      if activity.local do
        Pleroma.Web.Streamer.stream("public:local", activity)
      end

      if activity.data["type"] in ["Create"] do
        object = Object.normalize(activity)

        object.data
        |> Map.get("tag", [])
        |> Enum.filter(fn tag -> is_bitstring(tag) end)
        |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)

        # NOTE(review): an object with no "attachment" key also passes this
        # check (nil != []) — confirm that is intended.
        if object.data["attachment"] != [] do
          Pleroma.Web.Streamer.stream("public:media", activity)

          if activity.local do
            Pleroma.Web.Streamer.stream("public:local:media", activity)
          end
        end
      end
    else
      if !Enum.member?(cc, public) &&
           !Enum.member?(
             to,
             User.get_cached_by_ap_id(activity.data["actor"]).follower_address
           ),
         do: Pleroma.Web.Streamer.stream("direct", activity)
    end
  end
end
209
@doc """
Builds, inserts and federates a `Create` activity.

`params` must contain `:to`, `:actor`, `:context` and `:object`; `:additional`,
`:local` (only an explicit `false` disables it) and `:published` are optional.
With `fake` set to `true` the activity is built but reply/note counters are not
touched and nothing is federated.

Returns `{:ok, activity}`, or the `{:error, reason}` tuple produced by a failing
step.
"""
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = !(params[:local] == false)
  published = params[:published]

  with create_data <-
         make_create_data(
           %{to: to, actor: actor, published: published, context: context, object: object},
           additional
         ),
       {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} ->
      {:ok, activity}

    # Pass errors (e.g. from insert/3) back to the caller; without this clause
    # they raised WithClauseError.
    {:error, _reason} = error ->
      error
  end
end
234
@doc """
Inserts and federates an `Accept` activity from `actor` for `object`.

`params[:local]` counts as false only when it is exactly `false`. Returns
`{:ok, activity}` or the first non-matching step result.
"""
def accept(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false
  data = %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object}

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
245
@doc """
Inserts and federates a `Reject` activity from `actor` for `object`.

`params[:local]` counts as false only when it is exactly `false`. Returns
`{:ok, activity}` or the first non-matching step result.
"""
def reject(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false
  data = %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object}

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
256
@doc """
Inserts and federates an `Update` activity.

Unlike `accept/1` and `reject/1`, `actor` is stored as given (no `.ap_id`
lookup). `params[:local]` counts as false only when it is exactly `false`.
Returns `{:ok, activity}` or the first non-matching step result.
"""
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  data = %{
    "type" => "Update",
    "actor" => actor,
    "object" => object,
    "to" => to,
    "cc" => cc
  }

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
273
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
@doc """
Makes `user` like `object`.

When a like by this user already exists it is returned together with the
unchanged object. Otherwise a like activity is inserted, added to the object
and federated.

Returns `{:ok, activity, object}` or `{:error, reason}`.
"""
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       like_data = make_like_data(user, object, activity_id),
       {:ok, activity} <- insert(like_data, local),
       {:ok, liked_object} <- add_like_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, liked_object}
  else
    %Activity{} = existing_like -> {:ok, existing_like, object}
    error -> {:error, error}
  end
end
292
@doc """
Removes the like of `actor` on `object`.

Inserts an undo activity, deletes the like activity, removes the like from the
object and federates the undo.

Returns `{:ok, unlike_activity, like_activity, object}` on success. If any step
does not match (including there being no existing like), returns
`{:ok, object}` with the object as passed in.
"""
def unlike(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       unlike_data = make_unlike_data(actor, like_activity, activity_id),
       {:ok, unlike_activity} <- insert(unlike_data, local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, unliked_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, unliked_object}
  else
    _e -> {:ok, object}
  end
end
310
@doc """
Makes `user` announce (repeat) `object`.

Only public objects can be announced. The announce activity is inserted, added
to the object and federated.

Returns `{:ok, activity, object}` or `{:error, reason}`, where `reason` is the
value that failed to match (e.g. `false` for a non-public object).
"""
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_public?(object),
       announce_data = make_announce_data(user, object, activity_id, public),
       {:ok, activity} <- insert(announce_data, local),
       {:ok, announced_object} <- add_announce_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, announced_object}
  else
    error -> {:error, error}
  end
end
328
@doc """
Removes the announce of `actor` on `object`.

Inserts and federates an undo activity, then deletes the announce activity and
removes the announce from the object.

Returns `{:ok, unannounce_activity, object}` on success. If any step does not
match (including there being no existing announce), returns `{:ok, object}`
with the object as passed in.
"""
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       unannounce_data = make_unannounce_data(actor, announce_activity, activity_id),
       {:ok, unannounce_activity} <- insert(unannounce_data, local),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, updated_object}
  else
    _e -> {:ok, object}
  end
end
346
@doc """
Inserts and federates a follow activity from `follower` to `followed`.

Returns `{:ok, activity}` or the first non-matching step result.
"""
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
354
@doc """
Undoes the latest follow from `follower` to `followed`.

Marks the latest follow activity as `"cancelled"`, then inserts and federates
the unfollow activity.

Returns `{:ok, activity}` or the first non-matching step result (e.g. `nil`
when `fetch_latest_follow/2` finds nothing).
"""
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data = make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
364
@doc """
Deletes `object` and inserts and federates the matching `Delete` activity.

The `Delete` is addressed to the object's `"to"` and `"cc"` combined and
records the id of the activity returned by `Object.delete/1` (if any) under
`"deleted_activity_id"`. Reply and note counters are decreased afterwards.

Returns `{:ok, activity}` or the first non-matching step result.
"""
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
  user = User.get_cached_by_ap_id(actor)
  addressed = Enum.concat(object.data["to"] || [], object.data["cc"] || [])

  with {:ok, deleted_object, created_activity} <- Object.delete(object),
       data = %{
         "type" => "Delete",
         "actor" => actor,
         "object" => id,
         "to" => addressed,
         "deleted_activity_id" => created_activity && created_activity.id
       },
       {:ok, delete_activity} <- insert(data, local),
       _ <- decrease_replies_count_if_reply(deleted_object),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(delete_activity) do
    {:ok, delete_activity}
  end
end
386
@doc """
Makes `blocker` block `blocked`.

Reads `:unfollow_blocked` and `:outgoing_blocks` from the `:activitypub`
application config. When `:unfollow_blocked` is `true` and a follow exists, the
blocker unfollows the blocked user first. When `:outgoing_blocks` is `true`, a
block activity is inserted and federated.

Returns `{:ok, activity}`, or `{:ok, nil}` when outgoing blocks are disabled or
any step does not match.
"""
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  ap_config = Application.get_env(:pleroma, :activitypub)
  unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
  outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)

  # The follow lookup only happens when the option is exactly `true`.
  if unfollow_blocked == true && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       block_data = make_block_data(blocker, blocked, activity_id),
       {:ok, activity} <- insert(block_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    _e -> {:ok, nil}
  end
end
409
@doc """
Undoes the latest block from `blocker` on `blocked`.

Returns `{:ok, activity}` or the first non-matching step result (e.g. `nil`
when `fetch_latest_block/2` finds nothing).
"""
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       unblock_data = make_unblock_data(blocker, blocked, latest_block, activity_id),
       {:ok, activity} <- insert(unblock_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
418
@doc """
Files a report (`Flag`) by `actor` against `account` and `statuses`.

`params[:local]` and `params[:forward]` count as false only when exactly
`false`. The flag is always addressed with an empty `"to"`; `"cc"` holds the
reported account's AP id when forwarding and is empty otherwise. After a
successful insert and federation, a report e-mail is queued for every
superuser.

Returns `{:ok, activity}` or the first non-matching step result.
"""
def flag(
      %{
        actor: actor,
        context: context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  flag_params = %{
    actor: actor,
    context: context,
    account: account,
    statuses: statuses,
    content: content
  }

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  with flag_data = make_flag_data(flag_params, additional),
       {:ok, activity} <- insert(flag_data, local),
       :ok <- maybe_federate(activity) do
    User.all_superusers()
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  end
end
461
# Builds the query for `Create` activities of one `context`, newest first.
#
# Visible recipients are the public collection plus, when `opts["user"]` is
# set, that user's AP id and their `following` list. Blocks from `opts` are
# applied as well.
defp fetch_activities_for_context_query(context, opts) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]

  recipients =
    case opts["user"] do
      absent when absent in [nil, false] -> public
      user -> [user.ap_id | user.following] ++ public
    end

  scoped =
    from(activity in Activity)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, opts["user"])

  from(
    activity in scoped,
    where:
      fragment(
        "?->>'type' = ? and ?->>'context' = ?",
        activity.data,
        "Create",
        activity.data,
        ^context
      ),
    order_by: [desc: activity.id]
  )
end
483
@doc """
Returns all `Create` activities of `context` visible under `opts`, newest
first, with their objects preloaded.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)

  query
  |> Activity.with_preloaded_object()
  |> Repo.all()
end
491
@doc """
Returns the id of the newest `Create` activity of `context` visible under
`opts`, or `nil` when there is none.
"""
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
        Pleroma.FlakeId.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)

  from(a in query, limit: 1, select: a.id)
  |> Repo.one()
end
501
@doc """
Fetches a page of activities addressed to the public collection, excluding
unlisted ones (public only in `"cc"`).

The paginated result is reversed before being returned.
"""
def fetch_public_activities(opts \\ %{}) do
  ["https://www.w3.org/ns/activitystreams#Public"]
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Pagination.fetch_paginated(opts)
  |> Enum.reverse()
end
510
@valid_visibilities ~w[direct unlisted public private]

# Narrows `query` to activities whose visibility (as computed by the
# `activity_visibility` SQL function) matches `opts[:visibility]`.
#
# `:visibility` may be one valid visibility string or a list of them. Without a
# `:visibility` key the query is returned unchanged. An invalid value is logged
# and yields a query that matches nothing; the previous code returned the
# logger's `:ok` here, which broke the rest of the query pipeline.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    reject_invalid_visibility(query, visibility)
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility}) do
  reject_invalid_visibility(query, visibility)
end

defp restrict_visibility(query, _visibility), do: query

# Logs the rejected value (via inspect/1, so any term can be printed) and fails
# closed: the returned query is still composable but selects no rows.
defp reject_invalid_visibility(query, visibility) do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  from(a in query, where: false)
end
557
@doc """
Fetches `Create` and `Announce` activities authored by `user`, as visible to
`reading_user`.

Visible recipients are the public collection plus, when `reading_user` is
given, their AP id and `following` list. The user's pinned activity ids are
passed along in `params`. The fetched list is reversed before being returned.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.info.pinned_activities
    })

  recipients =
    if reading_user do
      [public, reading_user.ap_id | reading_user.following]
    else
      [public]
    end

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
577
# Keeps only activities with an id greater than opts["since_id"].
# An empty-string or missing "since_id" leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
585
# Drops activities whose object carries any of the tags in opts["tag_reject"].
# Needs the preloaded object binding, hence the raise when preloading is skipped.
# A missing, empty or non-list "tag_reject" leaves the query untouched.
defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_reject(query, %{"tag_reject" => [_ | _] = rejected_tags}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^rejected_tags)
  )
end

defp restrict_tag_reject(query, _opts), do: query
599
# Keeps activities whose object carries every tag in opts["tag_all"].
# Needs the preloaded object binding, hence the raise when preloading is skipped.
# A missing, empty or non-list "tag_all" leaves the query untouched.
defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag_all(query, %{"tag_all" => [_ | _] = required_tags}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^required_tags)
  )
end

defp restrict_tag_all(query, _opts), do: query
613
# Keeps activities whose object carries opts["tag"]: any of the tags when a
# list is given, exactly that tag when a binary is given.
# Needs the preloaded object binding, hence the raise when preloading is skipped.
defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tags))
end

defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
  where(query, [_activity, object], fragment("(?)->'tag' \\? (?)", object.data, ^tag))
end

defp restrict_tag(query, _opts), do: query
633
# Keeps activities whose "to" contains any of `recipients_to` or whose "cc"
# contains any of `recipients_cc`.
defp restrict_to_cc(query, recipients_to, recipients_cc) do
  where(
    query,
    [activity],
    fragment(
      "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
      activity.data,
      ^recipients_to,
      activity.data,
      ^recipients_cc
    )
  )
end
647
# Keeps activities whose recipients overlap with `recipients`. When a user is
# given, activities authored by that user are admitted as well (via or_where).
# An empty recipient list leaves the query untouched.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
661
# Keeps only local activities when opts["local_only"] is exactly `true`.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
667
# Keeps only activities authored by opts["actor_id"], when that key is present.
defp restrict_actor(query, %{"actor_id" => actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
673
# Keeps only activities of the type(s) in opts["type"]: an equality check for a
# single binary, an `ANY(...)` check for any other value (e.g. a list).
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
683
# Keeps activities whose embedded object's "likes" contains opts["favorited_by"].
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(
    query,
    [activity],
    fragment(~s(? <@ (? #> '{"object","likes"}'\)), ^ap_id, activity.data)
  )
end

defp restrict_favorited_by(query, _opts), do: query
692
# Keeps activities whose object's "attachment" is not an empty list, when
# opts["only_media"] is "true" or "1".
# Needs the preloaded object binding, hence the raise when preloading is skipped.
defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{"only_media" => val}) when val in ["true", "1"] do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _opts), do: query
705
# Drops replies (activities whose embedded object has an "inReplyTo") when
# opts["exclude_replies"] is "true" or "1".
defp restrict_replies(query, %{"exclude_replies" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->'object'->>'inReplyTo' is null", activity.data))
end

defp restrict_replies(query, _opts), do: query
714
# Drops Announce activities when opts["exclude_reblogs"] is "true" or "1".
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
720
# Drops activities authored by, or addressed ("to") to, anyone muted by
# opts["muting_user"]. A truthy "with_muted" (true, "true" or "1") disables the
# filter altogether.
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{info: %{mutes: muted_ap_ids}}}) do
  query
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^muted_ap_ids))
  |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^muted_ap_ids))
end

defp restrict_muted(query, _opts), do: query
734
# Applies the blocks of opts["blocking_user"]: drops activities authored by a
# blocked actor, activities with a blocked recipient, Announces addressed
# ("to") to a blocked actor, and activities whose actor URL host (third
# '/'-separated part) is a blocked domain.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocked_ap_ids = info.blocks || []
  blocked_domains = info.domain_blocks || []

  query
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  |> where([activity], fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids))
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  |> where(
    [activity],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^blocked_domains)
  )
end

defp restrict_blocked(query, _opts), do: query
755
# Drops unlisted activities, i.e. those carrying the public collection in "cc".
defp restrict_unlisted(query) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^["https://www.w3.org/ns/activitystreams#Public"]
    )
  )
end
767
# Keeps only the activities listed in opts["pinned_activity_ids"] when
# opts["pinned"] is the string "true".
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => pinned_ids}) do
  where(query, [activity], activity.id in ^pinned_ids)
end

defp restrict_pinned(query, _opts), do: query
773
# Drops Announce activities authored by actors listed in the muting user's
# `muted_reblogs` (treated as empty when nil).
defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
  muted_actors = info.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_actors
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
790
# Adds the object preload to the query unless opts["skip_preload"] is `true`.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query

defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
797
@doc """
Builds the activity query for `recipients`, applying every restriction that
`opts` asks for.

The object preload and the recipient restriction are applied first; the
remaining `restrict_*` filters are then applied in a fixed order, each receiving
the query built so far and `opts`.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  # Order matters only in that it mirrors the original pipeline exactly.
  filters = [
    &restrict_tag/2,
    &restrict_tag_reject/2,
    &restrict_tag_all/2,
    &restrict_since/2,
    &restrict_local/2,
    &restrict_actor/2,
    &restrict_type/2,
    &restrict_favorited_by/2,
    &restrict_blocked/2,
    &restrict_muted/2,
    &restrict_media/2,
    &restrict_visibility/2,
    &restrict_replies/2,
    &restrict_reblogs/2,
    &restrict_pinned/2,
    &restrict_muted_reblogs/2
  ]

  initial =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> restrict_recipients(recipients, opts["user"])

  Enum.reduce(filters, initial, fn filter, query -> filter.(query, opts) end)
end
821
@doc """
Fetches one page of activities for `recipients` under `opts`.

The paginated result is reversed before being returned.
"""
def fetch_activities(recipients, opts \\ %{}) do
  page =
    recipients
    |> fetch_activities_query(opts)
    |> Pagination.fetch_paginated(opts)

  Enum.reverse(page)
end
827
@doc """
Fetches one page of activities whose `"to"` overlaps `recipients_to` or whose
`"cc"` overlaps `recipients_cc`, with the other restrictions from `opts`.

The paginated result is reversed before being returned.
"""
def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
  page =
    []
    |> fetch_activities_query(opts)
    |> restrict_to_cc(recipients_to, recipients_cc)
    |> Pagination.fetch_paginated(opts)

  Enum.reverse(page)
end
834
@doc """
Stores `file` through `Upload.store/2` and inserts an `Object` for it.

When `opts[:actor]` is set it is recorded under the object's `"actor"` key.
Returns the result of `Repo.insert/1`, or the non-matching result of
`Upload.store/2`.
"""
def upload(file, opts \\ []) do
  with {:ok, data} <- Upload.store(file, opts) do
    obj_data =
      case opts[:actor] do
        absent when absent in [nil, false] -> data
        actor -> Map.put(data, "actor", actor)
      end

    Repo.insert(%Object{data: obj_data})
  end
end
847
@doc """
Converts a fetched ActivityPub actor document into user attributes.

Avatar, banner and the locked flag are read from the document as received; the
remaining fields are read after `Transmogrifier.maybe_fix_user_object/1` has
been applied. The nickname is `preferredUsername@host` (host taken from the
actor id), or `nil` when no `preferredUsername` is present.

Always returns `{:ok, user_data}`.
"""
def user_data_from_user_object(data) do
  # Wraps a URL into an Image map; a falsy URL is passed through as-is.
  to_image = fn url ->
    url && %{"type" => "Image", "url" => [%{"href" => url}]}
  end

  avatar = to_image.(data["icon"]["url"])
  banner = to_image.(data["image"]["url"])
  locked = data["manuallyApprovesFollowers"] || false

  fixed = Transmogrifier.maybe_fix_user_object(data)

  # nickname can be nil because of virtual actors
  nickname =
    if fixed["preferredUsername"] do
      "#{fixed["preferredUsername"]}@#{URI.parse(fixed["id"]).host}"
    else
      nil
    end

  user_data = %{
    ap_id: fixed["id"],
    info: %{
      "ap_enabled" => true,
      "source_data" => fixed,
      "banner" => banner,
      "locked" => locked
    },
    avatar: avatar,
    name: fixed["name"],
    follower_address: fixed["followers"],
    bio: fixed["summary"],
    nickname: nickname
  }

  {:ok, user_data}
end
894
@doc """
Fetches the remote actor at `ap_id` and converts it into user attributes.

Returns `{:ok, user_data}` on success. On failure the problem is logged and an
`{:error, reason}` tuple is returned; previously the logger's `:ok` leaked out
as the return value and the reason was lost.
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
    user_data_from_user_object(data)
  else
    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")

      # Keep error tuples as they are; wrap anything else.
      case e do
        {:error, _reason} -> e
        other -> {:error, other}
      end
  end
end

@doc """
Creates or upgrades the user for `ap_id`.

A user that is already cached is upgraded through
`Transmogrifier.upgrade_user_from_ap_id/1`; otherwise the actor is fetched and
inserted. Fetch failures are returned as `{:error, reason}`.
"""
def make_user_from_ap_id(ap_id) do
  if _user = User.get_cached_by_ap_id(ap_id) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
      User.insert_or_update_user(data)
    else
      # Avoid double-wrapping an error that is already tagged.
      {:error, _reason} = error -> error
      e -> {:error, e}
    end
  end
end
914
@doc """
Resolves `nickname` through WebFinger and creates or upgrades the user behind
the discovered AP id.

Returns the result of `make_user_from_ap_id/1`, or
`{:error, "No AP id in WebFinger"}` when WebFinger yields no AP id.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
922
@doc """
Tells whether an activity may be delivered to `inbox`.

Public activities are always delivered. Non-public ones are delivered unless
the inbox host is listed in `[:instance, :quarantined_instances]`.
"""
def should_federate?(inbox, public) do
  if public do
    true
  else
    %URI{host: host} = URI.parse(inbox)
    quarantined = Pleroma.Config.get([:instance, :quarantined_instances], [])
    !Enum.member?(quarantined, host)
  end
end
931
@doc """
Delivers `activity` from `actor` to the inboxes of its remote recipients.

Targets are the users returned by `Pleroma.Web.Salmon.remote_users/1` plus,
when the actor's follower address is among the activity's recipients, the
actor's non-local followers. Only AP-enabled users are kept; each contributes
its shared inbox when its endpoints declare one, otherwise its own inbox.
Inboxes are de-duplicated, filtered by `should_federate?/2` and reachability,
then handed one by one to `Federator.publish_single_ap/1`.
"""
def publish(actor, activity) do
  remote_followers =
    if actor.follower_address in activity.recipients do
      {:ok, followers} = User.get_followers(actor)
      Enum.reject(followers, & &1.local)
    else
      []
    end

  public = is_public?(activity)

  {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
  json = Jason.encode!(data)

  targets = Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers

  targets
  |> Enum.filter(&User.ap_enabled?/1)
  |> Enum.map(fn %{info: %{source_data: source}} ->
    # Prefer the shared inbox when the actor document declares one.
    (is_map(source["endpoints"]) && Map.get(source["endpoints"], "sharedInbox")) ||
      source["inbox"]
  end)
  |> Enum.uniq()
  |> Enum.filter(&should_federate?(&1, public))
  |> Instances.filter_reachable()
  |> Enum.each(fn {inbox, unreachable_since} ->
    Federator.publish_single_ap(%{
      inbox: inbox,
      json: json,
      actor: actor,
      id: activity.data["id"],
      unreachable_since: unreachable_since
    })
  end)
end
964
@doc """
POSTs the serialized activity `json` to a single `inbox`, signed as `actor`.

The request carries a SHA-256 digest of the body, a date header and an HTTP
signature over host, content length, digest and date.

On a 2xx response the inbox is marked reachable (unless `:unreachable_since`
was passed in as a falsy value) and the HTTP result is returned. Otherwise the
inbox is marked unreachable (unless it already had a truthy
`:unreachable_since`) and `{:error, response}` is returned.
"""
def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
  Logger.info("Federating #{id} to #{inbox}")
  %URI{host: host} = URI.parse(inbox)

  digest = "SHA-256=" <> Base.encode64(:crypto.hash(:sha256, json))

  date =
    Timex.format!(
      NaiveDateTime.utc_now(),
      "{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT"
    )

  signature =
    Pleroma.Web.HTTPSignatures.sign(actor, %{
      host: host,
      "content-length": byte_size(json),
      digest: digest,
      date: date
    })

  headers = [
    {"Content-Type", "application/activity+json"},
    {"Date", date},
    {"signature", signature},
    {"digest", digest}
  ]

  with {:ok, %{status: code}} when code in 200..299 <-
         result = @httpoison.post(inbox, json, headers) do
    if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since] do
      Instances.set_reachable(inbox)
    end

    result
  else
    {_post_result, response} ->
      if !params[:unreachable_since], do: Instances.set_unreachable(inbox)
      {:error, response}
  end
end
1005
# filter out broken threads
@doc """
Tells whether the entire thread of `activity` is visible for `user`.
"""
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)

# do post-processing on a specific activity
@doc """
Tells whether `activity` should be kept for `user`; currently this is only the
broken-thread check.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)

# do post-processing on a timeline
@doc """
Filters `timeline` down to the activities accepted by `contain_activity/2`.
"""
def contain_timeline(timeline, user) do
  Enum.filter(timeline, &contain_activity(&1, user))
end
1023 end