Merge branch 'develop' of git.pleroma.social:pleroma/pleroma into feature/jobs
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Repo
8 alias Pleroma.Object
9 alias Pleroma.Upload
10 alias Pleroma.User
11 alias Pleroma.Notification
12 alias Pleroma.Instances
13 alias Pleroma.Web.ActivityPub.Transmogrifier
14 alias Pleroma.Web.ActivityPub.MRF
15 alias Pleroma.Web.WebFinger
16 alias Pleroma.Web.Federator
17 alias Pleroma.Web.OStatus
18
19 import Ecto.Query
20 import Pleroma.Web.ActivityPub.Utils
21
22 require Logger
23
24 @httpoison Application.get_env(:pleroma, :httpoison)
25
26 # For Announce activities, we filter the recipients based on following status for any actors
27 # that match actual users. See issue #164 for more information about why this is necessary.
28 defp get_recipients(%{"type" => "Announce"} = data) do
29 to = data["to"] || []
30 cc = data["cc"] || []
31 actor = User.get_cached_by_ap_id(data["actor"])
32
33 recipients =
34 (to ++ cc)
35 |> Enum.filter(fn recipient ->
36 case User.get_cached_by_ap_id(recipient) do
37 nil ->
38 true
39
40 user ->
41 User.following?(user, actor)
42 end
43 end)
44
45 {recipients, to, cc}
46 end
47
48 defp get_recipients(%{"type" => "Create"} = data) do
49 to = data["to"] || []
50 cc = data["cc"] || []
51 actor = data["actor"] || []
52 recipients = (to ++ cc ++ [actor]) |> Enum.uniq()
53 {recipients, to, cc}
54 end
55
56 defp get_recipients(data) do
57 to = data["to"] || []
58 cc = data["cc"] || []
59 recipients = to ++ cc
60 {recipients, to, cc}
61 end
62
63 defp check_actor_is_active(actor) do
64 if not is_nil(actor) do
65 with user <- User.get_cached_by_ap_id(actor),
66 false <- user.info.deactivated do
67 :ok
68 else
69 _e -> :reject
70 end
71 else
72 :ok
73 end
74 end
75
76 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
77 limit = Pleroma.Config.get([:instance, :remote_limit])
78 String.length(content) <= limit
79 end
80
81 defp check_remote_limit(_), do: true
82
83 def insert(map, local \\ true) when is_map(map) do
84 with nil <- Activity.normalize(map),
85 map <- lazy_put_activity_defaults(map),
86 :ok <- check_actor_is_active(map["actor"]),
87 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
88 {:ok, map} <- MRF.filter(map),
89 :ok <- insert_full_object(map) do
90 {recipients, _, _} = get_recipients(map)
91
92 {:ok, activity} =
93 Repo.insert(%Activity{
94 data: map,
95 local: local,
96 actor: map["actor"],
97 recipients: recipients
98 })
99
100 Task.start(fn ->
101 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
102 end)
103
104 Notification.create_notifications(activity)
105 stream_out(activity)
106 {:ok, activity}
107 else
108 %Activity{} = activity -> {:ok, activity}
109 error -> {:error, error}
110 end
111 end
112
113 def stream_out(activity) do
114 public = "https://www.w3.org/ns/activitystreams#Public"
115
116 if activity.data["type"] in ["Create", "Announce", "Delete"] do
117 Pleroma.Web.Streamer.stream("user", activity)
118 Pleroma.Web.Streamer.stream("list", activity)
119
120 if Enum.member?(activity.data["to"], public) do
121 Pleroma.Web.Streamer.stream("public", activity)
122
123 if activity.local do
124 Pleroma.Web.Streamer.stream("public:local", activity)
125 end
126
127 if activity.data["type"] in ["Create"] do
128 activity.data["object"]
129 |> Map.get("tag", [])
130 |> Enum.filter(fn tag -> is_bitstring(tag) end)
131 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
132
133 if activity.data["object"]["attachment"] != [] do
134 Pleroma.Web.Streamer.stream("public:media", activity)
135
136 if activity.local do
137 Pleroma.Web.Streamer.stream("public:local:media", activity)
138 end
139 end
140 end
141 else
142 if !Enum.member?(activity.data["cc"] || [], public) &&
143 !Enum.member?(
144 activity.data["to"],
145 User.get_by_ap_id(activity.data["actor"]).follower_address
146 ),
147 do: Pleroma.Web.Streamer.stream("direct", activity)
148 end
149 end
150 end
151
152 def create(%{to: to, actor: actor, context: context, object: object} = params) do
153 additional = params[:additional] || %{}
154 # only accept false as false value
155 local = !(params[:local] == false)
156 published = params[:published]
157
158 with create_data <-
159 make_create_data(
160 %{to: to, actor: actor, published: published, context: context, object: object},
161 additional
162 ),
163 {:ok, activity} <- insert(create_data, local),
164 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
165 {:ok, _actor} <- User.increase_note_count(actor),
166 :ok <- maybe_federate(activity) do
167 {:ok, activity}
168 end
169 end
170
171 def accept(%{to: to, actor: actor, object: object} = params) do
172 # only accept false as false value
173 local = !(params[:local] == false)
174
175 with data <- %{"to" => to, "type" => "Accept", "actor" => actor, "object" => object},
176 {:ok, activity} <- insert(data, local),
177 :ok <- maybe_federate(activity) do
178 {:ok, activity}
179 end
180 end
181
182 def reject(%{to: to, actor: actor, object: object} = params) do
183 # only accept false as false value
184 local = !(params[:local] == false)
185
186 with data <- %{"to" => to, "type" => "Reject", "actor" => actor, "object" => object},
187 {:ok, activity} <- insert(data, local),
188 :ok <- maybe_federate(activity) do
189 {:ok, activity}
190 end
191 end
192
193 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
194 # only accept false as false value
195 local = !(params[:local] == false)
196
197 with data <- %{
198 "to" => to,
199 "cc" => cc,
200 "type" => "Update",
201 "actor" => actor,
202 "object" => object
203 },
204 {:ok, activity} <- insert(data, local),
205 :ok <- maybe_federate(activity) do
206 {:ok, activity}
207 end
208 end
209
210 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
211 def like(
212 %User{ap_id: ap_id} = user,
213 %Object{data: %{"id" => _}} = object,
214 activity_id \\ nil,
215 local \\ true
216 ) do
217 with nil <- get_existing_like(ap_id, object),
218 like_data <- make_like_data(user, object, activity_id),
219 {:ok, activity} <- insert(like_data, local),
220 {:ok, object} <- add_like_to_object(activity, object),
221 :ok <- maybe_federate(activity) do
222 {:ok, activity, object}
223 else
224 %Activity{} = activity -> {:ok, activity, object}
225 error -> {:error, error}
226 end
227 end
228
229 def unlike(
230 %User{} = actor,
231 %Object{} = object,
232 activity_id \\ nil,
233 local \\ true
234 ) do
235 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
236 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
237 {:ok, unlike_activity} <- insert(unlike_data, local),
238 {:ok, _activity} <- Repo.delete(like_activity),
239 {:ok, object} <- remove_like_from_object(like_activity, object),
240 :ok <- maybe_federate(unlike_activity) do
241 {:ok, unlike_activity, like_activity, object}
242 else
243 _e -> {:ok, object}
244 end
245 end
246
247 def announce(
248 %User{ap_id: _} = user,
249 %Object{data: %{"id" => _}} = object,
250 activity_id \\ nil,
251 local \\ true,
252 public \\ true
253 ) do
254 with true <- is_public?(object),
255 announce_data <- make_announce_data(user, object, activity_id, public),
256 {:ok, activity} <- insert(announce_data, local),
257 {:ok, object} <- add_announce_to_object(activity, object),
258 :ok <- maybe_federate(activity) do
259 {:ok, activity, object}
260 else
261 error -> {:error, error}
262 end
263 end
264
265 def unannounce(
266 %User{} = actor,
267 %Object{} = object,
268 activity_id \\ nil,
269 local \\ true
270 ) do
271 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
272 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
273 {:ok, unannounce_activity} <- insert(unannounce_data, local),
274 :ok <- maybe_federate(unannounce_activity),
275 {:ok, _activity} <- Repo.delete(announce_activity),
276 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
277 {:ok, unannounce_activity, object}
278 else
279 _e -> {:ok, object}
280 end
281 end
282
283 def follow(follower, followed, activity_id \\ nil, local \\ true) do
284 with data <- make_follow_data(follower, followed, activity_id),
285 {:ok, activity} <- insert(data, local),
286 :ok <- maybe_federate(activity) do
287 {:ok, activity}
288 end
289 end
290
291 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
292 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
293 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
294 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
295 {:ok, activity} <- insert(unfollow_data, local),
296 :ok <- maybe_federate(activity) do
297 {:ok, activity}
298 end
299 end
300
301 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
302 user = User.get_cached_by_ap_id(actor)
303
304 data = %{
305 "type" => "Delete",
306 "actor" => actor,
307 "object" => id,
308 "to" => [user.follower_address, "https://www.w3.org/ns/activitystreams#Public"]
309 }
310
311 with {:ok, _} <- Object.delete(object),
312 {:ok, activity} <- insert(data, local),
313 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
314 {:ok, _actor} <- User.decrease_note_count(user),
315 :ok <- maybe_federate(activity) do
316 {:ok, activity}
317 end
318 end
319
320 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
321 ap_config = Application.get_env(:pleroma, :activitypub)
322 unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
323 outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)
324
325 with true <- unfollow_blocked do
326 follow_activity = fetch_latest_follow(blocker, blocked)
327
328 if follow_activity do
329 unfollow(blocker, blocked, nil, local)
330 end
331 end
332
333 with true <- outgoing_blocks,
334 block_data <- make_block_data(blocker, blocked, activity_id),
335 {:ok, activity} <- insert(block_data, local),
336 :ok <- maybe_federate(activity) do
337 {:ok, activity}
338 else
339 _e -> {:ok, nil}
340 end
341 end
342
343 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
344 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
345 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
346 {:ok, activity} <- insert(unblock_data, local),
347 :ok <- maybe_federate(activity) do
348 {:ok, activity}
349 end
350 end
351
352 def fetch_activities_for_context(context, opts \\ %{}) do
353 public = ["https://www.w3.org/ns/activitystreams#Public"]
354
355 recipients =
356 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
357
358 query = from(activity in Activity)
359
360 query =
361 query
362 |> restrict_blocked(opts)
363 |> restrict_recipients(recipients, opts["user"])
364
365 query =
366 from(
367 activity in query,
368 where:
369 fragment(
370 "?->>'type' = ? and ?->>'context' = ?",
371 activity.data,
372 "Create",
373 activity.data,
374 ^context
375 ),
376 order_by: [desc: :id]
377 )
378
379 Repo.all(query)
380 end
381
382 def fetch_public_activities(opts \\ %{}) do
383 q = fetch_activities_query(["https://www.w3.org/ns/activitystreams#Public"], opts)
384
385 q
386 |> restrict_unlisted()
387 |> Repo.all()
388 |> Enum.reverse()
389 end
390
391 @valid_visibilities ~w[direct unlisted public private]
392
393 defp restrict_visibility(query, %{visibility: visibility})
394 when visibility in @valid_visibilities do
395 query =
396 from(
397 a in query,
398 where:
399 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
400 )
401
402 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
403
404 query
405 end
406
407 defp restrict_visibility(_query, %{visibility: visibility})
408 when visibility not in @valid_visibilities do
409 Logger.error("Could not restrict visibility to #{visibility}")
410 end
411
412 defp restrict_visibility(query, _visibility), do: query
413
414 def fetch_user_activities(user, reading_user, params \\ %{}) do
415 params =
416 params
417 |> Map.put("type", ["Create", "Announce"])
418 |> Map.put("actor_id", user.ap_id)
419 |> Map.put("whole_db", true)
420 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
421
422 recipients =
423 if reading_user do
424 ["https://www.w3.org/ns/activitystreams#Public"] ++
425 [reading_user.ap_id | reading_user.following]
426 else
427 ["https://www.w3.org/ns/activitystreams#Public"]
428 end
429
430 fetch_activities(recipients, params)
431 |> Enum.reverse()
432 end
433
434 defp restrict_since(query, %{"since_id" => ""}), do: query
435
436 defp restrict_since(query, %{"since_id" => since_id}) do
437 from(activity in query, where: activity.id > ^since_id)
438 end
439
440 defp restrict_since(query, _), do: query
441
442 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
443 when is_list(tag_reject) and tag_reject != [] do
444 from(
445 activity in query,
446 where: fragment("(not (? #> '{\"object\",\"tag\"}') \\?| ?)", activity.data, ^tag_reject)
447 )
448 end
449
450 defp restrict_tag_reject(query, _), do: query
451
452 defp restrict_tag_all(query, %{"tag_all" => tag_all})
453 when is_list(tag_all) and tag_all != [] do
454 from(
455 activity in query,
456 where: fragment("(? #> '{\"object\",\"tag\"}') \\?& ?", activity.data, ^tag_all)
457 )
458 end
459
460 defp restrict_tag_all(query, _), do: query
461
462 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
463 from(
464 activity in query,
465 where: fragment("(? #> '{\"object\",\"tag\"}') \\?| ?", activity.data, ^tag)
466 )
467 end
468
469 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
470 from(
471 activity in query,
472 where: fragment("? <@ (? #> '{\"object\",\"tag\"}')", ^tag, activity.data)
473 )
474 end
475
476 defp restrict_tag(query, _), do: query
477
478 defp restrict_to_cc(query, recipients_to, recipients_cc) do
479 from(
480 activity in query,
481 where:
482 fragment(
483 "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
484 activity.data,
485 ^recipients_to,
486 activity.data,
487 ^recipients_cc
488 )
489 )
490 end
491
492 defp restrict_recipients(query, [], _user), do: query
493
494 defp restrict_recipients(query, recipients, nil) do
495 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
496 end
497
498 defp restrict_recipients(query, recipients, user) do
499 from(
500 activity in query,
501 where: fragment("? && ?", ^recipients, activity.recipients),
502 or_where: activity.actor == ^user.ap_id
503 )
504 end
505
506 defp restrict_limit(query, %{"limit" => limit}) do
507 from(activity in query, limit: ^limit)
508 end
509
510 defp restrict_limit(query, _), do: query
511
512 defp restrict_local(query, %{"local_only" => true}) do
513 from(activity in query, where: activity.local == true)
514 end
515
516 defp restrict_local(query, _), do: query
517
518 defp restrict_max(query, %{"max_id" => ""}), do: query
519
520 defp restrict_max(query, %{"max_id" => max_id}) do
521 from(activity in query, where: activity.id < ^max_id)
522 end
523
524 defp restrict_max(query, _), do: query
525
526 defp restrict_actor(query, %{"actor_id" => actor_id}) do
527 from(activity in query, where: activity.actor == ^actor_id)
528 end
529
530 defp restrict_actor(query, _), do: query
531
532 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
533 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
534 end
535
536 defp restrict_type(query, %{"type" => type}) do
537 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
538 end
539
540 defp restrict_type(query, _), do: query
541
542 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
543 from(
544 activity in query,
545 where: fragment("? <@ (? #> '{\"object\",\"likes\"}')", ^ap_id, activity.data)
546 )
547 end
548
549 defp restrict_favorited_by(query, _), do: query
550
551 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
552 from(
553 activity in query,
554 where: fragment("not (? #> '{\"object\",\"attachment\"}' = ?)", activity.data, ^[])
555 )
556 end
557
558 defp restrict_media(query, _), do: query
559
560 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
561 from(
562 activity in query,
563 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
564 )
565 end
566
567 defp restrict_replies(query, _), do: query
568
569 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
570 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
571 end
572
573 defp restrict_reblogs(query, _), do: query
574
575 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
576 blocks = info.blocks || []
577 domain_blocks = info.domain_blocks || []
578
579 from(
580 activity in query,
581 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
582 where: fragment("not (?->'to' \\?| ?)", activity.data, ^blocks),
583 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
584 )
585 end
586
587 defp restrict_blocked(query, _), do: query
588
589 defp restrict_unlisted(query) do
590 from(
591 activity in query,
592 where:
593 fragment(
594 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
595 activity.data,
596 ^["https://www.w3.org/ns/activitystreams#Public"]
597 )
598 )
599 end
600
601 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
602 from(activity in query, where: activity.id in ^ids)
603 end
604
605 defp restrict_pinned(query, _), do: query
606
607 def fetch_activities_query(recipients, opts \\ %{}) do
608 base_query =
609 from(
610 activity in Activity,
611 limit: 20,
612 order_by: [fragment("? desc nulls last", activity.id)]
613 )
614
615 base_query
616 |> restrict_recipients(recipients, opts["user"])
617 |> restrict_tag(opts)
618 |> restrict_tag_reject(opts)
619 |> restrict_tag_all(opts)
620 |> restrict_since(opts)
621 |> restrict_local(opts)
622 |> restrict_limit(opts)
623 |> restrict_max(opts)
624 |> restrict_actor(opts)
625 |> restrict_type(opts)
626 |> restrict_favorited_by(opts)
627 |> restrict_blocked(opts)
628 |> restrict_media(opts)
629 |> restrict_visibility(opts)
630 |> restrict_replies(opts)
631 |> restrict_reblogs(opts)
632 |> restrict_pinned(opts)
633 end
634
635 def fetch_activities(recipients, opts \\ %{}) do
636 fetch_activities_query(recipients, opts)
637 |> Repo.all()
638 |> Enum.reverse()
639 end
640
641 def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
642 fetch_activities_query([], opts)
643 |> restrict_to_cc(recipients_to, recipients_cc)
644 |> Repo.all()
645 |> Enum.reverse()
646 end
647
648 def upload(file, opts \\ []) do
649 with {:ok, data} <- Upload.store(file, opts) do
650 obj_data =
651 if opts[:actor] do
652 Map.put(data, "actor", opts[:actor])
653 else
654 data
655 end
656
657 Repo.insert(%Object{data: obj_data})
658 end
659 end
660
661 def user_data_from_user_object(data) do
662 avatar =
663 data["icon"]["url"] &&
664 %{
665 "type" => "Image",
666 "url" => [%{"href" => data["icon"]["url"]}]
667 }
668
669 banner =
670 data["image"]["url"] &&
671 %{
672 "type" => "Image",
673 "url" => [%{"href" => data["image"]["url"]}]
674 }
675
676 locked = data["manuallyApprovesFollowers"] || false
677 data = Transmogrifier.maybe_fix_user_object(data)
678
679 user_data = %{
680 ap_id: data["id"],
681 info: %{
682 "ap_enabled" => true,
683 "source_data" => data,
684 "banner" => banner,
685 "locked" => locked
686 },
687 avatar: avatar,
688 name: data["name"],
689 follower_address: data["followers"],
690 bio: data["summary"]
691 }
692
693 # nickname can be nil because of virtual actors
694 user_data =
695 if data["preferredUsername"] do
696 Map.put(
697 user_data,
698 :nickname,
699 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
700 )
701 else
702 Map.put(user_data, :nickname, nil)
703 end
704
705 {:ok, user_data}
706 end
707
708 def fetch_and_prepare_user_from_ap_id(ap_id) do
709 with {:ok, data} <- fetch_and_contain_remote_object_from_id(ap_id) do
710 user_data_from_user_object(data)
711 else
712 e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
713 end
714 end
715
716 def make_user_from_ap_id(ap_id) do
717 if _user = User.get_by_ap_id(ap_id) do
718 Transmogrifier.upgrade_user_from_ap_id(ap_id)
719 else
720 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
721 User.insert_or_update_user(data)
722 else
723 e -> {:error, e}
724 end
725 end
726 end
727
728 def make_user_from_nickname(nickname) do
729 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
730 make_user_from_ap_id(ap_id)
731 else
732 _e -> {:error, "No AP id in WebFinger"}
733 end
734 end
735
736 def should_federate?(inbox, public) do
737 if public do
738 true
739 else
740 inbox_info = URI.parse(inbox)
741 !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
742 end
743 end
744
745 def publish(actor, activity) do
746 remote_followers =
747 if actor.follower_address in activity.recipients do
748 {:ok, followers} = User.get_followers(actor)
749 followers |> Enum.filter(&(!&1.local))
750 else
751 []
752 end
753
754 public = is_public?(activity)
755
756 {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
757 json = Jason.encode!(data)
758
759 (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
760 |> Enum.filter(fn user -> User.ap_enabled?(user) end)
761 |> Enum.map(fn %{info: %{source_data: data}} ->
762 (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
763 end)
764 |> Enum.uniq()
765 |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
766 |> Instances.filter_reachable()
767 |> Enum.each(fn {inbox, unreachable_since} ->
768 Federator.publish_single_ap(%{
769 inbox: inbox,
770 json: json,
771 actor: actor,
772 id: activity.data["id"],
773 unreachable_since: unreachable_since
774 })
775 end)
776 end
777
778 def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
779 Logger.info("Federating #{id} to #{inbox}")
780 host = URI.parse(inbox).host
781
782 digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
783
784 signature =
785 Pleroma.Web.HTTPSignatures.sign(actor, %{
786 host: host,
787 "content-length": byte_size(json),
788 digest: digest
789 })
790
791 with {:ok, %{status: code}} when code in 200..299 <-
792 result =
793 @httpoison.post(
794 inbox,
795 json,
796 [
797 {"Content-Type", "application/activity+json"},
798 {"signature", signature},
799 {"digest", digest}
800 ]
801 ) do
802 if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
803 do: Instances.set_reachable(inbox)
804
805 result
806 else
807 {_post_result, response} ->
808 unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
809 {:error, response}
810 end
811 end
812
813 # TODO:
814 # This will create a Create activity, which we need internally at the moment.
815 def fetch_object_from_id(id) do
816 if object = Object.get_cached_by_ap_id(id) do
817 {:ok, object}
818 else
819 Logger.info("Fetching #{id} via AP")
820
821 with {:ok, data} <- fetch_and_contain_remote_object_from_id(id),
822 nil <- Object.normalize(data),
823 params <- %{
824 "type" => "Create",
825 "to" => data["to"],
826 "cc" => data["cc"],
827 "actor" => data["actor"] || data["attributedTo"],
828 "object" => data
829 },
830 :ok <- Transmogrifier.contain_origin(id, params),
831 {:ok, activity} <- Transmogrifier.handle_incoming(params) do
832 {:ok, Object.normalize(activity.data["object"])}
833 else
834 {:error, {:reject, nil}} ->
835 {:reject, nil}
836
837 object = %Object{} ->
838 {:ok, object}
839
840 _e ->
841 Logger.info("Couldn't get object via AP, trying out OStatus fetching...")
842
843 case OStatus.fetch_activity_from_url(id) do
844 {:ok, [activity | _]} -> {:ok, Object.normalize(activity.data["object"])}
845 e -> e
846 end
847 end
848 end
849 end
850
851 def fetch_and_contain_remote_object_from_id(id) do
852 Logger.info("Fetching #{id} via AP")
853
854 with true <- String.starts_with?(id, "http"),
855 {:ok, %{body: body, status: code}} when code in 200..299 <-
856 @httpoison.get(
857 id,
858 [{:Accept, "application/activity+json"}]
859 ),
860 {:ok, data} <- Jason.decode(body),
861 :ok <- Transmogrifier.contain_origin_from_id(id, data) do
862 {:ok, data}
863 else
864 e ->
865 {:error, e}
866 end
867 end
868
869 def is_public?(%Object{data: %{"type" => "Tombstone"}}), do: false
870 def is_public?(%Object{data: data}), do: is_public?(data)
871 def is_public?(%Activity{data: data}), do: is_public?(data)
872 def is_public?(%{"directMessage" => true}), do: false
873
874 def is_public?(data) do
875 "https://www.w3.org/ns/activitystreams#Public" in (data["to"] ++ (data["cc"] || []))
876 end
877
878 def is_private?(activity) do
879 !is_public?(activity) && Enum.any?(activity.data["to"], &String.contains?(&1, "/followers"))
880 end
881
882 def is_direct?(%Activity{data: %{"directMessage" => true}}), do: true
883 def is_direct?(%Object{data: %{"directMessage" => true}}), do: true
884
885 def is_direct?(activity) do
886 !is_public?(activity) && !is_private?(activity)
887 end
888
889 def visible_for_user?(activity, nil) do
890 is_public?(activity)
891 end
892
893 def visible_for_user?(activity, user) do
894 x = [user.ap_id | user.following]
895 y = activity.data["to"] ++ (activity.data["cc"] || [])
896 visible_for_user?(activity, nil) || Enum.any?(x, &(&1 in y))
897 end
898
899 # guard
900 def entire_thread_visible_for_user?(nil, _user), do: false
901
902 # child
903 def entire_thread_visible_for_user?(
904 %Activity{data: %{"object" => %{"inReplyTo" => parent_id}}} = tail,
905 user
906 )
907 when is_binary(parent_id) do
908 parent = Activity.get_in_reply_to_activity(tail)
909 visible_for_user?(tail, user) && entire_thread_visible_for_user?(parent, user)
910 end
911
912 # root
913 def entire_thread_visible_for_user?(tail, user), do: visible_for_user?(tail, user)
914
915 # filter out broken threads
916 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
917 entire_thread_visible_for_user?(activity, user)
918 end
919
920 # do post-processing on a specific activity
921 def contain_activity(%Activity{} = activity, %User{} = user) do
922 contain_broken_threads(activity, user)
923 end
924
925 # do post-processing on a timeline
926 def contain_timeline(timeline, user) do
927 timeline
928 |> Enum.filter(fn activity ->
929 contain_activity(activity, user)
930 end)
931 end
932 end