43558b4e6b71d3d0b4068edca70dde3c1499784f
[akkoma] / lib / pleroma / workers / receiver.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.Workers.Receiver do
  @moduledoc """
  Oban worker for the `federator_incoming` queue.

  Two job shapes are handled, selected by the `"op"` key of the job args:

    * `"incoming_doc"` - the `"body"` document is handed to
      `Pleroma.Web.OStatus.handle_incoming/1`.
    * `"incoming_ap_doc"` - the `"params"` map is normalized and, if it passes
      the actor, duplicate and containment checks, handed to
      `Pleroma.Web.ActivityPub.Transmogrifier.handle_incoming/1`.
  """

  alias Pleroma.Activity
  alias Pleroma.Object.Containment
  alias Pleroma.User
  alias Pleroma.Web.ActivityPub.ActivityPub
  alias Pleroma.Web.ActivityPub.Transmogrifier
  alias Pleroma.Web.ActivityPub.Utils
  alias Pleroma.Web.OStatus

  require Logger

  # The `max_attempts` value below is only the compile-time default; callers
  # are expected to override it when building the job through `new/1`.
  use Oban.Worker,
    queue: "federator_incoming",
    max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])

  @impl Oban.Worker
  def perform(%{"op" => "incoming_doc", "body" => doc}) do
    Logger.info("Got incoming document, trying to parse")
    OStatus.handle_incoming(doc)
  end

  def perform(%{"op" => "incoming_ap_doc", "params" => params}) do
    Logger.info("Handling incoming AP activity")

    activity_data = Utils.normalize_params(params)
    actor_id = activity_data["actor"]
    activity_id = activity_data["id"]

    # Containment is checked against the actor ID. That is sufficient because
    # an actor shouldn't be acting on objects outside their own AP server.
    with {:ok, _user} <- ap_enabled_actor(actor_id),
         nil <- Activity.normalize(activity_id),
         :ok <- Containment.contain_origin_from_id(actor_id, activity_data),
         {:ok, _activity} = handled <- Transmogrifier.handle_incoming(activity_data) do
      handled
    else
      # `Activity.normalize/1` found an existing activity with this ID.
      %Activity{} ->
        Logger.info("Already had #{activity_id}")
        :error

      # Any other failure is simply dropped for now; the payload is logged.
      _other ->
        Logger.info("Unhandled activity")
        Logger.info(Jason.encode!(activity_data, pretty: true))
        :error
    end
  end

  # Returns `{:ok, user}` when the cached user for `id` is AP-enabled;
  # otherwise returns whatever `ActivityPub.make_user_from_ap_id/1` yields.
  defp ap_enabled_actor(id) do
    cached = User.get_cached_by_ap_id(id)

    if User.ap_enabled?(cached), do: {:ok, cached}, else: ActivityPub.make_user_from_ap_id(id)
  end
end