1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
11 alias Pleroma.Notification
12 alias Pleroma.Instances
13 alias Pleroma.Web.ActivityPub.Transmogrifier
14 alias Pleroma.Web.ActivityPub.MRF
15 alias Pleroma.Web.WebFinger
16 alias Pleroma.Web.Federator
17 alias Pleroma.Web.OStatus
20 import Pleroma.Web.ActivityPub.Utils
21 import Pleroma.Web.ActivityPub.Visibility
25 @httpoison Application.get_env(:pleroma, :httpoison)
27 # For Announce activities, we filter the recipients based on following status for any actors
28 # that match actual users. See issue #164 for more information about why this is necessary.
29 defp get_recipients(%{"type" => "Announce"} = data) do
32 actor = User.get_cached_by_ap_id(data["actor"])
36 |> Enum.filter(fn recipient ->
37 case User.get_cached_by_ap_id(recipient) do
42 User.following?(user, actor)
49 defp get_recipients(%{"type" => "Create"} = data) do
52 actor = data["actor"] || []
53 recipients = (to ++ cc ++ [actor]) |> Enum.uniq()
57 defp get_recipients(data) do
64 defp check_actor_is_active(actor) do
65 if not is_nil(actor) do
66 with user <- User.get_cached_by_ap_id(actor),
67 false <- user.info.deactivated do
# Checks the embedded object's "content" against the configured
# `[:instance, :remote_limit]` value. Anything without a non-nil "content"
# inside a map-shaped "object" is accepted unconditionally.
defp check_remote_limit(activity) do
  case activity do
    %{"object" => %{"content" => content}} when not is_nil(content) ->
      max_length = Pleroma.Config.get([:instance, :remote_limit])
      String.length(content) <= max_length

    _ ->
      true
  end
end
84 def insert(map, local \\ true) when is_map(map) do
85 with nil <- Activity.normalize(map),
86 map <- lazy_put_activity_defaults(map),
87 :ok <- check_actor_is_active(map["actor"]),
88 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
89 {:ok, map} <- MRF.filter(map),
90 :ok <- insert_full_object(map) do
91 {recipients, _, _} = get_recipients(map)
94 Repo.insert(%Activity{
98 recipients: recipients
102 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
105 Notification.create_notifications(activity)
109 %Activity{} = activity -> {:ok, activity}
110 error -> {:error, error}
114 def stream_out(activity) do
115 public = "https://www.w3.org/ns/activitystreams#Public"
117 if activity.data["type"] in ["Create", "Announce", "Delete"] do
118 Pleroma.Web.Streamer.stream("user", activity)
119 Pleroma.Web.Streamer.stream("list", activity)
121 if Enum.member?(activity.data["to"], public) do
122 Pleroma.Web.Streamer.stream("public", activity)
125 Pleroma.Web.Streamer.stream("public:local", activity)
128 if activity.data["type"] in ["Create"] do
129 activity.data["object"]
130 |> Map.get("tag", [])
131 |> Enum.filter(fn tag -> is_bitstring(tag) end)
132 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
134 if activity.data["object"]["attachment"] != [] do
135 Pleroma.Web.Streamer.stream("public:media", activity)
138 Pleroma.Web.Streamer.stream("public:local:media", activity)
143 if !Enum.member?(activity.data["cc"] || [], public) &&
146 User.get_by_ap_id(activity.data["actor"]).follower_address
148 do: Pleroma.Web.Streamer.stream("direct", activity)
153 def create(%{to: to, actor: actor, context: context, object: object} = params) do
154 additional = params[:additional] || %{}
155 # only accept false as false value
156 local = !(params[:local] == false)
157 published = params[:published]
161 %{to: to, actor: actor, published: published, context: context, object: object},
164 {:ok, activity} <- insert(create_data, local),
165 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
166 {:ok, _actor} <- User.increase_note_count(actor),
167 :ok <- maybe_federate(activity) do
172 def accept(%{to: to, actor: actor, object: object} = params) do
173 # only accept false as false value
174 local = !(params[:local] == false)
176 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
177 {:ok, activity} <- insert(data, local),
178 :ok <- maybe_federate(activity),
179 _ <- User.update_follow_request_count(actor) do
184 def reject(%{to: to, actor: actor, object: object} = params) do
185 # only accept false as false value
186 local = !(params[:local] == false)
188 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
189 {:ok, activity} <- insert(data, local),
190 :ok <- maybe_federate(activity),
191 _ <- User.update_follow_request_count(actor) do
196 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
197 # only accept false as false value
198 local = !(params[:local] == false)
207 {:ok, activity} <- insert(data, local),
208 :ok <- maybe_federate(activity) do
213 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
215 %User{ap_id: ap_id} = user,
216 %Object{data: %{"id" => _}} = object,
220 with nil <- get_existing_like(ap_id, object),
221 like_data <- make_like_data(user, object, activity_id),
222 {:ok, activity} <- insert(like_data, local),
223 {:ok, object} <- add_like_to_object(activity, object),
224 :ok <- maybe_federate(activity) do
225 {:ok, activity, object}
227 %Activity{} = activity -> {:ok, activity, object}
228 error -> {:error, error}
238 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
239 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
240 {:ok, unlike_activity} <- insert(unlike_data, local),
241 {:ok, _activity} <- Repo.delete(like_activity),
242 {:ok, object} <- remove_like_from_object(like_activity, object),
243 :ok <- maybe_federate(unlike_activity) do
244 {:ok, unlike_activity, like_activity, object}
251 %User{ap_id: _} = user,
252 %Object{data: %{"id" => _}} = object,
257 with true <- is_public?(object),
258 announce_data <- make_announce_data(user, object, activity_id, public),
259 {:ok, activity} <- insert(announce_data, local),
260 {:ok, object} <- add_announce_to_object(activity, object),
261 :ok <- maybe_federate(activity) do
262 {:ok, activity, object}
264 error -> {:error, error}
274 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
275 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
276 {:ok, unannounce_activity} <- insert(unannounce_data, local),
277 :ok <- maybe_federate(unannounce_activity),
278 {:ok, _activity} <- Repo.delete(announce_activity),
279 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
280 {:ok, unannounce_activity, object}
286 def follow(follower, followed, activity_id \\ nil, local \\ true) do
287 with data <- make_follow_data(follower, followed, activity_id),
288 {:ok, activity} <- insert(data, local),
289 :ok <- maybe_federate(activity),
290 _ <- User.update_follow_request_count(followed) do
295 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
296 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
297 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
298 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
299 {:ok, activity} <- insert(unfollow_data, local),
300 :ok <- maybe_federate(activity),
301 _ <- User.update_follow_request_count(followed) do
306 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
307 user = User.get_cached_by_ap_id(actor)
313 "to" => [user.follower_address, "https://www.w3.org/ns/activitystreams#Public"]
316 with {:ok, _} <- Object.delete(object),
317 {:ok, activity} <- insert(data, local),
318 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
319 {:ok, _actor} <- User.decrease_note_count(user),
320 :ok <- maybe_federate(activity) do
325 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
326 ap_config = Application.get_env(:pleroma, :activitypub)
327 unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
328 outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)
330 with true <- unfollow_blocked do
331 follow_activity = fetch_latest_follow(blocker, blocked)
333 if follow_activity do
334 unfollow(blocker, blocked, nil, local)
338 with true <- outgoing_blocks,
339 block_data <- make_block_data(blocker, blocked, activity_id),
340 {:ok, activity} <- insert(block_data, local),
341 :ok <- maybe_federate(activity) do
348 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
349 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
350 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
351 {:ok, activity} <- insert(unblock_data, local),
352 :ok <- maybe_federate(activity) do
366 additional = params[:additional] || %{}
368 # only accept false as false value
369 local = !(params[:local] == false)
378 |> make_flag_data(additional)
382 def fetch_activities_for_context(context, opts \\ %{}) do
383 public = ["https://www.w3.org/ns/activitystreams#Public"]
386 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
388 query = from(activity in Activity)
392 |> restrict_blocked(opts)
393 |> restrict_recipients(recipients, opts["user"])
400 "?->>'type' = ? and ?->>'context' = ?",
406 order_by: [desc: :id]
412 def fetch_public_activities(opts \\ %{}) do
413 q = fetch_activities_query(["https://www.w3.org/ns/activitystreams#Public"], opts)
416 |> restrict_unlisted()
421 @valid_visibilities ~w[direct unlisted public private]
423 defp restrict_visibility(query, %{visibility: visibility})
424 when visibility in @valid_visibilities do
429 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
432 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
# Fallback for a `:visibility` option that is not one of @valid_visibilities:
# log the problem and hand the query back unfiltered.
#
# This clause used to end with the `Logger.error/1` call, so it returned `:ok`
# instead of a query, and that value was piped into the remaining `restrict_*`
# steps of the query pipeline.
defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end
442 defp restrict_visibility(query, _visibility), do: query
444 def fetch_user_activities(user, reading_user, params \\ %{}) do
447 |> Map.put("type", ["Create", "Announce"])
448 |> Map.put("actor_id", user.ap_id)
449 |> Map.put("whole_db", true)
450 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
454 ["https://www.w3.org/ns/activitystreams#Public"] ++
455 [reading_user.ap_id | reading_user.following]
457 ["https://www.w3.org/ns/activitystreams#Public"]
460 fetch_activities(recipients, params)
# Keeps only activities whose id is greater than opts["since_id"].
# An empty-string or absent "since_id" leaves the query untouched.
defp restrict_since(query, opts) do
  case opts do
    %{"since_id" => ""} ->
      query

    %{"since_id" => lower_bound} ->
      from(a in query, where: a.id > ^lower_bound)

    _ ->
      query
  end
end
# Excludes activities whose object "tag" array contains any of the tags in
# opts["tag_reject"]. Only applied when that option is a non-empty list.
defp restrict_tag_reject(query, opts) do
  case opts do
    %{"tag_reject" => [_ | _] = rejected} ->
      from(
        a in query,
        where: fragment("(not (? #> '{\"object\",\"tag\"}') \\?| ?)", a.data, ^rejected)
      )

    _ ->
      query
  end
end
# Requires the object's "tag" array to contain every tag in opts["tag_all"].
# Only applied when that option is a non-empty list.
defp restrict_tag_all(query, %{"tag_all" => [_ | _] = required}) do
  from(
    a in query,
    where: fragment("(? #> '{\"object\",\"tag\"}') \\?& ?", a.data, ^required)
  )
end

defp restrict_tag_all(query, _opts), do: query
# Filters by opts["tag"]:
#   * a list   -> the object's "tag" array must contain at least one of them
#   * a binary -> the object's "tag" array must contain that value
# Any other shape of opts leaves the query untouched.
defp restrict_tag(query, opts) do
  case opts do
    %{"tag" => tags} when is_list(tags) ->
      from(
        a in query,
        where: fragment("(? #> '{\"object\",\"tag\"}') \\?| ?", a.data, ^tags)
      )

    %{"tag" => single_tag} when is_binary(single_tag) ->
      from(
        a in query,
        where: fragment("? <@ (? #> '{\"object\",\"tag\"}')", ^single_tag, a.data)
      )

    _ ->
      query
  end
end
508 defp restrict_to_cc(query, recipients_to, recipients_cc) do
513 "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
# Limits the query to activities whose recipients overlap the given list.
#   * empty recipient list -> query is returned unchanged
#   * no user (nil)        -> overlap condition only
#   * user given           -> overlap condition, OR-ed with "actor is the user"
defp restrict_recipients(query, recipients, user) do
  case {recipients, user} do
    {[], _} ->
      query

    {_, nil} ->
      from(a in query, where: fragment("? && ?", ^recipients, a.recipients))

    {_, _} ->
      from(
        a in query,
        where: fragment("? && ?", ^recipients, a.recipients),
        or_where: a.actor == ^user.ap_id
      )
  end
end
# Applies opts["limit"] as the query's row limit when the key is present.
defp restrict_limit(query, opts) do
  case opts do
    %{"limit" => max_rows} -> from(a in query, limit: ^max_rows)
    _ -> query
  end
end
# Keeps only activities flagged as local when opts["local_only"] is exactly true.
defp restrict_local(query, opts) do
  case opts do
    %{"local_only" => true} -> from(a in query, where: a.local == true)
    _ -> query
  end
end
# Keeps only activities whose id is smaller than opts["max_id"].
# An empty-string or absent "max_id" leaves the query untouched.
defp restrict_max(query, opts) do
  case opts do
    %{"max_id" => ""} ->
      query

    %{"max_id" => upper_bound} ->
      from(a in query, where: a.id < ^upper_bound)

    _ ->
      query
  end
end
# Keeps only activities whose actor equals opts["actor_id"], when that key is present.
defp restrict_actor(query, opts) do
  case opts do
    %{"actor_id" => wanted_actor} -> from(a in query, where: a.actor == ^wanted_actor)
    _ -> query
  end
end
# Filters on the activity's "type" field:
#   * binary opts["type"]          -> exact match
#   * any other value for the key  -> membership test via ANY(?)
# Without a "type" key the query is returned unchanged.
defp restrict_type(query, opts) do
  case opts do
    %{"type" => type} when is_binary(type) ->
      from(a in query, where: fragment("?->>'type' = ?", a.data, ^type))

    %{"type" => types} ->
      from(a in query, where: fragment("?->>'type' = ANY(?)", a.data, ^types))

    _ ->
      query
  end
end
# Keeps only activities whose object "likes" array contains opts["favorited_by"].
defp restrict_favorited_by(query, opts) do
  case opts do
    %{"favorited_by" => liker_ap_id} ->
      from(
        a in query,
        where: fragment("? <@ (? #> '{\"object\",\"likes\"}')", ^liker_ap_id, a.data)
      )

    _ ->
      query
  end
end
# When opts["only_media"] is "true" or "1", drops activities whose object
# "attachment" value equals the empty list.
defp restrict_media(query, opts) do
  case opts do
    %{"only_media" => flag} when flag in ["true", "1"] ->
      from(
        a in query,
        where: fragment("not (? #> '{\"object\",\"attachment\"}' = ?)", a.data, ^[])
      )

    _ ->
      query
  end
end
# When opts["exclude_replies"] is "true" or "1", keeps only activities whose
# object has no "inReplyTo" value.
defp restrict_replies(query, opts) do
  case opts do
    %{"exclude_replies" => flag} when flag in ["true", "1"] ->
      from(
        a in query,
        where: fragment("?->'object'->>'inReplyTo' is null", a.data)
      )

    _ ->
      query
  end
end
# When opts["exclude_reblogs"] is "true" or "1", drops activities of type "Announce".
defp restrict_reblogs(query, opts) do
  case opts do
    %{"exclude_reblogs" => flag} when flag in ["true", "1"] ->
      from(a in query, where: fragment("?->>'type' != 'Announce'", a.data))

    _ ->
      query
  end
end
605 defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
610 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
611 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
615 defp restrict_muted(query, _), do: query
# Applies the block lists of opts["blocking_user"]:
#   * drops activities authored by a blocked actor
#   * drops activities whose "to" field contains a blocked actor
#   * drops activities whose actor's third '/'-separated segment is a blocked domain
# Missing block lists are treated as empty. Without a "blocking_user" the
# query is returned unchanged.
defp restrict_blocked(query, opts) do
  case opts do
    %{"blocking_user" => %User{info: info}} ->
      blocked_actors = info.blocks || []
      blocked_domains = info.domain_blocks || []

      from(
        a in query,
        where: fragment("not (? = ANY(?))", a.actor, ^blocked_actors),
        where: fragment("not (?->'to' \\?| ?)", a.data, ^blocked_actors),
        where: fragment("not (split_part(?, '/', 3) = ANY(?))", a.actor, ^blocked_domains)
      )

    _ ->
      query
  end
end
631 defp restrict_unlisted(query) do
636 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
638 ^["https://www.w3.org/ns/activitystreams#Public"]
# When opts["pinned"] is "true" and "pinned_activity_ids" is supplied, keeps
# only activities whose id is in that list.
defp restrict_pinned(query, opts) do
  case opts do
    %{"pinned" => "true", "pinned_activity_ids" => pinned_ids} ->
      from(a in query, where: a.id in ^pinned_ids)

    _ ->
      query
  end
end
649 def fetch_activities_query(recipients, opts \\ %{}) do
652 activity in Activity,
654 order_by: [fragment("? desc nulls last", activity.id)]
658 |> restrict_recipients(recipients, opts["user"])
659 |> restrict_tag(opts)
660 |> restrict_tag_reject(opts)
661 |> restrict_tag_all(opts)
662 |> restrict_since(opts)
663 |> restrict_local(opts)
664 |> restrict_limit(opts)
665 |> restrict_max(opts)
666 |> restrict_actor(opts)
667 |> restrict_type(opts)
668 |> restrict_favorited_by(opts)
669 |> restrict_blocked(opts)
670 |> restrict_muted(opts)
671 |> restrict_media(opts)
672 |> restrict_visibility(opts)
673 |> restrict_replies(opts)
674 |> restrict_reblogs(opts)
675 |> restrict_pinned(opts)
678 def fetch_activities(recipients, opts \\ %{}) do
679 fetch_activities_query(recipients, opts)
684 def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
685 fetch_activities_query([], opts)
686 |> restrict_to_cc(recipients_to, recipients_cc)
691 def upload(file, opts \\ []) do
692 with {:ok, data} <- Upload.store(file, opts) do
695 Map.put(data, "actor", opts[:actor])
700 Repo.insert(%Object{data: obj_data})
704 def user_data_from_user_object(data) do
706 data["icon"]["url"] &&
709 "url" => [%{"href" => data["icon"]["url"]}]
713 data["image"]["url"] &&
716 "url" => [%{"href" => data["image"]["url"]}]
719 locked = data["manuallyApprovesFollowers"] || false
720 data = Transmogrifier.maybe_fix_user_object(data)
725 "ap_enabled" => true,
726 "source_data" => data,
732 follower_address: data["followers"],
736 # nickname can be nil because of virtual actors
738 if data["preferredUsername"] do
742 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
745 Map.put(user_data, :nickname, nil)
751 def fetch_and_prepare_user_from_ap_id(ap_id) do
752 with {:ok, data} <- fetch_and_contain_remote_object_from_id(ap_id) do
753 user_data_from_user_object(data)
755 e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
759 def make_user_from_ap_id(ap_id) do
760 if _user = User.get_by_ap_id(ap_id) do
761 Transmogrifier.upgrade_user_from_ap_id(ap_id)
763 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
764 User.insert_or_update_user(data)
# Resolves `nickname` through WebFinger and, when the result carries a non-nil
# "ap_id", passes that id to make_user_from_ap_id/1 and returns its result.
# Any other WebFinger outcome yields {:error, "No AP id in WebFinger"}.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id)

    _other ->
      {:error, "No AP id in WebFinger"}
  end
end
779 def should_federate?(inbox, public) do
783 inbox_info = URI.parse(inbox)
784 !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
788 def publish(actor, activity) do
790 if actor.follower_address in activity.recipients do
791 {:ok, followers} = User.get_followers(actor)
792 followers |> Enum.filter(&(!&1.local))
797 public = is_public?(activity)
799 {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
800 json = Jason.encode!(data)
802 (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
803 |> Enum.filter(fn user -> User.ap_enabled?(user) end)
804 |> Enum.map(fn %{info: %{source_data: data}} ->
805 (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
808 |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
809 |> Instances.filter_reachable()
810 |> Enum.each(fn {inbox, unreachable_since} ->
811 Federator.publish_single_ap(%{
815 id: activity.data["id"],
816 unreachable_since: unreachable_since
821 def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
822 Logger.info("Federating #{id} to #{inbox}")
823 host = URI.parse(inbox).host
825 digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
828 NaiveDateTime.utc_now()
829 |> Timex.format!("{WDshort}, {D} {Mshort} {YYYY} {h24}:{m}:{s} GMT")
832 Pleroma.Web.HTTPSignatures.sign(actor, %{
834 "content-length": byte_size(json),
839 with {:ok, %{status: code}} when code in 200..299 <-
845 {"Content-Type", "application/activity+json"},
847 {"signature", signature},
851 if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
852 do: Instances.set_reachable(inbox)
856 {_post_result, response} ->
857 unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
863 # This will create a Create activity, which we need internally at the moment.
864 def fetch_object_from_id(id) do
865 if object = Object.get_cached_by_ap_id(id) do
868 with {:ok, data} <- fetch_and_contain_remote_object_from_id(id),
869 nil <- Object.normalize(data),
874 "actor" => data["actor"] || data["attributedTo"],
877 :ok <- Transmogrifier.contain_origin(id, params),
878 {:ok, activity} <- Transmogrifier.handle_incoming(params) do
879 {:ok, Object.normalize(activity.data["object"])}
881 {:error, {:reject, nil}} ->
884 object = %Object{} ->
888 Logger.info("Couldn't get object via AP, trying out OStatus fetching...")
890 case OStatus.fetch_activity_from_url(id) do
891 {:ok, [activity | _]} -> {:ok, Object.normalize(activity.data["object"])}
898 def fetch_and_contain_remote_object_from_id(id) do
899 Logger.info("Fetching object #{id} via AP")
901 with true <- String.starts_with?(id, "http"),
902 {:ok, %{body: body, status: code}} when code in 200..299 <-
905 [{:Accept, "application/activity+json"}]
907 {:ok, data} <- Jason.decode(body),
908 :ok <- Transmogrifier.contain_origin_from_id(id, data) do
# Used to filter out broken threads: returns whatever
# entire_thread_visible_for_user?/2 reports for this activity and user.
def contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
# Post-processing for a single activity; currently this is just the
# broken-thread check from contain_broken_threads/2.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
926 # do post-processing on a timeline
927 def contain_timeline(timeline, user) do
929 |> Enum.filter(fn activity ->
930 contain_activity(activity, user)