+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
defmodule Pleroma.Web.Federator do
use GenServer
alias Pleroma.User
alias Pleroma.Activity
alias Pleroma.Web.{WebFinger, Websub}
+ alias Pleroma.Web.Federator.RetryQueue
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Relay
alias Pleroma.Web.ActivityPub.Transmogrifier
@websub Application.get_env(:pleroma, :websub)
@ostatus Application.get_env(:pleroma, :ostatus)
- @httpoison Application.get_env(:pleroma, :httpoison)
- @max_jobs 20
def init(args) do
{:ok, args}
end
def handle(:publish_single_ap, params) do
- ActivityPub.publish_one(params)
- end
-
- def handle(:publish_single_websub, %{xml: xml, topic: topic, callback: callback, secret: secret}) do
- signature = @websub.sign(secret || "", xml)
- Logger.debug(fn -> "Pushing #{topic} to #{callback}" end)
-
- with {:ok, %{status_code: code}} <-
- @httpoison.post(
- callback,
- xml,
- [
- {"Content-Type", "application/atom+xml"},
- {"X-Hub-Signature", "sha1=#{signature}"}
- ],
- timeout: 10000,
- recv_timeout: 20000,
- hackney: [pool: :default]
- ) do
- Logger.debug(fn -> "Pushed to #{callback}, code #{code}" end)
- else
- e ->
- Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end)
+ case ActivityPub.publish_one(params) do
+ {:ok, _} ->
+ :ok
+
+ {:error, _} ->
+ RetryQueue.enqueue(params, ActivityPub)
+ end
+ end
+
+ def handle(
+ :publish_single_websub,
+ %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
+ ) do
+ case Websub.publish_one(params) do
+ {:ok, _} ->
+ :ok
+
+ {:error, _} ->
+ RetryQueue.enqueue(params, Websub)
end
end
{:error, "Don't know what to do with this"}
end
- def enqueue(type, payload, priority \\ 1) do
- if Pleroma.Config.get([:instance, :federating]) do
- if Mix.env() == :test do
+ if Mix.env() == :test do
+ def enqueue(type, payload, _priority \\ 1) do
+ if Pleroma.Config.get([:instance, :federating]) do
handle(type, payload)
- else
+ end
+ end
+ else
+ def enqueue(type, payload, priority \\ 1) do
+ if Pleroma.Config.get([:instance, :federating]) do
GenServer.cast(__MODULE__, {:enqueue, type, payload, priority})
end
end
end
def maybe_start_job(running_jobs, queue) do
- if :sets.size(running_jobs) < @max_jobs && queue != [] do
+ if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, :max_jobs]) && queue != [] do
{{type, payload}, queue} = queue_pop(queue)
{:ok, pid} = Task.start(fn -> handle(type, payload) end)
mref = Process.monitor(pid)