Pipeline: Add a side effects step after the transaction finishes
[akkoma] / lib / pleroma / notification.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Notification do
6 use Ecto.Schema
7
8 alias Ecto.Multi
9 alias Pleroma.Activity
10 alias Pleroma.FollowingRelationship
11 alias Pleroma.Marker
12 alias Pleroma.Notification
13 alias Pleroma.Object
14 alias Pleroma.Pagination
15 alias Pleroma.Repo
16 alias Pleroma.ThreadMute
17 alias Pleroma.User
18 alias Pleroma.Web.CommonAPI.Utils
19 alias Pleroma.Web.Push
20 alias Pleroma.Web.Streamer
21
22 import Ecto.Query
23 import Ecto.Changeset
24
25 require Logger
26
27 @type t :: %__MODULE__{}
28
29 @include_muted_option :with_muted
30
# A notification row ties a receiving `user` to the `activity` that caused it,
# together with a string `type` and a `seen` flag that defaults to `false`.
schema "notifications" do
  field :seen, :boolean, default: false
  field :type, :string
  belongs_to :user, User, type: FlakeId.Ecto.CompatType
  belongs_to :activity, Activity, type: FlakeId.Ecto.CompatType

  timestamps()
end
39
@doc """
Backfills `type` on every notification whose `type` is still `nil`.

Each such notification is loaded with its activity preloaded, the type is
derived from that activity with the `no_cachex: true` option, and the row is
updated. Individual update results are discarded; returns `:ok`.
"""
def fill_in_notification_types do
  untyped_notifications =
    __MODULE__
    |> where([n], is_nil(n.type))
    |> preload(:activity)
    |> Repo.all()

  Enum.each(untyped_notifications, fn notification ->
    attrs = %{type: type_from_activity(notification.activity, no_cachex: true)}

    notification
    |> changeset(attrs)
    |> Repo.update()
  end)
end
59
@doc """
Recomputes and stores the `type` of the notification that links `user` and
`activity`.

Returns the result of `Repo.update/1`, or `nil` when no such notification
exists.
"""
def update_notification_type(user, activity) do
  case Repo.get_by(__MODULE__, user_id: user.id, activity_id: activity.id) do
    %__MODULE__{} = notification ->
      attrs = %{type: type_from_activity(activity)}

      notification
      |> changeset(attrs)
      |> Repo.update()

    not_found ->
      not_found
  end
end
72
@doc "Counts the notifications of the given user that are not marked as seen."
@spec unread_notifications_count(User.t()) :: integer()
def unread_notifications_count(%User{id: user_id}) do
  __MODULE__
  |> where([n], n.user_id == ^user_id and n.seen == false)
  |> Repo.aggregate(:count, :id)
end
80
@doc "Builds a changeset for `notification`, casting only `:seen` and `:type` from `attrs`."
def changeset(%Notification{} = notification, attrs) do
  cast(notification, attrs, [:seen, :type])
end
85
@doc """
Builds a query that selects, cast to a string, the highest id among the
notifications of `user` that are marked as seen (at most one row).
"""
@spec last_read_query(User.t()) :: Ecto.Queryable.t()
def last_read_query(user) do
  Pleroma.Notification
  |> where([n], n.user_id == ^user.id)
  |> where([n], n.seen == true)
  |> select([n], type(n.id, :string))
  |> limit(1)
  |> order_by([n], desc: n.id)
end
96
# Preloads the AP ids of the users `user` has blocked and, unless the
# `:with_muted` option is set, notification-muted. Returns a
# `{blocked_opts, muted_opts}` tuple of option maps for `exclude_blocked/3` and
# `exclude_notification_muted/3`. Keys already present in `opts` win over the
# preloaded lists because `opts` is merged on top.
defp for_user_query_ap_id_opts(user, opts) do
  relationship_types =
    if opts[@include_muted_option] do
      [:block]
    else
      [:block, :notification_mute]
    end

  ap_ids_by_type = User.outgoing_relationships_ap_ids(user, relationship_types)

  blocked_opts = Map.merge(%{blocked_users_ap_ids: ap_ids_by_type[:block]}, opts)

  muted_opts =
    Map.merge(%{notification_muted_users_ap_ids: ap_ids_by_type[:notification_mute]}, opts)

  {blocked_opts, muted_opts}
