Conversations: Create or bump on inserting a dm.
[akkoma] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6 alias Pleroma.Activity
7 alias Pleroma.Conversation
8 alias Pleroma.Instances
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Repo
12 alias Pleroma.Upload
13 alias Pleroma.User
14 alias Pleroma.Web.ActivityPub.MRF
15 alias Pleroma.Web.ActivityPub.Transmogrifier
16 alias Pleroma.Web.Federator
17 alias Pleroma.Web.OStatus
18 alias Pleroma.Web.WebFinger
19
20 import Ecto.Query
21 import Pleroma.Web.ActivityPub.Utils
22 import Pleroma.Web.ActivityPub.Visibility
23
24 require Logger
25
26 @httpoison Application.get_env(:pleroma, :httpoison)
27
# Computes `{recipients, to, cc}` for an activity map. `to` and `cc` are the addressing
# lists from the data (defaulting to []); `recipients` is derived from them per activity type.

# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []
  actor = User.get_cached_by_ap_id(data["actor"])

  recipients =
    Enum.filter(to ++ cc, fn recipient ->
      case User.get_cached_by_ap_id(recipient) do
        # Addresses that do not resolve to a cached user are kept untouched.
        nil -> true
        user -> User.following?(user, actor)
      end
    end)

  {recipients, to, cc}
end

# Create activities additionally list their own actor as a recipient.
defp get_recipients(%{"type" => "Create"} = data) do
  to = data["to"] || []
  cc = data["cc"] || []
  # List.wrap/1 turns a missing actor into [] so that no bogus `[]` element is appended
  # to the recipients (the previous `[data["actor"] || []]` produced `[[]]`).
  actor = List.wrap(data["actor"])
  recipients = Enum.uniq(to ++ cc ++ actor)
  {recipients, to, cc}
end

# Every other activity type is addressed to exactly `to ++ cc`.
defp get_recipients(data) do
  to = data["to"] || []
  cc = data["cc"] || []
  {to ++ cc, to, cc}
end
64
# Returns :ok when `actor` is nil or resolves to a cached user whose `info.deactivated`
# is `false`; returns :reject in every other case.
defp check_actor_is_active(nil), do: :ok

defp check_actor_is_active(actor) do
  # Matching on %User{} makes an unknown (uncached) actor fall through to :reject
  # instead of raising when `.info` is called on nil.
  with %User{} = user <- User.get_cached_by_ap_id(actor),
       false <- user.info.deactivated do
    :ok
  else
    _e -> :reject
  end
end
77
# Compares the length of the embedded object's content with the configured
# [:instance, :remote_limit]. Data without embedded, non-nil content always passes.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Pleroma.Config.get([:instance, :remote_limit])

  String.length(content) <= max_length
end

defp check_remote_limit(_activity_data), do: true
84
# Increases the actor's note count when `object` is public; otherwise hands the
# actor back unchanged as `{:ok, actor}`.
def increase_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.increase_note_count(actor)
    true -> {:ok, actor}
  end
end
88
# Decreases the actor's note count when `object` is public; otherwise hands the
# actor back unchanged as `{:ok, actor}`.
def decrease_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.decrease_note_count(actor)
    true -> {:ok, actor}
  end
end
92
# For a Create whose object carries both "inReplyTo" and "inReplyToStatusId", bumps the
# replies count of the replied-to activity and object, but only if the object is public.
# Any other input is a no-op.
def increase_replies_count_if_reply(%{
      "type" => "Create",
      "object" => %{"inReplyTo" => _, "inReplyToStatusId" => _} = object
    }) do
  %{"inReplyTo" => parent_ap_id, "inReplyToStatusId" => parent_status_id} = object

  if is_public?(object) do
    Activity.increase_replies_count(parent_status_id)
    Object.increase_replies_count(parent_ap_id)
  end
end

def increase_replies_count_if_reply(_create_data), do: :noop
105
# For an object whose data carries both "inReplyTo" and "inReplyToStatusId", lowers the
# replies count of the replied-to activity and object, but only if the data is public.
# Any other input is a no-op.
def decrease_replies_count_if_reply(%Object{
      data: %{"inReplyTo" => _, "inReplyToStatusId" => _} = object_data
    }) do
  %{"inReplyTo" => parent_ap_id, "inReplyToStatusId" => parent_status_id} = object_data

  if is_public?(object_data) do
    Activity.decrease_replies_count(parent_status_id)
    Object.decrease_replies_count(parent_ap_id)
  end
