Federate data through persistent websocket connections
authorSteven Fuchs <steven.fuchs@dockyard.com>
Fri, 18 Sep 2020 11:58:22 +0000 (11:58 +0000)
committerlain <lain@soykaf.club>
Fri, 18 Sep 2020 11:58:22 +0000 (11:58 +0000)
25 files changed:
CHANGELOG.md
config/config.exs
config/description.exs
config/test.exs
docs/configuration/cheatsheet.md
lib/pleroma/application.ex
lib/pleroma/object/fetcher.ex
lib/pleroma/signature.ex
lib/pleroma/user.ex
lib/pleroma/web/activity_pub/activity_pub.ex
lib/pleroma/web/activity_pub/publisher.ex
lib/pleroma/web/activity_pub/transmogrifier.ex
lib/pleroma/web/fed_sockets/fed_registry.ex [new file with mode: 0644]
lib/pleroma/web/fed_sockets/fed_socket.ex [new file with mode: 0644]
lib/pleroma/web/fed_sockets/fed_sockets.ex [new file with mode: 0644]
lib/pleroma/web/fed_sockets/fetch_registry.ex [new file with mode: 0644]
lib/pleroma/web/fed_sockets/incoming_handler.ex [new file with mode: 0644]
lib/pleroma/web/fed_sockets/ingester_worker.ex [new file with mode: 0644]
lib/pleroma/web/fed_sockets/outgoing_handler.ex [new file with mode: 0644]
lib/pleroma/web/fed_sockets/socket_info.ex [new file with mode: 0644]
lib/pleroma/web/fed_sockets/supervisor.ex [new file with mode: 0644]
mix.lock
test/web/fed_sockets/fed_registry_test.exs [new file with mode: 0644]
test/web/fed_sockets/fetch_registry_test.exs [new file with mode: 0644]
test/web/fed_sockets/socket_info_test.exs [new file with mode: 0644]

index 1266d2dfebb4c3ddbbbe9353b96d4ac1dd61f418..5e4d06c8278e245e805d156f318ae665e1f74871 100644 (file)
@@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 
 ## Unreleased
 
+### Added
+- Experimental websocket-based federation between Pleroma instances.
+
 ### Changed
 
 - Renamed `:await_up_timeout` in `:connections_pool` namespace to `:connect_timeout`, old name is deprecated.
index c204814d0573d0172e156b1ecb1c56b23977e47f..104013b417ca2676609c8b4caacde562d1ec76c6 100644 (file)
@@ -130,6 +130,7 @@ config :pleroma, Pleroma.Web.Endpoint,
     dispatch: [
       {:_,
        [
+         {"/api/fedsocket/v1", Pleroma.Web.FedSockets.IncomingHandler, []},
          {"/api/v1/streaming", Pleroma.Web.MastodonAPI.WebsocketHandler, []},
          {"/websocket", Phoenix.Endpoint.CowboyWebSocket,
           {Phoenix.Transports.WebSocket,
@@ -148,6 +149,16 @@ config :pleroma, Pleroma.Web.Endpoint,
     "SameSite=Lax"
   ]
 
+config :pleroma, :fed_sockets,
+  enabled: false,
+  connection_duration: :timer.hours(8),
+  rejection_duration: :timer.minutes(15),
+  fed_socket_fetches: [
+    default: 12_000,
+    interval: 3_000,
+    lazy: false
+  ]
+
 # Configures Elixir's Logger
 config :logger, :console,
   level: :debug,
@@ -532,6 +543,7 @@ config :pleroma, Oban,
     token_expiration: 5,
     federator_incoming: 50,
     federator_outgoing: 50,
+    ingestion_queue: 50,
     web_push: 50,
     mailer: 10,
     transmogrifier: 20,
index 2b30f81489b5dd25769636ed2842da1486a668d1..6f3855918a836c356937bcaad1d080534bcf38b0 100644 (file)
@@ -270,6 +270,19 @@ config :pleroma, :config_description, [
       }
     ]
   },
+  %{
+    group: :pleroma,
+    key: :fed_sockets,
+    type: :group,
+    description: "Websocket based federation",
+    children: [
+      %{
+        key: :enabled,
+        type: :boolean,
+        description: "Enable FedSockets"
+      }
+    ]
+  },
   %{
     group: :pleroma,
     key: Pleroma.Emails.Mailer,
index 0ee6f1b7f3043fd543f375612d9792353c2cccb8..93a0e2a61c032d63f12850e392b65fc8a44ba6e3 100644 (file)
@@ -19,6 +19,11 @@ config :logger, :console,
   level: :warn,
   format: "\n[$level] $message\n"
 
+config :pleroma, :fed_sockets,
+  enabled: false,
+  connection_duration: 5,
+  rejection_duration: 5
+
 config :pleroma, :auth, oauth_consumer_strategies: []
 
 config :pleroma, Pleroma.Upload,
index 054b8fe4369d7a0b95cfea3fee067b220bde528a..9a275294e7876071ea72a65cdc0cc9d22457c34f 100644 (file)
@@ -225,6 +225,16 @@ Enables the worker which processes posts scheduled for deletion. Pinned posts ar
 
 * `enabled`: whether expired activities will be sent to the job queue to be deleted
 
+## FedSockets
+FedSockets is an experimental feature allowing Pleroma backends to federate using a persistent websocket connection as opposed to making each federation a separate HTTP connection. This feature is currently off by default. It is configurable through the following options.
+
+### :fed_sockets
+* `enabled`: Enables FedSockets for this instance. `false` by default.
+* `connection_duration`: Time an idle websocket is kept open.
+* `rejection_duration`: Failures to connect via FedSockets will not be retried for this period of time.
+* `fed_socket_fetches` and `fed_socket_rejections`: Settings passed to `cachex` for the fetch registry, and rejection stacks. See `Pleroma.Web.FedSockets` for more details.
+
+
 ## Frontends
 
 ### :frontend_configurations
index c39e24919c4c76e4ce0c682f99767e71106ba234..00ec79a2a68a6bda886e9e1f4b18084a73078174 100644 (file)
@@ -99,7 +99,7 @@ defmodule Pleroma.Application do
           {Oban, Config.get(Oban)}
         ] ++
         task_children(@env) ++
-        streamer_child(@env) ++
+        dont_run_in_test(@env) ++
         chat_child(@env, chat_enabled?()) ++
         [
           Pleroma.Web.Endpoint,
@@ -188,16 +188,17 @@ defmodule Pleroma.Application do
 
   defp chat_enabled?, do: Config.get([:chat, :enabled])
 
-  defp streamer_child(env) when env in [:test, :benchmark], do: []
+  defp dont_run_in_test(env) when env in [:test, :benchmark], do: []
 
-  defp streamer_child(_) do
+  defp dont_run_in_test(_) do
     [
       {Registry,
        [
          name: Pleroma.Web.Streamer.registry(),
          keys: :duplicate,
          partitions: System.schedulers_online()
-       ]}
+       ]},
+      Pleroma.Web.FedSockets.Supervisor
     ]
   end
 
index 24dc7cb95d37a69b468ed6890d1c5b14c8c7268f..169298b344df29782b972676c03ff44d2f9dcf8e 100644 (file)
@@ -12,6 +12,7 @@ defmodule Pleroma.Object.Fetcher do
   alias Pleroma.Web.ActivityPub.ObjectValidator
   alias Pleroma.Web.ActivityPub.Transmogrifier
   alias Pleroma.Web.Federator
+  alias Pleroma.Web.FedSockets
 
   require Logger
   require Pleroma.Constants
@@ -182,9 +183,47 @@ defmodule Pleroma.Object.Fetcher do
     end
   end
 
-  def fetch_and_contain_remote_object_from_id(id) when is_binary(id) do
+  def fetch_and_contain_remote_object_from_id(prm, opts \\ [])
+
+  def fetch_and_contain_remote_object_from_id(%{"id" => id}, opts),
+    do: fetch_and_contain_remote_object_from_id(id, opts)
+
+  def fetch_and_contain_remote_object_from_id(id, opts) when is_binary(id) do
     Logger.debug("Fetching object #{id} via AP")
 
+    with {:scheme, true} <- {:scheme, String.starts_with?(id, "http")},
+         {:ok, body} <- get_object(id, opts),
+         {:ok, data} <- safe_json_decode(body),
+         :ok <- Containment.contain_origin_from_id(id, data) do
+      {:ok, data}
+    else
+      {:scheme, _} ->
+        {:error, "Unsupported URI scheme"}
+
+      {:error, e} ->
+        {:error, e}
+
+      e ->
+        {:error, e}
+    end
+  end
+
+  def fetch_and_contain_remote_object_from_id(_id, _opts),
+    do: {:error, "id must be a string"}
+
+  defp get_object(id, opts) do
+    with false <- Keyword.get(opts, :force_http, false),
+         {:ok, fedsocket} <- FedSockets.get_or_create_fed_socket(id) do
+      Logger.debug("fetching via fedsocket - #{inspect(id)}")
+      FedSockets.fetch(fedsocket, id)
+    else
+      _other ->
+        Logger.debug("fetching via http - #{inspect(id)}")
+        get_object_http(id)
+    end
+  end
+
+  defp get_object_http(id) do
     date = Pleroma.Signature.signed_date()
 
     headers =
@@ -192,20 +231,13 @@ defmodule Pleroma.Object.Fetcher do
       |> maybe_date_fetch(date)
       |> sign_fetch(id, date)
 
-    Logger.debug("Fetch headers: #{inspect(headers)}")
+    case HTTP.get(id, headers) do
+      {:ok, %{body: body, status: code}} when code in 200..299 ->
+        {:ok, body}
 
-    with {:scheme, true} <- {:scheme, String.starts_with?(id, "http")},
-         {:ok, %{body: body, status: code}} when code in 200..299 <- HTTP.get(id, headers),
-         {:ok, data} <- Jason.decode(body),
-         :ok <- Containment.contain_origin_from_id(id, data) do
-      {:ok, data}
-    else
       {:ok, %{status: code}} when code in [404, 410] ->
         {:error, "Object has been deleted"}
 
-      {:scheme, _} ->
-        {:error, "Unsupported URI scheme"}
-
       {:error, e} ->
         {:error, e}
 
@@ -214,8 +246,6 @@ defmodule Pleroma.Object.Fetcher do
     end
   end
 
-  def fetch_and_contain_remote_object_from_id(%{"id" => id}),
-    do: fetch_and_contain_remote_object_from_id(id)
-
-  def fetch_and_contain_remote_object_from_id(_id), do: {:error, "id must be a string"}
+  defp safe_json_decode(nil), do: {:ok, nil}
+  defp safe_json_decode(json), do: Jason.decode(json)
 end
index 3aa6909d2f1652bf79f6312381a12de130fcd232..e388993b70a819a5773fffc166c89d444d4454a6 100644 (file)
@@ -39,7 +39,7 @@ defmodule Pleroma.Signature do
   def fetch_public_key(conn) do
     with %{"keyId" => kid} <- HTTPSignatures.signature_for_conn(conn),
          {:ok, actor_id} <- key_id_to_actor_id(kid),
-         {:ok, public_key} <- User.get_public_key_for_ap_id(actor_id) do
+         {:ok, public_key} <- User.get_public_key_for_ap_id(actor_id, force_http: true) do
       {:ok, public_key}
     else
       e ->
@@ -50,8 +50,8 @@ defmodule Pleroma.Signature do
   def refetch_public_key(conn) do
     with %{"keyId" => kid} <- HTTPSignatures.signature_for_conn(conn),
          {:ok, actor_id} <- key_id_to_actor_id(kid),
-         {:ok, _user} <- ActivityPub.make_user_from_ap_id(actor_id),
-         {:ok, public_key} <- User.get_public_key_for_ap_id(actor_id) do
+         {:ok, _user} <- ActivityPub.make_user_from_ap_id(actor_id, force_http: true),
+         {:ok, public_key} <- User.get_public_key_for_ap_id(actor_id, force_http: true) do
       {:ok, public_key}
     else
       e ->
index 1ffe60dfc345d80c63afdf2ceb90b5cf8ca9cd8f..d92484a40b9de23dcbd2a7737d269dc35b491a44 100644 (file)
@@ -1820,12 +1820,12 @@ defmodule Pleroma.User do
 
   def html_filter_policy(_), do: Config.get([:markup, :scrub_policy])
 
-  def fetch_by_ap_id(ap_id), do: ActivityPub.make_user_from_ap_id(ap_id)
+  def fetch_by_ap_id(ap_id, opts \\ []), do: ActivityPub.make_user_from_ap_id(ap_id, opts)
 
-  def get_or_fetch_by_ap_id(ap_id) do
+  def get_or_fetch_by_ap_id(ap_id, opts \\ []) do
     cached_user = get_cached_by_ap_id(ap_id)
 
-    maybe_fetched_user = needs_update?(cached_user) && fetch_by_ap_id(ap_id)
+    maybe_fetched_user = needs_update?(cached_user) && fetch_by_ap_id(ap_id, opts)
 
     case {cached_user, maybe_fetched_user} do
       {_, {:ok, %User{} = user}} ->
@@ -1898,8 +1898,8 @@ defmodule Pleroma.User do
 
   def public_key(_), do: {:error, "key not found"}
 
-  def get_public_key_for_ap_id(ap_id) do
-    with {:ok, %User{} = user} <- get_or_fetch_by_ap_id(ap_id),
+  def get_public_key_for_ap_id(ap_id, opts \\ []) do
+    with {:ok, %User{} = user} <- get_or_fetch_by_ap_id(ap_id, opts),
          {:ok, public_key} <- public_key(user) do
       {:ok, public_key}
     else
index 92fc1e4227ca289ac76bdeecff72a2d05819e813..06e8e1a7cf84513b3b20672ef05c9eebf483c2fc 100644 (file)
@@ -1270,10 +1270,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
 
   def fetch_follow_information_for_user(user) do
     with {:ok, following_data} <-
-           Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
+           Fetcher.fetch_and_contain_remote_object_from_id(user.following_address,
+             force_http: true
+           ),
          {:ok, hide_follows} <- collection_private(following_data),
          {:ok, followers_data} <-
-           Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
+           Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address, force_http: true),
          {:ok, hide_followers} <- collection_private(followers_data) do
       {:ok,
        %{
@@ -1347,8 +1349,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     end
   end
 
-  def fetch_and_prepare_user_from_ap_id(ap_id) do
-    with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
+  def fetch_and_prepare_user_from_ap_id(ap_id, opts \\ []) do
+    with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id, opts),
          {:ok, data} <- user_data_from_user_object(data) do
       {:ok, maybe_update_follow_information(data)}
     else
@@ -1390,13 +1392,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     end
   end
 
-  def make_user_from_ap_id(ap_id) do
+  def make_user_from_ap_id(ap_id, opts \\ []) do
     user = User.get_cached_by_ap_id(ap_id)
 
     if user && !User.ap_enabled?(user) do
       Transmogrifier.upgrade_user_from_ap_id(ap_id)
     else
-      with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
+      with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, opts) do
         if user do
           user
           |> User.remote_user_changeset(data)
