Merge branch 'update-mastofe/glitch-soc-2019-02-10' into 'develop'
[akkoma] / lib / pleroma / web / federator / federator.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Federator do
6 use GenServer
7
8 alias Pleroma.Activity
9 alias Pleroma.User
10 alias Pleroma.Web.WebFinger
11 alias Pleroma.Web.Websub
12 alias Pleroma.Web.Salmon
13 alias Pleroma.Web.ActivityPub.ActivityPub
14 alias Pleroma.Web.ActivityPub.Relay
15 alias Pleroma.Web.ActivityPub.Transmogrifier
16 alias Pleroma.Web.ActivityPub.Utils
17 alias Pleroma.Web.Federator.RetryQueue
18 alias Pleroma.Web.OStatus
19
20 require Logger
21
# Modules used for WebSub verification (@websub) and for handling incoming
# OStatus documents (@ostatus). Both are read from the :pleroma application
# environment; as module attributes they are resolved when this module is
# compiled, not on every call.
22 @websub Application.get_env(:pleroma, :websub)
23 @ostatus Application.get_env(:pleroma, :ostatus)
24
# GenServer callback: the argument handed to GenServer.start_link/3 becomes
# the initial server state unchanged.
def init(initial_state), do: {:ok, initial_state}
28
# Starts the federator under its module name.
#
# Before the server is started, a detached process is spawned that waits one
# minute and then enqueues the first :refresh_subscriptions job.
#
# The state holds one {running_jobs, queue} pair per direction: running_jobs
# is an Erlang set, queue is a list of pending jobs.
def start_link do
  spawn(fn ->
    # 60_000 ms = 1 minute
    Process.sleep(60_000)
    enqueue(:refresh_subscriptions, nil)
  end)

  initial_state = %{in: {:sets.new(), []}, out: {:sets.new(), []}}

  GenServer.start_link(__MODULE__, initial_state, name: __MODULE__)
end
45
# Refreshes the WebSub subscriptions, then schedules the next refresh by
# spawning a process that sleeps for six hours before enqueueing this same
# job again. Returns the pid of that spawned process.
def handle(:refresh_subscriptions, _) do
  Logger.debug("Federator running refresh subscriptions")
  Websub.refresh_subscriptions()

  six_hours_ms = 6 * 60 * 60 * 1000

  spawn(fn ->
    Process.sleep(six_hours_ms)
    enqueue(:refresh_subscriptions, nil)
  end)
end
56
# Requests (re-)subscription for a single WebSub subscription and logs
# whether the request succeeded.
def handle(:request_subscription, websub) do
  Logger.debug("Refreshing #{websub.topic}")

  case Websub.request_subscription(websub) do
    {:ok, refreshed} ->
      Logger.debug("Successfully refreshed #{refreshed.topic}")

    _failure ->
      # Any non-{:ok, _} result is reported against the original subscription.
      Logger.debug("Couldn't refresh #{websub.topic}")
  end
end
66
# Publishes an activity over every applicable transport.
#
#   * WebSub and Salmon: only for public activities that OStatus can represent.
#   * Relay: only for public activities, and only when the :allow_relay
#     instance setting is truthy.
#   * ActivityPub: always.
#
# Returns nil without publishing anything when the activity's actor is not
# found; otherwise returns the result of the ActivityPub publish call.
def handle(:publish, activity) do
  activity_id = activity.data["id"]
  Logger.debug(fn -> "Running publish for #{activity_id}" end)

  case User.get_cached_by_ap_id(activity.data["actor"]) do
    nil ->
      nil

    cached_actor ->
      {:ok, actor} = WebFinger.ensure_keys_present(cached_actor)
      public? = ActivityPub.is_public?(activity)

      if public? && OStatus.is_representable?(activity) do
        Logger.info(fn -> "Sending #{activity_id} out via WebSub" end)
        Websub.publish(OStatus.feed_path(actor), actor, activity)

        Logger.info(fn -> "Sending #{activity_id} out via Salmon" end)
        Salmon.publish(actor, activity)
      end

      if public? && Keyword.get(Application.get_env(:pleroma, :instance), :allow_relay) do
        Logger.info(fn -> "Relaying #{activity_id} out" end)
        Relay.publish(activity)
      end

      Logger.info(fn -> "Sending #{activity_id} out via AP" end)
      ActivityPub.publish(actor, activity)
  end
