Merge branch 'develop' of git.pleroma.social:pleroma/pleroma into feature/jobs
authorEgor Kislitsyn <egor@kislitsyn.com>
Mon, 4 Feb 2019 13:50:28 +0000 (20:50 +0700)
committerEgor Kislitsyn <egor@kislitsyn.com>
Mon, 4 Feb 2019 13:50:28 +0000 (20:50 +0700)
# Conflicts:
# lib/pleroma/web/activity_pub/activity_pub.ex
# lib/pleroma/web/federator/federator.ex

13 files changed:
1  2 
config/config.exs
config/test.exs
docs/config.md
lib/pleroma/application.ex
lib/pleroma/web/activity_pub/activity_pub.ex
lib/pleroma/web/activity_pub/activity_pub_controller.ex
lib/pleroma/web/activity_pub/utils.ex
lib/pleroma/web/federator/federator.ex
lib/pleroma/web/ostatus/ostatus_controller.ex
lib/pleroma/web/salmon/salmon.ex
lib/pleroma/web/websub/websub.ex
lib/pleroma/web/websub/websub_controller.ex
test/web/federator_test.exs

Simple merge
diff --cc config/test.exs
Simple merge
diff --cc docs/config.md
Simple merge
Simple merge
index 6d784717e65d46b33c7e5f30df57d8172c070057,4635e7fcd674a438a632d7b9dc1ab4c0bc51c196..bdc9456ddb7657041f20db72c0352aa1b2ab10e9
@@@ -747,15 -747,18 +747,16 @@@ defmodule Pleroma.Web.ActivityPub.Activ
      {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
      json = Jason.encode!(data)
  
-     (Pleroma.Web.Salmon.remote_users(activity) ++ followers)
 -    Enum.each(reachable_inboxes_metadata, fn {inbox, unreachable_since} ->
 -      Federator.enqueue(:publish_single_ap, %{
++    (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
 +    |> Enum.filter(fn user -> User.ap_enabled?(user) end)
 +    |> Enum.map(fn %{info: %{source_data: data}} ->
 +      (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
 +    end)
 +    |> Enum.uniq()
 +    |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
-     |> Enum.each(fn inbox ->
++    |> Instances.filter_reachable()
++    |> Enum.each(fn {inbox, unreachable_since} ->
 +      Federator.publish_single_ap(%{
          inbox: inbox,
          json: json,
          actor: actor,
Simple merge
index 4d03b462285bdde3c63b8757bae398f903c47039,bb7676cf00be5ab0b1d22c3fa9b5f33cc45f6a62..37100c9e6277f0fdabb7dced65ea42df0dff9ce1
@@@ -3,10 -3,10 +3,10 @@@
  # SPDX-License-Identifier: AGPL-3.0-only
  
  defmodule Pleroma.Web.Federator do
 -  use GenServer
    alias Pleroma.User
    alias Pleroma.Activity
-   alias Pleroma.Web.{WebFinger, Websub}
 +  alias Pleroma.Jobs
+   alias Pleroma.Web.{WebFinger, Websub, Salmon}
    alias Pleroma.Web.Federator.RetryQueue
    alias Pleroma.Web.ActivityPub.ActivityPub
    alias Pleroma.Web.ActivityPub.Relay
    @websub Application.get_env(:pleroma, :websub)
    @ostatus Application.get_env(:pleroma, :ostatus)
  
 -  def init(args) do
 -    {:ok, args}
 +  def init() do
 +    # 1 minute
 +    Process.sleep(1000 * 60 * 1)
 +    refresh_subscriptions()
    end
  
 -  def start_link do
 -    spawn(fn ->
 -      # 1 minute
 -      Process.sleep(1000 * 60 * 1)
 -      enqueue(:refresh_subscriptions, nil)
 -    end)
 +  # Client API
 +
 +  def incoming_doc(doc) do
 +    Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
 +  end
 +
 +  def incoming_ap_doc(params) do
 +    Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
 +  end
  
 -    GenServer.start_link(
 -      __MODULE__,
 -      %{
 -        in: {:sets.new(), []},
 -        out: {:sets.new(), []}
 -      },
 -      name: __MODULE__
 -    )
 +  def publish(activity, priority \\ 1) do
 +    Jobs.enqueue(:federator_out, __MODULE__, [:publish, activity], priority)
    end
  
 -  def handle(:refresh_subscriptions, _) do
 +  def publish_single_ap(params) do
 +    Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_ap, params])
 +  end
 +
 +  def publish_single_websub(websub) do
 +    Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_websub, websub])
 +  end
 +
 +  def verify_websub(websub) do
 +    Jobs.enqueue(:federator_out, __MODULE__, [:verify_websub, websub])
 +  end
 +
 +  def request_subscription(sub) do
 +    Jobs.enqueue(:federator_out, __MODULE__, [:request_subscription, sub])
 +  end
 +
 +  def refresh_subscriptions() do
 +    Jobs.enqueue(:federator_out, __MODULE__, [:refresh_subscriptions])
 +  end
 +
++  def publish_single_salmon(params) do
++    Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_salmon, params])
++  end
++
 +  # Job Worker Callbacks
 +
 +  def perform(:refresh_subscriptions) do
      Logger.debug("Federator running refresh subscriptions")
      Websub.refresh_subscriptions()
  
      end
    end
  
 -  def handle(:publish_single_salmon, params) do
++  def perform(:publish_single_salmon, params) do
+     Salmon.send_to_user(params)
+   end
 -  def handle(:publish_single_ap, params) do
 +  def perform(:publish_single_ap, params) do
      case ActivityPub.publish_one(params) do
        {:ok, _} ->
          :ok
index e41657da1a95cabb8e81651570a9270bcf53b638,b1c2dc7fa96972cfc59de974098374cd5d1bd0df..3db0c07fd6e6e334e9ecf65742195548d811faf4
@@@ -209,12 -216,23 +216,23 @@@ defmodule Pleroma.Web.Salmon d
        {:ok, private, _} = keys_from_pem(keys)
        {:ok, feed} = encode(private, feed)
  
-       remote_users(activity)
+       remote_users = remote_users(activity)
+       salmon_urls = Enum.map(remote_users, & &1.info.salmon)
+       reachable_urls_metadata = Instances.filter_reachable(salmon_urls)
+       reachable_urls = Map.keys(reachable_urls_metadata)
+       remote_users
+       |> Enum.filter(&(&1.info.salmon in reachable_urls))
        |> Enum.each(fn remote_user ->
-         Task.start(fn ->
-           Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
-           send_to_user(remote_user, feed, poster)
-         end)
+         Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
 -        Pleroma.Web.Federator.enqueue(:publish_single_salmon, %{
++        Pleroma.Web.Federator.publish_single_salmon(%{
+           recipient: remote_user,
+           feed: feed,
+           poster: poster,
+           unreachable_since: reachable_urls_metadata[remote_user.info.salmon]
+         })
        end)
      end
    end
index 652ffd92cbb450bdc26b4873619b87ad4fb2a7d7,90ba79962718ef0f6550f5cc3b380c094a276d26..82845f13dbf100b293e0f97e65a96671dac43886
@@@ -5,9 -5,10 +5,10 @@@
  defmodule Pleroma.Web.Websub do
    alias Ecto.Changeset
    alias Pleroma.Repo
+   alias Pleroma.Instances
    alias Pleroma.Web.Websub.{WebsubServerSubscription, WebsubClientSubscription}
    alias Pleroma.Web.OStatus.FeedRepresenter
 -  alias Pleroma.Web.{XML, Endpoint, OStatus}
 +  alias Pleroma.Web.{XML, Endpoint, OStatus, Federator}
    alias Pleroma.Web.Router.Helpers
    require Logger
  
          xml: response,
          topic: topic,
          callback: sub.callback,
-         secret: sub.secret
+         secret: sub.secret,
+         unreachable_since: reachable_callbacks_metadata[sub.callback]
        }
  
 -      Pleroma.Web.Federator.enqueue(:publish_single_websub, data)
 +      Federator.publish_single_websub(data)
      end)
    end
  
Simple merge