Increment user note count only on public activities
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Repo
8 alias Pleroma.Object
9 alias Pleroma.Upload
10 alias Pleroma.User
11 alias Pleroma.Notification
12 alias Pleroma.Instances
13 alias Pleroma.Web.ActivityPub.Transmogrifier
14 alias Pleroma.Web.ActivityPub.MRF
15 alias Pleroma.Web.WebFinger
16 alias Pleroma.Web.Federator
17 alias Pleroma.Web.OStatus
18
19 import Ecto.Query
20 import Pleroma.Web.ActivityPub.Utils
21 import Pleroma.Web.ActivityPub.Visibility
22
23 require Logger
24
# HTTP client module used for outgoing requests in this file, read from the
# `:pleroma, :httpoison` application environment. Module attributes are
# evaluated at compile time, so the value is fixed when this module is compiled.
@httpoison Application.get_env(:pleroma, :httpoison)
26
# Computes `{recipients, to, cc}` for an activity map. Missing "to"/"cc"
# fields are treated as empty lists.
#
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
# Recipients that do not resolve to a cached user are kept as they are.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []
  actor = User.get_cached_by_ap_id(data["actor"])

  recipients =
    Enum.filter(to ++ cc, fn recipient ->
      case User.get_cached_by_ap_id(recipient) do
        nil -> true
        user -> User.following?(user, actor)
      end
    end)

  {recipients, to, cc}
end