end
111
@doc """
Builds the base query for the notifications of `user`.

The query joins each notification with its activity and (left join) the object
the activity refers to, preloads both, drops notifications whose activity
actor is a deactivated user, and then applies the notification-mute, block and
visibility exclusions driven by `opts`.
"""
def for_user_query(user, opts \\ %{}) do
  {blocked_opts, muted_opts} = for_user_query_ap_id_opts(user, opts)

  Notification
  |> where(user_id: ^user.id)
  # Binding order matters for the helpers below: notification, activity, object.
  |> join(:inner, [n], activity in assoc(n, :activity))
  |> where(
    [n, a],
    fragment(
      "? not in (SELECT ap_id FROM users WHERE deactivated = 'true')",
      a.actor
    )
  )
  |> join(:left, [n, a], object in Object,
    on:
      fragment(
        "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
        object.data,
        a.data,
        a.data
      )
  )
  |> preload([n, a, o], activity: {a, object: o})
  |> exclude_notification_muted(user, muted_opts)
  |> exclude_blocked(user, blocked_opts)
  |> exclude_visibility(opts)
end
140
# Excludes blocked users and non-followed domain-blocked users.
# The blocked AP ids come from `opts[:blocked_users_ap_ids]` when present,
# otherwise they are looked up for `user`.
defp exclude_blocked(query, user, opts) do
  blocked = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  without_blocked_actors = where(query, [n, a], a.actor not in ^blocked)

  FollowingRelationship.keep_following_or_not_domain_blocked(without_blocked_actors, user)
end
149
# When the `:with_muted` option is `true` the query is returned untouched.
defp exclude_notification_muted(query, _user, %{@include_muted_option => true}), do: query

# Otherwise drops notifications whose activity actor is notification-muted by
# `user`, and notifications whose activity context has a thread mute by `user`.
# Relies on the positional bindings notification, activity, object set up by
# the caller; the thread mute join becomes the fourth binding.
defp exclude_notification_muted(query, user, opts) do
  muted =
    opts[:notification_muted_users_ap_ids] || User.notification_muted_users_ap_ids(user)

  query
  |> where([notification, activity], activity.actor not in ^muted)
  |> join(:left, [notification, activity], thread_mute in ThreadMute,
    on:
      thread_mute.user_id == ^user.id and
        thread_mute.context == fragment("?->>'context'", activity.data)
  )
  |> where([notification, activity, object, thread_mute], is_nil(thread_mute.user_id))
end
165
@valid_visibilities ~w[direct unlisted public private]

