Merge branch 'feature/optimize_rich_media_parser' into 'develop'
[akkoma] / lib / pleroma / web / federator / federator.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Federator do
6 alias Pleroma.Activity
7 alias Pleroma.Object.Containment
8 alias Pleroma.User
9 alias Pleroma.Web.ActivityPub.ActivityPub
10 alias Pleroma.Web.ActivityPub.Transmogrifier
11 alias Pleroma.Web.ActivityPub.Utils
12 alias Pleroma.Web.Federator.Publisher
13 alias Pleroma.Web.OStatus
14 alias Pleroma.Web.Websub
15 alias Pleroma.Workers.PublisherWorker
16 alias Pleroma.Workers.ReceiverWorker
17 alias Pleroma.Workers.SubscriberWorker
18
19 require Logger
20
# Enqueues a subscription refresh, forwarding `schedule_in: 60` as worker options.
# To do: consider removing this call in favor of scheduled execution (`quantum`-based)
def init, do: refresh_subscriptions(schedule_in: 60)
25
@doc """
Tells whether a reply found at `depth` may still be processed.

Addresses memory leaks on recursive replies fetching, see
https://git.pleroma.social/pleroma/pleroma/issues/161

The limit is read from `[:instance, :federation_incoming_replies_max_depth]`.
When that setting is `nil` or `false` every depth is allowed; otherwise the
result is whether `depth` (treated as `1` when it is `nil` or `false`) does
not exceed the configured limit.
"""
def allowed_incoming_reply_depth?(depth) do
  case Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth]) do
    unset when unset in [nil, false] -> true
    limit -> (depth || 1) <= limit
  end
end
37
38 # Client API
39
# Enqueues an "incoming_doc" job on `ReceiverWorker`, carrying `doc` under the "body" key.
# Returns whatever `ReceiverWorker.enqueue/2` returns.
def incoming_doc(doc), do: ReceiverWorker.enqueue("incoming_doc", %{"body" => doc})
43
# Enqueues an "incoming_ap_doc" job on `ReceiverWorker`, carrying `params` under the
# "params" key. Returns whatever `ReceiverWorker.enqueue/2` returns.
def incoming_ap_doc(params) do
  job_args = %{"params" => params}
  ReceiverWorker.enqueue("incoming_ap_doc", job_args)
end
47
# Publishes `activity`.
#
# An activity whose id is "pleroma:fakeid" is handed straight to
# `perform(:publish, activity)` in the calling process; any other activity is
# enqueued on `PublisherWorker` by its id.
def publish(%{id: "pleroma:fakeid"} = activity), do: perform(:publish, activity)

def publish(activity) do
  job_args = %{"activity_id" => activity.id}
  PublisherWorker.enqueue("publish", job_args)
end
55
# Enqueues a "verify_websub" job on `SubscriberWorker`, identifying the record by `websub.id`.
def verify_websub(websub) do
  job_args = %{"websub_id" => websub.id}
  SubscriberWorker.enqueue("verify_websub", job_args)
end
59
# Enqueues a "request_subscription" job on `SubscriberWorker`, identifying the record
# by `websub.id`.
def request_subscription(websub) do
  job_args = %{"websub_id" => websub.id}
  SubscriberWorker.enqueue("request_subscription", job_args)
end
63
# Enqueues a "refresh_subscriptions" job with empty arguments on `SubscriberWorker`.
# `worker_args` (a keyword list, empty by default) is passed on as worker options with
# `max_attempts: 1` appended after the caller's entries.
def refresh_subscriptions(worker_args \\ []) do
  enqueue_opts = worker_args ++ [max_attempts: 1]
  SubscriberWorker.enqueue("refresh_subscriptions", %{}, enqueue_opts)