# For Create activities the actor is added to the recipients as well.
defp get_recipients(%{"type" => "Create"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []
  # List.wrap/1 turns a missing actor into [] instead of appending an empty
  # list as a bogus recipient (the previous `[data["actor"] || []]` did that).
  actor = List.wrap(data["actor"])
  recipients = Enum.uniq(to ++ cc ++ actor)
  {recipients, to, cc}
end

# Every other activity type: recipients are simply "to" followed by "cc".
defp get_recipients(data) do
  to = data["to"] || []
  cc = data["cc"] || []
  {to ++ cc, to, cc}
end
63
# Returns `:ok` when no actor is given or when the cached user for `actor`
# has `info.deactivated == false`; returns `:reject` otherwise.
#
# The user is matched structurally so that an actor which does not resolve to
# a cached user yields `:reject` instead of raising on `nil.info`.
defp check_actor_is_active(nil), do: :ok

defp check_actor_is_active(actor) do
  case User.get_cached_by_ap_id(actor) do
    %User{info: %{deactivated: false}} -> :ok
    _ -> :reject
  end
end
76
# True when the embedded object's "content" is no longer than the configured
# `[:instance, :remote_limit]`. Activities without a non-nil embedded
# "content" always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  max_length = Pleroma.Config.get([:instance, :remote_limit])
  max_length >= String.length(content)
end

defp check_remote_limit(_activity), do: true
83
# Calls `User.increase_note_count/1` for `actor` when `is_public?(object)` is
# truthy; otherwise returns `{:ok, actor}` without touching the counter.
def increase_note_count_if_public(actor, object) do
  case is_public?(object) do
    falsy when falsy in [nil, false] -> {:ok, actor}
    _ -> User.increase_note_count(actor)
  end
end
87
# Calls `User.decrease_note_count/1` for `actor` when `is_public?(object)` is
# truthy; otherwise returns `{:ok, actor}` without touching the counter.
def decrease_note_count_if_public(actor, object) do
  case is_public?(object) do
    falsy when falsy in [nil, false] -> {:ok, actor}
    _ -> User.decrease_note_count(actor)
  end
end
91
# Persists an activity map as an `Activity` row.
#
# Steps, in order: bail out with the existing activity if `Activity.normalize/1`
# already finds one, fill in defaults, check the actor, check the remote
# content limit, run the MRF filter, and insert the full object. On success the
# activity is inserted, rich media is fetched in a separate task, notifications
# are created and the activity is streamed out.
#
# Returns `{:ok, activity}`, or `{:error, reason}` where `reason` is whatever
# value failed to match in the chain.
def insert(map, local \\ true) when is_map(map) do
  with nil <- Activity.normalize(map),
       prepared <- lazy_put_activity_defaults(map),
       :ok <- check_actor_is_active(prepared["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(prepared)},
       {:ok, filtered} <- MRF.filter(prepared),
       :ok <- insert_full_object(filtered) do
    {recipients, _to, _cc} = get_recipients(filtered)

    new_activity = %Activity{
      data: filtered,
      local: local,
      actor: filtered["actor"],
      recipients: recipients
    }

    {:ok, activity} = Repo.insert(new_activity)

    Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)

    Notification.create_notifications(activity)
    stream_out(activity)
    {:ok, activity}
  else
    # Activity.normalize/1 found an already stored activity.
    %Activity{} = existing -> {:ok, existing}
    failure -> {:error, failure}
  end
end
121
# Pushes a Create/Announce/Delete activity to the streamer topics it belongs to.
#
# Every such activity goes to "user" and "list". Activities addressed ("to") to
# the public collection additionally go to "public" (and "public:local" when
# local); public Create activities also go to their string hashtag topics and,
# when the object's "attachment" is not `[]`, to the media topics. Activities
# that are neither public in "to"/"cc" nor addressed to the actor's follower
# collection go to "direct". Other activity types are ignored.
def stream_out(activity) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  if activity.data["type"] in ["Create", "Announce", "Delete"] do
    # "to" may be absent; default it to [] (as is already done for "cc") so
    # that Enum.member?/2 does not raise on nil.
    to = activity.data["to"] || []

    Pleroma.Web.Streamer.stream("user", activity)
    Pleroma.Web.Streamer.stream("list", activity)

    if Enum.member?(to, public) do
      Pleroma.Web.Streamer.stream("public", activity)

      if activity.local do
        Pleroma.Web.Streamer.stream("public:local", activity)
      end

      if activity.data["type"] in ["Create"] do
        activity.data["object"]
        |> Map.get("tag", [])
        |> Enum.filter(fn tag -> is_bitstring(tag) end)
        |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)

        if activity.data["object"]["attachment"] != [] do
          Pleroma.Web.Streamer.stream("public:media", activity)

          if activity.local do
            Pleroma.Web.Streamer.stream("public:local:media", activity)
          end
        end
      end
    else
      if !Enum.member?(activity.data["cc"] || [], public) &&
           !Enum.member?(
             to,
             User.get_by_ap_id(activity.data["actor"]).follower_address
           ),
         do: Pleroma.Web.Streamer.stream("direct", activity)
    end
  end
end
160
# Builds a Create activity from `params`, inserts it, bumps the actor's note
# count when the activity is public, then federates it.
#
# Optional keys: `:additional` (defaults to `%{}`), `:published`, and `:local`
# (only an explicit `false` marks the activity as non-local).
# Returns `{:ok, activity}` or the first non-matching result of the chain.
def create(%{to: to, actor: actor, context: context, object: object} = params) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  core = %{
    to: to,
    actor: actor,
    published: params[:published],
    context: context,
    object: object
  }

  create_data = make_create_data(core, additional)

  # Changing note count prior to enqueuing federation task in order to avoid
  # race conditions on updating user.info
  with {:ok, activity} <- insert(create_data, local),
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
179
# Inserts and federates an Accept activity from `actor` for `object`, then
# refreshes the actor's follow request count. Returns `{:ok, activity}` or the
# first non-matching result of insert/federation.
def accept(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false
  data = %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object}

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    User.update_follow_request_count(actor)
    {:ok, activity}
  end
end
191
# Inserts and federates a Reject activity from `actor` for `object`, then
# refreshes the actor's follow request count. Returns `{:ok, activity}` or the
# first non-matching result of insert/federation.
def reject(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false
  data = %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object}

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    User.update_follow_request_count(actor)
    {:ok, activity}
  end
end
203
# Inserts and federates an Update activity built from the given "to", "cc",
# actor and object. Returns `{:ok, activity}` or the first non-matching result.
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  data = %{
    "type" => "Update",
    "to" => to,
    "cc" => cc,
    "actor" => actor,
    "object" => object
  }

  with {:ok, activity} <- insert(data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
220
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
#
# Likes `object` as `user`. When no like by this user exists yet, a Like
# activity is inserted, recorded on the object and federated, returning
# `{:ok, activity, updated_object}`. When a like already exists it is returned
# together with the unchanged object. Any other failure is wrapped as
# `{:error, reason}`.
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       {:ok, activity} <- insert(make_like_data(user, object, activity_id), local),
       {:ok, liked_object} <- add_like_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, liked_object}
  else
    %Activity{} = existing -> {:ok, existing, object}
    failure -> {:error, failure}
  end
end
239
# Undoes `actor`'s like of `object`: inserts the unlike activity, deletes the
# like activity, removes the like from the object and federates.
#
# Returns `{:ok, unlike_activity, like_activity, updated_object}` on success.
# If any step does not match (including "no existing like"), the original
# object is returned as `{:ok, object}`.
def unlike(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       {:ok, unlike_activity} <-
         insert(make_unlike_data(actor, like_activity, activity_id), local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, unliked_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, unliked_object}
  else
    _failure -> {:ok, object}
  end
end
257
# Announces `object` as `user`. Only proceeds when `is_public?(object)` returns
# `true`; then the Announce activity is inserted, recorded on the object and
# federated. Returns `{:ok, activity, updated_object}` or `{:error, reason}`.
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_public?(object),
       {:ok, activity} <- insert(make_announce_data(user, object, activity_id, public), local),
       {:ok, announced_object} <- add_announce_to_object(activity, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity, announced_object}
  else
    failure -> {:error, failure}
  end
end
275
# Undoes `actor`'s announce of `object`: inserts and federates the unannounce
# activity, deletes the announce activity and removes it from the object.
#
# Returns `{:ok, unannounce_activity, updated_object}` on success. If any step
# does not match (including "no existing announce"), the original object is
# returned as `{:ok, object}`.
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       {:ok, unannounce_activity} <-
         insert(make_unannounce_data(actor, announce_activity, activity_id), local),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, cleaned_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, cleaned_object}
  else
    _failure -> {:ok, object}
  end
end
293
# Inserts and federates a follow activity from `follower` to `followed`, then
# refreshes the follow request count of `followed`. Returns `{:ok, activity}`
# or the first non-matching result of insert/federation.
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       :ok <- maybe_federate(activity) do
    User.update_follow_request_count(followed)
    {:ok, activity}
  end
end
302
# Cancels the latest follow from `follower` to `followed`: marks the follow
# activity as "cancelled", inserts and federates the unfollow activity and
# refreshes the follow request count of `followed`. Returns `{:ok, activity}`
# or the first non-matching result (e.g. when no follow activity exists).
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       {:ok, activity} <-
         insert(make_unfollow_data(follower, followed, cancelled_follow, activity_id), local),
       :ok <- maybe_federate(activity) do
    User.update_follow_request_count(followed)
    {:ok, activity}
  end
end
313
# Deletes `object` and publishes a Delete activity addressed to the actor's
# follower collection and the public collection. The actor's note count is
# decreased when the deleted object was public.
# Returns `{:ok, activity}` or the first non-matching result of the chain.
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
  user = User.get_cached_by_ap_id(actor)
  audience = [user.follower_address, "https://www.w3.org/ns/activitystreams#Public"]

  data = %{
    "type" => "Delete",
    "actor" => actor,
    "object" => id,
    "to" => audience
  }

  # Changing note count prior to enqueuing federation task in order to avoid
  # race conditions on updating user.info
  with {:ok, _} <- Object.delete(object),
       {:ok, activity} <- insert(data, local),
       {:ok, _actor} <- decrease_note_count_if_public(user, object),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
332
# Blocks `blocked` on behalf of `blocker`, driven by the `:pleroma,
# :activitypub` application config:
#   * `:unfollow_blocked` == true - an existing follow is undone first;
#   * `:outgoing_blocks` == true  - a block activity is inserted and federated.
# Returns `{:ok, activity}` when a block activity was published, `{:ok, nil}`
# in every other case.
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  ap_config = Application.get_env(:pleroma, :activitypub)
  unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
  outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)

  # The follow lookup only happens when unfollowing is enabled.
  if unfollow_blocked == true and fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       {:ok, activity} <- insert(make_block_data(blocker, blocked, activity_id), local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    _failure -> {:ok, nil}
  end
end
355
# Undoes the latest block of `blocked` by `blocker`: inserts and federates the
# unblock activity. Returns `{:ok, activity}` or the first non-matching result
# (e.g. when no block activity exists).
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       {:ok, activity} <-
         insert(make_unblock_data(blocker, blocked, latest_block, activity_id), local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
364
# Builds flag data from the required `:actor`, `:context`, `:account`,
# `:statuses` and `:content` keys (plus optional `:additional`, default `%{}`)
# and inserts it. Returns the result of `insert/2`.
def flag(
      %{
        actor: _,
        context: _,
        account: _,
        statuses: _,
        content: _
      } = params
    ) do
  additional = params[:additional] || %{}

  # only accept false as false value
  local = params[:local] != false

  # The function head guarantees all five keys are present.
  flag_data =
    params
    |> Map.take([:actor, :context, :account, :statuses, :content])
    |> make_flag_data(additional)

  insert(flag_data, local)
end
389
# Returns all Create activities whose data "context" equals `context`, newest
# id first. Results are restricted by `restrict_blocked/2` and to recipients
# made of the public collection plus, when `opts["user"]` is set, that user's
# ap_id and followings.
def fetch_activities_for_context(context, opts \\ %{}) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]
  reader = opts["user"]

  recipients =
    if reader do
      [reader.ap_id | reader.following] ++ public
    else
      public
    end

  from(activity in Activity)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, reader)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> order_by(desc: :id)
  |> Repo.all()
end
419
# Fetches activities addressed to the public collection, excluding those that
# only carry the public collection in "cc". The query result is reversed
# before it is returned.
def fetch_public_activities(opts \\ %{}) do
  ["https://www.w3.org/ns/activitystreams#Public"]
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Repo.all()
  |> Enum.reverse()
end
428
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities whose `activity_visibility(...)` SQL function
# result equals the requested `:visibility` option.
#
# An unknown visibility is logged and the query is returned unchanged, so the
# remaining `restrict_*` steps still receive a query (previously the logger's
# return value leaked into the pipeline). Options without a `:visibility` key
# leave the query untouched.
defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
451
# Fetches Create and Announce activities authored by `user`, as visible to
# `reading_user` (public only when `reading_user` is nil). The user's pinned
# activity ids are passed along as "pinned_activity_ids". The result of
# `fetch_activities/2` is reversed before it is returned.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  forced_params = %{
    "type" => ["Create", "Announce"],
    "actor_id" => user.ap_id,
    "whole_db" => true,
    "pinned_activity_ids" => user.info.pinned_activities
  }

  recipients =
    if reading_user do
      [public, reading_user.ap_id | reading_user.following]
    else
      [public]
    end

  recipients
  |> fetch_activities(Map.merge(params, forced_params))
  |> Enum.reverse()
