Revert "Merge branch 'streamer-refactoring' into 'develop'"
[akkoma] / lib / pleroma / notification.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Notification do
6 use Ecto.Schema
7
8 alias Pleroma.Activity
9 alias Pleroma.Notification
10 alias Pleroma.Object
11 alias Pleroma.Pagination
12 alias Pleroma.Repo
13 alias Pleroma.User
14 alias Pleroma.Web.CommonAPI.Utils
15 alias Pleroma.Web.Push
16 alias Pleroma.Web.Streamer
17
18 import Ecto.Query
19 import Ecto.Changeset
20
21 @type t :: %__MODULE__{}
22
23 schema "notifications" do
24 field(:seen, :boolean, default: false)
25 belongs_to(:user, User, type: Pleroma.FlakeId)
26 belongs_to(:activity, Activity, type: Pleroma.FlakeId)
27
28 timestamps()
29 end
30
31 def changeset(%Notification{} = notification, attrs) do
32 notification
33 |> cast(attrs, [:seen])
34 end
35
36 def for_user_query(user, opts \\ []) do
37 query =
38 Notification
39 |> where(user_id: ^user.id)
40 |> where(
41 [n, a],
42 fragment(
43 "? not in (SELECT ap_id FROM users WHERE info->'deactivated' @> 'true')",
44 a.actor
45 )
46 )
47 |> join(:inner, [n], activity in assoc(n, :activity))
48 |> join(:left, [n, a], object in Object,
49 on:
50 fragment(
51 "(?->>'id') = COALESCE((? -> 'object'::text) ->> 'id'::text)",
52 object.data,
53 a.data
54 )
55 )
56 |> preload([n, a, o], activity: {a, object: o})
57
58 if opts[:with_muted] do
59 query
60 else
61 where(query, [n, a], a.actor not in ^user.info.muted_notifications)
62 |> where([n, a], a.actor not in ^user.info.blocks)
63 |> where(
64 [n, a],
65 fragment("substring(? from '.*://([^/]*)')", a.actor) not in ^user.info.domain_blocks
66 )
67 |> join(:left, [n, a], tm in Pleroma.ThreadMute,
68 on: tm.user_id == ^user.id and tm.context == fragment("?->>'context'", a.data)
69 )
70 |> where([n, a, o, tm], is_nil(tm.user_id))
71 end
72 end
73
74 def for_user(user, opts \\ %{}) do
75 user
76 |> for_user_query(opts)
77 |> Pagination.fetch_paginated(opts)
78 end
79
80 @doc """
81 Returns notifications for user received since given date.
82
83 ## Examples
84
85 iex> Pleroma.Notification.for_user_since(%Pleroma.User{}, ~N[2019-04-13 11:22:33])
86 [%Pleroma.Notification{}, %Pleroma.Notification{}]
87
88 iex> Pleroma.Notification.for_user_since(%Pleroma.User{}, ~N[2019-04-15 11:22:33])
89 []
90 """
91 @spec for_user_since(Pleroma.User.t(), NaiveDateTime.t()) :: [t()]
92 def for_user_since(user, date) do
93 from(n in for_user_query(user),
94 where: n.updated_at > ^date
95 )
96 |> Repo.all()
97 end
98
99 def set_read_up_to(%{id: user_id} = _user, id) do
100 query =
101 from(
102 n in Notification,
103 where: n.user_id == ^user_id,
104 where: n.id <= ^id,
105 where: n.seen == false,
106 update: [
107 set: [
108 seen: true,
109 updated_at: ^NaiveDateTime.utc_now()
110 ]
111 ],
112 # Ideally we would preload object and activities here
113 # but Ecto does not support preloads in update_all
114 select: n.id
115 )
116
117 {_, notification_ids} = Repo.update_all(query, [])
118
119 Notification
120 |> where([n], n.id in ^notification_ids)
121 |> join(:inner, [n], activity in assoc(n, :activity))
122 |> join(:left, [n, a], object in Object,
123 on:
124 fragment(
125 "(?->>'id') = COALESCE((? -> 'object'::text) ->> 'id'::text)",
126 object.data,
127 a.data
128 )
129 )
130 |> preload([n, a, o], activity: {a, object: o})
131 |> Repo.all()
132 end
133
134 def read_one(%User{} = user, notification_id) do
135 with {:ok, %Notification{} = notification} <- get(user, notification_id) do
136 notification
137 |> changeset(%{seen: true})
138 |> Repo.update()
139 end
140 end
141
142 def get(%{id: user_id} = _user, id) do
143 query =
144 from(
145 n in Notification,
146 where: n.id == ^id,
147 join: activity in assoc(n, :activity),
148 preload: [activity: activity]
149 )
150
151 notification = Repo.one(query)
152
153 case notification do
154 %{user_id: ^user_id} ->
155 {:ok, notification}
156
157 _ ->
158 {:error, "Cannot get notification"}
159 end
160 end
161
162 def clear(user) do
163 from(n in Notification, where: n.user_id == ^user.id)
164 |> Repo.delete_all()
165 end
166
167 def destroy_multiple(%{id: user_id} = _user, ids) do
168 from(n in Notification,
169 where: n.id in ^ids,
170 where: n.user_id == ^user_id
171 )
172 |> Repo.delete_all()
173 end
174
175 def dismiss(%{id: user_id} = _user, id) do
176 notification = Repo.get(Notification, id)
177
178 case notification do
179 %{user_id: ^user_id} ->
180 Repo.delete(notification)
181
182 _ ->
183 {:error, "Cannot dismiss notification"}
184 end
185 end
186
187 def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity) do
188 object = Object.normalize(activity)
189
190 unless object && object.data["type"] == "Answer" do
191 users = get_notified_from_activity(activity)
192 notifications = Enum.map(users, fn user -> create_notification(activity, user) end)
193 {:ok, notifications}
194 else
195 {:ok, []}
196 end
197 end
198
199 def create_notifications(%Activity{data: %{"to" => _, "type" => type}} = activity)
200 when type in ["Like", "Announce", "Follow"] do
201 users = get_notified_from_activity(activity)
202 notifications = Enum.map(users, fn user -> create_notification(activity, user) end)
203 {:ok, notifications}
204 end
205
206 def create_notifications(_), do: {:ok, []}
207
208 # TODO move to sql, too.
209 def create_notification(%Activity{} = activity, %User{} = user) do
210 unless skip?(activity, user) do
211 notification = %Notification{user_id: user.id, activity: activity}
212 {:ok, notification} = Repo.insert(notification)
213 Streamer.stream("user", notification)
214 Streamer.stream("user:notification", notification)
215 Push.send(notification)
216 notification
217 end
218 end
219
220 def get_notified_from_activity(activity, local_only \\ true)
221
222 def get_notified_from_activity(
223 %Activity{data: %{"to" => _, "type" => type} = _data} = activity,
224 local_only
225 )
226 when type in ["Create", "Like", "Announce", "Follow"] do
227 recipients =
228 []
229 |> Utils.maybe_notify_to_recipients(activity)
230 |> Utils.maybe_notify_mentioned_recipients(activity)
231 |> Utils.maybe_notify_subscribers(activity)
232 |> Enum.uniq()
233
234 User.get_users_from_set(recipients, local_only)
235 end
236
237 def get_notified_from_activity(_, _local_only), do: []
238
239 @spec skip?(Activity.t(), User.t()) :: boolean()
240 def skip?(activity, user) do
241 [
242 :self,
243 :followers,
244 :follows,
245 :non_followers,
246 :non_follows,
247 :recently_followed
248 ]
249 |> Enum.any?(&skip?(&1, activity, user))
250 end
251
252 @spec skip?(atom(), Activity.t(), User.t()) :: boolean()
253 def skip?(:self, activity, user) do
254 activity.data["actor"] == user.ap_id
255 end
256
257 def skip?(
258 :followers,
259 activity,
260 %{info: %{notification_settings: %{"followers" => false}}} = user
261 ) do
262 actor = activity.data["actor"]
263 follower = User.get_cached_by_ap_id(actor)
264 User.following?(follower, user)
265 end
266
267 def skip?(
268 :non_followers,
269 activity,
270 %{info: %{notification_settings: %{"non_followers" => false}}} = user
271 ) do
272 actor = activity.data["actor"]
273 follower = User.get_cached_by_ap_id(actor)
274 !User.following?(follower, user)
275 end
276
277 def skip?(:follows, activity, %{info: %{notification_settings: %{"follows" => false}}} = user) do
278 actor = activity.data["actor"]
279 followed = User.get_cached_by_ap_id(actor)
280 User.following?(user, followed)
281 end
282
283 def skip?(
284 :non_follows,
285 activity,
286 %{info: %{notification_settings: %{"non_follows" => false}}} = user
287 ) do
288 actor = activity.data["actor"]
289 followed = User.get_cached_by_ap_id(actor)
290 !User.following?(user, followed)
291 end
292
293 def skip?(:recently_followed, %{data: %{"type" => "Follow"}} = activity, user) do
294 actor = activity.data["actor"]
295
296 Notification.for_user(user)
297 |> Enum.any?(fn
298 %{activity: %{data: %{"type" => "Follow", "actor" => ^actor}}} -> true
299 _ -> false
300 end)
301 end
302
303 def skip?(_, _, _), do: false
304 end