[#1149] Merge remote-tracking branch 'remotes/upstream/develop' into 1149-oban-job...
[akkoma] / lib / pleroma / user.ex
index 5ea2b518bc55c97c05b5eb730a7fa7e3f1472b6f..f0306652c2bc73890dcb1a28b0a7881bee61f06e 100644 (file)
@@ -21,11 +21,13 @@ defmodule Pleroma.User do
   alias Pleroma.Web
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.ActivityPub.Utils
+  alias Pleroma.Web.CommonAPI
   alias Pleroma.Web.CommonAPI.Utils, as: CommonUtils
   alias Pleroma.Web.OAuth
   alias Pleroma.Web.OStatus
   alias Pleroma.Web.RelMe
   alias Pleroma.Web.Websub
+  alias Pleroma.Workers.BackgroundWorker
 
   require Logger
 
@@ -57,6 +59,7 @@ defmodule Pleroma.User do
     field(:search_type, :integer, virtual: true)
     field(:tags, {:array, :string}, default: [])
     field(:last_refreshed_at, :naive_datetime_usec)
+    field(:last_digest_emailed_at, :naive_datetime)
     has_many(:notifications, Notification)
     has_many(:registrations, Registration)
     embeds_one(:info, User.Info)
@@ -114,7 +117,9 @@ defmodule Pleroma.User do
 
   def user_info(%User{} = user, args \\ %{}) do
     following_count =
-      if args[:following_count], do: args[:following_count], else: following_count(user)
+      if args[:following_count],
+        do: args[:following_count],
+        else: user.info.following_count || following_count(user)
 
     follower_count =
       if args[:follower_count], do: args[:follower_count], else: user.info.follower_count
@@ -129,6 +134,28 @@ defmodule Pleroma.User do
     |> Map.put(:follower_count, follower_count)
   end
 
+  def follow_state(%User{} = user, %User{} = target) do
+    follow_activity = Utils.fetch_latest_follow(user, target)
+
+    if follow_activity,
+      do: follow_activity.data["state"],
+      # Ideally this would be nil, but then Cachex does not commit the value
+      else: false
+  end
+
+  def get_cached_follow_state(user, target) do
+    key = "follow_state:#{user.ap_id}|#{target.ap_id}"
+    Cachex.fetch!(:user_cache, key, fn _ -> {:commit, follow_state(user, target)} end)
+  end
+
+  def set_follow_state_cache(user_ap_id, target_ap_id, state) do
+    Cachex.put(
+      :user_cache,
+      "follow_state:#{user_ap_id}|#{target_ap_id}",
+      state
+    )
+  end
+
   def set_info_cache(user, args) do
     Cachex.put(:user_cache, "user_info:#{user.id}", user_info(user, args))
   end
@@ -148,10 +175,24 @@ defmodule Pleroma.User do
     |> Repo.aggregate(:count, :id)
   end
 
+  defp truncate_if_exists(params, key, max_length) do
+    if Map.has_key?(params, key) and is_binary(params[key]) do
+      {value, _chopped} = String.split_at(params[key], max_length)
+      Map.put(params, key, value)
+    else
+      params
+    end
+  end
+
   def remote_user_creation(params) do
+    bio_limit = Pleroma.Config.get([:instance, :user_bio_length], 5000)
+    name_limit = Pleroma.Config.get([:instance, :user_name_length], 100)
+
     params =
       params
       |> Map.put(:info, params[:info] || %{})
+      |> truncate_if_exists(:name, name_limit)
+      |> truncate_if_exists(:bio, bio_limit)
 
     info_cng = User.Info.remote_user_creation(%User.Info{}, params[:info])
 
@@ -161,8 +202,8 @@ defmodule Pleroma.User do
       |> validate_required([:name, :ap_id])
       |> unique_constraint(:nickname)
       |> validate_format(:nickname, @email_regex)
-      |> validate_length(:bio, max: 5000)
-      |> validate_length(:name, max: 100)
+      |> validate_length(:bio, max: bio_limit)
+      |> validate_length(:name, max: name_limit)
       |> put_change(:local, false)
       |> put_embed(:info, info_cng)
 
@@ -185,22 +226,23 @@ defmodule Pleroma.User do
   end
 
   def update_changeset(struct, params \\ %{}) do
+    bio_limit = Pleroma.Config.get([:instance, :user_bio_length], 5000)
+    name_limit = Pleroma.Config.get([:instance, :user_name_length], 100)
+
     struct
     |> cast(params, [:bio, :name, :avatar, :following])
     |> unique_constraint(:nickname)
     |> validate_format(:nickname, local_nickname_regex())
-    |> validate_length(:bio, max: 5000)
-    |> validate_length(:name, min: 1, max: 100)
+    |> validate_length(:bio, max: bio_limit)
+    |> validate_length(:name, min: 1, max: name_limit)
   end
 
-  def upgrade_changeset(struct, params \\ %{}) do
-    params =
-      params
-      |> Map.put(:last_refreshed_at, NaiveDateTime.utc_now())
+  def upgrade_changeset(struct, params \\ %{}, remote? \\ false) do
+    bio_limit = Pleroma.Config.get([:instance, :user_bio_length], 5000)
+    name_limit = Pleroma.Config.get([:instance, :user_name_length], 100)
 
-    info_cng =
-      struct.info
-      |> User.Info.user_upgrade(params[:info])
+    params = Map.put(params, :last_refreshed_at, NaiveDateTime.utc_now())
+    info_cng = User.Info.user_upgrade(struct.info, params[:info], remote?)
 
     struct
     |> cast(params, [
@@ -213,8 +255,8 @@ defmodule Pleroma.User do
     ])
     |> unique_constraint(:nickname)
     |> validate_format(:nickname, local_nickname_regex())
-    |> validate_length(:bio, max: 5000)
-    |> validate_length(:name, max: 100)
+    |> validate_length(:bio, max: bio_limit)
+    |> validate_length(:name, max: name_limit)
     |> put_embed(:info, info_cng)
   end
 
@@ -226,6 +268,7 @@ defmodule Pleroma.User do
     |> put_password_hash
   end
 
+  @spec reset_password(User.t(), map) :: {:ok, User.t()} | {:error, Ecto.Changeset.t()}
   def reset_password(%User{id: user_id} = user, data) do
     multi =
       Multi.new()
@@ -240,6 +283,9 @@ defmodule Pleroma.User do
   end
 
   def register_changeset(struct, params \\ %{}, opts \\ []) do
+    bio_limit = Pleroma.Config.get([:instance, :user_bio_length], 5000)
+    name_limit = Pleroma.Config.get([:instance, :user_name_length], 100)
+
     need_confirmation? =
       if is_nil(opts[:need_confirmation]) do
         Pleroma.Config.get([:instance, :account_activation_required])
@@ -260,8 +306,8 @@ defmodule Pleroma.User do
       |> validate_exclusion(:nickname, Pleroma.Config.get([User, :restricted_nicknames]))
       |> validate_format(:nickname, local_nickname_regex())
       |> validate_format(:email, @email_regex)
-      |> validate_length(:bio, max: 1000)
-      |> validate_length(:name, min: 1, max: 100)
+      |> validate_length(:bio, max: bio_limit)
+      |> validate_length(:name, min: 1, max: name_limit)
       |> put_change(:info, info_change)
 
     changeset =
@@ -299,7 +345,13 @@ defmodule Pleroma.User do
   @doc "Inserts provided changeset, performs post-registration actions (confirmation email sending etc.)"
   def register(%Ecto.Changeset{} = changeset) do
     with {:ok, user} <- Repo.insert(changeset),
-         {:ok, user} <- autofollow_users(user),
+         {:ok, user} <- post_register_action(user) do
+      {:ok, user}
+    end
+  end
+
+  def post_register_action(%User{} = user) do
+    with {:ok, user} <- autofollow_users(user),
          {:ok, user} <- set_cache(user),
          {:ok, _} <- User.WelcomeMessage.post_welcome_message_to_user(user),
          {:ok, _} <- try_send_confirmation_email(user) do
@@ -330,6 +382,7 @@ defmodule Pleroma.User do
 
   def needs_update?(_), do: true
 
+  @spec maybe_direct_follow(User.t(), User.t()) :: {:ok, User.t()} | {:error, String.t()}
   def maybe_direct_follow(%User{} = follower, %User{local: true, info: %{locked: true}}) do
     {:ok, follower}
   end
@@ -404,6 +457,8 @@ defmodule Pleroma.User do
 
         {1, [follower]} = Repo.update_all(q, [])
 
+        follower = maybe_update_following_count(follower)
+
         {:ok, _} = update_follower_count(followed)
 
         set_cache(follower)
@@ -423,6 +478,8 @@ defmodule Pleroma.User do
 
       {1, [follower]} = Repo.update_all(q, [])
 
+      follower = maybe_update_following_count(follower)
+
       {:ok, followed} = update_follower_count(followed)
 
       set_cache(follower)
@@ -450,6 +507,13 @@ defmodule Pleroma.User do
     Repo.get_by(User, ap_id: ap_id)
   end
 
+  def get_all_by_ap_id(ap_ids) do
+    from(u in __MODULE__,
+      where: u.ap_id in ^ap_ids
+    )
+    |> Repo.all()
+  end
+
   # This is mostly an SPC migration fix. This guesses the user nickname by taking the last part
   # of the ap_id and the domain and tries to get that user
   def get_by_guessed_nickname(ap_id) do
@@ -471,7 +535,7 @@ defmodule Pleroma.User do
   end
 
   def update_and_set_cache(changeset) do
-    with {:ok, user} <- Repo.update(changeset) do
+    with {:ok, user} <- Repo.update(changeset, stale_error_field: :id) do
       set_cache(user)
     else
       e -> e
@@ -520,8 +584,22 @@ defmodule Pleroma.User do
     end)
   end
 