end
471
# Keeps only activities with an id greater than "since_id". An empty or
# missing "since_id" leaves the query unchanged.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}),
  do: where(query, [activity], activity.id > ^since_id)

defp restrict_since(query, _opts), do: query
479
# Excludes activities whose object tags contain any of the "tag_reject"
# values. Only applies when "tag_reject" is a non-empty list.
defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
     when is_list(tag_reject) and tag_reject != [] do
  where(
    query,
    [activity],
    fragment("(not (? #> '{\"object\",\"tag\"}') \\?| ?)", activity.data, ^tag_reject)
  )
end

defp restrict_tag_reject(query, _opts), do: query
489
# Keeps only activities whose object tags contain every "tag_all" value.
# Only applies when "tag_all" is a non-empty list.
defp restrict_tag_all(query, %{"tag_all" => tag_all})
     when is_list(tag_all) and tag_all != [] do
  where(
    query,
    [activity],
    fragment("(? #> '{\"object\",\"tag\"}') \\?& ?", activity.data, ^tag_all)
  )
end

defp restrict_tag_all(query, _opts), do: query
499
# Restricts by the "tag" option: a list keeps activities whose object tags
# contain any of the values, a single string keeps activities whose object
# tags contain that value. Anything else leaves the query unchanged.
defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
  where(
    query,
    [activity],
    fragment("(? #> '{\"object\",\"tag\"}') \\?| ?", activity.data, ^tag)
  )
