[#1149] Added Oban job for "activity_expiration". Merged remote-tracking branch ...
[akkoma] / lib / pleroma / web / activity_pub / publisher.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.Publisher do
6 alias Pleroma.Activity
7 alias Pleroma.Config
8 alias Pleroma.HTTP
9 alias Pleroma.Instances
10 alias Pleroma.User
11 alias Pleroma.Web.ActivityPub.Relay
12 alias Pleroma.Web.ActivityPub.Transmogrifier
13
14 require Pleroma.Constants
15
16 import Pleroma.Web.ActivityPub.Visibility
17
18 @behaviour Pleroma.Web.Federator.Publisher
19
20 require Logger
21
22 @moduledoc """
23 ActivityPub outgoing federation module.
24 """
25
26 @doc """
27 Determine if an activity can be represented by running it through Transmogrifier.
28 """
29 def is_representable?(%Activity{} = activity) do
30 with {:ok, _data} <- Transmogrifier.prepare_outgoing(activity.data) do
31 true
32 else
33 _e ->
34 false
35 end
36 end
37
38 @doc """
39 Publish a single message to a peer. Takes a struct with the following
40 parameters set:
41
42 * `inbox`: the inbox to publish to
43 * `json`: the JSON message body representing the ActivityPub message
44 * `actor`: the actor which is signing the message
45 * `id`: the ActivityStreams URI of the message
46 """
47 def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = params) do
48 Logger.info("Federating #{id} to #{inbox}")
49 %{host: host, path: path} = URI.parse(inbox)
50
51 digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
52
53 date = Pleroma.Signature.signed_date()
54
55 signature =
56 Pleroma.Signature.sign(actor, %{
57 "(request-target)": "post #{path}",
58 host: host,
59 "content-length": byte_size(json),
60 digest: digest,
61 date: date
62 })
63
64 with {:ok, %{status: code}} when code in 200..299 <-
65 result =
66 HTTP.post(
67 inbox,
68 json,
69 [
70 {"Content-Type", "application/activity+json"},
71 {"Date", date},
72 {"signature", signature},
73 {"digest", digest}
74 ]
75 ) do
76 if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
77 do: Instances.set_reachable(inbox)
78
79 result
80 else
81 {_post_result, response} ->
82 unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
83 {:error, response}
84 end
85 end
86
87 def publish_one(%{actor_id: actor_id} = params) do
88 actor = User.get_by_id(actor_id)
89
90 params
91 |> Map.delete(:actor_id)
92 |> Map.put(:actor, actor)
93 |> publish_one()
94 end
95
96 defp should_federate?(inbox, public) do
97 if public do
98 true
99 else
100 %{host: host} = URI.parse(inbox)
101
102 quarantined_instances =
103 Config.get([:instance, :quarantined_instances], [])
104 |> Pleroma.Web.ActivityPub.MRF.subdomains_regex()
105
106 !Pleroma.Web.ActivityPub.MRF.subdomain_match?(quarantined_instances, host)
107 end
108 end
109
110 @spec recipients(User.t(), Activity.t()) :: list(User.t()) | []
111 defp recipients(actor, activity) do
112 {:ok, followers} =
113 if actor.follower_address in activity.recipients do
114 User.get_external_followers(actor)
115 else
116 {:ok, []}
117 end
118
119 Pleroma.Web.Salmon.remote_users(actor, activity) ++ followers
120 end
121
122 defp get_cc_ap_ids(ap_id, recipients) do
123 host = Map.get(URI.parse(ap_id), :host)
124
125 recipients
126 |> Enum.filter(fn %User{ap_id: ap_id} -> Map.get(URI.parse(ap_id), :host) == host end)
127 |> Enum.map(& &1.ap_id)
128 end
129
130 defp maybe_use_sharedinbox(%User{info: %{source_data: data}}),
131 do: (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
132
133 @doc """
134 Determine a user inbox to use based on heuristics. These heuristics
135 are based on an approximation of the ``sharedInbox`` rules in the
136 [ActivityPub specification][ap-sharedinbox].
137
138 Please do not edit this function (or its children) without reading
139 the spec, as editing the code is likely to introduce some breakage
140 without some familiarity.
141
142 [ap-sharedinbox]: https://www.w3.org/TR/activitypub/#shared-inbox-delivery
143 """
144 def determine_inbox(
145 %Activity{data: activity_data},
146 %User{info: %{source_data: data}} = user
147 ) do
148 to = activity_data["to"] || []
149 cc = activity_data["cc"] || []
150 type = activity_data["type"]
151
152 cond do
153 type == "Delete" ->
154 maybe_use_sharedinbox(user)
155
156 Pleroma.Constants.as_public() in to || Pleroma.Constants.as_public() in cc ->
157 maybe_use_sharedinbox(user)
158
159 length(to) + length(cc) > 1 ->
160 maybe_use_sharedinbox(user)
161
162 true ->
163 data["inbox"]
164 end
165 end
166
167 @doc """
168 Publishes an activity with BCC to all relevant peers.
169 """
170
171 def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
172 when is_list(bcc) and bcc != [] do
173 public = is_public?(activity)
174 {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
175
176 recipients = recipients(actor, activity)
177
178 recipients
179 |> Enum.filter(&User.ap_enabled?/1)
180 |> Enum.map(fn %{info: %{source_data: data}} -> data["inbox"] end)
181 |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
182 |> Instances.filter_reachable()
183 |> Enum.each(fn {inbox, unreachable_since} ->
184 %User{ap_id: ap_id} =
185 Enum.find(recipients, fn %{info: %{source_data: data}} -> data["inbox"] == inbox end)
186
187 # Get all the recipients on the same host and add them to cc. Otherwise, a remote
188 # instance would only accept a first message for the first recipient and ignore the rest.
189 cc = get_cc_ap_ids(ap_id, recipients)
190
191 json =
192 data
193 |> Map.put("cc", cc)
194 |> Jason.encode!()
195
196 Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{
197 inbox: inbox,
198 json: json,
199 actor_id: actor.id,
200 id: activity.data["id"],
201 unreachable_since: unreachable_since
202 })
203 end)
204 end
205
206 @doc """
207 Publishes an activity to all relevant peers.
208 """
209 def publish(%User{} = actor, %Activity{} = activity) do
210 public = is_public?(activity)
211
212 if public && Config.get([:instance, :allow_relay]) do
213 Logger.info(fn -> "Relaying #{activity.data["id"]} out" end)
214 Relay.publish(activity)
215 end
216
217 {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
218 json = Jason.encode!(data)
219
220 recipients(actor, activity)
221 |> Enum.filter(fn user -> User.ap_enabled?(user) end)
222 |> Enum.map(fn %User{} = user ->
223 determine_inbox(activity, user)
224 end)
225 |> Enum.uniq()
226 |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
227 |> Instances.filter_reachable()
228 |> Enum.each(fn {inbox, unreachable_since} ->
229 Pleroma.Web.Federator.Publisher.enqueue_one(
230 __MODULE__,
231 %{
232 inbox: inbox,
233 json: json,
234 actor_id: actor.id,
235 id: activity.data["id"],
236 unreachable_since: unreachable_since
237 }
238 )
239 end)
240 end
241
242 def gather_webfinger_links(%User{} = user) do
243 [
244 %{"rel" => "self", "type" => "application/activity+json", "href" => user.ap_id},
245 %{
246 "rel" => "self",
247 "type" => "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"",
248 "href" => user.ap_id
249 }
250 ]
251 end
252
253 def gather_nodeinfo_protocol_names, do: ["activitypub"]
254 end