# Applies the `:exclude_visibilities` option to the notifications query.
#
# * a list of valid visibilities: notifications whose relevant activity has one
#   of those visibilities are filtered out. For `Like`/`Announce` activities
#   the visibility of the joined `Create` activity sharing the same context is
#   used, otherwise the visibility of the notification's own activity.
# * a single valid visibility string: handled as a one-element list.
# * anything else under `:exclude_visibilities`: an error is logged and the
#   query is returned unchanged.
# * no `:exclude_visibilities` key: the query is returned unchanged.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    query
    |> join(:left, [n, a], mutated_activity in Pleroma.Activity,
      on:
        fragment("?->>'context'", a.data) ==
          fragment("?->>'context'", mutated_activity.data) and
          fragment("(?->>'type' = 'Like' or ?->>'type' = 'Announce')", a.data, a.data) and
          fragment("?->>'type'", mutated_activity.data) == "Create",
      as: :mutated_activity
    )
    |> where(
      [n, a, mutated_activity: mutated_activity],
      not fragment(
        """
        CASE WHEN (?->>'type') = 'Like' or (?->>'type') = 'Announce'
          THEN (activity_visibility(?, ?, ?) = ANY (?))
          ELSE (activity_visibility(?, ?, ?) = ANY (?)) END
        """,
        a.data,
        a.data,
        mutated_activity.actor,
        mutated_activity.recipients,
        mutated_activity.data,
        ^visibility,
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    # inspect/1 keeps list elements distinguishable and never raises, unlike
    # plain interpolation of an arbitrary term.
    Logger.error("Could not exclude visibility to #{inspect(visibility)}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  # Re-wrap in the options map: passing a bare list would only match the
  # catch-all clause below and silently skip the filter.
  exclude_visibility(query, %{exclude_visibilities: [visibility]})
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not exclude visibility to #{inspect(visibility)}")
  query
end

defp exclude_visibility(query, _visibility), do: query
218
@doc """
Fetches a page of notifications for `user`.

`opts` is used both to build the query (see `for_user_query/2`) and as the
pagination options handed to `Pagination.fetch_paginated/2`.
"""
def for_user(user, opts \\ %{}) do
  notifications_query = for_user_query(user, opts)

  Pagination.fetch_paginated(notifications_query, opts)
end
224
@doc """
Lists the notifications of `user` whose `updated_at` is later than `date`.

The default filters of `for_user_query/1` apply.

## Examples

    iex> Pleroma.Notification.for_user_since(%Pleroma.User{}, ~N[2019-04-13 11:22:33])
    [%Pleroma.Notification{}, %Pleroma.Notification{}]

    iex> Pleroma.Notification.for_user_since(%Pleroma.User{}, ~N[2019-04-15 11:22:33])
    []
"""
@spec for_user_since(Pleroma.User.t(), NaiveDateTime.t()) :: [t()]
def for_user_since(user, date) do
  user
  |> for_user_query()
  |> where([n], n.updated_at > ^date)
  |> Repo.all()
end
243
@doc """
Marks every unseen notification of `user` with an id up to and including `id`
as seen, and updates the user's "notifications" marker in the same
transaction.

Returns the affected notifications, reloaded through `for_user_query/1`.
Raises a `MatchError` if the transaction does not succeed.
"""
def set_read_up_to(%{id: user_id} = user, id) do
  # Ideally we would preload object and activities here
  # but Ecto does not support preloads in update_all
  unseen_ids_query =
    Notification
    |> where([n], n.user_id == ^user_id)
    |> where([n], n.id <= ^id)
    |> where([n], n.seen == false)
    |> select([n], n.id)

  mark_seen = [set: [seen: true, updated_at: NaiveDateTime.utc_now()]]

  {:ok, %{ids: {_count, notification_ids}}} =
    Multi.new()
    |> Multi.update_all(:ids, unseen_ids_query, mark_seen)
    |> Marker.multi_set_last_read_id(user, "notifications")
    |> Repo.transaction()

  user
  |> for_user_query()
  |> where([n], n.id in ^notification_ids)
  |> Repo.all()
end
266
@doc """
Marks a single notification of `user` as seen and updates the user's
"notifications" marker in the same transaction.

Returns `{:ok, notification}` on success. Returns the `{:error, message}`
tuple from `get/2` when the notification cannot be fetched for this user,
`{:error, changeset}` when updating the notification fails, and
`{:error, reason}` when another step of the transaction fails.
"""
@spec read_one(User.t(), String.t()) ::
        {:ok, Notification.t()} | {:error, Ecto.Changeset.t() | String.t() | term()}
def read_one(%User{} = user, notification_id) do
  with {:ok, %Notification{} = notification} <- get(user, notification_id) do
    Multi.new()
    |> Multi.update(:update, changeset(notification, %{seen: true}))
    |> Marker.multi_set_last_read_id(user, "notifications")
    |> Repo.transaction()
    |> case do
      {:ok, %{update: notification}} ->
        {:ok, notification}

      {:error, :update, changeset, _changes_so_far} ->
        {:error, changeset}

      # Any other failing step (e.g. the marker update) is surfaced as an
      # error tuple instead of crashing with a CaseClauseError.
      {:error, _failed_step, reason, _changes_so_far} ->
        {:error, reason}
    end
  end
end
281
@doc """
Fetches the notification with the given `id`, with its activity preloaded.

Returns `{:ok, notification}` only when the notification belongs to the given
user; otherwise (not found or owned by someone else) returns
`{:error, "Cannot get notification"}`.
"""
def get(%{id: user_id} = _user, id) do
  Notification
  |> where([n], n.id == ^id)
  |> join(:inner, [n], activity in assoc(n, :activity))
  |> preload([n, activity], activity: activity)
  |> Repo.one()
  |> case do
    %{user_id: ^user_id} = notification -> {:ok, notification}
    _ -> {:error, "Cannot get notification"}
  end
end
301
@doc "Deletes every notification of `user`; returns the result of `Repo.delete_all/1`."
def clear(user) do
  Notification
  |> where([n], n.user_id == ^user.id)
  |> Repo.delete_all()
end
306
@doc """
Deletes the notifications with the given `ids` that belong to the given user.
Returns the result of `Repo.delete_all/1`.
"""
def destroy_multiple(%{id: user_id} = _user, ids) do
  Notification
  |> where([n], n.id in ^ids)
  |> where([n], n.user_id == ^user_id)
  |> Repo.delete_all()
end
314
@doc """
Deletes every notification that refers to `activity`.

Returns `{:ok, result}` where `result` is the second element of the tuple
returned by `Repo.delete_all/1`.
"""
def dismiss(%Pleroma.Activity{} = activity) do
  deletion =
    from(n in Notification, where: n.activity_id == ^activity.id)
    |> Repo.delete_all()

  case deletion do
    {_count, notifications} -> {:ok, notifications}
    _ -> {:error, "Cannot dismiss notification"}
  end
end
324
# dismiss/2: deletes the notification with the given `id`, but only when it
# belongs to the given user. Returns the result of `Repo.delete/1`, or
# `{:error, "Cannot dismiss notification"}` when the notification is missing or
# owned by another user.
def dismiss(%{id: user_id} = _user, id) do
  case Repo.get(Notification, id) do
    %{user_id: ^user_id} = notification -> Repo.delete(notification)
    _ -> {:error, "Cannot dismiss notification"}
  end
end
336
@doc """
Creates notifications for the receivers of `activity`.

Only `Create` activities with a `"to"` field and `Follow`, `Like`, `Announce`,
`Move` and `EmojiReact` activities produce notifications; a `Create` whose
object is of type `"Answer"` and any other input yield `{:ok, []}`.

`options` is passed on to the internal creation step, which reads `:do_send`
(default `true`) from it.
"""
def create_notifications(activity, options \\ [])

def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity, options) do
  case Object.normalize(activity, false) do
    %{data: %{"type" => "Answer"}} -> {:ok, []}
    _ -> do_create_notifications(activity, options)
  end
end

def create_notifications(%Activity{data: %{"type" => type}} = activity, options)
    when type in ["Follow", "Like", "Announce", "Move", "EmojiReact"] do
  do_create_notifications(activity, options)
end

def create_notifications(_activity, _options), do: {:ok, []}
355
# Creates one notification per potential receiver of `activity` and returns
# `{:ok, notifications}`.
#
# Notifications are streamed/pushed only when the `:do_send` option is truthy
# (default `true`) and the receiver currently has notifications enabled for
# this activity. Receivers for which `create_notification/3` decides to skip
# yield `nil`; those entries are dropped so callers only ever see
# notification structs.
defp do_create_notifications(%Activity{} = activity, options) do
  do_send = Keyword.get(options, :do_send, true)

  {enabled_receivers, disabled_receivers} = get_notified_from_activity(activity)

  notifications =
    (enabled_receivers ++ disabled_receivers)
    |> Enum.map(fn user ->
      create_notification(activity, user, do_send && user in enabled_receivers)
    end)
    |> Enum.reject(&is_nil/1)

  {:ok, notifications}
end
370
# Maps an activity to the notification type string stored in `type`.
# Raises a RuntimeError for activity types without a notification type.
defp type_from_activity(activity, opts \\ [])

defp type_from_activity(%{data: %{"type" => "Follow"}} = activity, opts) do
  accepted =
    if Keyword.get(opts, :no_cachex, false) do
      # A special code path to make this usable in a migration.
      with %User{} = follower <- User.get_by_ap_id(activity.data["actor"]),
           %User{} = followed <- User.get_by_ap_id(activity.data["object"]) do
        Pleroma.FollowingRelationship.following?(follower, followed)
      end
    else
      Activity.follow_accepted?(activity)
    end

  if accepted, do: "follow", else: "follow_request"
end

defp type_from_activity(%{data: %{"type" => "Announce"}}, _opts), do: "reblog"

defp type_from_activity(%{data: %{"type" => "Like"}}, _opts), do: "favourite"

defp type_from_activity(%{data: %{"type" => "Move"}}, _opts), do: "move"

defp type_from_activity(%{data: %{"type" => "EmojiReact"}}, _opts),
  do: "pleroma:emoji_reaction"

# Compatibility with old reactions
defp type_from_activity(%{data: %{"type" => "EmojiReaction"}}, _opts),
  do: "pleroma:emoji_reaction"

defp type_from_activity(%{data: %{"type" => "Create"}} = activity, _opts),
  do: type_from_activity_object(activity)

defp type_from_activity(%{data: %{"type" => t}}, _opts),
  do: raise("No notification type for activity type #{t}")
417
# A `Create` activity that embeds its object as a map is always a "mention".
defp type_from_activity_object(%{data: %{"type" => "Create", "object" => %{}}}), do: "mention"

# Otherwise the object is looked up by the AP id stored under "object":
# a "ChatMessage" object yields "pleroma:chat_mention", anything else
# (including a missing object) yields "mention".
defp type_from_activity_object(%{data: %{"type" => "Create"} = data}) do
  case Object.get_by_ap_id(data["object"]) do
    %{data: %{"type" => "ChatMessage"}} -> "pleroma:chat_mention"
    _ -> "mention"
  end
end
428
# TODO move to sql, too.
@doc """
Creates a notification about `activity` for `user`, unless `skip?/2` says the
user should not be notified, in which case `nil` is returned.

The notification is inserted and the user's "notifications" marker updated in
one transaction (a failed transaction raises a `MatchError`). When `do_send`
is truthy the notification is also streamed and pushed. Returns the inserted
notification.
"""
def create_notification(%Activity{} = activity, %User{} = user, do_send \\ true) do
  if skip?(activity, user) do
    nil
  else
    new_notification = %Notification{
      user_id: user.id,
      activity: activity,
      type: type_from_activity(activity)
    }

    {:ok, %{notification: notification}} =
      Multi.new()
      |> Multi.insert(:notification, new_notification)
      |> Marker.multi_set_last_read_id(user, "notifications")
      |> Repo.transaction()

    if do_send do
      Streamer.stream(["user", "user:notification"], notification)
      Push.send(notification)
    end

    notification
  end
end
450
@doc """
Splits the potential receivers of `activity` into a 2-element tuple:

    {receivers with notifications enabled,
     receivers for whom they are currently disabled (blocking / [thread] muting)}

Activities of any type other than `Create`, `Like`, `Announce`, `Follow`,
`Move` or `EmojiReact` yield `{[], []}`.

NOTE: might be called for FAKE Activities, see ActivityPub.Utils.get_notified_from_object/1
"""
@spec get_notified_from_activity(Activity.t(), boolean()) :: {list(User.t()), list(User.t())}
def get_notified_from_activity(activity, local_only \\ true)

def get_notified_from_activity(%Activity{data: %{"type" => type}} = activity, local_only)
    when type in ["Create", "Like", "Announce", "Follow", "Move", "EmojiReact"] do
  candidate_ap_ids = get_potential_receiver_ap_ids(activity)
  candidates = User.get_users_from_set(candidate_ap_ids, local_only: local_only)

  enabled_ap_ids =
    candidate_ap_ids
    |> exclude_domain_blocker_ap_ids(activity, candidates)
    |> exclude_relationship_restricted_ap_ids(activity)
    |> exclude_thread_muter_ap_ids(activity)

  # {enabled, disabled}, each keeping the order of `candidates`.
  Enum.split_with(candidates, fn candidate -> candidate.ap_id in enabled_ap_ids end)
end

def get_notified_from_activity(_activity, _local_only), do: {[], []}
480
# For some activities, only notify the author of the object
def get_potential_receiver_ap_ids(%{data: %{"type" => type, "object" => object_id}})
    when type in ~w{Like Announce EmojiReact} do
  with %Object{data: %{"actor" => actor}} <- Object.get_cached_by_ap_id(object_id) do
    [actor]
  else
    _ -> []
  end
end

# For everything else, collect the AP ids contributed by each of the
# `Utils.maybe_notify_*` helpers, in order, and de-duplicate them.
def get_potential_receiver_ap_ids(activity) do
  with_recipients = Utils.maybe_notify_to_recipients([], activity)
  with_mentioned = Utils.maybe_notify_mentioned_recipients(with_recipients, activity)
  with_subscribers = Utils.maybe_notify_subscribers(with_mentioned, activity)
  with_followers = Utils.maybe_notify_followers(with_subscribers, activity)

  Enum.uniq(with_followers)
end
501
@doc """
Filters out AP IDs domain-blocking and not following the activity's actor.

Users are taken from `preloaded_users` when present there, otherwise they are
looked up by AP id; ids that resolve to no user are left in the result
untouched.
"""
def exclude_domain_blocker_ap_ids(ap_ids, activity, preloaded_users \\ [])

def exclude_domain_blocker_ap_ids([], _activity, _preloaded_users), do: []

def exclude_domain_blocker_ap_ids(ap_ids, %Activity{} = activity, preloaded_users) do
  actor_domain = activity.actor && URI.parse(activity.actor).host

  # A falsy `user` (no user resolved for the id) acts as a comprehension filter.
  blocker_ap_ids =
    for ap_id <- ap_ids,
        user =
          Enum.find(preloaded_users, &(&1.ap_id == ap_id)) || User.get_cached_by_ap_id(ap_id),
        actor_domain in user.domain_blocks,
        do: user.ap_id

  # Domain blockers that nevertheless follow the actor are added back.
  following_blocker_ap_ids =
    if Enum.any?(blocker_ap_ids) do
      FollowingRelationship.followers_ap_ids(Activity.user_actor(activity), blocker_ap_ids)
    else
      []
    end

  (ap_ids -- blocker_ap_ids) ++ following_blocker_ap_ids
end
533
@doc """
Filters out AP IDs of users basing on their relationships with activity actor user.

Removes the AP ids returned for the actor's incoming `:block` and
`:notification_mute` relationships; the remaining ids are de-duplicated.
"""
def exclude_relationship_restricted_ap_ids([], _activity), do: []

def exclude_relationship_restricted_ap_ids(ap_ids, %Activity{} = activity) do
  actor = Activity.user_actor(activity)

  restricted_ap_ids =
    User.incoming_relationships_ungrouped_ap_ids(actor, [:block, :notification_mute])

  Enum.uniq(ap_ids) -- restricted_ap_ids
end
548
@doc """
Filters out AP IDs of users who mute activity thread.

The thread is identified by the activity's `"context"`; the remaining ids are
de-duplicated.
"""
def exclude_thread_muter_ap_ids([], _activity), do: []

def exclude_thread_muter_ap_ids(ap_ids, %Activity{data: data}) do
  muter_ap_ids = ThreadMute.muter_ap_ids(data["context"])

  Enum.uniq(ap_ids) -- muter_ap_ids
end
557
@doc """
Tells whether `user` should NOT be notified about `activity`.

Checks each skip reason in turn (`:self`, `:followers`, `:follows`,
`:non_followers`, `:non_follows`, `:recently_followed`) via `skip?/3` and
returns `true` as soon as one applies, `false` otherwise. Arguments that are
not an activity and a user always yield `false`.
"""
@spec skip?(Activity.t(), User.t()) :: boolean()
def skip?(%Activity{} = activity, %User{} = user) do
  reasons = [
    :self,
    :followers,
    :follows,
    :non_followers,
    :non_follows,
    :recently_followed
  ]

  # Enum.any?/2 always yields a boolean, as promised by the @spec
  # (Enum.find/2 would return the matching reason atom or nil instead).
  Enum.any?(reasons, &skip?(&1, activity, user))
end

def skip?(_, _), do: false
572
# skip?/3 checks one skip reason for `activity` and `user`. A reason that does
# not apply, including the case where the matching notification setting is not
# `false`, falls through to the final clause and yields `false`.
@spec skip?(atom(), Activity.t(), User.t()) :: boolean()
# Never notify users about their own activities.
def skip?(:self, %Activity{data: data}, %User{ap_id: ap_id}) do
  data["actor"] == ap_id
end

# Notifications from followers are turned off: skip when the actor follows the user.
def skip?(
      :followers,
      %Activity{data: data},
      %User{notification_settings: %{followers: false}} = user
    ) do
  data["actor"]
  |> User.get_cached_by_ap_id()
  |> User.following?(user)
end

# Notifications from non-followers are turned off: skip when the actor does
# not follow the user.
def skip?(
      :non_followers,
      %Activity{data: data},
      %User{notification_settings: %{non_followers: false}} = user
    ) do
  actor_user = User.get_cached_by_ap_id(data["actor"])

  not User.following?(actor_user, user)
end

# Notifications from followed users are turned off: skip when the user
# follows the actor.
def skip?(
      :follows,
      %Activity{data: data},
      %User{notification_settings: %{follows: false}} = user
    ) do
  actor_user = User.get_cached_by_ap_id(data["actor"])

  User.following?(user, actor_user)
end

# Notifications from non-followed users are turned off: skip when the user
# does not follow the actor.
def skip?(
      :non_follows,
      %Activity{data: data},
      %User{notification_settings: %{non_follows: false}} = user
    ) do
  actor_user = User.get_cached_by_ap_id(data["actor"])

  not User.following?(user, actor_user)
end

# To do: consider defining recency in hours and checking FollowingRelationship with a single SQL
# Skip a Follow when the page returned by `for_user/1` already holds a Follow
# notification from the same actor.
def skip?(:recently_followed, %Activity{data: %{"type" => "Follow"}} = activity, %User{} = user) do
  actor = activity.data["actor"]

  user
  |> Notification.for_user()
  |> Enum.any?(&match?(%{activity: %{data: %{"type" => "Follow", "actor" => ^actor}}}, &1))
end

def skip?(_reason, _activity, _user), do: false
630
@doc """
Fetches the notification linking `user` and `activity`, or `nil` when there is
none.
"""
def for_user_and_activity(user, activity) do
  __MODULE__
  |> where([n], n.user_id == ^user.id)
  |> where([n], n.activity_id == ^activity.id)
  |> Repo.one()
end
638 end