federator: remove no longer used :publish_single_ap
[akkoma] / lib / pleroma / web / federator / federator.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Federator do
6 alias Pleroma.Activity
7 alias Pleroma.Object.Containment
8 alias Pleroma.User
9 alias Pleroma.Web.ActivityPub.ActivityPub
10 alias Pleroma.Web.ActivityPub.Relay
11 alias Pleroma.Web.ActivityPub.Transmogrifier
12 alias Pleroma.Web.ActivityPub.Utils
13 alias Pleroma.Web.ActivityPub.Visibility
14 alias Pleroma.Web.Federator.RetryQueue
15 alias Pleroma.Web.OStatus
16 alias Pleroma.Web.Salmon
17 alias Pleroma.Web.WebFinger
18 alias Pleroma.Web.Websub
19
20 require Logger
21
# Modules used for WebSub verification and incoming OStatus documents.
# Both are read from the application environment while this module is being
# compiled, so the values are fixed at compile time.
@websub Application.get_env(:pleroma, :websub)
@ostatus Application.get_env(:pleroma, :ostatus)
24
@doc """
Waits one minute and then enqueues a subscription refresh job.

The calling process is blocked for the whole wait. Returns whatever
`refresh_subscriptions/0` returns.
"""
def init do
  # Hold off the first refresh for one minute.
  Process.sleep(:timer.minutes(1))
  refresh_subscriptions()
end
30
31 # Client API
32
@doc """
Enqueues an `:incoming_doc` job for `doc` on the `:federator_incoming` queue.

The job arguments match the `perform(:incoming_doc, doc)` clause of this
module. Returns whatever `PleromaJobQueue.enqueue/3` returns.
"""
def incoming_doc(doc) do
  job_args = [:incoming_doc, doc]
  PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, job_args)
end
36
@doc """
Enqueues an `:incoming_ap_doc` job for `params` on the `:federator_incoming`
queue.

The job arguments match the `perform(:incoming_ap_doc, params)` clause of this
module. Returns whatever `PleromaJobQueue.enqueue/3` returns.
"""
def incoming_ap_doc(params) do
  job_args = [:incoming_ap_doc, params]
  PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, job_args)
end
40
@doc """
Enqueues a `:publish` job for `activity` on the `:federator_outgoing` queue.

`priority` is passed through to `PleromaJobQueue.enqueue/4` and defaults to
`1`. The job arguments match the `perform(:publish, activity)` clause of this
module.
"""
def publish(activity, priority \\ 1) do
  job_args = [:publish, activity]
  PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, job_args, priority)
end
44
@doc """
Enqueues a `:publish_single_websub` job for `websub` on the
`:federator_outgoing` queue.

Returns whatever `PleromaJobQueue.enqueue/3` returns.
"""
def publish_single_websub(websub) do
  job_args = [:publish_single_websub, websub]
  PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, job_args)
end
48
@doc """
Enqueues a `:verify_websub` job for `websub` on the `:federator_outgoing`
queue.

Returns whatever `PleromaJobQueue.enqueue/3` returns.
"""
def verify_websub(websub) do
  job_args = [:verify_websub, websub]
  PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, job_args)
end
52
@doc """
Enqueues a `:request_subscription` job for `sub` on the `:federator_outgoing`
queue.

Returns whatever `PleromaJobQueue.enqueue/3` returns.
"""
def request_subscription(sub) do
  job_args = [:request_subscription, sub]
  PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, job_args)
end
56
@doc """
Enqueues a `:refresh_subscriptions` job on the `:federator_outgoing` queue.

Returns whatever `PleromaJobQueue.enqueue/3` returns.
"""
def refresh_subscriptions do
  job_args = [:refresh_subscriptions]
  PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, job_args)
end
60
@doc """
Enqueues a `:publish_single_salmon` job for `params` on the
`:federator_outgoing` queue.

Returns whatever `PleromaJobQueue.enqueue/3` returns.
"""
def publish_single_salmon(params) do
  job_args = [:publish_single_salmon, params]
  PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, job_args)
end
64
65 # Job Worker Callbacks
66
# Refreshes the WebSub subscriptions now and schedules the next refresh.
#
# After calling Websub.refresh_subscriptions/0, a separate (unlinked) process
# is spawned that sleeps for six hours and then enqueues another refresh job.
# Returns the pid of that spawned process.
def perform(:refresh_subscriptions) do
  Logger.debug("Federator running refresh subscriptions")
  Websub.refresh_subscriptions()

  enqueue_next_refresh = fn ->
    # Wait six hours before queueing the next refresh.
    Process.sleep(:timer.hours(6))
    refresh_subscriptions()
  end

  spawn(enqueue_next_refresh)
end
77
# Requests (refreshes) a single WebSub subscription and logs the outcome.
#
# On `{:ok, websub}` the topic of the returned subscription is logged; on any
# other result the topic of the subscription passed in is logged instead.
# Returns the result of the final Logger.debug/1 call.
def perform(:request_subscription, websub) do
  Logger.debug("Refreshing #{websub.topic}")

  case Websub.request_subscription(websub) do
    {:ok, refreshed} ->
      Logger.debug("Successfully refreshed #{refreshed.topic}")

    _failure ->
      Logger.debug("Couldn't refresh #{websub.topic}")
  end