end

defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
  where(
    query,
    [activity],
    fragment("? <@ (? #> '{\"object\",\"tag\"}')", ^tag, activity.data)
  )
end

defp restrict_tag(query, _opts), do: query
515
516 defp restrict_to_cc(query, recipients_to, recipients_cc) do
517 from(
518 activity in query,
519 where:
520 fragment(
521 "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
522 activity.data,
523 ^recipients_to,
524 activity.data,
525 ^recipients_cc
526 )
527 )
528 end
529
# Keeps activities whose recipients overlap with `recipients`. When a `user`
# is given, activities authored by that user are included as well. An empty
# recipient list leaves the query unchanged.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil),
  do: where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
543
# Applies the "limit" option as the query's row limit, when present.
defp restrict_limit(query, %{"limit" => max_results}), do: limit(query, ^max_results)

defp restrict_limit(query, _opts), do: query
549
# Keeps only local activities when "local_only" is exactly `true`.
defp restrict_local(query, %{"local_only" => true}),
  do: where(query, [activity], activity.local == true)

defp restrict_local(query, _opts), do: query
555
# Keeps only activities with an id lower than "max_id". An empty or missing
# "max_id" leaves the query unchanged.
defp restrict_max(query, %{"max_id" => ""}), do: query

defp restrict_max(query, %{"max_id" => max_id}),
  do: where(query, [activity], activity.id < ^max_id)

