Merge branch '534_federation_targets_reachability' into 'develop'
authorhref <href+git-pleroma@random.sh>
Fri, 1 Feb 2019 09:14:35 +0000 (09:14 +0000)
committerhref <href+git-pleroma@random.sh>
Fri, 1 Feb 2019 09:14:35 +0000 (09:14 +0000)
[#534] Unreachable federation targets retirement

Closes #534

See merge request pleroma/pleroma!703

24 files changed:
config/config.exs
docs/config.md
lib/pleroma/instances.ex [new file with mode: 0644]
lib/pleroma/instances/instance.ex [new file with mode: 0644]
lib/pleroma/web/activity_pub/activity_pub.ex
lib/pleroma/web/activity_pub/activity_pub_controller.ex
lib/pleroma/web/federator/federator.ex
lib/pleroma/web/ostatus/ostatus.ex
lib/pleroma/web/ostatus/ostatus_controller.ex
lib/pleroma/web/salmon/salmon.ex
lib/pleroma/web/websub/websub.ex
lib/pleroma/web/websub/websub_controller.ex
priv/repo/migrations/20190123125546_create_instances.exs [new file with mode: 0644]
test/support/factory.ex
test/support/http_request_mock.ex
test/web/activity_pub/activity_pub_controller_test.exs
test/web/activity_pub/activity_pub_test.exs
test/web/federator_test.exs
test/web/instances/instance_test.exs [new file with mode: 0644]
test/web/instances/instances_test.exs [new file with mode: 0644]
test/web/ostatus/incoming_documents/delete_handling_test.exs
test/web/ostatus/ostatus_controller_test.exs
test/web/ostatus/ostatus_test.exs
test/web/websub/websub_controller_test.exs

index 98c94c14922c72c262b9de091eb7bb3e6740d9fe..d0d53a64a2fa25127c8d9f1a226ee727b87db190 100644 (file)
@@ -146,6 +146,7 @@ config :pleroma, :instance,
   banner_upload_limit: 4_000_000,
   registrations_open: true,
   federating: true,
+  federation_reachability_timeout_days: 7,
   allow_relay: true,
   rewrite_policy: Pleroma.Web.ActivityPub.MRF.NoOpPolicy,
   public: true,
index 22a9b23f9258b3657d6112bcb53c9043eff88adb..c14746d932fb9c573560363d2c73ecd5769e7766 100644 (file)
@@ -72,6 +72,7 @@ config :pleroma, Pleroma.Mailer,
 * `invites_enabled`: Enable user invitations for admins (depends on `registrations_open: false`).
 * `account_activation_required`: Require users to confirm their emails before signing in.
 * `federating`: Enable federation with other instances
+* `federation_reachability_timeout_days`: Timeout (in days) of each external federation target being unreachable prior to pausing federating to it.
 * `allow_relay`: Enable Pleroma’s Relay, which makes it possible to follow a whole instance
 * `rewrite_policy`: Message Rewrite Policy, either one or a list. Here are the ones available by default:
   * `Pleroma.Web.ActivityPub.MRF.NoOpPolicy`: Doesn’t modify activities (default)
diff --git a/lib/pleroma/instances.ex b/lib/pleroma/instances.ex
new file mode 100644 (file)
index 0000000..5e107f4
--- /dev/null
@@ -0,0 +1,36 @@
+defmodule Pleroma.Instances do
+  @moduledoc "Instances context."
+
+  @adapter Pleroma.Instances.Instance
+
+  defdelegate filter_reachable(urls_or_hosts), to: @adapter
+  defdelegate reachable?(url_or_host), to: @adapter
+  defdelegate set_reachable(url_or_host), to: @adapter
+  defdelegate set_unreachable(url_or_host, unreachable_since \\ nil), to: @adapter
+
+  def set_consistently_unreachable(url_or_host),
+    do: set_unreachable(url_or_host, reachability_datetime_threshold())
+
+  def reachability_datetime_threshold do
+    federation_reachability_timeout_days =
+      Pleroma.Config.get(:instance)[:federation_reachability_timeout_days] || 0
+
+    if federation_reachability_timeout_days > 0 do
+      NaiveDateTime.add(
+        NaiveDateTime.utc_now(),
+        -federation_reachability_timeout_days * 24 * 3600,
+        :second
+      )
+    else
+      ~N[0000-01-01 00:00:00]
+    end
+  end
+
+  def host(url_or_host) when is_binary(url_or_host) do
+    if url_or_host =~ ~r/^http/i do
+      URI.parse(url_or_host).host
+    else
+      url_or_host
+    end
+  end
+end
diff --git a/lib/pleroma/instances/instance.ex b/lib/pleroma/instances/instance.ex
new file mode 100644 (file)
index 0000000..a87590d
--- /dev/null
@@ -0,0 +1,102 @@
+defmodule Pleroma.Instances.Instance do
+  @moduledoc "Instance."
+
+  alias Pleroma.Instances
+  alias Pleroma.Instances.Instance
+
+  use Ecto.Schema
+
+  import Ecto.{Query, Changeset}
+
+  alias Pleroma.Repo
+
+  schema "instances" do
+    field(:host, :string)
+    field(:unreachable_since, :naive_datetime)
+
+    timestamps()
+  end
+
+  defdelegate host(url_or_host), to: Instances
+
+  def changeset(struct, params \\ %{}) do
+    struct
+    |> cast(params, [:host, :unreachable_since])
+    |> validate_required([:host])
+    |> unique_constraint(:host)
+  end
+
+  def filter_reachable([]), do: []
+
+  def filter_reachable(urls_or_hosts) when is_list(urls_or_hosts) do
+    hosts =
+      urls_or_hosts
+      |> Enum.map(&(&1 && host(&1)))
+      |> Enum.filter(&(to_string(&1) != ""))
+
+    unreachable_hosts =
+      Repo.all(
+        from(i in Instance,
+          where:
+            i.host in ^hosts and
+              i.unreachable_since <= ^Instances.reachability_datetime_threshold(),
+          select: i.host
+        )
+      )
+
+    Enum.filter(urls_or_hosts, &(&1 && host(&1) not in unreachable_hosts))
+  end
+
+  def reachable?(url_or_host) when is_binary(url_or_host) do
+    !Repo.one(
+      from(i in Instance,
+        where:
+          i.host == ^host(url_or_host) and
+            i.unreachable_since <= ^Instances.reachability_datetime_threshold(),
+        select: true
+      )
+    )
+  end
+
+  def reachable?(_), do: true
+
+  def set_reachable(url_or_host) when is_binary(url_or_host) do
+    with host <- host(url_or_host),
+         %Instance{} = existing_record <- Repo.get_by(Instance, %{host: host}) do
+      {:ok, _instance} =
+        existing_record
+        |> changeset(%{unreachable_since: nil})
+        |> Repo.update()
+    end
+  end
+
+  def set_reachable(_), do: {:error, nil}
+
+  def set_unreachable(url_or_host, unreachable_since \\ nil)
+
+  def set_unreachable(url_or_host, unreachable_since) when is_binary(url_or_host) do
+    unreachable_since = unreachable_since || DateTime.utc_now()
+    host = host(url_or_host)
+    existing_record = Repo.get_by(Instance, %{host: host})
+
+    changes = %{unreachable_since: unreachable_since}
+
+    cond do
+      is_nil(existing_record) ->
+        %Instance{}
+        |> changeset(Map.put(changes, :host, host))
+        |> Repo.insert()
+
+      existing_record.unreachable_since &&
+          NaiveDateTime.compare(existing_record.unreachable_since, unreachable_since) != :gt ->
+        {:ok, existing_record}
+
+      true ->
+        existing_record
+        |> changeset(changes)
+        |> Repo.update()
+    end
+  end
+
+  def set_unreachable(_, _), do: {:error, nil}
+end
index 0199ac9e7c0558a5db0721fd2ffc71a17ff486b9..06e8c3f1c28945370e547284d9cdc274cfdcd65e 100644 (file)
@@ -3,7 +3,7 @@
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Web.ActivityPub.ActivityPub do
-  alias Pleroma.{Activity, Repo, Object, Upload, User, Notification}
+  alias Pleroma.{Activity, Repo, Object, Upload, User, Notification, Instances}
   alias Pleroma.Web.ActivityPub.{Transmogrifier, MRF}
   alias Pleroma.Web.WebFinger
   alias Pleroma.Web.Federator
@@ -734,7 +734,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
   end
 
   def publish(actor, activity) do
-    followers =
+    remote_followers =
       if actor.follower_address in activity.recipients do
         {:ok, followers} = User.get_followers(actor)
         followers |> Enum.filter(&(!&1.local))
@@ -745,13 +745,14 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     public = is_public?(activity)
 
     remote_inboxes =
-      (Pleroma.Web.Salmon.remote_users(activity) ++ followers)
+      (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
       |> Enum.filter(fn user -> User.ap_enabled?(user) end)
       |> Enum.map(fn %{info: %{source_data: data}} ->
         (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
       end)
       |> Enum.uniq()
       |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
+      |> Instances.filter_reachable()
 
     {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
     json = Jason.encode!(data)
@@ -779,15 +780,24 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
         digest: digest
       })
 
-    @httpoison.post(
-      inbox,
-      json,
-      [
-        {"Content-Type", "application/activity+json"},
-        {"signature", signature},
-        {"digest", digest}
-      ]
-    )
+    with {:ok, %{status: code}} when code in 200..299 <-
+           result =
+             @httpoison.post(
+               inbox,
+               json,
+               [
+                 {"Content-Type", "application/activity+json"},
+                 {"signature", signature},
+                 {"digest", digest}
+               ]
+             ) do
+      Instances.set_reachable(inbox)
+      result
+    else
+      {_post_result, response} ->
+        Instances.set_unreachable(inbox)
+        {:error, response}
+    end
   end
 
   # TODO:
index 7eed0a6006d86154400cfb50b69978ae281d1ac5..4dea6ab83ee21ac87ee92f750151ef9751d8a76e 100644 (file)
@@ -4,6 +4,7 @@
 
 defmodule Pleroma.Web.ActivityPub.ActivityPubController do
   use Pleroma.Web, :controller
+
   alias Pleroma.{Activity, User, Object}
   alias Pleroma.Web.ActivityPub.{ObjectView, UserView}
   alias Pleroma.Web.ActivityPub.ActivityPub
@@ -17,6 +18,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
   action_fallback(:errors)
 
   plug(Pleroma.Web.FederatingPlug when action in [:inbox, :relay])
+  plug(:set_requester_reachable when action in [:inbox])
   plug(:relay_active? when action in [:relay])
 
   def relay_active?(conn, _) do
@@ -289,4 +291,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
     |> put_status(500)
     |> json("error")
   end
+
+  defp set_requester_reachable(%Plug.Conn{} = conn, _) do
+    with actor <- conn.params["actor"],
+         true <- is_binary(actor) do
+      Pleroma.Instances.set_reachable(actor)
+    end
+
+    conn
+  end
 end
index f3a0e18b8b7536284ba8d23e2a06b774025d81d6..46f7a497341a143c12a7b527f05ab800eee8cd18 100644 (file)
@@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator do
   use GenServer
   alias Pleroma.User
   alias Pleroma.Activity
-  alias Pleroma.Web.{WebFinger, Websub}
+  alias Pleroma.Web.{WebFinger, Websub, Salmon}
   alias Pleroma.Web.Federator.RetryQueue
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.ActivityPub.Relay
@@ -124,6 +124,10 @@ defmodule Pleroma.Web.Federator do
     end
   end
 
+  def handle(:publish_single_salmon, {user_or_url, feed, poster}) do
+    Salmon.send_to_user(user_or_url, feed, poster)
+  end
+
   def handle(:publish_single_ap, params) do
     case ActivityPub.publish_one(params) do
       {:ok, _} ->
index a3155b79dd07db2ca586f7800e1f7aa75f8a57f7..a20ca17bbe7ce8498bdb3cf9faa3e3d158fd920d 100644 (file)
@@ -48,6 +48,9 @@ defmodule Pleroma.Web.OStatus do
 
   def handle_incoming(xml_string) do
     with doc when doc != :error <- parse_document(xml_string) do
+      with {:ok, actor_user} <- find_make_or_update_user(doc),
+           do: Pleroma.Instances.set_reachable(actor_user.ap_id)
+
       entries = :xmerl_xpath.string('//entry', doc)
 
       activities =
index 297aca2f976948ef18c6a373fb39d19c38416217..302ff38a457716b06e557892c2315ce7aa1d6e0d 100644 (file)
@@ -14,6 +14,7 @@ defmodule Pleroma.Web.OStatus.OStatusController do
   alias Pleroma.Web.ActivityPub.ActivityPub
 
   plug(Pleroma.Web.FederatingPlug when action in [:salmon_incoming])
+
   action_fallback(:errors)
 
   def feed_redirect(conn, %{"nickname" => nickname}) do
index e41657da1a95cabb8e81651570a9270bcf53b638..07ca42a5f0aef095c94909b3ad9c4f3b6792c709 100644 (file)
@@ -6,6 +6,7 @@ defmodule Pleroma.Web.Salmon do
   @httpoison Application.get_env(:pleroma, :httpoison)
 
   use Bitwise
+  alias Pleroma.Instances
   alias Pleroma.Web.XML
   alias Pleroma.Web.OStatus.ActivityRepresenter
   alias Pleroma.User
@@ -163,23 +164,28 @@ defmodule Pleroma.Web.Salmon do
 
   # push an activity to remote accounts
   #
-  defp send_to_user(%{info: %{salmon: salmon}}, feed, poster),
+  def send_to_user(%{info: %{salmon: salmon}}, feed, poster),
     do: send_to_user(salmon, feed, poster)
 
-  defp send_to_user(url, feed, poster) when is_binary(url) do
-    with {:ok, %{status: code}} <-
+  def send_to_user(url, feed, poster) when is_binary(url) do
+    with {:ok, %{status: code}} when code in 200..299 <-
            poster.(
              url,
              feed,
              [{"Content-Type", "application/magic-envelope+xml"}]
            ) do
+      Instances.set_reachable(url)
       Logger.debug(fn -> "Pushed to #{url}, code #{code}" end)
+      :ok
     else
-      e -> Logger.debug(fn -> "Pushing Salmon to #{url} failed, #{inspect(e)}" end)
+      e ->
+        Instances.set_unreachable(url)
+        Logger.debug(fn -> "Pushing Salmon to #{url} failed, #{inspect(e)}" end)
+        :error
     end
   end
 
-  defp send_to_user(_, _, _), do: nil
+  def send_to_user(_, _, _), do: :noop
 
   @supported_activities [
     "Create",
@@ -209,12 +215,16 @@ defmodule Pleroma.Web.Salmon do
       {:ok, private, _} = keys_from_pem(keys)
       {:ok, feed} = encode(private, feed)
 
-      remote_users(activity)
+      remote_users = remote_users(activity)
+
+      salmon_urls = Enum.map(remote_users, & &1.info.salmon)
+      reachable_salmon_urls = Instances.filter_reachable(salmon_urls)
+
+      remote_users
+      |> Enum.filter(&(&1.info.salmon in reachable_salmon_urls))
       |> Enum.each(fn remote_user ->
-        Task.start(fn ->
-          Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
-          send_to_user(remote_user, feed, poster)
-        end)
+        Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
+        Pleroma.Web.Federator.enqueue(:publish_single_salmon, {remote_user, feed, poster})
       end)
     end
   end
index 7ca62c83b111fb9ccd560d6c232f9605e105da98..8f7d53b03f598da82d15e12d8cbd2b5da4ebf546 100644 (file)
@@ -5,6 +5,7 @@
 defmodule Pleroma.Web.Websub do
   alias Ecto.Changeset
   alias Pleroma.Repo
+  alias Pleroma.Instances
   alias Pleroma.Web.Websub.{WebsubServerSubscription, WebsubClientSubscription}
   alias Pleroma.Web.OStatus.FeedRepresenter
   alias Pleroma.Web.{XML, Endpoint, OStatus}
@@ -53,23 +54,27 @@ defmodule Pleroma.Web.Websub do
   ]
   def publish(topic, user, %{data: %{"type" => type}} = activity)
       when type in @supported_activities do
-    # TODO: Only send to still valid subscriptions.
+    response =
+      user
+      |> FeedRepresenter.to_simple_form([activity], [user])
+      |> :xmerl.export_simple(:xmerl_xml)
+      |> to_string
+
     query =
       from(
         sub in WebsubServerSubscription,
         where: sub.topic == ^topic and sub.state == "active",
-        where: fragment("? > NOW()", sub.valid_until)
+        where: fragment("? > (NOW() at time zone 'UTC')", sub.valid_until)
       )
 
     subscriptions = Repo.all(query)
 
-    Enum.each(subscriptions, fn sub ->
-      response =
-        user
-        |> FeedRepresenter.to_simple_form([activity], [user])
-        |> :xmerl.export_simple(:xmerl_xml)
-        |> to_string
+    callbacks = Enum.map(subscriptions, & &1.callback)
+    reachable_callbacks = Instances.filter_reachable(callbacks)
 
+    subscriptions
+    |> Enum.filter(&(&1.callback in reachable_callbacks))
+    |> Enum.each(fn sub ->
       data = %{
         xml: response,
         topic: topic,
@@ -267,7 +272,7 @@ defmodule Pleroma.Web.Websub do
     signature = sign(secret || "", xml)
     Logger.info(fn -> "Pushing #{topic} to #{callback}" end)
 
-    with {:ok, %{status: code}} <-
+    with {:ok, %{status: code}} when code in 200..299 <-
            @httpoison.post(
              callback,
              xml,
@@ -276,12 +281,14 @@ defmodule Pleroma.Web.Websub do
                {"X-Hub-Signature", "sha1=#{signature}"}
              ]
            ) do
+      Instances.set_reachable(callback)
       Logger.info(fn -> "Pushed to #{callback}, code #{code}" end)
       {:ok, code}
     else
-      e ->
-        Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end)
-        {:error, e}
+      {_post_result, response} ->
+        Instances.set_unreachable(callback)
+        Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(response)}" end)
+        {:error, response}
     end
   end
 end
index e58f144e5227d3721e44271122acd11653991128..a92dfe87b6cd0191bc412d10e1f960f9a05adc6d 100644 (file)
@@ -4,9 +4,11 @@
 
 defmodule Pleroma.Web.Websub.WebsubController do
   use Pleroma.Web, :controller
+
   alias Pleroma.{Repo, User}
   alias Pleroma.Web.{Websub, Federator}
   alias Pleroma.Web.Websub.WebsubClientSubscription
+
   require Logger
 
   plug(
diff --git a/priv/repo/migrations/20190123125546_create_instances.exs b/priv/repo/migrations/20190123125546_create_instances.exs
new file mode 100644 (file)
index 0000000..b527ad7
--- /dev/null
@@ -0,0 +1,15 @@
+defmodule Pleroma.Repo.Migrations.CreateInstances do
+  use Ecto.Migration
+
+  def change do
+    create table(:instances) do
+      add :host, :string
+      add :unreachable_since, :naive_datetime
+
+      timestamps()
+    end
+
+    create unique_index(:instances, [:host])
+    create index(:instances, [:unreachable_since])
+  end
+end
index 4ac77981a76de3d021cf4738ca060c3f0d889ac0..0c21093cef5c3f7c4a8082d29a7f2d49623ffa66 100644 (file)
@@ -193,7 +193,7 @@ defmodule Pleroma.Factory do
   def websub_subscription_factory do
     %Pleroma.Web.Websub.WebsubServerSubscription{
       topic: "http://example.org",
-      callback: "http://example/org/callback",
+      callback: "http://example.org/callback",
       secret: "here's a secret",
       valid_until: NaiveDateTime.add(NaiveDateTime.utc_now(), 100),
       state: "requested"
@@ -220,4 +220,11 @@ defmodule Pleroma.Factory do
       client_secret: "aaa;/&bbb"
     }
   end
+
+  def instance_factory do
+    %Pleroma.Instances.Instance{
+      host: "domain.com",
+      unreachable_since: nil
+    }
+  end
 end
index c60f618738e8c83134c77210fea5ab7506dbc39b..78e8efc9df77c1128d21bc73630e20b8752b4ec0 100644 (file)
@@ -696,6 +696,14 @@ defmodule HttpRequestMock do
     {:ok, %Tesla.Env{status: 200, body: "hello"}}
   end
 
+  def get("http://404.site" <> _, _, _, _) do
+    {:ok,
+     %Tesla.Env{
+       status: 404,
+       body: ""
+     }}
+  end
+
   def get(url, query, body, headers) do
     {:error,
      "Not implemented the mock response for get #{inspect(url)}, #{query}, #{inspect(body)}, #{
@@ -716,6 +724,26 @@ defmodule HttpRequestMock do
      }}
   end
 
+  def post("http://200.site" <> _, _, _, _) do
+    {:ok,
+     %Tesla.Env{
+       status: 200,
+       body: ""
+     }}
+  end
+
+  def post("http://connrefused.site" <> _, _, _, _) do
+    {:error, :connrefused}
+  end
+
+  def post("http://404.site" <> _, _, _, _) do
+    {:ok,
+     %Tesla.Env{
+       status: 404,
+       body: ""
+     }}
+  end
+
   def post(url, _query, _body, _headers) do
     {:error, "Not implemented the mock response for post #{inspect(url)}"}
   end
index 52e67f0467d0bf27e843ae65fa1fa46436a9e873..d3dd160dd92be987ce92988e16863ad5ec52e8b9 100644 (file)
@@ -6,8 +6,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
   use Pleroma.Web.ConnCase
   import Pleroma.Factory
   alias Pleroma.Web.ActivityPub.{UserView, ObjectView}
-  alias Pleroma.{Object, Repo, User}
-  alias Pleroma.Activity
+  alias Pleroma.{Object, Repo, Activity, User, Instances}
 
   setup_all do
     Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
@@ -144,6 +143,23 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
       :timer.sleep(500)
       assert Activity.get_by_ap_id(data["id"])
     end
+
+    test "it clears `unreachable` federation status of the sender", %{conn: conn} do
+      data = File.read!("test/fixtures/mastodon-post-activity.json") |> Poison.decode!()
+
+      sender_url = data["actor"]
+      Instances.set_consistently_unreachable(sender_url)
+      refute Instances.reachable?(sender_url)
+
+      conn =
+        conn
+        |> assign(:valid_signature, true)
+        |> put_req_header("content-type", "application/activity+json")
+        |> post("/inbox", data)
+
+      assert "ok" == json_response(conn, 200)
+      assert Instances.reachable?(sender_url)
+    end
   end
 
   describe "/users/:nickname/inbox" do
@@ -191,6 +207,28 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
 
       assert response(conn, 200) =~ note_activity.data["object"]["content"]
     end
+
+    test "it clears `unreachable` federation status of the sender", %{conn: conn} do
+      user = insert(:user)
+
+      data =
+        File.read!("test/fixtures/mastodon-post-activity.json")
+        |> Poison.decode!()
+        |> Map.put("bcc", [user.ap_id])
+
+      sender_host = URI.parse(data["actor"]).host
+      Instances.set_consistently_unreachable(sender_host)
+      refute Instances.reachable?(sender_host)
+
+      conn =
+        conn
+        |> assign(:valid_signature, true)
+        |> put_req_header("content-type", "application/activity+json")
+        |> post("/users/#{user.nickname}/inbox", data)
+
+      assert "ok" == json_response(conn, 200)
+      assert Instances.reachable?(sender_host)
+    end
   end
 
   describe "/users/:nickname/outbox" do
index b826f5a1baa9fd64fd2b9cd0f3a600197c224ef1..2ada4f2e5e1cd23a2bad9596b52eab027e3d9a20 100644 (file)
@@ -7,11 +7,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubTest do
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.ActivityPub.Utils
   alias Pleroma.Web.CommonAPI
-  alias Pleroma.{Activity, Object, User}
+  alias Pleroma.{Activity, Object, User, Instances}
   alias Pleroma.Builders.ActivityBuilder
 
   import Pleroma.Factory
   import Tesla.Mock
+  import Mock
 
   setup do
     mock(fn env -> apply(HttpRequestMock, :request, [env]) end)
@@ -696,6 +697,46 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubTest do
     assert 3 = length(activities)
   end
 
+  describe "publish_one/1" do
+    test_with_mock "it calls `Instances.set_unreachable` on target inbox on non-2xx HTTP response code",
+                   Instances,
+                   [:passthrough],
+                   [] do
+      actor = insert(:user)
+      inbox = "http://404.site/users/nick1/inbox"
+
+      assert {:error, _} =
+               ActivityPub.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1})
+
+      assert called(Instances.set_unreachable(inbox))
+    end
+
+    test_with_mock "it calls `Instances.set_unreachable` on target inbox on request error of any kind",
+                   Instances,
+                   [:passthrough],
+                   [] do
+      actor = insert(:user)
+      inbox = "http://connrefused.site/users/nick1/inbox"
+
+      assert {:error, _} =
+               ActivityPub.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1})
+
+      assert called(Instances.set_unreachable(inbox))
+    end
+
+    test_with_mock "it does NOT call `Instances.set_unreachable` if target is reachable",
+                   Instances,
+                   [:passthrough],
+                   [] do
+      actor = insert(:user)
+      inbox = "http://200.site/users/nick1/inbox"
+
+      assert {:ok, _} = ActivityPub.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1})
+
+      refute called(Instances.set_unreachable(inbox))
+    end
+  end
+
   def data_uri do
     File.read!("test/fixtures/avatar_data_uri")
   end
