Merge branch 'features/flavour-switching' into 'develop'
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Repo
8 alias Pleroma.Object
9 alias Pleroma.Upload
10 alias Pleroma.User
11 alias Pleroma.Notification
12 alias Pleroma.Instances
13 alias Pleroma.Web.ActivityPub.Transmogrifier
14 alias Pleroma.Web.ActivityPub.MRF
15 alias Pleroma.Web.WebFinger
16 alias Pleroma.Web.Federator
17 alias Pleroma.Web.OStatus
18
19 import Ecto.Query
20 import Pleroma.Web.ActivityPub.Utils
21
22 require Logger
23
24 @httpoison Application.get_env(:pleroma, :httpoison)
25
26 # For Announce activities, we filter the recipients based on following status for any actors
27 # that match actual users. See issue #164 for more information about why this is necessary.
28 defp get_recipients(%{"type" => "Announce"} = data) do
29 to = data["to"] || []
30 cc = data["cc"] || []
31 actor = User.get_cached_by_ap_id(data["actor"])
32
33 recipients =
34 (to ++ cc)
35 |> Enum.filter(fn recipient ->
36 case User.get_cached_by_ap_id(recipient) do
37 nil ->
38 true
39
40 user ->
41 User.following?(user, actor)
42 end
43 end)
44
45 {recipients, to, cc}
46 end
47
48 defp get_recipients(%{"type" => "Create"} = data) do
49 to = data["to"] || []
50 cc = data["cc"] || []
51 actor = data["actor"] || []
52 recipients = (to ++ cc ++ [actor]) |> Enum.uniq()
53 {recipients, to, cc}
54 end
55
56 defp get_recipients(data) do
57 to = data["to"] || []
58 cc = data["cc"] || []
59 recipients = to ++ cc
60 {recipients, to, cc}
61 end
62
63 defp check_actor_is_active(actor) do
64 if not is_nil(actor) do
65 with user <- User.get_cached_by_ap_id(actor),
66 false <- user.info.deactivated do
67 :ok
68 else
69 _e -> :reject
70 end
71 else
72 :ok
73 end
74 end
75
76 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
77 limit = Pleroma.Config.get([:instance, :remote_limit])
78 String.length(content) <= limit
79 end
80
81 defp check_remote_limit(_), do: true
82
83 def insert(map, local \\ true) when is_map(map) do
84 with nil <- Activity.normalize(map),
85 map <- lazy_put_activity_defaults(map),
86 :ok <- check_actor_is_active(map["actor"]),
87 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
88 {:ok, map} <- MRF.filter(map),
89 :ok <- insert_full_object(map) do
90 {recipients, _, _} = get_recipients(map)
91
92 {:ok, activity} =
93 Repo.insert(%Activity{
94 data: map,
95 local: local,
96 actor: map["actor"],
97 recipients: recipients
98 })
99
100 Task.start(fn ->
101 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
102 end)
103
104 Notification.create_notifications(activity)
105 stream_out(activity)
106 {:ok, activity}
107 else
108 %Activity{} = activity -> {:ok, activity}
109 error -> {:error, error}
110 end
111 end
112
113 def stream_out(activity) do
114 public = "https://www.w3.org/ns/activitystreams#Public"
115
116 if activity.data["type"] in ["Create", "Announce", "Delete"] do
117 Pleroma.Web.Streamer.stream("user", activity)
118 Pleroma.Web.Streamer.stream("list", activity)
119
120 if Enum.member?(activity.data["to"], public) do
121 Pleroma.Web.Streamer.stream("public", activity)
122
123 if activity.local do
124 Pleroma.Web.Streamer.stream("public:local", activity)
125 end
126
127 if activity.data["type"] in ["Create"] do
128 activity.data["object"]
129 |> Map.get("tag", [])
130 |> Enum.filter(fn tag -> is_bitstring(tag) end)
131 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
132
133 if activity.data["object"]["attachment"] != [] do
134 Pleroma.Web.Streamer.stream("public:media", activity)
135
136 if activity.local do
137 Pleroma.Web.Streamer.stream("public:local:media", activity)
138 end
139 end
140 end
141 else
142 if !Enum.member?(activity.data["cc"] || [], public) &&
143 !Enum.member?(
144 activity.data["to"],
145 User.get_by_ap_id(activity.data["actor"]).follower_address
146 ),
147 do: Pleroma.Web.Streamer.stream("direct", activity)
148 end
149 end
150 end
151
152 def create(%{to: to, actor: actor, context: context, object: object} = params) do
153 additional = params[:additional] || %{}
154 # only accept false as false value
155 local = !(params[:local] == false)
156 published = params[:published]
157
158 with create_data <-
159 make_create_data(
160 %{to: to, actor: actor, published: published, context: context, object: object},
161 additional
162 ),
163 {:ok, activity} <- insert(create_data, local),
164 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
165 {:ok, _actor} <- User.increase_note_count(actor),
166 :ok <- maybe_federate(activity) do
167 {:ok, activity}
168 end
169 end
170
171 def accept(%{to: to, actor: actor, object: object} = params) do
172 # only accept false as false value
173 local = !(params[:local] == false)
174
175 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
176 {:ok, activity} <- insert(data, local),
177 :ok <- maybe_federate(activity),
178 _ <- User.update_follow_request_count(actor) do
179 {:ok, activity}
180 end
181 end
182
183 def reject(%{to: to, actor: actor, object: object} = params) do
184 # only accept false as false value
185 local = !(params[:local] == false)
186
187 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
188 {:ok, activity} <- insert(data, local),
189 :ok <- maybe_federate(activity),
190 _ <- User.update_follow_request_count(actor) do
191 {:ok, activity}
192 end
193 end
194
195 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
196 # only accept false as false value
197 local = !(params[:local] == false)
198
199 with data <- %{
200 "to" => to,
201 "cc" => cc,
202 "type" => "Update",
203 "actor" => actor,
204 "object" => object
205 },
206 {:ok, activity} <- insert(data, local),
207 :ok <- maybe_federate(activity) do
208 {:ok, activity}
209 end
210 end
211
212 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
213 def like(
214 %User{ap_id: ap_id} = user,
215 %Object{data: %{"id" => _}} = object,
216 activity_id \\ nil,
217 local \\ true
218 ) do
219 with nil <- get_existing_like(ap_id, object),
220 like_data <- make_like_data(user, object, activity_id),
221 {:ok, activity} <- insert(like_data, local),
222 {:ok, object} <- add_like_to_object(activity, object),
223 :ok <- maybe_federate(activity) do
224 {:ok, activity, object}
225 else
226 %Activity{} = activity -> {:ok, activity, object}
227 error -> {:error, error}
228 end
229 end
230
231 def unlike(
232 %User{} = actor,
233 %Object{} = object,
234 activity_id \\ nil,
235 local \\ true
236 ) do
237 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
238 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
239 {:ok, unlike_activity} <- insert(unlike_data, local),
240 {:ok, _activity} <- Repo.delete(like_activity),
241 {:ok, object} <- remove_like_from_object(like_activity, object),
242 :ok <- maybe_federate(unlike_activity) do
243 {:ok, unlike_activity, like_activity, object}
244 else
245 _e -> {:ok, object}
246 end
247 end
248
249 def announce(
250 %User{ap_id: _} = user,
251 %Object{data: %{"id" => _}} = object,
252 activity_id \\ nil,
253 local \\ true,
254 public \\ true
255 ) do
256 with true <- is_public?(object),
257 announce_data <- make_announce_data(user, object, activity_id, public),
258 {:ok, activity} <- insert(announce_data, local),
259 {:ok, object} <- add_announce_to_object(activity, object),
260 :ok <- maybe_federate(activity) do
261 {:ok, activity, object}
262 else
263 error -> {:error, error}
264 end
265 end
266
267 def unannounce(
268 %User{} = actor,
269 %Object{} = object,
270 activity_id \\ nil,
271 local \\ true
272 ) do
273 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
274 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
275 {:ok, unannounce_activity} <- insert(unannounce_data, local),
276 :ok <- maybe_federate(unannounce_activity),
277 {:ok, _activity} <- Repo.delete(announce_activity),
278 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
279 {:ok, unannounce_activity, object}
280 else
281 _e -> {:ok, object}
282 end
283 end
284
285 def follow(follower, followed, activity_id \\ nil, local \\ true) do
286 with data <- make_follow_data(follower, followed, activity_id),
287 {:ok, activity} <- insert(data, local),
288 :ok <- maybe_federate(activity),
289 _ <- User.update_follow_request_count(followed) do
290 {:ok, activity}
291 end
292 end
293
294 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
295 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
296 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
297 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
298 {:ok, activity} <- insert(unfollow_data, local),
299 :ok <- maybe_federate(activity),
300 _ <- User.update_follow_request_count(followed) do
301 {:ok, activity}
302 end
303 end
304
305 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
306 user = User.get_cached_by_ap_id(actor)
307
308 data = %{
309 "type" => "Delete",
310 "actor" => actor,
311 "object" => id,
312 "to" => [user.follower_address, "https://www.w3.org/ns/activitystreams#Public"]
313 }
314
315 with {:ok, _} <- Object.delete(object),
316 {:ok, activity} <- insert(data, local),
317 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
318 {:ok, _actor} <- User.decrease_note_count(user),
319 :ok <- maybe_federate(activity) do
320 {:ok, activity}
321 end
322 end
323
324 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
325 ap_config = Application.get_env(:pleroma, :activitypub)
326 unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
327 outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)
328
329 with true <- unfollow_blocked do
330 follow_activity = fetch_latest_follow(blocker, blocked)
331
332 if follow_activity do
333 unfollow(blocker, blocked, nil, local)
334 end
335 end
336
337 with true <- outgoing_blocks,
338 block_data <- make_block_data(blocker, blocked, activity_id),
339 {:ok, activity} <- insert(block_data, local),
340 :ok <- maybe_federate(activity) do
341 {:ok, activity}
342 else
343 _e -> {:ok, nil}
344 end
345 end
346
347 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
348 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
349 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
350 {:ok, activity} <- insert(unblock_data, local),
351 :ok <- maybe_federate(activity) do
352 {:ok, activity}
353 end
354 end
355
356 def fetch_activities_for_context(context, opts \\ %{}) do
357 public = ["https://www.w3.org/ns/activitystreams#Public"]
358
359 recipients =
360 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
361
362 query = from(activity in Activity)
363
364 query =
365 query
366 |> restrict_blocked(opts)
367 |> restrict_recipients(recipients, opts["user"])
368
369 query =
370 from(
371 activity in query,
372 where:
373 fragment(
374 "?->>'type' = ? and ?->>'context' = ?",
375 activity.data,
376 "Create",
377 activity.data,
378 ^context
379 ),
380 order_by: [desc: :id]
381 )
382
383 Repo.all(query)
384 end
385
386 def fetch_public_activities(opts \\ %{}) do
387 q = fetch_activities_query(["https://www.w3.org/ns/activitystreams#Public"], opts)
388
389 q
390 |> restrict_unlisted()
391 |> Repo.all()
392 |> Enum.reverse()
393 end
394
395 @valid_visibilities ~w[direct unlisted public private]
396
397 defp restrict_visibility(query, %{visibility: visibility})
398 when visibility in @valid_visibilities do
399 query =
400 from(
401 a in query,
402 where:
403 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
404 )
405
406 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
407
408 query
409 end
410
411 defp restrict_visibility(_query, %{visibility: visibility})
412 when visibility not in @valid_visibilities do
413 Logger.error("Could not restrict visibility to #{visibility}")
414 end
415
416 defp restrict_visibility(query, _visibility), do: query
417
418 def fetch_user_activities(user, reading_user, params \\ %{}) do
419 params =
420 params
421 |> Map.put("type", ["Create", "Announce"])
422 |> Map.put("actor_id", user.ap_id)
423 |> Map.put("whole_db", true)
424 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
425
426 recipients =
427 if reading_user do
428 ["https://www.w3.org/ns/activitystreams#Public"] ++
429 [reading_user.ap_id | reading_user.following]
430 else
431 ["https://www.w3.org/ns/activitystreams#Public"]
432 end
433
434 fetch_activities(recipients, params)
435 |> Enum.reverse()
436 end
437
438 defp restrict_since(query, %{"since_id" => ""}), do: query
439
440 defp restrict_since(query, %{"since_id" => since_id}) do
441 from(activity in query, where: activity.id > ^since_id)
442 end
443
444 defp restrict_since(query, _), do: query
445
446 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
447 when is_list(tag_reject) and tag_reject != [] do
448 from(
449 activity in query,
450 where: fragment("(not (? #> '{\"object\",\"tag\"}') \\?| ?)", activity.data, ^tag_reject)
451 )
452 end
453
454 defp restrict_tag_reject(query, _), do: query
455
456 defp restrict_tag_all(query, %{"tag_all" => tag_all})
457 when is_list(tag_all) and tag_all != [] do
458 from(
459 activity in query,
460 where: fragment("(? #> '{\"object\",\"tag\"}') \\?& ?", activity.data, ^tag_all)
461 )
462 end
463
464 defp restrict_tag_all(query, _), do: query
465
466 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
467 from(
468 activity in query,
469 where: fragment("(? #> '{\"object\",\"tag\"}') \\?| ?", activity.data, ^tag)
470 )
471 end
472
473 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
474 from(
475 activity in query,
476 where: fragment("? <@ (? #> '{\"object\",\"tag\"}')", ^tag, activity.data)
477 )
478 end
479
480 defp restrict_tag(query, _), do: query
481
482 defp restrict_to_cc(query, recipients_to, recipients_cc) do
483 from(
484 activity in query,
485 where:
486 fragment(
487 "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
488 activity.data,
489 ^recipients_to,
490 activity.data,
491 ^recipients_cc
492 )
493 )
494 end
495
496 defp restrict_recipients(query, [], _user), do: query
497
498 defp restrict_recipients(query, recipients, nil) do
499 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
500 end
501
502 defp restrict_recipients(query, recipients, user) do
503 from(
504 activity in query,
505 where: fragment("? && ?", ^recipients, activity.recipients),
506 or_where: activity.actor == ^user.ap_id
507 )
508 end
509
510 defp restrict_limit(query, %{"limit" => limit}) do
511 from(activity in query, limit: ^limit)
512 end
513
514 defp restrict_limit(query, _), do: query
515
516 defp restrict_local(query, %{"local_only" => true}) do
517 from(activity in query, where: activity.local == true)
518 end
519
520 defp restrict_local(query, _), do: query
521
522 defp restrict_max(query, %{"max_id" => ""}), do: query
523
524 defp restrict_max(query, %{"max_id" => max_id}) do
525 from(activity in query, where: activity.id < ^max_id)
526 end
527
528 defp restrict_max(query, _), do: query
529
530 defp restrict_actor(query, %{"actor_id" => actor_id}) do
531 from(activity in query, where: activity.actor == ^actor_id)
532 end
533
534 defp restrict_actor(query, _), do: query
535
536 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
537 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
538 end
539
540 defp restrict_type(query, %{"type" => type}) do
541 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
542 end
543
544 defp restrict_type(query, _), do: query
545
546 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
547 from(
548 activity in query,
549 where: fragment("? <@ (? #> '{\"object\",\"likes\"}')", ^ap_id, activity.data)
550 )
551 end
552
553 defp restrict_favorited_by(query, _), do: query
554
555 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
556 from(
557 activity in query,
558 where: fragment("not (? #> '{\"object\",\"attachment\"}' = ?)", activity.data, ^[])
559 )
560 end
561
562 defp restrict_media(query, _), do: query
563
564 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
565 from(
566 activity in query,
567 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
568 )
569 end
570
571 defp restrict_replies(query, _), do: query
572
573 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
574 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
575 end
576
577 defp restrict_reblogs(query, _), do: query
578
579 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
580 blocks = info.blocks || []
581 domain_blocks = info.domain_blocks || []
582
583 from(
584 activity in query,
585 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
586 where: fragment("not (?->'to' \\?| ?)", activity.data, ^blocks),
587 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
588 )
589 end
590
591 defp restrict_blocked(query, _), do: query
592
593 defp restrict_unlisted(query) do
594 from(
595 activity in query,
596 where:
597 fragment(
598 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
599 activity.data,
600 ^["https://www.w3.org/ns/activitystreams#Public"]
601 )
602 )
603 end
604
605 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
606 from(activity in query, where: activity.id in ^ids)
607 end
608
609 defp restrict_pinned(query, _), do: query
610
611 def fetch_activities_query(recipients, opts \\ %{}) do
612 base_query =
613 from(
614 activity in Activity,
615 limit: 20,
616 order_by: [fragment("? desc nulls last", activity.id)]
617 )
618
619 base_query
620 |> restrict_recipients(recipients, opts["user"])
621 |> restrict_tag(opts)
622 |> restrict_tag_reject(opts)
623 |> restrict_tag_all(opts)
624 |> restrict_since(opts)
625 |> restrict_local(opts)
626 |> restrict_limit(opts)
627 |> restrict_max(opts)
628 |> restrict_actor(opts)
629 |> restrict_type(opts)
630 |> restrict_favorited_by(opts)
631 |> restrict_blocked(opts)
632 |> restrict_media(opts)
633 |> restrict_visibility(opts)
634 |> restrict_replies(opts)
635 |> restrict_reblogs(opts)
636 |> restrict_pinned(opts)
637 end
638
639 def fetch_activities(recipients, opts \\ %{}) do
640 fetch_activities_query(recipients, opts)
641 |> Repo.all()
642 |> Enum.reverse()
643 end
644
645 def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
646 fetch_activities_query([], opts)
647 |> restrict_to_cc(recipients_to, recipients_cc)
648 |> Repo.all()
649 |> Enum.reverse()
650 end
651
652 def upload(file, opts \\ []) do
653 with {:ok, data} <- Upload.store(file, opts) do
654 obj_data =
655 if opts[:actor] do
656 Map.put(data, "actor", opts[:actor])
657 else
658 data
659 end
660
661 Repo.insert(%Object{data: obj_data})
662 end
663 end
664
665 def user_data_from_user_object(data) do
666 avatar =
667 data["icon"]["url"] &&
668 %{
669 "type" => "Image",
670 "url" => [%{"href" => data["icon"]["url"]}]
671 }
672
673 banner =
674 data["image"]["url"] &&
675 %{
676 "type" => "Image",
677 "url" => [%{"href" => data["image"]["url"]}]
678 }
679
680 locked = data["manuallyApprovesFollowers"] || false
681 data = Transmogrifier.maybe_fix_user_object(data)
682
683 user_data = %{
684 ap_id: data["id"],
685 info: %{
686 "ap_enabled" => true,
687 "source_data" => data,
688 "banner" => banner,
689 "locked" => locked
690 },
691 avatar: avatar,
692 name: data["name"],
693 follower_address: data["followers"],
694 bio: data["summary"]
695 }
696
697 # nickname can be nil because of virtual actors
698 user_data =
699 if data["preferredUsername"] do
700 Map.put(
701 user_data,
702 :nickname,
703 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
704 )
705 else
706 Map.put(user_data, :nickname, nil)
707 end
708
709 {:ok, user_data}
710 end
711
712 def fetch_and_prepare_user_from_ap_id(ap_id) do
713 with {:ok, data} <- fetch_and_contain_remote_object_from_id(ap_id) do
714 user_data_from_user_object(data)
715 else
716 e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
717 end
718 end
719
720 def make_user_from_ap_id(ap_id) do
721 if _user = User.get_by_ap_id(ap_id) do
722 Transmogrifier.upgrade_user_from_ap_id(ap_id)
723 else
724 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
725 User.insert_or_update_user(data)
726 else
727 e -> {:error, e}
728 end
729 end
730 end
731
732 def make_user_from_nickname(nickname) do
733 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
734 make_user_from_ap_id(ap_id)
735 else
736 _e -> {:error, "No AP id in WebFinger"}
737 end
738 end
739
740 def should_federate?(inbox, public) do
741 if public do
742 true
743 else
744 inbox_info = URI.parse(inbox)
745 !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
746 end
747 end
748
749 def publish(actor, activity) do
750 remote_followers =
751 if actor.follower_address in activity.recipients do
752 {:ok, followers} = User.get_followers(actor)
753 followers |> Enum.filter(&(!&1.local))
754 else
755 []
756 end
757
758 public = is_public?(activity)
759
760 reachable_inboxes_metadata =
761 (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
762 |> Enum.filter(fn user -> User.ap_enabled?(user) end)
763 |> Enum.map(fn %{info: %{source_data: data}} ->
764 (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
765 end)
766 |> Enum.uniq()
767 |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
768 |> Instances.filter_reachable()
769
770 {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
771 json = Jason.encode!(data)
772
773 Enum.each(reachable_inboxes_metadata, fn {inbox, unreachable_since} ->
774 Federator.enqueue(:publish_single_ap, %{
775 inbox: inbox,
776 json: json,
777 actor: actor,
778 id: activity.data["id"],
779 unreachable_since: unreachable_since
780 })
781 end)
782 end
783
784 def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
785 Logger.info("Federating #{id} to #{inbox}")
786 host = URI.parse(inbox).host
787
788 digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
789
790 signature =
791 Pleroma.Web.HTTPSignatures.sign(actor, %{
792 host: host,
793 "content-length": byte_size(json),
794 digest: digest
795 })
796
797 with {:ok, %{status: code}} when code in 200..299 <-
798 result =
799 @httpoison.post(
800 inbox,
801 json,
802 [
803 {"Content-Type", "application/activity+json"},
804 {"signature", signature},
805 {"digest", digest}
806 ]
807 ) do
808 if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
809 do: Instances.set_reachable(inbox)
810
811 result
812 else
813 {_post_result, response} ->
814 unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
815 {:error, response}
816 end
817 end
818
819 # TODO:
820 # This will create a Create activity, which we need internally at the moment.
821 def fetch_object_from_id(id) do
822 if object = Object.get_cached_by_ap_id(id) do
823 {:ok, object}
824 else
825 with {:ok, data} <- fetch_and_contain_remote_object_from_id(id),
826 nil <- Object.normalize(data),
827 params <- %{
828 "type" => "Create",
829 "to" => data["to"],
830 "cc" => data["cc"],
831 "actor" => data["actor"] || data["attributedTo"],
832 "object" => data
833 },
834 :ok <- Transmogrifier.contain_origin(id, params),
835 {:ok, activity} <- Transmogrifier.handle_incoming(params) do
836 {:ok, Object.normalize(activity.data["object"])}
837 else
838 {:error, {:reject, nil}} ->
839 {:reject, nil}
840
841 object = %Object{} ->
842 {:ok, object}
843
844 _e ->
845 Logger.info("Couldn't get object via AP, trying out OStatus fetching...")
846
847 case OStatus.fetch_activity_from_url(id) do
848 {:ok, [activity | _]} -> {:ok, Object.normalize(activity.data["object"])}
849 e -> e
850 end
851 end
852 end
853 end
854
855 def fetch_and_contain_remote_object_from_id(id) do
856 Logger.info("Fetching object #{id} via AP")
857
858 with true <- String.starts_with?(id, "http"),
859 {:ok, %{body: body, status: code}} when code in 200..299 <-
860 @httpoison.get(
861 id,
862 [{:Accept, "application/activity+json"}]
863 ),
864 {:ok, data} <- Jason.decode(body),
865 :ok <- Transmogrifier.contain_origin_from_id(id, data) do
866 {:ok, data}
867 else
868 e ->
869 {:error, e}
870 end
871 end
872
873 def is_public?(%Object{data: %{"type" => "Tombstone"}}), do: false
874 def is_public?(%Object{data: data}), do: is_public?(data)
875 def is_public?(%Activity{data: data}), do: is_public?(data)
876 def is_public?(%{"directMessage" => true}), do: false
877
878 def is_public?(data) do
879 "https://www.w3.org/ns/activitystreams#Public" in (data["to"] ++ (data["cc"] || []))
880 end
881
882 def is_private?(activity) do
883 !is_public?(activity) && Enum.any?(activity.data["to"], &String.contains?(&1, "/followers"))
884 end
885
886 def is_direct?(%Activity{data: %{"directMessage" => true}}), do: true
887 def is_direct?(%Object{data: %{"directMessage" => true}}), do: true
888
889 def is_direct?(activity) do
890 !is_public?(activity) && !is_private?(activity)
891 end
892
893 def visible_for_user?(activity, nil) do
894 is_public?(activity)
895 end
896
897 def visible_for_user?(activity, user) do
898 x = [user.ap_id | user.following]
899 y = activity.data["to"] ++ (activity.data["cc"] || [])
900 visible_for_user?(activity, nil) || Enum.any?(x, &(&1 in y))
901 end
902
903 # guard
904 def entire_thread_visible_for_user?(nil, _user), do: false
905
906 # child
907 def entire_thread_visible_for_user?(
908 %Activity{data: %{"object" => %{"inReplyTo" => parent_id}}} = tail,
909 user
910 )
911 when is_binary(parent_id) do
912 parent = Activity.get_in_reply_to_activity(tail)
913 visible_for_user?(tail, user) && entire_thread_visible_for_user?(parent, user)
914 end
915
916 # root
917 def entire_thread_visible_for_user?(tail, user), do: visible_for_user?(tail, user)
918
919 # filter out broken threads
920 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
921 entire_thread_visible_for_user?(activity, user)
922 end
923
924 # do post-processing on a specific activity
925 def contain_activity(%Activity{} = activity, %User{} = user) do
926 contain_broken_threads(activity, user)
927 end
928
929 # do post-processing on a timeline
930 def contain_timeline(timeline, user) do
931 timeline
932 |> Enum.filter(fn activity ->
933 contain_activity(activity, user)
934 end)
935 end
936 end