Broadcast deleted activity id on deletion to conform to MastoAPI streamig spec
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Repo
8 alias Pleroma.Object
9 alias Pleroma.Upload
10 alias Pleroma.User
11 alias Pleroma.Notification
12 alias Pleroma.Instances
13 alias Pleroma.Web.ActivityPub.Transmogrifier
14 alias Pleroma.Web.ActivityPub.MRF
15 alias Pleroma.Web.WebFinger
16 alias Pleroma.Web.Federator
17 alias Pleroma.Web.OStatus
18
19 import Ecto.Query
20 import Pleroma.Web.ActivityPub.Utils
21 import Pleroma.Web.ActivityPub.Visibility
22
23 require Logger
24
25 @httpoison Application.get_env(:pleroma, :httpoison)
26
27 # For Announce activities, we filter the recipients based on following status for any actors
28 # that match actual users. See issue #164 for more information about why this is necessary.
29 defp get_recipients(%{"type" => "Announce"} = data) do
30 to = data["to"] || []
31 cc = data["cc"] || []
32 actor = User.get_cached_by_ap_id(data["actor"])
33
34 recipients =
35 (to ++ cc)
36 |> Enum.filter(fn recipient ->
37 case User.get_cached_by_ap_id(recipient) do
38 nil ->
39 true
40
41 user ->
42 User.following?(user, actor)
43 end
44 end)
45
46 {recipients, to, cc}
47 end
48
49 defp get_recipients(%{"type" => "Create"} = data) do
50 to = data["to"] || []
51 cc = data["cc"] || []
52 actor = data["actor"] || []
53 recipients = (to ++ cc ++ [actor]) |> Enum.uniq()
54 {recipients, to, cc}
55 end
56
57 defp get_recipients(data) do
58 to = data["to"] || []
59 cc = data["cc"] || []
60 recipients = to ++ cc
61 {recipients, to, cc}
62 end
63
64 defp check_actor_is_active(actor) do
65 if not is_nil(actor) do
66 with user <- User.get_cached_by_ap_id(actor),
67 false <- user.info.deactivated do
68 :ok
69 else
70 _e -> :reject
71 end
72 else
73 :ok
74 end
75 end
76
77 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
78 limit = Pleroma.Config.get([:instance, :remote_limit])
79 String.length(content) <= limit
80 end
81
82 defp check_remote_limit(_), do: true
83
84 def increase_note_count_if_public(actor, object) do
85 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
86 end
87
88 def decrease_note_count_if_public(actor, object) do
89 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
90 end
91
92 def insert(map, local \\ true) when is_map(map) do
93 with nil <- Activity.normalize(map),
94 map <- lazy_put_activity_defaults(map),
95 :ok <- check_actor_is_active(map["actor"]),
96 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
97 {:ok, map} <- MRF.filter(map),
98 :ok <- insert_full_object(map) do
99 {recipients, _, _} = get_recipients(map)
100
101 {:ok, activity} =
102 Repo.insert(%Activity{
103 data: map,
104 local: local,
105 actor: map["actor"],
106 recipients: recipients
107 })
108
109 Task.start(fn ->
110 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
111 end)
112
113 Notification.create_notifications(activity)
114 stream_out(activity)
115 {:ok, activity}
116 else
117 %Activity{} = activity -> {:ok, activity}
118 error -> {:error, error}
119 end
120 end
121
122 def stream_out(activity) do
123 public = "https://www.w3.org/ns/activitystreams#Public"
124
125 if activity.data["type"] in ["Create", "Announce", "Delete"] do
126 Pleroma.Web.Streamer.stream("user", activity)
127 Pleroma.Web.Streamer.stream("list", activity)
128
129 if Enum.member?(activity.data["to"], public) do
130 Pleroma.Web.Streamer.stream("public", activity)
131
132 if activity.local do
133 Pleroma.Web.Streamer.stream("public:local", activity)
134 end
135
136 if activity.data["type"] in ["Create"] do
137 activity.data["object"]
138 |> Map.get("tag", [])
139 |> Enum.filter(fn tag -> is_bitstring(tag) end)
140 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
141
142 if activity.data["object"]["attachment"] != [] do
143 Pleroma.Web.Streamer.stream("public:media", activity)
144
145 if activity.local do
146 Pleroma.Web.Streamer.stream("public:local:media", activity)
147 end
148 end
149 end
150 else
151 if !Enum.member?(activity.data["cc"] || [], public) &&
152 !Enum.member?(
153 activity.data["to"],
154 User.get_by_ap_id(activity.data["actor"]).follower_address
155 ),
156 do: Pleroma.Web.Streamer.stream("direct", activity)
157 end
158 end
159 end
160
161 def create(%{to: to, actor: actor, context: context, object: object} = params) do
162 additional = params[:additional] || %{}
163 # only accept false as false value
164 local = !(params[:local] == false)
165 published = params[:published]
166
167 with create_data <-
168 make_create_data(
169 %{to: to, actor: actor, published: published, context: context, object: object},
170 additional
171 ),
172 {:ok, activity} <- insert(create_data, local),
173 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
174 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
175 :ok <- maybe_federate(activity) do
176 {:ok, activity}
177 end
178 end
179
180 def accept(%{to: to, actor: actor, object: object} = params) do
181 # only accept false as false value
182 local = !(params[:local] == false)
183
184 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
185 {:ok, activity} <- insert(data, local),
186 :ok <- maybe_federate(activity) do
187 {:ok, activity}
188 end
189 end
190
191 def reject(%{to: to, actor: actor, object: object} = params) do
192 # only accept false as false value
193 local = !(params[:local] == false)
194
195 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
196 {:ok, activity} <- insert(data, local),
197 :ok <- maybe_federate(activity) do
198 {:ok, activity}
199 end
200 end
201
202 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
203 # only accept false as false value
204 local = !(params[:local] == false)
205
206 with data <- %{
207 "to" => to,
208 "cc" => cc,
209 "type" => "Update",
210 "actor" => actor,
211 "object" => object
212 },
213 {:ok, activity} <- insert(data, local),
214 :ok <- maybe_federate(activity) do
215 {:ok, activity}
216 end
217 end
218
219 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
220 def like(
221 %User{ap_id: ap_id} = user,
222 %Object{data: %{"id" => _}} = object,
223 activity_id \\ nil,
224 local \\ true
225 ) do
226 with nil <- get_existing_like(ap_id, object),
227 like_data <- make_like_data(user, object, activity_id),
228 {:ok, activity} <- insert(like_data, local),
229 {:ok, object} <- add_like_to_object(activity, object),
230 :ok <- maybe_federate(activity) do
231 {:ok, activity, object}
232 else
233 %Activity{} = activity -> {:ok, activity, object}
234 error -> {:error, error}
235 end
236 end
237
238 def unlike(
239 %User{} = actor,
240 %Object{} = object,
241 activity_id \\ nil,
242 local \\ true
243 ) do
244 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
245 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
246 {:ok, unlike_activity} <- insert(unlike_data, local),
247 {:ok, _activity} <- Repo.delete(like_activity),
248 {:ok, object} <- remove_like_from_object(like_activity, object),
249 :ok <- maybe_federate(unlike_activity) do
250 {:ok, unlike_activity, like_activity, object}
251 else
252 _e -> {:ok, object}
253 end
254 end
255
256 def announce(
257 %User{ap_id: _} = user,
258 %Object{data: %{"id" => _}} = object,
259 activity_id \\ nil,
260 local \\ true,
261 public \\ true
262 ) do
263 with true <- is_public?(object),
264 announce_data <- make_announce_data(user, object, activity_id, public),
265 {:ok, activity} <- insert(announce_data, local),
266 {:ok, object} <- add_announce_to_object(activity, object),
267 :ok <- maybe_federate(activity) do
268 {:ok, activity, object}
269 else
270 error -> {:error, error}
271 end
272 end
273
274 def unannounce(
275 %User{} = actor,
276 %Object{} = object,
277 activity_id \\ nil,
278 local \\ true
279 ) do
280 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
281 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
282 {:ok, unannounce_activity} <- insert(unannounce_data, local),
283 :ok <- maybe_federate(unannounce_activity),
284 {:ok, _activity} <- Repo.delete(announce_activity),
285 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
286 {:ok, unannounce_activity, object}
287 else
288 _e -> {:ok, object}
289 end
290 end
291
292 def follow(follower, followed, activity_id \\ nil, local \\ true) do
293 with data <- make_follow_data(follower, followed, activity_id),
294 {:ok, activity} <- insert(data, local),
295 :ok <- maybe_federate(activity) do
296 {:ok, activity}
297 end
298 end
299
300 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
301 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
302 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
303 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
304 {:ok, activity} <- insert(unfollow_data, local),
305 :ok <- maybe_federate(activity) do
306 {:ok, activity}
307 end
308 end
309
310 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
311 user = User.get_cached_by_ap_id(actor)
312 to = object.data["to"] || [] ++ object.data["cc"] || []
313
314 with {:ok, object, activity} <- Object.delete(object),
315 data <- %{
316 "type" => "Delete",
317 "actor" => actor,
318 "object" => id,
319 "to" => to,
320 "deleted_activity_id" => activity && activity.id
321 },
322 {:ok, activity} <- insert(data, local),
323 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
324 {:ok, _actor} <- decrease_note_count_if_public(user, object),
325 :ok <- maybe_federate(activity) do
326 {:ok, activity}
327 end
328 end
329
330 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
331 ap_config = Application.get_env(:pleroma, :activitypub)
332 unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
333 outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)
334
335 with true <- unfollow_blocked do
336 follow_activity = fetch_latest_follow(blocker, blocked)
337
338 if follow_activity do
339 unfollow(blocker, blocked, nil, local)
340 end
341 end
342
343 with true <- outgoing_blocks,
344 block_data <- make_block_data(blocker, blocked, activity_id),
345 {:ok, activity} <- insert(block_data, local),
346 :ok <- maybe_federate(activity) do
347 {:ok, activity}
348 else
349 _e -> {:ok, nil}
350 end
351 end
352
353 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
354 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
355 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
356 {:ok, activity} <- insert(unblock_data, local),
357 :ok <- maybe_federate(activity) do
358 {:ok, activity}
359 end
360 end
361
362 def flag(
363 %{
364 actor: actor,
365 context: context,
366 account: account,
367 statuses: statuses,
368 content: content
369 } = params
370 ) do
371 additional = params[:additional] || %{}
372
373 # only accept false as false value
374 local = !(params[:local] == false)
375
376 %{
377 actor: actor,
378 context: context,
379 account: account,
380 statuses: statuses,
381 content: content
382 }
383 |> make_flag_data(additional)
384 |> insert(local)
385 end
386
387 def fetch_activities_for_context(context, opts \\ %{}) do
388 public = ["https://www.w3.org/ns/activitystreams#Public"]
389
390 recipients =
391 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
392
393 query = from(activity in Activity)
394
395 query =
396 query
397 |> restrict_blocked(opts)
398 |> restrict_recipients(recipients, opts["user"])
399
400 query =
401 from(
402 activity in query,
403 where:
404 fragment(
405 "?->>'type' = ? and ?->>'context' = ?",
406 activity.data,
407 "Create",
408 activity.data,
409 ^context
410 ),
411 order_by: [desc: :id]
412 )
413
414 Repo.all(query)
415 end
416
417 def fetch_public_activities(opts \\ %{}) do
418 q = fetch_activities_query(["https://www.w3.org/ns/activitystreams#Public"], opts)
419
420 q
421 |> restrict_unlisted()
422 |> Repo.all()
423 |> Enum.reverse()
424 end
425
426 @valid_visibilities ~w[direct unlisted public private]
427
428 defp restrict_visibility(query, %{visibility: visibility})
429 when is_list(visibility) do
430 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
431 query =
432 from(
433 a in query,
434 where:
435 fragment(
436 "activity_visibility(?, ?, ?) = ANY (?)",
437 a.actor,
438 a.recipients,
439 a.data,
440 ^visibility
441 )
442 )
443
444 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
445
446 query
447 else
448 Logger.error("Could not restrict visibility to #{visibility}")
449 end
450 end
451
452 defp restrict_visibility(query, %{visibility: visibility})
453 when visibility in @valid_visibilities do
454 query =
455 from(
456 a in query,
457 where:
458 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
459 )
460
461 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
462
463 query
464 end
465
466 defp restrict_visibility(_query, %{visibility: visibility})
467 when visibility not in @valid_visibilities do
468 Logger.error("Could not restrict visibility to #{visibility}")
469 end
470
471 defp restrict_visibility(query, _visibility), do: query
472
473 def fetch_user_activities(user, reading_user, params \\ %{}) do
474 params =
475 params
476 |> Map.put("type", ["Create", "Announce"])
477 |> Map.put("actor_id", user.ap_id)
478 |> Map.put("whole_db", true)
479 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
480
481 recipients =
482 if reading_user do
483 ["https://www.w3.org/ns/activitystreams#Public"] ++
484 [reading_user.ap_id | reading_user.following]
485 else
486 ["https://www.w3.org/ns/activitystreams#Public"]
487 end
488
489 fetch_activities(recipients, params)
490 |> Enum.reverse()
491 end
492
493 defp restrict_since(query, %{"since_id" => ""}), do: query
494
495 defp restrict_since(query, %{"since_id" => since_id}) do
496 from(activity in query, where: activity.id > ^since_id)
497 end
498
499 defp restrict_since(query, _), do: query
500
501 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
502 when is_list(tag_reject) and tag_reject != [] do
503 from(
504 activity in query,
505 where: fragment("(not (? #> '{\"object\",\"tag\"}') \\?| ?)", activity.data, ^tag_reject)
506 )
507 end
508
509 defp restrict_tag_reject(query, _), do: query
510
511 defp restrict_tag_all(query, %{"tag_all" => tag_all})
512 when is_list(tag_all) and tag_all != [] do
513 from(
514 activity in query,
515 where: fragment("(? #> '{\"object\",\"tag\"}') \\?& ?", activity.data, ^tag_all)
516 )
517 end
518
519 defp restrict_tag_all(query, _), do: query
520
521 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
522 from(
523 activity in query,
524 where: fragment("(? #> '{\"object\",\"tag\"}') \\?| ?", activity.data, ^tag)
525 )
526 end
527
528 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
529 from(
530 activity in query,
531 where: fragment("? <@ (? #> '{\"object\",\"tag\"}')", ^tag, activity.data)
532 )
533 end
534
535 defp restrict_tag(query, _), do: query
536
537 defp restrict_to_cc(query, recipients_to, recipients_cc) do
538 from(
539 activity in query,
540 where:
541 fragment(
542 "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
543 activity.data,
544 ^recipients_to,
545 activity.data,
546 ^recipients_cc
547 )
548 )
549 end
550
551 defp restrict_recipients(query, [], _user), do: query
552
553 defp restrict_recipients(query, recipients, nil) do
554 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
555 end
556
557 defp restrict_recipients(query, recipients, user) do
558 from(
559 activity in query,
560 where: fragment("? && ?", ^recipients, activity.recipients),
561 or_where: activity.actor == ^user.ap_id
562 )
563 end
564
565 defp restrict_limit(query, %{"limit" => limit}) do
566 from(activity in query, limit: ^limit)
567 end
568
569 defp restrict_limit(query, _), do: query
570
571 defp restrict_local(query, %{"local_only" => true}) do
572 from(activity in query, where: activity.local == true)
573 end
574
575 defp restrict_local(query, _), do: query
576
577 defp restrict_max(query, %{"max_id" => ""}), do: query
578
579 defp restrict_max(query, %{"max_id" => max_id}) do
580 from(activity in query, where: activity.id < ^max_id)
581 end
582
583 defp restrict_max(query, _), do: query
584
585 defp restrict_actor(query, %{"actor_id" => actor_id}) do
586 from(activity in query, where: activity.actor == ^actor_id)
587 end
588
589 defp restrict_actor(query, _), do: query
590
591 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
592 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
593 end
594
595 defp restrict_type(query, %{"type" => type}) do
596 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
597 end
598
599 defp restrict_type(query, _), do: query
600
601 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
602 from(
603 activity in query,
604 where: fragment("? <@ (? #> '{\"object\",\"likes\"}')", ^ap_id, activity.data)
605 )
606 end
607
608 defp restrict_favorited_by(query, _), do: query
609
610 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
611 from(
612 activity in query,
613 where: fragment("not (? #> '{\"object\",\"attachment\"}' = ?)", activity.data, ^[])
614 )
615 end
616
617 defp restrict_media(query, _), do: query
618
619 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
620 from(
621 activity in query,
622 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
623 )
624 end
625
626 defp restrict_replies(query, _), do: query
627
628 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
629 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
630 end
631
632 defp restrict_reblogs(query, _), do: query
633
634 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
635
636 defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
637 mutes = info.mutes
638
639 from(
640 activity in query,
641 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
642 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
643 )
644 end
645
646 defp restrict_muted(query, _), do: query
647
648 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
649 blocks = info.blocks || []
650 domain_blocks = info.domain_blocks || []
651
652 from(
653 activity in query,
654 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
655 where: fragment("not (?->'to' \\?| ?)", activity.data, ^blocks),
656 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
657 )
658 end
659
660 defp restrict_blocked(query, _), do: query
661
662 defp restrict_unlisted(query) do
663 from(
664 activity in query,
665 where:
666 fragment(
667 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
668 activity.data,
669 ^["https://www.w3.org/ns/activitystreams#Public"]
670 )
671 )
672 end
673
674 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
675 from(activity in query, where: activity.id in ^ids)
676 end
677
678 defp restrict_pinned(query, _), do: query
679
680 def fetch_activities_query(recipients, opts \\ %{}) do
681 base_query =
682 from(
683 activity in Activity,
684 limit: 20,
685 order_by: [fragment("? desc nulls last", activity.id)]
686 )
687
688 base_query
689 |> restrict_recipients(recipients, opts["user"])
690 |> restrict_tag(opts)
691 |> restrict_tag_reject(opts)
692 |> restrict_tag_all(opts)
693 |> restrict_since(opts)
694 |> restrict_local(opts)
695 |> restrict_limit(opts)
696 |> restrict_max(opts)
697 |> restrict_actor(opts)
698 |> restrict_type(opts)
699 |> restrict_favorited_by(opts)
700 |> restrict_blocked(opts)
701 |> restrict_muted(opts)
702 |> restrict_media(opts)
703 |> restrict_visibility(opts)
704 |> restrict_replies(opts)
705 |> restrict_reblogs(opts)
706 |> restrict_pinned(opts)
707 end
708
709 def fetch_activities(recipients, opts \\ %{}) do
710 fetch_activities_query(recipients, opts)
711 |> Repo.all()
712 |> Enum.reverse()
713 end
714
715 def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
716 fetch_activities_query([], opts)
717 |> restrict_to_cc(recipients_to, recipients_cc)
718 |> Repo.all()
719 |> Enum.reverse()
720 end
721
722 def upload(file, opts \\ []) do
723 with {:ok, data} <- Upload.store(file, opts) do
724 obj_data =
725 if opts[:actor] do
726 Map.put(data, "actor", opts[:actor])
727 else
728 data
729 end
730
731 Repo.insert(%Object{data: obj_data})
732 end
733 end
734
735 def user_data_from_user_object(data) do
736 avatar =
737 data["icon"]["url"] &&
738 %{
739 "type" => "Image",
740 "url" => [%{"href" => data["icon"]["url"]}]
741 }
742
743 banner =
744 data["image"]["url"] &&
745 %{
746 "type" => "Image",
747 "url" => [%{"href" => data["image"]["url"]}]
748 }
749
750 locked = data["manuallyApprovesFollowers"] || false
751 data = Transmogrifier.maybe_fix_user_object(data)
752
753 user_data = %{
754 ap_id: data["id"],
755 info: %{
756 "ap_enabled" => true,
757 "source_data" => data,
758 "banner" => banner,
759 "locked" => locked
760 },
761 avatar: avatar,
762 name: data["name"],
763 follower_address: data["followers"],
764 bio: data["summary"]
765 }
766
767 # nickname can be nil because of virtual actors
768 user_data =
769 if data["preferredUsername"] do
770 Map.put(
771 user_data,
772 :nickname,
773 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
774 )
775 else
776 Map.put(user_data, :nickname, nil)
777 end
778
779 {:ok, user_data}
780 end
781
782 def fetch_and_prepare_user_from_ap_id(ap_id) do
783 with {:ok, data} <- fetch_and_contain_remote_object_from_id(ap_id) do
784 user_data_from_user_object(data)
785 else
786 e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
787 end
788 end
789
790 def make_user_from_ap_id(ap_id) do
791 if _user = User.get_by_ap_id(ap_id) do
792 Transmogrifier.upgrade_user_from_ap_id(ap_id)
793 else
794 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
795 User.insert_or_update_user(data)
796 else
797 e -> {:error, e}
798 end
799 end
800 end
801
802 def make_user_from_nickname(nickname) do
803 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
804 make_user_from_ap_id(ap_id)
805 else
806 _e -> {:error, "No AP id in WebFinger"}
807 end
808 end
809
810 def should_federate?(inbox, public) do
811 if public do
812 true
813 else
814 inbox_info = URI.parse(inbox)
815 !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
816 end
817 end
818
819 def publish(actor, activity) do
820 remote_followers =
821 if actor.follower_address in activity.recipients do
822 {:ok, followers} = User.get_followers(actor)
823 followers |> Enum.filter(&(!&1.local))
824 else
825 []
826 end
827
828 public = is_public?(activity)
829
830 {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
831 json = Jason.encode!(data)
832
833 (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
834 |> Enum.filter(fn user -> User.ap_enabled?(user) end)
835 |> Enum.map(fn %{info: %{source_data: data}} ->
836 (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
837 end)
838 |> Enum.uniq()
839 |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
840 |> Instances.filter_reachable()
841 |> Enum.each(fn {inbox, unreachable_since} ->
842 Federator.publish_single_ap(%{
843 inbox: inbox,
844 json: json,
845 actor: actor,
846 id: activity.data["id"],
847 unreachable_since: unreachable_since
848 })
849 end)
850 end
851
852 def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
853 Logger.info("Federating #{id} to #{inbox}")
854 host = URI.parse(inbox).host
855
856 digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
857
858 date =
859 NaiveDateTime.utc_now()
860 |> Timex.format!("{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT")
861
862 signature =
863 Pleroma.Web.HTTPSignatures.sign(actor, %{
864 host: host,
865 "content-length": byte_size(json),
866 digest: digest,
867 date: date
868 })
869
870 with {:ok, %{status: code}} when code in 200..299 <-
871 result =
872 @httpoison.post(
873 inbox,
874 json,
875 [
876 {"Content-Type", "application/activity+json"},
877 {"Date", date},
878 {"signature", signature},
879 {"digest", digest}
880 ]
881 ) do
882 if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
883 do: Instances.set_reachable(inbox)
884
885 result
886 else
887 {_post_result, response} ->
888 unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
889 {:error, response}
890 end
891 end
892
893 # TODO:
894 # This will create a Create activity, which we need internally at the moment.
895 def fetch_object_from_id(id) do
896 if object = Object.get_cached_by_ap_id(id) do
897 {:ok, object}
898 else
899 with {:ok, data} <- fetch_and_contain_remote_object_from_id(id),
900 nil <- Object.normalize(data),
901 params <- %{
902 "type" => "Create",
903 "to" => data["to"],
904 "cc" => data["cc"],
905 "actor" => data["actor"] || data["attributedTo"],
906 "object" => data
907 },
908 :ok <- Transmogrifier.contain_origin(id, params),
909 {:ok, activity} <- Transmogrifier.handle_incoming(params) do
910 {:ok, Object.normalize(activity.data["object"])}
911 else
912 {:error, {:reject, nil}} ->
913 {:reject, nil}
914
915 object = %Object{} ->
916 {:ok, object}
917
918 _e ->
919 Logger.info("Couldn't get object via AP, trying out OStatus fetching...")
920
921 case OStatus.fetch_activity_from_url(id) do
922 {:ok, [activity | _]} -> {:ok, Object.normalize(activity.data["object"])}
923 e -> e
924 end
925 end
926 end
927 end
928
929 def fetch_and_contain_remote_object_from_id(id) do
930 Logger.info("Fetching object #{id} via AP")
931
932 with true <- String.starts_with?(id, "http"),
933 {:ok, %{body: body, status: code}} when code in 200..299 <-
934 @httpoison.get(
935 id,
936 [{:Accept, "application/activity+json"}]
937 ),
938 {:ok, data} <- Jason.decode(body),
939 :ok <- Transmogrifier.contain_origin_from_id(id, data) do
940 {:ok, data}
941 else
942 e ->
943 {:error, e}
944 end
945 end
946
947 # filter out broken threads
948 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
949 entire_thread_visible_for_user?(activity, user)
950 end
951
952 # do post-processing on a specific activity
953 def contain_activity(%Activity{} = activity, %User{} = user) do
954 contain_broken_threads(activity, user)
955 end
956
957 # do post-processing on a timeline
958 def contain_timeline(timeline, user) do
959 timeline
960 |> Enum.filter(fn activity ->
961 contain_activity(activity, user)
962 end)
963 end
964 end