end

def decrease_replies_count_if_reply(_object), do: :noop
116
# Inserts an activity built from `map`.
#
# Returns `{:ok, activity}` when the activity was stored, when `Activity.normalize/1`
# already yields an activity for `map`, or (with `fake` set) for an unsaved activity with
# the id "pleroma:fakeid". Any step that does not match yields `{:error, reason}`.
#
# After a real insert the activity is handed to rich media fetching (in a separate task),
# notification creation, conversation create-or-bump and the streamer.
def insert(map, local \\ true, fake \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       data = lazy_put_activity_defaults(map, fake),
       :ok <- check_actor_is_active(data["actor"]),
       {_, true} <- {:remote_limit_error, check_remote_limit(data)},
       {:ok, data} <- MRF.filter(data),
       {recipients, _to, _cc} = get_recipients(data),
       {:fake, false, data, recipients} <- {:fake, fake, data, recipients},
       {:ok, object} <- insert_full_object(data) do
    new_activity = %Activity{
      data: data,
      local: local,
      actor: data["actor"],
      recipients: recipients
    }

    {:ok, activity} = Repo.insert(new_activity)

    # Splice in the child object if we have one.
    activity =
      case object do
        nil -> activity
        _ -> Map.put(activity, :object, object)
      end

    Task.start(fn ->
      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
    end)

    Notification.create_notifications(activity)
    Conversation.create_or_bump_for(activity)
    stream_out(activity)
    {:ok, activity}
  else
    # The activity is already known: return it instead of inserting again.
    %Activity{} = existing ->
      {:ok, existing}

    # Fake mode: build the struct but never touch the database.
    {:fake, true, data, recipients} ->
      fake_activity = %Activity{
        data: data,
        local: local,
        actor: data["actor"],
        recipients: recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
      {:ok, fake_activity}

    error ->
      {:error, error}
  end
end
170
# Pushes a Create, Announce or Delete activity to the streamer topics it belongs to.
#
# Every such activity goes to "user" and "list". Publicly addressed ones also go to
# "public" (plus "public:local" when local); public Creates additionally go to their
# "hashtag:<tag>" topics and, when the attachment list is not `[]`, to the media topics.
# Non-public activities go to "direct" unless they are cc'd to public or addressed to
# the actor's follower collection. Other activity types are ignored.
def stream_out(activity) do
  public = "https://www.w3.org/ns/activitystreams#Public"
  # "to" can be missing from the data just like "cc"; default both to [] so the
  # membership checks below cannot raise on nil.
  to = activity.data["to"] || []
  cc = activity.data["cc"] || []

  if activity.data["type"] in ["Create", "Announce", "Delete"] do
    Pleroma.Web.Streamer.stream("user", activity)
    Pleroma.Web.Streamer.stream("list", activity)

    if Enum.member?(to, public) do
      Pleroma.Web.Streamer.stream("public", activity)

      if activity.local do
        Pleroma.Web.Streamer.stream("public:local", activity)
      end

      if activity.data["type"] in ["Create"] do
        activity.data["object"]
        |> Map.get("tag", [])
        |> Enum.filter(fn tag -> is_bitstring(tag) end)
        |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)

        if activity.data["object"]["attachment"] != [] do
          Pleroma.Web.Streamer.stream("public:media", activity)

          if activity.local do
            Pleroma.Web.Streamer.stream("public:local:media", activity)
          end
        end
      end
    else
      if !Enum.member?(cc, public) &&
           !Enum.member?(to, User.get_by_ap_id(activity.data["actor"]).follower_address),
         do: Pleroma.Web.Streamer.stream("direct", activity)
    end
  end
end
209
# Builds a Create activity from `params` (`:to`, `:actor`, `:context`, `:object`, optional
# `:additional`, `:local`, `:published`), inserts it, updates the reply and note counters
# and federates it.
#
# Returns `{:ok, activity}` on success. With `fake` set, the activity is returned right
# after `insert/3` and no counters are touched or federation attempted. Any failing step
# is returned to the caller as-is.
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = !(params[:local] == false)
  published = params[:published]

  with create_data <-
         make_create_data(
           %{to: to, actor: actor, published: published, context: context, object: object},
           additional
         ),
       {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, activity} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, activity} ->
      {:ok, activity}

    # Without this clause any other mismatch (e.g. `{:error, reason}` from insert/3)
    # raised WithClauseError; pass it through like a `with` without `else` would.
    other ->
      other
  end
