Merge branch 'feature/object-normalize-preload' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Instances
8 alias Pleroma.Notification
9 alias Pleroma.Object
10 alias Pleroma.Repo
11 alias Pleroma.Upload
12 alias Pleroma.User
13 alias Pleroma.Web.ActivityPub.MRF
14 alias Pleroma.Web.ActivityPub.Transmogrifier
15 alias Pleroma.Web.Federator
16 alias Pleroma.Web.OStatus
17 alias Pleroma.Web.WebFinger
18
19 import Ecto.Query
20 import Pleroma.Web.ActivityPub.Utils
21 import Pleroma.Web.ActivityPub.Visibility
22
23 require Logger
24
25 @httpoison Application.get_env(:pleroma, :httpoison)
26
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
#
# Every clause returns `{recipients, to, cc}`; a missing "to" or "cc" is treated as `[]`.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []
  actor = User.get_cached_by_ap_id(data["actor"])

  recipients =
    Enum.filter(to ++ cc, fn recipient ->
      case User.get_cached_by_ap_id(recipient) do
        # No cached user for this address: keep it as-is.
        nil -> true
        # A known user only receives the announce when following the announcing actor.
        user -> User.following?(user, actor)
      end
    end)

  {recipients, to, cc}
end

# Create activities additionally list their own actor among the recipients.
defp get_recipients(%{"type" => "Create"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []
  # List.wrap/1 maps a missing actor to [] so that no stray `[]` element ends up
  # inside the recipients list (the previous `[actor]` with `actor = []` did that).
  actor = List.wrap(data["actor"])
  recipients = Enum.uniq(to ++ cc ++ actor)
  {recipients, to, cc}
end

# Any other activity type: recipients are simply "to" followed by "cc".
defp get_recipients(data) do
  to = data["to"] || []
  cc = data["cc"] || []
  {to ++ cc, to, cc}
end
63
# Returns :ok when no actor is given or when the actor resolves to a user whose
# `info.deactivated` is `false`; returns :reject otherwise.
defp check_actor_is_active(nil), do: :ok

defp check_actor_is_active(actor) do
  # Matching on %User{} makes an unknown actor (nil lookup) fall through to :reject
  # instead of raising on `nil.info`.
  with %User{} = user <- User.get_cached_by_ap_id(actor),
       false <- user.info.deactivated do
    :ok
  else
    _e -> :reject
  end
end
76
# True when the embedded object's "content" is no longer (per String.length/1) than the
# configured [:instance, :remote_limit]; activities without such content always pass.
defp check_remote_limit(%{"object" => %{"content" => body}}) when body != nil do
  max_length = Pleroma.Config.get([:instance, :remote_limit])
  max_length >= String.length(body)
end

defp check_remote_limit(_activity), do: true
83
@doc """
Calls `User.increase_note_count/1` for `actor` when `object` is public.

Returns `{:ok, actor}` unchanged when `is_public?(object)` is falsy.
"""
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
87
@doc """
Calls `User.decrease_note_count/1` for `actor` when `object` is public.

Returns `{:ok, actor}` unchanged when `is_public?(object)` is falsy.
"""
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
91
@doc """
Stores the activity described by `map` and runs the follow-up work for it.

When `Activity.normalize/1` already yields an activity for `map`, that activity is
returned as `{:ok, activity}` and nothing is inserted. Otherwise the map gets its
defaults, the actor and remote-limit checks and the MRF filter are applied, the
embedded object is inserted, and the activity row is written. Afterwards a rich-media
fetch task is started, notifications are created and the activity is streamed out.

Any step that does not match is returned wrapped as `{:error, reason}`.
"""
def insert(map, local \\ true) when is_map(map) do
  with nil <- Activity.normalize(map),
       map <- lazy_put_activity_defaults(map),
       :ok <- check_actor_is_active(map["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(map)},
       {:ok, map} <- MRF.filter(map),
       {:ok, object} <- insert_full_object(map) do
    {recipients, _to, _cc} = get_recipients(map)

    record = %Activity{data: map, local: local, actor: map["actor"], recipients: recipients}
    {:ok, stored} = Repo.insert(record)

    # Splice in the child object if we have one.
    activity =
      case object do
        nil -> stored
        _ -> Map.put(stored, :object, object)
      end

    Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)

    Notification.create_notifications(activity)
    stream_out(activity)
    {:ok, activity}
  else
    %Activity{} = existing -> {:ok, existing}
    failure -> {:error, failure}
  end
end
129
@doc """
Pushes a Create, Announce or Delete activity to the matching streamer topics.

Such an activity always goes to "user" and "list". When the public address is in
"to", it also goes to "public" (plus "public:local" for local activities); a Create
is additionally sent to one "hashtag:<tag>" topic per string tag and to the media
topics when the object's "attachment" is not `[]`. When the public address is in
neither "to" nor "cc" and the actor's follower address is not in "to", the activity
goes to "direct". Other activity types are ignored.
"""
def stream_out(activity) do
  public = "https://www.w3.org/ns/activitystreams#Public"
  # "to" may be missing from the activity data; treat it as empty, as is done for "cc",
  # rather than letting Enum.member?/2 raise on nil.
  to = activity.data["to"] || []

  if activity.data["type"] in ["Create", "Announce", "Delete"] do
    Pleroma.Web.Streamer.stream("user", activity)
    Pleroma.Web.Streamer.stream("list", activity)

    if Enum.member?(to, public) do
      Pleroma.Web.Streamer.stream("public", activity)

      if activity.local do
        Pleroma.Web.Streamer.stream("public:local", activity)
      end

      if activity.data["type"] in ["Create"] do
        # Only plain string tags name a hashtag topic.
        activity.data["object"]
        |> Map.get("tag", [])
        |> Enum.filter(fn tag -> is_bitstring(tag) end)
        |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)

        if activity.data["object"]["attachment"] != [] do
          Pleroma.Web.Streamer.stream("public:media", activity)

          if activity.local do
            Pleroma.Web.Streamer.stream("public:local:media", activity)
          end
        end
      end
    else
      if !Enum.member?(activity.data["cc"] || [], public) &&
           !Enum.member?(to, User.get_by_ap_id(activity.data["actor"]).follower_address),
         do: Pleroma.Web.Streamer.stream("direct", activity)
    end
  end
end
168
@doc """
Builds a Create activity from `params`, inserts it, bumps the actor's note count
when the activity is public, and federates it.

`params` must contain `:to`, `:actor`, `:context` and `:object`; `:additional`,
`:published` and `:local` are optional. Returns `{:ok, activity}`, or the first
non-matching step's result.
"""
def create(%{to: to, actor: actor, context: context, object: object} = params) do
  extra = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: params[:published], context: context, object: object},
      extra
    )

  # Changing note count prior to enqueuing federation task in order to avoid
  # race conditions on updating user.info
  with {:ok, activity} <- insert(create_data, local),
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
188
@doc """
Inserts and federates an Accept activity from `actor` about `object`, addressed to `to`.

The activity is local unless `params[:local]` is exactly `false`.
"""
def accept(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false
  data = %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object}

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
199
@doc """
Inserts and federates a Reject activity from `actor` about `object`, addressed to `to`.

The activity is local unless `params[:local]` is exactly `false`.
"""
def reject(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false
  data = %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object}

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
210
@doc """
Inserts and federates an Update activity.

`actor` is stored in the activity exactly as given. The activity is local unless
`params[:local]` is exactly `false`.
"""
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  data = %{
    "to" => to,
    "cc" => cc,
    "type" => "Update",
    "actor" => actor,
    "object" => object
  }

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
227
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
@doc """
Records a like of `object` by `user`.

When a like by this user already exists it is returned as `{:ok, activity, object}`
without inserting anything. Otherwise a Like activity is inserted, added to the
object and federated. Other failures come back as `{:error, reason}`.
"""
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       {:ok, activity} <- user |> make_like_data(object, activity_id) |> insert(local),
       {:ok, object} <- add_like_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, object}
  else
    %Activity{} = existing -> {:ok, existing, object}
    failure -> {:error, failure}
  end
