+ // Process the valid publish notifications
+ const validPublishTopics = data.publishTopics.filter((topicResult) => !topicResult.err.length);
+ try {
+ await Promise.all(validPublishTopics.map(async (topicResult) => this.db.topicFetchRequested(dbCtx, topicResult.topicId)));
+ } catch (e) {
+ this.logger.error(_scope, 'topicFetchRequest failed', { error: e, ctx, data, requestId });
+ throw e;
+ }
+
+ this.logger.info(_scope, 'request accepted', { ctx, data, requestId });
+
+ if (data.publishTopics.length === 1) {
+ const soleTopic = data.publishTopics[0];
+ res.statusCode = soleTopic.status;
+ res.end(Manager._prettyDetails(soleTopic.err, soleTopic.warn).join('\n'));
+ } else {
+ res.statusCode = 207;
+ res.end(Manager.multiPublishContent(ctx, data.publishTopics));
+ }
+
+ if (this.options.manager.processImmediately
+ && validPublishTopics.length) {
+ try {
+ await Promise.all(validPublishTopics.map(async (topicResult) => this.communication.topicFetchClaimAndProcessById(dbCtx, topicResult.topicId, requestId)));
+ } catch (e) {
+ this.logger.error(_scope, 'topicFetchClaimAndProcessById failed', { data, validPublishTopics, requestId });
+ // Don't bother re-throwing, as we've already ended this response.
+ }
+ }
+ }
+
+
+ /**
+ * Annotate any encountered issues.
+ * @param {String[]} err
+ * @param {String[]} warn
+ * @returns {String[]}
+ */
+ static _prettyDetails(err, warn) {
+ return [
+ ...err.map((entry) => `error: ${entry}`),
+ ...warn.map((entry) => `warning: ${entry}`),
+ ];