1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
11 alias Pleroma.Notification
12 alias Pleroma.Instances
13 alias Pleroma.Web.ActivityPub.Transmogrifier
14 alias Pleroma.Web.ActivityPub.MRF
15 alias Pleroma.Web.WebFinger
16 alias Pleroma.Web.Federator
17 alias Pleroma.Web.OStatus
20 import Pleroma.Web.ActivityPub.Utils
21 import Pleroma.Web.ActivityPub.Visibility
# HTTP client module, read from the :pleroma application environment. Module
# attributes are evaluated at compile time, so the value is fixed when this
# module is compiled.
25 @httpoison Application.get_env(:pleroma, :httpoison)
# Computes the recipients of an activity. insert/2 destructures the result as
# a 3-tuple whose first element is the recipient list.
27 # For Announce activities, we filter the recipients based on following status for any actors
28 # that match actual users. See issue #164 for more information about why this is necessary.
29 defp get_recipients(%{"type" => "Announce"} = data) do
32 actor = User.get_cached_by_ap_id(data["actor"])
36 |> Enum.filter(fn recipient ->
37 case User.get_cached_by_ap_id(recipient) do
42 User.following?(user, actor)
# Create activities: the actor is appended to the combined to/cc lists and
# duplicates are removed.
49 defp get_recipients(%{"type" => "Create"} = data) do
# NOTE(review): a missing actor defaults to [] and is then wrapped as [actor],
# which would add an empty list as a recipient entry -- confirm this is intended.
52 actor = data["actor"] || []
53 recipients = (to ++ cc ++ [actor]) |> Enum.uniq()
# Fallback clause for every other activity type.
57 defp get_recipients(data) do
# Checks that the actor behind an activity has not been deactivated. The user
# lookup only happens for a non-nil actor. insert/2 expects :ok on success.
64 defp check_actor_is_active(actor) do
65 if not is_nil(actor) do
# NOTE(review): `user <- ...` binds any value, including nil, so an ap_id that
# resolves to no cached user would presumably fail on `user.info` -- confirm.
66 with user <- User.get_cached_by_ap_id(actor),
67 false <- user.info.deactivated do
# Enforces the instance's [:instance, :remote_limit] setting on the length of
# an object's "content". Activities without a non-nil "content" always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Pleroma.Config.get([:instance, :remote_limit])
  content_length = String.length(content)

  content_length <= max_length
end

defp check_remote_limit(_activity), do: true
# Increases the actor's note count when `object` is public; otherwise the
# actor is handed back unchanged in an :ok tuple.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
# Decreases the actor's note count when `object` is public; otherwise the
# actor is handed back unchanged in an :ok tuple.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
# Stores an activity map as an Activity record. The `with` chain only proceeds
# when Activity.normalize/1 yields nil; it then applies activity defaults,
# checks that the actor is active, enforces the remote content limit, runs the
# MRF filter and calls insert_full_object/1 before the Activity is inserted
# together with its computed recipients. `local` defaults to true.
92 def insert(map, local \\ true) when is_map(map) do
93 with nil <- Activity.normalize(map),
94 map <- lazy_put_activity_defaults(map),
95 :ok <- check_actor_is_active(map["actor"]),
96 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
97 {:ok, map} <- MRF.filter(map),
98 :ok <- insert_full_object(map) do
99 {recipients, _, _} = get_recipients(map)
102 Repo.insert(%Activity{
106 recipients: recipients
110 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
113 Notification.create_notifications(activity)
# An %Activity{} that falls out of the chain is reported as success; any other
# non-matching value is wrapped in an :error tuple.
117 %Activity{} = activity -> {:ok, activity}
118 error -> {:error, error}
# Pushes an activity to the Streamer topics it belongs to. Only Create,
# Announce and Delete activities are considered. They always go to the "user"
# and "list" topics; the public topics are used when "to" contains the
# ActivityStreams Public address. Create activities additionally fan out to one
# "hashtag:<tag>" topic per string tag and, when the object's "attachment" is
# not [], to the media topics. The last branch streams to "direct".
122 def stream_out(activity) do
123 public = "https://www.w3.org/ns/activitystreams#Public"
125 if activity.data["type"] in ["Create", "Announce", "Delete"] do
126 Pleroma.Web.Streamer.stream("user", activity)
127 Pleroma.Web.Streamer.stream("list", activity)
129 if Enum.member?(activity.data["to"], public) do
130 Pleroma.Web.Streamer.stream("public", activity)
133 Pleroma.Web.Streamer.stream("public:local", activity)
136 if activity.data["type"] in ["Create"] do
137 activity.data["object"]
138 |> Map.get("tag", [])
139 |> Enum.filter(fn tag -> is_bitstring(tag) end)
140 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
142 if activity.data["object"]["attachment"] != [] do
143 Pleroma.Web.Streamer.stream("public:media", activity)
146 Pleroma.Web.Streamer.stream("public:local:media", activity)
151 if !Enum.member?(activity.data["cc"] || [], public) &&
154 User.get_by_ap_id(activity.data["actor"]).follower_address
156 do: Pleroma.Web.Streamer.stream("direct", activity)
# Builds, stores and federates an activity from `params`. The keys :to, :actor,
# :context and :object are required by the function head; :additional, :local
# and :published are read as optional. After insertion the actor's note count
# is increased for public activities and the activity is passed to
# maybe_federate/1.
161 def create(%{to: to, actor: actor, context: context, object: object} = params) do
162 additional = params[:additional] || %{}
163 # only accept false as false value
164 local = !(params[:local] == false)
165 published = params[:published]
169 %{to: to, actor: actor, published: published, context: context, object: object},
172 {:ok, activity} <- insert(create_data, local),
173 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
174 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
175 :ok <- maybe_federate(activity) do
# Inserts and federates an "Accept" activity from `actor` about `object`,
# addressed to `to`. Any :local value other than `false` counts as local.
180 def accept(%{to: to, actor: actor, object: object} = params) do
181 # only accept false as false value
182 local = !(params[:local] == false)
184 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
185 {:ok, activity} <- insert(data, local),
186 :ok <- maybe_federate(activity) do
# Inserts and federates a "Reject" activity from `actor` about `object`,
# addressed to `to`. Any :local value other than `false` counts as local.
191 def reject(%{to: to, actor: actor, object: object} = params) do
192 # only accept false as false value
193 local = !(params[:local] == false)
195 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
196 {:ok, activity} <- insert(data, local),
197 :ok <- maybe_federate(activity) do
# Inserts the activity data built from :to, :cc, :actor and :object and passes
# the stored activity to maybe_federate/1. Any :local value other than `false`
# counts as local.
202 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
203 # only accept false as false value
204 local = !(params[:local] == false)
213 {:ok, activity} <- insert(data, local),
214 :ok <- maybe_federate(activity) do
219 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
221 %User{ap_id: ap_id} = user,
222 %Object{data: %{"id" => _}} = object,
226 with nil <- get_existing_like(ap_id, object),
227 like_data <- make_like_data(user, object, activity_id),
228 {:ok, activity} <- insert(like_data, local),
229 {:ok, object} <- add_like_to_object(activity, object),
230 :ok <- maybe_federate(activity) do
231 {:ok, activity, object}
233 %Activity{} = activity -> {:ok, activity, object}
234 error -> {:error, error}
244 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
245 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
246 {:ok, unlike_activity} <- insert(unlike_data, local),
247 {:ok, _activity} <- Repo.delete(like_activity),
248 {:ok, object} <- remove_like_from_object(like_activity, object),
249 :ok <- maybe_federate(unlike_activity) do
250 {:ok, unlike_activity, like_activity, object}
257 %User{ap_id: _} = user,
258 %Object{data: %{"id" => _}} = object,
263 with true <- is_public?(object),
264 announce_data <- make_announce_data(user, object, activity_id, public),
265 {:ok, activity} <- insert(announce_data, local),
266 {:ok, object} <- add_announce_to_object(activity, object),
267 :ok <- maybe_federate(activity) do
268 {:ok, activity, object}
270 error -> {:error, error}
280 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
281 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
282 {:ok, unannounce_activity} <- insert(unannounce_data, local),
283 :ok <- maybe_federate(unannounce_activity),
284 {:ok, _activity} <- Repo.delete(announce_activity),
285 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
286 {:ok, unannounce_activity, object}
# Builds follow data for `follower` -> `followed`, inserts it as an activity
# and passes it to maybe_federate/1. `activity_id` defaults to nil and `local`
# to true.
292 def follow(follower, followed, activity_id \\ nil, local \\ true) do
293 with data <- make_follow_data(follower, followed, activity_id),
294 {:ok, activity} <- insert(data, local),
295 :ok <- maybe_federate(activity) do
# Undoes a follow. Requires an existing follow activity (the latest one is
# fetched); its state is set to "cancelled" before the unfollow activity is
# inserted and passed to maybe_federate/1.
300 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
301 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
302 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
303 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
304 {:ok, activity} <- insert(unfollow_data, local),
305 :ok <- maybe_federate(activity) do
# Deletes `object` and inserts an accompanying activity addressed to the
# author's follower address and the Public collection. The author's note count
# is decreased (for public objects) before the activity is federated.
310 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
311 user = User.get_cached_by_ap_id(actor)
317 "to" => [user.follower_address, "https://www.w3.org/ns/activitystreams#Public"]
320 with {:ok, _} <- Object.delete(object),
321 {:ok, activity} <- insert(data, local),
322 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
323 {:ok, _actor} <- decrease_note_count_if_public(user, object),
324 :ok <- maybe_federate(activity) do
# Blocks `blocked` on behalf of `blocker`, driven by the :activitypub
# application config: when :unfollow_blocked is true an existing follow is
# undone via unfollow/4, and the block activity is only inserted and federated
# when :outgoing_blocks is true.
329 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
330 ap_config = Application.get_env(:pleroma, :activitypub)
331 unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
332 outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)
334 with true <- unfollow_blocked do
335 follow_activity = fetch_latest_follow(blocker, blocked)
337 if follow_activity do
338 unfollow(blocker, blocked, nil, local)
342 with true <- outgoing_blocks,
343 block_data <- make_block_data(blocker, blocked, activity_id),
344 {:ok, activity} <- insert(block_data, local),
345 :ok <- maybe_federate(activity) do
# Undoes a block. Requires an existing block activity (the latest one is
# fetched), then inserts the unblock activity and passes it to
# maybe_federate/1.
352 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
353 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
354 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
355 {:ok, activity} <- insert(unblock_data, local),
356 :ok <- maybe_federate(activity) do
370 additional = params[:additional] || %{}
372 # only accept false as false value
373 local = !(params[:local] == false)
382 |> make_flag_data(additional)
# Queries the activities of one conversation context. Recipients are the
# Public collection plus, when opts["user"] is set, that user's ap_id and
# following list. restrict_blocked/2 and restrict_recipients/3 are applied and
# results are ordered by descending id.
386 def fetch_activities_for_context(context, opts \\ %{}) do
387 public = ["https://www.w3.org/ns/activitystreams#Public"]
390 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
392 query = from(activity in Activity)
396 |> restrict_blocked(opts)
397 |> restrict_recipients(recipients, opts["user"])
404 "?->>'type' = ? and ?->>'context' = ?",
410 order_by: [desc: :id]
# Builds on fetch_activities_query/2 with the Public collection as the only
# recipient and applies restrict_unlisted/1 to the result.
416 def fetch_public_activities(opts \\ %{}) do
417 q = fetch_activities_query(["https://www.w3.org/ns/activitystreams#Public"], opts)
420 |> restrict_unlisted()
# Visibility names accepted by restrict_visibility/2.
425 @valid_visibilities ~w[direct unlisted public private]
# List form: every entry must be one of @valid_visibilities; the filter then
# compares activity_visibility(...) against the list with ANY.
427 defp restrict_visibility(query, %{visibility: visibility})
428 when is_list(visibility) do
429 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
435 "activity_visibility(?, ?, ?) = ANY (?)",
443 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
447 Logger.error("Could not restrict visibility to #{visibility}")
# Single valid visibility: activity_visibility(...) must equal it.
451 defp restrict_visibility(query, %{visibility: visibility})
452 when visibility in @valid_visibilities do
457 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
460 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
# Unknown visibility value: an error is logged.
# NOTE(review): the only visible expression here is the Logger call, so this
# clause appears to return Logger's result instead of a query -- confirm that
# the rest of the restrict_* pipeline tolerates that.
465 defp restrict_visibility(_query, %{visibility: visibility})
466 when visibility not in @valid_visibilities do
467 Logger.error("Could not restrict visibility to #{visibility}")
# No :visibility option: the query is returned unchanged.
470 defp restrict_visibility(query, _visibility), do: query
# Fetches the "Create" and "Announce" activities authored by `user`. The params
# are extended with the actor id, "whole_db" => true and the user's pinned
# activity ids. Recipients are the Public collection, widened on one branch
# with `reading_user`'s own ap_id and following list.
472 def fetch_user_activities(user, reading_user, params \\ %{}) do
475 |> Map.put("type", ["Create", "Announce"])
476 |> Map.put("actor_id", user.ap_id)
477 |> Map.put("whole_db", true)
478 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
482 ["https://www.w3.org/ns/activitystreams#Public"] ++
483 [reading_user.ap_id | reading_user.following]
485 ["https://www.w3.org/ns/activitystreams#Public"]
488 fetch_activities(recipients, params)
# Keeps only activities whose id is greater than "since_id". A missing or
# empty-string "since_id" leaves the query untouched.
defp restrict_since(query, %{"since_id" => since_id}) when since_id != "" do
  from(a in query, where: a.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
# Excludes activities whose object tag list contains any of the "tag_reject"
# entries (negated jsonb `?|` test). Only applies to a non-empty list.
500 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
501 when is_list(tag_reject) and tag_reject != [] do
504 where: fragment("(not (? #> '{\"object\",\"tag\"}') \\?| ?)", activity.data, ^tag_reject)
508 defp restrict_tag_reject(query, _), do: query
# Keeps activities whose object tag list contains every "tag_all" entry
# (jsonb `?&` test). Only applies to a non-empty list.
510 defp restrict_tag_all(query, %{"tag_all" => tag_all})
511 when is_list(tag_all) and tag_all != [] do
514 where: fragment("(? #> '{\"object\",\"tag\"}') \\?& ?", activity.data, ^tag_all)
518 defp restrict_tag_all(query, _), do: query
# Filters by the "tag" option. A list matches activities whose object tags
# contain any of the entries (jsonb `?|`); a single binary is matched with the
# `<@` containment operator. Without the option the query is unchanged.
520 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
523 where: fragment("(? #> '{\"object\",\"tag\"}') \\?| ?", activity.data, ^tag)
527 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
530 where: fragment("? <@ (? #> '{\"object\",\"tag\"}')", ^tag, activity.data)
534 defp restrict_tag(query, _), do: query
# Keeps activities whose 'to' field matches any entry of one list or whose
# 'cc' field matches any entry of another (jsonb `?|` on each side).
# NOTE(review): presumably the two lists are recipients_to and recipients_cc
# respectively -- confirm against the fragment arguments.
536 defp restrict_to_cc(query, recipients_to, recipients_cc) do
541 "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
# Limits a query to activities whose recipients array overlaps `recipients`
# (`&&` operator). An empty recipient list applies no restriction. When a user
# is given, activities whose actor is that user's ap_id are matched as well
# through `or_where`.
550 defp restrict_recipients(query, [], _user), do: query
552 defp restrict_recipients(query, recipients, nil) do
553 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
556 defp restrict_recipients(query, recipients, user) do
559 where: fragment("? && ?", ^recipients, activity.recipients),
560 or_where: activity.actor == ^user.ap_id
# Applies the "limit" option, when present, as the query's row limit.
defp restrict_limit(query, opts) do
  case opts do
    %{"limit" => limit} -> from(a in query, limit: ^limit)
    _ -> query
  end
end
# Restricts the query to local activities, but only when "local_only" is
# exactly `true`; any other options leave the query unchanged.
defp restrict_local(query, %{"local_only" => true}),
  do: from(a in query, where: a.local == true)

defp restrict_local(query, _opts), do: query
# Keeps only activities whose id is lower than "max_id". A missing or
# empty-string "max_id" leaves the query untouched.
defp restrict_max(query, %{"max_id" => max_id}) when max_id != "" do
  from(a in query, where: a.id < ^max_id)
end

defp restrict_max(query, _opts), do: query
# Restricts the query to activities by the actor given in "actor_id", when
# that option is present.
defp restrict_actor(query, opts) do
  case opts do
    %{"actor_id" => actor_id} -> from(a in query, where: a.actor == ^actor_id)
    _ -> query
  end
end
# Filters on the activity's "type". A binary is compared for equality; any
# other value is handed to ANY(?) as the set of accepted types. Without the
# option the query is unchanged.
defp restrict_type(query, %{"type" => type}) do
  if is_binary(type) do
    from(a in query, where: fragment("?->>'type' = ?", a.data, ^type))
  else
    from(a in query, where: fragment("?->>'type' = ANY(?)", a.data, ^type))
  end
end

defp restrict_type(query, _opts), do: query
# Keeps activities whose object "likes" list contains the ap_id given in
# "favorited_by" (jsonb `<@` containment). No-op without the option.
600 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
603 where: fragment("? <@ (? #> '{\"object\",\"likes\"}')", ^ap_id, activity.data)
607 defp restrict_favorited_by(query, _), do: query
# When "only_media" is the string "true" or "1", keeps only activities whose
# object "attachment" differs from the empty list. No-op otherwise.
609 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
612 where: fragment("not (? #> '{\"object\",\"attachment\"}' = ?)", activity.data, ^[])
616 defp restrict_media(query, _), do: query
# When "exclude_replies" is the string "true" or "1", keeps only activities
# whose object has no "inReplyTo". No-op otherwise.
618 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
621 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
625 defp restrict_replies(query, _), do: query
# When "exclude_reblogs" is the string "true" or "1", drops activities of type
# 'Announce'. Any other options leave the query unchanged.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  from(a in query, where: fragment("?->>'type' != 'Announce'", a.data))
end

defp restrict_reblogs(query, _opts), do: query
# Hides activities involving muted accounts. "with_muted" set to true, "true"
# or "1" disables the filter. With a "muting_user", activities whose actor is
# in `mutes`, or whose 'to' field contains any of `mutes`, are excluded
# (`mutes` is presumably taken from the user's info -- confirm).
633 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
635 defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
640 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
641 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
645 defp restrict_muted(query, _), do: query
# Hides activities involving accounts or domains blocked by "blocking_user".
# Excluded are activities whose actor is blocked, whose 'to' field contains a
# blocked account, or whose actor host (third '/'-separated part of the actor
# URL) is in the user's domain blocks. Missing block lists default to [].
647 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
648 blocks = info.blocks || []
649 domain_blocks = info.domain_blocks || []
653 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
654 where: fragment("not (?->'to' \\?| ?)", activity.data, ^blocks),
655 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
659 defp restrict_blocked(query, _), do: query
# Excludes activities whose 'cc' field contains the Public collection address;
# a missing 'cc' is treated as an empty jsonb value via coalesce.
661 defp restrict_unlisted(query) do
666 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
668 ^["https://www.w3.org/ns/activitystreams#Public"]
# When "pinned" is the string "true" and "pinned_activity_ids" is supplied,
# restricts the query to those activity ids. Otherwise a no-op.
defp restrict_pinned(query, opts) do
  case opts do
    %{"pinned" => "true", "pinned_activity_ids" => ids} ->
      from(a in query, where: a.id in ^ids)

    _ ->
      query
  end
end
# Builds the activities query: starts from Activity ordered by id descending
# (nulls last) and threads it through the restrict_* filters below. Each
# filter reads its own option from `opts`; `recipients` and opts["user"] feed
# restrict_recipients/3.
679 def fetch_activities_query(recipients, opts \\ %{}) do
682 activity in Activity,
684 order_by: [fragment("? desc nulls last", activity.id)]
688 |> restrict_recipients(recipients, opts["user"])
689 |> restrict_tag(opts)
690 |> restrict_tag_reject(opts)
691 |> restrict_tag_all(opts)
692 |> restrict_since(opts)
693 |> restrict_local(opts)
694 |> restrict_limit(opts)
695 |> restrict_max(opts)
696 |> restrict_actor(opts)
697 |> restrict_type(opts)
698 |> restrict_favorited_by(opts)
699 |> restrict_blocked(opts)
700 |> restrict_muted(opts)
701 |> restrict_media(opts)
702 |> restrict_visibility(opts)
703 |> restrict_replies(opts)
704 |> restrict_reblogs(opts)
705 |> restrict_pinned(opts)
# Fetches activities for `recipients`, starting from the query built by
# fetch_activities_query/2 with the same arguments.
708 def fetch_activities(recipients, opts \\ %{}) do
709 fetch_activities_query(recipients, opts)
# Variant that passes an empty recipient list to fetch_activities_query/2 (so
# restrict_recipients/3 adds no restriction) and instead restricts on the
# given to/cc lists via restrict_to_cc/3.
714 def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
715 fetch_activities_query([], opts)
716 |> restrict_to_cc(recipients_to, recipients_cc)
# Stores `file` through Upload.store/2 and inserts an Object built from the
# returned data. opts[:actor] can be recorded under the "actor" key.
721 def upload(file, opts \\ []) do
722 with {:ok, data} <- Upload.store(file, opts) do
725 Map.put(data, "actor", opts[:actor])
730 Repo.insert(%Object{data: obj_data})
# Maps a remote actor document onto local user attributes. The icon and image
# URLs, when present, are wrapped as "url" => [%{"href" => ...}] entries;
# `locked` comes from "manuallyApprovesFollowers" (default false); the document
# is passed through Transmogrifier.maybe_fix_user_object/1 and kept as
# "source_data"; the follower address comes from "followers".
734 def user_data_from_user_object(data) do
736 data["icon"]["url"] &&
739 "url" => [%{"href" => data["icon"]["url"]}]
743 data["image"]["url"] &&
746 "url" => [%{"href" => data["image"]["url"]}]
749 locked = data["manuallyApprovesFollowers"] || false
750 data = Transmogrifier.maybe_fix_user_object(data)
755 "ap_enabled" => true,
756 "source_data" => data,
762 follower_address: data["followers"],
766 # nickname can be nil because of virtual actors
# With a preferredUsername the nickname is "<preferredUsername>@<host of id>".
768 if data["preferredUsername"] do
772 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
775 Map.put(user_data, :nickname, nil)
# Fetches the remote actor document at `ap_id` and converts it into user data
# via user_data_from_user_object/1, whose result is returned as-is.
#
# When the fetch does not yield `{:ok, data}` the problem is logged and an
# error tuple is returned: an existing `{:error, reason}` is passed through
# unchanged and any other value is wrapped as `{:error, value}`.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- fetch_and_contain_remote_object_from_id(ap_id) do
    user_data_from_user_object(data)
  else
    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")

      # Logger.error/1 returns :ok; ending the branch on it would report a
      # failed fetch as a bare :ok and drop the reason, so surface the failure.
      case e do
        {:error, _reason} -> e
        other -> {:error, other}
      end
  end
end
# Ensures a user exists for `ap_id`. If one is already stored it is upgraded
# through Transmogrifier.upgrade_user_from_ap_id/1; otherwise the actor data is
# fetched and handed to User.insert_or_update_user/1.
789 def make_user_from_ap_id(ap_id) do
790 if _user = User.get_by_ap_id(ap_id) do
791 Transmogrifier.upgrade_user_from_ap_id(ap_id)
793 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
794 User.insert_or_update_user(data)
# Resolves `nickname` through WebFinger and creates or upgrades the user for
# the ap_id it reports. Any WebFinger result without a non-nil "ap_id" yields
# {:error, "No AP id in WebFinger"}.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _other ->
      {:error, "No AP id in WebFinger"}
  end
end
# Decides whether an activity may be delivered to `inbox`. The host check
# below fails for inboxes whose host is listed in the
# [:instance, :quarantined_instances] setting (default []).
809 def should_federate?(inbox, public) do
813 inbox_info = URI.parse(inbox)
814 !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
# Federates `activity` to remote ActivityPub inboxes. Non-local followers are
# collected when the actor's follower address is among the recipients and
# combined with Salmon.remote_users/1; only AP-enabled users are kept and each
# is resolved to its sharedInbox endpoint, falling back to "inbox". Inboxes
# rejected by should_federate?/2 or dropped by Instances.filter_reachable/1 are
# skipped; for the rest Federator.publish_single_ap/1 is called once per inbox
# with the JSON-encoded outgoing data.
818 def publish(actor, activity) do
820 if actor.follower_address in activity.recipients do
821 {:ok, followers} = User.get_followers(actor)
822 followers |> Enum.filter(&(!&1.local))
827 public = is_public?(activity)
829 {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
830 json = Jason.encode!(data)
832 (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
833 |> Enum.filter(fn user -> User.ap_enabled?(user) end)
834 |> Enum.map(fn %{info: %{source_data: data}} ->
835 (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
838 |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
839 |> Instances.filter_reachable()
840 |> Enum.each(fn {inbox, unreachable_since} ->
841 Federator.publish_single_ap(%{
845 id: activity.data["id"],
846 unreachable_since: unreachable_since
# Delivers one serialised activity (`json`) to a single `inbox`. A SHA-256
# digest of the body and a GMT-formatted timestamp are computed and an HTTP
# signature is produced for `actor`. On a 2xx response the inbox is marked
# reachable, unless :unreachable_since is present in params with a falsy value.
# On any other outcome the inbox is marked unreachable, unless
# :unreachable_since is already truthy.
851 def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
852 Logger.info("Federating #{id} to #{inbox}")
853 host = URI.parse(inbox).host
855 digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
858 NaiveDateTime.utc_now()
859 |> Timex.format!("{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT")
862 Pleroma.Web.HTTPSignatures.sign(actor, %{
864 "content-length": byte_size(json),
869 with {:ok, %{status: code}} when code in 200..299 <-
875 {"Content-Type", "application/activity+json"},
877 {"signature", signature},
881 if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
882 do: Instances.set_reachable(inbox)
886 {_post_result, response} ->
887 unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
# Resolves an object by its id. A cached object is used when available;
# otherwise the object is fetched remotely, wrapped in params whose actor is
# the object's "actor" or "attributedTo", checked with
# Transmogrifier.contain_origin/2 and processed by
# Transmogrifier.handle_incoming/1. If that path fails, OStatus fetching is
# tried as a fallback.
893 # This will create a Create activity, which we need internally at the moment.
894 def fetch_object_from_id(id) do
895 if object = Object.get_cached_by_ap_id(id) do
898 with {:ok, data} <- fetch_and_contain_remote_object_from_id(id),
899 nil <- Object.normalize(data),
904 "actor" => data["actor"] || data["attributedTo"],
907 :ok <- Transmogrifier.contain_origin(id, params),
908 {:ok, activity} <- Transmogrifier.handle_incoming(params) do
909 {:ok, Object.normalize(activity.data["object"])}
911 {:error, {:reject, nil}} ->
914 object = %Object{} ->
918 Logger.info("Couldn't get object via AP, trying out OStatus fetching...")
920 case OStatus.fetch_activity_from_url(id) do
921 {:ok, [activity | _]} -> {:ok, Object.normalize(activity.data["object"])}
# Fetches the document at `id` with an "application/activity+json" Accept
# header. The chain requires an id starting with "http", a 2xx response, a
# body that decodes as JSON, and data that passes
# Transmogrifier.contain_origin_from_id/2.
928 def fetch_and_contain_remote_object_from_id(id) do
929 Logger.info("Fetching object #{id} via AP")
931 with true <- String.starts_with?(id, "http"),
932 {:ok, %{body: body, status: code}} when code in 200..299 <-
935 [{:Accept, "application/activity+json"}]
937 {:ok, data} <- Jason.decode(body),
938 :ok <- Transmogrifier.contain_origin_from_id(id, data) do
# Filter for broken threads: delegates to entire_thread_visible_for_user?/2
# for the given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing check for a single activity; currently this is only the
# broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
# Keeps only the timeline activities for which contain_activity/2 returns a
# truthy value for `user`.
956 # do post-processing on a timeline
957 def contain_timeline(timeline, user) do
959 |> Enum.filter(fn activity ->
960 contain_activity(activity, user)