end
246
@doc """
Undoes `actor`'s like of `object`.

On success returns `{:ok, unlike_activity, like_activity, object}`. If any step does
not match (for instance when no existing like is found) `{:ok, object}` is returned
with the object as originally passed in.
"""
def unlike(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       {:ok, unlike_activity} <-
         actor |> make_unlike_data(like_activity, activity_id) |> insert(local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, object}
  else
    _other -> {:ok, object}
  end
end
264
@doc """
Announces `object` on behalf of `user`.

Only proceeds when `is_public?(object)` returns `true`. Returns
`{:ok, activity, object}` on success and `{:error, reason}` for any step that does
not match.
"""
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_public?(object),
       {:ok, activity} <-
         user |> make_announce_data(object, activity_id, public) |> insert(local),
       {:ok, object} <- add_announce_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, object}
  else
    failure -> {:error, failure}
  end
end
282
@doc """
Undoes `actor`'s announce of `object`.

The undo activity is inserted and federated before the original announce is deleted
and removed from the object. Returns `{:ok, unannounce_activity, object}` on success,
or `{:ok, object}` (the object as passed in) when any step does not match.
"""
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       {:ok, unannounce_activity} <-
         actor |> make_unannounce_data(announce_activity, activity_id) |> insert(local),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, object}
  else
    _other -> {:ok, object}
  end