-  def get_cached_by_nickname_or_id(nickname_or_id) do
-    get_cached_by_id(nickname_or_id) || get_cached_by_nickname(nickname_or_id)
+  def get_cached_by_nickname_or_id(nickname_or_id, opts \\ []) do
+    restrict_to_local = Pleroma.Config.get([:instance, :limit_to_local_content])
+
+    cond do
+      is_integer(nickname_or_id) or Pleroma.FlakeId.is_flake_id?(nickname_or_id) ->
+        get_cached_by_id(nickname_or_id) || get_cached_by_nickname(nickname_or_id)
+
+      restrict_to_local == false ->
+        get_cached_by_nickname(nickname_or_id)
+
+      restrict_to_local == :unauthenticated and match?(%User{}, opts[:for]) ->
+        get_cached_by_nickname(nickname_or_id)
+
+      true ->
+        nil
+    end
   end
 
   def get_by_nickname(nickname) do
@@ -570,8 +648,9 @@ defmodule Pleroma.User do
   end
 
   @doc "Fetch some posts when the user has just been federated with"
-  def fetch_initial_posts(user),
-    do: PleromaJobQueue.enqueue(:background, __MODULE__, [:fetch_initial_posts, user])
+  def fetch_initial_posts(user) do
+    BackgroundWorker.enqueue("fetch_initial_posts", %{"user_id" => user.id})
+  end
 
   @spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t()
   def get_followers_query(%User{} = user, nil) do
