Merge branch 'feature/788-separate-email-addresses' into 'develop'
[akkoma] / lib / pleroma / web / federator / federator.ex
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.Web.Federator do
  @moduledoc """
  Federation job front-end.

  The "client API" functions enqueue work on `PleromaJobQueue` under the
  `:federator_incoming` and `:federator_outgoing` queues, naming this module
  and an argument list whose first element is the task atom. The `perform/1`
  and `perform/2` clauses are the job worker callbacks that carry each task
  out.
  """

  alias Pleroma.Activity
  alias Pleroma.User
  alias Pleroma.Web.ActivityPub.ActivityPub
  alias Pleroma.Web.ActivityPub.Relay
  alias Pleroma.Web.ActivityPub.Transmogrifier
  alias Pleroma.Web.ActivityPub.Utils
  alias Pleroma.Web.ActivityPub.Visibility
  alias Pleroma.Web.Federator.RetryQueue
  alias Pleroma.Web.OStatus
  alias Pleroma.Web.Salmon
  alias Pleroma.Web.WebFinger
  alias Pleroma.Web.Websub

  require Logger

  @doc """
  Sleeps for one minute and then enqueues the first subscription refresh.

  Blocks the calling process for the duration of the sleep.
  """
  def init do
    Process.sleep(:timer.minutes(1))
    refresh_subscriptions()
  end

  # Client API

  @doc "Enqueues an incoming document on the `:federator_incoming` queue."
  def incoming_doc(doc) do
    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
  end

  @doc "Enqueues incoming ActivityPub params on the `:federator_incoming` queue."
  def incoming_ap_doc(params) do
    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
  end

  @doc """
  Enqueues `activity` for publishing on the `:federator_outgoing` queue.

  `priority` is handed to `PleromaJobQueue.enqueue/4` unchanged and defaults to `1`.
  """
  def publish(activity, priority \\ 1) do
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
  end

  @doc "Enqueues a single ActivityPub delivery described by `params`."
  def publish_single_ap(params) do
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_ap, params])
  end

  @doc "Enqueues a single WebSub delivery described by `websub`."
  def publish_single_websub(websub) do
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_websub, websub])
  end

  @doc "Enqueues verification of the given WebSub subscription."
  def verify_websub(websub) do
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
  end

  @doc "Enqueues a subscription request for `sub`."
  def request_subscription(sub) do
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
  end

  @doc "Enqueues a refresh of all subscriptions."
  def refresh_subscriptions do
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
  end

  @doc "Enqueues a single Salmon delivery described by `params`."
  def publish_single_salmon(params) do
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_salmon, params])
  end

  # Job Worker Callbacks

  @doc """
  Runs the `:refresh_subscriptions` task.

  Calls `Websub.refresh_subscriptions/0`, then spawns an unlinked process that
  sleeps for six hours and enqueues the next refresh. Returns the spawned pid.
  """
  def perform(:refresh_subscriptions) do
    Logger.debug("Federator running refresh subscriptions")
    Websub.refresh_subscriptions()

    spawn(fn ->
      Process.sleep(:timer.hours(6))
      refresh_subscriptions()
    end)
  end

  @doc """
  Runs a single federator task identified by its first argument.

  Handled tasks: `:request_subscription`, `:publish`, `:verify_websub`,
  `:incoming_doc`, `:incoming_ap_doc`, `:publish_single_salmon`,
  `:publish_single_ap` and `:publish_single_websub`. Any other task (or a
  `:publish_single_websub` payload lacking the `:xml`, `:topic`, `:callback`
  or `:secret` keys) is logged and answered with
  `{:error, "Don't know what to do with this"}`.
  """
  def perform(:request_subscription, websub) do
    Logger.debug("Refreshing #{websub.topic}")

    case Websub.request_subscription(websub) do
      {:ok, refreshed} ->
        Logger.debug("Successfully refreshed #{refreshed.topic}")

      _error ->
        Logger.debug("Couldn't refresh #{websub.topic}")
    end
  end

  def perform(:publish, activity) do
    Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)

    # When the actor cannot be found the `with` falls through and returns nil.
    with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do
      {:ok, actor} = WebFinger.ensure_keys_present(actor)

      # WebSub, Salmon and relay delivery only happen for public activities;
      # ActivityPub delivery below happens regardless of visibility.
      if Visibility.is_public?(activity) do
        if OStatus.is_representable?(activity) do
          Logger.info(fn -> "Sending #{activity.data["id"]} out via WebSub" end)
          Websub.publish(OStatus.feed_path(actor), actor, activity)

          Logger.info(fn -> "Sending #{activity.data["id"]} out via Salmon" end)
          Salmon.publish(actor, activity)
        end

        if Keyword.get(Application.get_env(:pleroma, :instance), :allow_relay) do
          Logger.info(fn -> "Relaying #{activity.data["id"]} out" end)
          Relay.publish(activity)
        end
      end

      Logger.info(fn -> "Sending #{activity.data["id"]} out via AP" end)
      ActivityPub.publish(actor, activity)
    end
  end

  def perform(:verify_websub, websub) do
    Logger.debug(fn ->
      "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
    end)

    websub_module().verify(websub)
  end

  def perform(:incoming_doc, doc) do
    Logger.info("Got document, trying to parse")
    ostatus_module().handle_incoming(doc)
  end

  def perform(:incoming_ap_doc, params) do
    Logger.info("Handling incoming AP activity")

    params = Utils.normalize_params(params)

    # NOTE: we use the actor ID to do the containment, this is fine because an
    # actor shouldn't be acting on objects outside their own AP server.
    with {:ok, _user} <- ap_enabled_actor(params["actor"]),
         nil <- Activity.normalize(params["id"]),
         :ok <- Transmogrifier.contain_origin_from_id(params["actor"], params),
         {:ok, activity} <- Transmogrifier.handle_incoming(params) do
      {:ok, activity}
    else
      %Activity{} ->
        Logger.info("Already had #{params["id"]}")
        :error

      _e ->
        # Just drop those for now
        Logger.info("Unhandled activity")
        # Lazy: the pretty-printed JSON is only built if the message is logged.
        Logger.info(fn -> Poison.encode!(params, pretty: 2) end)
        :error
    end
  end

  def perform(:publish_single_salmon, params) do
    Salmon.send_to_user(params)
  end

  def perform(:publish_single_ap, params) do
    case ActivityPub.publish_one(params) do
      {:ok, _} ->
        :ok

      {:error, _} ->
        RetryQueue.enqueue(params, ActivityPub)
    end
  end

  def perform(
        :publish_single_websub,
        %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
      ) do
    case Websub.publish_one(params) do
      {:ok, _} ->
        :ok

      {:error, _} ->
        RetryQueue.enqueue(params, Websub)
    end
  end

  def perform(type, _) do
    Logger.debug(fn -> "Unknown task: #{type}" end)
    {:error, "Don't know what to do with this"}
  end

  @doc """
  Looks up the user for ActivityPub id `id`.

  Returns `{:ok, user}` when `User.ap_enabled?/1` is truthy for the stored
  user; otherwise returns whatever `ActivityPub.make_user_from_ap_id/1`
  returns for `id`.
  """
  def ap_enabled_actor(id) do
    user = User.get_by_ap_id(id)

    if User.ap_enabled?(user) do
      {:ok, user}
    else
      ActivityPub.make_user_from_ap_id(id)
    end
  end

  # Module configured under `config :pleroma, :websub`, resolved at call time
  # so the value is not frozen into the compiled module.
  defp websub_module, do: Application.get_env(:pleroma, :websub)

  # Module configured under `config :pleroma, :ostatus`, resolved at call time
  # so the value is not frozen into the compiled module.
  defp ostatus_module, do: Application.get_env(:pleroma, :ostatus)
end