end
67
68 # Job Worker Callbacks
69
# Invokes `publish_one/1` on the given `module` with `params` and returns its result.
@spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
def perform(:publish_one, module, params), do: module.publish_one(params)
74
# Publishes `activity` on behalf of the user found for `activity.data["actor"]`.
#
# The actor is looked up with `User.get_cached_by_ap_id/1` and passed through
# `User.ensure_keys_present/1` before `Publisher.publish/2` is called. If either
# step yields something other than the expected shape, that value is returned
# unchanged and nothing is published.
def perform(:publish, activity) do
  Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)

  case User.get_cached_by_ap_id(activity.data["actor"]) do
    %User{} = cached_actor ->
      case User.ensure_keys_present(cached_actor) do
        {:ok, actor} -> Publisher.publish(actor, activity)
        keys_result -> keys_result
      end

    lookup_result ->
      lookup_result
  end
end
83
# Logs the attempt, then passes `doc` to `OStatus.handle_incoming/1`.
# The handler's result is returned unchanged.
def perform(:incoming_doc, doc) do
  Logger.info("Got document, trying to parse")

  OStatus.handle_incoming(doc)
end
88
# Processes an incoming ActivityPub document.
#
# After `Utils.normalize_params/1`, the steps run in order and stop at the first
# one that does not match:
#   1. `ap_enabled_actor/1` must return `{:ok, _}` for the actor id;
#   2. `Activity.normalize/1` must return `nil` for the document id;
#   3. `Containment.contain_origin_from_id/2` must return `:ok`;
#   4. `Transmogrifier.handle_incoming/1` must return `{:ok, activity}`.
#
# Returns `{:ok, activity}` on success. When any step yields an `%Activity{}`
# the document is logged as already present; every other mismatch is logged
# together with the pretty-printed params. Both failure paths return `:error`.
def perform(:incoming_ap_doc, params) do
  Logger.info("Handling incoming AP activity")

  normalized = Utils.normalize_params(params)
  actor_id = normalized["actor"]

  # NOTE: we use the actor ID to do the containment, this is fine because an
  # actor shouldn't be acting on objects outside their own AP server.
  with {:ok, _actor} <- ap_enabled_actor(actor_id),
       nil <- Activity.normalize(normalized["id"]),
       :ok <- Containment.contain_origin_from_id(actor_id, normalized),
       {:ok, handled} <- Transmogrifier.handle_incoming(normalized) do
    {:ok, handled}
  else
    %Activity{} ->
      Logger.info("Already had #{normalized["id"]}")
      :error

    _unhandled ->
      # Just drop those for now
      Logger.info("Unhandled activity")
      Logger.info(Jason.encode!(normalized, pretty: true))
      :error
  end
end
113
# Calls `Websub.request_subscription/1` for `websub` and logs the outcome at debug
# level. The topic in the success message comes from the record returned by Websub,
# the one in the failure message from the record passed in. Returns the result of
# the final `Logger.debug/1` call.
def perform(:request_subscription, websub) do
  Logger.debug("Refreshing #{websub.topic}")

  case Websub.request_subscription(websub) do
    {:ok, refreshed} -> Logger.debug("Successfully refreshed #{refreshed.topic}")
    _failure -> Logger.debug("Couldn't refresh #{websub.topic}")
  end
end
123
# Logs which record is being verified, then returns the result of `Websub.verify/1`.
def perform(:verify_websub, websub) do
  # Message is built lazily, via a function handed to the logger.
  describe = fn ->
    "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
  end

  Logger.debug(describe)

  Websub.verify(websub)
end
131
# Logs the run at debug level, then returns the result of `Websub.refresh_subscriptions/0`.
def perform(:refresh_subscriptions) do
  Logger.debug("Federator running refresh subscriptions")

  Websub.refresh_subscriptions()
end
136
# Resolves the actor with AP id `id`.
#
# Returns `{:ok, user}` when the cached lookup yields a value for which
# `User.ap_enabled?/1` is truthy; otherwise returns whatever
# `ActivityPub.make_user_from_ap_id/1` returns for `id`.
def ap_enabled_actor(id) do
  cached = User.get_cached_by_ap_id(id)

  if User.ap_enabled?(cached), do: {:ok, cached}, else: ActivityPub.make_user_from_ap_id(id)
end
146 end