end
234
# Inserts and federates an Accept activity from `actor` for `object`, addressed to `to`.
def accept(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  accept_data = %{
    "to" => to,
    "type" => "Accept",
    "actor" => actor.ap_id,
    "object" => object
  }

  with {:ok, activity} <- insert(accept_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
245
# Inserts and federates a Reject activity from `actor` for `object`, addressed to `to`.
def reject(%{to: to, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  reject_data = %{
    "to" => to,
    "type" => "Reject",
    "actor" => actor.ap_id,
    "object" => object
  }

  with {:ok, activity} <- insert(reject_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
256
# Inserts and federates an Update activity. Unlike accept/1 and reject/1, `actor` is
# stored exactly as given rather than as `actor.ap_id`.
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
  # only accept false as false value
  local = params[:local] != false

  update_data = %{
    "to" => to,
    "cc" => cc,
    "type" => "Update",
    "actor" => actor,
    "object" => object
  }

  with {:ok, activity} <- insert(update_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
273
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
#
# Likes `object` on behalf of `user`. Returns `{:ok, activity, object}`; when a like
# already exists, the existing activity is returned together with the unchanged object.
# Any other mismatch is wrapped as `{:error, reason}`.
def like(
      %User{ap_id: ap_id} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with nil <- get_existing_like(ap_id, object),
       like_data = make_like_data(user, object, activity_id),
       {:ok, like_activity} <- insert(like_data, local),
       {:ok, liked_object} <- add_like_to_object(like_activity, object),
       :ok <- maybe_federate(like_activity) do
    {:ok, like_activity, liked_object}
  else
    %Activity{} = existing_like ->
      {:ok, existing_like, object}

    error ->
      {:error, error}
  end
end
292
# Removes the actor's like of `object`: inserts the unlike activity, deletes the like
# activity, updates the object and federates. Returns
# `{:ok, unlike_activity, like_activity, object}` on success; if any step does not
# match (including "no existing like"), returns `{:ok, object}` with the original object.
def unlike(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
       unlike_data = make_unlike_data(actor, like_activity, activity_id),
       {:ok, unlike_activity} <- insert(unlike_data, local),
       {:ok, _deleted} <- Repo.delete(like_activity),
       {:ok, updated_object} <- remove_like_from_object(like_activity, object),
       :ok <- maybe_federate(unlike_activity) do
    {:ok, unlike_activity, like_activity, updated_object}
  else
    _e -> {:ok, object}
  end
end
310
# Announces `object` on behalf of `user`. Only public objects can be announced; every
# mismatch (including a non-public object) is wrapped as `{:error, reason}`.
def announce(
      %User{ap_id: _} = user,
      %Object{data: %{"id" => _}} = object,
      activity_id \\ nil,
      local \\ true,
      public \\ true
    ) do
  with true <- is_public?(object),
       announce_data = make_announce_data(user, object, activity_id, public),
       {:ok, announce_activity} <- insert(announce_data, local),
       {:ok, announced_object} <- add_announce_to_object(announce_activity, object),
       :ok <- maybe_federate(announce_activity) do
    {:ok, announce_activity, announced_object}
  else
    error -> {:error, error}
  end
end
328
# Undoes the actor's announce of `object`. The undo activity is inserted and federated
# first; only then is the announce activity deleted and the object updated. Returns
# `{:ok, unannounce_activity, object}` on success and `{:ok, object}` (original object)
# whenever a step does not match.
def unannounce(
      %User{} = actor,
      %Object{} = object,
      activity_id \\ nil,
      local \\ true
    ) do
  with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
       unannounce_data = make_unannounce_data(actor, announce_activity, activity_id),
       {:ok, unannounce_activity} <- insert(unannounce_data, local),
       :ok <- maybe_federate(unannounce_activity),
       {:ok, _deleted} <- Repo.delete(announce_activity),
       {:ok, updated_object} <- remove_announce_from_object(announce_activity, object) do
    {:ok, unannounce_activity, updated_object}
  else
    _e -> {:ok, object}
  end
end
346
# Inserts and federates a follow activity from `follower` to `followed`.
def follow(follower, followed, activity_id \\ nil, local \\ true) do
  follow_data = make_follow_data(follower, followed, activity_id)

  with {:ok, activity} <- insert(follow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
354
# Marks the latest follow activity between the two users as "cancelled", then inserts
# and federates the matching unfollow activity. A step that does not match is returned
# as-is (e.g. nil when there is no follow activity).
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data = make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
364
# Deletes `object` and inserts/federates a Delete activity addressed to the object's
# former `to` and `cc`. Reply and note counters are decreased before federation.
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
  user = User.get_cached_by_ap_id(actor)
  addressed = (object.data["to"] || []) ++ (object.data["cc"] || [])

  with {:ok, deleted_object, deleted_activity} <- Object.delete(object),
       delete_data = %{
         "type" => "Delete",
         "actor" => actor,
         "object" => id,
         "to" => addressed,
         # nil when Object.delete/1 returned no activity alongside the object
         "deleted_activity_id" => deleted_activity && deleted_activity.id
       },
       {:ok, activity} <- insert(delete_data, local),
       _ <- decrease_replies_count_if_reply(deleted_object),
       # Changing note count prior to enqueuing federation task in order to avoid
       # race conditions on updating user.info
       {:ok, _actor} <- decrease_note_count_if_public(user, deleted_object),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
386
# Blocks `blocked` on behalf of `blocker`, driven by the :activitypub config:
#   * :unfollow_blocked - when `true`, an existing follow is undone first;
#   * :outgoing_blocks  - when `true`, a block activity is inserted and federated.
# Returns `{:ok, activity}`, or `{:ok, nil}` when no block activity was produced.
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
  ap_config = Application.get_env(:pleroma, :activitypub)
  unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
  outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)

  if unfollow_blocked == true && fetch_latest_follow(blocker, blocked) do
    unfollow(blocker, blocked, nil, local)
  end

  with true <- outgoing_blocks,
       block_data = make_block_data(blocker, blocked, activity_id),
       {:ok, activity} <- insert(block_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    _e -> {:ok, nil}
  end
end
409
# Inserts and federates the undo of the latest block activity between the two users.
# A step that does not match is returned as-is.
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
  with %Activity{} = latest_block <- fetch_latest_block(blocker, blocked),
       unblock_data = make_unblock_data(blocker, blocked, latest_block, activity_id),
       {:ok, activity} <- insert(unblock_data, local),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
418
# Inserts and federates a flag (report) activity and emails the report to every
# superuser. Unless `params[:forward]` is `false`, the reported account is cc'd.
def flag(
      %{
        actor: actor,
        context: _context,
        account: account,
        statuses: statuses,
        content: content
      } = params
    ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

  # Only the report fields themselves are passed on to make_flag_data/2.
  flag_params = Map.take(params, [:actor, :context, :account, :statuses, :content])

  with flag_data <- make_flag_data(flag_params, additional),
       {:ok, activity} <- insert(flag_data, local),
       :ok <- maybe_federate(activity) do
    User.all_superusers()
    |> Enum.each(fn superuser ->
      superuser
      |> Pleroma.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Mailer.deliver_async()
    end)

    {:ok, activity}
  end
end
461
# Returns the Create activities of `context`, newest id first, with objects preloaded.
# Results are limited to public recipients plus, when opts["user"] is given, that user
# and the addresses they follow; the blocked filter from `opts` is applied as well.
def fetch_activities_for_context(context, opts \\ %{}) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]
  user = opts["user"]

  recipients =
    if user do
      [user.ap_id | user.following] ++ public
    else
      public
    end

  from(activity in Activity)
  |> restrict_blocked(opts)
  |> restrict_recipients(recipients, user)
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> order_by(desc: :id)
  |> Activity.with_preloaded_object()
  |> Repo.all()
end
492
# Fetches activities addressed to the public collection (excluding those that only cc
# it) and returns them in reverse query order.
def fetch_public_activities(opts \\ %{}) do
  ["https://www.w3.org/ns/activitystreams#Public"]
  |> fetch_activities_query(opts)
  |> restrict_unlisted()
  |> Repo.all()
  |> Enum.reverse()
end
501
@valid_visibilities ~w[direct unlisted public private]

# Narrows `query` to activities whose `activity_visibility(actor, recipients, data)` SQL
# value equals opts[:visibility] (a single visibility) or is one of them (a list).
#
# An unsupported visibility is logged and the query is returned unchanged: every clause
# must return a query because the result is piped into the next restrict_* function.
defp restrict_visibility(query, %{visibility: visibility}) when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    # inspect/1 keeps the log call from raising on values that are not chardata.
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

# Reached only for a non-list visibility that is not in @valid_visibilities.
defp restrict_visibility(query, %{visibility: visibility}) do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

defp restrict_visibility(query, _visibility), do: query
548
# Fetches the Create and Announce activities authored by `user`, as visible to
# `reading_user` (public only when there is no reading user), in reverse order of
# what fetch_activities/2 returns.
def fetch_user_activities(user, reading_user, params \\ %{}) do
  public = "https://www.w3.org/ns/activitystreams#Public"

  params =
    Map.merge(params, %{
      "type" => ["Create", "Announce"],
      "actor_id" => user.ap_id,
      "whole_db" => true,
      "pinned_activity_ids" => user.info.pinned_activities
    })

  recipients =
    if reading_user do
      [public, reading_user.ap_id | reading_user.following]
    else
      [public]
    end

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
568
# Keeps only activities with an id greater than opts["since_id"]; an empty or missing
# since_id leaves the query untouched.
defp restrict_since(query, %{"since_id" => ""}), do: query

defp restrict_since(query, %{"since_id" => since_id}) do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
576
# Drops activities whose object tags contain any of opts["tag_reject"] (non-empty list).
defp restrict_tag_reject(query, %{"tag_reject" => [_ | _] = rejected_tags}) do
  where(
    query,
    [activity],
    fragment(~s(\(not \(? #> '{"object","tag"}'\) \\?| ?\)), activity.data, ^rejected_tags)
  )
end

defp restrict_tag_reject(query, _opts), do: query
586
# Keeps only activities whose object tags contain all of opts["tag_all"] (non-empty list).
defp restrict_tag_all(query, %{"tag_all" => [_ | _] = required_tags}) do
  where(
    query,
    [activity],
    fragment(~s(\(? #> '{"object","tag"}'\) \\?& ?), activity.data, ^required_tags)
  )
end

defp restrict_tag_all(query, _opts), do: query
596
# Restricts by opts["tag"]: a list matches activities carrying any of the tags, a
# single binary matches activities whose object tags contain it.
defp restrict_tag(query, %{"tag" => tags}) when is_list(tags) do
  where(
    query,
    [activity],
    fragment(~s(\(? #> '{"object","tag"}'\) \\?| ?), activity.data, ^tags)
  )
end

defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
  where(
    query,
    [activity],
    fragment(~s(? <@ (? #> '{"object","tag"}'\)), ^tag, activity.data)
  )
end

defp restrict_tag(query, _opts), do: query
612
# Keeps activities whose "to" overlaps `recipients_to` or whose "cc" overlaps
# `recipients_cc`.
defp restrict_to_cc(query, recipients_to, recipients_cc) do
  where(
    query,
    [activity],
    fragment(
      "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
      activity.data,
      ^recipients_to,
      activity.data,
      ^recipients_cc
    )
  )
end
626
# Keeps activities whose recipients overlap `recipients`. With a user given, activities
# authored by that user are included as well (or_where). An empty recipient list
# disables the restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
640
# Applies opts["limit"] as the query limit when present.
defp restrict_limit(query, %{"limit" => max_rows}) do
  limit(query, ^max_rows)
end

defp restrict_limit(query, _opts), do: query
646
# Keeps only local activities when opts["local_only"] is `true`.
defp restrict_local(query, %{"local_only" => true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _opts), do: query
652
# Keeps only activities with an id lower than opts["max_id"]; an empty or missing
# max_id leaves the query untouched.
defp restrict_max(query, %{"max_id" => ""}), do: query

defp restrict_max(query, %{"max_id" => max_id}) do
  where(query, [activity], activity.id < ^max_id)
end

defp restrict_max(query, _opts), do: query
660
# Keeps only activities whose actor equals opts["actor_id"].
defp restrict_actor(query, %{"actor_id" => actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _opts), do: query
666
# Restricts by activity type: a binary opts["type"] must match exactly, any other
# value is passed to SQL `ANY(?)`.
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{"type" => types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _opts), do: query
676
# Keeps only activities whose object "likes" contain opts["favorited_by"].
defp restrict_favorited_by(query, %{"favorited_by" => liker_ap_id}) do
  where(
    query,
    [activity],
    fragment(~s(? <@ (? #> '{"object","likes"}'\)), ^liker_ap_id, activity.data)
  )
end

defp restrict_favorited_by(query, _opts), do: query
685
# With opts["only_media"] set to "true" or "1", drops activities whose object
# attachment equals the empty list.
defp restrict_media(query, %{"only_media" => flag}) when flag in ["true", "1"] do
  where(
    query,
    [activity],
    fragment(~s(not (? #> '{"object","attachment"}' = ?\)), activity.data, ^[])
  )
end

defp restrict_media(query, _opts), do: query
694
# With opts["exclude_replies"] set to "true" or "1", keeps only activities whose
# object has no "inReplyTo".
defp restrict_replies(query, %{"exclude_replies" => flag}) when flag in ["true", "1"] do
  where(query, [activity], fragment("?->'object'->>'inReplyTo' is null", activity.data))
end

defp restrict_replies(query, _opts), do: query
703
# With opts["exclude_reblogs"] set to "true" or "1", drops Announce activities.
defp restrict_reblogs(query, %{"exclude_reblogs" => flag}) when flag in ["true", "1"] do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
709
# Drops activities authored by, or addressed ("to") to, anyone in the muting user's
# mute list. opts["with_muted"] (true, "true" or "1") disables the filter.
defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query

defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
  # Default a nil list to [] (as restrict_blocked/2 does for its lists) so that no
  # NULL array is passed to the SQL fragments below.
  mutes = info.mutes || []

  from(
    activity in query,
    where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
    where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
  )
end

defp restrict_muted(query, _), do: query
723
# Drops activities authored by or addressed ("to") to blocked users, and activities
# whose actor URL host (third '/'-separated part) is a blocked domain.
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
  blocked_users = info.blocks || []
  blocked_domains = info.domain_blocks || []

  query
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_users))
  |> where([activity], fragment("not (?->'to' \\?| ?)", activity.data, ^blocked_users))
  |> where(
    [activity],
    fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^blocked_domains)
  )
end

defp restrict_blocked(query, _opts), do: query
737
# Drops activities that carry the public collection in "cc" (a missing "cc" is
# treated as empty).
defp restrict_unlisted(query) do
  public = ["https://www.w3.org/ns/activitystreams#Public"]

  where(
    query,
    [activity],
    fragment("not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)", activity.data, ^public)
  )
end
749
# With opts["pinned"] == "true", keeps only activities whose id is listed in
# opts["pinned_activity_ids"].
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => pinned_ids}) do
  where(query, [activity], activity.id in ^pinned_ids)
end

defp restrict_pinned(query, _opts), do: query
755
# Drops Announce activities authored by anyone in the muting user's muted_reblogs list.
defp restrict_muted_reblogs(query, %{"muting_user" => %User{info: info}}) do
  reblog_muted_actors = info.muted_reblogs || []

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^reblog_muted_actors
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
772
# Preloads the activities' objects unless opts["skip_preload"] is `true`.
defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query

defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
779
# Builds the base activity query (default limit 20, ordered by id descending with nulls
# last) and applies every restrict_* filter driven by `recipients` and `opts`.
# restrict_limit/2 replaces the default limit when opts["limit"] is given.
def fetch_activities_query(recipients, opts \\ %{}) do
  Activity
  |> limit(20)
  |> order_by([activity], [fragment("? desc nulls last", activity.id)])
  |> maybe_preload_objects(opts)
  |> restrict_recipients(recipients, opts["user"])
  |> restrict_tag(opts)
  |> restrict_tag_reject(opts)
  |> restrict_tag_all(opts)
  |> restrict_since(opts)
  |> restrict_local(opts)
  |> restrict_limit(opts)
  |> restrict_max(opts)
  |> restrict_actor(opts)
  |> restrict_type(opts)
  |> restrict_favorited_by(opts)
  |> restrict_blocked(opts)
  |> restrict_muted(opts)
  |> restrict_media(opts)
  |> restrict_visibility(opts)
  |> restrict_replies(opts)
  |> restrict_reblogs(opts)
  |> restrict_pinned(opts)
  |> restrict_muted_reblogs(opts)
end
810
# Runs fetch_activities_query/2 and returns the rows in reverse query order.
def fetch_activities(recipients, opts \\ %{}) do
  query = fetch_activities_query(recipients, opts)

  query
  |> Repo.all()
  |> Enum.reverse()
end
816
# Like fetch_activities/2, but instead of the recipients restriction it matches the
# activities' "to" and "cc" against the two given lists (see restrict_to_cc/3).
def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
  query =
    []
    |> fetch_activities_query(opts)
    |> restrict_to_cc(recipients_to, recipients_cc)

  query
  |> Repo.all()
  |> Enum.reverse()
end
823
# Stores `file` through Upload.store/2 and inserts an Object for the resulting data,
# tagging it with opts[:actor] when given. A failed store is returned as-is.
def upload(file, opts \\ []) do
  case Upload.store(file, opts) do
    {:ok, data} ->
      object_data =
        case opts[:actor] do
          actor when actor in [nil, false] -> data
          actor -> Map.put(data, "actor", actor)
        end

      Repo.insert(%Object{data: object_data})

    other ->
      other
  end
end
836
# Converts a remote actor document into the attribute map used to create or update a
# user. Always returns `{:ok, user_data}`.
def user_data_from_user_object(data) do
  # Wraps a URL in an Image map; a falsy URL is passed through unchanged.
  to_image = fn url ->
    url && %{"type" => "Image", "url" => [%{"href" => url}]}
  end

  # Avatar, banner and locked are read from the document as received, before
  # Transmogrifier.maybe_fix_user_object/1 is applied.
  avatar = to_image.(data["icon"]["url"])
  banner = to_image.(data["image"]["url"])
  locked = data["manuallyApprovesFollowers"] || false

  data = Transmogrifier.maybe_fix_user_object(data)

  # nickname can be nil because of virtual actors
  nickname =
    if data["preferredUsername"] do
      "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
    end

  user_data = %{
    ap_id: data["id"],
    info: %{
      "ap_enabled" => true,
      "source_data" => data,
      "banner" => banner,
      "locked" => locked
    },
    avatar: avatar,
    name: data["name"],
    follower_address: data["followers"],
    bio: data["summary"],
    nickname: nickname
  }

  {:ok, user_data}
end
883
# Fetches the actor document at `ap_id` and converts it with
# user_data_from_user_object/1.
#
# Returns `{:ok, user_data}` on success. On failure the problem is logged and an
# `{:error, reason}` tuple is returned (previously the `:ok` returned by
# Logger.error/1 leaked out, hiding the real reason from callers).
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- fetch_and_contain_remote_object_from_id(ap_id) do
    user_data_from_user_object(data)
  else
    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")

      case e do
        {:error, _reason} -> e
        other -> {:error, other}
      end
  end
end
891
# Upgrades the user when one already exists for `ap_id`; otherwise fetches the remote
# actor and inserts/updates a user from it. Fetch failures are wrapped as `{:error, e}`.
def make_user_from_ap_id(ap_id) do
  if User.get_by_ap_id(ap_id) do
    Transmogrifier.upgrade_user_from_ap_id(ap_id)
  else
    case fetch_and_prepare_user_from_ap_id(ap_id) do
      {:ok, data} -> User.insert_or_update_user(data)
      e -> {:error, e}
    end
  end
end
903
# Resolves `nickname` through WebFinger and builds the user from the returned AP id.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _e ->
      {:error, "No AP id in WebFinger"}
  end
end
911
# Public activities are always federated. Non-public ones are federated only when the
# inbox host is not listed in [:instance, :quarantined_instances].
def should_federate?(inbox, public) do
  cond do
    public ->
      true

    true ->
      host = URI.parse(inbox).host
      quarantined = Pleroma.Config.get([:instance, :quarantined_instances], [])
      !Enum.member?(quarantined, host)
  end
end
920
# Delivers `activity` to the inboxes of its remote recipients.
#
# Recipients are the remote users returned by Salmon plus, when the activity is
# addressed to the actor's follower collection, the actor's non-local followers. Only
# AP-enabled users are considered; their shared inbox is preferred over the personal
# one. Inboxes are de-duplicated, filtered through should_federate?/2 and the
# reachability filter, and each one is handed to Federator.publish_single_ap/1.
def publish(actor, activity) do
  remote_followers =
    if actor.follower_address in activity.recipients do
      {:ok, followers} = User.get_followers(actor)
      Enum.reject(followers, & &1.local)
    else
      []
    end

  public = is_public?(activity)

  {:ok, outgoing} = Transmogrifier.prepare_outgoing(activity.data)
  json = Jason.encode!(outgoing)

  (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
  |> Enum.filter(&User.ap_enabled?/1)
  |> Enum.map(fn %{info: %{source_data: source_data}} ->
    endpoints = source_data["endpoints"]
    (is_map(endpoints) && Map.get(endpoints, "sharedInbox")) || source_data["inbox"]
  end)
  |> Enum.uniq()
  |> Enum.filter(&should_federate?(&1, public))
  |> Instances.filter_reachable()
  |> Enum.each(fn {inbox, unreachable_since} ->
    Federator.publish_single_ap(%{
      inbox: inbox,
      json: json,
      actor: actor,
      id: activity.data["id"],
      unreachable_since: unreachable_since
    })
  end)
end
953
# POSTs the signed `json` payload to a single `inbox`.
#
# A 2xx response returns the raw HTTP result and marks the inbox reachable (when
# `:unreachable_since` is absent from `params` or truthy). Anything else returns
# `{:error, response}` and marks the inbox unreachable unless it already was.
def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
  Logger.info("Federating #{id} to #{inbox}")
  host = URI.parse(inbox).host

  digest = "SHA-256=" <> Base.encode64(:crypto.hash(:sha256, json))

  date =
    Timex.format!(NaiveDateTime.utc_now(), "{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT")

  signature =
    Pleroma.Web.HTTPSignatures.sign(actor, %{
      host: host,
      "content-length": byte_size(json),
      digest: digest,
      date: date
    })

  headers = [
    {"Content-Type", "application/activity+json"},
    {"Date", date},
    {"signature", signature},
    {"digest", digest}
  ]

  with {:ok, %{status: code}} when code in 200..299 <-
         result = @httpoison.post(inbox, json, headers) do
    if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since] do
      Instances.set_reachable(inbox)
    end

    result
  else
    {_post_result, response} ->
      if !params[:unreachable_since], do: Instances.set_unreachable(inbox)
      {:error, response}
  end
end
994
# TODO:
# This will create a Create activity, which we need internally at the moment.
#
# Returns the cached object for `id` when there is one. Otherwise the object is fetched
# via ActivityPub, wrapped in a synthetic Create activity and passed through
# Transmogrifier.handle_incoming/1. A `{:error, {:reject, nil}}` becomes `{:reject, nil}`;
# any other failure falls back to OStatus fetching.
def fetch_object_from_id(id) do
  cached = Object.get_cached_by_ap_id(id)

  if cached do
    {:ok, cached}
  else
    with {:ok, data} <- fetch_and_contain_remote_object_from_id(id),
         nil <- Object.normalize(data),
         params = %{
           "type" => "Create",
           "to" => data["to"],
           "cc" => data["cc"],
           "actor" => data["actor"] || data["attributedTo"],
           "object" => data
         },
         :ok <- Transmogrifier.contain_origin(id, params),
         {:ok, activity} <- Transmogrifier.handle_incoming(params) do
      {:ok, Object.normalize(activity)}
    else
      {:error, {:reject, nil}} ->
        {:reject, nil}

      # Object.normalize/1 already knew the fetched object.
      %Object{} = known_object ->
        {:ok, known_object}

      _e ->
        Logger.info("Couldn't get object via AP, trying out OStatus fetching...")

        case OStatus.fetch_activity_from_url(id) do
          {:ok, [activity | _]} -> {:ok, Object.normalize(activity)}
          e -> e
        end
    end
  end
end
1030
# Fetches the ActivityPub JSON document at `id` and checks it with
# Transmogrifier.contain_origin_from_id/2. Returns `{:ok, data}`, or `{:error, e}` where
# `e` is whatever step failed (non-"http" id, HTTP result, decode error, origin check).
def fetch_and_contain_remote_object_from_id(id) do
  Logger.info("Fetching object #{id} via AP")

  headers = [{:Accept, "application/activity+json"}]

  with true <- String.starts_with?(id, "http"),
       {:ok, %{body: body, status: code}} when code in 200..299 <- @httpoison.get(id, headers),
       {:ok, data} <- Jason.decode(body),
       :ok <- Transmogrifier.contain_origin_from_id(id, data) do
    {:ok, data}
  else
    e -> {:error, e}
  end
end
1048
# filter out broken threads: delegates to entire_thread_visible_for_user?/2
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
1053
# do post-processing on a specific activity (currently only the broken-thread check)
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
1058
# do post-processing on a timeline: keeps the activities that pass contain_activity/2
def contain_timeline(timeline, user) do
  Enum.filter(timeline, &contain_activity(&1, user))
end
1066 end