1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
11 alias Pleroma.Notification
12 alias Pleroma.Instances
13 alias Pleroma.Web.ActivityPub.Transmogrifier
14 alias Pleroma.Web.ActivityPub.MRF
15 alias Pleroma.Web.WebFinger
16 alias Pleroma.Web.Federator
17 alias Pleroma.Web.OStatus
20 import Pleroma.Web.ActivityPub.Utils
21 import Pleroma.Web.ActivityPub.Visibility
# Module attribute: the value is read from the :pleroma application
# environment once, when this module is compiled, not on every call.
25 @httpoison Application.get_env(:pleroma, :httpoison)
27 # For Announce activities, we filter the recipients based on following status for any actors
28 # that match actual users. See issue #164 for more information about why this is necessary.
29 defp get_recipients(%{"type" => "Announce"} = data) do
32 actor = User.get_cached_by_ap_id(data["actor"])
# Each candidate recipient is looked up in the user cache; the check visible
# for a resolved user is whether that user follows the announcing actor.
36 |> Enum.filter(fn recipient ->
37 case User.get_cached_by_ap_id(recipient) do
42 User.following?(user, actor)
# Create activities: the actor is added to the `to` and `cc` lists and
# duplicates are dropped.
# NOTE(review): a missing "actor" defaults to `[]`, which `[actor]` then wraps,
# leaving an empty list as an element of `recipients` — confirm this is intended.
49 defp get_recipients(%{"type" => "Create"} = data) do
52 actor = data["actor"] || []
53 recipients = (to ++ cc ++ [actor]) |> Enum.uniq()
# Fallback clause for activities that are neither Announce nor Create.
57 defp get_recipients(data) do
# Checks the `deactivated` flag of the user behind `actor`; the lookup is only
# attempted when `actor` is not nil.
# NOTE(review): `user <- User.get_cached_by_ap_id(actor)` matches any value,
# including nil, so a lookup miss would reach `user.info.deactivated` with
# user == nil — confirm a miss cannot happen here or is handled elsewhere.
64 defp check_actor_is_active(actor) do
65 if not is_nil(actor) do
66 with user <- User.get_cached_by_ap_id(actor),
67 false <- user.info.deactivated do
# Compares the length of an object's "content" with the configured
# `[:instance, :remote_limit]` and returns true when it is within the limit.
# Anything without a non-nil object content always passes.
defp check_remote_limit(%{"object" => %{"content" => body}}) when not is_nil(body) do
  max_length = Pleroma.Config.get([:instance, :remote_limit])
  max_length >= String.length(body)
end

defp check_remote_limit(_activity), do: true
# Stores an activity map through `Repo.insert/1`.
# The `with` chain only proceeds when `Activity.normalize/1` returns nil for
# the map; it then fills in defaults, checks that the actor is active, applies
# the remote content limit, runs the MRF filter and calls
# `insert_full_object/1` before the activity itself is inserted.
84 def insert(map, local \\ true) when is_map(map) do
85 with nil <- Activity.normalize(map),
86 map <- lazy_put_activity_defaults(map),
87 :ok <- check_actor_is_active(map["actor"]),
88 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
89 {:ok, map} <- MRF.filter(map),
90 :ok <- insert_full_object(map) do
91 {recipients, _, _} = get_recipients(map)
94 Repo.insert(%Activity{
98 recipients: recipients
102 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
105 Notification.create_notifications(activity)
# Results that did not match a step: an `%Activity{}` (presumably the one found
# by the first step) is returned as a success, anything else as an error tuple.
109 %Activity{} = activity -> {:ok, activity}
110 error -> {:error, error}
# Pushes an activity to the streamer topics.
# Create/Announce/Delete activities go to the "user" and "list" topics, and to
# the public topics when `to` contains the public address. Create activities
# are also streamed to one "hashtag:<tag>" topic per string tag and to the
# media topics. The conditions guarding the ":local" variants are not visible
# in this view.
114 def stream_out(activity) do
115 public = "https://www.w3.org/ns/activitystreams#Public"
117 if activity.data["type"] in ["Create", "Announce", "Delete"] do
118 Pleroma.Web.Streamer.stream("user", activity)
119 Pleroma.Web.Streamer.stream("list", activity)
# NOTE(review): `to` is passed to Enum.member?/2 without a default, unlike the
# `cc` lookup further down that falls back to [] — confirm `to` is always a list.
121 if Enum.member?(activity.data["to"], public) do
122 Pleroma.Web.Streamer.stream("public", activity)
125 Pleroma.Web.Streamer.stream("public:local", activity)
128 if activity.data["type"] in ["Create"] do
129 activity.data["object"]
130 |> Map.get("tag", [])
131 |> Enum.filter(fn tag -> is_bitstring(tag) end)
132 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
# NOTE(review): an object without an "attachment" key yields nil here, and
# nil != [] is true — confirm such objects should reach the media topics.
134 if activity.data["object"]["attachment"] != [] do
135 Pleroma.Web.Streamer.stream("public:media", activity)
138 Pleroma.Web.Streamer.stream("public:local:media", activity)
# Direct stream: part of the condition is that `cc` does not contain the
# public address.
# NOTE(review): the result of User.get_by_ap_id/1 is dereferenced directly; a
# nil result would raise — confirm the actor always exists at this point.
143 if !Enum.member?(activity.data["cc"] || [], public) &&
146 User.get_by_ap_id(activity.data["actor"]).follower_address
148 do: Pleroma.Web.Streamer.stream("direct", activity)
# Inserts a Create activity for the given params, increases the actor's note
# count and then hands the activity to `maybe_federate/1`.
# `:additional` defaults to an empty map; `:local` is true unless it is
# explicitly `false`.
153 def create(%{to: to, actor: actor, context: context, object: object} = params) do
154 additional = params[:additional] || %{}
155 # only accept false as false value
156 local = !(params[:local] == false)
157 published = params[:published]
161 %{to: to, actor: actor, published: published, context: context, object: object},
164 {:ok, activity} <- insert(create_data, local),
165 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
166 {:ok, _actor} <- User.increase_note_count(actor),
167 :ok <- maybe_federate(activity) do
# Inserts an "Accept" activity from `actor` for `object`, passes it to
# `maybe_federate/1` and then calls `User.update_follow_request_count/1` for
# the actor. `:local` is true unless it is explicitly `false`.
172 def accept(%{to: to, actor: actor, object: object} = params) do
173 # only accept false as false value
174 local = !(params[:local] == false)
176 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
177 {:ok, activity} <- insert(data, local),
178 :ok <- maybe_federate(activity),
179 _ <- User.update_follow_request_count(actor) do
# Inserts a "Reject" activity from `actor` for `object`, passes it to
# `maybe_federate/1` and then calls `User.update_follow_request_count/1` for
# the actor. `:local` is true unless it is explicitly `false`.
184 def reject(%{to: to, actor: actor, object: object} = params) do
185 # only accept false as false value
186 local = !(params[:local] == false)
188 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
189 {:ok, activity} <- insert(data, local),
190 :ok <- maybe_federate(activity),
191 _ <- User.update_follow_request_count(actor) do
# Inserts an activity built from `to`, `cc`, `actor` and `object` (the data
# map itself is not visible in this view) and passes it to `maybe_federate/1`.
# `:local` is true unless it is explicitly `false`.
196 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
197 # only accept false as false value
198 local = !(params[:local] == false)
207 {:ok, activity} <- insert(data, local),
208 :ok <- maybe_federate(activity) do
213 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
215 %User{ap_id: ap_id} = user,
216 %Object{data: %{"id" => _}} = object,
220 with nil <- get_existing_like(ap_id, object),
221 like_data <- make_like_data(user, object, activity_id),
222 {:ok, activity} <- insert(like_data, local),
223 {:ok, object} <- add_like_to_object(activity, object),
224 :ok <- maybe_federate(activity) do
225 {:ok, activity, object}
227 %Activity{} = activity -> {:ok, activity, object}
228 error -> {:error, error}
238 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
239 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
240 {:ok, unlike_activity} <- insert(unlike_data, local),
241 {:ok, _activity} <- Repo.delete(like_activity),
242 {:ok, object} <- remove_like_from_object(like_activity, object),
243 :ok <- maybe_federate(unlike_activity) do
244 {:ok, unlike_activity, like_activity, object}
251 %User{ap_id: _} = user,
252 %Object{data: %{"id" => _}} = object,
257 with true <- is_public?(object),
258 announce_data <- make_announce_data(user, object, activity_id, public),
259 {:ok, activity} <- insert(announce_data, local),
260 {:ok, object} <- add_announce_to_object(activity, object),
261 :ok <- maybe_federate(activity) do
262 {:ok, activity, object}
264 error -> {:error, error}
274 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
275 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
276 {:ok, unannounce_activity} <- insert(unannounce_data, local),
277 :ok <- maybe_federate(unannounce_activity),
278 {:ok, _activity} <- Repo.delete(announce_activity),
279 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
280 {:ok, unannounce_activity, object}
# Inserts the follow activity produced by `make_follow_data/3`, passes it to
# `maybe_federate/1` and then calls `User.update_follow_request_count/1` for
# the followed user.
286 def follow(follower, followed, activity_id \\ nil, local \\ true) do
287 with data <- make_follow_data(follower, followed, activity_id),
288 {:ok, activity} <- insert(data, local),
289 :ok <- maybe_federate(activity),
290 _ <- User.update_follow_request_count(followed) do
# Requires an existing follow activity between the two users, marks it as
# "cancelled", inserts the unfollow activity built from it and passes that to
# `maybe_federate/1`; finally calls `User.update_follow_request_count/1` for
# the followed user.
295 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
296 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
297 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
298 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
299 {:ok, activity} <- insert(unfollow_data, local),
300 :ok <- maybe_federate(activity),
301 _ <- User.update_follow_request_count(followed) do
# Deletes `object`, inserts an activity addressed to the actor's followers and
# the public collection, decreases the actor's note count and passes the
# activity to `maybe_federate/1`.
# NOTE(review): `user` comes from a cache lookup and is dereferenced as
# `user.follower_address`; a nil result would raise — confirm the actor is
# always known here.
306 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
307 user = User.get_cached_by_ap_id(actor)
313 "to" => [user.follower_address, "https://www.w3.org/ns/activitystreams#Public"]
316 with {:ok, _} <- Object.delete(object),
317 {:ok, activity} <- insert(data, local),
318 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
319 {:ok, _actor} <- User.decrease_note_count(user),
320 :ok <- maybe_federate(activity) do
# Blocks `blocked` on behalf of `blocker`, driven by two settings read at call
# time from the :activitypub application config.
325 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
326 ap_config = Application.get_env(:pleroma, :activitypub)
327 unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
328 outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)
# With :unfollow_blocked enabled, an existing follow of the blocked user is
# undone through unfollow/4.
330 with true <- unfollow_blocked do
331 follow_activity = fetch_latest_follow(blocker, blocked)
333 if follow_activity do
334 unfollow(blocker, blocked, nil, local)
# A block activity is only inserted and federated when :outgoing_blocks is true.
338 with true <- outgoing_blocks,
339 block_data <- make_block_data(blocker, blocked, activity_id),
340 {:ok, activity} <- insert(block_data, local),
341 :ok <- maybe_federate(activity) do
# Requires an existing block activity between the two users, inserts the
# activity built by `make_unblock_data/4` from it and passes that to
# `maybe_federate/1`.
348 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
349 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
350 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
351 {:ok, activity} <- insert(unblock_data, local),
352 :ok <- maybe_federate(activity) do
366 additional = params[:additional] || %{}
368 # only accept false as false value
369 local = !(params[:local] == false)
378 |> make_flag_data(additional)
# Queries the activities of one context, newest id first.
# Recipients are the public address plus, when opts["user"] is set, that
# user's ap_id and the entries of their `following` list.
382 def fetch_activities_for_context(context, opts \\ %{}) do
383 public = ["https://www.w3.org/ns/activitystreams#Public"]
386 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
388 query = from(activity in Activity)
392 |> restrict_blocked(opts)
393 |> restrict_recipients(recipients, opts["user"])
# The SQL fragment compares both the activity type and its context.
400 "?->>'type' = ? and ?->>'context' = ?",
406 order_by: [desc: :id]
# Builds the activity query with the public address as the only recipient and
# applies `restrict_unlisted/1` to it.
412 def fetch_public_activities(opts \\ %{}) do
413 q = fetch_activities_query(["https://www.w3.org/ns/activitystreams#Public"], opts)
416 |> restrict_unlisted()
# Visibility values accepted by the :visibility option.
421 @valid_visibilities ~w[direct unlisted public private]
# Filters by visibility using the `activity_visibility` SQL function; only
# values listed in @valid_visibilities reach this clause.
423 defp restrict_visibility(query, %{visibility: visibility})
424 when visibility in @valid_visibilities do
429 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
# NOTE(review): this call sits on a line of its own in this view; if its
# result is unused it looks like a debugging leftover — confirm and remove.
432 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
# Any other :visibility value is logged as an error.
437 defp restrict_visibility(_query, %{visibility: visibility})
438 when visibility not in @valid_visibilities do
439 Logger.error("Could not restrict visibility to #{visibility}")
# No :visibility option: the query is returned unchanged.
442 defp restrict_visibility(query, _visibility), do: query
# Fetches the Create and Announce activities authored by `user`.
# The params are extended with the type filter, the user's ap_id as actor,
# the "whole_db" flag and the user's pinned activity ids.
444 def fetch_user_activities(user, reading_user, params \\ %{}) do
447 |> Map.put("type", ["Create", "Announce"])
448 |> Map.put("actor_id", user.ap_id)
449 |> Map.put("whole_db", true)
450 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
# Recipients: the public address, extended in one branch by the reading
# user's ap_id and `following` list (the branch condition is not visible here).
454 ["https://www.w3.org/ns/activitystreams#Public"] ++
455 [reading_user.ap_id | reading_user.following]
457 ["https://www.w3.org/ns/activitystreams#Public"]
460 fetch_activities(recipients, params)
# Keeps only activities whose id is strictly greater than the "since_id"
# option. An empty-string or absent "since_id" leaves the query unchanged.
defp restrict_since(query, %{"since_id" => since_id}) when since_id != "" do
  from(a in query, where: a.id > ^since_id)
end

defp restrict_since(query, _opts), do: query
# Applies to a non-empty "tag_reject" list: the fragment negates a `?|` test
# of the object's tag array against that list. Any other options leave the
# query unchanged.
472 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
473 when is_list(tag_reject) and tag_reject != [] do
476 where: fragment("(not (? #> '{\"object\",\"tag\"}') \\?| ?)", activity.data, ^tag_reject)
480 defp restrict_tag_reject(query, _), do: query
# Applies to a non-empty "tag_all" list: the fragment tests the object's tag
# array against that list with the `?&` operator. Any other options leave the
# query unchanged.
482 defp restrict_tag_all(query, %{"tag_all" => tag_all})
483 when is_list(tag_all) and tag_all != [] do
486 where: fragment("(? #> '{\"object\",\"tag\"}') \\?& ?", activity.data, ^tag_all)
490 defp restrict_tag_all(query, _), do: query
# "tag" given as a list: the object's tag array is tested against it with `?|`.
492 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
495 where: fragment("(? #> '{\"object\",\"tag\"}') \\?| ?", activity.data, ^tag)
# "tag" given as a single string: a `<@` containment test against the tag array.
499 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
502 where: fragment("? <@ (? #> '{\"object\",\"tag\"}')", ^tag, activity.data)
# No usable "tag" option: the query is returned unchanged.
506 defp restrict_tag(query, _), do: query
# Restricts on the activity's 'to' and 'cc' fields: the fragment ORs a `?|`
# test of each field; the bound arguments are not visible in this view.
508 defp restrict_to_cc(query, recipients_to, recipients_cc) do
513 "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
# An empty recipient list applies no restriction.
522 defp restrict_recipients(query, [], _user), do: query
# Without a user: the given recipients must overlap (`&&`) the activity's
# recipients column.
524 defp restrict_recipients(query, recipients, nil) do
525 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
# With a user: the same overlap test, plus an `or_where` that also admits
# activities whose actor is the user.
528 defp restrict_recipients(query, recipients, user) do
531 where: fragment("? && ?", ^recipients, activity.recipients),
532 or_where: activity.actor == ^user.ap_id
# Caps the number of returned rows at the "limit" option when it is present;
# otherwise the query is returned unchanged.
defp restrict_limit(query, opts) do
  case opts do
    %{"limit" => max_rows} -> from(a in query, limit: ^max_rows)
    _ -> query
  end
end
# Keeps only local activities when "local_only" is exactly `true`; any other
# value (or no option at all) leaves the query unchanged.
defp restrict_local(query, opts) do
  case opts do
    %{"local_only" => true} -> from(a in query, where: a.local == true)
    _ -> query
  end
end
# Keeps only activities whose id is strictly below the "max_id" option.
# An empty-string or absent "max_id" leaves the query unchanged.
defp restrict_max(query, %{"max_id" => max_id}) when max_id != "" do
  from(a in query, where: a.id < ^max_id)
end

defp restrict_max(query, _opts), do: query
# Keeps only activities whose actor equals the "actor_id" option; without
# that option the query is returned unchanged.
defp restrict_actor(query, opts) do
  case opts do
    %{"actor_id" => wanted_actor} -> from(a in query, where: a.actor == ^wanted_actor)
    _ -> query
  end
end
# Filters on the activity's 'type' field. A string "type" option must match
# exactly; any other value is bound as the argument of an `ANY(?)` test.
# Without a "type" option the query is returned unchanged.
defp restrict_type(query, %{"type" => kind}) do
  if is_binary(kind) do
    from(a in query, where: fragment("?->>'type' = ?", a.data, ^kind))
  else
    from(a in query, where: fragment("?->>'type' = ANY(?)", a.data, ^kind))
  end
end

defp restrict_type(query, _opts), do: query
# With a "favorited_by" ap_id: a `<@` containment test of that id against the
# object's 'likes' field. Without the option the query is returned unchanged.
572 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
575 where: fragment("? <@ (? #> '{\"object\",\"likes\"}')", ^ap_id, activity.data)
579 defp restrict_favorited_by(query, _), do: query
# With "only_media" set to "true" or "1": keeps activities whose object
# 'attachment' field is not equal to the bound empty list. Any other value
# leaves the query unchanged.
581 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
584 where: fragment("not (? #> '{\"object\",\"attachment\"}' = ?)", activity.data, ^[])
588 defp restrict_media(query, _), do: query
# With "exclude_replies" set to "true" or "1": keeps only activities whose
# object has a null 'inReplyTo'. Any other value leaves the query unchanged.
590 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
593 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
597 defp restrict_replies(query, _), do: query
# Drops Announce activities when "exclude_reblogs" is "true" or "1"; any
# other value (or no option) leaves the query unchanged.
defp restrict_reblogs(query, %{"exclude_reblogs" => flag}) when flag in ["true", "1"] do
  from(a in query, where: fragment("?->>'type' != 'Announce'", a.data))
end

defp restrict_reblogs(query, _opts), do: query
# "with_muted" set to true, "true" or "1" skips mute filtering entirely.
605 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
# With a "muting_user": excludes activities whose actor is in `mutes`, and
# activities whose 'to' field matches any entry of `mutes`.
607 defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
612 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
613 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
# No muting user: the query is returned unchanged.
617 defp restrict_muted(query, _), do: query
# With a "blocking_user": excludes activities from blocked actors, activities
# whose 'to' field matches a blocked entry, and activities whose actor host
# (third '/'-separated part of the actor string) is in the user's domain
# blocks. Missing block lists default to [].
619 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
620 blocks = info.blocks || []
621 domain_blocks = info.domain_blocks || []
625 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
626 where: fragment("not (?->'to' \\?| ?)", activity.data, ^blocks),
627 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
# No blocking user: the query is returned unchanged.
631 defp restrict_blocked(query, _), do: query
# Excludes activities whose 'cc' field (an empty JSON object when absent)
# matches the public address.
633 defp restrict_unlisted(query) do
638 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
640 ^["https://www.w3.org/ns/activitystreams#Public"]
# When "pinned" is the string "true" and "pinned_activity_ids" is present,
# keeps only the activities whose id is in that list; otherwise the query is
# returned unchanged.
defp restrict_pinned(query, opts) do
  case opts do
    %{"pinned" => "true", "pinned_activity_ids" => pinned_ids} ->
      from(a in query, where: a.id in ^pinned_ids)

    _ ->
      query
  end
end
# Builds the activity query: a base query over Activity ordered by id
# descending (nulls last), piped through every restrict_* filter below.
# Each filter inspects `opts` and returns the query unchanged when its
# option is absent.
651 def fetch_activities_query(recipients, opts \\ %{}) do
654 activity in Activity,
656 order_by: [fragment("? desc nulls last", activity.id)]
660 |> restrict_recipients(recipients, opts["user"])
661 |> restrict_tag(opts)
662 |> restrict_tag_reject(opts)
663 |> restrict_tag_all(opts)
664 |> restrict_since(opts)
665 |> restrict_local(opts)
666 |> restrict_limit(opts)
667 |> restrict_max(opts)
668 |> restrict_actor(opts)
669 |> restrict_type(opts)
670 |> restrict_favorited_by(opts)
671 |> restrict_blocked(opts)
672 |> restrict_muted(opts)
673 |> restrict_media(opts)
674 |> restrict_visibility(opts)
675 |> restrict_replies(opts)
676 |> restrict_reblogs(opts)
677 |> restrict_pinned(opts)
# Starts from `fetch_activities_query/2` for the given recipients and options;
# the remaining pipeline steps are not visible in this view.
680 def fetch_activities(recipients, opts \\ %{}) do
681 fetch_activities_query(recipients, opts)
# Builds the activity query with no recipient restriction (empty list) and
# restricts it through `restrict_to_cc/3` with the given to/cc lists instead.
686 def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
687 fetch_activities_query([], opts)
688 |> restrict_to_cc(recipients_to, recipients_cc)
# Stores `file` through `Upload.store/2` and inserts an Object for the
# resulting data; opts[:actor] is written into the data under "actor"
# (the condition around that step is not visible in this view).
693 def upload(file, opts \\ []) do
694 with {:ok, data} <- Upload.store(file, opts) do
697 Map.put(data, "actor", opts[:actor])
702 Repo.insert(%Object{data: obj_data})
# Converts a remote actor document into user data.
706 def user_data_from_user_object(data) do
# Icon and image are only used when they carry a "url"; each url is wrapped
# as a list holding one "href" map.
708 data["icon"]["url"] &&
711 "url" => [%{"href" => data["icon"]["url"]}]
715 data["image"]["url"] &&
718 "url" => [%{"href" => data["image"]["url"]}]
# A missing "manuallyApprovesFollowers" counts as not locked.
721 locked = data["manuallyApprovesFollowers"] || false
722 data = Transmogrifier.maybe_fix_user_object(data)
727 "ap_enabled" => true,
728 "source_data" => data,
734 follower_address: data["followers"],
738 # nickname can be nil because of virtual actors
# The nickname is "<preferredUsername>@<host of the actor id>" when a
# preferredUsername exists, otherwise nil.
740 if data["preferredUsername"] do
744 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
747 Map.put(user_data, :nickname, nil)
# Fetches the remote actor document for `ap_id` and converts it into user
# data via `user_data_from_user_object/1`, whose result is returned as-is.
#
# A failed fetch is logged and returned as an error tuple: an existing
# `{:error, reason}` is passed through, any other value is wrapped in
# `{:error, value}`. Previously the failure branch ended with the
# `Logger.error/1` call, so callers received its `:ok` return value.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- fetch_and_contain_remote_object_from_id(ap_id) do
    user_data_from_user_object(data)
  else
    failure ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(failure)}")

      case failure do
        {:error, _reason} -> failure
        other -> {:error, other}
      end
  end
end
# When a user with this ap_id already exists it is upgraded through the
# Transmogrifier; otherwise the remote actor is fetched, prepared and passed
# to `User.insert_or_update_user/1`.
761 def make_user_from_ap_id(ap_id) do
762 if _user = User.get_by_ap_id(ap_id) do
763 Transmogrifier.upgrade_user_from_ap_id(ap_id)
765 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
766 User.insert_or_update_user(data)
# Resolves `nickname` through WebFinger and builds the user from the ap_id
# found there. Any WebFinger result without a non-nil "ap_id" yields
# `{:error, "No AP id in WebFinger"}`.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) -> make_user_from_ap_id(ap_id)
    _no_ap_id -> {:error, "No AP id in WebFinger"}
  end
end
# Decides whether an activity may be delivered to `inbox`. The visible check
# rejects inboxes whose host is listed in the configured
# `[:instance, :quarantined_instances]`; how `public` enters the decision is
# not visible in this view.
781 def should_federate?(inbox, public) do
785 inbox_info = URI.parse(inbox)
786 !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
# Delivers an activity to remote inboxes.
790 def publish(actor, activity) do
# Non-local followers are only collected when the actor's follower address is
# among the activity's recipients.
792 if actor.follower_address in activity.recipients do
793 {:ok, followers} = User.get_followers(actor)
794 followers |> Enum.filter(&(!&1.local))
799 public = is_public?(activity)
801 {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
802 json = Jason.encode!(data)
# Target users: the Salmon remote users plus the remote followers, limited to
# AP-enabled ones. Each maps to its "sharedInbox" endpoint when the source
# data has one, otherwise to its "inbox".
804 (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
805 |> Enum.filter(fn user -> User.ap_enabled?(user) end)
806 |> Enum.map(fn %{info: %{source_data: data}} ->
807 (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
# Inboxes are filtered by should_federate?/2 and by reachability, then one
# delivery per remaining inbox is handed to Federator.publish_single_ap/1.
810 |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
811 |> Instances.filter_reachable()
812 |> Enum.each(fn {inbox, unreachable_since} ->
813 Federator.publish_single_ap(%{
817 id: activity.data["id"],
818 unreachable_since: unreachable_since
# Sends one signed JSON document to a single inbox.
823 def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
824 Logger.info("Federating #{id} to #{inbox}")
825 host = URI.parse(inbox).host
# Digest value: "SHA-256=" followed by the base64-encoded SHA-256 of the body.
827 digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
830 NaiveDateTime.utc_now()
831 |> Timex.format!("{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT")
834 Pleroma.Web.HTTPSignatures.sign(actor, %{
836 "content-length": byte_size(json),
# Only a 2xx response counts as a successful delivery.
841 with {:ok, %{status: code}} when code in 200..299 <-
847 {"Content-Type", "application/activity+json"},
849 {"signature", signature},
# On success the inbox is marked reachable when params has no
# :unreachable_since key or a truthy value under it.
853 if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
854 do: Instances.set_reachable(inbox)
# On failure the inbox is marked unreachable unless :unreachable_since is
# already set.
858 {_post_result, response} ->
859 unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
865 # This will create a Create activity, which we need internally at the moment.
# Looks `id` up in the object cache first. On a miss the remote document is
# fetched, a params map using "actor" (falling back to "attributedTo") is
# origin-checked and passed to Transmogrifier.handle_incoming/1, and the
# resulting activity's object is returned normalized.
866 def fetch_object_from_id(id) do
867 if object = Object.get_cached_by_ap_id(id) do
870 with {:ok, data} <- fetch_and_contain_remote_object_from_id(id),
871 nil <- Object.normalize(data),
876 "actor" => data["actor"] || data["attributedTo"],
879 :ok <- Transmogrifier.contain_origin(id, params),
880 {:ok, activity} <- Transmogrifier.handle_incoming(params) do
881 {:ok, Object.normalize(activity.data["object"])}
# Fallbacks for results that did not match a step above; the last one retries
# the fetch through OStatus.
883 {:error, {:reject, nil}} ->
886 object = %Object{} ->
890 Logger.info("Couldn't get object via AP, trying out OStatus fetching...")
892 case OStatus.fetch_activity_from_url(id) do
893 {:ok, [activity | _]} -> {:ok, Object.normalize(activity.data["object"])}
# Fetches the document at `id` with an "application/activity+json" Accept
# header. The id must start with "http", the response status must be 2xx,
# the body must decode as JSON, and the decoded data must pass
# Transmogrifier.contain_origin_from_id/2.
900 def fetch_and_contain_remote_object_from_id(id) do
901 Logger.info("Fetching object #{id} via AP")
903 with true <- String.starts_with?(id, "http"),
904 {:ok, %{body: body, status: code}} when code in 200..299 <-
907 [{:Accept, "application/activity+json"}]
909 {:ok, data} <- Jason.decode(body),
910 :ok <- Transmogrifier.contain_origin_from_id(id, data) do
# Filters out broken threads: returns whatever
# `entire_thread_visible_for_user?/2` reports for this activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing for a single activity; at present this is only the
# broken-thread check.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
928 # do post-processing on a timeline
929 def contain_timeline(timeline, user) do
931 |> Enum.filter(fn activity ->
932 contain_activity(activity, user)