1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.FedSockets.FedSocket do
7 The FedSocket module abstracts the actions to be taken taken on connections regardless of
8 whether the connection started as inbound or outbound.
11 Normally outside modules will have no need to call the FedSocket module directly.
15 alias Pleroma.Object.Containment
17 alias Pleroma.Web.ActivityPub.ObjectView
18 alias Pleroma.Web.ActivityPub.UserView
19 alias Pleroma.Web.ActivityPub.Visibility
20 alias Pleroma.Web.FedSockets.FetchRegistry
21 alias Pleroma.Web.FedSockets.IngesterWorker
22 alias Pleroma.Web.FedSockets.OutgoingHandler
23 alias Pleroma.Web.FedSockets.SocketInfo
27 @shake "61dd18f7-f1e6-49a4-939a-a749fcdc1103"
29 def connect_to_host(uri) do
30 case OutgoingHandler.start_link(uri) do
39 def close(%SocketInfo{pid: socket_pid}),
40 do: Process.send(socket_pid, :close, [])
42 def publish(%SocketInfo{pid: socket_pid}, json) do
43 %{action: :publish, data: json}
45 |> send_packet(socket_pid)
48 def fetch(%SocketInfo{pid: socket_pid}, id) do
49 fetch_uuid = FetchRegistry.register_fetch(id)
51 %{action: :fetch, data: id, uuid: fetch_uuid}
53 |> send_packet(socket_pid)
55 wait_for_fetch_to_return(fetch_uuid, 0)
58 def receive_package(%SocketInfo{} = fed_socket, json) do
61 |> process_package(fed_socket)
64 defp wait_for_fetch_to_return(uuid, cntr) do
65 case FetchRegistry.check_fetch(uuid) do
67 Process.sleep(:math.pow(cntr, 3) |> Kernel.trunc())
68 wait_for_fetch_to_return(uuid, cntr + 1)
71 Logger.error("FedSocket fetch timed out - #{inspect(uuid)}")
75 FetchRegistry.pop_fetch(uuid)
79 defp process_package(%{"action" => "publish", "data" => data}, %{origin: origin} = _fed_socket) do
80 if Containment.contain_origin(origin, data) do
81 IngesterWorker.enqueue("ingest", %{"object" => data})
84 {:reply, %{"action" => "publish_reply", "status" => "processed"}}
87 defp process_package(%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}, _fed_socket) do
88 FetchRegistry.register_fetch_received(uuid, data)
92 defp process_package(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _fed_socket) do
93 {:ok, data} = render_fetched_data(ap_id, uuid)
97 defp process_package(%{"action" => "publish_reply"}, _fed_socket) do
101 defp process_package(other, _fed_socket) do
102 Logger.warn("unknown json packages received #{inspect(other)}")
106 defp render_fetched_data(ap_id, uuid) do
109 "action" => "fetch_reply",
110 "status" => "processed",
112 "data" => represent_item(ap_id)
116 defp represent_item(ap_id) do
117 case User.get_by_ap_id(ap_id) do
119 object = Object.get_cached_by_ap_id(ap_id)
121 if Visibility.is_public?(object) do
122 Phoenix.View.render_to_string(ObjectView, "object.json", object: object)
128 Phoenix.View.render_to_string(UserView, "user.json", user: user)
132 defp send_packet(data, socket_pid) do
133 Process.send(socket_pid, {:send, data}, [])
136 def shake, do: @shake