1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
11 alias Pleroma.Notification
12 alias Pleroma.Instances
13 alias Pleroma.Web.ActivityPub.Transmogrifier
14 alias Pleroma.Web.ActivityPub.MRF
15 alias Pleroma.Web.WebFinger
16 alias Pleroma.Web.Federator
17 alias Pleroma.Web.OStatus
20 import Pleroma.Web.ActivityPub.Utils
21 import Pleroma.Web.ActivityPub.Visibility
25 @httpoison Application.get_env(:pleroma, :httpoison)
27 # For Announce activities, we filter the recipients based on following status for any actors
28 # that match actual users. See issue #164 for more information about why this is necessary.
29 defp get_recipients(%{"type" => "Announce"} = data) do
32 actor = User.get_cached_by_ap_id(data["actor"])
36 |> Enum.filter(fn recipient ->
37 case User.get_cached_by_ap_id(recipient) do
42 User.following?(user, actor)
49 defp get_recipients(%{"type" => "Create"} = data) do
52 actor = data["actor"] || []
53 recipients = (to ++ cc ++ [actor]) |> Enum.uniq()
57 defp get_recipients(data) do
64 defp check_actor_is_active(actor) do
65 if not is_nil(actor) do
66 with user <- User.get_cached_by_ap_id(actor),
67 false <- user.info.deactivated do
77 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
78 limit = Pleroma.Config.get([:instance, :remote_limit])
79 String.length(content) <= limit
82 defp check_remote_limit(_), do: true
84 def increase_note_count_if_public(actor, object) do
85 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
88 def decrease_note_count_if_public(actor, object) do
89 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
92 def insert(map, local \\ true) when is_map(map) do
93 with nil <- Activity.normalize(map),
94 map <- lazy_put_activity_defaults(map),
95 :ok <- check_actor_is_active(map["actor"]),
96 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
97 {:ok, map} <- MRF.filter(map),
98 :ok <- insert_full_object(map) do
99 {recipients, _, _} = get_recipients(map)
102 Repo.insert(%Activity{
106 recipients: recipients
110 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
113 Notification.create_notifications(activity)
117 %Activity{} = activity -> {:ok, activity}
118 error -> {:error, error}
122 def stream_out(activity) do
123 public = "https://www.w3.org/ns/activitystreams#Public"
125 if activity.data["type"] in ["Create", "Announce", "Delete"] do
126 Pleroma.Web.Streamer.stream("user", activity)
127 Pleroma.Web.Streamer.stream("list", activity)
129 if Enum.member?(activity.data["to"], public) do
130 Pleroma.Web.Streamer.stream("public", activity)
133 Pleroma.Web.Streamer.stream("public:local", activity)
136 if activity.data["type"] in ["Create"] do
137 activity.data["object"]
138 |> Map.get("tag", [])
139 |> Enum.filter(fn tag -> is_bitstring(tag) end)
140 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
142 if activity.data["object"]["attachment"] != [] do
143 Pleroma.Web.Streamer.stream("public:media", activity)
146 Pleroma.Web.Streamer.stream("public:local:media", activity)
151 if !Enum.member?(activity.data["cc"] || [], public) &&
154 User.get_by_ap_id(activity.data["actor"]).follower_address
156 do: Pleroma.Web.Streamer.stream("direct", activity)
161 def create(%{to: to, actor: actor, context: context, object: object} = params) do
162 additional = params[:additional] || %{}
163 # only accept false as false value
164 local = !(params[:local] == false)
165 published = params[:published]
169 %{to: to, actor: actor, published: published, context: context, object: object},
172 {:ok, activity} <- insert(create_data, local),
173 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
174 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
175 :ok <- maybe_federate(activity) do
180 def accept(%{to: to, actor: actor, object: object} = params) do
181 # only accept false as false value
182 local = !(params[:local] == false)
184 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
185 {:ok, activity} <- insert(data, local),
186 :ok <- maybe_federate(activity) do
191 def reject(%{to: to, actor: actor, object: object} = params) do
192 # only accept false as false value
193 local = !(params[:local] == false)
195 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
196 {:ok, activity} <- insert(data, local),
197 :ok <- maybe_federate(activity) do
202 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
203 # only accept false as false value
204 local = !(params[:local] == false)
213 {:ok, activity} <- insert(data, local),
214 :ok <- maybe_federate(activity) do
219 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
221 %User{ap_id: ap_id} = user,
222 %Object{data: %{"id" => _}} = object,
226 with nil <- get_existing_like(ap_id, object),
227 like_data <- make_like_data(user, object, activity_id),
228 {:ok, activity} <- insert(like_data, local),
229 {:ok, object} <- add_like_to_object(activity, object),
230 :ok <- maybe_federate(activity) do
231 {:ok, activity, object}
233 %Activity{} = activity -> {:ok, activity, object}
234 error -> {:error, error}
244 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
245 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
246 {:ok, unlike_activity} <- insert(unlike_data, local),
247 {:ok, _activity} <- Repo.delete(like_activity),
248 {:ok, object} <- remove_like_from_object(like_activity, object),
249 :ok <- maybe_federate(unlike_activity) do
250 {:ok, unlike_activity, like_activity, object}
257 %User{ap_id: _} = user,
258 %Object{data: %{"id" => _}} = object,
263 with true <- is_public?(object),
264 announce_data <- make_announce_data(user, object, activity_id, public),
265 {:ok, activity} <- insert(announce_data, local),
266 {:ok, object} <- add_announce_to_object(activity, object),
267 :ok <- maybe_federate(activity) do
268 {:ok, activity, object}
270 error -> {:error, error}
280 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
281 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
282 {:ok, unannounce_activity} <- insert(unannounce_data, local),
283 :ok <- maybe_federate(unannounce_activity),
284 {:ok, _activity} <- Repo.delete(announce_activity),
285 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
286 {:ok, unannounce_activity, object}
292 def follow(follower, followed, activity_id \\ nil, local \\ true) do
293 with data <- make_follow_data(follower, followed, activity_id),
294 {:ok, activity} <- insert(data, local),
295 :ok <- maybe_federate(activity) do
300 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
301 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
302 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
303 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
304 {:ok, activity} <- insert(unfollow_data, local),
305 :ok <- maybe_federate(activity) do
310 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
311 user = User.get_cached_by_ap_id(actor)
314 case Object.get_cached_by_ap_id(id) do
316 [user.follower_address, "https://www.w3.org/ns/activitystreams#Public"]
319 object.data["to"] || [] ++ object.data["cc"] ||
320 [] ++ [user.follower_address, "https://www.w3.org/ns/activitystreams#Public"]
327 "to" => Enum.uniq(to)
330 with {:ok, _} <- Object.delete(object),
331 {:ok, activity} <- insert(data, local),
332 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
333 {:ok, _actor} <- decrease_note_count_if_public(user, object),
334 :ok <- maybe_federate(activity) do
339 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
340 ap_config = Application.get_env(:pleroma, :activitypub)
341 unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
342 outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)
344 with true <- unfollow_blocked do
345 follow_activity = fetch_latest_follow(blocker, blocked)
347 if follow_activity do
348 unfollow(blocker, blocked, nil, local)
352 with true <- outgoing_blocks,
353 block_data <- make_block_data(blocker, blocked, activity_id),
354 {:ok, activity} <- insert(block_data, local),
355 :ok <- maybe_federate(activity) do
362 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
363 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
364 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
365 {:ok, activity} <- insert(unblock_data, local),
366 :ok <- maybe_federate(activity) do
380 additional = params[:additional] || %{}
382 # only accept false as false value
383 local = !(params[:local] == false)
392 |> make_flag_data(additional)
396 def fetch_activities_for_context(context, opts \\ %{}) do
397 public = ["https://www.w3.org/ns/activitystreams#Public"]
400 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
402 query = from(activity in Activity)
406 |> restrict_blocked(opts)
407 |> restrict_recipients(recipients, opts["user"])
414 "?->>'type' = ? and ?->>'context' = ?",
420 order_by: [desc: :id]
426 def fetch_public_activities(opts \\ %{}) do
427 q = fetch_activities_query(["https://www.w3.org/ns/activitystreams#Public"], opts)
430 |> restrict_unlisted()
435 @valid_visibilities ~w[direct unlisted public private]
437 defp restrict_visibility(query, %{visibility: visibility})
438 when is_list(visibility) do
439 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
445 "activity_visibility(?, ?, ?) = ANY (?)",
453 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
457 Logger.error("Could not restrict visibility to #{visibility}")
461 defp restrict_visibility(query, %{visibility: visibility})
462 when visibility in @valid_visibilities do
467 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
470 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
475 defp restrict_visibility(_query, %{visibility: visibility})
476 when visibility not in @valid_visibilities do
477 Logger.error("Could not restrict visibility to #{visibility}")
480 defp restrict_visibility(query, _visibility), do: query
482 def fetch_user_activities(user, reading_user, params \\ %{}) do
485 |> Map.put("type", ["Create", "Announce"])
486 |> Map.put("actor_id", user.ap_id)
487 |> Map.put("whole_db", true)
488 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
492 ["https://www.w3.org/ns/activitystreams#Public"] ++
493 [reading_user.ap_id | reading_user.following]
495 ["https://www.w3.org/ns/activitystreams#Public"]
498 fetch_activities(recipients, params)
502 defp restrict_since(query, %{"since_id" => ""}), do: query
504 defp restrict_since(query, %{"since_id" => since_id}) do
505 from(activity in query, where: activity.id > ^since_id)
508 defp restrict_since(query, _), do: query
510 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
511 when is_list(tag_reject) and tag_reject != [] do
514 where: fragment("(not (? #> '{\"object\",\"tag\"}') \\?| ?)", activity.data, ^tag_reject)
518 defp restrict_tag_reject(query, _), do: query
520 defp restrict_tag_all(query, %{"tag_all" => tag_all})
521 when is_list(tag_all) and tag_all != [] do
524 where: fragment("(? #> '{\"object\",\"tag\"}') \\?& ?", activity.data, ^tag_all)
528 defp restrict_tag_all(query, _), do: query
530 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
533 where: fragment("(? #> '{\"object\",\"tag\"}') \\?| ?", activity.data, ^tag)
537 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
540 where: fragment("? <@ (? #> '{\"object\",\"tag\"}')", ^tag, activity.data)
544 defp restrict_tag(query, _), do: query
546 defp restrict_to_cc(query, recipients_to, recipients_cc) do
551 "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
560 defp restrict_recipients(query, [], _user), do: query
562 defp restrict_recipients(query, recipients, nil) do
563 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
566 defp restrict_recipients(query, recipients, user) do
569 where: fragment("? && ?", ^recipients, activity.recipients),
570 or_where: activity.actor == ^user.ap_id
574 defp restrict_limit(query, %{"limit" => limit}) do
575 from(activity in query, limit: ^limit)
578 defp restrict_limit(query, _), do: query
580 defp restrict_local(query, %{"local_only" => true}) do
581 from(activity in query, where: activity.local == true)
584 defp restrict_local(query, _), do: query
586 defp restrict_max(query, %{"max_id" => ""}), do: query
588 defp restrict_max(query, %{"max_id" => max_id}) do
589 from(activity in query, where: activity.id < ^max_id)
592 defp restrict_max(query, _), do: query
594 defp restrict_actor(query, %{"actor_id" => actor_id}) do
595 from(activity in query, where: activity.actor == ^actor_id)
598 defp restrict_actor(query, _), do: query
600 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
601 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
604 defp restrict_type(query, %{"type" => type}) do
605 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
608 defp restrict_type(query, _), do: query
610 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
613 where: fragment("? <@ (? #> '{\"object\",\"likes\"}')", ^ap_id, activity.data)
617 defp restrict_favorited_by(query, _), do: query
619 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
622 where: fragment("not (? #> '{\"object\",\"attachment\"}' = ?)", activity.data, ^[])
626 defp restrict_media(query, _), do: query
628 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
631 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
635 defp restrict_replies(query, _), do: query
637 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
638 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
641 defp restrict_reblogs(query, _), do: query
643 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
645 defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
650 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
651 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
655 defp restrict_muted(query, _), do: query
657 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
658 blocks = info.blocks || []
659 domain_blocks = info.domain_blocks || []
663 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
664 where: fragment("not (?->'to' \\?| ?)", activity.data, ^blocks),
665 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
669 defp restrict_blocked(query, _), do: query
671 defp restrict_unlisted(query) do
676 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
678 ^["https://www.w3.org/ns/activitystreams#Public"]
683 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
684 from(activity in query, where: activity.id in ^ids)
687 defp restrict_pinned(query, _), do: query
689 def fetch_activities_query(recipients, opts \\ %{}) do
692 activity in Activity,
694 order_by: [fragment("? desc nulls last", activity.id)]
698 |> restrict_recipients(recipients, opts["user"])
699 |> restrict_tag(opts)
700 |> restrict_tag_reject(opts)
701 |> restrict_tag_all(opts)
702 |> restrict_since(opts)
703 |> restrict_local(opts)
704 |> restrict_limit(opts)
705 |> restrict_max(opts)
706 |> restrict_actor(opts)
707 |> restrict_type(opts)
708 |> restrict_favorited_by(opts)
709 |> restrict_blocked(opts)
710 |> restrict_muted(opts)
711 |> restrict_media(opts)
712 |> restrict_visibility(opts)
713 |> restrict_replies(opts)
714 |> restrict_reblogs(opts)
715 |> restrict_pinned(opts)
718 def fetch_activities(recipients, opts \\ %{}) do
719 fetch_activities_query(recipients, opts)
724 def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
725 fetch_activities_query([], opts)
726 |> restrict_to_cc(recipients_to, recipients_cc)
731 def upload(file, opts \\ []) do
732 with {:ok, data} <- Upload.store(file, opts) do
735 Map.put(data, "actor", opts[:actor])
740 Repo.insert(%Object{data: obj_data})
744 def user_data_from_user_object(data) do
746 data["icon"]["url"] &&
749 "url" => [%{"href" => data["icon"]["url"]}]
753 data["image"]["url"] &&
756 "url" => [%{"href" => data["image"]["url"]}]
759 locked = data["manuallyApprovesFollowers"] || false
760 data = Transmogrifier.maybe_fix_user_object(data)
765 "ap_enabled" => true,
766 "source_data" => data,
772 follower_address: data["followers"],
776 # nickname can be nil because of virtual actors
778 if data["preferredUsername"] do
782 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
785 Map.put(user_data, :nickname, nil)
791 def fetch_and_prepare_user_from_ap_id(ap_id) do
792 with {:ok, data} <- fetch_and_contain_remote_object_from_id(ap_id) do
793 user_data_from_user_object(data)
795 e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
799 def make_user_from_ap_id(ap_id) do
800 if _user = User.get_by_ap_id(ap_id) do
801 Transmogrifier.upgrade_user_from_ap_id(ap_id)
803 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
804 User.insert_or_update_user(data)
811 def make_user_from_nickname(nickname) do
812 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
813 make_user_from_ap_id(ap_id)
815 _e -> {:error, "No AP id in WebFinger"}
819 def should_federate?(inbox, public) do
823 inbox_info = URI.parse(inbox)
824 !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
828 def publish(actor, activity) do
830 if actor.follower_address in activity.recipients do
831 {:ok, followers} = User.get_followers(actor)
832 followers |> Enum.filter(&(!&1.local))
837 public = is_public?(activity)
839 {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
840 json = Jason.encode!(data)
842 (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
843 |> Enum.filter(fn user -> User.ap_enabled?(user) end)
844 |> Enum.map(fn %{info: %{source_data: data}} ->
845 (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
848 |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
849 |> Instances.filter_reachable()
850 |> Enum.each(fn {inbox, unreachable_since} ->
851 Federator.publish_single_ap(%{
855 id: activity.data["id"],
856 unreachable_since: unreachable_since
861 def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
862 Logger.info("Federating #{id} to #{inbox}")
863 host = URI.parse(inbox).host
865 digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
868 NaiveDateTime.utc_now()
869 |> Timex.format!("{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT")
872 Pleroma.Web.HTTPSignatures.sign(actor, %{
874 "content-length": byte_size(json),
879 with {:ok, %{status: code}} when code in 200..299 <-
885 {"Content-Type", "application/activity+json"},
887 {"signature", signature},
891 if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
892 do: Instances.set_reachable(inbox)
896 {_post_result, response} ->
897 unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
903 # This will create a Create activity, which we need internally at the moment.
904 def fetch_object_from_id(id) do
905 if object = Object.get_cached_by_ap_id(id) do
908 with {:ok, data} <- fetch_and_contain_remote_object_from_id(id),
909 nil <- Object.normalize(data),
914 "actor" => data["actor"] || data["attributedTo"],
917 :ok <- Transmogrifier.contain_origin(id, params),
918 {:ok, activity} <- Transmogrifier.handle_incoming(params) do
919 {:ok, Object.normalize(activity.data["object"])}
921 {:error, {:reject, nil}} ->
924 object = %Object{} ->
928 Logger.info("Couldn't get object via AP, trying out OStatus fetching...")
930 case OStatus.fetch_activity_from_url(id) do
931 {:ok, [activity | _]} -> {:ok, Object.normalize(activity.data["object"])}
938 def fetch_and_contain_remote_object_from_id(id) do
939 Logger.info("Fetching object #{id} via AP")
941 with true <- String.starts_with?(id, "http"),
942 {:ok, %{body: body, status: code}} when code in 200..299 <-
945 [{:Accept, "application/activity+json"}]
947 {:ok, data} <- Jason.decode(body),
948 :ok <- Transmogrifier.contain_origin_from_id(id, data) do
956 # filter out broken threads
957 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
958 entire_thread_visible_for_user?(activity, user)
961 # do post-processing on a specific activity
962 def contain_activity(%Activity{} = activity, %User{} = user) do
963 contain_broken_threads(activity, user)
966 # do post-processing on a timeline
967 def contain_timeline(timeline, user) do
969 |> Enum.filter(fn activity ->
970 contain_activity(activity, user)