end
300
@doc """
Inserts and federates a follow activity from `follower` to `followed`.

Returns `{:ok, activity}`, or the first non-matching step's result.
"""
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
308
@doc """
Cancels the latest follow from `follower` to `followed`.

The follow activity's state is set to "cancelled", then an unfollow activity is
inserted and federated. Returns `{:ok, activity}`, or the first non-matching step's
result (e.g. whatever `fetch_latest_follow/2` returned when it is not an activity).
"""
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       {:ok, activity} <-
         follower
         |> make_unfollow_data(followed, cancelled_follow, activity_id)
         |> insert(local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
318
@doc """
Deletes `object` and inserts and federates the corresponding Delete activity.

The Delete is addressed to the object's "to" and "cc" combined and records the id of
the activity returned by `Object.delete/1` (when there is one) under
"deleted_activity_id". The actor's note count is decreased when the object is public.
"""
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
  user = User.get_cached_by_ap_id(actor)
  addressed = (object.data["to"] || []) ++ (object.data["cc"] || [])

  # Changing note count prior to enqueuing federation task in order to avoid
  # race conditions on updating user.info
  with {:ok, object, removed} <- Object.delete(object),
       {:ok, activity} <-
         insert(
           %{
             "type" => "Delete",
             "actor" => actor,
             "object" => id,
             "to" => addressed,
             "deleted_activity_id" => removed && removed.id
           },
           local
         ),
       {:ok, _actor} <- decrease_note_count_if_public(user, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
339
@doc """
Blocks `blocked` on behalf of `blocker`.

Reads `:unfollow_blocked` and `:outgoing_blocks` from the `:activitypub` application
config. When the former is `true` and a follow from blocker to blocked exists, it is
unfollowed first. When the latter is `true`, a block activity is inserted and
federated and `{:ok, activity}` is returned; in every other case the result is
`{:ok, nil}`.
"""
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  settings = Application.get_env(:pleroma, :activitypub)
  unfollow_blocked = Keyword.get(settings, :unfollow_blocked)
  outgoing_blocks = Keyword.get(settings, :outgoing_blocks)

  # The follow lookup only runs when the setting is exactly `true`.
  if unfollow_blocked == true && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       {:ok, activity} <- blocker |> make_block_data(blocked, activity_id) |> insert(local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    _other -> {:ok, nil}
  end
end
362
@doc """
Undoes the latest block from `blocker` against `blocked`.

Returns `{:ok, activity}` for the inserted and federated unblock activity, or the
first non-matching step's result.
"""
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       {:ok, activity} <-
         blocker |> make_unblock_data(blocked, latest_block, activity_id) |> insert(local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
371
@doc """
Files a report (flag activity) and e-mails it to every superuser.

`params` must contain `:actor`, `:context`, `:account`, `:statuses` and `:content`.
`:local` and `:forward` count as true unless they are exactly `false`. The activity's
"to" is always empty; "cc" holds the reported account's ap_id only when forwarding.
"""
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  # Only the five report fields are handed on to make_flag_data/2.
  report = Map.take(params, [:actor, :context, :account, :statuses, :content])

  with flag_data <- make_flag_data(report, additional),
       {:ok, activity} <- insert(flag_data, local),
       :ok <- maybe_federate(activity) do
    for superuser <- User.all_superusers() do
      superuser
      |> Pleroma.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Mailer.deliver_async()
    end

    {:ok, activity}
  end
end
414
@doc """
Returns the Create activities whose "context" equals `context`, newest id first,
with their objects preloaded.

Results are limited to activities addressed to the public address or, when
`opts["user"]` is given, also to that user or anything the user follows; the
blocking filter from `opts` is applied as well.
"""
def fetch_activities_for_context(context, opts \\ %{}) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]
  user = opts["user"]

  recipients =
    if user do
      [user.ap_id | user.following] ++ public
    else
      public
    end

  visible =
    from(activity in Activity)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, user)

  from(
    activity in visible,
    where:
      fragment(
        "?->>'type' = ? and ?->>'context' = ?",
        activity.data,
        "Create",
        activity.data,
        ^context
      ),
    order_by: [desc: :id]
  )
  |> Activity.with_preloaded_object()
  |> Repo.all()
end
445
@doc """
Fetches activities addressed to the public address, leaving out those that only
carry it in "cc", and returns them in reversed query order.
"""
def fetch_public_activities(opts \\ %{}) do
  ["https://www.w3.org/ns/activitystreams#Public"]
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Repo.all()
  |> Enum.reverse()
end
454
@valid_visibilities ~w[direct unlisted public private]

# Narrows `query` to activities whose `activity_visibility(actor, recipients, data)`
# matches the `:visibility` option, which may be one value or a list of values.
#
# An unsupported value is logged and the filter is skipped: the query is returned
# unchanged so that the remaining restrictions in the pipeline still receive a query
# (returning Logger's `:ok`, as before, made the next step fail).
defp restrict_visibility(query, %{visibility: visibility}) when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    skip_invalid_visibility(query, visibility)
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

# Any other (non-list) visibility value is unsupported.
defp restrict_visibility(query, %{visibility: visibility}) do
  skip_invalid_visibility(query, visibility)
end

defp restrict_visibility(query, _visibility), do: query

# Logs the rejected value; inspect/1 is used so that any term can be printed safely.
defp skip_invalid_visibility(query, visibility) do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end
501
@doc """
Fetches the Create and Announce activities authored by `user` that are visible to
`reading_user` (or only publicly addressed ones when `reading_user` is falsy).

The user's pinned activity ids are passed along so the "pinned" filter can use them.
"""
def fetch_user_activities(user, reading_user, params \\ %{}) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]

  recipients =
    if reading_user do
      public ++ [reading_user.ap_id | reading_user.following]
    else
      public
    end

  query_params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.info.pinned_activities
    })

  recipients
  |> fetch_activities(query_params)
  |> Enum.reverse()