defp restrict_max(query, _opts), do: query
563
# Keeps only activities whose actor equals the "actor_id" option, when present.
defp restrict_actor(query, %{"actor_id" => actor_id}),
  do: where(query, [activity], activity.actor == ^actor_id)

defp restrict_actor(query, _opts), do: query
569
# Restricts by the data "type": a single string must match exactly, any other
# value is passed to SQL `ANY(...)` as the set of accepted types.
defp restrict_type(query, %{"type" => type}) when is_binary(type),
  do: where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))

defp restrict_type(query, %{"type" => types}),
  do: where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))

defp restrict_type(query, _opts), do: query
579
# Keeps only activities whose object "likes" contain the "favorited_by" ap_id.
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
  where(
    query,
    [activity],
    fragment("? <@ (? #> '{\"object\",\"likes\"}')", ^ap_id, activity.data)
  )
end

defp restrict_favorited_by(query, _opts), do: query
588
# When "only_media" is "true" or "1", keeps only activities whose object
# "attachment" is not an empty list.
defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
  where(
    query,
    [activity],
    fragment("not (? #> '{\"object\",\"attachment\"}' = ?)", activity.data, ^[])
  )
end

defp restrict_media(query, _opts), do: query
597
# When "exclude_replies" is "true" or "1", keeps only activities whose object
# has no "inReplyTo".
defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
  where(query, [activity], fragment("?->'object'->>'inReplyTo' is null", activity.data))
end

defp restrict_replies(query, _opts), do: query
606
# When "exclude_reblogs" is "true" or "1", drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
612
# Hides activities authored by, or addressed ("to") to, anyone in the muting
# user's mutes. A truthy "with_muted" option (true, "true" or "1") disables
# the filter; without a "muting_user" the query is left unchanged.
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
  mutes = info.mutes

  query
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
  |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^mutes))
end

