7897f0fc6bd15debbdb0d31c4abea0840d41d561
[akkoma] / lib / pleroma / web / fed_sockets / fetch_registry.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.Web.FedSockets.FetchRegistry do
  @moduledoc """
  The FetchRegistry acts as a broker for fetch requests and their return values.
  This allows calling processes to block while waiting for a reply.
  It doesn't impose its own process, instead using `Cachex` to handle fetches in-process,
  allowing concurrent callers to avoid bottlenecking.

  Normally outside modules will have no need to call or use the FetchRegistry themselves.

  The `Cachex` parameters can be controlled from the config. Since exact timeout intervals
  aren't necessary the following settings are used by default:

      config :pleroma, :fed_sockets,
        fed_socket_fetches: [
          default: 12_000,
          interval: 3_000,
          lazy: false
        ]

  """

  defmodule FetchRegistryData do
    @moduledoc """
    Record stored in the fetch cache for a single outstanding or completed fetch.

    `received_at` stays `nil` until a reply has been registered; the registry uses that
    to tell a pending fetch from a completed one.
    """

    @type t :: %__MODULE__{
            uuid: String.t() | nil,
            sent_json: term(),
            received_json: term(),
            sent_at: integer() | nil,
            received_at: integer() | nil
          }

    defstruct uuid: nil,
              sent_json: nil,
              received_json: nil,
              sent_at: nil,
              received_at: nil
  end

  alias Ecto.UUID

  require Logger

  # Name of the Cachex cache that holds the registry entries.
  @fetches :fed_socket_fetches

  @typedoc "Identifier under which a fetch is stored in the registry."
  @type uuid :: String.t()

  @typedoc "Reasons a registry lookup can fail."
  @type fetch_error :: :waiting | :missing | :cache_error

  @doc """
  Registers a json request with the FetchRegistry and returns the identifying UUID.

  A fresh UUID is generated for every call; the request is stored together with the
  time it was registered.
  """
  @spec register_fetch(term()) :: uuid()
  def register_fetch(json) do
    %FetchRegistryData{uuid: uuid} =
      json
      |> new_registry_data()
      |> save_registry_data()

    uuid
  end

  @doc """
  Reports on the status of a fetch given the identifying UUID.

  The entry is left in the registry. Will return

    * `{:ok, %FetchRegistryData{}}` if a reply has been registered for the fetch
    * `{:error, :waiting}` if the fetch is still pending
    * `{:error, :missing}` if no entry exists for the UUID (usually because the
      fetch has timed out)
    * `{:error, :cache_error}` if the cache lookup itself failed
  """
  @spec check_fetch(uuid()) :: {:ok, FetchRegistryData.t()} | {:error, fetch_error()}
  def check_fetch(uuid) do
    case get_registry_data(uuid) do
      # No reply has been recorded yet.
      {:ok, %FetchRegistryData{received_at: nil}} ->
        {:error, :waiting}

      {:ok, %FetchRegistryData{} = reg_data} ->
        {:ok, reg_data}

      # Anything else (in practice `{:error, reason}`) is handed back unchanged.
      other ->
        other
    end
  end

  @doc """
  Retrieves the response to a fetch given the identifying UUID.
  A completed fetch is deleted from the FetchRegistry; a pending one is left in place.

  Will return

    * `{:ok, received_json}` if the fetch has completed
    * `{:error, :waiting}` if the fetch is still pending
    * `{:error, :missing}` if no entry exists for the UUID (usually because the
      fetch has timed out)
    * `{:error, :cache_error}` if the cache lookup itself failed
  """
  @spec pop_fetch(uuid()) :: {:ok, term()} | {:error, fetch_error()}
  def pop_fetch(uuid) do
    case check_fetch(uuid) do
      {:ok, %FetchRegistryData{received_json: received_json}} ->
        # Lookup and delete are two separate cache operations.
        delete_registry_data(uuid)
        {:ok, received_json}

      other ->
        other
    end
  end

  @doc """
  Registers that a fetch has returned.
  It expects the UUID that was sent in the request along with the result data.

  Will return

    * the updated `%FetchRegistryData{}` when the reply is recorded
    * the existing, unchanged `%FetchRegistryData{}` (after logging a warning) when a
      reply had already been recorded for this UUID
    * `:error` (after logging a warning) when the entry cannot be read from the registry
  """
  @spec register_fetch_received(uuid(), term()) :: FetchRegistryData.t() | :error
  def register_fetch_received(uuid, data) do
    case get_registry_data(uuid) do
      {:ok, %FetchRegistryData{received_at: nil} = reg_data} ->
        reg_data
        |> set_fetch_received(data)
        |> save_registry_data()

      # A reply is already stored: keep the first one and ignore the new data.
      {:ok, %FetchRegistryData{} = reg_data} ->
        Logger.warn("tried to add fetched data twice - #{uuid}")
        reg_data

      {:error, _} ->
        Logger.warn("Error adding fetch to registry - #{uuid}")
        :error
    end
  end

  # Builds a new, not-yet-answered registry entry for the given request.
  defp new_registry_data(json) do
    %FetchRegistryData{
      uuid: UUID.generate(),
      sent_json: json,
      sent_at: System.monotonic_time(:millisecond)
    }
  end

  # Reads the entry stored under `uuid`, normalising Cachex results to
  # `{:ok, entry}`, `{:error, :missing}` or `{:error, :cache_error}`.
  defp get_registry_data(uuid) do
    case Cachex.get(@fetches, uuid) do
      {:ok, nil} ->
        {:error, :missing}

      {:ok, reg_data} ->
        {:ok, reg_data}

      _ ->
        {:error, :cache_error}
    end
  end

  # Returns the entry with the reply attached and stamped with the current monotonic time.
  defp set_fetch_received(%FetchRegistryData{} = reg_data, data),
    do: %FetchRegistryData{
      reg_data
      | received_at: System.monotonic_time(:millisecond),
        received_json: data
    }

  # Stores the entry under its own UUID and returns it; a failed write raises a MatchError.
  defp save_registry_data(%FetchRegistryData{uuid: uuid} = reg_data) do
    {:ok, true} = Cachex.put(@fetches, uuid, reg_data)
    reg_data
  end

  # Removes the entry stored under `uuid`; any result other than `{:ok, true}` raises.
  defp delete_registry_data(uuid),
    do: {:ok, true} = Cachex.del(@fetches, uuid)
end