salmon: refactor to work as a federator publishing module
[akkoma] / lib / pleroma / web / federator / federator.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.Web.Federator do
  @moduledoc """
  Entry point for federation work.

  The client API functions in this module only enqueue jobs through
  `PleromaJobQueue`, naming this module as the worker. The matching
  `perform/1` and `perform/2` clauses carry out the work when a job runs.
  """

  alias Pleroma.Activity
  alias Pleroma.Object.Containment
  alias Pleroma.User
  alias Pleroma.Web.ActivityPub.ActivityPub
  alias Pleroma.Web.ActivityPub.Transmogrifier
  alias Pleroma.Web.ActivityPub.Utils
  alias Pleroma.Web.ActivityPub.Visibility
  alias Pleroma.Web.Federator.Publisher
  alias Pleroma.Web.Federator.RetryQueue
  alias Pleroma.Web.OStatus
  alias Pleroma.Web.WebFinger
  alias Pleroma.Web.Websub

  require Logger

  @doc """
  Waits one minute, then enqueues the first subscription refresh.

  The calling process is blocked for the whole delay.
  """
  def init do
    Process.sleep(:timer.minutes(1))
    refresh_subscriptions()
  end

  # Client API

  @doc "Enqueues `doc` on the `:federator_incoming` queue as an `:incoming_doc` job."
  def incoming_doc(doc) do
    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
  end

  @doc "Enqueues `params` on the `:federator_incoming` queue as an `:incoming_ap_doc` job."
  def incoming_ap_doc(params) do
    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
  end

  @doc """
  Enqueues a `:publish` job for `activity` on the `:federator_outgoing` queue.

  `priority` is handed to `PleromaJobQueue.enqueue/4` unchanged and defaults to `1`.
  """
  def publish(activity, priority \\ 1) do
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
  end

  @doc "Enqueues a `:publish_single_websub` job on the `:federator_outgoing` queue."
  def publish_single_websub(websub) do
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_websub, websub])
  end

  @doc "Enqueues a `:verify_websub` job on the `:federator_outgoing` queue."
  def verify_websub(websub) do
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
  end

  @doc "Enqueues a `:request_subscription` job for `sub` on the `:federator_outgoing` queue."
  def request_subscription(sub) do
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
  end

  @doc "Enqueues a `:refresh_subscriptions` job on the `:federator_outgoing` queue."
  def refresh_subscriptions do
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
  end

  # Job Worker Callbacks

  @doc """
  Runs the `:refresh_subscriptions` job.

  Calls `Websub.refresh_subscriptions/0` and then spawns a process that sleeps
  for six hours before enqueueing the job again. Returns the pid of that
  process.
  """
  def perform(:refresh_subscriptions) do
    Logger.debug("Federator running refresh subscriptions")
    Websub.refresh_subscriptions()

    # Self-rescheduling: the spawned process is not linked to the worker, so the
    # job itself finishes immediately while the timer keeps running.
    spawn(fn ->
      Process.sleep(:timer.hours(6))
      refresh_subscriptions()
    end)
  end

  @doc """
  Runs a two-argument federator job. The first argument selects the task:

    * `:request_subscription` - calls `Websub.request_subscription/1` and logs
      whether it returned `{:ok, _}`.
    * `:publish` - looks up the activity's actor; when one is found, ensures its
      keys are present, sends the activity out via WebSub if it is public and
      representable in OStatus, and finally hands it to `Publisher.publish/2`.
    * `:verify_websub` - calls `verify/1` on the module configured under
      `:pleroma, :websub`.
    * `:incoming_doc` - calls `handle_incoming/1` on the module configured under
      `:pleroma, :ostatus`.
    * `:incoming_ap_doc` - normalizes the params and passes them to
      `Transmogrifier.handle_incoming/1` unless the activity is already known or
      fails the containment check. Returns `{:ok, activity}` or `:error`.
    * `:publish_single_websub` - calls `Websub.publish_one/1`; on `{:error, _}`
      the params are handed to `RetryQueue.enqueue/2`.

  Anything else is logged and answered with
  `{:error, "Don't know what to do with this"}`.
  """
  def perform(:request_subscription, websub) do
    Logger.debug("Refreshing #{websub.topic}")

    case Websub.request_subscription(websub) do
      {:ok, refreshed} -> Logger.debug("Successfully refreshed #{refreshed.topic}")
      _error -> Logger.debug("Couldn't refresh #{websub.topic}")
    end
  end

  def perform(:publish, activity) do
    Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)

    # When no actor is cached for the activity the `with` falls through and the
    # job returns `nil` without publishing anything.
    with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do
      {:ok, actor} = WebFinger.ensure_keys_present(actor)

      if Visibility.is_public?(activity) && OStatus.is_representable?(activity) do
        Logger.info(fn -> "Sending #{activity.data["id"]} out via WebSub" end)
        Websub.publish(OStatus.feed_path(actor), actor, activity)
      end

      Publisher.publish(actor, activity)
    end
  end

  def perform(:verify_websub, websub) do
    Logger.debug(fn ->
      "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
    end)

    websub_impl().verify(websub)
  end

  def perform(:incoming_doc, doc) do
    Logger.info("Got document, trying to parse")
    ostatus_impl().handle_incoming(doc)
  end

  def perform(:incoming_ap_doc, params) do
    Logger.info("Handling incoming AP activity")

    params = Utils.normalize_params(params)

    # NOTE: we use the actor ID to do the containment, this is fine because an
    # actor shouldn't be acting on objects outside their own AP server.
    with {:ok, _user} <- ap_enabled_actor(params["actor"]),
         nil <- Activity.normalize(params["id"]),
         :ok <- Containment.contain_origin_from_id(params["actor"], params),
         {:ok, activity} <- Transmogrifier.handle_incoming(params) do
      {:ok, activity}
    else
      %Activity{} ->
        Logger.info("Already had #{params["id"]}")
        :error

      _error ->
        # Just drop those for now
        Logger.info("Unhandled activity")
        # Encode lazily so the JSON dump is only built when the message is
        # actually going to be logged.
        Logger.info(fn -> Poison.encode!(params, pretty: 2) end)
        :error
    end
  end

  def perform(
        :publish_single_websub,
        %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
      ) do
    case Websub.publish_one(params) do
      {:ok, _} -> :ok
      {:error, _} -> RetryQueue.enqueue(params, Websub)
    end
  end

  def perform(type, _) do
    # `inspect/1` keeps the log call from raising when `type` is a term that
    # cannot be interpolated into a string (a tuple or map, for example).
    Logger.debug(fn -> "Unknown task: #{inspect(type)}" end)
    {:error, "Don't know what to do with this"}
  end

  @doc """
  Returns `{:ok, user}` when the user cached under ActivityPub id `id` is
  AP-enabled according to `User.ap_enabled?/1`.

  Otherwise returns whatever `ActivityPub.make_user_from_ap_id/1` returns for `id`.
  """
  def ap_enabled_actor(id) do
    user = User.get_cached_by_ap_id(id)

    if User.ap_enabled?(user) do
      {:ok, user}
    else
      ActivityPub.make_user_from_ap_id(id)
    end
  end

  # The implementation modules are read from the application environment on
  # every call, so the configured value does not get frozen into the compiled
  # module.
  defp websub_impl, do: Application.get_env(:pleroma, :websub)

  defp ostatus_impl, do: Application.get_env(:pleroma, :ostatus)
end