[#923] Merge remote-tracking branch 'remotes/upstream/develop' into twitter_oauth
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Instances
8 alias Pleroma.Notification
9 alias Pleroma.Object
10 alias Pleroma.Repo
11 alias Pleroma.Upload
12 alias Pleroma.User
13 alias Pleroma.Web.ActivityPub.MRF
14 alias Pleroma.Web.ActivityPub.Transmogrifier
15 alias Pleroma.Web.Federator
16 alias Pleroma.Web.OStatus
17 alias Pleroma.Web.WebFinger
18
19 import Ecto.Query
20 import Pleroma.Web.ActivityPub.Utils
21 import Pleroma.Web.ActivityPub.Visibility
22
23 require Logger
24
25 @httpoison Application.get_env(:pleroma, :httpoison)
26
27 # For Announce activities, we filter the recipients based on following status for any actors
28 # that match actual users. See issue #164 for more information about why this is necessary.
29 defp get_recipients(%{"type" => "Announce"} = data) do
30 to = data["to"] || []
31 cc = data["cc"] || []
32 actor = User.get_cached_by_ap_id(data["actor"])
33
34 recipients =
35 (to ++ cc)
36 |> Enum.filter(fn recipient ->
37 case User.get_cached_by_ap_id(recipient) do
38 nil ->
39 true
40
41 user ->
42 User.following?(user, actor)
43 end
44 end)
45
46 {recipients, to, cc}
47 end
48
49 defp get_recipients(%{"type" => "Create"} = data) do
50 to = data["to"] || []
51 cc = data["cc"] || []
52 actor = data["actor"] || []
53 recipients = (to ++ cc ++ [actor]) |> Enum.uniq()
54 {recipients, to, cc}
55 end
56
57 defp get_recipients(data) do
58 to = data["to"] || []
59 cc = data["cc"] || []
60 recipients = to ++ cc
61 {recipients, to, cc}
62 end
63
64 defp check_actor_is_active(actor) do
65 if not is_nil(actor) do
66 with user <- User.get_cached_by_ap_id(actor),
67 false <- user.info.deactivated do
68 :ok
69 else
70 _e -> :reject
71 end
72 else
73 :ok
74 end
75 end
76
77 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
78 limit = Pleroma.Config.get([:instance, :remote_limit])
79 String.length(content) <= limit
80 end
81
82 defp check_remote_limit(_), do: true
83
84 def increase_note_count_if_public(actor, object) do
85 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
86 end
87
88 def decrease_note_count_if_public(actor, object) do
89 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
90 end
91
92 def increase_replies_count_if_reply(%{
93 "object" =>
94 %{"inReplyTo" => reply_ap_id, "inReplyToStatusId" => reply_status_id} = object,
95 "type" => "Create"
96 }) do
97 if is_public?(object) do
98 Activity.increase_replies_count(reply_status_id)
99 Object.increase_replies_count(reply_ap_id)
100 end
101 end
102
103 def increase_replies_count_if_reply(_create_data), do: :noop
104
105 def decrease_replies_count_if_reply(%Object{
106 data: %{"inReplyTo" => reply_ap_id, "inReplyToStatusId" => reply_status_id} = object
107 }) do
108 if is_public?(object) do
109 Activity.decrease_replies_count(reply_status_id)
110 Object.decrease_replies_count(reply_ap_id)
111 end
112 end
113
114 def decrease_replies_count_if_reply(_object), do: :noop
115
116 def insert(map, local \\ true) when is_map(map) do
117 with nil <- Activity.normalize(map),
118 map <- lazy_put_activity_defaults(map),
119 :ok <- check_actor_is_active(map["actor"]),
120 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
121 {:ok, map} <- MRF.filter(map),
122 {:ok, object} <- insert_full_object(map) do
123 {recipients, _, _} = get_recipients(map)
124
125 {:ok, activity} =
126 Repo.insert(%Activity{
127 data: map,
128 local: local,
129 actor: map["actor"],
130 recipients: recipients
131 })
132
133 # Splice in the child object if we have one.
134 activity =
135 if !is_nil(object) do
136 Map.put(activity, :object, object)
137 else
138 activity
139 end
140
141 Task.start(fn ->
142 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
143 end)
144
145 Notification.create_notifications(activity)
146 stream_out(activity)
147 {:ok, activity}
148 else
149 %Activity{} = activity -> {:ok, activity}
150 error -> {:error, error}
151 end
152 end
153
154 def stream_out(activity) do
155 public = "https://www.w3.org/ns/activitystreams#Public"
156
157 if activity.data["type"] in ["Create", "Announce", "Delete"] do
158 Pleroma.Web.Streamer.stream("user", activity)
159 Pleroma.Web.Streamer.stream("list", activity)
160
161 if Enum.member?(activity.data["to"], public) do
162 Pleroma.Web.Streamer.stream("public", activity)
163
164 if activity.local do
165 Pleroma.Web.Streamer.stream("public:local", activity)
166 end
167
168 if activity.data["type"] in ["Create"] do
169 activity.data["object"]
170 |> Map.get("tag", [])
171 |> Enum.filter(fn tag -> is_bitstring(tag) end)
172 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
173
174 if activity.data["object"]["attachment"] != [] do
175 Pleroma.Web.Streamer.stream("public:media", activity)
176
177 if activity.local do
178 Pleroma.Web.Streamer.stream("public:local:media", activity)
179 end
180 end
181 end
182 else
183 if !Enum.member?(activity.data["cc"] || [], public) &&
184 !Enum.member?(
185 activity.data["to"],
186 User.get_by_ap_id(activity.data["actor"]).follower_address
187 ),
188 do: Pleroma.Web.Streamer.stream("direct", activity)
189 end
190 end
191 end
192
193 def create(%{to: to, actor: actor, context: context, object: object} = params) do
194 additional = params[:additional] || %{}
195 # only accept false as false value
196 local = !(params[:local] == false)
197 published = params[:published]
198
199 with create_data <-
200 make_create_data(
201 %{to: to, actor: actor, published: published, context: context, object: object},
202 additional
203 ),
204 {:ok, activity} <- insert(create_data, local),
205 _ <- increase_replies_count_if_reply(create_data),
206 # Changing note count prior to enqueuing federation task in order to avoid
207 # race conditions on updating user.info
208 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
209 :ok <- maybe_federate(activity) do
210 {:ok, activity}
211 end
212 end
213
214 def accept(%{to: to, actor: actor, object: object} = params) do
215 # only accept false as false value
216 local = !(params[:local] == false)
217
218 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
219 {:ok, activity} <- insert(data, local),
220 :ok <- maybe_federate(activity) do
221 {:ok, activity}
222 end
223 end
224
225 def reject(%{to: to, actor: actor, object: object} = params) do
226 # only accept false as false value
227 local = !(params[:local] == false)
228
229 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
230 {:ok, activity} <- insert(data, local),
231 :ok <- maybe_federate(activity) do
232 {:ok, activity}
233 end
234 end
235
236 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
237 # only accept false as false value
238 local = !(params[:local] == false)
239
240 with data <- %{
241 "to" => to,
242 "cc" => cc,
243 "type" => "Update",
244 "actor" => actor,
245 "object" => object
246 },
247 {:ok, activity} <- insert(data, local),
248 :ok <- maybe_federate(activity) do
249 {:ok, activity}
250 end
251 end
252
253 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
254 def like(
255 %User{ap_id: ap_id} = user,
256 %Object{data: %{"id" => _}} = object,
257 activity_id \\ nil,
258 local \\ true
259 ) do
260 with nil <- get_existing_like(ap_id, object),
261 like_data <- make_like_data(user, object, activity_id),
262 {:ok, activity} <- insert(like_data, local),
263 {:ok, object} <- add_like_to_object(activity, object),
264 :ok <- maybe_federate(activity) do
265 {:ok, activity, object}
266 else
267 %Activity{} = activity -> {:ok, activity, object}
268 error -> {:error, error}
269 end
270 end
271
272 def unlike(
273 %User{} = actor,
274 %Object{} = object,
275 activity_id \\ nil,
276 local \\ true
277 ) do
278 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
279 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
280 {:ok, unlike_activity} <- insert(unlike_data, local),
281 {:ok, _activity} <- Repo.delete(like_activity),
282 {:ok, object} <- remove_like_from_object(like_activity, object),
283 :ok <- maybe_federate(unlike_activity) do
284 {:ok, unlike_activity, like_activity, object}
285 else
286 _e -> {:ok, object}
287 end
288 end
289
290 def announce(
291 %User{ap_id: _} = user,
292 %Object{data: %{"id" => _}} = object,
293 activity_id \\ nil,
294 local \\ true,
295 public \\ true
296 ) do
297 with true <- is_public?(object),
298 announce_data <- make_announce_data(user, object, activity_id, public),
299 {:ok, activity} <- insert(announce_data, local),
300 {:ok, object} <- add_announce_to_object(activity, object),
301 :ok <- maybe_federate(activity) do
302 {:ok, activity, object}
303 else
304 error -> {:error, error}
305 end
306 end
307
308 def unannounce(
309 %User{} = actor,
310 %Object{} = object,
311 activity_id \\ nil,
312 local \\ true
313 ) do
314 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
315 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
316 {:ok, unannounce_activity} <- insert(unannounce_data, local),
317 :ok <- maybe_federate(unannounce_activity),
318 {:ok, _activity} <- Repo.delete(announce_activity),
319 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
320 {:ok, unannounce_activity, object}
321 else
322 _e -> {:ok, object}
323 end
324 end
325
326 def follow(follower, followed, activity_id \\ nil, local \\ true) do
327 with data <- make_follow_data(follower, followed, activity_id),
328 {:ok, activity} <- insert(data, local),
329 :ok <- maybe_federate(activity) do
330 {:ok, activity}
331 end
332 end
333
334 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
335 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
336 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
337 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
338 {:ok, activity} <- insert(unfollow_data, local),
339 :ok <- maybe_federate(activity) do
340 {:ok, activity}
341 end
342 end
343
344 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
345 user = User.get_cached_by_ap_id(actor)
346 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
347
348 with {:ok, object, activity} <- Object.delete(object),
349 data <- %{
350 "type" => "Delete",
351 "actor" => actor,
352 "object" => id,
353 "to" => to,
354 "deleted_activity_id" => activity && activity.id
355 },
356 {:ok, activity} <- insert(data, local),
357 _ <- decrease_replies_count_if_reply(object),
358 # Changing note count prior to enqueuing federation task in order to avoid
359 # race conditions on updating user.info
360 {:ok, _actor} <- decrease_note_count_if_public(user, object),
361 :ok <- maybe_federate(activity) do
362 {:ok, activity}
363 end
364 end
365
366 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
367 ap_config = Application.get_env(:pleroma, :activitypub)
368 unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
369 outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)
370
371 with true <- unfollow_blocked do
372 follow_activity = fetch_latest_follow(blocker, blocked)
373
374 if follow_activity do
375 unfollow(blocker, blocked, nil, local)
376 end
377 end
378
379 with true <- outgoing_blocks,
380 block_data <- make_block_data(blocker, blocked, activity_id),
381 {:ok, activity} <- insert(block_data, local),
382 :ok <- maybe_federate(activity) do
383 {:ok, activity}
384 else
385 _e -> {:ok, nil}
386 end
387 end
388
389 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
390 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
391 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
392 {:ok, activity} <- insert(unblock_data, local),
393 :ok <- maybe_federate(activity) do
394 {:ok, activity}
395 end
396 end
397
398 def flag(
399 %{
400 actor: actor,
401 context: context,
402 account: account,
403 statuses: statuses,
404 content: content
405 } = params
406 ) do
407 # only accept false as false value
408 local = !(params[:local] == false)
409 forward = !(params[:forward] == false)
410
411 additional = params[:additional] || %{}
412
413 params = %{
414 actor: actor,
415 context: context,
416 account: account,
417 statuses: statuses,
418 content: content
419 }
420
421 additional =
422 if forward do
423 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
424 else
425 Map.merge(additional, %{"to" => [], "cc" => []})
426 end
427
428 with flag_data <- make_flag_data(params, additional),
429 {:ok, activity} <- insert(flag_data, local),
430 :ok <- maybe_federate(activity) do
431 Enum.each(User.all_superusers(), fn superuser ->
432 superuser
433 |> Pleroma.AdminEmail.report(actor, account, statuses, content)
434 |> Pleroma.Mailer.deliver_async()
435 end)
436
437 {:ok, activity}
438 end
439 end
440
441 def fetch_activities_for_context(context, opts \\ %{}) do
442 public = ["https://www.w3.org/ns/activitystreams#Public"]
443
444 recipients =
445 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
446
447 query = from(activity in Activity)
448
449 query =
450 query
451 |> restrict_blocked(opts)
452 |> restrict_recipients(recipients, opts["user"])
453
454 query =
455 from(
456 activity in query,
457 where:
458 fragment(
459 "?->>'type' = ? and ?->>'context' = ?",
460 activity.data,
461 "Create",
462 activity.data,
463 ^context
464 ),
465 order_by: [desc: :id]
466 )
467 |> Activity.with_preloaded_object()
468
469 Repo.all(query)
470 end
471
472 def fetch_public_activities(opts \\ %{}) do
473 q = fetch_activities_query(["https://www.w3.org/ns/activitystreams#Public"], opts)
474
475 q
476 |> restrict_unlisted()
477 |> Repo.all()
478 |> Enum.reverse()
479 end
480
481 @valid_visibilities ~w[direct unlisted public private]
482
483 defp restrict_visibility(query, %{visibility: visibility})
484 when is_list(visibility) do
485 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
486 query =
487 from(
488 a in query,
489 where:
490 fragment(
491 "activity_visibility(?, ?, ?) = ANY (?)",
492 a.actor,
493 a.recipients,
494 a.data,
495 ^visibility
496 )
497 )
498
499 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
500
501 query
502 else
503 Logger.error("Could not restrict visibility to #{visibility}")
504 end
505 end
506
507 defp restrict_visibility(query, %{visibility: visibility})
508 when visibility in @valid_visibilities do
509 query =
510 from(
511 a in query,
512 where:
513 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
514 )
515
516 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
517
518 query
519 end
520
521 defp restrict_visibility(_query, %{visibility: visibility})
522 when visibility not in @valid_visibilities do
523 Logger.error("Could not restrict visibility to #{visibility}")
524 end
525
526 defp restrict_visibility(query, _visibility), do: query
527
528 def fetch_user_activities(user, reading_user, params \\ %{}) do
529 params =
530 params
531 |> Map.put("type", ["Create", "Announce"])
532 |> Map.put("actor_id", user.ap_id)
533 |> Map.put("whole_db", true)
534 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
535
536 recipients =
537 if reading_user do
538 ["https://www.w3.org/ns/activitystreams#Public"] ++
539 [reading_user.ap_id | reading_user.following]
540 else
541 ["https://www.w3.org/ns/activitystreams#Public"]
542 end
543
544 fetch_activities(recipients, params)
545 |> Enum.reverse()
546 end
547
548 defp restrict_since(query, %{"since_id" => ""}), do: query
549
550 defp restrict_since(query, %{"since_id" => since_id}) do
551 from(activity in query, where: activity.id > ^since_id)
552 end
553
554 defp restrict_since(query, _), do: query
555
556 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
557 when is_list(tag_reject) and tag_reject != [] do
558 from(
559 activity in query,
560 where: fragment(~s(\(not \(? #> '{"object","tag"}'\) \\?| ?\)), activity.data, ^tag_reject)
561 )
562 end
563
564 defp restrict_tag_reject(query, _), do: query
565
566 defp restrict_tag_all(query, %{"tag_all" => tag_all})
567 when is_list(tag_all) and tag_all != [] do
568 from(
569 activity in query,
570 where: fragment(~s(\(? #> '{"object","tag"}'\) \\?& ?), activity.data, ^tag_all)
571 )
572 end
573
574 defp restrict_tag_all(query, _), do: query
575
576 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
577 from(
578 activity in query,
579 where: fragment(~s(\(? #> '{"object","tag"}'\) \\?| ?), activity.data, ^tag)
580 )
581 end
582
583 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
584 from(
585 activity in query,
586 where: fragment(~s(? <@ (? #> '{"object","tag"}'\)), ^tag, activity.data)
587 )
588 end
589
590 defp restrict_tag(query, _), do: query
591
592 defp restrict_to_cc(query, recipients_to, recipients_cc) do
593 from(
594 activity in query,
595 where:
596 fragment(
597 "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
598 activity.data,
599 ^recipients_to,
600 activity.data,
601 ^recipients_cc
602 )
603 )
604 end
605
606 defp restrict_recipients(query, [], _user), do: query
607
608 defp restrict_recipients(query, recipients, nil) do
609 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
610 end
611
612 defp restrict_recipients(query, recipients, user) do
613 from(
614 activity in query,
615 where: fragment("? && ?", ^recipients, activity.recipients),
616 or_where: activity.actor == ^user.ap_id
617 )
618 end
619
620 defp restrict_limit(query, %{"limit" => limit}) do
621 from(activity in query, limit: ^limit)
622 end
623
624 defp restrict_limit(query, _), do: query
625
626 defp restrict_local(query, %{"local_only" => true}) do
627 from(activity in query, where: activity.local == true)
628 end
629
630 defp restrict_local(query, _), do: query
631
632 defp restrict_max(query, %{"max_id" => ""}), do: query
633
634 defp restrict_max(query, %{"max_id" => max_id}) do
635 from(activity in query, where: activity.id < ^max_id)
636 end
637
638 defp restrict_max(query, _), do: query
639
640 defp restrict_actor(query, %{"actor_id" => actor_id}) do
641 from(activity in query, where: activity.actor == ^actor_id)
642 end
643
644 defp restrict_actor(query, _), do: query
645
646 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
647 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
648 end
649
650 defp restrict_type(query, %{"type" => type}) do
651 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
652 end
653
654 defp restrict_type(query, _), do: query
655
656 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
657 from(
658 activity in query,
659 where: fragment(~s(? <@ (? #> '{"object","likes"}'\)), ^ap_id, activity.data)
660 )
661 end
662
663 defp restrict_favorited_by(query, _), do: query
664
665 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
666 from(
667 activity in query,
668 where: fragment(~s(not (? #> '{"object","attachment"}' = ?\)), activity.data, ^[])
669 )
670 end
671
672 defp restrict_media(query, _), do: query
673
674 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
675 from(
676 activity in query,
677 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
678 )
679 end
680
681 defp restrict_replies(query, _), do: query
682
683 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
684 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
685 end
686
687 defp restrict_reblogs(query, _), do: query
688
689 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
690
691 defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
692 mutes = info.mutes
693
694 from(
695 activity in query,
696 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
697 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
698 )
699 end
700
701 defp restrict_muted(query, _), do: query
702
703 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
704 blocks = info.blocks || []
705 domain_blocks = info.domain_blocks || []
706
707 from(
708 activity in query,
709 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
710 where: fragment("not (?->'to' \\?| ?)", activity.data, ^blocks),
711 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
712 )
713 end
714
715 defp restrict_blocked(query, _), do: query
716
717 defp restrict_unlisted(query) do
718 from(
719 activity in query,
720 where:
721 fragment(
722 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
723 activity.data,
724 ^["https://www.w3.org/ns/activitystreams#Public"]
725 )
726 )
727 end
728
729 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
730 from(activity in query, where: activity.id in ^ids)
731 end
732
733 defp restrict_pinned(query, _), do: query
734
735 defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
736 muted_reblogs = info.muted_reblogs || []
737
738 from(
739 activity in query,
740 where:
741 fragment(
742 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
743 activity.data,
744 activity.actor,
745 ^muted_reblogs
746 )
747 )
748 end
749
750 defp restrict_muted_reblogs(query, _), do: query
751
752 defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query
753
754 defp maybe_preload_objects(query, _) do
755 query
756 |> Activity.with_preloaded_object()
757 end
758
759 def fetch_activities_query(recipients, opts \\ %{}) do
760 base_query =
761 from(
762 activity in Activity,
763 limit: 20,
764 order_by: [fragment("? desc nulls last", activity.id)]
765 )
766
767 base_query
768 |> maybe_preload_objects(opts)
769 |> restrict_recipients(recipients, opts["user"])
770 |> restrict_tag(opts)
771 |> restrict_tag_reject(opts)
772 |> restrict_tag_all(opts)
773 |> restrict_since(opts)
774 |> restrict_local(opts)
775 |> restrict_limit(opts)
776 |> restrict_max(opts)
777 |> restrict_actor(opts)
778 |> restrict_type(opts)
779 |> restrict_favorited_by(opts)
780 |> restrict_blocked(opts)
781 |> restrict_muted(opts)
782 |> restrict_media(opts)
783 |> restrict_visibility(opts)
784 |> restrict_replies(opts)
785 |> restrict_reblogs(opts)
786 |> restrict_pinned(opts)
787 |> restrict_muted_reblogs(opts)
788 end
789
790 def fetch_activities(recipients, opts \\ %{}) do
791 fetch_activities_query(recipients, opts)
792 |> Repo.all()
793 |> Enum.reverse()
794 end
795
796 def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
797 fetch_activities_query([], opts)
798 |> restrict_to_cc(recipients_to, recipients_cc)
799 |> Repo.all()
800 |> Enum.reverse()
801 end
802
803 def upload(file, opts \\ []) do
804 with {:ok, data} <- Upload.store(file, opts) do
805 obj_data =
806 if opts[:actor] do
807 Map.put(data, "actor", opts[:actor])
808 else
809 data
810 end
811
812 Repo.insert(%Object{data: obj_data})
813 end
814 end
815
816 def user_data_from_user_object(data) do
817 avatar =
818 data["icon"]["url"] &&
819 %{
820 "type" => "Image",
821 "url" => [%{"href" => data["icon"]["url"]}]
822 }
823
824 banner =
825 data["image"]["url"] &&
826 %{
827 "type" => "Image",
828 "url" => [%{"href" => data["image"]["url"]}]
829 }
830
831 locked = data["manuallyApprovesFollowers"] || false
832 data = Transmogrifier.maybe_fix_user_object(data)
833
834 user_data = %{
835 ap_id: data["id"],
836 info: %{
837 "ap_enabled" => true,
838 "source_data" => data,
839 "banner" => banner,
840 "locked" => locked
841 },
842 avatar: avatar,
843 name: data["name"],
844 follower_address: data["followers"],
845 bio: data["summary"]
846 }
847
848 # nickname can be nil because of virtual actors
849 user_data =
850 if data["preferredUsername"] do
851 Map.put(
852 user_data,
853 :nickname,
854 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
855 )
856 else
857 Map.put(user_data, :nickname, nil)
858 end
859
860 {:ok, user_data}
861 end
862
863 def fetch_and_prepare_user_from_ap_id(ap_id) do
864 with {:ok, data} <- fetch_and_contain_remote_object_from_id(ap_id) do
865 user_data_from_user_object(data)
866 else
867 e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
868 end
869 end
870
871 def make_user_from_ap_id(ap_id) do
872 if _user = User.get_by_ap_id(ap_id) do
873 Transmogrifier.upgrade_user_from_ap_id(ap_id)
874 else
875 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
876 User.insert_or_update_user(data)
877 else
878 e -> {:error, e}
879 end
880 end
881 end
882
883 def make_user_from_nickname(nickname) do
884 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
885 make_user_from_ap_id(ap_id)
886 else
887 _e -> {:error, "No AP id in WebFinger"}
888 end
889 end
890
891 def should_federate?(inbox, public) do
892 if public do
893 true
894 else
895 inbox_info = URI.parse(inbox)
896 !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
897 end
898 end
899
900 def publish(actor, activity) do
901 remote_followers =
902 if actor.follower_address in activity.recipients do
903 {:ok, followers} = User.get_followers(actor)
904 followers |> Enum.filter(&(!&1.local))
905 else
906 []
907 end
908
909 public = is_public?(activity)
910
911 {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
912 json = Jason.encode!(data)
913
914 (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
915 |> Enum.filter(fn user -> User.ap_enabled?(user) end)
916 |> Enum.map(fn %{info: %{source_data: data}} ->
917 (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
918 end)
919 |> Enum.uniq()
920 |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
921 |> Instances.filter_reachable()
922 |> Enum.each(fn {inbox, unreachable_since} ->
923 Federator.publish_single_ap(%{
924 inbox: inbox,
925 json: json,
926 actor: actor,
927 id: activity.data["id"],
928 unreachable_since: unreachable_since
929 })
930 end)
931 end
932
933 def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
934 Logger.info("Federating #{id} to #{inbox}")
935 host = URI.parse(inbox).host
936
937 digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
938
939 date =
940 NaiveDateTime.utc_now()
941 |> Timex.format!("{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT")
942
943 signature =
944 Pleroma.Web.HTTPSignatures.sign(actor, %{
945 host: host,
946 "content-length": byte_size(json),
947 digest: digest,
948 date: date
949 })
950
951 with {:ok, %{status: code}} when code in 200..299 <-
952 result =
953 @httpoison.post(
954 inbox,
955 json,
956 [
957 {"Content-Type", "application/activity+json"},
958 {"Date", date},
959 {"signature", signature},
960 {"digest", digest}
961 ]
962 ) do
963 if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
964 do: Instances.set_reachable(inbox)
965
966 result
967 else
968 {_post_result, response} ->
969 unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
970 {:error, response}
971 end
972 end
973
974 # TODO:
975 # This will create a Create activity, which we need internally at the moment.
976 def fetch_object_from_id(id) do
977 if object = Object.get_cached_by_ap_id(id) do
978 {:ok, object}
979 else
980 with {:ok, data} <- fetch_and_contain_remote_object_from_id(id),
981 nil <- Object.normalize(data),
982 params <- %{
983 "type" => "Create",
984 "to" => data["to"],
985 "cc" => data["cc"],
986 "actor" => data["actor"] || data["attributedTo"],
987 "object" => data
988 },
989 :ok <- Transmogrifier.contain_origin(id, params),
990 {:ok, activity} <- Transmogrifier.handle_incoming(params) do
991 {:ok, Object.normalize(activity)}
992 else
993 {:error, {:reject, nil}} ->
994 {:reject, nil}
995
996 object = %Object{} ->
997 {:ok, object}
998
999 _e ->
1000 Logger.info("Couldn't get object via AP, trying out OStatus fetching...")
1001
1002 case OStatus.fetch_activity_from_url(id) do
1003 {:ok, [activity | _]} -> {:ok, Object.normalize(activity)}
1004 e -> e
1005 end
1006 end
1007 end
1008 end
1009
1010 def fetch_and_contain_remote_object_from_id(id) do
1011 Logger.info("Fetching object #{id} via AP")
1012
1013 with true <- String.starts_with?(id, "http"),
1014 {:ok, %{body: body, status: code}} when code in 200..299 <-
1015 @httpoison.get(
1016 id,
1017 [{:Accept, "application/activity+json"}]
1018 ),
1019 {:ok, data} <- Jason.decode(body),
1020 :ok <- Transmogrifier.contain_origin_from_id(id, data) do
1021 {:ok, data}
1022 else
1023 e ->
1024 {:error, e}
1025 end
1026 end
1027
1028 # filter out broken threads
1029 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
1030 entire_thread_visible_for_user?(activity, user)
1031 end
1032
1033 # do post-processing on a specific activity
1034 def contain_activity(%Activity{} = activity, %User{} = user) do
1035 contain_broken_threads(activity, user)
1036 end
1037
1038 # do post-processing on a timeline
1039 def contain_timeline(timeline, user) do
1040 timeline
1041 |> Enum.filter(fn activity ->
1042 contain_activity(activity, user)
1043 end)
1044 end
1045 end