1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
11 alias Pleroma.Notification
12 alias Pleroma.Instances
13 alias Pleroma.Web.ActivityPub.Transmogrifier
14 alias Pleroma.Web.ActivityPub.MRF
15 alias Pleroma.Web.WebFinger
16 alias Pleroma.Web.Federator
17 alias Pleroma.Web.OStatus
20 import Pleroma.Web.ActivityPub.Utils
21 import Pleroma.Web.ActivityPub.Visibility
25 @httpoison Application.get_env(:pleroma, :httpoison)
27 # For Announce activities, we filter the recipients based on following status for any actors
28 # that match actual users. See issue #164 for more information about why this is necessary.
29 defp get_recipients(%{"type" => "Announce"} = data) do
32 actor = User.get_cached_by_ap_id(data["actor"])
36 |> Enum.filter(fn recipient ->
37 case User.get_cached_by_ap_id(recipient) do
42 User.following?(user, actor)
49 defp get_recipients(%{"type" => "Create"} = data) do
52 actor = data["actor"] || []
53 recipients = (to ++ cc ++ [actor]) |> Enum.uniq()
57 defp get_recipients(data) do
# Looks up the user behind the AP id `actor` and inspects `user.info.deactivated`; a nil
# `actor` skips the lookup. The remainder of the body is not visible in this excerpt.
# NOTE(review): `user <- ...` is a bare-variable match, so it succeeds for any lookup result.
# If the lookup can yield nil (presumably for unknown actors — confirm), `user.info` raises
# here instead of falling through to an error branch.
64 defp check_actor_is_active(actor) do
65 if not is_nil(actor) do
66 with user <- User.get_cached_by_ap_id(actor),
67 false <- user.info.deactivated do
# Size gate for activities: when the embedded object carries non-nil "content", its
# length (String.length/1) must not exceed the configured `[:instance, :remote_limit]`.
# Anything without such content passes.
defp check_remote_limit(%{"object" => %{"content" => body}}) when not is_nil(body) do
  max_length = Pleroma.Config.get([:instance, :remote_limit])
  max_length >= String.length(body)
end

defp check_remote_limit(_activity), do: true
# Delegates to `User.increase_note_count/1` when `object` is public; otherwise the actor
# is handed back untouched as `{:ok, actor}`.
def increase_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
# Counterpart of the increasing variant: delegates to `User.decrease_note_count/1` for a
# public `object`, and returns `{:ok, actor}` unchanged for anything else.
def decrease_note_count_if_public(actor, object) do
  if is_public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
92 def insert(map, local \\ true) when is_map(map) do
93 with nil <- Activity.normalize(map),
94 map <- lazy_put_activity_defaults(map),
95 :ok <- check_actor_is_active(map["actor"]),
96 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
97 {:ok, map} <- MRF.filter(map),
98 :ok <- insert_full_object(map) do
99 {recipients, _, _} = get_recipients(map)
102 Repo.insert(%Activity{
106 recipients: recipients
110 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
113 Notification.create_notifications(activity)
117 %Activity{} = activity -> {:ok, activity}
118 error -> {:error, error}
# Pushes `activity` to the Streamer topics. Create/Announce/Delete activities go to "user"
# and "list"; those whose "to" contains the Public collection also go to "public" (and,
# under conditions not visible in this excerpt, "public:local"). For Create activities each
# binary tag is streamed to "hashtag:<tag>" and media topics are used when the object's
# "attachment" is not []. The trailing branch streams to "direct".
# NOTE(review): `activity.data["to"]` is passed to Enum.member?/2 without the `|| []`
# fallback that "cc" gets further down, so a missing "to" would raise.
# NOTE(review): `activity.data["object"]["attachment"] != []` is also true when the key is
# absent (nil != []), so objects without an "attachment" key count as media — confirm intent.
# NOTE(review): `.follower_address` is read straight off the User.get_by_ap_id/1 result;
# presumably that can be nil for an unknown actor — verify.
122 def stream_out(activity) do
123 public = "https://www.w3.org/ns/activitystreams#Public"
125 if activity.data["type"] in ["Create", "Announce", "Delete"] do
126 Pleroma.Web.Streamer.stream("user", activity)
127 Pleroma.Web.Streamer.stream("list", activity)
129 if Enum.member?(activity.data["to"], public) do
130 Pleroma.Web.Streamer.stream("public", activity)
133 Pleroma.Web.Streamer.stream("public:local", activity)
136 if activity.data["type"] in ["Create"] do
137 activity.data["object"]
138 |> Map.get("tag", [])
139 |> Enum.filter(fn tag -> is_bitstring(tag) end)
140 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
142 if activity.data["object"]["attachment"] != [] do
143 Pleroma.Web.Streamer.stream("public:media", activity)
146 Pleroma.Web.Streamer.stream("public:local:media", activity)
151 if !Enum.member?(activity.data["cc"] || [], public) &&
154 User.get_by_ap_id(activity.data["actor"]).follower_address
156 do: Pleroma.Web.Streamer.stream("direct", activity)
161 def create(%{to: to, actor: actor, context: context, object: object} = params) do
162 additional = params[:additional] || %{}
163 # only accept false as false value
164 local = !(params[:local] == false)
165 published = params[:published]
169 %{to: to, actor: actor, published: published, context: context, object: object},
172 {:ok, activity} <- insert(create_data, local),
173 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
174 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
175 :ok <- maybe_federate(activity) do
180 def accept(%{to: to, actor: actor, object: object} = params) do
181 # only accept false as false value
182 local = !(params[:local] == false)
184 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
185 {:ok, activity} <- insert(data, local),
186 :ok <- maybe_federate(activity) do
191 def reject(%{to: to, actor: actor, object: object} = params) do
192 # only accept false as false value
193 local = !(params[:local] == false)
195 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
196 {:ok, activity} <- insert(data, local),
197 :ok <- maybe_federate(activity) do
202 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
203 # only accept false as false value
204 local = !(params[:local] == false)
213 {:ok, activity} <- insert(data, local),
214 :ok <- maybe_federate(activity) do
219 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
221 %User{ap_id: ap_id} = user,
222 %Object{data: %{"id" => _}} = object,
226 with nil <- get_existing_like(ap_id, object),
227 like_data <- make_like_data(user, object, activity_id),
228 {:ok, activity} <- insert(like_data, local),
229 {:ok, object} <- add_like_to_object(activity, object),
230 :ok <- maybe_federate(activity) do
231 {:ok, activity, object}
233 %Activity{} = activity -> {:ok, activity, object}
234 error -> {:error, error}
244 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
245 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
246 {:ok, unlike_activity} <- insert(unlike_data, local),
247 {:ok, _activity} <- Repo.delete(like_activity),
248 {:ok, object} <- remove_like_from_object(like_activity, object),
249 :ok <- maybe_federate(unlike_activity) do
250 {:ok, unlike_activity, like_activity, object}
257 %User{ap_id: _} = user,
258 %Object{data: %{"id" => _}} = object,
263 with true <- is_public?(object),
264 announce_data <- make_announce_data(user, object, activity_id, public),
265 {:ok, activity} <- insert(announce_data, local),
266 {:ok, object} <- add_announce_to_object(activity, object),
267 :ok <- maybe_federate(activity) do
268 {:ok, activity, object}
270 error -> {:error, error}
280 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
281 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
282 {:ok, unannounce_activity} <- insert(unannounce_data, local),
283 :ok <- maybe_federate(unannounce_activity),
284 {:ok, _activity} <- Repo.delete(announce_activity),
285 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
286 {:ok, unannounce_activity, object}
292 def follow(follower, followed, activity_id \\ nil, local \\ true) do
293 with data <- make_follow_data(follower, followed, activity_id),
294 {:ok, activity} <- insert(data, local),
295 :ok <- maybe_federate(activity) do
300 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
301 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
302 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
303 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
304 {:ok, activity} <- insert(unfollow_data, local),
305 :ok <- maybe_federate(activity) do
# Deletes `object` and inserts an activity for it (the activity map itself is built in lines
# not visible in this excerpt), adjusts the actor's note count for public objects, then
# federates. `local` defaults to true and is forwarded to `insert/2`.
# NOTE(review): operator precedence bug in the recipient expression below. `++` binds
# tighter than `||`, so it parses as
#   object.data["to"] || ([] ++ object.data["cc"]) || ([] ++ [follower_address, Public])
# and yields only the first non-nil operand: whenever "to" is present, "cc", the follower
# address and the Public collection are never appended. The intended form is most likely
#   (object.data["to"] || []) ++ (object.data["cc"] || []) ++ [follower_address, Public]
# — confirm which recipients a Delete should reach before changing it.
310 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
311 user = User.get_cached_by_ap_id(actor)
314 object.data["to"] || [] ++ object.data["cc"] ||
315 [] ++ [user.follower_address, "https://www.w3.org/ns/activitystreams#Public"]
321 "to" => Enum.uniq(to)
324 with {:ok, _} <- Object.delete(object),
325 {:ok, activity} <- insert(data, local),
326 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
327 {:ok, _actor} <- decrease_note_count_if_public(user, object),
328 :ok <- maybe_federate(activity) do
333 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
334 ap_config = Application.get_env(:pleroma, :activitypub)
335 unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
336 outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)
338 with true <- unfollow_blocked do
339 follow_activity = fetch_latest_follow(blocker, blocked)
341 if follow_activity do
342 unfollow(blocker, blocked, nil, local)
346 with true <- outgoing_blocks,
347 block_data <- make_block_data(blocker, blocked, activity_id),
348 {:ok, activity} <- insert(block_data, local),
349 :ok <- maybe_federate(activity) do
356 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
357 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
358 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
359 {:ok, activity} <- insert(unblock_data, local),
360 :ok <- maybe_federate(activity) do
374 additional = params[:additional] || %{}
376 # only accept false as false value
377 local = !(params[:local] == false)
386 |> make_flag_data(additional)
390 def fetch_activities_for_context(context, opts \\ %{}) do
391 public = ["https://www.w3.org/ns/activitystreams#Public"]
394 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
396 query = from(activity in Activity)
400 |> restrict_blocked(opts)
401 |> restrict_recipients(recipients, opts["user"])
408 "?->>'type' = ? and ?->>'context' = ?",
414 order_by: [desc: :id]
420 def fetch_public_activities(opts \\ %{}) do
421 q = fetch_activities_query(["https://www.w3.org/ns/activitystreams#Public"], opts)
424 |> restrict_unlisted()
429 @valid_visibilities ~w[direct unlisted public private]
# Restricts a query by the `:visibility` option (an atom key, unlike the string keys the
# other restrict_* helpers match on). A list is accepted when every element is one of
# @valid_visibilities; a single valid value is compared via the `activity_visibility`
# SQL function. Without a `:visibility` key the query passes through unchanged.
431 defp restrict_visibility(query, %{visibility: visibility})
432 when is_list(visibility) do
433 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
439 "activity_visibility(?, ?, ?) = ANY (?)",
447 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
451 Logger.error("Could not restrict visibility to #{visibility}")
455 defp restrict_visibility(query, %{visibility: visibility})
456 when visibility in @valid_visibilities do
461 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
464 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
# NOTE(review): this clause drops the incoming query and evaluates to whatever
# Logger.error/1 returns, so later steps of a restrict_* pipeline receive that value
# instead of a query — confirm whether failing here is intended.
469 defp restrict_visibility(_query, %{visibility: visibility})
470 when visibility not in @valid_visibilities do
471 Logger.error("Could not restrict visibility to #{visibility}")
474 defp restrict_visibility(query, _visibility), do: query
476 def fetch_user_activities(user, reading_user, params \\ %{}) do
479 |> Map.put("type", ["Create", "Announce"])
480 |> Map.put("actor_id", user.ap_id)
481 |> Map.put("whole_db", true)
482 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
486 ["https://www.w3.org/ns/activitystreams#Public"] ++
487 [reading_user.ap_id | reading_user.following]
489 ["https://www.w3.org/ns/activitystreams#Public"]
492 fetch_activities(recipients, params)
# Lower id bound: with a non-empty "since_id" option only activities whose id is greater
# than it are kept. An empty string or a missing option leaves the query as it is.
defp restrict_since(query, %{"since_id" => lower_bound}) when lower_bound != "" do
  from(a in query, where: a.id > ^lower_bound)
end

defp restrict_since(query, _opts), do: query
504 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
505 when is_list(tag_reject) and tag_reject != [] do
508 where: fragment("(not (? #> '{\"object\",\"tag\"}') \\?| ?)", activity.data, ^tag_reject)
512 defp restrict_tag_reject(query, _), do: query
514 defp restrict_tag_all(query, %{"tag_all" => tag_all})
515 when is_list(tag_all) and tag_all != [] do
518 where: fragment("(? #> '{\"object\",\"tag\"}') \\?& ?", activity.data, ^tag_all)
522 defp restrict_tag_all(query, _), do: query
524 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
527 where: fragment("(? #> '{\"object\",\"tag\"}') \\?| ?", activity.data, ^tag)
531 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
534 where: fragment("? <@ (? #> '{\"object\",\"tag\"}')", ^tag, activity.data)
538 defp restrict_tag(query, _), do: query
540 defp restrict_to_cc(query, recipients_to, recipients_cc) do
545 "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
554 defp restrict_recipients(query, [], _user), do: query
556 defp restrict_recipients(query, recipients, nil) do
557 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
560 defp restrict_recipients(query, recipients, user) do
563 where: fragment("? && ?", ^recipients, activity.recipients),
564 or_where: activity.actor == ^user.ap_id
# Applies the "limit" option as the query's row limit; no option, no limit.
defp restrict_limit(query, opts) do
  case opts do
    %{"limit" => max_rows} -> from(a in query, limit: ^max_rows)
    _ -> query
  end
end
# Keeps only activities flagged `local` when the "local_only" option is exactly `true`;
# every other input returns the query untouched.
defp restrict_local(query, opts) do
  case opts do
    %{"local_only" => true} -> from(a in query, where: a.local == true)
    _ -> query
  end
end
# Upper id bound: with a non-empty "max_id" option only activities whose id is smaller
# than it are kept. An empty string or a missing option leaves the query as it is.
defp restrict_max(query, %{"max_id" => upper_bound}) when upper_bound != "" do
  from(a in query, where: a.id < ^upper_bound)
end

defp restrict_max(query, _opts), do: query
# Filters on the activity's `actor` column when an "actor_id" option is supplied.
defp restrict_actor(query, opts) do
  case opts do
    %{"actor_id" => wanted_actor} -> from(a in query, where: a.actor == ^wanted_actor)
    _ -> query
  end
end
# Filters on the "type" field inside the activity's data. A binary is compared for
# equality; any other value is passed to `ANY(...)` (e.g. the list of types that
# fetch_user_activities puts under "type"). Without a "type" option nothing changes.
defp restrict_type(query, %{"type" => wanted}) do
  if is_binary(wanted) do
    from(a in query, where: fragment("?->>'type' = ?", a.data, ^wanted))
  else
    from(a in query, where: fragment("?->>'type' = ANY(?)", a.data, ^wanted))
  end
end

defp restrict_type(query, _opts), do: query
604 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
607 where: fragment("? <@ (? #> '{\"object\",\"likes\"}')", ^ap_id, activity.data)
611 defp restrict_favorited_by(query, _), do: query
613 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
616 where: fragment("not (? #> '{\"object\",\"attachment\"}' = ?)", activity.data, ^[])
620 defp restrict_media(query, _), do: query
622 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
625 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
629 defp restrict_replies(query, _), do: query
# Drops Announce activities when "exclude_reblogs" is the string "true" or "1".
defp restrict_reblogs(query, %{"exclude_reblogs" => flag}) when flag in ["true", "1"] do
  from(a in query, where: fragment("?->>'type' != 'Announce'", a.data))
end

defp restrict_reblogs(query, _opts), do: query
637 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
639 defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
644 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
645 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
649 defp restrict_muted(query, _), do: query
651 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
652 blocks = info.blocks || []
653 domain_blocks = info.domain_blocks || []
657 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
658 where: fragment("not (?->'to' \\?| ?)", activity.data, ^blocks),
659 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
663 defp restrict_blocked(query, _), do: query
665 defp restrict_unlisted(query) do
670 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
672 ^["https://www.w3.org/ns/activitystreams#Public"]
# When "pinned" is the string "true" and "pinned_activity_ids" is present, limits the
# query to activities whose id is in that collection; otherwise returns it unchanged.
defp restrict_pinned(query, opts) do
  case opts do
    %{"pinned" => "true", "pinned_activity_ids" => pinned_ids} ->
      from(a in query, where: a.id in ^pinned_ids)

    _ ->
      query
  end
end
683 def fetch_activities_query(recipients, opts \\ %{}) do
686 activity in Activity,
688 order_by: [fragment("? desc nulls last", activity.id)]
692 |> restrict_recipients(recipients, opts["user"])
693 |> restrict_tag(opts)
694 |> restrict_tag_reject(opts)
695 |> restrict_tag_all(opts)
696 |> restrict_since(opts)
697 |> restrict_local(opts)
698 |> restrict_limit(opts)
699 |> restrict_max(opts)
700 |> restrict_actor(opts)
701 |> restrict_type(opts)
702 |> restrict_favorited_by(opts)
703 |> restrict_blocked(opts)
704 |> restrict_muted(opts)
705 |> restrict_media(opts)
706 |> restrict_visibility(opts)
707 |> restrict_replies(opts)
708 |> restrict_reblogs(opts)
709 |> restrict_pinned(opts)
712 def fetch_activities(recipients, opts \\ %{}) do
713 fetch_activities_query(recipients, opts)
718 def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
719 fetch_activities_query([], opts)
720 |> restrict_to_cc(recipients_to, recipients_cc)
725 def upload(file, opts \\ []) do
726 with {:ok, data} <- Upload.store(file, opts) do
729 Map.put(data, "actor", opts[:actor])
734 Repo.insert(%Object{data: obj_data})
738 def user_data_from_user_object(data) do
740 data["icon"]["url"] &&
743 "url" => [%{"href" => data["icon"]["url"]}]
747 data["image"]["url"] &&
750 "url" => [%{"href" => data["image"]["url"]}]
753 locked = data["manuallyApprovesFollowers"] || false
754 data = Transmogrifier.maybe_fix_user_object(data)
759 "ap_enabled" => true,
760 "source_data" => data,
766 follower_address: data["followers"],
770 # nickname can be nil because of virtual actors
772 if data["preferredUsername"] do
776 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
779 Map.put(user_data, :nickname, nil)
# Fetches the remote actor document at `ap_id` and turns it into user attributes.
#
# On success this returns whatever `user_data_from_user_object/1` returns for the fetched
# data. When the fetch does not yield `{:ok, data}`, the problem is logged and
# `{:error, reason}` is returned, where `reason` is the non-matching fetch result.
def fetch_and_prepare_user_from_ap_id(ap_id) do
  with {:ok, data} <- fetch_and_contain_remote_object_from_id(ap_id) do
    user_data_from_user_object(data)
  else
    e ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      # Logger.error/1 itself evaluates to :ok; returning that would make a failed fetch
      # look like a success atom to callers, so surface the failure explicitly.
      {:error, e}
  end
end
793 def make_user_from_ap_id(ap_id) do
794 if _user = User.get_by_ap_id(ap_id) do
795 Transmogrifier.upgrade_user_from_ap_id(ap_id)
797 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
798 User.insert_or_update_user(data)
# Resolves `nickname` through WebFinger and, when the result carries a non-nil "ap_id",
# continues with `make_user_from_ap_id/1`. Any other WebFinger result becomes
# `{:error, "No AP id in WebFinger"}`.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _other ->
      {:error, "No AP id in WebFinger"}
  end
end
813 def should_federate?(inbox, public) do
817 inbox_info = URI.parse(inbox)
818 !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
822 def publish(actor, activity) do
824 if actor.follower_address in activity.recipients do
825 {:ok, followers} = User.get_followers(actor)
826 followers |> Enum.filter(&(!&1.local))
831 public = is_public?(activity)
833 {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
834 json = Jason.encode!(data)
836 (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
837 |> Enum.filter(fn user -> User.ap_enabled?(user) end)
838 |> Enum.map(fn %{info: %{source_data: data}} ->
839 (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
842 |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
843 |> Instances.filter_reachable()
844 |> Enum.each(fn {inbox, unreachable_since} ->
845 Federator.publish_single_ap(%{
849 id: activity.data["id"],
850 unreachable_since: unreachable_since
855 def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
856 Logger.info("Federating #{id} to #{inbox}")
857 host = URI.parse(inbox).host
859 digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
862 NaiveDateTime.utc_now()
863 |> Timex.format!("{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT")
866 Pleroma.Web.HTTPSignatures.sign(actor, %{
868 "content-length": byte_size(json),
873 with {:ok, %{status: code}} when code in 200..299 <-
879 {"Content-Type", "application/activity+json"},
881 {"signature", signature},
885 if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
886 do: Instances.set_reachable(inbox)
890 {_post_result, response} ->
891 unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
897 # This will create a Create activity, which we need internally at the moment.
898 def fetch_object_from_id(id) do
899 if object = Object.get_cached_by_ap_id(id) do
902 with {:ok, data} <- fetch_and_contain_remote_object_from_id(id),
903 nil <- Object.normalize(data),
908 "actor" => data["actor"] || data["attributedTo"],
911 :ok <- Transmogrifier.contain_origin(id, params),
912 {:ok, activity} <- Transmogrifier.handle_incoming(params) do
913 {:ok, Object.normalize(activity.data["object"])}
915 {:error, {:reject, nil}} ->
918 object = %Object{} ->
922 Logger.info("Couldn't get object via AP, trying out OStatus fetching...")
924 case OStatus.fetch_activity_from_url(id) do
925 {:ok, [activity | _]} -> {:ok, Object.normalize(activity.data["object"])}
932 def fetch_and_contain_remote_object_from_id(id) do
933 Logger.info("Fetching object #{id} via AP")
935 with true <- String.starts_with?(id, "http"),
936 {:ok, %{body: body, status: code}} when code in 200..299 <-
939 [{:Accept, "application/activity+json"}]
941 {:ok, data} <- Jason.decode(body),
942 :ok <- Transmogrifier.contain_origin_from_id(id, data) do
# Broken-thread filter: defers entirely to `entire_thread_visible_for_user?/2` for the
# given activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing hook for a single activity; currently this is only the broken-thread
# check performed by `contain_broken_threads/2`.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
960 # do post-processing on a timeline
961 def contain_timeline(timeline, user) do
963 |> Enum.filter(fn activity ->
964 contain_activity(activity, user)