1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
11 alias Pleroma.Notification
12 alias Pleroma.Instances
13 alias Pleroma.Web.ActivityPub.Transmogrifier
14 alias Pleroma.Web.ActivityPub.MRF
15 alias Pleroma.Web.WebFinger
16 alias Pleroma.Web.Federator
17 alias Pleroma.Web.OStatus
20 import Pleroma.Web.ActivityPub.Utils
21 import Pleroma.Web.ActivityPub.Visibility
# Value of the `:httpoison` key in the :pleroma application config (presumably the HTTP
# client module). As a module attribute it is read when this module is compiled,
# not on every call.
25 @httpoison Application.get_env(:pleroma, :httpoison)
27 # For Announce activities, we filter the recipients based on following status for any actors
28 # that match actual users. See issue #164 for more information about why this is necessary.
# Visible steps: resolve the announcing actor through the user cache, then filter the
# candidate recipients; a recipient that resolves to a cached user is judged by
# `User.following?(user, actor)`. How recipients that do not resolve to a user are
# handled is on lines not shown in this excerpt.
29 defp get_recipients(%{"type" => "Announce"} = data) do
32 actor = User.get_cached_by_ap_id(data["actor"])
36 |> Enum.filter(fn recipient ->
37 case User.get_cached_by_ap_id(recipient) do
42 User.following?(user, actor)
# Create activities: recipients are the de-duplicated concatenation of `to`, `cc`
# and the actor.
49 defp get_recipients(%{"type" => "Create"} = data) do
# NOTE(review): the `[]` fallback is wrapped in a list again on the next line, so a
# missing "actor" would add an empty list as a recipient element - confirm intended.
52 actor = data["actor"] || []
53 recipients = (to ++ cc ++ [actor]) |> Enum.uniq()
# Catch-all clause for every other activity type.
57 defp get_recipients(data) do
# Checks that the acting user is not deactivated. The lookup only happens for a non-nil
# `actor`; the happy path requires `user.info.deactivated` to be exactly `false`.
# NOTE(review): `user <- ...` matches any value, so a nil lookup result would still
# reach `user.info` - confirm callers always pass an actor that resolves to a user.
64 defp check_actor_is_active(actor) do
65 if not is_nil(actor) do
66 with user <- User.get_cached_by_ap_id(actor),
67 false <- user.info.deactivated do
# Enforces the `[:instance, :remote_limit]` setting: the object's content must not be
# longer (as measured by `String.length/1`) than the configured limit.
77 defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
78 limit = Pleroma.Config.get([:instance, :remote_limit])
79 String.length(content) <= limit
# Anything without a non-nil object content passes the check.
82 defp check_remote_limit(_), do: true
# Inserts an activity for `map`; `local` defaults to true. The `with` chain only
# proceeds when `Activity.normalize/1` returns nil for the map. It then applies the
# activity defaults, checks the actor, checks the remote content limit, runs the MRF
# filter and inserts the full object before the Activity row itself is written.
84 def insert(map, local \\ true) when is_map(map) do
85 with nil <- Activity.normalize(map),
86 map <- lazy_put_activity_defaults(map),
87 :ok <- check_actor_is_active(map["actor"]),
# Tagged so that a failed limit check surfaces as {:remote_limit_error, false}.
88 {_, true} <- {:remote_limit_error, check_remote_limit(map)},
89 {:ok, map} <- MRF.filter(map),
90 :ok <- insert_full_object(map) do
# Only the combined recipient list is stored with the activity.
91 {recipients, _, _} = get_recipients(map)
94 Repo.insert(%Activity{
98 recipients: recipients
102 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
105 Notification.create_notifications(activity)
# An Activity falling out of the chain is returned as a success; any other
# non-matching value is wrapped in an error tuple.
109 %Activity{} = activity -> {:ok, activity}
110 error -> {:error, error}
# Pushes `activity` to the Streamer topics it belongs to. Several branch conditions sit
# on lines that are not visible in this excerpt, so only the visible ones are described.
114 def stream_out(activity) do
115 public = "https://www.w3.org/ns/activitystreams#Public"
# Only Create, Announce and Delete activities are considered for streaming here.
117 if activity.data["type"] in ["Create", "Announce", "Delete"] do
118 Pleroma.Web.Streamer.stream("user", activity)
119 Pleroma.Web.Streamer.stream("list", activity)
# Activities with the public address in "to" go to the "public" topic.
121 if Enum.member?(activity.data["to"], public) do
122 Pleroma.Web.Streamer.stream("public", activity)
125 Pleroma.Web.Streamer.stream("public:local", activity)
# For Create activities every string entry of the object's "tag" list gets its own
# "hashtag:<tag>" topic; non-string tags are filtered out first.
128 if activity.data["type"] in ["Create"] do
129 activity.data["object"]
130 |> Map.get("tag", [])
131 |> Enum.filter(fn tag -> is_bitstring(tag) end)
132 |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
# NOTE(review): a missing "attachment" key reads as nil, which also satisfies
# `!= []` - confirm such objects are meant to reach the media topics.
134 if activity.data["object"]["attachment"] != [] do
135 Pleroma.Web.Streamer.stream("public:media", activity)
138 Pleroma.Web.Streamer.stream("public:local:media", activity)
# The "direct" topic is only considered when "cc" does not contain the public
# address; the remaining conditions are partly outside the visible lines.
143 if !Enum.member?(activity.data["cc"] || [], public) &&
146 User.get_by_ap_id(activity.data["actor"]).follower_address
148 do: Pleroma.Web.Streamer.stream("direct", activity)
# Builds activity data from `params` (the construction call is partly outside the
# visible lines), inserts it, increases the actor's note count and then hands the
# activity to `maybe_federate/1`. `additional` defaults to an empty map.
153 def create(%{to: to, actor: actor, context: context, object: object} = params) do
154 additional = params[:additional] || %{}
155 # only accept false as false value
156 local = !(params[:local] == false)
157 published = params[:published]
161 %{to: to, actor: actor, published: published, context: context, object: object},
164 {:ok, activity} <- insert(create_data, local),
165 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
166 {:ok, _actor} <- User.increase_note_count(actor),
167 :ok <- maybe_federate(activity) do
# Inserts and federates an "Accept" activity from `actor` about `object`, addressed to
# `to`. Afterwards `User.update_follow_request_count/1` is called for the actor; its
# result is ignored by the `_ <-` clause.
172 def accept(%{to: to, actor: actor, object: object} = params) do
173 # only accept false as false value
174 local = !(params[:local] == false)
176 with data <- %{"to" => to, "type" => "Accept", "actor" => actor.ap_id, "object" => object},
177 {:ok, activity} <- insert(data, local),
178 :ok <- maybe_federate(activity),
179 _ <- User.update_follow_request_count(actor) do
# Inserts and federates a "Reject" activity from `actor` about `object`, addressed to
# `to`. Afterwards `User.update_follow_request_count/1` is called for the actor; its
# result is ignored by the `_ <-` clause.
184 def reject(%{to: to, actor: actor, object: object} = params) do
185 # only accept false as false value
186 local = !(params[:local] == false)
188 with data <- %{"to" => to, "type" => "Reject", "actor" => actor.ap_id, "object" => object},
189 {:ok, activity} <- insert(data, local),
190 :ok <- maybe_federate(activity),
191 _ <- User.update_follow_request_count(actor) do
# Inserts the activity data built from `to`, `cc`, `actor` and `object` (the
# construction is on lines not visible here) and federates the resulting activity.
196 def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
197 # only accept false as false value
198 local = !(params[:local] == false)
207 {:ok, activity} <- insert(data, local),
208 :ok <- maybe_federate(activity) do
213 # TODO: This is weird, maybe we shouldn't check here if we can make the activity.
# NOTE(review): the head of this definition is not visible in this excerpt; judging by
# the helpers it calls, this is the "like" operation.
215 %User{ap_id: ap_id} = user,
216 %Object{data: %{"id" => _}} = object,
# Proceeds only when no like by this user exists for the object yet; then the like
# data is inserted, the like is added to the object and the activity is federated.
220 with nil <- get_existing_like(ap_id, object),
221 like_data <- make_like_data(user, object, activity_id),
222 {:ok, activity} <- insert(like_data, local),
223 {:ok, object} <- add_like_to_object(activity, object),
224 :ok <- maybe_federate(activity) do
225 {:ok, activity, object}
# An Activity falling out of the chain (e.g. an existing like) is returned together
# with the unchanged object; anything else becomes an error tuple.
227 %Activity{} = activity -> {:ok, activity, object}
228 error -> {:error, error}
# NOTE(review): the head of this definition is not visible in this excerpt; judging by
# the helpers it calls, this undoes a like.
# Requires an existing like activity by the actor. The unlike data is inserted, the
# like activity is deleted through Repo, the like is removed from the object and the
# unlike activity is federated last.
238 with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
239 unlike_data <- make_unlike_data(actor, like_activity, activity_id),
240 {:ok, unlike_activity} <- insert(unlike_data, local),
241 {:ok, _activity} <- Repo.delete(like_activity),
242 {:ok, object} <- remove_like_from_object(like_activity, object),
243 :ok <- maybe_federate(unlike_activity) do
244 {:ok, unlike_activity, like_activity, object}
# NOTE(review): the head of this definition is not visible in this excerpt; judging by
# the helpers it calls, this is the "announce" operation.
251 %User{ap_id: _} = user,
252 %Object{data: %{"id" => _}} = object,
# Only objects for which `is_public?/1` returns true pass the first clause. The
# announce data is then inserted, recorded on the object and federated.
257 with true <- is_public?(object),
258 announce_data <- make_announce_data(user, object, activity_id, public),
259 {:ok, activity} <- insert(announce_data, local),
260 {:ok, object} <- add_announce_to_object(activity, object),
261 :ok <- maybe_federate(activity) do
262 {:ok, activity, object}
# Any non-matching step result is wrapped in an error tuple.
264 error -> {:error, error}
# NOTE(review): the head of this definition is not visible in this excerpt; judging by
# the helpers it calls, this undoes an announce.
# Requires an existing announce activity by the actor. Here federation of the undo
# happens before the announce activity is deleted and removed from the object.
274 with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
275 unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
276 {:ok, unannounce_activity} <- insert(unannounce_data, local),
277 :ok <- maybe_federate(unannounce_activity),
278 {:ok, _activity} <- Repo.delete(announce_activity),
279 {:ok, object} <- remove_announce_from_object(announce_activity, object) do
280 {:ok, unannounce_activity, object}
# Inserts and federates the follow data for `follower` -> `followed`, then calls
# `User.update_follow_request_count/1` for the followed user (result ignored).
# `activity_id` defaults to nil and `local` to true.
286 def follow(follower, followed, activity_id \\ nil, local \\ true) do
287 with data <- make_follow_data(follower, followed, activity_id),
288 {:ok, activity} <- insert(data, local),
289 :ok <- maybe_federate(activity),
290 _ <- User.update_follow_request_count(followed) do
# Requires an existing follow activity (`fetch_latest_follow/2`). That activity's state
# is set to "cancelled" before the unfollow data is inserted and federated; finally the
# followed user's follow request count is refreshed (result ignored).
295 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
296 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
297 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
298 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
299 {:ok, activity} <- insert(unfollow_data, local),
300 :ok <- maybe_federate(activity),
301 _ <- User.update_follow_request_count(followed) do
# Deletes `object` via `Object.delete/1`, inserts activity data (built on lines partly
# outside this excerpt) addressed to the actor's follower address and the public
# address, decreases the actor's note count and federates the activity.
# NOTE(review): `user` comes from a cache lookup; a nil result would fail at
# `user.follower_address` - confirm the actor always resolves.
306 def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
307 user = User.get_cached_by_ap_id(actor)
313 "to" => [user.follower_address, "https://www.w3.org/ns/activitystreams#Public"]
316 with {:ok, _} <- Object.delete(object),
317 {:ok, activity} <- insert(data, local),
318 # Changing note count prior to enqueuing federation task in order to avoid race conditions on updating user.info
319 {:ok, _actor} <- User.decrease_note_count(user),
320 :ok <- maybe_federate(activity) do
# Blocks `blocked` on behalf of `blocker`. Two settings from the `:activitypub` config
# of :pleroma steer the behaviour: `:unfollow_blocked` and `:outgoing_blocks`.
325 def block(blocker, blocked, activity_id \\ nil, local \\ true) do
326 ap_config = Application.get_env(:pleroma, :activitypub)
327 unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
328 outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)
# When `:unfollow_blocked` is exactly true and a follow activity exists, the blocker
# unfollows the blocked user first.
330 with true <- unfollow_blocked do
331 follow_activity = fetch_latest_follow(blocker, blocked)
333 if follow_activity do
334 unfollow(blocker, blocked, nil, local)
# The block activity is only inserted and federated when `:outgoing_blocks` is
# exactly true.
338 with true <- outgoing_blocks,
339 block_data <- make_block_data(blocker, blocked, activity_id),
340 {:ok, activity} <- insert(block_data, local),
341 :ok <- maybe_federate(activity) do
# Requires an existing block activity (`fetch_latest_block/2`); the unblock data derived
# from it is inserted and federated.
348 def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
349 with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
350 unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
351 {:ok, activity} <- insert(unblock_data, local),
352 :ok <- maybe_federate(activity) do
# NOTE(review): the head of this definition is not visible in this excerpt; the visible
# body pipes into `make_flag_data/2`, so this appears to build a flag (report) activity.
# `additional` defaults to an empty map.
366 additional = params[:additional] || %{}
368 # only accept false as false value
369 local = !(params[:local] == false)
378 |> make_flag_data(additional)
# Queries activities belonging to `context`, newest id first. The recipient set is the
# public address, extended by the user's ap_id and following list when `opts["user"]`
# is given. Blocked actors and recipient visibility are applied through the
# restrict_blocked/restrict_recipients helpers.
382 def fetch_activities_for_context(context, opts \\ %{}) do
383 public = ["https://www.w3.org/ns/activitystreams#Public"]
386 if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
388 query = from(activity in Activity)
392 |> restrict_blocked(opts)
393 |> restrict_recipients(recipients, opts["user"])
# Matches on both the activity type and the context stored in the JSON data.
400 "?->>'type' = ? and ?->>'context' = ?",
406 order_by: [desc: :id]
# Builds the activity query with the public address as the only recipient and
# additionally applies `restrict_unlisted/1`.
412 def fetch_public_activities(opts \\ %{}) do
413 q = fetch_activities_query(["https://www.w3.org/ns/activitystreams#Public"], opts)
416 |> restrict_unlisted()
# Visibility values accepted by the restrict_visibility clauses.
421 @valid_visibilities ~w[direct unlisted public private]
# List form: the filter is applied only when every entry is a valid visibility; it
# compares the result of the `activity_visibility` SQL function against the list.
423 defp restrict_visibility(query, %{visibility: visibility})
424 when is_list(visibility) do
425 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
431 "activity_visibility(?, ?, ?) = ANY (?)",
# NOTE(review): `to_sql/3` only renders the query; check whether its result is
# actually used here or is a debugging leftover.
439 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
443 Logger.error("Could not restrict visibility to #{visibility}")
# Single valid visibility: compares `activity_visibility(...)` to that one value.
447 defp restrict_visibility(query, %{visibility: visibility})
448 when visibility in @valid_visibilities do
453 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
456 Ecto.Adapters.SQL.to_sql(:all, Repo, query)
# Any other explicit visibility value is logged as an error.
461 defp restrict_visibility(_query, %{visibility: visibility})
462 when visibility not in @valid_visibilities do
463 Logger.error("Could not restrict visibility to #{visibility}")
# No `:visibility` key in the options: the query is returned unchanged.
466 defp restrict_visibility(query, _visibility), do: query
# Fetches the Create and Announce activities authored by `user`. The params are extended
# with the actor id, a "whole_db" flag and the user's pinned activity ids. Two recipient
# lists are visible: the public address plus the reading user's ap_id and following
# list, or the public address alone (the condition choosing between them is on a line
# not shown here).
468 def fetch_user_activities(user, reading_user, params \\ %{}) do
471 |> Map.put("type", ["Create", "Announce"])
472 |> Map.put("actor_id", user.ap_id)
473 |> Map.put("whole_db", true)
474 |> Map.put("pinned_activity_ids", user.info.pinned_activities)
478 ["https://www.w3.org/ns/activitystreams#Public"] ++
479 [reading_user.ap_id | reading_user.following]
481 ["https://www.w3.org/ns/activitystreams#Public"]
484 fetch_activities(recipients, params)
# Keeps only activities with an id greater than "since_id". An empty-string or missing
# "since_id" leaves the query unchanged.
488 defp restrict_since(query, %{"since_id" => ""}), do: query
490 defp restrict_since(query, %{"since_id" => since_id}) do
491 from(activity in query, where: activity.id > ^since_id)
494 defp restrict_since(query, _), do: query
# Excludes activities whose object tags contain any of the "tag_reject" entries (jsonb
# `?|` operator, with the question mark escaped for Ecto). Only applies to a non-empty
# list; otherwise the query is returned unchanged.
496 defp restrict_tag_reject(query, %{"tag_reject" => tag_reject})
497 when is_list(tag_reject) and tag_reject != [] do
500 where: fragment("(not (? #> '{\"object\",\"tag\"}') \\?| ?)", activity.data, ^tag_reject)
504 defp restrict_tag_reject(query, _), do: query
# Keeps only activities whose object tags contain all of the "tag_all" entries (jsonb
# `?&` operator). Only applies to a non-empty list; otherwise the query is unchanged.
506 defp restrict_tag_all(query, %{"tag_all" => tag_all})
507 when is_list(tag_all) and tag_all != [] do
510 where: fragment("(? #> '{\"object\",\"tag\"}') \\?& ?", activity.data, ^tag_all)
514 defp restrict_tag_all(query, _), do: query
# Tag filter. A list keeps activities whose object tags contain any of the entries
# (jsonb `?|`); a single binary uses the containment operator `<@` against the object
# tags. Without a matching "tag" option the query is returned unchanged.
516 defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do
519 where: fragment("(? #> '{\"object\",\"tag\"}') \\?| ?", activity.data, ^tag)
523 defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do
526 where: fragment("? <@ (? #> '{\"object\",\"tag\"}')", ^tag, activity.data)
530 defp restrict_tag(query, _), do: query
# Keeps activities whose "to" field or whose "cc" field contains any entry of a given
# list (presumably `recipients_to` and `recipients_cc` respectively; the fragment
# arguments are on lines not visible here).
532 defp restrict_to_cc(query, recipients_to, recipients_cc) do
537 "(?->'to' \\?| ?) or (?->'cc' \\?| ?)",
# Recipient filter. An empty recipient list leaves the query unchanged. Without a user,
# an activity must share at least one entry with `recipients` (array overlap `&&`).
546 defp restrict_recipients(query, [], _user), do: query
548 defp restrict_recipients(query, recipients, nil) do
549 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
# With a user, activities authored by that user are additionally admitted.
# NOTE(review): Ecto's `or_where` ORs with the where conditions already on the query,
# so the position of this helper in a pipeline matters - confirm for each caller.
552 defp restrict_recipients(query, recipients, user) do
555 where: fragment("? && ?", ^recipients, activity.recipients),
556 or_where: activity.actor == ^user.ap_id
# Applies the "limit" option as the query limit; without it the query is unchanged.
560 defp restrict_limit(query, %{"limit" => limit}) do
561 from(activity in query, limit: ^limit)
564 defp restrict_limit(query, _), do: query
# Keeps only local activities when "local_only" is exactly `true`; any other value
# leaves the query unchanged.
566 defp restrict_local(query, %{"local_only" => true}) do
567 from(activity in query, where: activity.local == true)
570 defp restrict_local(query, _), do: query
# Keeps only activities with an id lower than "max_id". An empty-string or missing
# "max_id" leaves the query unchanged.
572 defp restrict_max(query, %{"max_id" => ""}), do: query
574 defp restrict_max(query, %{"max_id" => max_id}) do
575 from(activity in query, where: activity.id < ^max_id)
578 defp restrict_max(query, _), do: query
# Keeps only activities whose actor equals the "actor_id" option; without it the query
# is unchanged.
580 defp restrict_actor(query, %{"actor_id" => actor_id}) do
581 from(activity in query, where: activity.actor == ^actor_id)
584 defp restrict_actor(query, _), do: query
# Filters on the activity type stored in the JSON data. A binary is compared for
# equality; any other "type" value is passed to `= ANY(?)`, so it is expected to be a
# list. Without a "type" option the query is unchanged.
586 defp restrict_type(query, %{"type" => type}) when is_binary(type) do
587 from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
590 defp restrict_type(query, %{"type" => type}) do
591 from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
594 defp restrict_type(query, _), do: query
# Keeps only activities whose object "likes" contain the given ap_id (jsonb `<@`);
# without a "favorited_by" option the query is unchanged.
596 defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
599 where: fragment("? <@ (? #> '{\"object\",\"likes\"}')", ^ap_id, activity.data)
603 defp restrict_favorited_by(query, _), do: query
# When "only_media" is the string "true" or "1", drops activities whose object
# attachment equals the empty list; any other value leaves the query unchanged.
605 defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
608 where: fragment("not (? #> '{\"object\",\"attachment\"}' = ?)", activity.data, ^[])
612 defp restrict_media(query, _), do: query
# When "exclude_replies" is the string "true" or "1", keeps only activities whose object
# has no "inReplyTo" value; any other value leaves the query unchanged.
614 defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
617 where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
621 defp restrict_replies(query, _), do: query
# When "exclude_reblogs" is the string "true" or "1", drops activities of type
# 'Announce'; any other value leaves the query unchanged.
623 defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val == "true" or val == "1" do
624 from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
627 defp restrict_reblogs(query, _), do: query
# For a "muting_user", drops activities whose actor is in `mutes` or whose "to" field
# contains any of them. `mutes` is bound on a line not visible in this excerpt
# (presumably taken from the user's `info`). Without the option the query is unchanged.
629 defp restrict_muted(query, %{"muting_user" => %User{info: info}}) do
634 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
635 where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes)
639 defp restrict_muted(query, _), do: query
# For a "blocking_user", drops activities from blocked actors, activities addressed
# ("to") to any blocked actor, and activities from blocked domains. Both lists
# default to empty when unset in the user's info.
641 defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
642 blocks = info.blocks || []
643 domain_blocks = info.domain_blocks || []
647 where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
648 where: fragment("not (?->'to' \\?| ?)", activity.data, ^blocks),
# The third '/'-separated part of the actor URL is its host portion.
649 where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
653 defp restrict_blocked(query, _), do: query
# Drops activities whose "cc" field contains the public address; a missing "cc" is
# treated as an empty jsonb object by the coalesce.
655 defp restrict_unlisted(query) do
660 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
662 ^["https://www.w3.org/ns/activitystreams#Public"]
# When "pinned" is the string "true" and "pinned_activity_ids" is present, keeps only
# activities whose id is in that list; otherwise the query is unchanged.
667 defp restrict_pinned(query, %{"pinned" => "true", "pinned_activity_ids" => ids}) do
668 from(activity in query, where: activity.id in ^ids)
671 defp restrict_pinned(query, _), do: query
# Builds the activity query: a base query over Activity ordered by id descending with
# nulls last, piped through every restrict_* helper below. Each helper either narrows
# the query according to `opts` or returns it unchanged.
673 def fetch_activities_query(recipients, opts \\ %{}) do
676 activity in Activity,
678 order_by: [fragment("? desc nulls last", activity.id)]
682 |> restrict_recipients(recipients, opts["user"])
683 |> restrict_tag(opts)
684 |> restrict_tag_reject(opts)
685 |> restrict_tag_all(opts)
686 |> restrict_since(opts)
687 |> restrict_local(opts)
688 |> restrict_limit(opts)
689 |> restrict_max(opts)
690 |> restrict_actor(opts)
691 |> restrict_type(opts)
692 |> restrict_favorited_by(opts)
693 |> restrict_blocked(opts)
694 |> restrict_muted(opts)
695 |> restrict_media(opts)
696 |> restrict_visibility(opts)
697 |> restrict_replies(opts)
698 |> restrict_reblogs(opts)
699 |> restrict_pinned(opts)
# Starts from `fetch_activities_query/2` for the given recipients and options; the
# steps that execute the query are on lines not visible in this excerpt.
702 def fetch_activities(recipients, opts \\ %{}) do
703 fetch_activities_query(recipients, opts)
# Builds the activity query without a recipient restriction (empty recipient list) and
# instead filters through `restrict_to_cc/3` with the given `to` and `cc` lists.
708 def fetch_activities_bounded(recipients_to, recipients_cc, opts \\ %{}) do
709 fetch_activities_query([], opts)
710 |> restrict_to_cc(recipients_to, recipients_cc)
# Stores `file` through `Upload.store/2` and inserts an Object for the resulting data.
# The `:actor` option is written into the data under "actor" (the condition guarding
# that step is on a line not visible here).
715 def upload(file, opts \\ []) do
716 with {:ok, data} <- Upload.store(file, opts) do
719 Map.put(data, "actor", opts[:actor])
724 Repo.insert(%Object{data: obj_data})
# Derives user attributes from a remote actor object `data`.
728 def user_data_from_user_object(data) do
# An icon URL, when present, is wrapped in a `"url" => [%{"href" => ...}]` structure.
730 data["icon"]["url"] &&
733 "url" => [%{"href" => data["icon"]["url"]}]
# The image URL gets the same treatment.
737 data["image"]["url"] &&
740 "url" => [%{"href" => data["image"]["url"]}]
# "manuallyApprovesFollowers" defaults to false when absent.
743 locked = data["manuallyApprovesFollowers"] || false
744 data = Transmogrifier.maybe_fix_user_object(data)
749 "ap_enabled" => true,
750 "source_data" => data,
756 follower_address: data["followers"],
760 # nickname can be nil because of virtual actors
# With a "preferredUsername" the nickname takes the form name@host, using the host
# of the actor id; otherwise it is set to nil.
762 if data["preferredUsername"] do
766 "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
769 Map.put(user_data, :nickname, nil)
# Fetches the remote object for `ap_id` and converts it with
# `user_data_from_user_object/1`.
# NOTE(review): on failure the else branch returns whatever `Logger.error/1` returns
# rather than an error tuple - confirm callers cope with that.
775 def fetch_and_prepare_user_from_ap_id(ap_id) do
776 with {:ok, data} <- fetch_and_contain_remote_object_from_id(ap_id) do
777 user_data_from_user_object(data)
779 e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# When a user with this ap_id already exists, it is upgraded through
# `Transmogrifier.upgrade_user_from_ap_id/1`; otherwise the remote user data is fetched
# and passed to `User.insert_or_update_user/1`.
783 def make_user_from_ap_id(ap_id) do
784 if _user = User.get_by_ap_id(ap_id) do
785 Transmogrifier.upgrade_user_from_ap_id(ap_id)
787 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
788 User.insert_or_update_user(data)
# Resolves `nickname` via WebFinger and creates the user from the returned non-nil
# "ap_id". Any other WebFinger result yields {:error, "No AP id in WebFinger"}.
795 def make_user_from_nickname(nickname) do
796 with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
797 make_user_from_ap_id(ap_id)
799 _e -> {:error, "No AP id in WebFinger"}
# Decides whether `inbox` may receive an activity. The visible branch checks that the
# inbox host is not listed in `[:instance, :quarantined_instances]` (default empty
# list); how `public` influences the decision is on lines not visible in this excerpt.
803 def should_federate?(inbox, public) do
807 inbox_info = URI.parse(inbox)
808 !Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
# Delivers `activity` to remote ActivityPub inboxes.
812 def publish(actor, activity) do
# Non-local followers are only gathered when the actor's follower address is among
# the activity recipients.
814 if actor.follower_address in activity.recipients do
815 {:ok, followers} = User.get_followers(actor)
816 followers |> Enum.filter(&(!&1.local))
821 public = is_public?(activity)
823 {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
824 json = Jason.encode!(data)
# Each AP-enabled recipient is mapped to the shared inbox from its endpoints map when
# there is one, otherwise to its own inbox.
826 (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
827 |> Enum.filter(fn user -> User.ap_enabled?(user) end)
828 |> Enum.map(fn %{info: %{source_data: data}} ->
829 (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
# Inboxes are filtered by should_federate?/2 and Instances.filter_reachable/1; one
# Federator.publish_single_ap/1 call is made per remaining inbox.
832 |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
833 |> Instances.filter_reachable()
834 |> Enum.each(fn {inbox, unreachable_since} ->
835 Federator.publish_single_ap(%{
839 id: activity.data["id"],
840 unreachable_since: unreachable_since
# Sends the pre-encoded `json` to a single `inbox` and records the reachability of the
# target instance based on the outcome.
845 def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
846 Logger.info("Federating #{id} to #{inbox}")
847 host = URI.parse(inbox).host
# Base64-encoded SHA-256 of the body, prefixed with the algorithm name.
849 digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
# Current UTC time rendered as a GMT date string.
852 NaiveDateTime.utc_now()
853 |> Timex.format!("{WDshort}, {D} {Mshort} {YYYY} {h24}:{m}:{s} GMT")
856 Pleroma.Web.HTTPSignatures.sign(actor, %{
858 "content-length": byte_size(json),
863 with {:ok, %{status: code}} when code in 200..299 <-
869 {"Content-Type", "application/activity+json"},
871 {"signature", signature},
# On a 2xx response the inbox is marked reachable when `params` has no
# `:unreachable_since` key or the key holds a truthy value.
875 if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
876 do: Instances.set_reachable(inbox)
# On a non-matching result the inbox is marked unreachable unless
# `:unreachable_since` is already set.
880 {_post_result, response} ->
881 unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
887 # This will create a Create activity, which we need internally at the moment.
# Looks in the object cache first (`Object.get_cached_by_ap_id/1`). Otherwise the remote
# object is fetched; when `Object.normalize/1` returns nil for it, params carrying the
# object's actor (falling back to "attributedTo") are built, checked with
# `Transmogrifier.contain_origin/2` and passed to `Transmogrifier.handle_incoming/1`.
888 def fetch_object_from_id(id) do
889 if object = Object.get_cached_by_ap_id(id) do
892 with {:ok, data} <- fetch_and_contain_remote_object_from_id(id),
893 nil <- Object.normalize(data),
898 "actor" => data["actor"] || data["attributedTo"],
901 :ok <- Transmogrifier.contain_origin(id, params),
902 {:ok, activity} <- Transmogrifier.handle_incoming(params) do
903 {:ok, Object.normalize(activity.data["object"])}
# The visible else branches cover a `{:error, {:reject, nil}}` result, an Object
# falling out of the chain, and a fallback that retries the fetch through OStatus.
905 {:error, {:reject, nil}} ->
908 object = %Object{} ->
912 Logger.info("Couldn't get object via AP, trying out OStatus fetching...")
914 case OStatus.fetch_activity_from_url(id) do
915 {:ok, [activity | _]} -> {:ok, Object.normalize(activity.data["object"])}
# Fetches the object at `id` over HTTP and decodes it as JSON. The id must start with
# "http", the response status must be 2xx, the request asks for
# "application/activity+json", and the decoded data must pass
# `Transmogrifier.contain_origin_from_id/2`.
922 def fetch_and_contain_remote_object_from_id(id) do
923 Logger.info("Fetching object #{id} via AP")
925 with true <- String.starts_with?(id, "http"),
926 {:ok, %{body: body, status: code}} when code in 200..299 <-
929 [{:Accept, "application/activity+json"}]
931 {:ok, data} <- Jason.decode(body),
932 :ok <- Transmogrifier.contain_origin_from_id(id, data) do
940 # filter out broken threads
# Delegates to `entire_thread_visible_for_user?/2` for the given activity and user.
941 def contain_broken_threads(%Activity{} = activity, %User{} = user) do
942 entire_thread_visible_for_user?(activity, user)
945 # do post-processing on a specific activity
# Currently the only visible step is the broken-thread check.
946 def contain_activity(%Activity{} = activity, %User{} = user) do
947 contain_broken_threads(activity, user)
950 # do post-processing on a timeline
# Keeps only the timeline entries for which `contain_activity/2` returns a truthy value.
951 def contain_timeline(timeline, user) do
953 |> Enum.filter(fn activity ->
954 contain_activity(activity, user)