[#3213] Prototype of data migrations functionality / HashtagsTableMigrator.
[akkoma] / lib / pleroma / migrators / hashtags_table_migrator.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Migrators.HashtagsTableMigrator do
6 defmodule State do
7 use Agent
8
9 @init_state %{}
10
11 def start_link(_) do
12 Agent.start_link(fn -> @init_state end, name: __MODULE__)
13 end
14
15 def get do
16 Agent.get(__MODULE__, & &1)
17 end
18
19 def put(key, value) do
20 Agent.update(__MODULE__, fn state ->
21 Map.put(state, key, value)
22 end)
23 end
24
25 def increment(key, increment \\ 1) do
26 Agent.update(__MODULE__, fn state ->
27 updated_value = (state[key] || 0) + increment
28 Map.put(state, key, updated_value)
29 end)
30 end
31 end
32
33 use GenServer
34
35 require Logger
36
37 import Ecto.Query
38
39 alias Pleroma.Config
40 alias Pleroma.DataMigration
41 alias Pleroma.Hashtag
42 alias Pleroma.Object
43 alias Pleroma.Repo
44
  # Thin wrappers over the State agent holding migration progress.
  defdelegate state(), to: State, as: :get
  defdelegate put_state(key, value), to: State, as: :put
  defdelegate increment_state(key, increment), to: State, as: :increment

  # Fetches the persisted record for this migration.
  # NOTE(review): may return nil when no record exists — handle_continue checks is_nil.
  defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table
50
51 def start_link(_) do
52 GenServer.start_link(__MODULE__, nil, name: __MODULE__)
53 end
54
55 @impl true
56 def init(_) do
57 {:ok, nil, {:continue, :init_state}}
58 end
59
60 @impl true
61 def handle_continue(:init_state, _state) do
62 {:ok, _} = State.start_link(nil)
63
64 put_state(:status, :init)
65
66 dm = data_migration()
67
68 cond do
69 Config.get(:env) == :test ->
70 put_state(:status, :noop)
71
72 is_nil(dm) ->
73 put_state(:status, :halt)
74 put_state(:message, "Data migration does not exist.")
75
76 dm.state == :manual ->
77 put_state(:status, :noop)
78 put_state(:message, "Data migration is in manual execution state.")
79
80 dm.state == :complete ->
81 handle_success()
82
83 true ->
84 send(self(), :migrate_hashtags)
85 end
86
87 {:noreply, nil}
88 end
89
90 @impl true
91 def handle_info(:migrate_hashtags, state) do
92 data_migration = data_migration()
93
94 {:ok, data_migration} = DataMigration.update_state(data_migration, :running)
95 put_state(:status, :running)
96
97 Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")
98
99 max_processed_id = data_migration.data["max_processed_id"] || 0
100
101 # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out)
102 from(
103 object in Object,
104 left_join: hashtag in assoc(object, :hashtags),
105 where: object.id > ^max_processed_id,
106 where: is_nil(hashtag.id),
107 where:
108 fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data),
109 select: %{
110 id: object.id,
111 tag: fragment("(?)->'tag'", object.data)
112 }
113 )
114 |> Repo.chunk_stream(100, :batches, timeout: :infinity)
115 |> Stream.each(fn objects ->
116 object_ids = Enum.map(objects, & &1.id)
117
118 failed_ids =
119 objects
120 |> Enum.map(&transfer_object_hashtags(&1))
121 |> Enum.filter(&(elem(&1, 0) == :error))
122 |> Enum.map(&elem(&1, 1))
123
124 for failed_id <- failed_ids do
125 _ =
126 Repo.query(
127 "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <>
128 "VALUES ($1, $2) ON CONFLICT DO NOTHING;",
129 [data_migration.id, failed_id]
130 )
131 end
132
133 _ =
134 Repo.query(
135 "DELETE FROM data_migration_failed_ids WHERE id = ANY($1)",
136 [object_ids -- failed_ids]
137 )
138
139 max_object_id = Enum.at(object_ids, -1)
140 _ = DataMigration.update(data_migration, %{data: %{"max_processed_id" => max_object_id}})
141
142 increment_state(:processed_count, length(object_ids))
143 increment_state(:failed_count, length(failed_ids))
144
145 # A quick and dirty approach to controlling the load this background migration imposes
146 sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0)
147 Process.sleep(sleep_interval)
148 end)
149 |> Stream.run()
150
151 with {:ok, %{rows: [[0]]}} <-
152 Repo.query(
153 "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;",
154 [data_migration.id]
155 ) do
156 put_state(:status, :complete)
157 _ = DataMigration.update_state(data_migration, :complete)
158
159 handle_success()
160 else
161 _ ->
162 put_state(:status, :failed)
163 put_state(:message, "Please check data_migration_failed_ids records.")
164 end
165
166 {:noreply, state}
167 end
168
169 defp transfer_object_hashtags(object) do
170 hashtags = Object.object_data_hashtags(%{"tag" => object.tag})
171
172 Repo.transaction(fn ->
173 with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
174 for hashtag_record <- hashtag_records do
175 with {:ok, _} <-
176 Repo.query(
177 "insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);",
178 [hashtag_record.id, object.id]
179 ) do
180 nil
181 else
182 {:error, e} ->
183 error =
184 "ERROR: could not link object #{object.id} and hashtag " <>
185 "#{hashtag_record.id}: #{inspect(e)}"
186
187 Logger.error(error)
188 Repo.rollback(object.id)
189 end
190 end
191
192 object.id
193 else
194 e ->
195 error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}"
196 Logger.error(error)
197 Repo.rollback(object.id)
198 end
199 end)
200 end
201
202 defp handle_success do
203 put_state(:status, :complete)
204
205 unless Config.improved_hashtag_timeline() do
206 Config.put(Config.improved_hashtag_timeline_path(), true)
207 end
208
209 :ok
210 end
211 end