Merge branch 'develop' into issue/1276
[akkoma] / lib / pleroma / marker.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Marker do
6 use Ecto.Schema
7
8 import Ecto.Changeset
9 import Ecto.Query
10
11 alias Ecto.Multi
12 alias Pleroma.Notification
13 alias Pleroma.Repo
14 alias Pleroma.User
15 alias __MODULE__
16
17 @timelines ["notifications"]
18 @type t :: %__MODULE__{}
19
20 schema "markers" do
21 field(:last_read_id, :string, default: "")
22 field(:timeline, :string, default: "")
23 field(:lock_version, :integer, default: 0)
24 field(:unread_count, :integer, default: 0)
25
26 belongs_to(:user, User, type: FlakeId.Ecto.CompatType)
27 timestamps()
28 end
29
30 @doc """
31 Gets markers by user and timeline.
32
33 opts:
34 `recount_unread` - run force recount unread notifications for `true` value
35 """
36 @spec get_markers(User.t(), list(String), map()) :: list(t())
37 def get_markers(user, timelines \\ [], opts \\ %{}) do
38 user
39 |> get_query(timelines)
40 |> recount_unread_notifications(opts[:recount_unread])
41 |> Repo.all()
42 end
43
44 def upsert(%User{} = user, attrs) do
45 attrs
46 |> Map.take(@timelines)
47 |> Enum.reduce(Multi.new(), fn {timeline, timeline_attrs}, multi ->
48 marker =
49 user
50 |> get_marker(timeline)
51 |> changeset(timeline_attrs)
52
53 Multi.insert(multi, timeline, marker,
54 returning: true,
55 on_conflict: {:replace, [:last_read_id, :unread_count]},
56 conflict_target: [:user_id, :timeline]
57 )
58 end)
59 |> Repo.transaction()
60 end
61
62 @spec multi_set_unread_count(Multi.t(), User.t(), String.t()) :: Multi.t()
63 def multi_set_unread_count(multi, %User{} = user, "notifications") do
64 multi
65 |> Multi.run(:counters, fn _repo, _changes ->
66 {:ok,
67 %{
68 unread_count: Repo.aggregate(Notification.unread_count_query(user), :count, :id),
69 last_read_id: Repo.one(Notification.last_read_query(user))
70 }}
71 end)
72 |> Multi.insert(
73 :marker,
74 fn %{counters: attrs} ->
75 %Marker{timeline: "notifications", user_id: user.id}
76 |> struct(attrs)
77 |> Ecto.Changeset.change()
78 end,
79 returning: true,
80 on_conflict: {:replace, [:last_read_id, :unread_count]},
81 conflict_target: [:user_id, :timeline]
82 )
83 end
84
85 def multi_set_unread_count(multi, _, _), do: multi
86
87 defp get_marker(user, timeline) do
88 case Repo.find_resource(get_query(user, timeline)) do
89 {:ok, marker} -> %__MODULE__{marker | user: user}
90 _ -> %__MODULE__{timeline: timeline, user_id: user.id}
91 end
92 end
93
94 @doc false
95 defp changeset(marker, attrs) do
96 marker
97 |> cast(attrs, [:last_read_id, :unread_count])
98 |> validate_required([:user_id, :timeline, :last_read_id])
99 |> validate_inclusion(:timeline, @timelines)
100 end
101
102 defp by_timeline(query, timeline) do
103 from(m in query, where: m.timeline in ^List.wrap(timeline))
104 end
105
106 defp by_user_id(query, id), do: from(m in query, where: m.user_id == ^id)
107
108 defp get_query(user, timelines) do
109 __MODULE__
110 |> by_user_id(user.id)
111 |> by_timeline(timelines)
112 end
113
114 defp recount_unread_notifications(query, true) do
115 from(
116 q in query,
117 left_join: n in "notifications",
118 on: n.user_id == q.user_id and n.seen == false,
119 group_by: [:id],
120 select_merge: %{
121 unread_count: fragment("count(?)", n.id)
122 }
123 )
124 end
125
126 defp recount_unread_notifications(query, _), do: query
127 end