@adapter Pleroma.Instances.Instance
+ defdelegate filter_reachable(urls), to: @adapter
defdelegate reachable?(url), to: @adapter
defdelegate set_reachable(url), to: @adapter
defdelegate set_unreachable(url, unreachable_since \\ nil), to: @adapter
def reachability_time_threshold,
do: NaiveDateTime.add(NaiveDateTime.utc_now(), -30 * 24 * 3600, :second)
+
+ def host(url_or_host) when is_binary(url_or_host) do
+ if url_or_host =~ ~r/^http/i do
+ URI.parse(url_or_host).host
+ else
+ url_or_host
+ end
+ end
end
timestamps()
end
- def update_changeset(struct, params \\ %{}) do
+ defdelegate host(url), to: Instances
+
+ def changeset(struct, params \\ %{}) do
struct
|> cast(params, [:host, :unreachable_since, :reachability_checked_at])
+ |> validate_required([:host])
|> unique_constraint(:host)
end
+ def filter_reachable([]), do: []
+
+ def filter_reachable(urls) when is_list(urls) do
+ hosts =
+ urls
+ |> Enum.map(&(&1 && host(&1)))
+ |> Enum.filter(&(to_string(&1) != ""))
+
+ unreachable_hosts =
+ Repo.all(
+ from(i in Instance,
+ where:
+ i.host in ^hosts and i.unreachable_since <= ^Instances.reachability_time_threshold(),
+ select: i.host
+ )
+ )
+
+ Enum.filter(urls, &(&1 && host(&1) not in unreachable_hosts))
+ end
+
def reachable?(url) when is_binary(url) do
!Repo.one(
from(i in Instance,
def reachable?(_), do: true
def set_reachable(url) when is_binary(url) do
- Repo.update_all(
- from(i in Instance, where: i.host == ^host(url)),
- set: [
- unreachable_since: nil,
- reachability_checked_at: DateTime.utc_now()
- ]
- )
+ with host <- host(url),
+ %Instance{} = existing_record <- Repo.get_by(Instance, %{host: host}) do
+ {:ok, _instance} =
+ existing_record
+ |> changeset(%{unreachable_since: nil, reachability_checked_at: DateTime.utc_now()})
+ |> Repo.update()
+ end
end
def set_reachable(_), do: {0, :noop}
do: Map.delete(changes, :unreachable_since),
else: changes
- {:ok, _instance} = Repo.update(update_changeset(existing_record, update_changes))
+ {:ok, _instance} =
+ existing_record
+ |> changeset(update_changes)
+ |> Repo.update()
else
- {:ok, _instance} = Repo.insert(update_changeset(%Instance{}, Map.put(changes, :host, host)))
+ {:ok, _instance} =
+ %Instance{}
+ |> changeset(Map.put(changes, :host, host))
+ |> Repo.insert()
end
end
def set_unreachable(_, _), do: {0, :noop}
-
- defp host(url_or_host) do
- if url_or_host =~ ~r/^http/i do
- URI.parse(url_or_host).host
- else
- url_or_host
- end
- end
end
end
def publish(actor, activity) do
- followers =
+ remote_followers =
if actor.follower_address in activity.recipients do
{:ok, followers} = User.get_followers(actor)
followers |> Enum.filter(&(!&1.local))
public = is_public?(activity)
remote_inboxes =
- (Pleroma.Web.Salmon.remote_users(activity) ++ followers)
+ (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
|> Enum.filter(fn user -> User.ap_enabled?(user) end)
|> Enum.map(fn %{info: %{source_data: data}} ->
(is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
end)
|> Enum.uniq()
|> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
+ |> Instances.filter_reachable()
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
json = Jason.encode!(data)
{:ok, private, _} = keys_from_pem(keys)
{:ok, feed} = encode(private, feed)
- remote_users(activity)
+ remote_users = remote_users(activity)
+
+ salmon_urls = Enum.map(remote_users, & &1.info.salmon)
+ reachable_salmon_urls = Instances.filter_reachable(salmon_urls)
+
+ remote_users
+ |> Enum.filter(&(&1.info.salmon in reachable_salmon_urls))
|> Enum.each(fn remote_user ->
Task.start(fn ->
Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
]
def publish(topic, user, %{data: %{"type" => type}} = activity)
when type in @supported_activities do
- # TODO: Only send to still valid subscriptions.
+ response =
+ user
+ |> FeedRepresenter.to_simple_form([activity], [user])
+ |> :xmerl.export_simple(:xmerl_xml)
+ |> to_string
+
query =
from(
sub in WebsubServerSubscription,
subscriptions = Repo.all(query)
- Enum.each(subscriptions, fn sub ->
- response =
- user
- |> FeedRepresenter.to_simple_form([activity], [user])
- |> :xmerl.export_simple(:xmerl_xml)
- |> to_string
+ callbacks = Enum.map(subscriptions, & &1.callback)
+ reachable_callbacks = Instances.filter_reachable(callbacks)
+ subscriptions
+ |> Enum.filter(&(&1.callback in reachable_callbacks))
+ |> Enum.each(fn sub ->
data = %{
xml: response,
topic: topic,