end
87
# Federates `activity` on behalf of its actor.
#
# The actor is looked up with User.get_cached_by_ap_id/1; when that returns
# `nil` nothing is sent and `nil` is returned. Otherwise:
#
#   * public activities that OStatus can represent are sent via WebSub and
#     Salmon;
#   * public activities are also relayed when the `:allow_relay` key of the
#     `:instance` config is truthy;
#   * every activity is finally sent via ActivityPub, whose result is the
#     return value.
#
# Raises a MatchError if WebFinger.ensure_keys_present/1 does not return
# `{:ok, actor}`.
def perform(:publish, activity) do
  Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)

  with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do
    {:ok, actor} = WebFinger.ensure_keys_present(actor)

    if Visibility.is_public?(activity) do
      if OStatus.is_representable?(activity) do
        Logger.info(fn -> "Sending #{activity.data["id"]} out via WebSub" end)
        Websub.publish(OStatus.feed_path(actor), actor, activity)

        Logger.info(fn -> "Sending #{activity.data["id"]} out via Salmon" end)
        Salmon.publish(actor, activity)
      end

      # Default to an empty keyword list so that a missing :instance config
      # simply disables relaying instead of crashing inside Keyword.get/2.
      instance_config = Application.get_env(:pleroma, :instance, [])

      if Keyword.get(instance_config, :allow_relay) do
        Logger.info(fn -> "Relaying #{activity.data["id"]} out" end)
        Relay.publish(activity)
      end
    end

    Logger.info(fn -> "Sending #{activity.data["id"]} out via AP" end)
    ActivityPub.publish(actor, activity)
  end
end
113
# Verifies a WebSub subscription using the module held in @websub.
#
# The debug message is wrapped in a function so it is only built when Logger
# actually needs it. Returns the result of the verify call.
def perform(:verify_websub, websub) do
  describe_job = fn ->
    "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
  end

  Logger.debug(describe_job)

  @websub.verify(websub)
end
121
# Hands an incoming document to the module held in @ostatus and returns the
# result of its handle_incoming/1.
def perform(:incoming_doc, doc) do
  Logger.info("Got document, trying to parse")

  handler = @ostatus
  handler.handle_incoming(doc)
end
126
# Processes an incoming ActivityPub activity.
#
# After normalizing `params`, the activity is passed to
# Transmogrifier.handle_incoming/1 only when all of these hold, in order:
#
#   1. ap_enabled_actor/1 returns `{:ok, user}` for the actor;
#   2. Activity.normalize/1 returns `nil` for the activity id (an
#      `%Activity{}` result is logged as "Already had ...");
#   3. the containment check on the actor and params returns `:ok`.
#
# Returns `{:ok, activity}` on success and `:error` in every other case.
def perform(:incoming_ap_doc, params) do
  Logger.info("Handling incoming AP activity")

  params = Utils.normalize_params(params)

  # NOTE: we use the actor ID to do the containment, this is fine because an
  # actor shouldn't be acting on objects outside their own AP server.
  with {:ok, _user} <- ap_enabled_actor(params["actor"]),
       nil <- Activity.normalize(params["id"]),
       :ok <- Containment.contain_origin_from_id(params["actor"], params),
       {:ok, activity} <- Transmogrifier.handle_incoming(params) do
    {:ok, activity}
  else
    %Activity{} ->
      Logger.info(fn -> "Already had #{params["id"]}" end)
      :error

    _e ->
      # Just drop those for now
      Logger.info("Unhandled activity")
      # Pass a function so the pretty-printed dump is only encoded when the
      # message is actually going to be logged.
      Logger.info(fn -> Poison.encode!(params, pretty: 2) end)
      :error
  end
end
151
# Delivers a single Salmon message by passing `params` straight to
# Salmon.send_to_user/1 and returning its result.
def perform(:publish_single_salmon, params), do: Salmon.send_to_user(params)
155
# Publishes one WebSub notification.
#
# This clause only matches when `params` is a map containing the :xml, :topic,
# :callback and :secret keys. Returns `:ok` when Websub.publish_one/1 succeeds;
# on `{:error, _}` the same params are handed to RetryQueue.enqueue/2 and its
# result is returned.
def perform(:publish_single_websub, %{xml: _, topic: _, callback: _, secret: _} = params) do
  outcome = Websub.publish_one(params)

  case outcome do
    {:error, _reason} -> RetryQueue.enqueue(params, Websub)
    {:ok, _response} -> :ok
  end
end
168
# Fallback for two-argument jobs that no earlier clause matched: logs the
# unknown task type at debug level and returns an error tuple.
def perform(type, _) do
  unknown_task = fn -> "Unknown task: " <> to_string(type) end
  Logger.debug(unknown_task)

  {:error, "Don't know what to do with this"}
end
173
@doc """
Resolves the actor with AP id `id` to a user.

Returns `{:ok, user}` when the user found by `User.get_cached_by_ap_id/1` is
reported as AP-enabled by `User.ap_enabled?/1`. Otherwise the result of
`ActivityPub.make_user_from_ap_id/1` for `id` is returned unchanged.
"""
def ap_enabled_actor(id) do
  cached_user = User.get_cached_by_ap_id(id)

  if User.ap_enabled?(cached_user),
    do: {:ok, cached_user},
    else: ActivityPub.make_user_from_ap_id(id)
end
183 end