static userAgentString(userAgentConfig) {
// eslint-disable-next-line security/detect-object-injection
const _conf = (field, def) => (userAgentConfig && field in userAgentConfig) ? userAgentConfig[field] : def;
- const product = _conf('product', packageName);
+ const product = _conf('product', packageName).split('/').pop();
const version = _conf('version', packageVersion);
let implementation = _conf('implementation', Enum.Specification);
if (implementation) {
}
if (!topic.isActive) {
+ // These should be filtered out when selecting verification tasks to process.
this.logger.debug(_scope, 'topic not active, skipping verification', { verification, requestId });
await this.db.verificationRelease(dbCtx, verificationId);
return;
case Enum.Mode.Unsubscribe:
if (verificationAccepted) {
await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
+ if (topic.isDeleted) {
+ // Remove a deleted topic after the last subscription is notified.
+ await this.db.topicPendingDelete(txCtx, topic.id);
+ }
}
break;
case Enum.Mode.Denied:
await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
+ if (topic.isDeleted) {
+ // Remove a deleted topic after he last subscription is notified.
+ await this.db.topicPendingDelete(txCtx, topic.id);
+ }
break;
default:
throw new Errors.InternalInconsistencyError(verification.mode);
}
- await this.db.verificationComplete(dbCtx, verificationId, verification.callback, verification.topicId);
+ await this.db.verificationComplete(txCtx, verificationId, verification.callback, verification.topicId);
}); // txCtx
this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted });
throw new Errors.InternalInconsistencyError('no such topic id');
}
- logInfoData.url = topicId.url;
+ // Cull any expired subscriptions
+ await this.db.subscriptionDeleteExpired(dbCtx, topicId);
+
+ logInfoData.url = topic.url;
if (topic.isDeleted) {
this.logger.debug(_scope, 'topic deleted, skipping update request', logInfoData);
const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.data);
if (!validHub) {
- this.logger.debug(_scope, 'retrieved topic does not list us as hub', { logInfoData });
+ this.logger.info(_scope, 'retrieved topic does not list us as hub', { logInfoData });
if (this.options.communication.strictTopicHubLink) {
await this.db.transaction(dbCtx, async (txCtx) => {
// Set as deleted and set content_updated so subscriptions are notified.
await this.db.topicDeleted(txCtx, topicId);
await this.db.topicFetchComplete(txCtx, topicId);
});
+ // Attempt to remove from db, if no active subscriptions.
+ await this.db.topicPendingDelete(dbCtx, topicId);
return;
}
}
await this.db.transaction(dbCtx, async (txCtx) => {
await this.db.verificationInsert(txCtx, verification);
- await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId);
+ await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId, topic.contentUpdated);
});
this.logger.info(_scope, 'update unsubscription for deleted topic', logInfoData);
return;
return;
}
- await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId);
+ await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId, topic.contentUpdated);
this.logger.info(_scope, 'update success', logInfoData);
}
if (wanted > 0) {
// Update topics before anything else.
const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
- topicFetchPromises = topicFetchIds.map((id) => this.topicFetchProcess(dbCtx, id, requestId));
+ topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId)));
inProgress.push(...topicFetchPromises);
wanted -= topicFetchPromises.length;
}
if (wanted > 0) {
// Then any pending verifications.
const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
- verificationPromises = verifications.map((id) => this.verificationProcess(dbCtx, id, requestId));
+ verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId)));
inProgress.push(...verificationPromises);
wanted -= verificationPromises.length;
}
if (wanted > 0) {
// Finally dole out content.
const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
- updatePromises = updates.map((id) => this.subscriptionDeliveryProcess(dbCtx, id, requestId));
+ updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId)));
inProgress.push(...updatePromises);
wanted -= updatePromises.length;
}