[#1149] Addressed code review comments (code style, jobs pruning etc.).
[akkoma] / lib / pleroma / web / federator / federator.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Federator do
6 alias Pleroma.Activity
7 alias Pleroma.Object.Containment
8 alias Pleroma.User
9 alias Pleroma.Web.ActivityPub.ActivityPub
10 alias Pleroma.Web.ActivityPub.Transmogrifier
11 alias Pleroma.Web.ActivityPub.Utils
12 alias Pleroma.Web.Federator.Publisher
13 alias Pleroma.Web.OStatus
14 alias Pleroma.Web.Websub
15 alias Pleroma.Workers.PublisherWorker
16 alias Pleroma.Workers.ReceiverWorker
17 alias Pleroma.Workers.SubscriberWorker
18
19 require Logger
20
21 import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
22
23 def init do
24 # To do: consider removing this call in favor of scheduled execution (`quantum`-based)
25 refresh_subscriptions(schedule_in: 60)
26 end
27
28 @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)"
29 # credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength
30 def allowed_incoming_reply_depth?(depth) do
31 max_replies_depth = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth])
32
33 if max_replies_depth do
34 (depth || 1) <= max_replies_depth
35 else
36 true
37 end
38 end
39
40 # Client API
41
42 def incoming_doc(doc) do
43 %{"op" => "incoming_doc", "body" => doc}
44 |> ReceiverWorker.new(worker_args(:federator_incoming))
45 |> Pleroma.Repo.insert()
46 end
47
48 def incoming_ap_doc(params) do
49 %{"op" => "incoming_ap_doc", "params" => params}
50 |> ReceiverWorker.new(worker_args(:federator_incoming))
51 |> Pleroma.Repo.insert()
52 end
53
54 def publish(%{id: "pleroma:fakeid"} = activity) do
55 perform(:publish, activity)
56 end
57
58 def publish(activity) do
59 %{"op" => "publish", "activity_id" => activity.id}
60 |> PublisherWorker.new(worker_args(:federator_outgoing))
61 |> Pleroma.Repo.insert()
62 end
63
64 def verify_websub(websub) do
65 %{"op" => "verify_websub", "websub_id" => websub.id}
66 |> SubscriberWorker.new(worker_args(:federator_outgoing))
67 |> Pleroma.Repo.insert()
68 end
69
70 def request_subscription(websub) do
71 %{"op" => "request_subscription", "websub_id" => websub.id}
72 |> SubscriberWorker.new(worker_args(:federator_outgoing))
73 |> Pleroma.Repo.insert()
74 end
75
76 def refresh_subscriptions(worker_args \\ []) do
77 %{"op" => "refresh_subscriptions"}
78 |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing))
79 |> Pleroma.Repo.insert()
80 end
81
82 # Job Worker Callbacks
83
84 @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
85 def perform(:publish_one, module, params) do
86 apply(module, :publish_one, [params])
87 end
88
89 def perform(:publish, activity) do
90 Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
91
92 with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]),
93 {:ok, actor} <- User.ensure_keys_present(actor) do
94 Publisher.publish(actor, activity)
95 end
96 end
97
98 def perform(:incoming_doc, doc) do
99 Logger.info("Got document, trying to parse")
100 OStatus.handle_incoming(doc)
101 end
102
103 def perform(:incoming_ap_doc, params) do
104 Logger.info("Handling incoming AP activity")
105
106 params = Utils.normalize_params(params)
107
108 # NOTE: we use the actor ID to do the containment, this is fine because an
109 # actor shouldn't be acting on objects outside their own AP server.
110 with {:ok, _user} <- ap_enabled_actor(params["actor"]),
111 nil <- Activity.normalize(params["id"]),
112 :ok <- Containment.contain_origin_from_id(params["actor"], params),
113 {:ok, activity} <- Transmogrifier.handle_incoming(params) do
114 {:ok, activity}
115 else
116 %Activity{} ->
117 Logger.info("Already had #{params["id"]}")
118 :error
119
120 _e ->
121 # Just drop those for now
122 Logger.info("Unhandled activity")
123 Logger.info(Jason.encode!(params, pretty: true))
124 :error
125 end
126 end
127
128 def perform(:request_subscription, websub) do
129 Logger.debug("Refreshing #{websub.topic}")
130
131 with {:ok, websub} <- Websub.request_subscription(websub) do
132 Logger.debug("Successfully refreshed #{websub.topic}")
133 else
134 _e -> Logger.debug("Couldn't refresh #{websub.topic}")
135 end
136 end
137
138 def perform(:verify_websub, websub) do
139 Logger.debug(fn ->
140 "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
141 end)
142
143 Websub.verify(websub)
144 end
145
146 def perform(:refresh_subscriptions) do
147 Logger.debug("Federator running refresh subscriptions")
148 Websub.refresh_subscriptions()
149 end
150
151 def ap_enabled_actor(id) do
152 user = User.get_cached_by_ap_id(id)
153
154 if User.ap_enabled?(user) do
155 {:ok, user}
156 else
157 ActivityPub.make_user_from_ap_id(id)
158 end
159 end
160 end