index a49265c0c7d93054ae69de572c12b55caac18d8d..c6d10ef7809edcd6a47d964fac488b3aacfc3def 100644 (file)
@@ -3,8 +3,8 @@
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Web.FederatorTest do
-  alias Pleroma.Web.Federator
-  alias Pleroma.Web.CommonAPI
+  alias Pleroma.Web.{CommonAPI, Federator}
+  alias Pleroma.Instances
   use Pleroma.DataCase
   import Pleroma.Factory
   import Mock
@@ -71,6 +71,103 @@ defmodule Pleroma.Web.FederatorTest do
     end
   end
 
+  describe "Targets reachability filtering in `publish`" do
+    test_with_mock "it federates only to reachable instances via AP",
+                   Federator,
+                   [:passthrough],
+                   [] do
+      user = insert(:user)
+
+      {inbox1, inbox2} =
+        {"https://domain.com/users/nick1/inbox", "https://domain2.com/users/nick2/inbox"}
+
+      insert(:user, %{
+        local: false,
+        nickname: "nick1@domain.com",
+        ap_id: "https://domain.com/users/nick1",
+        info: %{ap_enabled: true, source_data: %{"inbox" => inbox1}}
+      })
+
+      insert(:user, %{
+        local: false,
+        nickname: "nick2@domain2.com",
+        ap_id: "https://domain2.com/users/nick2",
+        info: %{ap_enabled: true, source_data: %{"inbox" => inbox2}}
+      })
+
+      Instances.set_unreachable(
+        URI.parse(inbox2).host,
+        Instances.reachability_datetime_threshold()
+      )
+
+      {:ok, _activity} =
+        CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
+
+      assert called(Federator.enqueue(:publish_single_ap, %{inbox: inbox1}))
+      refute called(Federator.enqueue(:publish_single_ap, %{inbox: inbox2}))
+    end
+
+    test_with_mock "it federates only to reachable instances via Websub",
+                   Federator,
+                   [:passthrough],
+                   [] do
+      user = insert(:user)
+      websub_topic = Pleroma.Web.OStatus.feed_path(user)
+
+      sub1 =
+        insert(:websub_subscription, %{
+          topic: websub_topic,
+          state: "active",
+          callback: "http://pleroma.soykaf.com/cb"
+        })
+
+      sub2 =
+        insert(:websub_subscription, %{
+          topic: websub_topic,
+          state: "active",
+          callback: "https://pleroma2.soykaf.com/cb"
+        })
+
+      Instances.set_consistently_unreachable(sub1.callback)
+
+      {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI"})
+
+      assert called(Federator.enqueue(:publish_single_websub, %{callback: sub2.callback}))
+      refute called(Federator.enqueue(:publish_single_websub, %{callback: sub1.callback}))
+    end
+
+    test_with_mock "it federates only to reachable instances via Salmon",
+                   Federator,
+                   [:passthrough],
+                   [] do
+      user = insert(:user)
+
+      remote_user1 =
+        insert(:user, %{
+          local: false,
+          nickname: "nick1@domain.com",
+          ap_id: "https://domain.com/users/nick1",
+          info: %{salmon: "https://domain.com/salmon"}
+        })
+
+      remote_user2 =
+        insert(:user, %{
+          local: false,
+          nickname: "nick2@domain2.com",
+          ap_id: "https://domain2.com/users/nick2",
+          info: %{salmon: "https://domain2.com/salmon"}
+        })
+
+      Instances.set_consistently_unreachable("domain.com")
+
+      {:ok, _activity} =
+        CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
+
+      assert called(Federator.enqueue(:publish_single_salmon, {remote_user2, :_, :_}))
+      refute called(Federator.enqueue(:publish_single_websub, {remote_user1, :_, :_}))
+    end
+  end
+
   describe "Receive an activity" do
     test "successfully processes incoming AP docs with correct origin" do
       params = %{
diff --git a/test/web/instances/instance_test.exs b/test/web/instances/instance_test.exs
new file mode 100644 (file)
index 0000000..a158c0a
--- /dev/null
@@ -0,0 +1,107 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Instances.InstanceTest do
+  alias Pleroma.Repo
+  alias Pleroma.Instances.Instance
+
+  use Pleroma.DataCase
+
+  import Pleroma.Factory
+
+  setup_all do
+    config_path = [:instance, :federation_reachability_timeout_days]
+    initial_setting = Pleroma.Config.get(config_path)
+
+    Pleroma.Config.put(config_path, 1)
+    on_exit(fn -> Pleroma.Config.put(config_path, initial_setting) end)
+
+    :ok
+  end
+
+  describe "set_reachable/1" do
+    test "clears `unreachable_since` of existing matching Instance record having non-nil `unreachable_since`" do
+      instance = insert(:instance, unreachable_since: NaiveDateTime.utc_now())
+
+      assert {:ok, instance} = Instance.set_reachable(instance.host)
+      refute instance.unreachable_since
+    end
+
+    test "keeps nil `unreachable_since` of existing matching Instance record having nil `unreachable_since`" do
+      instance = insert(:instance, unreachable_since: nil)
+
+      assert {:ok, instance} = Instance.set_reachable(instance.host)
+      refute instance.unreachable_since
+    end
+
+    test "does NOT create an Instance record in case of no existing matching record" do
+      host = "domain.org"
+      assert nil == Instance.set_reachable(host)
+
+      assert [] = Repo.all(Ecto.Query.from(i in Instance))
+      assert Instance.reachable?(host)
+    end
+  end
+
+  describe "set_unreachable/1" do
+    test "creates new record having `unreachable_since` to current time if record does not exist" do
+      assert {:ok, instance} = Instance.set_unreachable("https://domain.com/path")
+
+      instance = Repo.get(Instance, instance.id)
+      assert instance.unreachable_since
+      assert "domain.com" == instance.host
+    end
+
+    test "sets `unreachable_since` of existing record having nil `unreachable_since`" do
+      instance = insert(:instance, unreachable_since: nil)
+      refute instance.unreachable_since
+
+      assert {:ok, _} = Instance.set_unreachable(instance.host)
+
+      instance = Repo.get(Instance, instance.id)
+      assert instance.unreachable_since
+    end
+
+    test "does NOT modify `unreachable_since` value of existing record in case it's present" do
+      instance =
+        insert(:instance, unreachable_since: NaiveDateTime.add(NaiveDateTime.utc_now(), -10))
+
+      assert instance.unreachable_since
+      initial_value = instance.unreachable_since
+
+      assert {:ok, _} = Instance.set_unreachable(instance.host)
+
+      instance = Repo.get(Instance, instance.id)
+      assert initial_value == instance.unreachable_since
+    end
+  end
+
+  describe "set_unreachable/2" do
+    test "sets `unreachable_since` value of existing record in case it's newer than supplied value" do
+      instance =
+        insert(:instance, unreachable_since: NaiveDateTime.add(NaiveDateTime.utc_now(), -10))
+
+      assert instance.unreachable_since
+
+      past_value = NaiveDateTime.add(NaiveDateTime.utc_now(), -100)
+      assert {:ok, _} = Instance.set_unreachable(instance.host, past_value)
+
+      instance = Repo.get(Instance, instance.id)
+      assert past_value == instance.unreachable_since
+    end
+
+    test "does NOT modify `unreachable_since` value of existing record in case it's equal to or older than supplied value" do
+      instance =
+        insert(:instance, unreachable_since: NaiveDateTime.add(NaiveDateTime.utc_now(), -10))
+
+      assert instance.unreachable_since
+      initial_value = instance.unreachable_since
+
+      assert {:ok, _} = Instance.set_unreachable(instance.host, NaiveDateTime.utc_now())
+
+      instance = Repo.get(Instance, instance.id)
+      assert initial_value == instance.unreachable_since
+    end
+  end
+end
diff --git a/test/web/instances/instances_test.exs b/test/web/instances/instances_test.exs
new file mode 100644 (file)
index 0000000..adb8560
--- /dev/null
@@ -0,0 +1,112 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.InstancesTest do
+  alias Pleroma.Instances
+
+  use Pleroma.DataCase
+
+  setup_all do
+    config_path = [:instance, :federation_reachability_timeout_days]
+    initial_setting = Pleroma.Config.get(config_path)
+
+    Pleroma.Config.put(config_path, 1)
+    on_exit(fn -> Pleroma.Config.put(config_path, initial_setting) end)
+
+    :ok
+  end
+
+  describe "reachable?/1" do
+    test "returns `true` for host / url with unknown reachability status" do
+      assert Instances.reachable?("unknown.site")
+      assert Instances.reachable?("http://unknown.site")
+    end
+
+    test "returns `false` for host / url marked unreachable for at least `reachability_datetime_threshold()`" do
+      host = "consistently-unreachable.name"
+      Instances.set_consistently_unreachable(host)
+
+      refute Instances.reachable?(host)
+      refute Instances.reachable?("http://#{host}/path")
+    end
+
+    test "returns `true` for host / url marked unreachable for less than `reachability_datetime_threshold()`" do
+      url = "http://eventually-unreachable.name/path"
+
+      Instances.set_unreachable(url)
+
+      assert Instances.reachable?(url)
+      assert Instances.reachable?(URI.parse(url).host)
+    end
+
+    test "returns true on non-binary input" do
+      assert Instances.reachable?(nil)
+      assert Instances.reachable?(1)
+    end
+  end
+
+  describe "filter_reachable/1" do
+    test "keeps only reachable elements of supplied list" do
+      host = "consistently-unreachable.name"
+      url1 = "http://eventually-unreachable.com/path"
+      url2 = "http://domain.com/path"
+
+      Instances.set_consistently_unreachable(host)
+      Instances.set_unreachable(url1)
+
+      assert [url1, url2] == Instances.filter_reachable([host, url1, url2])
+    end
+  end
+
+  describe "set_reachable/1" do
+    test "sets unreachable url or host reachable" do
+      host = "domain.com"
+      Instances.set_consistently_unreachable(host)
+      refute Instances.reachable?(host)
+
+      Instances.set_reachable(host)
+      assert Instances.reachable?(host)
+    end
+
+    test "keeps reachable url or host reachable" do
+      url = "https://site.name?q="
+      assert Instances.reachable?(url)
+
+      Instances.set_reachable(url)
+      assert Instances.reachable?(url)
+    end
+
+    test "returns error status on non-binary input" do
+      assert {:error, _} = Instances.set_reachable(nil)
+      assert {:error, _} = Instances.set_reachable(1)
+    end
+  end
+
+  # Note: implementation-specific (e.g. Instance) details of set_unreachable/1 should be tested in implementation-specific tests
+  describe "set_unreachable/1" do
+    test "returns error status on non-binary input" do
+      assert {:error, _} = Instances.set_unreachable(nil)
+      assert {:error, _} = Instances.set_unreachable(1)
+    end
+  end
+
+  describe "set_consistently_unreachable/1" do
+    test "sets reachable url or host unreachable" do
+      url = "http://domain.com?q="
+      assert Instances.reachable?(url)
+
+      Instances.set_consistently_unreachable(url)
+      refute Instances.reachable?(url)
+    end
+
+    test "keeps unreachable url or host unreachable" do
+      host = "site.name"
+      Instances.set_consistently_unreachable(host)
+      refute Instances.reachable?(host)
+
+      Instances.set_consistently_unreachable(host)
+      refute Instances.reachable?(host)
+    end
+  end
+end
index c8fbff6cc7741e3298185ca551d1dbee0eac1cec..d97cd79f4df10773e071bb6635887378ccc9c636 100644 (file)
@@ -2,9 +2,16 @@ defmodule Pleroma.Web.OStatus.DeleteHandlingTest do
   use Pleroma.DataCase
 
   import Pleroma.Factory
+  import Tesla.Mock
+
   alias Pleroma.{Repo, Activity, Object}
   alias Pleroma.Web.OStatus
 
+  setup do
+    mock(fn env -> apply(HttpRequestMock, :request, [env]) end)
+    :ok
+  end
+
   describe "deletions" do
     test "it removes the mentioned activity" do
       note = insert(:note_activity)
index 954abf5fe96559aaff1ecbd274d979465f679352..3145ca9a1ab7a010f3364f84e67f717962ed91e8 100644 (file)
@@ -14,49 +14,51 @@ defmodule Pleroma.Web.OStatus.OStatusControllerTest do
     :ok
   end
 
-  test "decodes a salmon", %{conn: conn} do
-    user = insert(:user)
-    salmon = File.read!("test/fixtures/salmon.xml")
-
-    conn =
-      conn
-      |> put_req_header("content-type", "application/atom+xml")
-      |> post("/users/#{user.nickname}/salmon", salmon)
-
-    assert response(conn, 200)
-  end
-
-  test "decodes a salmon with a changed magic key", %{conn: conn} do
-    user = insert(:user)
-    salmon = File.read!("test/fixtures/salmon.xml")
-
-    conn =
-      conn
-      |> put_req_header("content-type", "application/atom+xml")
-      |> post("/users/#{user.nickname}/salmon", salmon)
-
-    assert response(conn, 200)
-
-    # Set a wrong magic-key for a user so it has to refetch
-    salmon_user = User.get_by_ap_id("http://gs.example.org:4040/index.php/user/1")
-    # Wrong key
-    info_cng =
-      User.Info.remote_user_creation(salmon_user.info, %{
-        magic_key:
-          "RSA.pu0s-halox4tu7wmES1FVSx6u-4wc0YrUFXcqWXZG4-27UmbCOpMQftRCldNRfyA-qLbz-eqiwrong1EwUvjsD4cYbAHNGHwTvDOyx5AKthQUP44ykPv7kjKGh3DWKySJvcs9tlUG87hlo7AvnMo9pwRS_Zz2CacQ-MKaXyDepk=.AQAB"
-      })
-
-    salmon_user
-    |> Ecto.Changeset.change()
-    |> Ecto.Changeset.put_embed(:info, info_cng)
-    |> Repo.update()
-
-    conn =
-      build_conn()
-      |> put_req_header("content-type", "application/atom+xml")
-      |> post("/users/#{user.nickname}/salmon", salmon)
-
-    assert response(conn, 200)
+  describe "salmon_incoming" do
+    test "decodes a salmon", %{conn: conn} do
+      user = insert(:user)
+      salmon = File.read!("test/fixtures/salmon.xml")
+
+      conn =
+        conn
+        |> put_req_header("content-type", "application/atom+xml")
+        |> post("/users/#{user.nickname}/salmon", salmon)
+
+      assert response(conn, 200)
+    end
+
+    test "decodes a salmon with a changed magic key", %{conn: conn} do
+      user = insert(:user)
+      salmon = File.read!("test/fixtures/salmon.xml")
+
+      conn =
+        conn
+        |> put_req_header("content-type", "application/atom+xml")
+        |> post("/users/#{user.nickname}/salmon", salmon)
+
+      assert response(conn, 200)
+
+      # Set a wrong magic-key for a user so it has to refetch
+      salmon_user = User.get_by_ap_id("http://gs.example.org:4040/index.php/user/1")
+      # Wrong key
+      info_cng =
+        User.Info.remote_user_creation(salmon_user.info, %{
+          magic_key:
+            "RSA.pu0s-halox4tu7wmES1FVSx6u-4wc0YrUFXcqWXZG4-27UmbCOpMQftRCldNRfyA-qLbz-eqiwrong1EwUvjsD4cYbAHNGHwTvDOyx5AKthQUP44ykPv7kjKGh3DWKySJvcs9tlUG87hlo7AvnMo9pwRS_Zz2CacQ-MKaXyDepk=.AQAB"
+        })
+
+      salmon_user
+      |> Ecto.Changeset.change()
+      |> Ecto.Changeset.put_embed(:info, info_cng)
+      |> Repo.update()
+
+      conn =
+        build_conn()
+        |> put_req_header("content-type", "application/atom+xml")
+        |> post("/users/#{user.nickname}/salmon", salmon)
+
+      assert response(conn, 200)
+    end
   end
 
   test "gets a feed", %{conn: conn} do
index 403cc7095ddb077839fc74bca266cbfab04bdce3..0c63dd84dae8a84e311fa8ce2c903cac224affb7 100644 (file)
@@ -6,7 +6,7 @@ defmodule Pleroma.Web.OStatusTest do
   use Pleroma.DataCase
   alias Pleroma.Web.OStatus
   alias Pleroma.Web.XML
-  alias Pleroma.{Object, Repo, User, Activity}
+  alias Pleroma.{Object, Repo, User, Activity, Instances}
   import Pleroma.Factory
   import ExUnit.CaptureLog
 
@@ -311,6 +311,22 @@ defmodule Pleroma.Web.OStatusTest do
     refute User.following?(follower, followed)
   end
 
+  test "it clears `unreachable` federation status of the sender" do
+    incoming_reaction_xml = File.read!("test/fixtures/share-gs.xml")
+    doc = XML.parse_document(incoming_reaction_xml)
+    actor_uri = XML.string_from_xpath("//author/uri[1]", doc)
+    reacted_to_author_uri = XML.string_from_xpath("//author/uri[2]", doc)
+
+    Instances.set_consistently_unreachable(actor_uri)
+    Instances.set_consistently_unreachable(reacted_to_author_uri)
+    refute Instances.reachable?(actor_uri)
+    refute Instances.reachable?(reacted_to_author_uri)
+
+    {:ok, _} = OStatus.handle_incoming(incoming_reaction_xml)
+    assert Instances.reachable?(actor_uri)
+    refute Instances.reachable?(reacted_to_author_uri)
+  end
+
   describe "new remote user creation" do
     test "returns local users" do
       local_user = insert(:user)
index 9cbcda063e4186a9b27cfabd9ac70fa068a61258..6492df2a0b5c0167fbf3229c41c16e9e02bdebb3 100644 (file)
@@ -50,35 +50,37 @@ defmodule Pleroma.Web.Websub.WebsubControllerTest do
     assert_in_delta NaiveDateTime.diff(websub.valid_until, NaiveDateTime.utc_now()), 100, 5
   end
 
-  test "handles incoming feed updates", %{conn: conn} do
-    websub = insert(:websub_client_subscription)
-    doc = "some stuff"
-    signature = Websub.sign(websub.secret, doc)
-
-    conn =
-      conn
-      |> put_req_header("x-hub-signature", "sha1=" <> signature)
-      |> put_req_header("content-type", "application/atom+xml")
-      |> post("/push/subscriptions/#{websub.id}", doc)
-
-    assert response(conn, 200) == "OK"
-
-    assert length(Repo.all(Activity)) == 1
-  end
-
-  test "rejects incoming feed updates with the wrong signature", %{conn: conn} do
-    websub = insert(:websub_client_subscription)
-    doc = "some stuff"
-    signature = Websub.sign("wrong secret", doc)
-
-    conn =
-      conn
-      |> put_req_header("x-hub-signature", "sha1=" <> signature)
-      |> put_req_header("content-type", "application/atom+xml")
-      |> post("/push/subscriptions/#{websub.id}", doc)
-
-    assert response(conn, 500) == "Error"
-
-    assert length(Repo.all(Activity)) == 0
+  describe "websub_incoming" do
+    test "handles incoming feed updates", %{conn: conn} do
+      websub = insert(:websub_client_subscription)
+      doc = "some stuff"
+      signature = Websub.sign(websub.secret, doc)
+
+      conn =
+        conn
+        |> put_req_header("x-hub-signature", "sha1=" <> signature)
+        |> put_req_header("content-type", "application/atom+xml")
+        |> post("/push/subscriptions/#{websub.id}", doc)
+
+      assert response(conn, 200) == "OK"
+
+      assert length(Repo.all(Activity)) == 1
+    end
+
+    test "rejects incoming feed updates with the wrong signature", %{conn: conn} do
+      websub = insert(:websub_client_subscription)
+      doc = "some stuff"
+      signature = Websub.sign("wrong secret", doc)
+
+      conn =
+        conn
+        |> put_req_header("x-hub-signature", "sha1=" <> signature)
+        |> put_req_header("content-type", "application/atom+xml")
+        |> post("/push/subscriptions/#{websub.id}", doc)
+
+      assert response(conn, 500) == "Error"
+
+      assert length(Repo.all(Activity)) == 0
+    end
   end
 end