defp restrict_muted(query, _opts), do: query
626
# Hides activities authored by, or addressed ("to") to, anyone in the blocking
# user's blocks, as well as activities whose actor URL host is one of the
# user's blocked domains. Without a "blocking_user" the query is unchanged.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocks = info.blocks || []
  domain_blocks = info.domain_blocks || []

  query
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocks))
  |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^blocks))
  |> where(
    [activity],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
  )
end

defp restrict_blocked(query, _opts), do: query
640
# Drops activities that carry the public collection in "cc" (a missing "cc"
# is treated as empty).
defp restrict_unlisted(query) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]

  where(
    query,
    [activity],
    fragment("not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)", activity.data, ^public)
  )
end
652
# When "pinned" is "true", keeps only activities whose id is listed in
# "pinned_activity_ids".
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}),
  do: where(query, [activity], activity.id in ^ids)

defp restrict_pinned(query, _opts), do: query
658
# Builds (without running) the activity query for `recipients` and `opts`.
#
# Starts from the 20 activities with the highest ids, restricts by recipients
# (and `opts["user"]`), then applies every option-driven `restrict_*` filter
# in the listed order.
def fetch_activities_query(recipients, opts \\ %{}) do
  newest_first =
    from(
      activity in Activity,
      limit: 20,
      order_by: [fragment("? desc nulls last", activity.id)]
    )

  # Applied in this exact order; each filter takes (query, opts).
  option_filters = [
    &restrict_tag/2,
    &restrict_tag_reject/2,
    &restrict_tag_all/2,
    &restrict_since/2,
    &restrict_local/2,
    &restrict_limit/2,
    &restrict_max/2,
    &restrict_actor/2,
    &restrict_type/2,
    &restrict_favorited_by/2,
    &restrict_blocked/2,
    &restrict_muted/2,
    &restrict_media/2,
    &restrict_visibility/2,
    &restrict_replies/2,
    &restrict_reblogs/2,
    &restrict_pinned/2
  ]

  scoped = restrict_recipients(newest_first, recipients, opts["user"])

  Enum.reduce(option_filters, scoped, fn restrict, query -> restrict.(query, opts) end)
end
687
# Runs `fetch_activities_query/2` and returns the rows in reversed order.
def fetch_activities(recipients, opts \\ %{}) do
  rows =
    recipients
    |> fetch_activities_query(opts)
    |> Repo.all()

  Enum.reverse(rows)
end
693
# Like `fetch_activities/2`, but instead of a recipients overlap the
# activities must be addressed to one of `recipients_to` in "to" or one of
# `recipients_cc` in "cc". Rows are returned in reversed order.
def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
  rows =
    []
    |> fetch_activities_query(opts)
    |> restrict_to_cc(recipients_to, recipients_cc)
    |> Repo.all()

  Enum.reverse(rows)
end
700
# Stores `file` via `Upload.store/2` and inserts an `Object` holding the
# resulting data; `opts[:actor]`, when set, is recorded under "actor".
# A non-`{:ok, data}` result from the store is returned unchanged.
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      obj_data = if opts[:actor], do: Map.put(data, "actor", opts[:actor]), else: data
      Repo.insert(%Object{data: obj_data})

    other ->
      other
  end
end
713
# Converts a fetched actor document into the attribute map used to create or
# update a user. Always returns `{:ok, user_data}`.
#
# Avatar, banner and the "locked" flag are read from the document as received;
# the remaining fields come from the document after
# `Transmogrifier.maybe_fix_user_object/1`.
def user_data_from_user_object(data) do
  # Wraps an image URL in an Image map; a falsy URL is passed through as is.
  image_for = fn url ->
    url &&
      %{
        "type" => "Image",
        "url" => [%{"href" => url}]
      }
  end

  avatar = image_for.(data["icon"]["url"])
  banner = image_for.(data["image"]["url"])
  locked = data["manuallyApprovesFollowers"] || false

  fixed = Transmogrifier.maybe_fix_user_object(data)

  # nickname can be nil because of virtual actors
  nickname =
    if fixed["preferredUsername"] do
      "#{fixed["preferredUsername"]}@#{URI.parse(fixed["id"]).host}"
    else
      nil
    end

  user_data = %{
    ap_id: fixed["id"],
    info: %{
      "ap_enabled" => true,
      "source_data" => fixed,
      "banner" => banner,
      "locked" => locked
    },
    avatar: avatar,
    name: fixed["name"],
    follower_address: fixed["followers"],
    bio: fixed["summary"],
    nickname: nickname
  }

  {:ok, user_data}
