9a377da68e39171ba4d1e75cb6e765b96c7dc45d
[akkoma] / lib / pleroma / web / federator / federator.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Federator do
6 alias Pleroma.Activity
7 alias Pleroma.Object.Containment
8 alias Pleroma.User
9 alias Pleroma.Web.ActivityPub.ActivityPub
10 alias Pleroma.Web.ActivityPub.Transmogrifier
11 alias Pleroma.Web.ActivityPub.Utils
12 alias Pleroma.Web.ActivityPub.Visibility
13 alias Pleroma.Web.Federator.Publisher
14 alias Pleroma.Web.Federator.RetryQueue
15 alias Pleroma.Web.OStatus
16 alias Pleroma.Web.WebFinger
17 alias Pleroma.Web.Websub
18
19 require Logger
20
21 @websub Application.get_env(:pleroma, :websub)
22 @ostatus Application.get_env(:pleroma, :ostatus)
23
24 def init do
25 # 1 minute
26 Process.sleep(1000 * 60)
27 refresh_subscriptions()
28 end
29
30 # Client API
31
32 def incoming_doc(doc) do
33 PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
34 end
35
36 def incoming_ap_doc(params) do
37 PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
38 end
39
40 def publish(activity, priority \\ 1) do
41 PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
42 end
43
44 def verify_websub(websub) do
45 PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
46 end
47
48 def request_subscription(sub) do
49 PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
50 end
51
52 def refresh_subscriptions do
53 PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
54 end
55
56 # Job Worker Callbacks
57
58 def perform(:refresh_subscriptions) do
59 Logger.debug("Federator running refresh subscriptions")
60 Websub.refresh_subscriptions()
61
62 spawn(fn ->
63 # 6 hours
64 Process.sleep(1000 * 60 * 60 * 6)
65 refresh_subscriptions()
66 end)
67 end
68
69 def perform(:request_subscription, websub) do
70 Logger.debug("Refreshing #{websub.topic}")
71
72 with {:ok, websub} <- Websub.request_subscription(websub) do
73 Logger.debug("Successfully refreshed #{websub.topic}")
74 else
75 _e -> Logger.debug("Couldn't refresh #{websub.topic}")
76 end
77 end
78
79 def perform(:publish, activity) do
80 Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
81
82 with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do
83 {:ok, actor} = WebFinger.ensure_keys_present(actor)
84
85 if Visibility.is_public?(activity) do
86 if OStatus.is_representable?(activity) do
87 Logger.info(fn -> "Sending #{activity.data["id"]} out via WebSub" end)
88 Websub.publish(Pleroma.Web.OStatus.feed_path(actor), actor, activity)
89 end
90 end
91
92 Publisher.publish(actor, activity)
93 end
94 end
95
96 def perform(:verify_websub, websub) do
97 Logger.debug(fn ->
98 "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
99 end)
100
101 @websub.verify(websub)
102 end
103
104 def perform(:incoming_doc, doc) do
105 Logger.info("Got document, trying to parse")
106 @ostatus.handle_incoming(doc)
107 end
108
109 def perform(:incoming_ap_doc, params) do
110 Logger.info("Handling incoming AP activity")
111
112 params = Utils.normalize_params(params)
113
114 # NOTE: we use the actor ID to do the containment, this is fine because an
115 # actor shouldn't be acting on objects outside their own AP server.
116 with {:ok, _user} <- ap_enabled_actor(params["actor"]),
117 nil <- Activity.normalize(params["id"]),
118 :ok <- Containment.contain_origin_from_id(params["actor"], params),
119 {:ok, activity} <- Transmogrifier.handle_incoming(params) do
120 {:ok, activity}
121 else
122 %Activity{} ->
123 Logger.info("Already had #{params["id"]}")
124 :error
125
126 _e ->
127 # Just drop those for now
128 Logger.info("Unhandled activity")
129 Logger.info(Poison.encode!(params, pretty: 2))
130 :error
131 end
132 end
133
134 def perform(
135 :publish_single_websub,
136 %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
137 ) do
138 case Websub.publish_one(params) do
139 {:ok, _} ->
140 :ok
141
142 {:error, _} ->
143 RetryQueue.enqueue(params, Websub)
144 end
145 end
146
147 def perform(type, _) do
148 Logger.debug(fn -> "Unknown task: #{type}" end)
149 {:error, "Don't know what to do with this"}
150 end
151
152 def ap_enabled_actor(id) do
153 user = User.get_cached_by_ap_id(id)
154
155 if User.ap_enabled?(user) do
156 {:ok, user}
157 else
158 ActivityPub.make_user_from_ap_id(id)
159 end
160 end
161 end