index d88f7f3ee57d40614d3c574293aad7423805773d..9c3956683cbbc1564b5f5108b9ab3891c5dae214 100644 (file)
@@ -13,6 +13,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
   alias Pleroma.User
   alias Pleroma.Web.ActivityPub.Relay
   alias Pleroma.Web.ActivityPub.Transmogrifier
+  alias Pleroma.Web.FedSockets
 
   require Pleroma.Constants
 
@@ -50,15 +51,35 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
   def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = params) do
     Logger.debug("Federating #{id} to #{inbox}")
 
-    uri = URI.parse(inbox)
+    case FedSockets.get_or_create_fed_socket(inbox) do
+      {:ok, fedsocket} ->
+        Logger.debug("publishing via fedsockets - #{inspect(inbox)}")
+        FedSockets.publish(fedsocket, json)
 
+      _ ->
+        Logger.debug("publishing via http - #{inspect(inbox)}")
+        http_publish(inbox, actor, json, params)
+    end
+  end
+
+  def publish_one(%{actor_id: actor_id} = params) do
+    actor = User.get_cached_by_id(actor_id)
+
+    params
+    |> Map.delete(:actor_id)
+    |> Map.put(:actor, actor)
+    |> publish_one()
+  end
+
+  defp http_publish(inbox, actor, json, params) do
+    uri = %{path: path} = URI.parse(inbox)
     digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
 
     date = Pleroma.Signature.signed_date()
 
     signature =
       Pleroma.Signature.sign(actor, %{
-        "(request-target)": "post #{uri.path}",
+        "(request-target)": "post #{path}",
         host: signature_host(uri),
         "content-length": byte_size(json),
         digest: digest,
@@ -89,15 +110,6 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
     end
   end
 
-  def publish_one(%{actor_id: actor_id} = params) do
-    actor = User.get_cached_by_id(actor_id)
-
-    params
-    |> Map.delete(:actor_id)
-    |> Map.put(:actor, actor)
-    |> publish_one()
-  end
-
   defp signature_host(%URI{port: port, scheme: scheme, host: host}) do
     if port == URI.default_port(scheme) do
       host
index fcca014f01d548f0302e2f0f08bcb2d938219cce..aa6a694639be8bd5f4dd276ddc6f2e855c7ba433 100644 (file)
@@ -1000,7 +1000,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
 
   def upgrade_user_from_ap_id(ap_id) do
     with %User{local: false} = user <- User.get_cached_by_ap_id(ap_id),
-         {:ok, data} <- ActivityPub.fetch_and_prepare_user_from_ap_id(ap_id),
+         {:ok, data} <- ActivityPub.fetch_and_prepare_user_from_ap_id(ap_id, force_http: true),
          {:ok, user} <- update_user(user, data) do
       TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id})
       {:ok, user}