end
760
# Fetches the remote actor document at `ap_id` and converts it with
# `user_data_from_user_object/1`.
#
# Returns `{:ok, user_data}` on success. On failure the problem is logged and
# an `{:error, reason}` tuple is returned, so callers receive the failure
# instead of the logger's `:ok` return value.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  case fetch_and_contain_remote_object_from_id(ap_id) do
    {:ok, data} ->
      user_data_from_user_object(data)

    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")

      # Pass an already tagged error through untouched; tag anything else.
      case e do
        {:error, _reason} -> e
        other -> {:error, other}
      end
  end
end
768
# Returns a user for `ap_id`: an already known user is upgraded through
# `Transmogrifier.upgrade_user_from_ap_id/1`; otherwise the remote actor is
# fetched and inserted/updated. A failed fetch is wrapped as `{:error, reason}`.
def make_user_from_ap_id(ap_id) do
  if User.get_by_ap_id(ap_id) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} -> User.insert_or_update_user(data)
      failure -> {:error, failure}
    end
  end
end
780
# Resolves `nickname` through WebFinger and builds the user from the returned
# "ap_id". Returns an error tuple when WebFinger yields no non-nil "ap_id".
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
788
# Decides whether an activity may be delivered to `inbox`. Public activities
# always may; non-public ones only when the inbox host is not listed in
# `[:instance, :quarantined_instances]`.
def should_federate?(inbox, public) do
  if public do
    true
  else
    host = URI.parse(inbox).host
    quarantined = Pleroma.Config.get([:instance, :quarantined_instances], [])
    host not in quarantined
  end
end
797
# Delivers `activity` to the inboxes of its remote recipients.
#
# Targets are the remote users reported by `Pleroma.Web.Salmon.remote_users/1`
# plus, when the actor's follower collection is among the recipients, the
# actor's non-local followers. Only AP-enabled users are considered; their
# shared inbox is preferred over the personal inbox. Inboxes are deduplicated,
# filtered by `should_federate?/2` and reachability, and each one is handed to
# `Federator.publish_single_ap/1`.
def publish(actor, activity) do
  remote_followers =
    case actor.follower_address in activity.recipients do
      true ->
        {:ok, followers} = User.get_followers(actor)
        Enum.reject(followers, & &1.local)

      false ->
        []
    end

  public = is_public?(activity)

  {:ok, outgoing} = Transmogrifier.prepare_outgoing(activity.data)
  json = Jason.encode!(outgoing)

  # Shared inbox when the actor document advertises one, personal inbox otherwise.
  inbox_for = fn %{info: %{source_data: source}} ->
    endpoints = source["endpoints"]
    (is_map(endpoints) && Map.get(endpoints, "sharedInbox")) || source["inbox"]
  end

  targets = Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers

  targets
  |> Enum.filter(&User.ap_enabled?/1)
  |> Enum.map(inbox_for)
  |> Enum.uniq()
  |> Enum.filter(&should_federate?(&1, public))
  |> Instances.filter_reachable()
  |> Enum.each(fn {inbox, unreachable_since} ->
    Federator.publish_single_ap(%{
      inbox: inbox,
      json: json,
      actor: actor,
      id: activity.data["id"],
      unreachable_since: unreachable_since
    })
  end)
