1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
11 alias Pleroma.Notification
12 alias Pleroma.Instances
13 alias Pleroma.Web.ActivityPub.Transmogrifier
14 alias Pleroma.Web.ActivityPub.MRF
15 alias Pleroma.Web.WebFinger
16 alias Pleroma.Web.Federator
17 alias Pleroma.Web.OStatus
20 import Pleroma.Web.ActivityPub.Utils
21 import Pleroma.Web.ActivityPub.Visibility
25 @httpoison Application.get_env(:pleroma, :httpoison)
27 # For Announce activities, we filter the recipients based on following status for any actors
28 # that match actual users. See issue #164 for more information about why this is necessary.
29 defp get_recipients(%{"type" => "Announce"} = data) do
32 actor = User.get_cached_by_ap_id(data["actor"])
36 |> Enum.filter(fn recipient ->
37 case User.get_cached_by_ap_id(recipient) do
42 User.following?(user, actor)
49 defp get_recipients(%{"type" => "Create"} = data) do
52 actor = data["actor"] || []
53 recipients = (to ++ cc ++ [actor]) |> Enum.uniq()
57 defp get_recipients(data) do
64 defp check_actor_is_active(actor) do
65 if not is_nil(actor) do
66 with user <- User.get_cached_by_ap_id(actor),
67 false <- user.info.deactivated do
77 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
78 limit = Pleroma.Config.get([:instance, :remote_limit])
79 String.length(content) <= limit
82 defp check_remote_limit(_), do: true
84 def increase_note_count_if_public(actor, object) do
85 if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
88 def decrease_note_count_if_public(actor, object) do
89 if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
92 def insert(map, local \\ true) when is_map(map) do
93 with nil <- Activity.normalize(map),
94 map <- lazy_put_activity_defaults(map),
95 :ok <- check_actor_is_active(map["actor"]),
96 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
97 {:ok, map} <- MRF.filter(map),
98 :ok <- insert_full_object(map) do
99 {recipients, _, _} = get_recipients(map)
102 Repo.insert(%Activity{
106 recipients: recipients
110 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
113 Notification.create_notifications(activity)
117 %Activity{} = activity -> {:ok, activity}
118 error -> {:error, error}
122 def stream_out(activity) do
123 public = "https://www.w3.org/ns/activitystreams#Public"
125 if activity.data["type"] in ["Create", "Announce", "Delete"] do
126 Pleroma.Web.Streamer.stream("user", activity)
127 Pleroma.Web.Streamer.stream("list", activity)
129 if Enum.member?(activity.data["to"], public) do
130 Pleroma.Web.Streamer.stream("public", activity)
133 Pleroma.Web.Streamer.stream("public:local", activity)
136 if activity.data["type"] in ["Create"] do
137 activity.data["object"]
138 |> Map.get("tag", [])
139 |> Enum.filter(fn tag -> is_bitstring(tag) end)
140 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
142 if activity.data["object"]["attachment"] != [] do
143 Pleroma.Web.Streamer.stream("public:media", activity)
146 Pleroma.Web.Streamer.stream("public:local:media", activity)
151 if !Enum.member?(activity.data["cc"] || [], public) &&
154 User.get_by_ap_id(activity.data["actor"]).follower_address
156 do: Pleroma.Web.Streamer.stream("direct", activity)
161 def create(%{to: to, actor: actor, context: context, object: object} = params) do
162 additional = params[:additional] || %{}
163 # only accept false as false value
164 local = !(params[:local] == false)
165 published = params[:published]
169 %{to: to, actor: actor, published: published, context: context, object: object},
172 {:ok, activity} <- insert(create_data, local),
173 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
174 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
175 :ok <- maybe_federate(activity) do
180 def accept(%{to: to, actor: actor, object: object} = params) do
181 # only accept false as false value
182 local = !(params[:local] == false)
184 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
185 {:ok, activity} <- insert(data, local),
186 :ok <- maybe_federate(activity) do
191 def reject(%{to: to, actor: actor, object: object} = params) do
192 # only accept false as false value
193 local = !(params[:local] == false)
195 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
196 {:ok, activity} <- insert(data, local),
197 :ok <- maybe_federate(activity) do
202 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
203 # only accept false as false value
204 local = !(params[:local] == false)
213 {:ok, activity} <- insert(data, local),
214 :ok <- maybe_federate(activity) do
219 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
221 %User{ap_id: ap_id} = user,
222 %Object{data: %{"id" => _}} = object,
226 with nil <- get_existing_like(ap_id, object),
227 like_data <- make_like_data(user, object, activity_id),
228 {:ok, activity} <- insert(like_data, local),
229 {:ok, object} <- add_like_to_object(activity, object),
230 :ok <- maybe_federate(activity) do
231 {:ok, activity, object}
233 %Activity{} = activity -> {:ok, activity, object}
234 error -> {:error, error}
244 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
245 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
246 {:ok, unlike_activity} <- insert(unlike_data, local),
247 {:ok, _activity} <- Repo.delete(like_activity),
248 {:ok, object} <- remove_like_from_object(like_activity, object),
249 :ok <- maybe_federate(unlike_activity) do
250 {:ok, unlike_activity, like_activity, object}
257 %User{ap_id: _} = user,
258 %Object{data: %{"id" => _}} = object,
263 with true <- is_public?(object),
264 announce_data <- make_announce_data(user, object, activity_id, public),
265 {:ok, activity} <- insert(announce_data, local),
266 {:ok, object} <- add_announce_to_object(activity, object),
267 :ok <- maybe_federate(activity) do
268 {:ok, activity, object}
270 error -> {:error, error}
280 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
281 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
282 {:ok, unannounce_activity} <- insert(unannounce_data, local),
283 :ok <- maybe_federate(unannounce_activity),
284 {:ok, _activity} <- Repo.delete(announce_activity),
285 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
286 {:ok, unannounce_activity, object}
292 def follow(follower, followed, activity_id \\ nil, local \\ true) do
293 with data <- make_follow_data(follower, followed, activity_id),
294 {:ok, activity} <- insert(data, local),
295 :ok <- maybe_federate(activity) do
300 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
301 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
302 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
303 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
304 {:ok, activity} <- insert(unfollow_data, local),
305 :ok <- maybe_federate(activity) do
310 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
311 user = User.get_cached_by_ap_id(actor)
312 to = (object.data["to"] || []) ++ (object.data["cc"] || [])
321 with {:ok, _} <- Object.delete(object),
322 {:ok, activity} <- insert(data, local),
323 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
324 {:ok, _actor} <- decrease_note_count_if_public(user, object),
325 :ok <- maybe_federate(activity) do
330 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
331 ap_config = Application.get_env(:pleroma, :activitypub)
332 unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
333 outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)
335 with true <- unfollow_blocked do
336 follow_activity = fetch_latest_follow(blocker, blocked)
338 if follow_activity do
339 unfollow(blocker, blocked, nil, local)
343 with true <- outgoing_blocks,
344 block_data <- make_block_data(blocker, blocked, activity_id),
345 {:ok, activity} <- insert(block_data, local),
346 :ok <- maybe_federate(activity) do
353 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
354 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
355 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
356 {:ok, activity} <- insert(unblock_data, local),
357 :ok <- maybe_federate(activity) do
371 additional = params[:additional] || %{}
373 # only accept false as false value
374 local = !(params[:local] == false)
383 |> make_flag_data(additional)
387 def fetch_activities_for_context(context, opts \\ %{}) do
388 public = ["https://www.w3.org/ns/activitystreams#Public"]
391 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
393 query = from(activity in Activity)
397 |> restrict_blocked(opts)
398 |> restrict_recipients(recipients, opts["user"])
405 "?->>'type' = ? and ?->>'context' = ?",
411 order_by: [desc: :id]
417 def fetch_public_activities(opts \\ %{}) do
418 q = fetch_activities_query(["https://www.w3.org/ns/activitystreams#Public"], opts)
421 |> restrict_unlisted()
426 @valid_visibilities ~w[direct unlisted public private]
428 defp restrict_visibility(query, %{visibility: visibility})
429 when is_list(visibility) do
430 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
436 "activity_visibility(?, ?, ?) = ANY (?)",
444 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
448 Logger.error("Could not restrict visibility to #{visibility}")
452 defp restrict_visibility(query, %{visibility: visibility})
453 when visibility in @valid_visibilities do
458 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
461 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
466 defp restrict_visibility(_query, %{visibility: visibility})
467 when visibility not in @valid_visibilities do
468 Logger.error("Could not restrict visibility to #{visibility}")
471 defp restrict_visibility(query, _visibility), do: query
473 def fetch_user_activities(user, reading_user, params \\ %{}) do
476 |> Map.put("type", ["Create", "Announce"])
477 |> Map.put("actor_id", user.ap_id)
478 |> Map.put("whole_db", true)
479 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
483 ["https://www.w3.org/ns/activitystreams#Public"] ++
484 [reading_user.ap_id | reading_user.following]
486 ["https://www.w3.org/ns/activitystreams#Public"]
489 fetch_activities(recipients, params)
493 defp restrict_since(query, %{"since_id" => ""}), do: query
495 defp restrict_since(query, %{"since_id" => since_id}) do
496 from(activity in query, where: activity.id > ^since_id)
499 defp restrict_since(query, _), do: query
501 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
502 when is_list(tag_reject) and tag_reject != [] do
505 where: fragment("(not (? #> '{\"object\",\"tag\"}') \\?| ?)", activity.data, ^tag_reject)
509 defp restrict_tag_reject(query, _), do: query
511 defp restrict_tag_all(query, %{"tag_all" => tag_all})
512 when is_list(tag_all) and tag_all != [] do
515 where: fragment("(? #> '{\"object\",\"tag\"}') \\?& ?", activity.data, ^tag_all)
519 defp restrict_tag_all(query, _), do: query
521 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
524 where: fragment("(? #> '{\"object\",\"tag\"}') \\?| ?", activity.data, ^tag)
528 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
531 where: fragment("? <@ (? #> '{\"object\",\"tag\"}')", ^tag, activity.data)
535 defp restrict_tag(query, _), do: query
537 defp restrict_to_cc(query, recipients_to, recipients_cc) do
542 "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
551 defp restrict_recipients(query, [], _user), do: query
553 defp restrict_recipients(query, recipients, nil) do
554 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
557 defp restrict_recipients(query, recipients, user) do
560 where: fragment("? && ?", ^recipients, activity.recipients),
561 or_where: activity.actor == ^user.ap_id
565 defp restrict_limit(query, %{"limit" => limit}) do
566 from(activity in query, limit: ^limit)
569 defp restrict_limit(query, _), do: query
571 defp restrict_local(query, %{"local_only" => true}) do
572 from(activity in query, where: activity.local == true)
575 defp restrict_local(query, _), do: query
577 defp restrict_max(query, %{"max_id" => ""}), do: query
579 defp restrict_max(query, %{"max_id" => max_id}) do
580 from(activity in query, where: activity.id < ^max_id)
583 defp restrict_max(query, _), do: query
585 defp restrict_actor(query, %{"actor_id" => actor_id}) do
586 from(activity in query, where: activity.actor == ^actor_id)
589 defp restrict_actor(query, _), do: query
591 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
592 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
595 defp restrict_type(query, %{"type" => type}) do
596 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
599 defp restrict_type(query, _), do: query
601 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
604 where: fragment("? <@ (? #> '{\"object\",\"likes\"}')", ^ap_id, activity.data)
608 defp restrict_favorited_by(query, _), do: query
610 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
613 where: fragment("not (? #> '{\"object\",\"attachment\"}' = ?)", activity.data, ^[])
617 defp restrict_media(query, _), do: query
619 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
622 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
626 defp restrict_replies(query, _), do: query
628 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
629 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
632 defp restrict_reblogs(query, _), do: query
634 defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query
636 defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
641 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
642 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
646 defp restrict_muted(query, _), do: query
648 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
649 blocks = info.blocks || []
650 domain_blocks = info.domain_blocks || []
654 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
655 where: fragment("not (?->'to' \\?| ?)", activity.data, ^blocks),
656 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
660 defp restrict_blocked(query, _), do: query
662 defp restrict_unlisted(query) do
667 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
669 ^["https://www.w3.org/ns/activitystreams#Public"]
674 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
675 from(activity in query, where: activity.id in ^ids)
678 defp restrict_pinned(query, _), do: query
680 def fetch_activities_query(recipients, opts \\ %{}) do
683 activity in Activity,
685 order_by: [fragment("? desc nulls last", activity.id)]
689 |> restrict_recipients(recipients, opts["user"])
690 |> restrict_tag(opts)
691 |> restrict_tag_reject(opts)
692 |> restrict_tag_all(opts)
693 |> restrict_since(opts)
694 |> restrict_local(opts)
695 |> restrict_limit(opts)
696 |> restrict_max(opts)
697 |> restrict_actor(opts)
698 |> restrict_type(opts)
699 |> restrict_favorited_by(opts)
700 |> restrict_blocked(opts)
701 |> restrict_muted(opts)
702 |> restrict_media(opts)
703 |> restrict_visibility(opts)
704 |> restrict_replies(opts)
705 |> restrict_reblogs(opts)
706 |> restrict_pinned(opts)
709 def fetch_activities(recipients, opts \\ %{}) do
710 fetch_activities_query(recipients, opts)
715 def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
716 fetch_activities_query([], opts)
717 |> restrict_to_cc(recipients_to, recipients_cc)
722 def upload(file, opts \\ []) do
723 with {:ok, data} <- Upload.store(file, opts) do
726 Map.put(data, "actor", opts[:actor])
731 Repo.insert(%Object{data: obj_data})
735 def user_data_from_user_object(data) do
737 data["icon"]["url"] &&
740 "url" => [%{"href" => data["icon"]["url"]}]
744 data["image"]["url"] &&
747 "url" => [%{"href" => data["image"]["url"]}]
750 locked = data["manuallyApprovesFollowers"] || false
751 data = Transmogrifier.maybe_fix_user_object(data)
756 "ap_enabled" => true,
757 "source_data" => data,
763 follower_address: data["followers"],
767 # nickname can be nil because of virtual actors
769 if data["preferredUsername"] do
773 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
776 Map.put(user_data, :nickname, nil)
782 def fetch_and_prepare_user_from_ap_id(ap_id) do
783 with {:ok, data} <- fetch_and_contain_remote_object_from_id(ap_id) do
784 user_data_from_user_object(data)
786 e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
790 def make_user_from_ap_id(ap_id) do
791 if _user = User.get_by_ap_id(ap_id) do
792 Transmogrifier.upgrade_user_from_ap_id(ap_id)
794 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
795 User.insert_or_update_user(data)
802 def make_user_from_nickname(nickname) do
803 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
804 make_user_from_ap_id(ap_id)
806 _e -> {:error, "No AP id in WebFinger"}
810 def should_federate?(inbox, public) do
814 inbox_info = URI.parse(inbox)
815 !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
819 def publish(actor, activity) do
821 if actor.follower_address in activity.recipients do
822 {:ok, followers} = User.get_followers(actor)
823 followers |> Enum.filter(&(!&1.local))
828 public = is_public?(activity)
830 {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
831 json = Jason.encode!(data)
833 (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
834 |> Enum.filter(fn user -> User.ap_enabled?(user) end)
835 |> Enum.map(fn %{info: %{source_data: data}} ->
836 (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
839 |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
840 |> Instances.filter_reachable()
841 |> Enum.each(fn {inbox, unreachable_since} ->
842 Federator.publish_single_ap(%{
846 id: activity.data["id"],
847 unreachable_since: unreachable_since
852 def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
853 Logger.info("Federating #{id} to #{inbox}")
854 host = URI.parse(inbox).host
856 digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
859 NaiveDateTime.utc_now()
860 |> Timex.format!("{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT")
863 Pleroma.Web.HTTPSignatures.sign(actor, %{
865 "content-length": byte_size(json),
870 with {:ok, %{status: code}} when code in 200..299 <-
876 {"Content-Type", "application/activity+json"},
878 {"signature", signature},
882 if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
883 do: Instances.set_reachable(inbox)
887 {_post_result, response} ->
888 unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
894 # This will create a Create activity, which we need internally at the moment.
895 def fetch_object_from_id(id) do
896 if object = Object.get_cached_by_ap_id(id) do
899 with {:ok, data} <- fetch_and_contain_remote_object_from_id(id),
900 nil <- Object.normalize(data),
905 "actor" => data["actor"] || data["attributedTo"],
908 :ok <- Transmogrifier.contain_origin(id, params),
909 {:ok, activity} <- Transmogrifier.handle_incoming(params) do
910 {:ok, Object.normalize(activity.data["object"])}
912 {:error, {:reject, nil}} ->
915 object = %Object{} ->
919 Logger.info("Couldn't get object via AP, trying out OStatus fetching...")
921 case OStatus.fetch_activity_from_url(id) do
922 {:ok, [activity | _]} -> {:ok, Object.normalize(activity.data["object"])}
929 def fetch_and_contain_remote_object_from_id(id) do
930 Logger.info("Fetching object #{id} via AP")
932 with true <- String.starts_with?(id, "http"),
933 {:ok, %{body: body, status: code}} when code in 200..299 <-
936 [{:Accept, "application/activity+json"}]
938 {:ok, data} <- Jason.decode(body),
939 :ok <- Transmogrifier.contain_origin_from_id(id, data) do
947 # filter out broken threads
948 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
949 entire_thread_visible_for_user?(activity, user)
952 # do post-processing on a specific activity
953 def contain_activity(%Activity{} = activity, %User{} = user) do
954 contain_broken_threads(activity, user)
957 # do post-processing on a timeline
958 def contain_timeline(timeline, user) do
960 |> Enum.filter(fn activity ->
961 contain_activity(activity, user)