@@ -586,12 +665,23 @@ defmodule Pleroma.User do
   @spec get_followers_query(User.t()) :: Ecto.Query.t()
   def get_followers_query(user), do: get_followers_query(user, nil)
 
+  @spec get_followers(User.t(), pos_integer()) :: {:ok, list(User.t())}
   def get_followers(user, page \\ nil) do
     q = get_followers_query(user, page)
 
     {:ok, Repo.all(q)}
   end
 
+  @spec get_external_followers(User.t(), pos_integer()) :: {:ok, list(User.t())}
+  def get_external_followers(user, page \\ nil) do
+    q =
+      user
+      |> get_followers_query(page)
+      |> User.Query.build(%{external: true})
+
+    {:ok, Repo.all(q)}
+  end
+
   def get_followers_ids(user, page \\ nil) do
     q = get_followers_query(user, page)
 
@@ -696,32 +786,75 @@ defmodule Pleroma.User do
     |> update_and_set_cache()
   end
 
+  @spec maybe_fetch_follow_information(User.t()) :: User.t()
+  def maybe_fetch_follow_information(user) do
+    with {:ok, user} <- fetch_follow_information(user) do
+      user
+    else
+      e ->
+        Logger.error("Follower/Following counter update for #{user.ap_id} failed.\n#{inspect(e)}")
+
+        user
+    end
+  end
+
+  def fetch_follow_information(user) do
+    with {:ok, info} <- ActivityPub.fetch_follow_information_for_user(user) do
+      info_cng = User.Info.follow_information_update(user.info, info)
+
+      changeset =
+        user
+        |> change()
+        |> put_embed(:info, info_cng)
+
+      update_and_set_cache(changeset)
+    else
+      {:error, _} = e -> e
+      e -> {:error, e}
+    end
+  end
+
   def update_follower_count(%User{} = user) do
-    follower_count_query =
-      User.Query.build(%{followers: user, deactivated: false})
-      |> select([u], %{count: count(u.id)})
+    if user.local or !Pleroma.Config.get([:instance, :external_user_synchronization]) do
+      follower_count_query =
+        User.Query.build(%{followers: user, deactivated: false})
+        |> select([u], %{count: count(u.id)})
+
+      User
+      |> where(id: ^user.id)
+      |> join(:inner, [u], s in subquery(follower_count_query))
+      |> update([u, s],
+        set: [
+          info:
+            fragment(
+              "jsonb_set(?, '{follower_count}', ?::varchar::jsonb, true)",
+              u.info,
+              s.count
+            )
+        ]
+      )
+      |> select([u], u)
+      |> Repo.update_all([])
+      |> case do
+        {1, [user]} -> set_cache(user)
+        _ -> {:error, user}
+      end
+    else
+      {:ok, maybe_fetch_follow_information(user)}
+    end
+  end
 
