1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
11 alias Pleroma.Notification
12 alias Pleroma.Instances
13 alias Pleroma.Web.ActivityPub.Transmogrifier
14 alias Pleroma.Web.ActivityPub.MRF
15 alias Pleroma.Web.WebFinger
16 alias Pleroma.Web.Federator
17 alias Pleroma.Web.OStatus
20 import Pleroma.Web.ActivityPub.Utils
21 import Pleroma.Web.ActivityPub.Visibility
25 @httpoison Application.get_env(:pleroma, :httpoison)
27 # For Announce activities, we filter the recipients based on following status for any actors
28 # that match actual users. See issue #164 for more information about why this is necessary.
29 defp get_recipients(%{"type" => "Announce"} = data) do
32 actor = User.get_cached_by_ap_id(data["actor"])
36 |> Enum.filter(fn recipient ->
37 case User.get_cached_by_ap_id(recipient) do
42 User.following?(user, actor)
49 defp get_recipients(%{"type" => "Create"} = data) do
# data["actor"] may be absent. List.wrap/1 turns a nil/[] actor into "no extra
# recipient" instead of appending an empty list as a bogus recipient entry,
# while a present actor is still added exactly once.
actor = data["actor"] || []
recipients = Enum.uniq(to ++ cc ++ List.wrap(actor))
57 defp get_recipients(data) do
64 defp check_actor_is_active(actor) do
65 if not is_nil(actor) do
66 with user <- User.get_cached_by_ap_id(actor),
67 false <- user.info.deactivated do
# Compares the embedded object's content against the configured
# [:instance, :remote_limit]; the length is measured with String.length/1.
# Activities without (non-nil) object content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Pleroma.Config.get([:instance, :remote_limit])
  max_length >= String.length(content)
end

defp check_remote_limit(_activity), do: true
# Delegates to User.increase_note_count/1 when is_public?/1 holds for `object`;
# otherwise hands the actor back untouched as {:ok, actor}.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
# Delegates to User.decrease_note_count/1 when is_public?/1 holds for `object`;
# otherwise hands the actor back untouched as {:ok, actor}.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
92 def insert(map, local \\ true) when is_map(map) do
93 with nil <- Activity.normalize(map),
94 map <- lazy_put_activity_defaults(map),
95 :ok <- check_actor_is_active(map["actor"]),
96 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
97 {:ok, map} <- MRF.filter(map),
98 :ok <- insert_full_object(map) do
99 {recipients, _, _} = get_recipients(map)
102 Repo.insert(%Activity{
106 recipients: recipients
110 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
113 Notification.create_notifications(activity)
117 %Activity{} = activity -> {:ok, activity}
118 error -> {:error, error}
122 def stream_out(activity) do
123 public = "https://www.w3.org/ns/activitystreams#Public"
125 if activity.data["type"] in ["Create", "Announce", "Delete"] do
126 Pleroma.Web.Streamer.stream("user", activity)
127 Pleroma.Web.Streamer.stream("list", activity)
129 if Enum.member?(activity.data["to"], public) do
130 Pleroma.Web.Streamer.stream("public", activity)
133 Pleroma.Web.Streamer.stream("public:local", activity)
136 if activity.data["type"] in ["Create"] do
137 activity.data["object"]
138 |> Map.get("tag", [])
139 |> Enum.filter(fn tag -> is_bitstring(tag) end)
140 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
142 if activity.data["object"]["attachment"] != [] do
143 Pleroma.Web.Streamer.stream("public:media", activity)
146 Pleroma.Web.Streamer.stream("public:local:media", activity)
151 if !Enum.member?(activity.data["cc"] || [], public) &&
154 User.get_by_ap_id(activity.data["actor"]).follower_address
156 do: Pleroma.Web.Streamer.stream("direct", activity)
161 def create(%{to: to, actor: actor, context: context, object: object} = params) do
162 additional = params[:additional] || %{}
163 # only accept false as false value
164 local = !(params[:local] == false)
165 published = params[:published]
169 %{to: to, actor: actor, published: published, context: context, object: object},
172 {:ok, activity} <- insert(create_data, local),
173 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
174 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
175 :ok <- maybe_federate(activity) do
180 def accept(%{to: to, actor: actor, object: object} = params) do
181 # only accept false as false value
182 local = !(params[:local] == false)
184 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
185 {:ok, activity} <- insert(data, local),
186 :ok <- maybe_federate(activity) do
191 def reject(%{to: to, actor: actor, object: object} = params) do
192 # only accept false as false value
193 local = !(params[:local] == false)
195 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
196 {:ok, activity} <- insert(data, local),
197 :ok <- maybe_federate(activity) do
202 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
203 # only accept false as false value
204 local = !(params[:local] == false)
213 {:ok, activity} <- insert(data, local),
214 :ok <- maybe_federate(activity) do
219 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
221 %User{ap_id: ap_id} = user,
222 %Object{data: %{"id" => _}} = object,
226 with nil <- get_existing_like(ap_id, object),
227 like_data <- make_like_data(user, object, activity_id),
228 {:ok, activity} <- insert(like_data, local),
229 {:ok, object} <- add_like_to_object(activity, object),
230 :ok <- maybe_federate(activity) do
231 {:ok, activity, object}
233 %Activity{} = activity -> {:ok, activity, object}
234 error -> {:error, error}
244 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
245 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
246 {:ok, unlike_activity} <- insert(unlike_data, local),
247 {:ok, _activity} <- Repo.delete(like_activity),
248 {:ok, object} <- remove_like_from_object(like_activity, object),
249 :ok <- maybe_federate(unlike_activity) do
250 {:ok, unlike_activity, like_activity, object}
257 %User{ap_id: _} = user,
258 %Object{data: %{"id" => _}} = object,
263 with true <- is_public?(object),
264 announce_data <- make_announce_data(user, object, activity_id, public),
265 {:ok, activity} <- insert(announce_data, local),
266 {:ok, object} <- add_announce_to_object(activity, object),
267 :ok <- maybe_federate(activity) do
268 {:ok, activity, object}
270 error -> {:error, error}
280 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
281 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
282 {:ok, unannounce_activity} <- insert(unannounce_data, local),
283 :ok <- maybe_federate(unannounce_activity),
284 {:ok, _activity} <- Repo.delete(announce_activity),
285 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
286 {:ok, unannounce_activity, object}
292 def follow(follower, followed, activity_id \\ nil, local \\ true) do
293 with data <- make_follow_data(follower, followed, activity_id),
294 {:ok, activity} <- insert(data, local),
295 :ok <- maybe_federate(activity) do
300 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
301 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
302 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
303 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
304 {:ok, activity} <- insert(unfollow_data, local),
305 :ok <- maybe_federate(activity) do
310 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
311 user = User.get_cached_by_ap_id(actor)
# Address the Delete to everyone the object itself was addressed to.
# `++` binds tighter than `||`, so the previous unparenthesised expression
# evaluated as `to || cc || [followers, public]` and silently dropped the
# object's "cc" recipients whenever "to" was set.
addressed = (object.data["to"] || []) ++ (object.data["cc"] || [])

# Objects carrying no addressing at all keep the previous fallback audience
# (the actor's followers plus the public collection).
to =
  if addressed == [] do
    [user.follower_address, "https://www.w3.org/ns/activitystreams#Public"]
  else
    addressed
  end
320 "to" => Enum.uniq(to)
323 with {:ok, _} <- Object.delete(object),
324 {:ok, activity} <- insert(data, local),
325 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
326 {:ok, _actor} <- decrease_note_count_if_public(user, object),
327 :ok <- maybe_federate(activity) do
332 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
333 ap_config = Application.get_env(:pleroma, :activitypub)
334 unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
335 outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)
337 with true <- unfollow_blocked do
338 follow_activity = fetch_latest_follow(blocker, blocked)
340 if follow_activity do
341 unfollow(blocker, blocked, nil, local)
345 with true <- outgoing_blocks,
346 block_data <- make_block_data(blocker, blocked, activity_id),
347 {:ok, activity} <- insert(block_data, local),
348 :ok <- maybe_federate(activity) do
355 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
356 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
357 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
358 {:ok, activity} <- insert(unblock_data, local),
359 :ok <- maybe_federate(activity) do
373 additional = params[:additional] || %{}
375 # only accept false as false value
376 local = !(params[:local] == false)
385 |> make_flag_data(additional)
389 def fetch_activities_for_context(context, opts \\ %{}) do
390 public = ["https://www.w3.org/ns/activitystreams#Public"]
393 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
395 query = from(activity in Activity)
399 |> restrict_blocked(opts)
400 |> restrict_recipients(recipients, opts["user"])
407 "?->>'type' = ? and ?->>'context' = ?",
413 order_by: [desc: :id]
419 def fetch_public_activities(opts \\ %{}) do
420 q = fetch_activities_query(["https://www.w3.org/ns/activitystreams#Public"], opts)
423 |> restrict_unlisted()
428 @valid_visibilities ~w[direct unlisted public private]
430 defp restrict_visibility(query, %{visibility: visibility})
431 when is_list(visibility) do
432 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
438 "activity_visibility(?, ?, ?) = ANY (?)",
446 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
450 Logger.error("Could not restrict visibility to #{visibility}")
454 defp restrict_visibility(query, %{visibility: visibility})
455 when visibility in @valid_visibilities do
460 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
463 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
468 defp restrict_visibility(_query, %{visibility: visibility})
469 when visibility not in @valid_visibilities do
470 Logger.error("Could not restrict visibility to #{visibility}")
473 defp restrict_visibility(query, _visibility), do: query
475 def fetch_user_activities(user, reading_user, params \\ %{}) do
478 |> Map.put("type", ["Create", "Announce"])
479 |> Map.put("actor_id", user.ap_id)
480 |> Map.put("whole_db", true)
481 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
485 ["https://www.w3.org/ns/activitystreams#Public"] ++
486 [reading_user.ap_id | reading_user.following]
488 ["https://www.w3.org/ns/activitystreams#Public"]
491 fetch_activities(recipients, params)
# Keeps only activities whose id is greater than the "since_id" option.
# A missing or empty-string "since_id" leaves the query untouched.
defp restrict_since(query, %{"since_id" => since_id}) when since_id != "" do
  from(a in query, where: a.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
503 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
504 when is_list(tag_reject) and tag_reject != [] do
507 where: fragment("(not (? #> '{\"object\",\"tag\"}') \\?| ?)", activity.data, ^tag_reject)
511 defp restrict_tag_reject(query, _), do: query
513 defp restrict_tag_all(query, %{"tag_all" => tag_all})
514 when is_list(tag_all) and tag_all != [] do
517 where: fragment("(? #> '{\"object\",\"tag\"}') \\?& ?", activity.data, ^tag_all)
521 defp restrict_tag_all(query, _), do: query
523 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
526 where: fragment("(? #> '{\"object\",\"tag\"}') \\?| ?", activity.data, ^tag)
530 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
533 where: fragment("? <@ (? #> '{\"object\",\"tag\"}')", ^tag, activity.data)
537 defp restrict_tag(query, _), do: query
539 defp restrict_to_cc(query, recipients_to, recipients_cc) do
544 "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
553 defp restrict_recipients(query, [], _user), do: query
555 defp restrict_recipients(query, recipients, nil) do
556 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
559 defp restrict_recipients(query, recipients, user) do
562 where: fragment("? && ?", ^recipients, activity.recipients),
563 or_where: activity.actor == ^user.ap_id
# Applies the caller-supplied "limit" option as the query's row limit; without
# that option the query is returned unchanged.
defp restrict_limit(query, %{"limit" => max_rows}) do
  from(a in query, limit: ^max_rows)
end

defp restrict_limit(query, _opts), do: query
# Restricts the query to activities flagged `local` when the "local_only"
# option is exactly `true`; any other options leave the query unchanged.
defp restrict_local(query, %{"local_only" => true}) do
  from(
    a in query,
    where: a.local == true
  )
end

defp restrict_local(query, _opts), do: query
# Keeps only activities whose id is lower than the "max_id" option.
# A missing or empty-string "max_id" leaves the query untouched.
defp restrict_max(query, %{"max_id" => max_id}) when max_id != "" do
  from(a in query, where: a.id < ^max_id)
end

defp restrict_max(query, _opts), do: query
# Restricts the query to activities whose `actor` column equals the "actor_id"
# option; without that option the query is returned unchanged.
defp restrict_actor(query, %{"actor_id" => actor_ap_id}) do
  from(
    a in query,
    where: a.actor == ^actor_ap_id
  )
end

defp restrict_actor(query, _opts), do: query
# Filters on the activity's JSON "type" field. A binary "type" option is
# compared for equality; any other value is passed to SQL `ANY(?)`.
# Without a "type" option the query is returned unchanged.
defp restrict_type(query, %{"type" => type}) do
  if is_binary(type) do
    from(a in query, where: fragment("?->>'type' = ?", a.data, ^type))
  else
    from(a in query, where: fragment("?->>'type' = ANY(?)", a.data, ^type))
  end
end

defp restrict_type(query, _opts), do: query
603 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
606 where: fragment("? <@ (? #> '{\"object\",\"likes\"}')", ^ap_id, activity.data)
610 defp restrict_favorited_by(query, _), do: query
612 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
615 where: fragment("not (? #> '{\"object\",\"attachment\"}' = ?)", activity.data, ^[])
619 defp restrict_media(query, _), do: query
621 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
624 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
628 defp restrict_replies(query, _), do: query
# Drops activities whose JSON "type" is 'Announce' when the "exclude_reblogs"
# option is the string "true" or "1"; otherwise the query is left unchanged.
defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in ["true", "1"] do
  from(
    a in query,
    where: fragment("?->>'type' != 'Announce'", a.data)
  )
end

defp restrict_reblogs(query, _opts), do: query
636 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
638 defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
643 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
644 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
648 defp restrict_muted(query, _), do: query
650 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
651 blocks = info.blocks || []
652 domain_blocks = info.domain_blocks || []
656 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
657 where: fragment("not (?->'to' \\?| ?)", activity.data, ^blocks),
658 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
662 defp restrict_blocked(query, _), do: query
664 defp restrict_unlisted(query) do
669 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
671 ^["https://www.w3.org/ns/activitystreams#Public"]
# When the "pinned" option is the string "true", limits the query to the
# activities whose ids are listed under "pinned_activity_ids".
# Any other combination of options leaves the query unchanged.
defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => pinned_ids}) do
  from(
    a in query,
    where: a.id in ^pinned_ids
  )
end

defp restrict_pinned(query, _opts), do: query
682 def fetch_activities_query(recipients, opts \\ %{}) do
685 activity in Activity,
687 order_by: [fragment("? desc nulls last", activity.id)]
691 |> restrict_recipients(recipients, opts["user"])
692 |> restrict_tag(opts)
693 |> restrict_tag_reject(opts)
694 |> restrict_tag_all(opts)
695 |> restrict_since(opts)
696 |> restrict_local(opts)
697 |> restrict_limit(opts)
698 |> restrict_max(opts)
699 |> restrict_actor(opts)
700 |> restrict_type(opts)
701 |> restrict_favorited_by(opts)
702 |> restrict_blocked(opts)
703 |> restrict_muted(opts)
704 |> restrict_media(opts)
705 |> restrict_visibility(opts)
706 |> restrict_replies(opts)
707 |> restrict_reblogs(opts)
708 |> restrict_pinned(opts)
711 def fetch_activities(recipients, opts \\ %{}) do
712 fetch_activities_query(recipients, opts)
717 def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
718 fetch_activities_query([], opts)
719 |> restrict_to_cc(recipients_to, recipients_cc)
724 def upload(file, opts \\ []) do
725 with {:ok, data} <- Upload.store(file, opts) do
728 Map.put(data, "actor", opts[:actor])
733 Repo.insert(%Object{data: obj_data})
737 def user_data_from_user_object(data) do
739 data["icon"]["url"] &&
742 "url" => [%{"href" => data["icon"]["url"]}]
746 data["image"]["url"] &&
749 "url" => [%{"href" => data["image"]["url"]}]
752 locked = data["manuallyApprovesFollowers"] || false
753 data = Transmogrifier.maybe_fix_user_object(data)
758 "ap_enabled" => true,
759 "source_data" => data,
765 follower_address: data["followers"],
769 # nickname can be nil because of virtual actors
771 if data["preferredUsername"] do
775 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
778 Map.put(user_data, :nickname, nil)
# Fetches the remote object behind `ap_id` and converts it with
# user_data_from_user_object/1. Any fetch result other than {:ok, data} is
# logged, and the function then returns whatever Logger.error/1 returns.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  case fetch_and_contain_remote_object_from_id(ap_id) do
    {:ok, data} ->
      user_data_from_user_object(data)

    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
  end
end
792 def make_user_from_ap_id(ap_id) do
793 if _user = User.get_by_ap_id(ap_id) do
794 Transmogrifier.upgrade_user_from_ap_id(ap_id)
796 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
797 User.insert_or_update_user(data)
# Resolves `nickname` through WebFinger.finger/1 and, when the result carries a
# non-nil "ap_id", builds the user via make_user_from_ap_id/1.
# Every other WebFinger outcome yields {:error, "No AP id in WebFinger"}.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _other ->
      {:error, "No AP id in WebFinger"}
  end
end
812 def should_federate?(inbox, public) do
816 inbox_info = URI.parse(inbox)
817 !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
821 def publish(actor, activity) do
823 if actor.follower_address in activity.recipients do
824 {:ok, followers} = User.get_followers(actor)
825 followers |> Enum.filter(&(!&1.local))
830 public = is_public?(activity)
832 {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
833 json = Jason.encode!(data)
835 (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
836 |> Enum.filter(fn user -> User.ap_enabled?(user) end)
837 |> Enum.map(fn %{info: %{source_data: data}} ->
838 (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
841 |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
842 |> Instances.filter_reachable()
843 |> Enum.each(fn {inbox, unreachable_since} ->
844 Federator.publish_single_ap(%{
848 id: activity.data["id"],
849 unreachable_since: unreachable_since
854 def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
855 Logger.info("Federating #{id} to #{inbox}")
856 host = URI.parse(inbox).host
858 digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
861 NaiveDateTime.utc_now()
862 |> Timex.format!("{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT")
865 Pleroma.Web.HTTPSignatures.sign(actor, %{
867 "content-length": byte_size(json),
872 with {:ok, %{status: code}} when code in 200..299 <-
878 {"Content-Type", "application/activity+json"},
880 {"signature", signature},
884 if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
885 do: Instances.set_reachable(inbox)
889 {_post_result, response} ->
890 unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
896 # This will create a Create activity, which we need internally at the moment.
897 def fetch_object_from_id(id) do
898 if object = Object.get_cached_by_ap_id(id) do
901 with {:ok, data} <- fetch_and_contain_remote_object_from_id(id),
902 nil <- Object.normalize(data),
907 "actor" => data["actor"] || data["attributedTo"],
910 :ok <- Transmogrifier.contain_origin(id, params),
911 {:ok, activity} <- Transmogrifier.handle_incoming(params) do
912 {:ok, Object.normalize(activity.data["object"])}
914 {:error, {:reject, nil}} ->
917 object = %Object{} ->
921 Logger.info("Couldn't get object via AP, trying out OStatus fetching...")
923 case OStatus.fetch_activity_from_url(id) do
924 {:ok, [activity | _]} -> {:ok, Object.normalize(activity.data["object"])}
931 def fetch_and_contain_remote_object_from_id(id) do
932 Logger.info("Fetching object #{id} via AP")
934 with true <- String.starts_with?(id, "http"),
935 {:ok, %{body: body, status: code}} when code in 200..299 <-
938 [{:Accept, "application/activity+json"}]
940 {:ok, data} <- Jason.decode(body),
941 :ok <- Transmogrifier.contain_origin_from_id(id, data) do
# Filters out broken threads: returns the result of
# entire_thread_visible_for_user?/2 for the given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing for a single activity; currently this is only the
# broken-thread check performed by contain_broken_threads/2.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
959 # do post-processing on a timeline
960 def contain_timeline(timeline, user) do
962 |> Enum.filter(fn activity ->
963 contain_activity(activity, user)