Pipeline test: Switch from Mock to Mox.
[akkoma] / lib / pleroma / web / federator.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.Web.Federator.Publishing do
  @moduledoc """
  Behaviour describing the publishing entry point of the federator.

  `Pleroma.Web.Federator` declares this behaviour and implements `publish/1`.
  NOTE(review): presumably the behaviour exists so the publisher can be replaced
  by a mock in tests — confirm against the test setup.
  """

  @doc """
  Publishes the activity given as a map.

  The return value is not constrained by the behaviour.
  """
  @callback publish(map()) :: any()
end
8
defmodule Pleroma.Web.Federator do
  @moduledoc """
  Entry points for federation work.

  The client functions (`incoming_ap_doc/1` and `publish/1`) hand work over to
  `Pleroma.Workers.ReceiverWorker` and `Pleroma.Workers.PublisherWorker`, while the
  `perform/2` and `perform/3` clauses carry out the individual jobs.
  """

  @behaviour Pleroma.Web.Federator.Publishing

  alias Pleroma.Activity
  alias Pleroma.Object.Containment
  alias Pleroma.User
  alias Pleroma.Web.ActivityPub.ActivityPub
  alias Pleroma.Web.ActivityPub.Transmogrifier
  alias Pleroma.Web.ActivityPub.Utils
  alias Pleroma.Web.Federator.Publisher
  alias Pleroma.Workers.PublisherWorker
  alias Pleroma.Workers.ReceiverWorker

  require Logger

  @doc """
  Tells whether an object that is `distance` steps away from the target object is
  still within the configured maximum.

  The maximum is read from `[:instance, :federation_incoming_replies_max_depth]`.
  A missing (`nil`) `distance` counts as 0. When the setting is unset (`nil` or
  `false`) or negative, every distance is allowed.

  This keeps very long threads from being fetched, which is especially useful on
  smaller instances. Addresses
  [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161).
  Applies to fetching of both ancestor (reply-to) and child (reply) objects.
  """
  # credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength
  def allowed_thread_distance?(distance) do
    limit = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth])

    cond do
      # No limit configured.
      !limit -> true
      # A negative limit disables the check as well.
      limit < 0 -> true
      # Default depth is 0 (an object has zero distance from itself in its thread).
      true -> (distance || 0) <= limit
    end
  end

  # Client API

  @doc """
  Queues an incoming ActivityPub document for processing.

  Enqueues an `"incoming_ap_doc"` job on `Pleroma.Workers.ReceiverWorker` with
  `params` stored under the `"params"` key and returns the result of that call.
  """
  def incoming_ap_doc(params) do
    job_args = %{"params" => params}
    ReceiverWorker.enqueue("incoming_ap_doc", job_args)
  end

  @doc """
  Publishes `activity`.

  An activity whose `id` is `"pleroma:fakeid"` is published immediately through
  `perform(:publish, activity)`. Any other activity is queued as a `"publish"` job on
  `Pleroma.Workers.PublisherWorker`, referenced only by its id.
  NOTE(review): presumably the fake id marks activities that cannot be looked up by
  id later — confirm against the worker.
  """
  @impl true
  def publish(%{id: "pleroma:fakeid"} = activity), do: perform(:publish, activity)

  def publish(activity) do
    job_args = %{"activity_id" => activity.id}
    PublisherWorker.enqueue("publish", job_args)
  end

  # Job Worker Callbacks

  @doc """
  Runs a `:publish_one` job by calling `publish_one/1` on `module` with `params`.
  """
  @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
  def perform(:publish_one, module, params), do: module.publish_one(params)

  @doc """
  Runs a federation job.

    * `:publish` - looks up the user for the activity's `"actor"`, passes it through
      `User.ensure_keys_present/1` and hands the result together with the activity to
      `Publisher.publish/2`. If either step yields an unexpected value, that value is
      returned unchanged.

    * `:incoming_ap_doc` - processes incoming ActivityPub `params`. Returns
      `{:ok, activity}` when the document was handled, otherwise `{:error, reason}`
      where `reason` is `:already_present`, `:origin_containment_failed`, the value
      returned by `ap_enabled_actor/1`, or any other unexpected intermediate result.
  """
  def perform(:publish, activity) do
    Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)

    actor_ap_id = activity.data["actor"]

    with %User{} = user <- User.get_cached_by_ap_id(actor_ap_id),
         {:ok, user_with_keys} <- User.ensure_keys_present(user) do
      Publisher.publish(user_with_keys, activity)
    end
  end

  def perform(:incoming_ap_doc, params) do
    Logger.debug("Handling incoming AP activity")

    actor = Utils.get_ap_id(Map.get(params, "actor"))

    # NOTE: containment is checked against the actor ID; this is fine because an
    # actor shouldn't be acting on objects outside their own AP server.
    with {:actor, {:ok, _user}} <- {:actor, ap_enabled_actor(actor)},
         # Only continue when no activity is found for this id yet.
         nil <- Activity.normalize(params["id"]),
         {:correct_origin?, :ok} <-
           {:correct_origin?, Containment.contain_origin_from_id(actor, params)},
         {:ok, activity} <- Transmogrifier.handle_incoming(params) do
      {:ok, activity}
    else
      {:actor, reason} ->
        Logger.debug("Unhandled actor #{actor}, #{inspect(reason)}")
        {:error, reason}

      %Activity{} ->
        Logger.debug("Already had #{params["id"]}")
        {:error, :already_present}

      {:correct_origin?, _} ->
        Logger.debug("Origin containment failure for #{params["id"]}")
        {:error, :origin_containment_failed}

      other ->
        # Just drop those for now
        Logger.debug(fn -> "Unhandled activity\n" <> Jason.encode!(params, pretty: true) end)
        {:error, other}
    end
  end

  @doc """
  Returns `{:ok, user}` for the cached user with AP id `id` when
  `User.ap_enabled?/1` holds for it.

  Otherwise the result of `ActivityPub.make_user_from_ap_id/1` for `id` is returned.
  """
  def ap_enabled_actor(id) do
    cached_user = User.get_cached_by_ap_id(id)

    if User.ap_enabled?(cached_user),
      do: {:ok, cached_user},
      else: ActivityPub.make_user_from_ap_id(id)
  end
end