Merge branch 'feat/connection-worker-monitor-flush' into 'develop'
[akkoma] / lib / mix / tasks / pleroma / database.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Mix.Tasks.Pleroma.Database do
6 alias Pleroma.Conversation
7 alias Pleroma.Maintenance
8 alias Pleroma.Object
9 alias Pleroma.Repo
10 alias Pleroma.User
11 require Logger
12 require Pleroma.Constants
13 import Ecto.Query
14 import Mix.Pleroma
15 use Mix.Task
16
17 @shortdoc "A collection of database related tasks"
18 @moduledoc File.read!("docs/administration/CLI_tasks/database.md")
19
# `remove_embedded_objects [--vacuum]`
#
# Rewrites every activity whose `data->'object'` carries a non-null `id` so that
# `data.object` is replaced by that id (done in a single SQL statement through the
# database function `safe_jsonb_set`). When `--vacuum` is given, a "full" vacuum is
# requested from `Maintenance` afterwards.
#
# Any positional argument or unknown switch makes the option match below fail.
def run(["remove_embedded_objects" | argv]) do
  {opts, [], []} = OptionParser.parse(argv, strict: [vacuum: :boolean])

  start_pleroma()
  Logger.info("Removing embedded objects")

  sql =
    "update activities set data = safe_jsonb_set(data, '{object}'::text[], data->'object'->'id') where data->'object'->>'id' is not null;"

  # The statement touches the whole activities table, so no timeout is enforced.
  Repo.query!(sql, [], timeout: :infinity)

  if opts[:vacuum], do: Maintenance.vacuum("full")
end
42
# `bump_all_conversations`
#
# Boots the application and delegates to `Conversation.bump_for_all_activities/0`.
def run(["bump_all_conversations"]) do
  start_pleroma()

  Conversation.bump_for_all_activities()
end
47
# `update_users_following_followers_counts`
#
# Calls `User.update_follower_count/1` for every row of the users table.
#
# Users are streamed from the database instead of being loaded all at once with
# `Repo.all/1`, so the task does not have to hold the entire users table in memory.
# `Repo.stream/2` is only usable inside a transaction; the transaction timeout is
# lifted because it stays open for the whole run.
def run(["update_users_following_followers_counts"]) do
  start_pleroma()

  Repo.transaction(
    fn ->
      User
      |> Repo.stream()
      |> Stream.each(&User.update_follower_count/1)
      |> Stream.run()
    end,
    timeout: :infinity
  )
end
55
# `prune_objects [--vacuum]`
#
# Deletes objects that satisfy all of the following:
#   * the public address (`Pleroma.Constants.as_public/0`) appears in `to` or `cc`,
#   * `inserted_at` is older than `[:instance, :remote_post_retention_days]` days,
#   * the host part of the `actor` URL differs from this instance's endpoint host.
#
# When `--vacuum` is given, a "full" vacuum is requested from `Maintenance` afterwards.
def run(["prune_objects" | argv]) do
  {opts, [], []} = OptionParser.parse(argv, strict: [vacuum: :boolean])

  start_pleroma()

  retention_days = Pleroma.Config.get([:instance, :remote_post_retention_days])

  Logger.info("Pruning objects older than #{retention_days} days")

  # 86_400 seconds per day; the cutoff lies `retention_days` in the past.
  cutoff = NaiveDateTime.add(NaiveDateTime.utc_now(), -(retention_days * 86_400))

  public = Pleroma.Constants.as_public()
  local_host = Pleroma.Web.Endpoint.host()

  prunable =
    from(o in Object,
      where: fragment("?->'to' \\? ? OR ?->'cc' \\? ?", o.data, ^public, o.data, ^public),
      where: o.inserted_at < ^cutoff,
      where: fragment("split_part(?->>'actor', '/', 3) != ?", o.data, ^local_host)
    )

  Repo.delete_all(prunable, timeout: :infinity)

  if opts[:vacuum], do: Maintenance.vacuum("full")
end
94
# `fix_likes_collections`
#
# Walks all objects that have a `likes` entry in batches of 100. Within each batch,
# objects whose `likes` value decodes to a JSON object (a map) get `likes` reset to
# an empty JSON array through the database function `safe_jsonb_set`.
def run(["fix_likes_collections"]) do
  start_pleroma()

  objects_with_likes =
    from(o in Object,
      where: fragment("(?)->>'likes' is not null", o.data),
      select: %{id: o.id, likes: fragment("(?)->>'likes'", o.data)}
    )

  objects_with_likes
  |> Pleroma.Repo.chunk_stream(100, :batches)
  |> Stream.each(fn batch ->
    # Only rows whose `likes` is a map need rewriting.
    map_like_ids =
      for %{id: id, likes: likes} <- batch, is_map(Jason.decode!(likes)), do: id

    from(o in Object,
      where: o.id in ^map_like_ids,
      update: [
        set: [data: fragment("safe_jsonb_set(?, '{likes}', '[]'::jsonb, true)", o.data)]
      ]
    )
    |> Repo.update_all([], timeout: :infinity)
  end)
  |> Stream.run()
end
124
# `vacuum <mode>`
#
# Boots the application and hands the single argument following "vacuum" to
# `Maintenance.vacuum/1` unchanged (other commands in this task pass "full").
def run(["vacuum", mode]) do
  start_pleroma()

  Maintenance.vacuum(mode)
end
130
# `ensure_expiration`
#
# For every local `Create` activity whose joined object is a `Note`, enqueues a
# `Pleroma.Workers.PurgeExpiredActivity` job with `expires_at` set to the activity's
# `inserted_at` plus `[:mrf_activity_expiration, :days]` days (365 when unset).
# Activities are processed in batches of 100.
def run(["ensure_expiration"]) do
  start_pleroma()
  ttl_days = Pleroma.Config.get([:mrf_activity_expiration, :days], 365)

  # The activity's object reference is either an embedded map (use its id) or a bare
  # id string; COALESCE covers both when joining to the objects table.
  local_note_creates =
    Pleroma.Activity
    |> join(:inner, [a], o in Object,
      on:
        fragment(
          "(?->>'id') = COALESCE((?)->'object'->> 'id', (?)->>'object')",
          o.data,
          a.data,
          a.data
        )
    )
    |> where(local: true)
    |> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
    |> where([_a, o], fragment("?->>'type' = 'Note'", o.data))

  schedule_purge = fn activity ->
    inserted_utc = DateTime.from_naive!(activity.inserted_at, "Etc/UTC")
    job_args = %{activity_id: activity.id, expires_at: Timex.shift(inserted_utc, days: ttl_days)}

    Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args)
  end

  local_note_creates
  |> Pleroma.Repo.chunk_stream(100, :batches)
  |> Stream.each(fn batch -> Enum.each(batch, schedule_purge) end)
  |> Stream.run()
end
164 end