Merge branch 'fix/debian-install-libmagic-typo' into 'develop'
[akkoma] / lib / pleroma / web / fed_sockets / fed_socket.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
defmodule Pleroma.Web.FedSockets.FedSocket do
  @moduledoc """
  The FedSocket module abstracts the actions to be taken on connections regardless of
  whether the connection started as inbound or outbound.

  Normally outside modules will have no need to call the FedSocket module directly.
  """

  alias Pleroma.Object
  alias Pleroma.Object.Containment
  alias Pleroma.User
  alias Pleroma.Web.ActivityPub.ObjectView
  alias Pleroma.Web.ActivityPub.UserView
  alias Pleroma.Web.ActivityPub.Visibility
  alias Pleroma.Web.FedSockets.FetchRegistry
  alias Pleroma.Web.FedSockets.IngesterWorker
  alias Pleroma.Web.FedSockets.OutgoingHandler
  alias Pleroma.Web.FedSockets.SocketInfo

  require Logger

  @shake "61dd18f7-f1e6-49a4-939a-a749fcdc1103"

  @doc """
  Starts an outgoing handler for `uri`.

  Returns `{:ok, pid}` when `OutgoingHandler.start_link/1` succeeds. Any other
  result is returned wrapped as `{:error, result}`.
  """
  def connect_to_host(uri) do
    case OutgoingHandler.start_link(uri) do
      {:ok, pid} ->
        {:ok, pid}

      error ->
        # The whole non-ok result is wrapped as-is; callers receive
        # `{:error, original_result}`.
        {:error, error}
    end
  end

  @doc """
  Sends a `:close` message to the process owning the given socket.
  """
  def close(%SocketInfo{pid: socket_pid}),
    do: Process.send(socket_pid, :close, [])

  @doc """
  Sends `json` over the socket wrapped in a JSON-encoded `publish` packet.
  """
  def publish(%SocketInfo{pid: socket_pid}, json) do
    %{action: :publish, data: json}
    |> Jason.encode!()
    |> send_packet(socket_pid)
  end

  @doc """
  Requests the item identified by `id` over the socket and waits for the answer.

  The fetch is registered with `FetchRegistry`, a JSON-encoded `fetch` packet is
  sent to the socket process, and the registry is then polled until the fetch is
  no longer waiting.

  Returns the result of `FetchRegistry.pop_fetch/1` once the fetch has been
  answered, or `{:error, :timeout}` when the registry reports it as missing.
  """
  def fetch(%SocketInfo{pid: socket_pid}, id) do
    fetch_uuid = FetchRegistry.register_fetch(id)

    %{action: :fetch, data: id, uuid: fetch_uuid}
    |> Jason.encode!()
    |> send_packet(socket_pid)

    wait_for_fetch_to_return(fetch_uuid, 0)
  end

  @doc """
  Decodes an incoming JSON packet and dispatches it according to its `"action"`.

  Returns `{:reply, map}` when an answer should be sent back, `{:noreply, nil}`
  otherwise. Decoding uses `Jason.decode!/1`.
  """
  def receive_package(%SocketInfo{} = fed_socket, json) do
    json
    |> Jason.decode!()
    |> process_package(fed_socket)
  end

  @doc """
  Returns the handshake identifier string for this module.
  """
  def shake, do: @shake

  # Polls the FetchRegistry for `uuid`. While the fetch is still waiting, sleeps
  # for `cntr ^ 3` milliseconds (0, 1, 8, 27, ...) before checking again.
  defp wait_for_fetch_to_return(uuid, cntr) do
    case FetchRegistry.check_fetch(uuid) do
      {:error, :waiting} ->
        Process.sleep(:math.pow(cntr, 3) |> Kernel.trunc())
        wait_for_fetch_to_return(uuid, cntr + 1)

      {:error, :missing} ->
        Logger.error("FedSocket fetch timed out - #{inspect(uuid)}")
        {:error, :timeout}

      {:ok, _fr} ->
        FetchRegistry.pop_fetch(uuid)
    end
  end

  # Incoming publish: enqueue the object for ingestion only when it passes the
  # origin containment check. The reply is sent either way.
  defp process_package(%{"action" => "publish", "data" => data}, %{origin: origin} = _fed_socket) do
    # NOTE(review): Containment.contain_origin/2 is expected to return the atoms
    # `:ok` / `:error` - confirm against Pleroma.Object.Containment. Both atoms are
    # truthy, so testing the result with a bare `if` would accept every object;
    # match on `:ok` explicitly instead.
    case Containment.contain_origin(origin, data) do
      :ok -> IngesterWorker.enqueue("ingest", %{"object" => data})
      _rejected -> nil
    end

    {:reply, %{"action" => "publish_reply", "status" => "processed"}}
  end

  # Answer to one of our own fetches: hand the data to the FetchRegistry.
  defp process_package(%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}, _fed_socket) do
    FetchRegistry.register_fetch_received(uuid, data)
    {:noreply, nil}
  end

  # Remote side asks us for an item: reply with its rendered representation.
  defp process_package(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _fed_socket) do
    {:ok, data} = render_fetched_data(ap_id, uuid)
    {:reply, data}
  end

  # Acknowledgement of a publish we sent: nothing to do.
  defp process_package(%{"action" => "publish_reply"}, _fed_socket) do
    {:noreply, nil}
  end

  # Anything else is logged and ignored.
  defp process_package(other, _fed_socket) do
    Logger.warn("unknown json packages received #{inspect(other)}")
    {:noreply, nil}
  end

  # Builds the `fetch_reply` packet for `ap_id`, echoing back the request `uuid`.
  defp render_fetched_data(ap_id, uuid) do
    {:ok,
     %{
       "action" => "fetch_reply",
       "status" => "processed",
       "uuid" => uuid,
       "data" => represent_item(ap_id)
     }}
  end

  # Renders the user with this AP id if one exists; otherwise renders the cached
  # object when `Visibility.is_public?/1` accepts it. Returns `nil` when the
  # object is not considered public.
  defp represent_item(ap_id) do
    case User.get_by_ap_id(ap_id) do
      nil ->
        object = Object.get_cached_by_ap_id(ap_id)

        if Visibility.is_public?(object) do
          Phoenix.View.render_to_string(ObjectView, "object.json", object: object)
        else
          nil
        end

      user ->
        Phoenix.View.render_to_string(UserView, "user.json", user: user)
    end
  end

  # Hands an already-encoded packet to the socket process for transmission.
  defp send_packet(data, socket_pid) do
    Process.send(socket_pid, {:send, data}, [])
  end
end