9f1a00f9c43f10c0da73cc83d5baea86c8effe02
[akkoma] / lib / pleroma / migrators / hashtags_table_migrator.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.Migrators.HashtagsTableMigrator do
  @moduledoc """
  Background data migration that copies hashtags embedded in `objects.data->'tag'`
  into the normalized `hashtags` / `hashtags_objects` tables.

  Runs as a globally-registered GenServer so at most one instance performs the
  migration per cluster. Progress (checkpoint `max_processed_id`, counters) and
  per-object failures are persisted via the `data_migrations` /
  `data_migration_failed_ids` tables, making the migration resumable.
  """

  use GenServer

  require Logger

  import Ecto.Query

  alias __MODULE__.State
  alias Pleroma.Config
  alias Pleroma.DataMigration
  alias Pleroma.Hashtag
  alias Pleroma.Object
  alias Pleroma.Repo

  # In-memory run stats (:status, :message, counters) live in the State process.
  defdelegate state(), to: State, as: :get
  defdelegate put_stat(key, value), to: State, as: :put
  defdelegate increment_stat(key, increment), to: State, as: :increment

  # Persistent migration record (state machine + checkpoint data) in the DB.
  defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table

  # Global registration ensures a single migrator across all connected nodes.
  @reg_name {:global, __MODULE__}

  def whereis, do: GenServer.whereis(@reg_name)

  def start_link(_) do
    case whereis() do
      nil ->
        GenServer.start_link(__MODULE__, nil, name: @reg_name)

      pid ->
        # Already running (possibly on another node); reuse that instance.
        {:ok, pid}
    end
  end

  @impl true
  def init(_) do
    # Defer all real work to handle_continue so supervisor startup isn't blocked.
    {:ok, nil, {:continue, :init_state}}
  end

  @impl true
  def handle_continue(:init_state, _state) do
    {:ok, _} = State.start_link(nil)

    put_stat(:status, :init)

    dm = data_migration()
    manual_migrations = Config.get([:instance, :manual_data_migrations], [])

    cond do
      # Never run the migration inside the test environment.
      Config.get(:env) == :test ->
        put_stat(:status, :noop)

      is_nil(dm) ->
        put_stat(:status, :halt)
        put_stat(:message, "Data migration does not exist.")

      # Admin opted to run this migration manually (via state or config list).
      dm.state == :manual or dm.name in manual_migrations ->
        put_stat(:status, :noop)
        put_stat(:message, "Data migration is in manual execution state.")

      dm.state == :complete ->
        handle_success()

      true ->
        send(self(), :migrate_hashtags)
    end

    {:noreply, nil}
  end

  @impl true
  def handle_info(:migrate_hashtags, state) do
    data_migration = data_migration()

    # Keep only the resume checkpoint across restarts; drop transient counters.
    # `|| %{}` guards against a nil data field crashing the server.
    persistent_data = Map.take(data_migration.data || %{}, ["max_processed_id"])

    {:ok, data_migration} =
      DataMigration.update(data_migration, %{state: :running, data: persistent_data})

    put_stat(:status, :running)

    Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")

    max_processed_id = data_migration.data["max_processed_id"] || 0

    # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out)
    from(
      object in Object,
      left_join: hashtag in assoc(object, :hashtags),
      where: object.id > ^max_processed_id,
      # Skip objects whose hashtags were already transferred.
      where: is_nil(hashtag.id),
      where:
        fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data),
      select: %{
        id: object.id,
        tag: fragment("(?)->'tag'", object.data)
      }
    )
    |> Repo.chunk_stream(100, :batches, timeout: :infinity)
    |> Stream.each(fn objects ->
      object_ids = Enum.map(objects, & &1.id)

      failed_ids =
        objects
        |> Enum.map(&transfer_object_hashtags(&1))
        |> Enum.filter(&(elem(&1, 0) == :error))
        |> Enum.map(&elem(&1, 1))

      # Record failed object ids so they can be inspected / retried later.
      for failed_id <- failed_ids do
        _ =
          Repo.query(
            "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <>
              "VALUES ($1, $2) ON CONFLICT DO NOTHING;",
            [data_migration.id, failed_id]
          )
      end

      # Clear previously-recorded failures that succeeded on this pass.
      # FIX: match on (data_migration_id, record_id) — the columns written by the
      # INSERT above — instead of the surrogate `id` column, which holds row ids,
      # not object ids, so the old query never matched the intended rows and could
      # delete unrelated rows (including other migrations' failure records).
      _ =
        Repo.query(
          "DELETE FROM data_migration_failed_ids " <>
            "WHERE data_migration_id = $1 AND record_id = ANY($2)",
          [data_migration.id, object_ids -- failed_ids]
        )

      # Batch is ordered by id, so the last element is the new checkpoint.
      max_object_id = Enum.at(object_ids, -1)

      put_stat(:max_processed_id, max_object_id)
      increment_stat(:processed_count, length(object_ids))
      increment_stat(:failed_count, length(failed_ids))

      persist_stats(data_migration)

      # A quick and dirty approach to controlling the load this background migration imposes
      sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0)
      Process.sleep(sleep_interval)
    end)
    |> Stream.run()

    # The migration is complete only when no failure records remain for it.
    with {:ok, %{rows: [[0]]}} <-
           Repo.query(
             "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;",
             [data_migration.id]
           ) do
      _ = DataMigration.update_state(data_migration, :complete)

      handle_success()
    else
      _ ->
        _ = DataMigration.update_state(data_migration, :failed)

        put_stat(:status, :failed)
        put_stat(:message, "Please check data_migration_failed_ids records.")
    end

    {:noreply, state}
  end

  # Creates hashtag records for a single object's embedded tags and links them
  # via hashtags_objects, inside one transaction. Returns {:ok, object_id} on
  # success or {:error, object_id} (via Repo.rollback) on any failure.
  defp transfer_object_hashtags(object) do
    hashtags = Object.object_data_hashtags(%{"tag" => object.tag})

    Repo.transaction(fn ->
      with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
        for hashtag_record <- hashtag_records do
          with {:ok, _} <-
                 Repo.query(
                   "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);",
                   [hashtag_record.id, object.id]
                 ) do
            nil
          else
            {:error, e} ->
              error =
                "ERROR: could not link object #{object.id} and hashtag " <>
                  "#{hashtag_record.id}: #{inspect(e)}"

              Logger.error(error)
              Repo.rollback(object.id)
          end
        end

        object.id
      else
        e ->
          error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}"
          Logger.error(error)
          Repo.rollback(object.id)
      end
    end)
  end

  # Persists in-memory counters (minus the transient :status key) into the
  # migration record so progress survives restarts.
  defp persist_stats(data_migration) do
    runner_state = Map.drop(state(), [:status])
    _ = DataMigration.update(data_migration, %{data: runner_state})
  end

  # Marks the run complete and, unless already enabled, flips the config flag
  # that switches hashtag timelines over to the new `hashtags` table.
  defp handle_success do
    put_stat(:status, :complete)

    unless Config.improved_hashtag_timeline() do
      Config.put(Config.improved_hashtag_timeline_path(), true)
    end

    :ok
  end

  @doc """
  Re-triggers the migration loop on the running migrator.

  NOTE(review): raises `ArgumentError` if the migrator process is not running
  (`whereis/0` returns nil) — acceptable for a manually-invoked admin helper.
  """
  def force_continue do
    send(whereis(), :migrate_hashtags)
  end

  @doc "Resets the checkpoint/state and restarts the migration from the beginning."
  def force_restart do
    {:ok, _} = DataMigration.update(data_migration(), %{state: :pending, data: %{}})
    force_continue()
  end
end