end
92
# Delegates verification of a WebSub subscription to the configured
# @websub module.
def handle(:verify_websub, websub) do
  # The message is built lazily, so the fields are only read when the debug
  # message is actually evaluated by Logger.
  Logger.debug(fn ->
    details = "#{websub.topic}, #{websub.callback}"
    "Running WebSub verification for #{websub.id} (#{details})"
  end)

  verifier = @websub
  verifier.verify(websub)
end
100
# Hands an incoming document to the configured @ostatus module.
def handle(:incoming_doc, doc) do
  Logger.info("Got document, trying to parse")

  ostatus = @ostatus
  ostatus.handle_incoming(doc)
end
105
# Processes an incoming ActivityPub activity.
#
# Returns {:ok, activity} when the activity was accepted, and :error when it
# is already known or when any of the checks below does not pass.
def handle(:incoming_ap_doc, params) do
  Logger.info("Handling incoming AP activity")

  normalized = Utils.normalize_params(params)
  actor_id = normalized["actor"]

  # NOTE: containment is done with the actor ID. This is fine because an
  # actor shouldn't be acting on objects outside their own AP server.
  with {:ok, _user} <- ap_enabled_actor(actor_id),
       nil <- Activity.normalize(normalized["id"]),
       :ok <- Transmogrifier.contain_origin_from_id(actor_id, normalized),
       {:ok, _activity} = handled <- Transmogrifier.handle_incoming(normalized) do
    handled
  else
    %Activity{} ->
      # Activity.normalize/1 found an existing activity with this id.
      Logger.info("Already had #{normalized["id"]}")
      :error

    _other ->
      # Just drop those for now
      Logger.info("Unhandled activity")
      Logger.info(Poison.encode!(normalized, pretty: 2))
      :error
  end
end
130
# Delivers a single Salmon message; the params are passed to
# Salmon.send_to_user/1 untouched.
def handle(:publish_single_salmon, params), do: Salmon.send_to_user(params)
134
# Delivers a single ActivityPub message. A failed delivery is handed to the
# retry queue; a successful one yields :ok.
def handle(:publish_single_ap, params) do
  params
  |> ActivityPub.publish_one()
  |> case do
    {:ok, _} -> :ok
    {:error, _} -> RetryQueue.enqueue(params, ActivityPub)
  end
end
144
# Delivers a single WebSub notification. The params must contain the :xml,
# :topic, :callback and :secret keys. A failed delivery is handed to the
# retry queue; a successful one yields :ok.
def handle(
      :publish_single_websub,
      %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
    ) do
  params
  |> Websub.publish_one()
  |> case do
    {:ok, _} -> :ok
    {:error, _} -> RetryQueue.enqueue(params, Websub)
  end
end
157
# Fallback for job types that no other clause recognises: logs the type and
# returns an error tuple.
def handle(unknown_type, _payload) do
  Logger.debug(fn -> "Unknown task: #{unknown_type}" end)

  {:error, "Don't know what to do with this"}
end
162
# enqueue/3 is chosen at compile time. Both variants do nothing (and return
# nil) unless the [:instance, :federating] setting is truthy.
if Mix.env() == :test do
  # Test builds: run the job inline in the calling process; the priority is
  # not used.
  def enqueue(type, payload, _priority \\ 1) do
    federating? = Pleroma.Config.get([:instance, :federating])

    if federating?, do: handle(type, payload)
  end
else
  # Other builds: hand the job to the federator process with a cast.
  def enqueue(type, payload, priority \\ 1) do
    federating? = Pleroma.Config.get([:instance, :federating])

    if federating?, do: GenServer.cast(__MODULE__, {:enqueue, type, payload, priority})
  end