diff --git a/lib/pleroma/web/fed_sockets/fed_registry.ex b/lib/pleroma/web/fed_sockets/fed_registry.ex
new file mode 100644 (file)
index 0000000..e00ea69
--- /dev/null
@@ -0,0 +1,185 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.FedSockets.FedRegistry do
+  @moduledoc """
+  The FedRegistry stores the active FedSockets for quick retrieval.
+
+  The storage and retrieval portion of the FedRegistry is done in process through
+  elixir's `Registry` module for speed and its ability to monitor for terminated processes.
+
+  Dropped connections will be caught by `Registry` and deleted. Since the next
+  message will initiate a new connection there is no reason to try and reconnect at that point.
+
+  Normally outside modules should have no need to call or use the FedRegistry themselves.
+  """
+
+  alias Pleroma.Web.FedSockets.FedSocket
+  alias Pleroma.Web.FedSockets.SocketInfo
+
+  require Logger
+
+  @default_rejection_duration 15 * 60 * 1000
+  @rejections :fed_socket_rejections
+
+  @doc """
+  Retrieves a FedSocket from the Registry given its origin.
+
+  The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080"
+
+  Will return:
+    * {:ok, fed_socket} for working FedSockets
+    * {:error, :rejected} for origins that have been tried and refused within the rejection duration interval
+    * {:error, some_reason} usually :missing for unknown origins
+  """
+  def get_fed_socket(origin) do
+    case get_registry_data(origin) do
+      {:error, reason} ->
+        {:error, reason}
+
+      {:ok, %{state: :connected} = socket_info} ->
+        {:ok, socket_info}
+    end
+  end
+
+  @doc """
+  Adds a connected FedSocket to the Registry.
+
+  Returns {:ok, fed_socket} on success, or {:error, :error_adding_socket} if the socket could not be registered.
+  """
+  def add_fed_socket(origin, pid \\ nil) do
+    origin
+    |> SocketInfo.build(pid)
+    |> SocketInfo.connect()
+    |> add_socket_info
+  end
+
+  defp add_socket_info(%{origin: origin, state: :connected} = socket_info) do
+    case Registry.register(FedSockets.Registry, origin, socket_info) do
+      {:ok, _owner} ->
+        clear_prior_rejection(origin)
+        Logger.debug("fedsocket added: #{inspect(origin)}")
+
+        {:ok, socket_info}
+
+      {:error, {:already_registered, _pid}} ->
+        FedSocket.close(socket_info)
+        existing_socket_info = Registry.lookup(FedSockets.Registry, origin)
+
+        {:ok, existing_socket_info}
+
+      _ ->
+        {:error, :error_adding_socket}
+    end
+  end
+
+  @doc """
+  Mark this origin as having rejected a connection attempt.
+  This will keep it from getting additional connection attempts
+  for a period of time specified in the config.
+
+  Always returns {:ok, new_reg_data}
+  """
+  def set_host_rejected(uri) do
+    new_reg_data =
+      uri
+      |> SocketInfo.origin()
+      |> get_or_create_registry_data()
+      |> set_to_rejected()
+      |> save_registry_data()
+
+    {:ok, new_reg_data}
+  end
+
+  @doc """
+  Retrieves the FedRegistryData from the Registry given its origin.
+
+  The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080"
+
+  Will return:
+    * {:ok, fed_registry_data} for known origins
+    * {:error, :missing} for unknown origins
+    * {:error, :cache_error} indicating some low level runtime issues
+  """
+  def get_registry_data(origin) do
+    case Registry.lookup(FedSockets.Registry, origin) do
+      [] ->
+        if is_rejected?(origin) do
+          Logger.debug("previously rejected fedsocket requested")
+          {:error, :rejected}
+        else
+          {:error, :missing}
+        end
+
+      [{_pid, %{state: :connected} = socket_info}] ->
+        {:ok, socket_info}
+
+      _ ->
+        {:error, :cache_error}
+    end
+  end
+
+  @doc """
+  Retrieves a map of all sockets from the Registry. The keys are the origins and the values are the corresponding SocketInfo
+  """
+  def list_all do
+    (list_all_connected() ++ list_all_rejected())
+    |> Enum.into(%{})
+  end
+
+  defp list_all_connected do
+    FedSockets.Registry
+    |> Registry.select([{{:"$1", :_, :"$3"}, [], [{{:"$1", :"$3"}}]}])
+  end
+
+  defp list_all_rejected do
+    {:ok, keys} = Cachex.keys(@rejections)
+
+    {:ok, registry_data} =
+      Cachex.execute(@rejections, fn worker ->
+        Enum.map(keys, fn k -> {k, Cachex.get!(worker, k)} end)
+      end)
+
+    registry_data
+  end
+
+  defp clear_prior_rejection(origin),
+    do: Cachex.del(@rejections, origin)
+
+  defp is_rejected?(origin) do
+    case Cachex.get(@rejections, origin) do
+      {:ok, nil} ->
+        false
+
+      {:ok, _} ->
+        true
+    end
+  end
+
+  defp get_or_create_registry_data(origin) do
+    case get_registry_data(origin) do
+      {:error, :missing} ->
+        %SocketInfo{origin: origin}
+
+      {:ok, socket_info} ->
+        socket_info
+    end
+  end
+
+  defp save_registry_data(%SocketInfo{origin: origin, state: :connected} = socket_info) do
+    {:ok, true} = Registry.update_value(FedSockets.Registry, origin, fn _ -> socket_info end)
+    socket_info
+  end
+
+  defp save_registry_data(%SocketInfo{origin: origin, state: :rejected} = socket_info) do
+    rejection_expiration =
+      Pleroma.Config.get([:fed_sockets, :rejection_duration], @default_rejection_duration)
+
+    {:ok, true} = Cachex.put(@rejections, origin, socket_info, ttl: rejection_expiration)
+    socket_info
+  end
+
+  defp set_to_rejected(%SocketInfo{} = socket_info),
+    do: %SocketInfo{socket_info | state: :rejected}
+end
diff --git a/lib/pleroma/web/fed_sockets/fed_socket.ex b/lib/pleroma/web/fed_sockets/fed_socket.ex
new file mode 100644 (file)
index 0000000..98d64e6
--- /dev/null
@@ -0,0 +1,137 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.FedSockets.FedSocket do
+  @moduledoc """
+  The FedSocket module abstracts the actions to be taken on connections regardless of
+  whether the connection started as inbound or outbound.
+
+
+  Normally outside modules will have no need to call the FedSocket module directly.
+  """
+
+  alias Pleroma.Object
+  alias Pleroma.Object.Containment
+  alias Pleroma.User
+  alias Pleroma.Web.ActivityPub.ObjectView
+  alias Pleroma.Web.ActivityPub.UserView
+  alias Pleroma.Web.ActivityPub.Visibility
+  alias Pleroma.Web.FedSockets.FetchRegistry
+  alias Pleroma.Web.FedSockets.IngesterWorker
+  alias Pleroma.Web.FedSockets.OutgoingHandler
+  alias Pleroma.Web.FedSockets.SocketInfo
+
+  require Logger
+
+  @shake "61dd18f7-f1e6-49a4-939a-a749fcdc1103"
+
+  def connect_to_host(uri) do
+    case OutgoingHandler.start_link(uri) do
+      {:ok, pid} ->
+        {:ok, pid}
+
+      error ->
+        {:error, error}
+    end
+  end
+
+  def close(%SocketInfo{pid: socket_pid}),
+    do: Process.send(socket_pid, :close, [])
+
+  def publish(%SocketInfo{pid: socket_pid}, json) do
+    %{action: :publish, data: json}
+    |> Jason.encode!()
+    |> send_packet(socket_pid)
+  end
+
+  def fetch(%SocketInfo{pid: socket_pid}, id) do
+    fetch_uuid = FetchRegistry.register_fetch(id)
+
+    %{action: :fetch, data: id, uuid: fetch_uuid}
+    |> Jason.encode!()
+    |> send_packet(socket_pid)
+
+    wait_for_fetch_to_return(fetch_uuid, 0)
+  end
+
+  def receive_package(%SocketInfo{} = fed_socket, json) do
+    json
+    |> Jason.decode!()
+    |> process_package(fed_socket)
+  end
+
+  defp wait_for_fetch_to_return(uuid, cntr) do
+    case FetchRegistry.check_fetch(uuid) do
+      {:error, :waiting} ->
+        Process.sleep(:math.pow(cntr, 3) |> Kernel.trunc())
+        wait_for_fetch_to_return(uuid, cntr + 1)
+
+      {:error, :missing} ->
+        Logger.error("FedSocket fetch timed out - #{inspect(uuid)}")
+        {:error, :timeout}
+
+      {:ok, _fr} ->
+        FetchRegistry.pop_fetch(uuid)
+    end
+  end
+
+  defp process_package(%{"action" => "publish", "data" => data}, %{origin: origin} = _fed_socket) do
+    if Containment.contain_origin(origin, data) do
+      IngesterWorker.enqueue("ingest", %{"object" => data})
+    end
+
+    {:reply, %{"action" => "publish_reply", "status" => "processed"}}
+  end
+
+  defp process_package(%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}, _fed_socket) do
+    FetchRegistry.register_fetch_received(uuid, data)
+    {:noreply, nil}
+  end
+
+  defp process_package(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _fed_socket) do
+    {:ok, data} = render_fetched_data(ap_id, uuid)
+    {:reply, data}
+  end
+
+  defp process_package(%{"action" => "publish_reply"}, _fed_socket) do
+    {:noreply, nil}
+  end
+
+  defp process_package(other, _fed_socket) do
+    Logger.warn("unknown json packages received #{inspect(other)}")
+    {:noreply, nil}
+  end
+
+  defp render_fetched_data(ap_id, uuid) do
+    {:ok,
+     %{
+       "action" => "fetch_reply",
+       "status" => "processed",
+       "uuid" => uuid,
+       "data" => represent_item(ap_id)
+     }}
+  end
+
+  defp represent_item(ap_id) do
+    case User.get_by_ap_id(ap_id) do
+      nil ->
+        object = Object.get_cached_by_ap_id(ap_id)
+
+        if Visibility.is_public?(object) do
+          Phoenix.View.render_to_string(ObjectView, "object.json", object: object)
+        else
+          nil
+        end
+
+      user ->
+        Phoenix.View.render_to_string(UserView, "user.json", user: user)
+    end
+  end
+
+  defp send_packet(data, socket_pid) do
+    Process.send(socket_pid, {:send, data}, [])
+  end
+
+  def shake, do: @shake
+end
diff --git a/lib/pleroma/web/fed_sockets/fed_sockets.ex b/lib/pleroma/web/fed_sockets/fed_sockets.ex
new file mode 100644 (file)
index 0000000..035d547
--- /dev/null
@@ -0,0 +1,182 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.FedSockets do
+  @moduledoc """
+  This documents the FedSockets framework. A framework for federating
+  ActivityPub objects between servers via persistent WebSocket connections.
+
+  FedSockets allow servers to authenticate on first contact and maintain that
+  connection, eliminating the need to authenticate every time data needs to be shared.
+
+  ## Protocol
+  FedSockets currently support 2 types of data transfer:
+    * `publish` method which doesn't require a response
+    * `fetch` method requires a response be sent
+
+    ### Publish
+    The publish operation sends a json encoded map of the shape:
+      %{action: :publish, data: json}
+    and accepts (but does not require) a reply of form:
+      %{"action" => "publish_reply"}
+
+    The outgoing params represent
+      * data: ActivityPub object encoded into json
+
+
+    ### Fetch
+    The fetch operation sends a json encoded map of the shape:
+      %{action: :fetch, data: id, uuid: fetch_uuid}
+    and requires a reply of form:
+      %{"action" => "fetch_reply", "uuid" => uuid, "data" => data}
+
+    The outgoing params represent
+      * id: an ActivityPub object URI
+      * uuid: a unique uuid generated by the sender
+
+    The reply params represent
+      * data: an ActivityPub object encoded into json
+      * uuid: the uuid sent along with the fetch request
+
+  ## Examples
+  Clients of FedSocket transfers shouldn't need to use any of the functions outside of this module.
+
+  A typical publish operation can be performed through the following code, and a fetch operation in a similar manner.
+
+    case FedSockets.get_or_create_fed_socket(inbox) do
+      {:ok, fedsocket} ->
+        FedSockets.publish(fedsocket, json)
+
+      _ ->
+        alternative_publish(inbox, actor, json, params)
+    end
+
+  ## Configuration
+  FedSockets have the following config settings
+
+  config :pleroma, :fed_sockets,
+  enabled: true,
+  ping_interval: :timer.seconds(15),
+  connection_duration: :timer.hours(1),
+  rejection_duration: :timer.hours(1),
+  fed_socket_fetches: [
+    default: 12_000,
+    interval: 3_000,
+    lazy: false
+  ]
+    * enabled - turn FedSockets on or off with this flag. Can be toggled at runtime.
+    * connection_duration - How long a FedSocket can sit idle before it's culled.
+    * rejection_duration - After failing to make a FedSocket connection a host will be excluded
+    from further connections for this amount of time
+    * fed_socket_fetches - Use these parameters to pass options to the Cachex queue backing the FetchRegistry
+    * fed_socket_rejections - Use these parameters to pass options to the Cachex queue backing the FedRegistry
+
+    Cachex options are
+      * default: the minimum amount of time a fetch can wait before it times out.
+      * interval: the interval between checks for timed out entries. This plus the default represent the maximum time allowed
+      * lazy: leave at false for consistent and fast lookups, set to true for stricter timeout enforcement
+
+  """
+  require Logger
+
+  alias Pleroma.Web.FedSockets.FedRegistry
+  alias Pleroma.Web.FedSockets.FedSocket
+  alias Pleroma.Web.FedSockets.SocketInfo
+
+  @doc """
+  returns a FedSocket for the given origin. Will reuse an existing one or create a new one.
+
+  address is expected to be a fully formed URL such as:
+  "http://www.example.com" or "http://www.example.com:8080"
+
+  It can and usually does include additional path parameters,
+  but these are ignored as the FedSockets are organized by host and port info alone.
+  """
+  def get_or_create_fed_socket(address) do
+    with {:cache, {:error, :missing}} <- {:cache, get_fed_socket(address)},
+         {:connect, {:ok, _pid}} <- {:connect, FedSocket.connect_to_host(address)},
+         {:cache, {:ok, fed_socket}} <- {:cache, get_fed_socket(address)} do
+      Logger.debug("fedsocket created for - #{inspect(address)}")
+      {:ok, fed_socket}
+    else
+      {:cache, {:ok, socket}} ->
+        Logger.debug("fedsocket found in cache - #{inspect(address)}")
+        {:ok, socket}
+
+      {:connect, {:error, _host}} ->
+        Logger.debug("set host rejected for - #{inspect(address)}")
+        FedRegistry.set_host_rejected(address)
+        {:error, :rejected}
+
+      {_, {:error, :disabled}} ->
+        {:error, :disabled}
+
+      {_, {:error, reason}} ->
+        Logger.warn("get_or_create_fed_socket error - #{inspect(reason)}")
+        {:error, reason}
+    end
+  end
+
+  @doc """
+  returns a FedSocket for the given origin. Will not create a new FedSocket if one does not exist.
+
+  address is expected to be a fully formed URL such as:
+    "http://www.example.com" or "http://www.example.com:8080"
+  """
+  def get_fed_socket(address) do
+    origin = SocketInfo.origin(address)
+
+    with {:config, true} <- {:config, Pleroma.Config.get([:fed_sockets, :enabled], false)},
+         {:ok, socket} <- FedRegistry.get_fed_socket(origin) do
+      {:ok, socket}
+    else
+      {:config, _} ->
+        {:error, :disabled}
+
+      {:error, :rejected} ->
+        Logger.debug("FedSocket previously rejected - #{inspect(origin)}")
+        {:error, :rejected}
+
+      {:error, reason} ->
+        {:error, reason}
+    end
+  end
+
+  @doc """
+  Sends the supplied data via the publish protocol.
+  It will not block waiting for a reply.
+  Returns :ok but this is not an indication of a successful transfer.
+
+  the data is expected to be JSON encoded binary data.
+  """
+  def publish(%SocketInfo{} = fed_socket, json) do
+    FedSocket.publish(fed_socket, json)
+  end
+
+  @doc """
+  Sends the supplied data via the fetch protocol.
+  It will block waiting for a reply or timeout.
+
+  Returns {:ok, object} where object is the requested object (or nil)
+          {:error, :timeout} in the event the message was not responded to
+
+  the id is expected to be the URI of an ActivityPub object.
+  """
+  def fetch(%SocketInfo{} = fed_socket, id) do
+    FedSocket.fetch(fed_socket, id)
+  end
+
+  @doc """
+  Disconnect all and restart FedSockets.
+  This is mainly used in development and testing but could be useful in production.
+  """
+  def reset do
+    FedRegistry
+    |> Process.whereis()
+    |> Process.exit(:testing)
+  end
+
+  def uri_for_origin(origin),
+    do: "ws://#{origin}/api/fedsocket/v1"
+end
diff --git a/lib/pleroma/web/fed_sockets/fetch_registry.ex b/lib/pleroma/web/fed_sockets/fetch_registry.ex
new file mode 100644 (file)
index 0000000..7897f0f
--- /dev/null
@@ -0,0 +1,151 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.FedSockets.FetchRegistry do
+  @moduledoc """
+  The FetchRegistry acts as a broker for fetch requests and return values.
+  This allows calling processes to block while waiting for a reply.
+  It doesn't impose its own process, instead using `Cachex` to handle fetches in process, allowing
+  multi-threaded processes to avoid bottlenecking.
+
+  Normally outside modules will have no need to call or use the FetchRegistry themselves.
+
+  The `Cachex` parameters can be controlled from the config. Since exact timeout intervals
+  aren't necessary the following settings are used by default:
+
+  config :pleroma, :fed_sockets,
+    fed_socket_fetches: [
+      default: 12_000,
+      interval: 3_000,
+      lazy: false
+    ]
+
+  """
+
+  defmodule FetchRegistryData do
+    @moduledoc """
+    Record of a single fetch: the request that was sent, the reply (once one has
+    been received) and a timestamp for each. Every field defaults to `nil`.
+    """
+
+    defstruct [:uuid, :sent_json, :received_json, :sent_at, :received_at]
+  end
+
+  alias Ecto.UUID
+
+  require Logger
+
+  @fetches :fed_socket_fetches
+
+  @doc """
+  Registers a json request with the FetchRegistry and returns the identifying UUID.
+  """
+  def register_fetch(json) do
+    reg_data =
+      json
+      |> new_registry_data()
+      |> save_registry_data()
+
+    %FetchRegistryData{uuid: uuid} = reg_data
+    uuid
+  end
+
+  @doc """
+  Reports on the status of a Fetch given the identifying UUID.
+  The entry is left in the FetchRegistry.
+
+  Will return
+    * {:ok, %FetchRegistryData{}} if a fetch has completed; the reply is in `received_json`
+    * {:error, :waiting} if a fetch is still pending
+    * {:error, other_error} usually :missing to indicate a fetch that has timed out
+  """
+  def check_fetch(uuid) do
+    # Anything that is not a stored entry (the error tuples) falls straight through.
+    with {:ok, %FetchRegistryData{received_at: received_at}} = found <- get_registry_data(uuid) do
+      if is_nil(received_at) do
+        {:error, :waiting}
+      else
+        found
+      end
+    end
+  end
+
+  @doc """
+  Retrieves the response to a fetch given the identifying UUID.
+  The completed fetch will be deleted from the FetchRegistry
+
+  Will return
+    * {:ok, received_json} if a fetch has completed
+    * {:error, :waiting} if a fetch is still pending
+    * {:error, other_error} usually :missing to indicate a fetch that has timed out
+  """
+  def pop_fetch(uuid) do
+    # Pending and failed lookups are returned unchanged and nothing is deleted.
+    with {:ok, %FetchRegistryData{received_json: received_json}} <- check_fetch(uuid) do
+      delete_registry_data(uuid)
+      {:ok, received_json}
+    end
+  end
+
+  @doc """
+  This is called to register a fetch has returned.
+  It expects the result data along with the UUID that was sent in the request
+
+  Will return the stored `%FetchRegistryData{}` (the existing one, unchanged, when a
+  reply was already recorded) or :error when the entry cannot be read.
+  """
+  def register_fetch_received(uuid, data) do
+    case get_registry_data(uuid) do
+      {:error, _} ->
+        Logger.warn("Error adding fetch to registry - #{uuid}")
+        :error
+
+      {:ok, %FetchRegistryData{received_at: nil} = pending} ->
+        save_registry_data(set_fetch_received(pending, data))
+
+      {:ok, %FetchRegistryData{} = completed} ->
+        # A reply is already stored; keep the first one.
+        Logger.warn("tried to add fetched data twice - #{uuid}")
+        completed
+    end
+  end
+
+  # Builds a fresh, not yet answered entry for `json` with a new UUID and send timestamp.
+  defp new_registry_data(json) do
+    uuid = UUID.generate()
+    sent_at = :erlang.monotonic_time(:millisecond)
+
+    %FetchRegistryData{uuid: uuid, sent_json: json, sent_at: sent_at}
+  end
+
+  # Reads the entry stored under `uuid` from the fetch cache.
+  # A `nil` hit becomes {:error, :missing}; any non-:ok reply becomes {:error, :cache_error}.
+  defp get_registry_data(uuid) do
+    case Cachex.get(@fetches, uuid) do
+      {:ok, nil} -> {:error, :missing}
+      {:ok, _reg_data} = hit -> hit
+      _ -> {:error, :cache_error}
+    end
+  end
+
+  # Returns `reg_data` with the reply and its receive timestamp filled in.
+  defp set_fetch_received(%FetchRegistryData{} = reg_data, data) do
+    received_at = :erlang.monotonic_time(:millisecond)
+
+    %{reg_data | received_at: received_at, received_json: data}
+  end
+
+  # Stores `reg_data` in the fetch cache under its own uuid and returns it unchanged.
+  # The match on {:ok, true} raises a MatchError if Cachex replies with anything else.
+  defp save_registry_data(%FetchRegistryData{uuid: uuid} = reg_data) do
+    {:ok, true} = Cachex.put(@fetches, uuid, reg_data)
+    reg_data
+  end
+
+  # Removes the entry stored under `uuid`; raises a MatchError unless Cachex replies {:ok, true}.
+  defp delete_registry_data(uuid) do
+    {:ok, true} = Cachex.del(@fetches, uuid)
+  end
+end
diff --git a/lib/pleroma/web/fed_sockets/incoming_handler.ex b/lib/pleroma/web/fed_sockets/incoming_handler.ex
new file mode 100644 (file)
index 0000000..49d0d9d
--- /dev/null
@@ -0,0 +1,88 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.FedSockets.IncomingHandler do
+  require Logger
+
+  alias Pleroma.Web.FedSockets.FedRegistry
+  alias Pleroma.Web.FedSockets.FedSocket
+  alias Pleroma.Web.FedSockets.SocketInfo
+
+  import HTTPSignatures, only: [validate_conn: 1, split_signature: 1]
+
+  @behaviour :cowboy_websocket
+
+  # Cowboy handler entry point.
+  #
+  # Switches the request to the websocket protocol only when all of these hold:
+  #   * `[:fed_sockets, :enabled]` is exactly `true`
+  #   * the "(request-target)" header equals `FedSocket.shake()`
+  #   * `validate_conn/1` accepts the request headers
+  #   * the "signature" header splits into a map containing "keyId"
+  #
+  # On success the "keyId" value becomes the `origin` in the websocket state.
+  # In every other case the request is returned as-is with `{:ok, req, state}`
+  # and no upgrade takes place.
+  def init(req, state) do
+    shake = FedSocket.shake()
+
+    with true <- Pleroma.Config.get([:fed_sockets, :enabled]),
+         sec_protocol <- :cowboy_req.header("sec-websocket-protocol", req, nil),
+         headers = %{"(request-target)" => ^shake} <- :cowboy_req.headers(req),
+         true <- validate_conn(%{req_headers: headers}),
+         %{"keyId" => origin} <- split_signature(headers["signature"]) do
+      # Echo the requested sub-protocol back, when the client sent one.
+      req =
+        if is_nil(sec_protocol) do
+          req
+        else
+          :cowboy_req.set_resp_header("sec-websocket-protocol", sec_protocol, req)
+        end
+
+      {:cowboy_websocket, req, %{origin: origin}, %{}}
+    else
+      _ ->
+        {:ok, req, state}
+    end
+  end
+
+  # Called by cowboy once the connection has been upgraded.
+  #
+  # Registers this process with the FedRegistry under `origin`. On success the
+  # returned socket info becomes the handler state. On failure the error is
+  # logged and the connection is closed.
+  def websocket_init(%{origin: origin} = state) do
+    case FedRegistry.add_fed_socket(origin) do
+      {:ok, socket_info} ->
+        {:ok, socket_info}
+
+      e ->
+        Logger.error("FedSocket websocket_init failed - #{inspect(e)}")
+        # `{:error, _}` is not a valid :cowboy_websocket call result and would crash
+        # the handler; `{:stop, state}` closes the connection cleanly instead.
+        {:stop, state}
+    end
+  end
+
+  # Handles frames received from the remote end.
+  #
+  #   * ping frames (with or without payload) are used to check whether the
+  #     connection should be expired
+  #   * text frames are passed to `FedSocket.receive_package/2`; a `{:reply, _}`
+  #     result is JSON encoded and sent back
+  #   * every other frame (pong, binary, ...) is ignored so that an unexpected
+  #     frame from the remote end cannot crash the handler
+  def websocket_handle(:ping, socket_info), do: expire_or_hibernate(socket_info)
+
+  def websocket_handle({:ping, _payload}, socket_info), do: expire_or_hibernate(socket_info)
+
+  def websocket_handle({:text, data}, socket_info) do
+    socket_info = SocketInfo.touch(socket_info)
+
+    case FedSocket.receive_package(socket_info, data) do
+      {:noreply, _} ->
+        {:ok, socket_info}
+
+      {:reply, reply} ->
+        {:reply, {:text, Jason.encode!(reply)}, socket_info}
+
+      {:error, reason} ->
+        Logger.error("incoming error - receive_package: #{inspect(reason)}")
+        {:ok, socket_info}
+    end
+  end
+
+  def websocket_handle(_frame, socket_info) do
+    {:ok, socket_info}
+  end
+
+  # Stops the connection once its TTL has passed, otherwise hibernates until the next frame.
+  defp expire_or_hibernate(socket_info) do
+    if SocketInfo.expired?(socket_info) do
+      {:stop, socket_info}
+    else
+      {:ok, socket_info, :hibernate}
+    end
+  end
+
+  # Handles Erlang messages sent to the handler process.
+  #
+  # `{:send, message}` refreshes the connection TTL and writes `message` to the
+  # socket as a text frame.
+  def websocket_info({:send, message}, socket_info) do
+    socket_info = SocketInfo.touch(socket_info)
+
+    {:reply, {:text, message}, socket_info}
+  end
+
+  # `:close` stops the handler.
+  def websocket_info(:close, state) do
+    {:stop, state}
+  end
+
+  # Anything else is logged at debug level and otherwise ignored.
+  def websocket_info(message, state) do
+    Logger.debug("#{__MODULE__} unknown message #{inspect(message)}")
+    {:ok, state}
+  end
+end
diff --git a/lib/pleroma/web/fed_sockets/ingester_worker.ex b/lib/pleroma/web/fed_sockets/ingester_worker.ex
new file mode 100644 (file)
index 0000000..325f2a4
--- /dev/null
@@ -0,0 +1,33 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.FedSockets.IngesterWorker do
+  use Pleroma.Workers.WorkerHelper, queue: "ingestion_queue"
+  require Logger
+
+  alias Pleroma.Web.Federator
+
+  # Decodes the JSON document stored in the job and hands it to the ingestion step.
+  #
+  # Returns whatever `do_ingestion/1` returns. Any exception raised while decoding
+  # or ingesting is logged and reported as `{:error, exception}` so the job is
+  # recorded as failed; returning the bare exception struct is not a failure
+  # result for Oban.
+  @impl Oban.Worker
+  def perform(%Job{args: %{"op" => "ingest", "object" => ingestee}}) do
+    ingestee
+    |> Jason.decode!()
+    |> do_ingestion()
+  rescue
+    e ->
+      Logger.error("IngesterWorker error - #{inspect(e)}")
+      {:error, e}
+  end
+
+  # Passes the decoded document to the Federator and returns its ok/error tuple.
+  # Any other return value raises a CaseClauseError.
+  defp do_ingestion(params) do
+    case Federator.incoming_ap_doc(params) do
+      {:ok, _object} = ingested -> ingested
+      {:error, _reason} = failure -> failure
+    end
+  end
+end
diff --git a/lib/pleroma/web/fed_sockets/outgoing_handler.ex b/lib/pleroma/web/fed_sockets/outgoing_handler.ex
new file mode 100644 (file)
index 0000000..6ddef17
--- /dev/null
@@ -0,0 +1,146 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.FedSockets.OutgoingHandler do
+  use GenServer
+
+  require Logger
+
+  alias Pleroma.Web.ActivityPub.InternalFetchActor
+  alias Pleroma.Web.FedSockets
+  alias Pleroma.Web.FedSockets.FedRegistry
+  alias Pleroma.Web.FedSockets.FedSocket
+  alias Pleroma.Web.FedSockets.SocketInfo
+
+  # Starts an unnamed handler process for `uri`; the connection itself is opened in `init/1`.
+  def start_link(uri) do
+    GenServer.start_link(__MODULE__, %{uri: uri})
+  end
+
+  # Opens the outgoing connection synchronously while the process starts.
+  #
+  # On success the result of `FedRegistry.add_fed_socket/2` is returned as the
+  # init result (presumably `{:ok, socket_info}` - TODO confirm against FedRegistry).
+  # On failure the process is not started (`:ignore`).
+  def init(%{uri: uri}) do
+    case initiate_connection(uri) do
+      {:ok, ws_origin, conn_pid} ->
+        FedRegistry.add_fed_socket(ws_origin, conn_pid)
+
+      {:error, reason} ->
+        Logger.debug("Outgoing connection failed - #{inspect(reason)}")
+        :ignore
+    end
+  end
+
+  # Handles the messages delivered to this process by gun and by other FedSocket code.
+  #
+  # Text frames from the remote end refresh the connection TTL and are passed to
+  # `FedSocket.receive_package/2`; a `{:reply, _}` result is JSON encoded and sent back.
+  def handle_info({:gun_ws, conn_pid, _ref, {:text, data}}, socket_info) do
+    socket_info = SocketInfo.touch(socket_info)
+
+    case FedSocket.receive_package(socket_info, data) do
+      {:noreply, _} ->
+        {:noreply, socket_info}
+
+      {:reply, reply} ->
+        :gun.ws_send(conn_pid, {:text, Jason.encode!(reply)})
+        {:noreply, socket_info}
+
+      {:error, reason} ->
+        Logger.error("incoming error - receive_package: #{inspect(reason)}")
+        {:noreply, socket_info}
+    end
+  end
+
+  # `:close` shuts this handler down.
+  def handle_info(:close, state) do
+    Logger.debug("Sending close frame !!!!!!!")
+    # `{:close, state}` is not a valid GenServer return value and made the process
+    # crash with a bad-return error; stop normally instead.
+    {:stop, :normal, state}
+  end
+
+  # The remote end closed the connection.
+  def handle_info({:gun_down, _pid, _prot, :closed, _}, state) do
+    {:stop, :normal, state}
+  end
+
+  # `{:send, data}` refreshes the connection TTL and writes `data` as a text frame.
+  def handle_info({:send, data}, %{conn_pid: conn_pid} = socket_info) do
+    socket_info = SocketInfo.touch(socket_info)
+    :gun.ws_send(conn_pid, {:text, data})
+    {:noreply, socket_info}
+  end
+
+  # Pongs need no handling; hibernate until the next message.
+  def handle_info({:gun_ws, _, _, :pong}, state) do
+    {:noreply, state, :hibernate}
+  end
+
+  # Anything else is logged at debug level and otherwise ignored.
+  def handle_info(msg, state) do
+    Logger.debug("#{__MODULE__} unhandled event #{inspect(msg)}")
+    {:noreply, state}
+  end
+
+  # Logs why the outgoing connection process is going away.
+  def terminate(reason, state) do
+    Logger.debug(
+      "#{__MODULE__} terminating outgoing connection for #{inspect(state)} for #{inspect(reason)}"
+    )
+
+    {:ok, state}
+  end
+
+  # Opens a websocket connection to the FedSocket endpoint of the instance at `uri`.
+  #
+  # Steps, in order:
+  #   1. derive the `ws://host:port/api/fedsocket/v1` URL from `uri`
+  #   2. open a gun connection and wait for it to come up
+  #   3. send a plain GET to the endpoint and require a bodiless 204 response
+  #   4. request the websocket upgrade with the signed headers from `build_headers/1`
+  #   5. wait up to 15 seconds for gun's upgrade confirmation message
+  #
+  # Returns `{:ok, ws_uri, conn_pid}`, `{:error, :fedsockets_not_supported}` when the
+  # probe answers 404, `{:error, :timeout}` when no upgrade confirmation arrives, or
+  # `{:error, failed_step_result}` for any other failed step.
+  #
+  # NOTE(review): the gun connection is not explicitly closed on the failure paths;
+  # confirm that callers rely on the owning process exiting to release it.
+  def initiate_connection(uri) do
+    ws_uri =
+      uri
+      |> SocketInfo.origin()
+      |> FedSockets.uri_for_origin()
+
+    %{host: host, port: port, path: path} = URI.parse(ws_uri)
+
+    with {:ok, conn_pid} <- :gun.open(to_charlist(host), port),
+         {:ok, _} <- :gun.await_up(conn_pid),
+         reference <- :gun.get(conn_pid, to_charlist(path)),
+         {:response, :fin, 204, _} <- :gun.await(conn_pid, reference),
+         headers <- build_headers(uri),
+         ref <- :gun.ws_upgrade(conn_pid, to_charlist(path), headers, %{silence_pings: false}) do
+      # Only the upgrade confirmation for this exact connection and request is awaited;
+      # any other message stays in the mailbox.
+      receive do
+        {:gun_upgrade, ^conn_pid, ^ref, [<<"websocket">>], _} ->
+          {:ok, ws_uri, conn_pid}
+      after
+        15_000 ->
+          Logger.debug("Fedsocket timeout connecting to #{inspect(uri)}")
+          {:error, :timeout}
+      end
+    else
+      {:response, :nofin, 404, _} ->
+        {:error, :fedsockets_not_supported}
+
+      e ->
+        Logger.debug("Fedsocket error connecting to #{inspect(uri)}")
+        {:error, e}
+    end
+  end
+
+  # Builds the request headers for the websocket upgrade.
+  #
+  # The value of `FedSocket.shake()` is sent as the "(request-target)" header, together
+  # with its SHA-256 digest, its byte size and a date. A signature over those values
+  # plus the host is produced with `Pleroma.Signature.sign/2` on behalf of the
+  # internal fetch actor.
+  #
+  # Returns a list of `{name, value}` header tuples in the charlist form passed to gun.
+  defp build_headers(uri) do
+    host_for_sig = uri |> URI.parse() |> host_signature()
+
+    shake = FedSocket.shake()
+    digest = "SHA-256=" <> (:crypto.hash(:sha256, shake) |> Base.encode64())
+    date = Pleroma.Signature.signed_date()
+    shake_size = byte_size(shake)
+
+    # The host is part of the signed values but is not added to the header list below.
+    signature_opts = %{
+      "(request-target)": shake,
+      "content-length": to_charlist("#{shake_size}"),
+      date: date,
+      digest: digest,
+      host: host_for_sig
+    }
+
+    signature = Pleroma.Signature.sign(InternalFetchActor.get_actor(), signature_opts)
+
+    [
+      {'signature', to_charlist(signature)},
+      {'date', date},
+      {'digest', to_charlist(digest)},
+      {'content-length', to_charlist("#{shake_size}")},
+      {to_charlist("(request-target)"), to_charlist(shake)}
+    ]
+  end
+
+  # Returns the host as used in the signature: the bare host when `port` is the
+  # scheme's default port, "host:port" otherwise.
+  defp host_signature(%{host: host, scheme: scheme, port: port}) do
+    default_port? = port == URI.default_port(scheme)
+
+    if default_port?, do: host, else: "#{host}:#{port}"
+  end
+end
diff --git a/lib/pleroma/web/fed_sockets/socket_info.ex b/lib/pleroma/web/fed_sockets/socket_info.ex
new file mode 100644 (file)
index 0000000..d6fdffe
--- /dev/null
@@ -0,0 +1,52 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.FedSockets.SocketInfo do
+  defstruct origin: nil,
+            pid: nil,
+            conn_pid: nil,
+            state: :default,
+            connected_until: nil
+
+  alias Pleroma.Web.FedSockets.SocketInfo
+  @default_connection_duration 15 * 60 * 1000
+
+  # Builds the SocketInfo for `uri`, owned by the calling process, with a fresh TTL.
+  # `conn_pid` is stored as given and defaults to nil.
+  def build(uri, conn_pid \\ nil) do
+    with_origin = build_origin(uri)
+    with_pids = build_pids(with_origin, conn_pid)
+
+    touch(with_pids)
+  end
+
+  # Pushes the expiry of the connection out to a full connection duration from now.
+  def touch(%SocketInfo{} = socket_info) do
+    %SocketInfo{socket_info | connected_until: new_ttl()}
+  end
+
+  # Marks the socket as connected.
+  def connect(%SocketInfo{} = socket_info) do
+    %SocketInfo{socket_info | state: :connected}
+  end
+
+  # True once the monotonic clock has passed the socket's `connected_until` timestamp.
+  def expired?(%{connected_until: connected_until}) do
+    now = :erlang.monotonic_time(:millisecond)
+    connected_until < now
+  end
+
+  # Returns the "host:port" origin string for a URL or URI without building a full SocketInfo.
+  def origin(uri) do
+    %SocketInfo{origin: origin} = build_origin(uri)
+    origin
+  end
+
+  # Records the calling process as the owner (`pid`) and stores the given `conn_pid`.
+  defp build_pids(socket_info, conn_pid),
+    do: struct(socket_info, pid: self(), conn_pid: conn_pid)
+
+  # Builds a SocketInfo whose `origin` is "host:port".
+  #
+  # Strings are parsed with `URI.parse/1` first.
+  defp build_origin(uri) when is_binary(uri),
+    do: uri |> URI.parse() |> build_origin
+
+  # No explicit port: fall back to the default port of the scheme. The map passed on
+  # has no :scheme key, so it can only match the final clause.
+  defp build_origin(%{host: host, port: nil, scheme: scheme}),
+    do: build_origin(%{host: host, port: URI.default_port(scheme)})
+
+  defp build_origin(%{host: host, port: port}),
+    do: %SocketInfo{origin: "#{host}:#{port}"}
+
+  # Monotonic timestamp (in milliseconds) at which a connection touched now expires.
+  # The duration comes from `[:fed_sockets, :connection_duration]`, defaulting to 15 minutes.
+  defp new_ttl do
+    [:fed_sockets, :connection_duration]
+    |> Pleroma.Config.get(@default_connection_duration)
+    |> Kernel.+(:erlang.monotonic_time(:millisecond))
+  end
+end
diff --git a/lib/pleroma/web/fed_sockets/supervisor.ex b/lib/pleroma/web/fed_sockets/supervisor.ex
new file mode 100644 (file)
index 0000000..a5f4beb
--- /dev/null
@@ -0,0 +1,59 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.FedSockets.Supervisor do
+  use Supervisor
+  import Cachex.Spec
+
+  # Starts the supervisor registered under this module's name; `opts` is handed to `init/1`.
+  def start_link(opts) do
+    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
+  end
+
+  # Supervises the two Cachex caches (pending fetches and rejected hosts) and the
+  # Registry named `FedSockets.Registry`. With `:one_for_all`, a crash of any child
+  # restarts all of them.
+  #
+  # `args` may carry per-cache expiration overrides, see `get_opts/2`.
+  def init(args) do
+    children = [
+      build_cache(:fed_socket_fetches, args),
+      build_cache(:fed_socket_rejections, args),
+      {Registry, keys: :unique, name: FedSockets.Registry, meta: [rejected: %{}]}
+    ]
+
+    # `Supervisor.init/2` has no `:name` option; the supervisor is named in `start_link/1`.
+    opts = [strategy: :one_for_all]
+    Supervisor.init(children, opts)
+  end
+
+  # Child spec for the Cachex cache `name`, started with the options from `get_opts/2`.
+  # The child id is `name` suffixed with "_cache".
+  defp build_cache(name, args) do
+    cache_opts = get_opts(name, args)
+
+    %{
+      id: :"#{name}_cache",
+      start: {Cachex, :start_link, [name, cache_opts]},
+      type: :worker
+    }
+  end
+
+  # Cachex options for the two FedSocket caches: an expiration policy whose
+  # `default`, `interval` and `lazy` values are resolved by `get_opts_or_config/4`,
+  # falling back to 15_000, 3_000 and false respectively.
+  defp get_opts(cache_name, args)
+       when cache_name in [:fed_socket_fetches, :fed_socket_rejections] do
+    default = get_opts_or_config(args, cache_name, :default, 15_000)
+    interval = get_opts_or_config(args, cache_name, :interval, 3_000)
+    lazy = get_opts_or_config(args, cache_name, :lazy, false)
+
+    [expiration: expiration(default: default, interval: interval, lazy: lazy)]
+  end
+
+  # Any other name: use the options given under that name in `args`, if any.
+  defp get_opts(name, args) do
+    Keyword.get(args, name, [])
+  end
+
+  # Resolves one cache setting: a non-nil value under `args[name][key]` wins, otherwise
+  # the value configured at `[:fed_sockets, name, key]`, otherwise `default`.
+  defp get_opts_or_config(args, name, key, default) do
+    explicit = args |> Keyword.get(name, []) |> Keyword.get(key)
+
+    if is_nil(explicit) do
+      Pleroma.Config.get([:fed_sockets, name, key], default)
+    else
+      explicit
+    end
+  end
+end
index a28c47017a950e56f733b9fd343e8dc6a31dbf2e..a17e8c0fcdcac4f23627aab99ccb6f774709eb48 100644 (file)
--- a/mix.lock
+++ b/mix.lock
   "unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm", "1d1848c40487cdb0b30e8ed975e34e025860c02e419cb615d255849f3427439d"},
   "unsafe": {:hex, :unsafe, "1.0.1", "a27e1874f72ee49312e0a9ec2e0b27924214a05e3ddac90e91727bc76f8613d8", [:mix], [], "hexpm", "6c7729a2d214806450d29766abc2afaa7a2cbecf415be64f36a6691afebb50e5"},
   "web_push_encryption": {:hex, :web_push_encryption, "0.3.0", "598b5135e696fd1404dc8d0d7c0fa2c027244a4e5d5e5a98ba267f14fdeaabc8", [:mix], [{:httpoison, "~> 1.0", [hex: :httpoison, repo: "hexpm", optional: false]}, {:jose, "~> 1.8", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm", "f10bdd1afe527ede694749fb77a2f22f146a51b054c7fa541c9fd920fba7c875"},
-  "websocket_client": {:git, "https://github.com/jeremyong/websocket_client.git", "9a6f65d05ebf2725d62fb19262b21f1805a59fbf", []},
+  "websocket_client": {:git, "https://github.com/jeremyong/websocket_client.git", "9a6f65d05ebf2725d62fb19262b21f1805a59fbf", []}
 }
diff --git a/test/web/fed_sockets/fed_registry_test.exs b/test/web/fed_sockets/fed_registry_test.exs
new file mode 100644 (file)
index 0000000..19ac874
--- /dev/null
@@ -0,0 +1,124 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.FedSockets.FedRegistryTest do
+  use ExUnit.Case
+
+  alias Pleroma.Web.FedSockets
+  alias Pleroma.Web.FedSockets.FedRegistry
+  alias Pleroma.Web.FedSockets.SocketInfo
+
+  @good_domain "http://good.domain"
+  @good_domain_origin "good.domain:80"
+
+  setup do
+    start_supervised({Pleroma.Web.FedSockets.Supervisor, []})
+    build_test_socket(@good_domain)
+    Process.sleep(10)
+
+    :ok
+  end
+
+  describe "add_fed_socket/1 without conflicting sockets" do
+    test "can be added" do
+      Process.sleep(10)
+      assert {:ok, %SocketInfo{origin: origin}} = FedRegistry.get_fed_socket(@good_domain_origin)
+      assert origin == "good.domain:80"
+    end
+
+    test "multiple origins can be added" do
+      build_test_socket("http://anothergood.domain")
+      Process.sleep(10)
+
+      assert {:ok, %SocketInfo{origin: origin_1}} =
+               FedRegistry.get_fed_socket(@good_domain_origin)
+
+      assert {:ok, %SocketInfo{origin: origin_2}} =
+               FedRegistry.get_fed_socket("anothergood.domain:80")
+
+      assert origin_1 == "good.domain:80"
+      assert origin_2 == "anothergood.domain:80"
+      assert FedRegistry.list_all() |> Enum.count() == 2
+    end
+  end
+
+  describe "add_fed_socket/1 when duplicate sockets conflict" do
+    setup do
+      build_test_socket(@good_domain)
+      build_test_socket(@good_domain)
+      Process.sleep(10)
+      :ok
+    end
+
+    # Registering the same origin several times must leave exactly one entry.
+    test "will be ignored" do
+      assert {:ok, %SocketInfo{origin: origin}} =
+               FedRegistry.get_fed_socket(@good_domain_origin)
+
+      assert origin == "good.domain:80"
+
+      assert FedRegistry.list_all() |> Enum.count() == 1
+    end
+
+    # A process that tries to register an already registered origin must not stay alive.
+    test "the newer process will be closed" do
+      pid_two = build_test_socket(@good_domain)
+
+      assert {:ok, %SocketInfo{origin: origin}} =
+               FedRegistry.get_fed_socket(@good_domain_origin)
+
+      assert origin == "good.domain:80"
+      Process.sleep(10)
+
+      refute Process.alive?(pid_two)
+
+      assert FedRegistry.list_all() |> Enum.count() == 1
+    end
+  end
+
+  describe "get_fed_socket/1" do
+    test "returns missing for unknown hosts" do
+      assert {:error, :missing} = FedRegistry.get_fed_socket("not_a_dmoain")
+    end
+
+    test "returns rejected for hosts previously rejected" do
+      "rejected.domain:80"
+      |> FedSockets.uri_for_origin()
+      |> FedRegistry.set_host_rejected()
+
+      assert {:error, :rejected} = FedRegistry.get_fed_socket("rejected.domain:80")
+    end
+
+    test "can retrieve a previously added SocketInfo" do
+      build_test_socket(@good_domain)
+      Process.sleep(10)
+      assert {:ok, %SocketInfo{origin: origin}} = FedRegistry.get_fed_socket(@good_domain_origin)
+      assert origin == "good.domain:80"
+    end
+
+    test "removes references to SocketInfos when the process crashes" do
+      assert {:ok, %SocketInfo{origin: origin, pid: pid}} =
+               FedRegistry.get_fed_socket(@good_domain_origin)
+
+      assert origin == "good.domain:80"
+
+      Process.exit(pid, :testing)
+      Process.sleep(100)
+      assert {:error, :missing} = FedRegistry.get_fed_socket(@good_domain_origin)
+    end
+  end
+
+  # Spawns an unlinked process that registers itself as the FedSocket for `uri`
+  # and returns its pid.
+  def build_test_socket(uri) do
+    Kernel.spawn(fn -> fed_socket_almost(uri) end)
+  end
+
+  # Stand-in for a real socket process: registers the calling process with the
+  # FedRegistry, then stays alive until it receives `:close` or 5 seconds pass.
+  def fed_socket_almost(origin) do
+    FedRegistry.add_fed_socket(origin)
+
+    receive do
+      :close ->
+        :ok
+    after
+      5_000 -> :timeout
+    end
+  end
+end
diff --git a/test/web/fed_sockets/fetch_registry_test.exs b/test/web/fed_sockets/fetch_registry_test.exs
new file mode 100644 (file)
index 0000000..7bd2d99
--- /dev/null
@@ -0,0 +1,67 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.FedSockets.FetchRegistryTest do
+  use ExUnit.Case
+
+  alias Pleroma.Web.FedSockets.FetchRegistry
+  alias Pleroma.Web.FedSockets.FetchRegistry.FetchRegistryData
+
+  @json_message "hello"
+  @json_reply "hello back"
+
+  setup do
+    start_supervised(
+      {Pleroma.Web.FedSockets.Supervisor,
+       [
+         ping_interval: 8,
+         connection_duration: 15,
+         rejection_duration: 5,
+         fed_socket_fetches: [default: 10, interval: 10]
+       ]}
+    )
+
+    :ok
+  end
+
+  test "fetches can be stored" do
+    uuid = FetchRegistry.register_fetch(@json_message)
+
+    assert {:error, :waiting} = FetchRegistry.check_fetch(uuid)
+  end
+
+  test "fetches can return" do
+    uuid = FetchRegistry.register_fetch(@json_message)
+
+    # Check the pending state before the reply is registered. Asserting it while the
+    # task is already running races with the task storing the reply.
+    assert {:error, :waiting} = FetchRegistry.check_fetch(uuid)
+
+    task = Task.async(fn -> FetchRegistry.register_fetch_received(uuid, @json_reply) end)
+    Task.await(task)
+
+    assert {:ok, %FetchRegistryData{received_json: received_json}} =
+             FetchRegistry.check_fetch(uuid)
+
+    assert received_json == @json_reply
+  end
+
+  test "fetches are deleted once popped from stack" do
+    uuid = FetchRegistry.register_fetch(@json_message)
+    task = Task.async(fn -> FetchRegistry.register_fetch_received(uuid, @json_reply) end)
+    Task.await(task)
+
+    assert {:ok, %FetchRegistryData{received_json: received_json}} =
+             FetchRegistry.check_fetch(uuid)
+
+    assert received_json == @json_reply
+    assert {:ok, @json_reply} = FetchRegistry.pop_fetch(uuid)
+
+    assert {:error, :missing} = FetchRegistry.check_fetch(uuid)
+  end
+
+  test "fetches can time out" do
+    uuid = FetchRegistry.register_fetch(@json_message)
+    assert {:error, :waiting} = FetchRegistry.check_fetch(uuid)
+    Process.sleep(500)
+    assert {:error, :missing} = FetchRegistry.check_fetch(uuid)
+  end
+end
diff --git a/test/web/fed_sockets/socket_info_test.exs b/test/web/fed_sockets/socket_info_test.exs
new file mode 100644 (file)
index 0000000..db3d6ed
--- /dev/null
@@ -0,0 +1,118 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.FedSockets.SocketInfoTest do
+  use ExUnit.Case
+
+  alias Pleroma.Web.FedSockets
+  alias Pleroma.Web.FedSockets.SocketInfo
+
+  describe "uri_for_origin" do
+    test "provides the fed_socket URL given the origin information" do
+      endpoint = "example.com:4000"
+      assert FedSockets.uri_for_origin(endpoint) =~ "ws://"
+      assert FedSockets.uri_for_origin(endpoint) =~ endpoint
+    end
+  end
+
+  describe "origin" do
+    test "will provide the origin field given a url" do
+      endpoint = "example.com:4000"
+      assert SocketInfo.origin("ws://#{endpoint}") == endpoint
+      assert SocketInfo.origin("http://#{endpoint}") == endpoint
+      assert SocketInfo.origin("https://#{endpoint}") == endpoint
+    end
+
+    test "will proide the origin field given a uri" do
+      endpoint = "example.com:4000"
+      uri = URI.parse("http://#{endpoint}")
+
+      assert SocketInfo.origin(uri) == endpoint
+    end
+  end
+
+  describe "touch" do
+    test "will update the TTL" do
+      endpoint = "example.com:4000"
+      socket = SocketInfo.build("ws://#{endpoint}")
+      Process.sleep(2)
+      touched_socket = SocketInfo.touch(socket)
+
+      assert socket.connected_until < touched_socket.connected_until
+    end
+  end
+
+  describe "expired?" do
+    setup do
+      start_supervised(
+        {Pleroma.Web.FedSockets.Supervisor,
+         [
+           ping_interval: 8,
+           connection_duration: 5,
+           rejection_duration: 5,
+           fed_socket_rejections: [lazy: true]
+         ]}
+      )
+
+      :ok
+    end
+
+    test "tests if the TTL is exceeded" do
+      endpoint = "example.com:4000"
+      socket = SocketInfo.build("ws://#{endpoint}")
+      refute SocketInfo.expired?(socket)
+      Process.sleep(10)
+
+      assert SocketInfo.expired?(socket)
+    end
+  end
+
+  describe "creating outgoing connection records" do
+    test "can be passed a string" do
+      assert %{conn_pid: :pid, origin: _origin} = SocketInfo.build("example.com:4000", :pid)
+    end
+
+    test "can be passed a URI" do
+      uri = URI.parse("http://example.com:4000")
+      assert %{conn_pid: :pid, origin: origin} = SocketInfo.build(uri, :pid)
+      assert origin =~ "example.com:4000"
+    end
+
+    test "will include the port number" do
+      assert %{conn_pid: :pid, origin: origin} = SocketInfo.build("http://example.com:4000", :pid)
+
+      assert origin =~ ":4000"
+    end
+
+    test "will provide the port if missing" do
+      assert %{conn_pid: :pid, origin: "example.com:80"} =
+               SocketInfo.build("http://example.com", :pid)
+
+      assert %{conn_pid: :pid, origin: "example.com:443"} =
+               SocketInfo.build("https://example.com", :pid)
+    end
+  end
+
+  describe "creating incoming connection records" do
+    test "can be passed a string" do
+      assert %{pid: _, origin: _origin} = SocketInfo.build("example.com:4000")
+    end
+
+    test "can be passed a URI" do
+      uri = URI.parse("example.com:4000")
+      assert %{pid: _, origin: _origin} = SocketInfo.build(uri)
+    end
+
+    test "will include the port number" do
+      assert %{pid: _, origin: origin} = SocketInfo.build("http://example.com:4000")
+
+      assert origin =~ ":4000"
+    end
+
+    test "will provide the port if missing" do
+      assert %{pid: _, origin: "example.com:80"} = SocketInfo.build("http://example.com")
+      assert %{pid: _, origin: "example.com:443"} = SocketInfo.build("https://example.com")
+    end
+  end
+end