[#675] Do not show DMs in mentions timeline
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Repo
8 alias Pleroma.Object
9 alias Pleroma.Upload
10 alias Pleroma.User
11 alias Pleroma.Notification
12 alias Pleroma.Instances
13 alias Pleroma.Web.ActivityPub.Transmogrifier
14 alias Pleroma.Web.ActivityPub.MRF
15 alias Pleroma.Web.WebFinger
16 alias Pleroma.Web.Federator
17 alias Pleroma.Web.OStatus
18
19 import Ecto.Query
20 import Pleroma.Web.ActivityPub.Utils
21 import Pleroma.Web.ActivityPub.Visibility
22
23 require Logger
24
25 @httpoison Application.get_env(:pleroma, :httpoison)
26
27 # For Announce activities, we filter the recipients based on following status for any actors
28 # that match actual users. See issue #164 for more information about why this is necessary.
29 defp get_recipients(%{"type" => "Announce"} = data) do
30 to = data["to"] || []
31 cc = data["cc"] || []
32 actor = User.get_cached_by_ap_id(data["actor"])
33
34 recipients =
35 (to ++ cc)
36 |> Enum.filter(fn recipient ->
37 case User.get_cached_by_ap_id(recipient) do
38 nil ->
39 true
40
41 user ->
42 User.following?(user, actor)
43 end
44 end)
45
46 {recipients, to, cc}
47 end
48
49 defp get_recipients(%{"type" => "Create"} = data) do
50 to = data["to"] || []
51 cc = data["cc"] || []
52 actor = data["actor"] || []
53 recipients = (to ++ cc ++ [actor]) |> Enum.uniq()
54 {recipients, to, cc}
55 end
56
57 defp get_recipients(data) do
58 to = data["to"] || []
59 cc = data["cc"] || []
60 recipients = to ++ cc
61 {recipients, to, cc}
62 end
63
64 defp check_actor_is_active(actor) do
65 if not is_nil(actor) do
66 with user <- User.get_cached_by_ap_id(actor),
67 false <- user.info.deactivated do
68 :ok
69 else
70 _e -> :reject
71 end
72 else
73 :ok
74 end
75 end
76
77 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
78 limit = Pleroma.Config.get([:instance, :remote_limit])
79 String.length(content) <= limit
80 end
81
82 defp check_remote_limit(_), do: true
83
84 def insert(map, local \\ true) when is_map(map) do
85 with nil <- Activity.normalize(map),
86 map <- lazy_put_activity_defaults(map),
87 :ok <- check_actor_is_active(map["actor"]),
88 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
89 {:ok, map} <- MRF.filter(map),
90 :ok <- insert_full_object(map) do
91 {recipients, _, _} = get_recipients(map)
92
93 {:ok, activity} =
94 Repo.insert(%Activity{
95 data: map,
96 local: local,
97 actor: map["actor"],
98 recipients: recipients
99 })
100
101 Task.start(fn ->
102 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
103 end)
104
105 Notification.create_notifications(activity)
106 stream_out(activity)
107 {:ok, activity}
108 else
109 %Activity{} = activity -> {:ok, activity}
110 error -> {:error, error}
111 end
112 end
113
114 def stream_out(activity) do
115 public = "https://www.w3.org/ns/activitystreams#Public"
116
117 if activity.data["type"] in ["Create", "Announce", "Delete"] do
118 Pleroma.Web.Streamer.stream("user", activity)
119 Pleroma.Web.Streamer.stream("list", activity)
120
121 if Enum.member?(activity.data["to"], public) do
122 Pleroma.Web.Streamer.stream("public", activity)
123
124 if activity.local do
125 Pleroma.Web.Streamer.stream("public:local", activity)
126 end
127
128 if activity.data["type"] in ["Create"] do
129 activity.data["object"]
130 |> Map.get("tag", [])
131 |> Enum.filter(fn tag -> is_bitstring(tag) end)
132 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
133
134 if activity.data["object"]["attachment"] != [] do
135 Pleroma.Web.Streamer.stream("public:media", activity)
136
137 if activity.local do
138 Pleroma.Web.Streamer.stream("public:local:media", activity)
139 end
140 end
141 end
142 else
143 if !Enum.member?(activity.data["cc"] || [], public) &&
144 !Enum.member?(
145 activity.data["to"],
146 User.get_by_ap_id(activity.data["actor"]).follower_address
147 ),
148 do: Pleroma.Web.Streamer.stream("direct", activity)
149 end
150 end
151 end
152
153 def create(%{to: to, actor: actor, context: context, object: object} = params) do
154 additional = params[:additional] || %{}
155 # only accept false as false value
156 local = !(params[:local] == false)
157 published = params[:published]
158
159 with create_data <-
160 make_create_data(
161 %{to: to, actor: actor, published: published, context: context, object: object},
162 additional
163 ),
164 {:ok, activity} <- insert(create_data, local),
165 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
166 {:ok, _actor} <- User.increase_note_count(actor),
167 :ok <- maybe_federate(activity) do
168 {:ok, activity}
169 end
170 end
171
172 def accept(%{to: to, actor: actor, object: object} = params) do
173 # only accept false as false value
174 local = !(params[:local] == false)
175
176 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
177 {:ok, activity} <- insert(data, local),
178 :ok <- maybe_federate(activity),
179 _ <- User.update_follow_request_count(actor) do
180 {:ok, activity}
181 end
182 end
183
184 def reject(%{to: to, actor: actor, object: object} = params) do
185 # only accept false as false value
186 local = !(params[:local] == false)
187
188 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
189 {:ok, activity} <- insert(data, local),
190 :ok <- maybe_federate(activity),
191 _ <- User.update_follow_request_count(actor) do
192 {:ok, activity}
193 end
194 end
195
196 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
197 # only accept false as false value
198 local = !(params[:local] == false)
199
200 with data <- %{
201 "to" => to,
202 "cc" => cc,
203 "type" => "Update",
204 "actor" => actor,
205 "object" => object
206 },
207 {:ok, activity} <- insert(data, local),
208 :ok <- maybe_federate(activity) do
209 {:ok, activity}
210 end
211 end
212
213 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
214 def like(
215 %User{ap_id: ap_id} = user,
216 %Object{data: %{"id" => _}} = object,
217 activity_id \\ nil,
218 local \\ true
219 ) do
220 with nil <- get_existing_like(ap_id, object),
221 like_data <- make_like_data(user, object, activity_id),
222 {:ok, activity} <- insert(like_data, local),
223 {:ok, object} <- add_like_to_object(activity, object),
224 :ok <- maybe_federate(activity) do
225 {:ok, activity, object}
226 else
227 %Activity{} = activity -> {:ok, activity, object}
228 error -> {:error, error}
229 end
230 end
231
232 def unlike(
233 %User{} = actor,
234 %Object{} = object,
235 activity_id \\ nil,
236 local \\ true
237 ) do
238 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
239 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
240 {:ok, unlike_activity} <- insert(unlike_data, local),
241 {:ok, _activity} <- Repo.delete(like_activity),
242 {:ok, object} <- remove_like_from_object(like_activity, object),
243 :ok <- maybe_federate(unlike_activity) do
244 {:ok, unlike_activity, like_activity, object}
245 else
246 _e -> {:ok, object}
247 end
248 end
249
250 def announce(
251 %User{ap_id: _} = user,
252 %Object{data: %{"id" => _}} = object,
253 activity_id \\ nil,
254 local \\ true,
255 public \\ true
256 ) do
257 with true <- is_public?(object),
258 announce_data <- make_announce_data(user, object, activity_id, public),
259 {:ok, activity} <- insert(announce_data, local),
260 {:ok, object} <- add_announce_to_object(activity, object),
261 :ok <- maybe_federate(activity) do
262 {:ok, activity, object}
263 else
264 error -> {:error, error}
265 end
266 end
267
268 def unannounce(
269 %User{} = actor,
270 %Object{} = object,
271 activity_id \\ nil,
272 local \\ true
273 ) do
274 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
275 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
276 {:ok, unannounce_activity} <- insert(unannounce_data, local),
277 :ok <- maybe_federate(unannounce_activity),
278 {:ok, _activity} <- Repo.delete(announce_activity),
279 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
280 {:ok, unannounce_activity, object}
281 else
282 _e -> {:ok, object}
283 end
284 end
285
286 def follow(follower, followed, activity_id \\ nil, local \\ true) do
287 with data <- make_follow_data(follower, followed, activity_id),
288 {:ok, activity} <- insert(data, local),
289 :ok <- maybe_federate(activity),
290 _ <- User.update_follow_request_count(followed) do
291 {:ok, activity}
292 end
293 end
294
295 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
296 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
297 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
298 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
299 {:ok, activity} <- insert(unfollow_data, local),
300 :ok <- maybe_federate(activity),
301 _ <- User.update_follow_request_count(followed) do
302 {:ok, activity}
303 end
304 end
305
306 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
307 user = User.get_cached_by_ap_id(actor)
308
309 data = %{
310 "type" => "Delete",
311 "actor" => actor,
312 "object" => id,
313 "to" => [user.follower_address, "https://www.w3.org/ns/activitystreams#Public"]
314 }
315
316 with {:ok, _} <- Object.delete(object),
317 {:ok, activity} <- insert(data, local),
318 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
319 {:ok, _actor} <- User.decrease_note_count(user),
320 :ok <- maybe_federate(activity) do
321 {:ok, activity}
322 end
323 end
324
325 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
326 ap_config = Application.get_env(:pleroma, :activitypub)
327 unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
328 outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)
329
330 with true <- unfollow_blocked do
331 follow_activity = fetch_latest_follow(blocker, blocked)
332
333 if follow_activity do
334 unfollow(blocker, blocked, nil, local)
335 end
336 end
337
338 with true <- outgoing_blocks,
339 block_data <- make_block_data(blocker, blocked, activity_id),
340 {:ok, activity} <- insert(block_data, local),
341 :ok <- maybe_federate(activity) do
342 {:ok, activity}
343 else
344 _e -> {:ok, nil}
345 end
346 end
347
348 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
349 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
350 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
351 {:ok, activity} <- insert(unblock_data, local),
352 :ok <- maybe_federate(activity) do
353 {:ok, activity}
354 end
355 end
356
357 def flag(
358 %{
359 actor: actor,
360 context: context,
361 account: account,
362 statuses: statuses,
363 content: content
364 } = params
365 ) do
366 additional = params[:additional] || %{}
367
368 # only accept false as false value
369 local = !(params[:local] == false)
370
371 %{
372 actor: actor,
373 context: context,
374 account: account,
375 statuses: statuses,
376 content: content
377 }
378 |> make_flag_data(additional)
379 |> insert(local)
380 end
381
382 def fetch_activities_for_context(context, opts \\ %{}) do
383 public = ["https://www.w3.org/ns/activitystreams#Public"]
384
385 recipients =
386 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
387
388 query = from(activity in Activity)
389
390 query =
391 query
392 |> restrict_blocked(opts)
393 |> restrict_recipients(recipients, opts["user"])
394
395 query =
396 from(
397 activity in query,
398 where:
399 fragment(
400 "?->>'type' = ? and ?->>'context' = ?",
401 activity.data,
402 "Create",
403 activity.data,
404 ^context
405 ),
406 order_by: [desc: :id]
407 )
408
409 Repo.all(query)
410 end
411
412 def fetch_public_activities(opts \\ %{}) do
413 q = fetch_activities_query(["https://www.w3.org/ns/activitystreams#Public"], opts)
414
415 q
416 |> restrict_unlisted()
417 |> Repo.all()
418 |> Enum.reverse()
419 end
420
421 @valid_visibilities ~w[direct unlisted public private]
422
423 defp restrict_visibility(query, %{visibility: visibility})
424 when is_list(visibility) do
425 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
426 query =
427 from(
428 a in query,
429 where:
430 fragment(
431 "activity_visibility(?, ?, ?) = ANY (?)",
432 a.actor,
433 a.recipients,
434 a.data,
435 ^visibility
436 )
437 )
438
439 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
440
441 query
442 else
443 Logger.error("Could not restrict visibility to #{visibility}")
444 end
445 end
446
447 defp restrict_visibility(query, %{visibility: visibility})
448 when visibility in @valid_visibilities do
449 query =
450 from(
451 a in query,
452 where:
453 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
454 )
455
456 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
457
458 query
459 end
460
461 defp restrict_visibility(_query, %{visibility: visibility})
462 when visibility not in @valid_visibilities do
463 Logger.error("Could not restrict visibility to #{visibility}")
464 end
465
466 defp restrict_visibility(query, _visibility), do: query
467
468 def fetch_user_activities(user, reading_user, params \\ %{}) do
469 params =
470 params
471 |> Map.put("type", ["Create", "Announce"])
472 |> Map.put("actor_id", user.ap_id)
473 |> Map.put("whole_db", true)
474 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
475
476 recipients =
477 if reading_user do
478 ["https://www.w3.org/ns/activitystreams#Public"] ++
479 [reading_user.ap_id | reading_user.following]
480 else
481 ["https://www.w3.org/ns/activitystreams#Public"]
482 end
483
484 fetch_activities(recipients, params)
485 |> Enum.reverse()
486 end
487
488 defp restrict_since(query, %{"since_id" => ""}), do: query
489
490 defp restrict_since(query, %{"since_id" => since_id}) do
491 from(activity in query, where: activity.id > ^since_id)
492 end
493
494 defp restrict_since(query, _), do: query
495
496 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
497 when is_list(tag_reject) and tag_reject != [] do
498 from(
499 activity in query,
500 where: fragment("(not (? #> '{\"object\",\"tag\"}') \\?| ?)", activity.data, ^tag_reject)
501 )
502 end
503
504 defp restrict_tag_reject(query, _), do: query
505
506 defp restrict_tag_all(query, %{"tag_all" => tag_all})
507 when is_list(tag_all) and tag_all != [] do
508 from(
509 activity in query,
510 where: fragment("(? #> '{\"object\",\"tag\"}') \\?& ?", activity.data, ^tag_all)
511 )
512 end
513
514 defp restrict_tag_all(query, _), do: query
515
516 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
517 from(
518 activity in query,
519 where: fragment("(? #> '{\"object\",\"tag\"}') \\?| ?", activity.data, ^tag)
520 )
521 end
522
523 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
524 from(
525 activity in query,
526 where: fragment("? <@ (? #> '{\"object\",\"tag\"}')", ^tag, activity.data)
527 )
528 end
529
530 defp restrict_tag(query, _), do: query
531
532 defp restrict_to_cc(query, recipients_to, recipients_cc) do
533 from(
534 activity in query,
535 where:
536 fragment(
537 "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
538 activity.data,
539 ^recipients_to,
540 activity.data,
541 ^recipients_cc
542 )
543 )
544 end
545
546 defp restrict_recipients(query, [], _user), do: query
547
548 defp restrict_recipients(query, recipients, nil) do
549 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
550 end
551
552 defp restrict_recipients(query, recipients, user) do
553 from(
554 activity in query,
555 where: fragment("? && ?", ^recipients, activity.recipients),
556 or_where: activity.actor == ^user.ap_id
557 )
558 end
559
560 defp restrict_limit(query, %{"limit" => limit}) do
561 from(activity in query, limit: ^limit)
562 end
563
564 defp restrict_limit(query, _), do: query
565
566 defp restrict_local(query, %{"local_only" => true}) do
567 from(activity in query, where: activity.local == true)
568 end
569
570 defp restrict_local(query, _), do: query
571
572 defp restrict_max(query, %{"max_id" => ""}), do: query
573
574 defp restrict_max(query, %{"max_id" => max_id}) do
575 from(activity in query, where: activity.id < ^max_id)
576 end
577
578 defp restrict_max(query, _), do: query
579
580 defp restrict_actor(query, %{"actor_id" => actor_id}) do
581 from(activity in query, where: activity.actor == ^actor_id)
582 end
583
584 defp restrict_actor(query, _), do: query
585
586 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
587 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
588 end
589
590 defp restrict_type(query, %{"type" => type}) do
591 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
592 end
593
594 defp restrict_type(query, _), do: query
595
596 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
597 from(
598 activity in query,
599 where: fragment("? <@ (? #> '{\"object\",\"likes\"}')", ^ap_id, activity.data)
600 )
601 end
602
603 defp restrict_favorited_by(query, _), do: query
604
605 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
606 from(
607 activity in query,
608 where: fragment("not (? #> '{\"object\",\"attachment\"}' = ?)", activity.data, ^[])
609 )
610 end
611
612 defp restrict_media(query, _), do: query
613
614 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
615 from(
616 activity in query,
617 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
618 )
619 end
620
621 defp restrict_replies(query, _), do: query
622
623 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
624 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
625 end
626
627 defp restrict_reblogs(query, _), do: query
628
629 defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
630 mutes = info.mutes
631
632 from(
633 activity in query,
634 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
635 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
636 )
637 end
638
639 defp restrict_muted(query, _), do: query
640
641 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
642 blocks = info.blocks || []
643 domain_blocks = info.domain_blocks || []
644
645 from(
646 activity in query,
647 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
648 where: fragment("not (?->'to' \\?| ?)", activity.data, ^blocks),
649 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
650 )
651 end
652
653 defp restrict_blocked(query, _), do: query
654
655 defp restrict_unlisted(query) do
656 from(
657 activity in query,
658 where:
659 fragment(
660 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
661 activity.data,
662 ^["https://www.w3.org/ns/activitystreams#Public"]
663 )
664 )
665 end
666
667 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
668 from(activity in query, where: activity.id in ^ids)
669 end
670
671 defp restrict_pinned(query, _), do: query
672
673 def fetch_activities_query(recipients, opts \\ %{}) do
674 base_query =
675 from(
676 activity in Activity,
677 limit: 20,
678 order_by: [fragment("? desc nulls last", activity.id)]
679 )
680
681 base_query
682 |> restrict_recipients(recipients, opts["user"])
683 |> restrict_tag(opts)
684 |> restrict_tag_reject(opts)
685 |> restrict_tag_all(opts)
686 |> restrict_since(opts)
687 |> restrict_local(opts)
688 |> restrict_limit(opts)
689 |> restrict_max(opts)
690 |> restrict_actor(opts)
691 |> restrict_type(opts)
692 |> restrict_favorited_by(opts)
693 |> restrict_blocked(opts)
694 |> restrict_muted(opts)
695 |> restrict_media(opts)
696 |> restrict_visibility(opts)
697 |> restrict_replies(opts)
698 |> restrict_reblogs(opts)
699 |> restrict_pinned(opts)
700 end
701
702 def fetch_activities(recipients, opts \\ %{}) do
703 fetch_activities_query(recipients, opts)
704 |> Repo.all()
705 |> Enum.reverse()
706 end
707
708 def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
709 fetch_activities_query([], opts)
710 |> restrict_to_cc(recipients_to, recipients_cc)
711 |> Repo.all()
712 |> Enum.reverse()
713 end
714
715 def upload(file, opts \\ []) do
716 with {:ok, data} <- Upload.store(file, opts) do
717 obj_data =
718 if opts[:actor] do
719 Map.put(data, "actor", opts[:actor])
720 else
721 data
722 end
723
724 Repo.insert(%Object{data: obj_data})
725 end
726 end
727
728 def user_data_from_user_object(data) do
729 avatar =
730 data["icon"]["url"] &&
731 %{
732 "type" => "Image",
733 "url" => [%{"href" => data["icon"]["url"]}]
734 }
735
736 banner =
737 data["image"]["url"] &&
738 %{
739 "type" => "Image",
740 "url" => [%{"href" => data["image"]["url"]}]
741 }
742
743 locked = data["manuallyApprovesFollowers"] || false
744 data = Transmogrifier.maybe_fix_user_object(data)
745
746 user_data = %{
747 ap_id: data["id"],
748 info: %{
749 "ap_enabled" => true,
750 "source_data" => data,
751 "banner" => banner,
752 "locked" => locked
753 },
754 avatar: avatar,
755 name: data["name"],
756 follower_address: data["followers"],
757 bio: data["summary"]
758 }
759
760 # nickname can be nil because of virtual actors
761 user_data =
762 if data["preferredUsername"] do
763 Map.put(
764 user_data,
765 :nickname,
766 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
767 )
768 else
769 Map.put(user_data, :nickname, nil)
770 end
771
772 {:ok, user_data}
773 end
774
775 def fetch_and_prepare_user_from_ap_id(ap_id) do
776 with {:ok, data} <- fetch_and_contain_remote_object_from_id(ap_id) do
777 user_data_from_user_object(data)
778 else
779 e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
780 end
781 end
782
783 def make_user_from_ap_id(ap_id) do
784 if _user = User.get_by_ap_id(ap_id) do
785 Transmogrifier.upgrade_user_from_ap_id(ap_id)
786 else
787 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
788 User.insert_or_update_user(data)
789 else
790 e -> {:error, e}
791 end
792 end
793 end
794
795 def make_user_from_nickname(nickname) do
796 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
797 make_user_from_ap_id(ap_id)
798 else
799 _e -> {:error, "No AP id in WebFinger"}
800 end
801 end
802
803 def should_federate?(inbox, public) do
804 if public do
805 true
806 else
807 inbox_info = URI.parse(inbox)
808 !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
809 end
810 end
811
812 def publish(actor, activity) do
813 remote_followers =
814 if actor.follower_address in activity.recipients do
815 {:ok, followers} = User.get_followers(actor)
816 followers |> Enum.filter(&(!&1.local))
817 else
818 []
819 end
820
821 public = is_public?(activity)
822
823 {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
824 json = Jason.encode!(data)
825
826 (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
827 |> Enum.filter(fn user -> User.ap_enabled?(user) end)
828 |> Enum.map(fn %{info: %{source_data: data}} ->
829 (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
830 end)
831 |> Enum.uniq()
832 |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
833 |> Instances.filter_reachable()
834 |> Enum.each(fn {inbox, unreachable_since} ->
835 Federator.publish_single_ap(%{
836 inbox: inbox,
837 json: json,
838 actor: actor,
839 id: activity.data["id"],
840 unreachable_since: unreachable_since
841 })
842 end)
843 end
844
845 def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
846 Logger.info("Federating #{id} to #{inbox}")
847 host = URI.parse(inbox).host
848
849 digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
850
851 date =
852 NaiveDateTime.utc_now()
853 |> Timex.format!("{WDshort}, {D} {Mshort} {YYYY} {h24}:{m}:{s} GMT")
854
855 signature =
856 Pleroma.Web.HTTPSignatures.sign(actor, %{
857 host: host,
858 "content-length": byte_size(json),
859 digest: digest,
860 date: date
861 })
862
863 with {:ok, %{status: code}} when code in 200..299 <-
864 result =
865 @httpoison.post(
866 inbox,
867 json,
868 [
869 {"Content-Type", "application/activity+json"},
870 {"Date", date},
871 {"signature", signature},
872 {"digest", digest}
873 ]
874 ) do
875 if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
876 do: Instances.set_reachable(inbox)
877
878 result
879 else
880 {_post_result, response} ->
881 unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
882 {:error, response}
883 end
884 end
885
886 # TODO:
887 # This will create a Create activity, which we need internally at the moment.
888 def fetch_object_from_id(id) do
889 if object = Object.get_cached_by_ap_id(id) do
890 {:ok, object}
891 else
892 with {:ok, data} <- fetch_and_contain_remote_object_from_id(id),
893 nil <- Object.normalize(data),
894 params <- %{
895 "type" => "Create",
896 "to" => data["to"],
897 "cc" => data["cc"],
898 "actor" => data["actor"] || data["attributedTo"],
899 "object" => data
900 },
901 :ok <- Transmogrifier.contain_origin(id, params),
902 {:ok, activity} <- Transmogrifier.handle_incoming(params) do
903 {:ok, Object.normalize(activity.data["object"])}
904 else
905 {:error, {:reject, nil}} ->
906 {:reject, nil}
907
908 object = %Object{} ->
909 {:ok, object}
910
911 _e ->
912 Logger.info("Couldn't get object via AP, trying out OStatus fetching...")
913
914 case OStatus.fetch_activity_from_url(id) do
915 {:ok, [activity | _]} -> {:ok, Object.normalize(activity.data["object"])}
916 e -> e
917 end
918 end
919 end
920 end
921
922 def fetch_and_contain_remote_object_from_id(id) do
923 Logger.info("Fetching object #{id} via AP")
924
925 with true <- String.starts_with?(id, "http"),
926 {:ok, %{body: body, status: code}} when code in 200..299 <-
927 @httpoison.get(
928 id,
929 [{:Accept, "application/activity+json"}]
930 ),
931 {:ok, data} <- Jason.decode(body),
932 :ok <- Transmogrifier.contain_origin_from_id(id, data) do
933 {:ok, data}
934 else
935 e ->
936 {:error, e}
937 end
938 end
939
940 # filter out broken threads
941 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
942 entire_thread_visible_for_user?(activity, user)
943 end
944
945 # do post-processing on a specific activity
946 def contain_activity(%Activity{} = activity, %User{} = user) do
947 contain_broken_threads(activity, user)
948 end
949
950 # do post-processing on a timeline
951 def contain_timeline(timeline, user) do
952 timeline
953 |> Enum.filter(fn activity ->
954 contain_activity(activity, user)
955 end)
956 end
957 end