[#1149] Added more oban workers. Refactoring.
[akkoma] / lib / pleroma / user.ex
index fd1c0a5447faed14ec6c9e7057e8c784ea816844..bc2102ca72bb43fdbd03110c8670d88a2a4245ed 100644 (file)
@@ -26,6 +26,7 @@ defmodule Pleroma.User do
   alias Pleroma.Web.OStatus
   alias Pleroma.Web.RelMe
   alias Pleroma.Web.Websub
+  alias Pleroma.Workers.BackgroundWorker
 
   require Logger
 
@@ -39,6 +40,8 @@ defmodule Pleroma.User do
   @strict_local_nickname_regex ~r/^[a-zA-Z\d]+$/
   @extended_local_nickname_regex ~r/^[a-zA-Z\d_-]+$/
 
+  defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+
   schema "users" do
     field(:bio, :string)
     field(:email, :string)
@@ -57,6 +60,7 @@ defmodule Pleroma.User do
     field(:search_type, :integer, virtual: true)
     field(:tags, {:array, :string}, default: [])
     field(:last_refreshed_at, :naive_datetime_usec)
+    field(:last_digest_emailed_at, :naive_datetime)
     has_many(:notifications, Notification)
     has_many(:registrations, Registration)
     embeds_one(:info, User.Info)
@@ -228,6 +232,7 @@ defmodule Pleroma.User do
     |> put_password_hash
   end
 
+  @spec reset_password(User.t(), map) :: {:ok, User.t()} | {:error, Ecto.Changeset.t()}
   def reset_password(%User{id: user_id} = user, data) do
     multi =
       Multi.new()
@@ -332,6 +337,7 @@ defmodule Pleroma.User do
 
   def needs_update?(_), do: true
 
+  @spec maybe_direct_follow(User.t(), User.t()) :: {:ok, User.t()} | {:error, String.t()}
   def maybe_direct_follow(%User{} = follower, %User{local: true, info: %{locked: true}}) do
     {:ok, follower}
   end
@@ -576,8 +582,11 @@ defmodule Pleroma.User do
   end
 
   @doc "Fetch some posts when the user has just been federated with"
-  def fetch_initial_posts(user),
-    do: PleromaJobQueue.enqueue(:background, __MODULE__, [:fetch_initial_posts, user])
+  def fetch_initial_posts(user) do
+    %{"op" => "fetch_initial_posts", "user_id" => user.id}
+    |> BackgroundWorker.new(worker_args(:background))
+    |> Repo.insert()
+  end
 
   @spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t()
   def get_followers_query(%User{} = user, nil) do
@@ -741,7 +750,7 @@ defmodule Pleroma.User do
   end
 
   def update_follower_count(%User{} = user) do
-    unless user.local == false and Pleroma.Config.get([:instance, :external_user_synchronization]) do
+    if user.local or !Pleroma.Config.get([:instance, :external_user_synchronization]) do
       follower_count_query =
         User.Query.build(%{followers: user, deactivated: false})
         |> select([u], %{count: count(u.id)})
@@ -998,7 +1007,9 @@ defmodule Pleroma.User do
   end
 
   def deactivate_async(user, status \\ true) do
-    PleromaJobQueue.enqueue(:background, __MODULE__, [:deactivate_async, user, status])
+    %{"op" => "deactivate_user", "user_id" => user.id, "status" => status}
+    |> BackgroundWorker.new(worker_args(:background))
+    |> Repo.insert()
   end
 
   def deactivate(%User{} = user, status \\ true) do
@@ -1026,9 +1037,11 @@ defmodule Pleroma.User do
     |> update_and_set_cache()
   end
 
-  @spec delete(User.t()) :: :ok
-  def delete(%User{} = user),
-    do: PleromaJobQueue.enqueue(:background, __MODULE__, [:delete, user])
+  def delete(%User{} = user) do
+    %{"op" => "delete_user", "user_id" => user.id}
+    |> BackgroundWorker.new(worker_args(:background))
+    |> Repo.insert()
+  end
 
   @spec perform(atom(), User.t()) :: {:ok, User.t()}
   def perform(:delete, %User{} = user) do
@@ -1135,21 +1148,26 @@ defmodule Pleroma.User do
     Repo.all(query)
   end
 
-  def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers),
-    do:
-      PleromaJobQueue.enqueue(:background, __MODULE__, [
-        :blocks_import,
-        blocker,
-        blocked_identifiers
-      ])
+  def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do
+    %{
+      "op" => "blocks_import",
+      "blocker_id" => blocker.id,
+      "blocked_identifiers" => blocked_identifiers
+    }
+    |> BackgroundWorker.new(worker_args(:background))
+    |> Repo.insert()
+  end
 