end
521
# Keeps only activities with an id greater than "since_id"; an empty string or a
# missing key leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
529
# Drops activities whose object tags contain any of the "tag_reject" entries.
# Applies only to a non-empty list.
defp restrict_tag_reject(query, %{"tag_reject" => [_ | _] = tag_reject}) do
  where(
    query,
    [activity],
    fragment(~s(\(not \(? #> '{"object","tag"}'\) \\?| ?\)), activity.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _opts), do: query
539
# Keeps activities whose object tags contain every one of the "tag_all" entries.
# Applies only to a non-empty list.
defp restrict_tag_all(query, %{"tag_all" => [_ | _] = tag_all}) do
  where(
    query,
    [activity],
    fragment(~s(\(? #> '{"object","tag"}'\) \\?& ?), activity.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _opts), do: query
549
# Keeps activities matching the "tag" option: with a list, the object tags must contain
# at least one entry; with a single string, the object tags must contain that value.
defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  where(
    query,
    [activity],
    fragment(~s(\(? #> '{"object","tag"}'\) \\?| ?), activity.data, ^tags)
  )
end

defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
  where(
    query,
    [activity],
    fragment(~s(? <@ (? #> '{"object","tag"}'\)), ^tag, activity.data)
  )
end

defp restrict_tag(query, _opts), do: query
565
# Keeps activities whose "to" contains any of `recipients_to` or whose "cc" contains
# any of `recipients_cc`.
defp restrict_to_cc(query, recipients_to, recipients_cc) do
  where(
    query,
    [activity],
    fragment(
      "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
      activity.data,
      ^recipients_to,
      activity.data,
      ^recipients_cc
    )
  )
end
579
# Keeps activities whose recipients overlap `recipients`. With a user given, activities
# authored by that user are included as well. An empty recipient list is a no-op.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
593
# Sets the query limit from the "limit" option when it is present.
defp restrict_limit(query, %{"limit" => count}) do
  limit(query, ^count)
end

defp restrict_limit(query, _opts), do: query
599
# Keeps only local activities when "local_only" is exactly `true`.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
605
# Keeps only activities with an id lower than "max_id"; an empty string or a missing
# key leaves the query untouched.
defp restrict_max(query, %{"max_id" => ""}), do: query

defp restrict_max(query, %{"max_id" => max_id}) do
  where(query, [activity], activity.id < ^max_id)
end

defp restrict_max(query, _opts), do: query
613
# Keeps only activities whose actor equals the "actor_id" option.
defp restrict_actor(query, %{"actor_id" => actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
619
# Keeps only activities of the given "type": a single string is compared for equality,
# any other value is matched with ANY(...).
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
629
# Keeps only activities whose object "likes" contain the "favorited_by" ap_id.
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(
    query,
    [activity],
    fragment(~s(? <@ (? #> '{"object","likes"}'\)), ^ap_id, activity.data)
  )
end

defp restrict_favorited_by(query, _opts), do: query
638
# With "only_media" set to "true" or "1", keeps only activities whose object
# "attachment" differs from the empty list.
defp restrict_media(query, %{"only_media" => flag}) when flag in ["true", "1"] do
  where(
    query,
    [activity],
    fragment(~s(not (? #> '{"object","attachment"}' = ?\)), activity.data, ^[])
  )
end

defp restrict_media(query, _opts), do: query
647
# With "exclude_replies" set to "true" or "1", keeps only activities whose object has
# no "inReplyTo".
defp restrict_replies(query, %{"exclude_replies" => flag}) when flag in ["true", "1"] do
  where(query, [activity], fragment("?->'object'->>'inReplyTo' is null", activity.data))
end

defp restrict_replies(query, _opts), do: query
656
# With "exclude_reblogs" set to "true" or "1", drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => flag}) when flag in ["true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
662
# Drops activities authored by, or addressed ("to") to, anyone in the muting user's
# mutes. Skipped entirely when "with_muted" is true, "true" or "1".
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{info: %{mutes: mutes}}}) do
  query
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
  |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^mutes))
end

defp restrict_muted(query, _opts), do: query
676
# Drops activities authored by, or addressed ("to") to, anyone in the blocking user's
# blocks, and activities whose actor id has a blocked domain as its third
# '/'-separated part.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocked_ids = info.blocks || []
  blocked_domains = info.domain_blocks || []

  query
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ids))
  |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^blocked_ids))
  |> where(
    [activity],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^blocked_domains)
  )
end

defp restrict_blocked(query, _opts), do: query
690
# Drops activities that carry the public address in "cc" (a missing "cc" counts as empty).
defp restrict_unlisted(query) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]

  where(
    query,
    [activity],
    fragment("not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)", activity.data, ^public)
  )
end
702
# With "pinned" set to "true", keeps only activities whose id is in "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => pinned_ids}) do
  where(query, [activity], activity.id in ^pinned_ids)
end

defp restrict_pinned(query, _opts), do: query
708
# Drops Announce activities made by actors whose reblogs the muting user has muted.
#
# Both conditions have to sit inside a single negation: as two separate `where`s they
# were ANDed, which removed every Announce and every activity by a reblog-muted actor.
defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
  muted_reblogs = info.muted_reblogs || []

  from(
    activity in query,
    where:
      fragment(
        "not (?->>'type' = 'Announce' and ? = ANY(?))",
        activity.data,
        activity.actor,
        ^muted_reblogs
      )
  )
end

defp restrict_muted_reblogs(query, _), do: query
720
# Adds the object preload to the query unless "skip_preload" is exactly `true`.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query

defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
727
@doc """
Builds (without running) the activity query for `recipients`, filtered by `opts`.

The base query orders by id descending (nulls last) with a default limit of 20; each
`restrict_*` step then narrows it according to the option it handles.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  from(
    activity in Activity,
    limit: 20,
    order_by: [fragment("? desc nulls last", activity.id)]
  )
  |> maybe_preload_objects(opts)
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_limit(opts)
  |> restrict_max(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(opts)
  |> restrict_muted(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(opts)
end
758
@doc """
Runs `fetch_activities_query/2` and returns the rows in reversed query order.
"""
def fetch_activities(recipients, opts \\ %{}) do
  rows =
    recipients
    |> fetch_activities_query(opts)
    |> Repo.all()

  Enum.reverse(rows)
end
764
@doc """
Fetches activities whose "to" overlaps `recipients_to` or whose "cc" overlaps
`recipients_cc`, applying the `opts` filters, and returns them in reversed query order.
"""
def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
  rows =
    []
    |> fetch_activities_query(opts)
    |> restrict_to_cc(recipients_to, recipients_cc)
    |> Repo.all()

  Enum.reverse(rows)
end
771
@doc """
Stores `file` through `Upload.store/2` and inserts an object holding the resulting data.

When `opts[:actor]` is set it is added to the object data under "actor". A result
from `Upload.store/2` that is not `{:ok, data}` is returned as-is.
"""
def upload(file, opts \\ []) do
  with {:ok, data} <- Upload.store(file, opts) do
    actor = opts[:actor]
    object_data = if actor, do: Map.put(data, "actor", actor), else: data

    Repo.insert(%Object{data: object_data})
  end
end
784
@doc """
Converts an actor document into the attribute map used to create or update a user.

Avatar, banner and the locked flag are read from the document as given; the remaining
fields are read after `Transmogrifier.maybe_fix_user_object/1` has been applied.
Always returns `{:ok, user_data}`.
"""
def user_data_from_user_object(data) do
  # Wraps a URL in an Image map; a falsy URL is passed through unchanged.
  to_image = fn url -> url && %{"type" => "Image", "url" => [%{"href" => url}]} end

  avatar = to_image.(data["icon"]["url"])
  banner = to_image.(data["image"]["url"])
  locked = data["manuallyApprovesFollowers"] || false

  data = Transmogrifier.maybe_fix_user_object(data)

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    else
      nil
    end

  {:ok,
   %{
     ap_id: data["id"],
     info: %{
       "ap_enabled" => true,
       "source_data" => data,
       "banner" => banner,
       "locked" => locked
     },
     avatar: avatar,
     name: data["name"],
     follower_address: data["followers"],
     bio: data["summary"],
     nickname: nickname
   }}
end
831
@doc """
Fetches the actor document at `ap_id` and converts it into user attributes.

Returns `{:ok, user_data}` on success. On failure the problem is logged and an
`{:error, reason}` tuple is returned (previously the logger's `:ok` leaked out as the
return value).
"""
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- fetch_and_contain_remote_object_from_id(ap_id) do
    user_data_from_user_object(data)
  else
    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")

      # Pass an existing error tuple through untouched; wrap anything else.
      case e do
        {:error, _reason} -> e
        other -> {:error, other}
      end
  end
end
839
@doc """
Returns a user for `ap_id`.

When a user with this ap_id already exists it is upgraded through
`Transmogrifier.upgrade_user_from_ap_id/1`; otherwise the actor is fetched and
inserted or updated. A failed fetch yields `{:error, reason}`.
"""
def make_user_from_ap_id(ap_id) do
  if User.get_by_ap_id(ap_id) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} -> User.insert_or_update_user(data)
      failure -> {:error, failure}
    end
  end
end
851
@doc """
Resolves `nickname` through WebFinger and builds the user from the resulting ap_id.

Returns `{:error, "No AP id in WebFinger"}` when the lookup yields no ap_id.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _other ->
      {:error, "No AP id in WebFinger"}
  end
end
859
@doc """
Decides whether an activity may be delivered to `inbox`.

Public activities always may; otherwise the inbox host must not be listed in the
`[:instance, :quarantined_instances]` config.
"""
def should_federate?(inbox, public) do
  cond do
    public ->
      true

    true ->
      quarantined = Pleroma.Config.get([:instance, :quarantined_instances], [])
      !Enum.member?(quarantined, URI.parse(inbox).host)
  end
end
868
@doc """
Hands `activity` to the federator once per distinct remote inbox.

Targets are the remote users reported by `Pleroma.Web.Salmon.remote_users/1` plus,
when the actor's follower address is among the activity recipients, the actor's
non-local followers. Only AP-enabled users are considered; a shared inbox is
preferred over the personal one. Inboxes rejected by `should_federate?/2` or
filtered out by `Instances.filter_reachable/1` are skipped.
"""
def publish(actor, activity) do
  remote_followers =
    if actor.follower_address in activity.recipients do
      {:ok, followers} = User.get_followers(actor)
      Enum.reject(followers, & &1.local)
    else
      []
    end

  public = is_public?(activity)

  {:ok, outgoing} = Transmogrifier.prepare_outgoing(activity.data)
  json = Jason.encode!(outgoing)

  inbox_for = fn %{info: %{source_data: source}} ->
    (is_map(source["endpoints"]) && Map.get(source["endpoints"], "sharedInbox")) ||
      source["inbox"]
  end

  (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
  |> Enum.filter(&User.ap_enabled?/1)
  |> Enum.map(inbox_for)
  |> Enum.uniq()
  |> Enum.filter(&should_federate?(&1, public))
  |> Instances.filter_reachable()
  |> Enum.each(fn {inbox, unreachable_since} ->
    Federator.publish_single_ap(%{
      inbox: inbox,
      json: json,
      actor: actor,
      id: activity.data["id"],
      unreachable_since: unreachable_since
    })
  end)
end
901
@doc """
POSTs the signed `json` payload to a single `inbox`.

On a 2xx response the inbox is marked reachable (when `:unreachable_since` is absent
from `params` or truthy) and the HTTP result is returned. Otherwise the inbox is
marked unreachable, unless `:unreachable_since` is already set, and
`{:error, response}` is returned.
"""
def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
  Logger.info("Federating #{id} to #{inbox}")
  host = URI.parse(inbox).host

  digest = "SHA-256=" <> Base.encode64(:crypto.hash(:sha256, json))

  date =
    Timex.format!(
      NaiveDateTime.utc_now(),
      "{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT"
    )

  signature =
    Pleroma.Web.HTTPSignatures.sign(actor, %{
      host: host,
      "content-length": byte_size(json),
      digest: digest,
      date: date
    })

  headers = [
    {"Content-Type", "application/activity+json"},
    {"Date", date},
    {"signature", signature},
    {"digest", digest}
  ]

  with {:ok, %{status: code}} when code in 200..299 <-
         result = @httpoison.post(inbox, json, headers) do
    if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
      do: Instances.set_reachable(inbox)

    result
  else
    {_post_result, response} ->
      if !params[:unreachable_since], do: Instances.set_unreachable(inbox)
      {:error, response}
  end
end
942
# TODO:
# This will create a Create activity, which we need internally at the moment.
@doc """
Returns the object identified by `id`, fetching it remotely when it is not cached.

A cached object is returned directly. Otherwise the document is fetched via
ActivityPub, wrapped in a Create activity and passed through
`Transmogrifier.handle_incoming/1`. A `{:error, {:reject, nil}}` result becomes
`{:reject, nil}`; any other failure falls back to fetching through OStatus.
"""
def fetch_object_from_id(id) do
  cached = Object.get_cached_by_ap_id(id)

  if cached do
    {:ok, cached}
  else
    with {:ok, data} <- fetch_and_contain_remote_object_from_id(id),
         nil <- Object.normalize(data),
         params <- %{
           "type" => "Create",
           "to" => data["to"],
           "cc" => data["cc"],
           "actor" => data["actor"] || data["attributedTo"],
           "object" => data
         },
         :ok <- Transmogrifier.contain_origin(id, params),
         {:ok, activity} <- Transmogrifier.handle_incoming(params) do
      {:ok, Object.normalize(activity)}
    else
      {:error, {:reject, nil}} ->
        {:reject, nil}

      # Object.normalize/1 already produced an object for the fetched data.
      %Object{} = known ->
        {:ok, known}

      _other ->
        Logger.info("Couldn't get object via AP, trying out OStatus fetching...")

        case OStatus.fetch_activity_from_url(id) do
          {:ok, [activity | _]} -> {:ok, Object.normalize(activity)}
          failure -> failure
        end
    end
  end
end
978
@doc """
GETs `id` with an `application/activity+json` Accept header and decodes the body.

Requires `id` to start with "http", a 2xx status, valid JSON and a passing
`Transmogrifier.contain_origin_from_id/2` check. Returns `{:ok, data}`, or
`{:error, failed_step_result}` otherwise.
"""
def fetch_and_contain_remote_object_from_id(id) do
  Logger.info("Fetching object #{id} via AP")

  accept = [{:Accept, "application/activity+json"}]

  with true <- String.starts_with?(id, "http"),
       {:ok, %{body: body, status: code}} when code in 200..299 <- @httpoison.get(id, accept),
       {:ok, data} <- Jason.decode(body),
       :ok <- Transmogrifier.contain_origin_from_id(id, data) do
    {:ok, data}
  else
    failure -> {:error, failure}
  end
end
996
# filter out broken threads
@doc """
Returns the result of `entire_thread_visible_for_user?/2` for `activity` and `user`.
"""
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1001
# do post-processing on a specific activity
@doc """
Decides whether `activity` should be kept for `user`; currently this is just the
broken-thread check.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1006
# do post-processing on a timeline
@doc """
Keeps only the activities of `timeline` for which `contain_activity/2` is truthy.
"""
def contain_timeline(timeline, user) do
  Enum.filter(timeline, &contain_activity(&1, user))
end
1014 end