1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
11 alias Pleroma.Notification
12 alias Pleroma.Instances
13 alias Pleroma.Web.ActivityPub.Transmogrifier
14 alias Pleroma.Web.ActivityPub.MRF
15 alias Pleroma.Web.WebFinger
16 alias Pleroma.Web.Federator
17 alias Pleroma.Web.OStatus
20 import Pleroma.Web.ActivityPub.Utils
21 import Pleroma.Web.ActivityPub.Visibility
25 @httpoison Application.get_env(:pleroma, :httpoison)
27 # For Announce activities, we filter the recipients based on following status for any actors
28 # that match actual users. See issue #164 for more information about why this is necessary.
29 defp get_recipients(%{"type" => "Announce"} = data) do
32 actor = User.get_cached_by_ap_id(data["actor"])
36 |> Enum.filter(fn recipient ->
37 case User.get_cached_by_ap_id(recipient) do
42 User.following?(user, actor)
49 defp get_recipients(%{"type" => "Create"} = data) do
52 actor = data["actor"] || []
53 recipients = (to ++ cc ++ [actor]) |> Enum.uniq()
57 defp get_recipients(data) do
64 defp check_actor_is_active(actor) do
65 if not is_nil(actor) do
66 with user <- User.get_cached_by_ap_id(actor),
67 false <- user.info.deactivated do
# Returns true when the object's content, measured with String.length/1, does
# not exceed the value configured under [:instance, :remote_limit].
# Any argument without non-nil object content hits the fallback clause and passes.
defp check_remote_limit(%{"object" => %{"content" => body}}) when not is_nil(body) do
  max_length = Pleroma.Config.get([:instance, :remote_limit])
  max_length >= String.length(body)
end

defp check_remote_limit(_other), do: true
# Bumps the actor's note counter via User.increase_note_count/1, but only when
# is_public?/1 is truthy for `object`; otherwise `{:ok, actor}` is returned as-is.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
# Lowers the actor's note counter via User.decrease_note_count/1, but only when
# is_public?/1 is truthy for `object`; otherwise `{:ok, actor}` is returned as-is.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
92 def insert(map, local \\ true) when is_map(map) do
93 with nil <- Activity.normalize(map),
94 map <- lazy_put_activity_defaults(map),
95 :ok <- check_actor_is_active(map["actor"]),
96 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
97 {:ok, map} <- MRF.filter(map),
98 :ok <- insert_full_object(map) do
99 {recipients, _, _} = get_recipients(map)
102 Repo.insert(%Activity{
106 recipients: recipients
110 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
113 Notification.create_notifications(activity)
117 %Activity{} = activity -> {:ok, activity}
118 error -> {:error, error}
122 def stream_out(activity) do
123 public = "https://www.w3.org/ns/activitystreams#Public"
125 if activity.data["type"] in ["Create", "Announce", "Delete"] do
126 Pleroma.Web.Streamer.stream("user", activity)
127 Pleroma.Web.Streamer.stream("list", activity)
129 if Enum.member?(activity.data["to"], public) do
130 Pleroma.Web.Streamer.stream("public", activity)
133 Pleroma.Web.Streamer.stream("public:local", activity)
136 if activity.data["type"] in ["Create"] do
137 activity.data["object"]
138 |> Map.get("tag", [])
139 |> Enum.filter(fn tag -> is_bitstring(tag) end)
140 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
142 if activity.data["object"]["attachment"] != [] do
143 Pleroma.Web.Streamer.stream("public:media", activity)
146 Pleroma.Web.Streamer.stream("public:local:media", activity)
151 if !Enum.member?(activity.data["cc"] || [], public) &&
154 User.get_by_ap_id(activity.data["actor"]).follower_address
156 do: Pleroma.Web.Streamer.stream("direct", activity)
161 def create(%{to: to, actor: actor, context: context, object: object} = params) do
162 additional = params[:additional] || %{}
163 # only accept false as false value
164 local = !(params[:local] == false)
165 published = params[:published]
169 %{to: to, actor: actor, published: published, context: context, object: object},
172 {:ok, activity} <- insert(create_data, local),
173 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
174 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
175 :ok <- maybe_federate(activity) do
180 def accept(%{to: to, actor: actor, object: object} = params) do
181 # only accept false as false value
182 local = !(params[:local] == false)
184 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
185 {:ok, activity} <- insert(data, local),
186 :ok <- maybe_federate(activity),
187 _ <- User.update_follow_request_count(actor) do
192 def reject(%{to: to, actor: actor, object: object} = params) do
193 # only accept false as false value
194 local = !(params[:local] == false)
196 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
197 {:ok, activity} <- insert(data, local),
198 :ok <- maybe_federate(activity),
199 _ <- User.update_follow_request_count(actor) do
204 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
205 # only accept false as false value
206 local = !(params[:local] == false)
215 {:ok, activity} <- insert(data, local),
216 :ok <- maybe_federate(activity) do
221 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
223 %User{ap_id: ap_id} = user,
224 %Object{data: %{"id" => _}} = object,
228 with nil <- get_existing_like(ap_id, object),
229 like_data <- make_like_data(user, object, activity_id),
230 {:ok, activity} <- insert(like_data, local),
231 {:ok, object} <- add_like_to_object(activity, object),
232 :ok <- maybe_federate(activity) do
233 {:ok, activity, object}
235 %Activity{} = activity -> {:ok, activity, object}
236 error -> {:error, error}
246 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
247 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
248 {:ok, unlike_activity} <- insert(unlike_data, local),
249 {:ok, _activity} <- Repo.delete(like_activity),
250 {:ok, object} <- remove_like_from_object(like_activity, object),
251 :ok <- maybe_federate(unlike_activity) do
252 {:ok, unlike_activity, like_activity, object}
259 %User{ap_id: _} = user,
260 %Object{data: %{"id" => _}} = object,
265 with true <- is_public?(object),
266 announce_data <- make_announce_data(user, object, activity_id, public),
267 {:ok, activity} <- insert(announce_data, local),
268 {:ok, object} <- add_announce_to_object(activity, object),
269 :ok <- maybe_federate(activity) do
270 {:ok, activity, object}
272 error -> {:error, error}
282 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
283 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
284 {:ok, unannounce_activity} <- insert(unannounce_data, local),
285 :ok <- maybe_federate(unannounce_activity),
286 {:ok, _activity} <- Repo.delete(announce_activity),
287 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
288 {:ok, unannounce_activity, object}
294 def follow(follower, followed, activity_id \\ nil, local \\ true) do
295 with data <- make_follow_data(follower, followed, activity_id),
296 {:ok, activity} <- insert(data, local),
297 :ok <- maybe_federate(activity),
298 _ <- User.update_follow_request_count(followed) do
303 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
304 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
305 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
306 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
307 {:ok, activity} <- insert(unfollow_data, local),
308 :ok <- maybe_federate(activity),
309 _ <- User.update_follow_request_count(followed) do
314 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
315 user = User.get_cached_by_ap_id(actor)
321 "to" => [user.follower_address, "https://www.w3.org/ns/activitystreams#Public"]
324 with {:ok, _} <- Object.delete(object),
325 {:ok, activity} <- insert(data, local),
326 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
327 {:ok, _actor} <- decrease_note_count_if_public(user, object),
328 :ok <- maybe_federate(activity) do
333 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
334 ap_config = Application.get_env(:pleroma, :activitypub)
335 unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
336 outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)
338 with true <- unfollow_blocked do
339 follow_activity = fetch_latest_follow(blocker, blocked)
341 if follow_activity do
342 unfollow(blocker, blocked, nil, local)
346 with true <- outgoing_blocks,
347 block_data <- make_block_data(blocker, blocked, activity_id),
348 {:ok, activity} <- insert(block_data, local),
349 :ok <- maybe_federate(activity) do
356 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
357 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
358 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
359 {:ok, activity} <- insert(unblock_data, local),
360 :ok <- maybe_federate(activity) do
374 additional = params[:additional] || %{}
376 # only accept false as false value
377 local = !(params[:local] == false)
386 |> make_flag_data(additional)
390 def fetch_activities_for_context(context, opts \\ %{}) do
391 public = ["https://www.w3.org/ns/activitystreams#Public"]
394 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
396 query = from(activity in Activity)
400 |> restrict_blocked(opts)
401 |> restrict_recipients(recipients, opts["user"])
408 "?->>'type' = ? and ?->>'context' = ?",
414 order_by: [desc: :id]
420 def fetch_public_activities(opts \\ %{}) do
421 q = fetch_activities_query(["https://www.w3.org/ns/activitystreams#Public"], opts)
424 |> restrict_unlisted()
429 @valid_visibilities ~w[direct unlisted public private]
431 defp restrict_visibility(query, %{visibility: visibility})
432 when visibility in @valid_visibilities do
437 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
440 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
# Handles a `:visibility` option that is not listed in @valid_visibilities.
# The problem is logged and the query is handed back without a visibility
# restriction. Returning the result of Logger.error/1 here (as before) would
# pass a non-query value on to the remaining restrict_* steps.
defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end
450 defp restrict_visibility(query, _visibility), do: query
452 def fetch_user_activities(user, reading_user, params \\ %{}) do
455 |> Map.put("type", ["Create", "Announce"])
456 |> Map.put("actor_id", user.ap_id)
457 |> Map.put("whole_db", true)
458 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
462 ["https://www.w3.org/ns/activitystreams#Public"] ++
463 [reading_user.ap_id | reading_user.following]
465 ["https://www.w3.org/ns/activitystreams#Public"]
468 fetch_activities(recipients, params)
# Keeps only activities whose id is strictly greater than the "since_id" option.
# An empty-string or absent "since_id" leaves the query untouched.
defp restrict_since(base, %{"since_id" => ""}), do: base

defp restrict_since(base, %{"since_id" => lower_bound}) do
  from(a in base, where: a.id > ^lower_bound)
end

defp restrict_since(base, _opts), do: base
480 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
481 when is_list(tag_reject) and tag_reject != [] do
484 where: fragment("(not (? #> '{\"object\",\"tag\"}') \\?| ?)", activity.data, ^tag_reject)
488 defp restrict_tag_reject(query, _), do: query
490 defp restrict_tag_all(query, %{"tag_all" => tag_all})
491 when is_list(tag_all) and tag_all != [] do
494 where: fragment("(? #> '{\"object\",\"tag\"}') \\?& ?", activity.data, ^tag_all)
498 defp restrict_tag_all(query, _), do: query
500 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
503 where: fragment("(? #> '{\"object\",\"tag\"}') \\?| ?", activity.data, ^tag)
507 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
510 where: fragment("? <@ (? #> '{\"object\",\"tag\"}')", ^tag, activity.data)
514 defp restrict_tag(query, _), do: query
516 defp restrict_to_cc(query, recipients_to, recipients_cc) do
521 "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
530 defp restrict_recipients(query, [], _user), do: query
532 defp restrict_recipients(query, recipients, nil) do
533 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
536 defp restrict_recipients(query, recipients, user) do
539 where: fragment("? && ?", ^recipients, activity.recipients),
540 or_where: activity.actor == ^user.ap_id
# Applies the "limit" option as the query's row limit; without it the query
# is returned unchanged.
defp restrict_limit(base, %{"limit" => max_rows}) do
  from(a in base, limit: ^max_rows)
end

defp restrict_limit(base, _opts), do: base
# Narrows the query to activities flagged `local` when "local_only" is exactly
# `true`; any other options leave the query unchanged.
defp restrict_local(base, %{"local_only" => true}) do
  from(a in base, where: a.local == true)
end

defp restrict_local(base, _opts), do: base
# Keeps only activities whose id is strictly less than the "max_id" option.
# An empty-string or absent "max_id" leaves the query untouched.
defp restrict_max(base, %{"max_id" => ""}), do: base

defp restrict_max(base, %{"max_id" => upper_bound}) do
  from(a in base, where: a.id < ^upper_bound)
end

defp restrict_max(base, _opts), do: base
# Restricts the query to activities whose `actor` column equals the "actor_id"
# option; without that option the query is returned unchanged.
defp restrict_actor(base, %{"actor_id" => wanted_actor}) do
  from(a in base, where: a.actor == ^wanted_actor)
end

defp restrict_actor(base, _opts), do: base
# Filters on the activity's JSON "type" field.
# A binary "type" option is compared for equality; any other value is passed to
# `= ANY(?)` (fetch_user_activities supplies a list of type names).
# Without a "type" option the query is returned unchanged.
defp restrict_type(base, %{"type" => single}) when is_binary(single) do
  from(a in base, where: fragment("?->>'type' = ?", a.data, ^single))
end

defp restrict_type(base, %{"type" => several}) do
  from(a in base, where: fragment("?->>'type' = ANY(?)", a.data, ^several))
end

defp restrict_type(base, _opts), do: base
580 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
583 where: fragment("? <@ (? #> '{\"object\",\"likes\"}')", ^ap_id, activity.data)
587 defp restrict_favorited_by(query, _), do: query
589 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
592 where: fragment("not (? #> '{\"object\",\"attachment\"}' = ?)", activity.data, ^[])
596 defp restrict_media(query, _), do: query
598 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
601 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
605 defp restrict_replies(query, _), do: query
# Drops Announce activities when "exclude_reblogs" is the string "true" or "1";
# any other options leave the query unchanged.
defp restrict_reblogs(base, %{"exclude_reblogs" => flag}) when flag in ["true", "1"] do
  from(a in base, where: fragment("?->>'type' != 'Announce'", a.data))
end

defp restrict_reblogs(base, _opts), do: base
613 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
615 defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
620 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
621 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
625 defp restrict_muted(query, _), do: query
627 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
628 blocks = info.blocks || []
629 domain_blocks = info.domain_blocks || []
633 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
634 where: fragment("not (?->'to' \\?| ?)", activity.data, ^blocks),
635 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
639 defp restrict_blocked(query, _), do: query
641 defp restrict_unlisted(query) do
646 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
648 ^["https://www.w3.org/ns/activitystreams#Public"]
# Limits the query to the ids in "pinned_activity_ids", but only when "pinned"
# is the string "true" and the id list is present; otherwise the query is
# returned unchanged.
defp restrict_pinned(base, %{"pinned" => "true", "pinned_activity_ids" => pinned_ids}) do
  from(a in base, where: a.id in ^pinned_ids)
end

defp restrict_pinned(base, _opts), do: base
659 def fetch_activities_query(recipients, opts \\ %{}) do
662 activity in Activity,
664 order_by: [fragment("? desc nulls last", activity.id)]
668 |> restrict_recipients(recipients, opts["user"])
669 |> restrict_tag(opts)
670 |> restrict_tag_reject(opts)
671 |> restrict_tag_all(opts)
672 |> restrict_since(opts)
673 |> restrict_local(opts)
674 |> restrict_limit(opts)
675 |> restrict_max(opts)
676 |> restrict_actor(opts)
677 |> restrict_type(opts)
678 |> restrict_favorited_by(opts)
679 |> restrict_blocked(opts)
680 |> restrict_muted(opts)
681 |> restrict_media(opts)
682 |> restrict_visibility(opts)
683 |> restrict_replies(opts)
684 |> restrict_reblogs(opts)
685 |> restrict_pinned(opts)
688 def fetch_activities(recipients, opts \\ %{}) do
689 fetch_activities_query(recipients, opts)
694 def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
695 fetch_activities_query([], opts)
696 |> restrict_to_cc(recipients_to, recipients_cc)
701 def upload(file, opts \\ []) do
702 with {:ok, data} <- Upload.store(file, opts) do
705 Map.put(data, "actor", opts[:actor])
710 Repo.insert(%Object{data: obj_data})
714 def user_data_from_user_object(data) do
716 data["icon"]["url"] &&
719 "url" => [%{"href" => data["icon"]["url"]}]
723 data["image"]["url"] &&
726 "url" => [%{"href" => data["image"]["url"]}]
729 locked = data["manuallyApprovesFollowers"] || false
730 data = Transmogrifier.maybe_fix_user_object(data)
735 "ap_enabled" => true,
736 "source_data" => data,
742 follower_address: data["followers"],
746 # nickname can be nil because of virtual actors
748 if data["preferredUsername"] do
752 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
755 Map.put(user_data, :nickname, nil)
# Fetches the remote object behind `ap_id` and, on `{:ok, data}`, hands it to
# user_data_from_user_object/1, returning that function's result.
# Any other fetch result is logged.
# NOTE(review): the failure branch evaluates to whatever Logger.error/1 returns,
# not an error tuple, so callers should match on the success shape.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  case fetch_and_contain_remote_object_from_id(ap_id) do
    {:ok, data} ->
      user_data_from_user_object(data)

    failure ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")
  end
end
769 def make_user_from_ap_id(ap_id) do
770 if _user = User.get_by_ap_id(ap_id) do
771 Transmogrifier.upgrade_user_from_ap_id(ap_id)
773 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
774 User.insert_or_update_user(data)
# Resolves `nickname` through WebFinger.finger/1 and, when the result carries a
# non-nil "ap_id", delegates to make_user_from_ap_id/1.
# Every other WebFinger result becomes {:error, "No AP id in WebFinger"}.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _no_ap_id ->
      {:error, "No AP id in WebFinger"}
  end
end
789 def should_federate?(inbox, public) do
793 inbox_info = URI.parse(inbox)
794 !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
798 def publish(actor, activity) do
800 if actor.follower_address in activity.recipients do
801 {:ok, followers} = User.get_followers(actor)
802 followers |> Enum.filter(&(!&1.local))
807 public = is_public?(activity)
809 {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
810 json = Jason.encode!(data)
812 (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
813 |> Enum.filter(fn user -> User.ap_enabled?(user) end)
814 |> Enum.map(fn %{info: %{source_data: data}} ->
815 (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
818 |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
819 |> Instances.filter_reachable()
820 |> Enum.each(fn {inbox, unreachable_since} ->
821 Federator.publish_single_ap(%{
825 id: activity.data["id"],
826 unreachable_since: unreachable_since
831 def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
832 Logger.info("Federating #{id} to #{inbox}")
833 host = URI.parse(inbox).host
835 digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
838 NaiveDateTime.utc_now()
839 |> Timex.format!("{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT")
842 Pleroma.Web.HTTPSignatures.sign(actor, %{
844 "content-length": byte_size(json),
849 with {:ok, %{status: code}} when code in 200..299 <-
855 {"Content-Type", "application/activity+json"},
857 {"signature", signature},
861 if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
862 do: Instances.set_reachable(inbox)
866 {_post_result, response} ->
867 unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
873 # This will create a Create activity, which we need internally at the moment.
874 def fetch_object_from_id(id) do
875 if object = Object.get_cached_by_ap_id(id) do
878 with {:ok, data} <- fetch_and_contain_remote_object_from_id(id),
879 nil <- Object.normalize(data),
884 "actor" => data["actor"] || data["attributedTo"],
887 :ok <- Transmogrifier.contain_origin(id, params),
888 {:ok, activity} <- Transmogrifier.handle_incoming(params) do
889 {:ok, Object.normalize(activity.data["object"])}
891 {:error, {:reject, nil}} ->
894 object = %Object{} ->
898 Logger.info("Couldn't get object via AP, trying out OStatus fetching...")
900 case OStatus.fetch_activity_from_url(id) do
901 {:ok, [activity | _]} -> {:ok, Object.normalize(activity.data["object"])}
908 def fetch_and_contain_remote_object_from_id(id) do
909 Logger.info("Fetching object #{id} via AP")
911 with true <- String.starts_with?(id, "http"),
912 {:ok, %{body: body, status: code}} when code in 200..299 <-
915 [{:Accept, "application/activity+json"}]
917 {:ok, data} <- Jason.decode(body),
918 :ok <- Transmogrifier.contain_origin_from_id(id, data) do
# Filter out broken threads: delegates to entire_thread_visible_for_user?/2
# for the given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing for a single activity; currently this is only the
# broken-thread check done by contain_broken_threads/2.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
936 # do post-processing on a timeline
937 def contain_timeline(timeline, user) do
939 |> Enum.filter(fn activity ->
940 contain_activity(activity, user)