Merge branch 'develop' into feature/database-compaction
[akkoma] / lib / pleroma / web / federator / federator.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Federator do
6 alias Pleroma.Activity
7 alias Pleroma.User
8 alias Pleroma.Web.ActivityPub.ActivityPub
9 alias Pleroma.Web.ActivityPub.Relay
10 alias Pleroma.Web.ActivityPub.Transmogrifier
11 alias Pleroma.Web.ActivityPub.Utils
12 alias Pleroma.Web.ActivityPub.Visibility
13 alias Pleroma.Web.Federator.RetryQueue
14 alias Pleroma.Web.OStatus
15 alias Pleroma.Object.Containment
16 alias Pleroma.Web.Salmon
17 alias Pleroma.Web.WebFinger
18 alias Pleroma.Web.Websub
19
20 require Logger
21
22 @websub Application.get_env(:pleroma, :websub)
23 @ostatus Application.get_env(:pleroma, :ostatus)
24
25 def init do
26 # 1 minute
27 Process.sleep(1000 * 60)
28 refresh_subscriptions()
29 end
30
31 # Client API
32
33 def incoming_doc(doc) do
34 PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
35 end
36
37 def incoming_ap_doc(params) do
38 PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
39 end
40
41 def publish(activity, priority \\ 1) do
42 PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
43 end
44
45 def publish_single_ap(params) do
46 PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_ap, params])
47 end
48
49 def publish_single_websub(websub) do
50 PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_websub, websub])
51 end
52
53 def verify_websub(websub) do
54 PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
55 end
56
57 def request_subscription(sub) do
58 PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
59 end
60
61 def refresh_subscriptions do
62 PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
63 end
64
65 def publish_single_salmon(params) do
66 PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_salmon, params])
67 end
68
69 # Job Worker Callbacks
70
71 def perform(:refresh_subscriptions) do
72 Logger.debug("Federator running refresh subscriptions")
73 Websub.refresh_subscriptions()
74
75 spawn(fn ->
76 # 6 hours
77 Process.sleep(1000 * 60 * 60 * 6)
78 refresh_subscriptions()
79 end)
80 end
81
82 def perform(:request_subscription, websub) do
83 Logger.debug("Refreshing #{websub.topic}")
84
85 with {:ok, websub} <- Websub.request_subscription(websub) do
86 Logger.debug("Successfully refreshed #{websub.topic}")
87 else
88 _e -> Logger.debug("Couldn't refresh #{websub.topic}")
89 end
90 end
91
92 def perform(:publish, activity) do
93 Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
94
95 with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do
96 {:ok, actor} = WebFinger.ensure_keys_present(actor)
97
98 if Visibility.is_public?(activity) do
99 if OStatus.is_representable?(activity) do
100 Logger.info(fn -> "Sending #{activity.data["id"]} out via WebSub" end)
101 Websub.publish(Pleroma.Web.OStatus.feed_path(actor), actor, activity)
102
103 Logger.info(fn -> "Sending #{activity.data["id"]} out via Salmon" end)
104 Pleroma.Web.Salmon.publish(actor, activity)
105 end
106
107 if Keyword.get(Application.get_env(:pleroma, :instance), :allow_relay) do
108 Logger.info(fn -> "Relaying #{activity.data["id"]} out" end)
109 Relay.publish(activity)
110 end
111 end
112
113 Logger.info(fn -> "Sending #{activity.data["id"]} out via AP" end)
114 Pleroma.Web.ActivityPub.ActivityPub.publish(actor, activity)
115 end
116 end
117
118 def perform(:verify_websub, websub) do
119 Logger.debug(fn ->
120 "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
121 end)
122
123 @websub.verify(websub)
124 end
125
126 def perform(:incoming_doc, doc) do
127 Logger.info("Got document, trying to parse")
128 @ostatus.handle_incoming(doc)
129 end
130
131 def perform(:incoming_ap_doc, params) do
132 Logger.info("Handling incoming AP activity")
133
134 params = Utils.normalize_params(params)
135
136 # NOTE: we use the actor ID to do the containment, this is fine because an
137 # actor shouldn't be acting on objects outside their own AP server.
138 with {:ok, _user} <- ap_enabled_actor(params["actor"]),
139 nil <- Activity.normalize(params["id"]),
140 :ok <- Containment.contain_origin_from_id(params["actor"], params),
141 {:ok, activity} <- Transmogrifier.handle_incoming(params) do
142 {:ok, activity}
143 else
144 %Activity{} ->
145 Logger.info("Already had #{params["id"]}")
146 :error
147
148 _e ->
149 # Just drop those for now
150 Logger.info("Unhandled activity")
151 Logger.info(Poison.encode!(params, pretty: 2))
152 :error
153 end
154 end
155
156 def perform(:publish_single_salmon, params) do
157 Salmon.send_to_user(params)
158 end
159
160 def perform(:publish_single_ap, params) do
161 case ActivityPub.publish_one(params) do
162 {:ok, _} ->
163 :ok
164
165 {:error, _} ->
166 RetryQueue.enqueue(params, ActivityPub)
167 end
168 end
169
170 def perform(
171 :publish_single_websub,
172 %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
173 ) do
174 case Websub.publish_one(params) do
175 {:ok, _} ->
176 :ok
177
178 {:error, _} ->
179 RetryQueue.enqueue(params, Websub)
180 end
181 end
182
183 def perform(type, _) do
184 Logger.debug(fn -> "Unknown task: #{type}" end)
185 {:error, "Don't know what to do with this"}
186 end
187
188 def ap_enabled_actor(id) do
189 user = User.get_by_ap_id(id)
190
191 if User.ap_enabled?(user) do
192 {:ok, user}
193 else
194 ActivityPub.make_user_from_ap_id(id)
195 end
196 end
197 end