-  def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers),
-    do:
-      PleromaJobQueue.enqueue(:background, __MODULE__, [
-        :follow_import,
-        follower,
-        followed_identifiers
-      ])
+  def follow_import(%User{} = follower, followed_identifiers)
+      when is_list(followed_identifiers) do
+    %{
+      "op" => "follow_import",
+      "follower_id" => follower.id,
+      "followed_identifiers" => followed_identifiers
+    }
+    |> BackgroundWorker.new(worker_args(:background))
+    |> Repo.insert()
+  end
 
   def delete_user_activities(%User{ap_id: ap_id} = user) do
     ap_id
@@ -1417,6 +1435,80 @@ defmodule Pleroma.User do
     target.ap_id not in user.info.muted_reblogs
   end
 
+  @doc """
+  The function returns a query to get users with no activity for given interval of days.
+  Inactive users are those who didn't read any notification, or had any activity where
+  the user is the activity's actor, during `inactivity_threshold` days.
+  Deactivated users will not appear in this list.
+
+  ## Examples
+
+      iex> Pleroma.User.list_inactive_users()
+      %Ecto.Query{}
+  """
+  @spec list_inactive_users_query(integer()) :: Ecto.Query.t()
+  def list_inactive_users_query(inactivity_threshold \\ 7) do
+    negative_inactivity_threshold = -inactivity_threshold
+    now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
+    # Subqueries are not supported in `where` clauses, join gets too complicated.
+    has_read_notifications =
+      from(n in Pleroma.Notification,
+        where: n.seen == true,
+        group_by: n.id,
+        having: max(n.updated_at) > datetime_add(^now, ^negative_inactivity_threshold, "day"),
+        select: n.user_id
+      )
+      |> Pleroma.Repo.all()
+
+    from(u in Pleroma.User,
+      left_join: a in Pleroma.Activity,
+      on: u.ap_id == a.actor,
+      where: not is_nil(u.nickname),
+      where: fragment("not (?->'deactivated' @> 'true')", u.info),
+      where: u.id not in ^has_read_notifications,
+      group_by: u.id,
+      having:
+        max(a.inserted_at) < datetime_add(^now, ^negative_inactivity_threshold, "day") or
+          is_nil(max(a.inserted_at))
+    )
+  end
+
+  @doc """
+  Enable or disable email notifications for user
+
+  ## Examples
+
+      iex> Pleroma.User.switch_email_notifications(Pleroma.User{info: %{email_notifications: %{"digest" => false}}}, "digest", true)
+      Pleroma.User{info: %{email_notifications: %{"digest" => true}}}
+
+      iex> Pleroma.User.switch_email_notifications(Pleroma.User{info: %{email_notifications: %{"digest" => true}}}, "digest", false)
+      Pleroma.User{info: %{email_notifications: %{"digest" => false}}}
+  """
+  @spec switch_email_notifications(t(), String.t(), boolean()) ::
+          {:ok, t()} | {:error, Ecto.Changeset.t()}
+  def switch_email_notifications(user, type, status) do
+    info = Pleroma.User.Info.update_email_notifications(user.info, %{type => status})
+
+    change(user)
+    |> put_embed(:info, info)
+    |> update_and_set_cache()
+  end
+
+  @doc """
+  Set `last_digest_emailed_at` value for the user to current time
+  """
+  @spec touch_last_digest_emailed_at(t()) :: t()
+  def touch_last_digest_emailed_at(user) do
+    now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
+
+    {:ok, updated_user} =
+      user
+      |> change(%{last_digest_emailed_at: now})
+      |> update_and_set_cache()
+
+    updated_user
+  end
+
   @spec toggle_confirmation(User.t()) :: {:ok, User.t()} | {:error, Changeset.t()}
   def toggle_confirmation(%User{} = user) do
     need_confirmation? = !user.info.confirmation_pending