end
176
# Starts the job at the head of `queue` when there is something queued and
# fewer jobs are running than the configured :max_jobs limit.
#
# The job runs in a separate task. That task is monitored and the monitor
# reference is added to the running set. Returns the possibly updated
# {running_jobs, queue} pair.
def maybe_start_job(running_jobs, queue) do
  max_jobs = Pleroma.Config.get([__MODULE__, :max_jobs])

  if :sets.size(running_jobs) < max_jobs && queue != [] do
    {{type, payload}, remaining} = queue_pop(queue)
    {:ok, worker} = Task.start(fn -> handle(type, payload) end)
    monitor_ref = Process.monitor(worker)

    {:sets.add_element(monitor_ref, running_jobs), remaining}
  else
    {running_jobs, queue}
  end
end
187
# GenServer cast callbacks.
#
# {:enqueue, type, payload, priority} adds a job to the incoming queue when
# `type` is :incoming_doc or :incoming_ap_doc, and to the outgoing queue for
# every other type. The job is stored with the priority supplied by the
# caller (previously it was discarded and every job was stored with
# priority 1); lower values are dequeued first. After enqueueing, a job is
# started immediately if the direction has spare capacity.
def handle_cast({:enqueue, type, payload, priority}, state)
    when type in [:incoming_doc, :incoming_ap_doc] do
  {:noreply, enqueue_job(state, :in, {type, payload}, priority)}
end

def handle_cast({:enqueue, type, payload, priority}, state) do
  {:noreply, enqueue_job(state, :out, {type, payload}, priority)}
end

# Any other cast is ignored.
def handle_cast(_, state) do
  {:noreply, state}
end

# Adds `job` to the queue of `direction` (:in or :out), tries to start a job
# for that direction, and returns the updated state.
defp enqueue_job(%{in: _, out: _} = state, direction, job, priority) do
  {running_jobs, queue} = Map.fetch!(state, direction)
  queue = enqueue_sorted(queue, job, priority)

  Map.put(state, direction, maybe_start_job(running_jobs, queue))
end
206
# Monitor notification for a finished job task: removes its reference from
# whichever running set holds it, then tries to start the next queued job
# in each direction.
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
  %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state

  # The reference lives in at most one of the sets; deleting it from the
  # other one is a no-op.
  inbound = ref |> :sets.del_element(i_running_jobs) |> maybe_start_job(i_queue)
  outbound = ref |> :sets.del_element(o_running_jobs) |> maybe_start_job(o_queue)

  {:noreply, %{in: inbound, out: outbound}}
end

# Any other message is logged and ignored. Without this clause an unexpected
# message would raise FunctionClauseError, crashing the server and losing
# both queues.
def handle_info(message, state) do
  Logger.debug(fn -> "Federator ignoring unexpected message: #{inspect(message)}" end)
  {:noreply, state}
end
216
# Inserts `element` into `queue` with the given `priority` and returns the
# queue sorted by ascending priority (lowest value first).
#
# The new entry is appended before sorting. Enum.sort_by/2 is stable, so
# entries with equal priority keep their insertion order and the queue is
# first-in-first-out within a priority. Prepending, as was done before, made
# the newest job jump ahead of older jobs of the same priority.
def enqueue_sorted(queue, element, priority) do
  entry = %{item: element, priority: priority}

  Enum.sort_by(queue ++ [entry], fn %{priority: priority} -> priority end)
end
221
# Removes the entry at the head of the queue and returns
# {its item, the remaining queue}. There is no clause for an empty queue.
def queue_pop([%{item: next_job} | remaining]), do: {next_job, remaining}
225
# Looks up the user for the given AP id. Returns {:ok, user} when
# User.ap_enabled?/1 is truthy for the looked-up user; otherwise returns
# whatever ActivityPub.make_user_from_ap_id/1 yields for the id.
def ap_enabled_actor(id) do
  known_user = User.get_by_ap_id(id)

  if User.ap_enabled?(known_user),
    do: {:ok, known_user},
    else: ActivityPub.make_user_from_ap_id(id)
end
235 end