-    User
-    |> where(id: ^user.id)
-    |> join(:inner, [u], s in subquery(follower_count_query))
-    |> update([u, s],
-      set: [
-        info:
-          fragment(
-            "jsonb_set(?, '{follower_count}', ?::varchar::jsonb, true)",
-            u.info,
-            s.count
-          )
-      ]
-    )
-    |> select([u], u)
-    |> Repo.update_all([])
-    |> case do
-      {1, [user]} -> set_cache(user)
-      _ -> {:error, user}
+  @spec maybe_update_following_count(User.t()) :: User.t()
+  def maybe_update_following_count(%User{local: false} = user) do
+    if Pleroma.Config.get([:instance, :external_user_synchronization]) do
+      maybe_fetch_follow_information(user)
+    else
+      user
     end
   end
 
+  def maybe_update_following_count(user), do: user
+
   def remove_duplicated_following(%User{following: following} = user) do
     uniq_following = Enum.uniq(following)
 
@@ -820,6 +953,13 @@ defmodule Pleroma.User do
         blocker
       end
 
+    # clear any requested follows as well
+    blocked =
+      case CommonAPI.reject_follow_request(blocked, blocker) do
+        {:ok, %User{} = updated_blocked} -> updated_blocked
+        nil -> blocked
+      end
+
     blocker =
       if subscribed_to?(blocked, blocker) do
         {:ok, blocker} = unsubscribe(blocked, blocker)
@@ -871,14 +1011,26 @@ defmodule Pleroma.User do
   def muted_notifications?(user, %{ap_id: ap_id}),
     do: Enum.member?(user.info.muted_notifications, ap_id)
 
-  def blocks?(%User{info: info} = _user, %{ap_id: ap_id}) do
-    blocks = info.blocks
-    domain_blocks = info.domain_blocks
-    %{host: host} = URI.parse(ap_id)
+  def blocks?(%User{} = user, %User{} = target) do
+    blocks_ap_id?(user, target) || blocks_domain?(user, target)
+  end
+
+  def blocks?(nil, _), do: false
 
-    Enum.member?(blocks, ap_id) || Enum.any?(domain_blocks, &(&1 == host))
+  def blocks_ap_id?(%User{} = user, %User{} = target) do
+    Enum.member?(user.info.blocks, target.ap_id)
   end
 
+  def blocks_ap_id?(_, _), do: false
+
+  def blocks_domain?(%User{} = user, %User{} = target) do
+    domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
+    %{host: host} = URI.parse(target.ap_id)
+    Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, host)
+  end
+
+  def blocks_domain?(_, _), do: false
+
   def subscribed_to?(user, %{ap_id: ap_id}) do
     with %User{} = target <- get_cached_by_ap_id(ap_id) do
       Enum.member?(target.info.subscribers, user.ap_id)
@@ -928,7 +1080,7 @@ defmodule Pleroma.User do
   end
 
   def deactivate_async(user, status \\ true) do
-    PleromaJobQueue.enqueue(:background, __MODULE__, [:deactivate_async, user, status])
+    BackgroundWorker.enqueue("deactivate_user", %{"user_id" => user.id, "status" => status})
   end
 
   def deactivate(%User{} = user, status \\ true) do
@@ -956,9 +1108,9 @@ defmodule Pleroma.User do
     |> update_and_set_cache()
   end
 
-  @spec delete(User.t()) :: :ok
-  def delete(%User{} = user),
-    do: PleromaJobQueue.enqueue(:background, __MODULE__, [:delete, user])
+  def delete(%User{} = user) do
+    BackgroundWorker.enqueue("delete_user", %{"user_id" => user.id})
+  end
 
   @spec perform(atom(), User.t()) :: {:ok, User.t()}
   def perform(:delete, %User{} = user) do
@@ -1065,25 +1217,24 @@ defmodule Pleroma.User do
     Repo.all(query)
   end
 
-  def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers),
-    do:
-      PleromaJobQueue.enqueue(:background, __MODULE__, [
-        :blocks_import,
-        blocker,
-        blocked_identifiers
-      ])
+  def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do
+    BackgroundWorker.enqueue("blocks_import", %{
+      "blocker_id" => blocker.id,
+      "blocked_identifiers" => blocked_identifiers
+    })
+  end
 
