1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.Federator do
10 alias Pleroma.Web.WebFinger
11 alias Pleroma.Web.Websub
12 alias Pleroma.Web.Salmon
13 alias Pleroma.Web.ActivityPub.ActivityPub
14 alias Pleroma.Web.ActivityPub.Relay
15 alias Pleroma.Web.ActivityPub.Transmogrifier
16 alias Pleroma.Web.ActivityPub.Utils
17 alias Pleroma.Web.Federator.RetryQueue
18 alias Pleroma.Web.OStatus
22 @websub Application.get_env(:pleroma, :websub)
23 @ostatus Application.get_env(:pleroma, :ostatus)
32 Process.sleep(1000 * 60)
33 enqueue(:refresh_subscriptions, nil)
39 in: {:sets.new(), []},
40 out: {:sets.new(), []}
46 def handle(:refresh_subscriptions, _) do
47 Logger.debug("Federator running refresh subscriptions")
48 Websub.refresh_subscriptions()
52 Process.sleep(1000 * 60 * 60 * 6)
53 enqueue(:refresh_subscriptions, nil)
57 def handle(:request_subscription, websub) do
58 Logger.debug("Refreshing #{websub.topic}")
60 with {:ok, websub} <- Websub.request_subscription(websub) do
61 Logger.debug("Successfully refreshed #{websub.topic}")
63 _e -> Logger.debug("Couldn't refresh #{websub.topic}")
67 def handle(:publish, activity) do
68 Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
70 with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do
71 {:ok, actor} = WebFinger.ensure_keys_present(actor)
73 if ActivityPub.is_public?(activity) do
74 if OStatus.is_representable?(activity) do
75 Logger.info(fn -> "Sending #{activity.data["id"]} out via WebSub" end)
76 Websub.publish(Pleroma.Web.OStatus.feed_path(actor), actor, activity)
78 Logger.info(fn -> "Sending #{activity.data["id"]} out via Salmon" end)
79 Pleroma.Web.Salmon.publish(actor, activity)
82 if Keyword.get(Application.get_env(:pleroma, :instance), :allow_relay) do
83 Logger.info(fn -> "Relaying #{activity.data["id"]} out" end)
84 Relay.publish(activity)
88 Logger.info(fn -> "Sending #{activity.data["id"]} out via AP" end)
89 Pleroma.Web.ActivityPub.ActivityPub.publish(actor, activity)
93 def handle(:verify_websub, websub) do
95 "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
98 @websub.verify(websub)
101 def handle(:incoming_doc, doc) do
102 Logger.info("Got document, trying to parse")
103 @ostatus.handle_incoming(doc)
106 def handle(:incoming_ap_doc, params) do
107 Logger.info("Handling incoming AP activity")
109 params = Utils.normalize_params(params)
111 # NOTE: we use the actor ID to do the containment, this is fine because an
112 # actor shouldn't be acting on objects outside their own AP server.
113 with {:ok, _user} <- ap_enabled_actor(params["actor"]),
114 nil <- Activity.normalize(params["id"]),
115 :ok <- Transmogrifier.contain_origin_from_id(params["actor"], params),
116 {:ok, activity} <- Transmogrifier.handle_incoming(params) do
120 Logger.info("Already had #{params["id"]}")
124 # Just drop those for now
125 Logger.info("Unhandled activity")
126 Logger.info(Poison.encode!(params, pretty: 2))
131 def handle(:publish_single_salmon, params) do
132 Salmon.send_to_user(params)
135 def handle(:publish_single_ap, params) do
136 case ActivityPub.publish_one(params) do
141 RetryQueue.enqueue(params, ActivityPub)
146 :publish_single_websub,
147 %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
149 case Websub.publish_one(params) do
154 RetryQueue.enqueue(params, Websub)
158 def handle(type, _) do
159 Logger.debug(fn -> "Unknown task: #{type}" end)
160 {:error, "Don't know what to do with this"}
163 if Mix.env() == :test do
164 def enqueue(type, payload, _priority \\ 1) do
165 if Pleroma.Config.get([:instance, :federating]) do
166 handle(type, payload)
170 def enqueue(type, payload, priority \\ 1) do
171 if Pleroma.Config.get([:instance, :federating]) do
172 GenServer.cast(__MODULE__, {:enqueue, type, payload, priority})
177 def maybe_start_job(running_jobs, queue) do
178 if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, :max_jobs]) && queue != [] do
179 {{type, payload}, queue} = queue_pop(queue)
180 {:ok, pid} = Task.start(fn -> handle(type, payload) end)
181 mref = Process.monitor(pid)
182 {:sets.add_element(mref, running_jobs), queue}
184 {running_jobs, queue}
188 def handle_cast({:enqueue, type, payload, _priority}, state)
189 when type in [:incoming_doc, :incoming_ap_doc] do
190 %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
191 i_queue = enqueue_sorted(i_queue, {type, payload}, 1)
192 {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue)
193 {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
196 def handle_cast({:enqueue, type, payload, _priority}, state) do
197 %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
198 o_queue = enqueue_sorted(o_queue, {type, payload}, 1)
199 {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue)
200 {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
203 def handle_cast(_, state) do
207 def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
208 %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
209 i_running_jobs = :sets.del_element(ref, i_running_jobs)
210 o_running_jobs = :sets.del_element(ref, o_running_jobs)
211 {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue)
212 {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue)
214 {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
217 def enqueue_sorted(queue, element, priority) do
218 [%{item: element, priority: priority} | queue]
219 |> Enum.sort_by(fn %{priority: priority} -> priority end)
222 def queue_pop([%{item: element} | queue]) do
226 def ap_enabled_actor(id) do
227 user = User.get_by_ap_id(id)
229 if User.ap_enabled?(user) do
232 ActivityPub.make_user_from_ap_id(id)