end
830
# POSTs the already encoded activity `json` to a single `inbox`, signed on
# behalf of `actor` (host, content-length, digest and date are signed).
#
# On a 2xx response the inbox is marked reachable (unless `params` carries an
# `:unreachable_since` key with a falsy value) and the HTTP result is
# returned. On any other two-element result the inbox is marked unreachable
# (unless `:unreachable_since` is already set) and `{:error, response}` is
# returned.
def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
  Logger.info("Federating #{id} to #{inbox}")
  host = URI.parse(inbox).host

  digest = "SHA-256=" <> Base.encode64(:crypto.hash(:sha256, json))

  date =
    Timex.format!(
      NaiveDateTime.utc_now(),
      "{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT"
    )

  signature =
    Pleroma.Web.HTTPSignatures.sign(actor, %{
      host: host,
      "content-length": byte_size(json),
      digest: digest,
      date: date
    })

  headers = [
    {"Content-Type", "application/activity+json"},
    {"Date", date},
    {"signature", signature},
    {"digest", digest}
  ]

  result = @httpoison.post(inbox, json, headers)

  with {:ok, %{status: code}} when code in 200..299 <- result do
    mark_reachable? = !Map.has_key?(params, :unreachable_since) || params[:unreachable_since]
    if mark_reachable?, do: Instances.set_reachable(inbox)

    result
  else
    {_post_result, response} ->
      if !params[:unreachable_since], do: Instances.set_unreachable(inbox)
      {:error, response}
  end
end
871
# TODO:
# This will create a Create activity, which we need internally at the moment.
#
# Returns `{:ok, object}` for `id`, using the cached object when there is one.
# Otherwise the object is fetched over ActivityPub, wrapped in a Create
# activity and run through `Transmogrifier.handle_incoming/1`. A
# `{:error, {:reject, nil}}` result becomes `{:reject, nil}`; an object that
# `Object.normalize/1` already finds is returned directly; any other failure
# falls back to fetching through OStatus.
def fetch_object_from_id(id) do
  cached = Object.get_cached_by_ap_id(id)

  if cached do
    {:ok, cached}
  else
    with {:ok, data} <- fetch_and_contain_remote_object_from_id(id),
         nil <- Object.normalize(data),
         create_params <- %{
           "type" => "Create",
           "to" => data["to"],
           "cc" => data["cc"],
           "actor" => data["actor"] || data["attributedTo"],
           "object" => data
         },
         :ok <- Transmogrifier.contain_origin(id, create_params),
         {:ok, activity} <- Transmogrifier.handle_incoming(create_params) do
      {:ok, Object.normalize(activity.data["object"])}
    else
      {:error, {:reject, nil}} ->
        {:reject, nil}

      %Object{} = known ->
        {:ok, known}

      _failure ->
        Logger.info("Couldn't get object via AP, trying out OStatus fetching...")

        case OStatus.fetch_activity_from_url(id) do
          {:ok, [activity | _]} -> {:ok, Object.normalize(activity.data["object"])}
          other -> other
        end
    end
  end
end
907
# Fetches the JSON document at `id` (which must start with "http") with an
# `application/activity+json` Accept header, decodes it and checks it with
# `Transmogrifier.contain_origin_from_id/2`.
# Returns `{:ok, data}`, or `{:error, reason}` where `reason` is the value
# that failed to match.
def fetch_and_contain_remote_object_from_id(id) do
  Logger.info("Fetching object #{id} via AP")

  accept_headers = [{:Accept, "application/activity+json"}]

  with true <- String.starts_with?(id, "http"),
       {:ok, %{body: body, status: code}} when code in 200..299 <-
         @httpoison.get(id, accept_headers),
       {:ok, decoded} <- Jason.decode(body),
       :ok <- Transmogrifier.contain_origin_from_id(id, decoded) do
    {:ok, decoded}
  else
    failure -> {:error, failure}
  end
end
925
# Filter predicate for broken threads: returns the result of
# `entire_thread_visible_for_user?/2` for the activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
930
# Post-processing check for a single activity; currently this is only the
# broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
935
# Post-processing for a whole timeline: keeps only the activities for which
# `contain_activity/2` returns a truthy value for `user`.
def contain_timeline(timeline, user) do
  Enum.filter(timeline, &contain_activity(&1, user))
end
943 end