-  def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers),
-    do:
-      PleromaJobQueue.enqueue(:background, __MODULE__, [
-        :follow_import,
-        follower,
-        followed_identifiers
-      ])
+  def follow_import(%User{} = follower, followed_identifiers)
+      when is_list(followed_identifiers) do
+    BackgroundWorker.enqueue("follow_import", %{
+      "follower_id" => follower.id,
+      "followed_identifiers" => followed_identifiers
+    })
+  end
 
   def delete_user_activities(%User{ap_id: ap_id} = user) do
     ap_id
-    |> Activity.query_by_actor()
+    |> Activity.Queries.by_actor()
     |> RepoStreamer.chunk_stream(50)
     |> Stream.each(fn activities ->
       Enum.each(activities, &delete_activity(&1))
@@ -1347,6 +1498,80 @@ defmodule Pleroma.User do
     target.ap_id not in user.info.muted_reblogs
   end
 
+  @doc """
+  The function returns a query to get users with no activity for given interval of days.
+  Inactive users are those who didn't read any notification, or had any activity where
+  the user is the activity's actor, during `inactivity_threshold` days.
+  Deactivated users will not appear in this list.
+
+  ## Examples
+
+      iex> Pleroma.User.list_inactive_users()
+      %Ecto.Query{}
+  """
+  @spec list_inactive_users_query(integer()) :: Ecto.Query.t()
+  def list_inactive_users_query(inactivity_threshold \\ 7) do
+    negative_inactivity_threshold = -inactivity_threshold
+    now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
+    # Subqueries are not supported in `where` clauses, join gets too complicated.
+    has_read_notifications =
+      from(n in Pleroma.Notification,
+        where: n.seen == true,
+        group_by: n.id,
+        having: max(n.updated_at) > datetime_add(^now, ^negative_inactivity_threshold, "day"),
+        select: n.user_id
+      )
+      |> Pleroma.Repo.all()
+
+    from(u in Pleroma.User,
+      left_join: a in Pleroma.Activity,
+      on: u.ap_id == a.actor,
+      where: not is_nil(u.nickname),
+      where: fragment("not (?->'deactivated' @> 'true')", u.info),
+      where: u.id not in ^has_read_notifications,
+      group_by: u.id,
+      having:
+        max(a.inserted_at) < datetime_add(^now, ^negative_inactivity_threshold, "day") or
+          is_nil(max(a.inserted_at))
+    )
+  end
+
+  @doc """
+  Enable or disable email notifications for user
+
+  ## Examples
+
+      iex> Pleroma.User.switch_email_notifications(Pleroma.User{info: %{email_notifications: %{"digest" => false}}}, "digest", true)
+      Pleroma.User{info: %{email_notifications: %{"digest" => true}}}
+
+      iex> Pleroma.User.switch_email_notifications(Pleroma.User{info: %{email_notifications: %{"digest" => true}}}, "digest", false)
+      Pleroma.User{info: %{email_notifications: %{"digest" => false}}}
+  """
+  @spec switch_email_notifications(t(), String.t(), boolean()) ::
+          {:ok, t()} | {:error, Ecto.Changeset.t()}
+  def switch_email_notifications(user, type, status) do
+    info = Pleroma.User.Info.update_email_notifications(user.info, %{type => status})
+
+    change(user)
+    |> put_embed(:info, info)
+    |> update_and_set_cache()
+  end
+
+  @doc """
+  Set `last_digest_emailed_at` value for the user to current time
+  """
+  @spec touch_last_digest_emailed_at(t()) :: t()
+  def touch_last_digest_emailed_at(user) do
+    now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
+
+    {:ok, updated_user} =
+      user
+      |> change(%{last_digest_emailed_at: now})
+      |> update_and_set_cache()
+
+    updated_user
+  end
+
   @spec toggle_confirmation(User.t()) :: {:ok, User.t()} | {:error, Changeset.t()}
   def toggle_confirmation(%User{} = user) do
     need_confirmation? = !user.info.confirmation_pending
@@ -1414,4 +1639,13 @@ defmodule Pleroma.User do
   def is_internal_user?(%User{nickname: nil}), do: true
   def is_internal_user?(%User{local: true, nickname: "internal." <> _}), do: true
   def is_internal_user?(_), do: false
+
+  def change_email(user, email) do
+    user
+    |> cast(%{email: email}, [:email])
+    |> validate_required([:email])
+    |> unique_constraint(:email)
+    |> validate_format(:email, @email_regex)
+    |> update_and_set_cache()
+  end
 end