4 * Here we process all the incoming requests.
5 * Payload validation occurs here, before committing the pending work to the
6 * database and (usually) calling a processor to act upon it.
8 * As this is the mediator between server framework and actions, this would
9 * be where most of a rewrite for a new server framework would happen.
12 const common
= require('./common');
13 const Communication
= require('./communication');
14 const Enum
= require('./enum');
15 const Errors
= require('./errors');
16 const DBErrors
= require('./db/errors');
17 const { ResponseError
} = require('./errors');
18 const Template
= require('./template');
20 const _fileScope
= common
.fileScope(__filename
);
23 constructor(logger
, db
, options
) {
26 this.options
= options
;
27 this.communication
= new Communication(logger
, db
, options
);
32 * GET request for healthcheck.
33 * @param {http.ServerResponse} res
36 async
getHealthcheck(res
, ctx
) {
37 const _scope
= _fileScope('getHealthcheck');
38 const health
= 'happy';
40 // What else could we check...
41 const dbHealth
= await
this.db
.healthCheck();
42 this.logger
.debug(_scope
, 'called', { health
, dbHealth
, ctx
});
48 * GET request for root.
49 * @param {http.ClientRequest} req
50 * @param {http.ServerResponse} res
53 async
getRoot(req
, res
, ctx
) {
54 const _scope
= _fileScope('getRoot');
55 this.logger
.debug(_scope
, 'called', { ctx
});
57 const content
= Template
.rootHTML(ctx
, this.options
);
59 this.logger
.info(_scope
, 'finished', { ctx
});
63 /** All the fields the root handler deals with.
64 * @typedef {object} RootData
65 * @property {string} callback - url
66 * @property {string} mode
67 * @property {string} topic
68 * @property {number} topicId
69 * @property {number} leaseSeconds
70 * @property {string} secret
71 * @property {string} httpRemoteAddr
72 * @property {string} httpFrom
73 * @property {boolean} isSecure
74 * @property {boolean} isPublisherValidated
78 * Extract api parameters.
79 * @param {http.ClientRequest} req
83 static _getRootData(req
, ctx
) {
84 const postData
= ctx
.parsedBody
;
85 const mode
= (postData
['hub.mode'] || '').toLowerCase();
87 callback: postData
['hub.callback'],
89 ...(mode
=== Enum
.Mode
.Publish
&& { url: postData
['hub.url'] }), // Publish accepts either hub.url or hub.topic
90 topic: postData
['hub.topic'],
91 ...(postData
['hub.lease_seconds'] && { leaseSeconds: parseInt(postData
['hub.lease_seconds'], 10) }),
92 secret: postData
['hub.secret'],
93 httpRemoteAddr: ctx
.clientAddress
,
94 httpFrom: req
.getHeader(Enum
.Header
.From
),
95 isSecure: ((ctx
.clientProtocol
|| '').toLowerCase() === 'https'),
96 isPublisherValidated: true, // Default to true. Will be set to false later, if topic has publisher validation url.
104 * @param {RootData} data
105 * @param {String[]} warn
106 * @param {String[]} err
107 * @param {String} requestId
109 async
_validateRootData(dbCtx
, data
, warn
, err
, requestId
) {
110 // These checks can modify data, so order matters.
111 await
this._checkTopic(dbCtx
, data
, warn
, err
, requestId
);
112 this._checkCallbackAndSecrets(data
, warn
, err
, requestId
);
113 await
this._checkMode(dbCtx
, data
, warn
, err
, requestId
);
118 * Check that requested topic exists and values are in range.
119 * Sets topic id, publisher validation state, and requested lease
122 * @param {RootData} data
123 * @param {String[]} warn
124 * @param {String[]} err
126 async
_checkTopic(dbCtx
, data
, warn
, err
, requestId
) {
127 const _scope
= _fileScope('_checkTopic');
131 topic
= await
this.db
.topicGetByUrl(dbCtx
, data
.topic
);
133 if (!topic
&& this._newTopicCreationAllowed()) {
134 this.logger
.info(_scope
, 'new topic from subscribe request', { data
, requestId
});
139 err
.push('invalid topic url (failed to parse url)');
143 await
this.db
.topicSet(dbCtx
, {
146 topic
= await
this.db
.topicGetByUrl(dbCtx
, data
.topic
);
150 if (!topic
|| topic
.isDeleted
) {
151 err
.push('not a supported topic');
155 data
.topicId
= topic
.id
;
157 if (data
.leaseSeconds
=== undefined || isNaN(data
.leaseSeconds
)) {
158 data
.leaseSeconds
= topic
.leaseSecondsPreferred
;
160 if (data
.leaseSeconds
> topic
.leaseSecondsMax
) {
161 data
.leaseSeconds
= topic
.leaseSecondsMax
;
162 warn
.push(`requested lease too long, using ${data.leaseSeconds}`);
163 } else if (data
.leaseSeconds
< topic
.leaseSecondsMin
) {
164 data
.leaseSeconds
= topic
.leaseSecondsMin
;
165 warn
.push(`requested lease too short, using ${data.leaseSeconds}`);
169 if (topic
.publisherValidationUrl
) {
170 data
.isPublisherValidated
= false;
176 * Check data for valid callback url and scheme constraints.
177 * @param {RootData} data
178 * @param {String[]} warn
179 * @param {String[]} err
181 _checkCallbackAndSecrets(data
, warn
, err
) {
182 let isCallbackSecure
= false;
184 if (!data
.callback
) {
185 err
.push('invalid callback url (empty)');
188 const c
= new URL(data
.callback
);
189 isCallbackSecure
= (c
.protocol
.toLowerCase() === 'https:'); // Colon included because url module is weird
191 err
.push('invalid callback url (failed to parse url');
196 if (!isCallbackSecure
) {
197 warn
.push('insecure callback');
201 const secretSeverity
= this.options
.manager
.strictSecrets
? err : warn
;
202 if (!data
.isSecure
) {
203 secretSeverity
.push('secret not safe (insecure hub)');
205 if (!isCallbackSecure
) {
206 secretSeverity
.push('secret not safe (insecure callback)');
208 if (data
.secret
.length
> 199) {
209 err
.push('cannot keep a secret that big');
215 * Check mode validity and subscription requirements.
216 * Publish mode is handled elsewhere in the flow.
218 * @param {RootData} data
219 * @param {String[]} warn
220 * @param {String[]} err
221 * @param {String} requestId
223 async
_checkMode(dbCtx
, data
, warn
, err
) {
225 case Enum
.Mode
.Subscribe:
228 case Enum
.Mode
.Unsubscribe: {
229 const currentEpoch
= Date
.now() / 1000;
231 if (data
.callback
&& data
.topicId
) {
232 s
= await
this.db
.subscriptionGet(dbCtx
, data
.callback
, data
.topicId
);
234 if (s
=== undefined) {
235 err
.push('not subscribed');
237 if (s
.expires
< currentEpoch
) {
238 err
.push('subscription already expired');
245 err
.push('invalid mode');
252 * Determine if a topic url is allowed to be created.
253 * In the future, this may be more complicated.
256 _newTopicCreationAllowed() {
257 return this.options
.manager
.publicHub
;
262 * Check that a publish request's topic(s) are valid and exist,
263 * returning an array with the results for each.
264 * For a public-hub publish request, creates topics if they do not exist.
266 * @param {RootData} data
267 * @param {String[]} warn
268 * @param {String[]} err
269 * @param {String} requestId
271 async
_publishTopics(dbCtx
, data
, requestId
) {
272 const _scope
= _fileScope('_checkPublish');
274 // Publish requests may include multiple topics, consider them all, but deduplicate.
275 const publishUrls
= Array
.from(new Set([
276 ...common
.ensureArray(data
.url
),
277 ...common
.ensureArray(data
.topic
),
280 // Map the requested topics to their ids, creating if necessary.
281 return Promise
.all(publishUrls
.map(async (url
) => {
288 let topic
= await
this.db
.topicGetByUrl(dbCtx
, url
);
289 if (!topic
&& this._newTopicCreationAllowed()) {
293 result
.err
.push('invalid topic url (failed to parse url)');
296 await
this.db
.topicSet(dbCtx
, {
297 // TODO: accept a publisherValidationUrl parameter
300 topic
= await
this.db
.topicGetByUrl(dbCtx
, url
);
301 this.logger
.info(_scope
, 'new topic from publish request', { url
, requestId
});
303 if (!topic
|| topic
.isDeleted
) {
304 result
.err
.push('topic not supported');
307 result
.topicId
= topic
.id
;
314 * Render response for multi-topic publish requests.
315 * @param {Object[]} publishTopics
317 static multiPublishContent(ctx
, publishTopics
) {
318 const responses
= publishTopics
.map((topic
) => ({
320 status: topic
.status
,
321 statusMessage: topic
.statusMessage
,
323 warnings: topic
.warn
,
325 switch (ctx
.responseType
) {
326 case Enum
.ContentType
.ApplicationJson:
327 return JSON
.stringify(responses
);
329 case Enum
.ContentType
.TextPlain:
331 const textResponses
= responses
.map((response
) => {
332 const details
= Manager
._prettyDetails(response
.errors
, response
.warnings
);
333 const textDetails
= (details
.length
? '\n' : '') + details
.map((d
) => `\t${d}`).join('\n');
334 return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`;
336 return textResponses
.join('\n----\n');
343 * Process a publish request.
345 * @param {Object} data
346 * @param {http.ServerResponse} res
347 * @param {Object} ctx
349 async
_publishRequest(dbCtx
, data
, res
, ctx
) {
350 const _scope
= _fileScope('_parsePublish');
351 this.logger
.debug(_scope
, 'called', { data
});
353 const requestId
= ctx
.requestId
;
355 // Parse and validate all the topics in the request.
356 data
.publishTopics
= await
this._publishTopics(dbCtx
, data
, requestId
);
357 if (!data
.publishTopics
|| !data
.publishTopics
.length
) {
358 const details
= Manager
._prettyDetails(['no valid topic urls to publish'], []);
359 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, details
);
362 // Set status per topic
363 for (const topicResult
of data
.publishTopics
) {
364 topicResult
.status
= topicResult
.err
.length
? 400 : 202;
365 topicResult
.statusMessage
= topicResult
.err
.length
? 'Bad Request' : 'Accepted';
368 // Process the valid publish notifications
369 const validPublishTopics
= data
.publishTopics
.filter((topicResult
) => !topicResult
.err
.length
);
371 await Promise
.all(validPublishTopics
.map(async (topicResult
) => this.db
.topicFetchRequested(dbCtx
, topicResult
.topicId
)));
373 this.logger
.error(_scope
, 'topicFetchRequest failed', { error: e
, ctx
, data
, requestId
});
377 this.logger
.info(_scope
, 'request accepted', { ctx
, data
, requestId
});
379 if (data
.publishTopics
.length
=== 1) {
380 const soleTopic
= data
.publishTopics
[0];
381 res
.statusCode
= soleTopic
.status
;
382 res
.end(Manager
._prettyDetails(soleTopic
.err
, soleTopic
.warn
).join('\n'));
384 res
.statusCode
= 207;
385 res
.end(Manager
.multiPublishContent(ctx
, data
.publishTopics
));
388 if (this.options
.manager
.processImmediately
389 && validPublishTopics
.length
) {
391 await Promise
.all(validPublishTopics
.map(async (topicResult
) => this.communication
.topicFetchClaimAndProcessById(dbCtx
, topicResult
.topicId
, requestId
)));
393 this.logger
.error(_scope
, 'topicFetchClaimAndProcessById failed', { data
, validPublishTopics
, requestId
});
394 // Don't bother re-throwing, as we've already ended this response.
401 * Annotate any encountered issues.
402 * @param {String[]} err
403 * @param {String[]} warn
404 * @returns {String[]}
406 static _prettyDetails(err
, warn
) {
408 ...err
.map((entry
) => `error: ${entry}`),
409 ...warn
.map((entry
) => `warning: ${entry}`),
415 * POST request for root.
416 * @param {http.ClientRequest} req
417 * @param {http.ServerResponse} res
418 * @param {object} ctx
420 async
postRoot(req
, res
, ctx
) {
421 const _scope
= _fileScope('postRoot');
422 this.logger
.debug(_scope
, 'called', { ctx
});
424 res
.statusCode
= 202; // Presume success.
428 const data
= Manager
._getRootData(req
, ctx
);
429 const requestId
= ctx
.requestId
;
431 await
this.db
.context(async (dbCtx
) => {
433 // Handle publish requests elsewhere
434 if (data
.mode
=== Enum
.Mode
.Publish
) {
435 return this._publishRequest(dbCtx
, data
, res
, ctx
);
438 await
this._validateRootData(dbCtx
, data
, warn
, err
, requestId
);
440 const details
= Manager
._prettyDetails(err
, warn
);
442 // Any errors are fatal. Stop and report anything that went wrong.
444 this.logger
.debug(_scope
, { msg: 'invalid request', data
, err
, warn
, requestId
});
445 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, details
);
448 // Commit the request for later processing.
451 id
= await
this.db
.verificationInsert(dbCtx
, { ...data
, requestId
});
453 this.logger
.error(_scope
, 'verificationInsert failed', { error: e
, data
, warn
, id
, requestId
});
457 // If we committed to the db, we've succeeded as far as the client is concerned.
458 res
.end(details
.join('\n'));
459 this.logger
.info(_scope
, 'request accepted', { data
, warn
, requestId
});
461 // Immediately attempt to claim and process the request.
462 if (this.options
.manager
.processImmediately
465 await
this.communication
.verificationClaimAndProcessById(dbCtx
, id
, requestId
);
467 this.logger
.error(_scope
, 'verificationClaimAndProcessById failed', { ...data
, id
, requestId
});
468 // Don't bother re-throwing, as we've already ended this response.
476 * Render topic info content.
477 * @param {Object} ctx
478 * @param {String} ctx.responseType
479 * @param {String} ctx.topicUrl
480 * @param {Number} ctx.count
483 // eslint-disable-next-line class-methods-use-this
485 // eslint-disable-next-line sonarjs/no-small-switch
486 switch (ctx
.responseType
) {
487 case Enum
.ContentType
.ApplicationJson:
488 return JSON
.stringify({
493 case Enum
.ContentType
.ImageSVG:
494 return Template
.badgeSVG({}, ` ${ctx.topicUrl} `, ` ${ctx.count} subscribers `, `${ctx.topicUrl} has ${ctx.count} subscribers.`);
497 return ctx
.count
.toString();
503 * GET request for /info?topic=url&format=type
504 * @param {http.ServerResponse} res
505 * @param {object} ctx
507 async
getInfo(res
, ctx
) {
508 const _scope
= _fileScope('getInfo');
509 this.logger
.debug(_scope
, 'called', { ctx
});
511 if (!ctx
.queryParams
.topic
) {
512 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, 'missing required parameter');
514 ctx
.topicUrl
= ctx
.queryParams
.topic
;
516 switch ((ctx
.queryParams
.format
|| '').toLowerCase()) {
518 ctx
.responseType
= Enum
.ContentType
.ImageSVG
;
519 res
.setHeader(Enum
.Header
.ContentType
, ctx
.responseType
);
523 ctx
.responseType
= Enum
.ContentType
.ApplicationJson
;
524 res
.setHeader(Enum
.Header
.ContentType
, ctx
.responseType
);
532 new URL(ctx
.topicUrl
);
534 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, 'invalid topic');
538 await
this.db
.context(async (dbCtx
) => {
539 count
= await
this.db
.subscriptionCountByTopicUrl(dbCtx
, ctx
.topicUrl
);
541 throw new ResponseError(Enum
.ErrorResponse
.NotFound
, 'no such topic');
543 ctx
.count
= count
.count
;
546 const content
= this.infoContent(ctx
);
547 res
.setHeader(Enum
.Header
.ETag
, common
.generateETag(undefined, undefined, content
));
548 res
.setHeader(Enum
.Header
.CacheControl
, 'no-cache');
550 this.logger
.info(_scope
, 'finished', { ctx
});
555 * Label the bars of the topic update history graph.
556 * @param {Number} index
557 * @param {Number} value
560 static _historyBarCaption(index
, value
) {
570 when
= `${index} days ago`;
572 return `${when}, ${value ? value : 'no'} update${value === 1 ? '': 's'}`;
577 * GET SVG chart of topic update history
578 * @param {http.ServerResponse} res
579 * @param {object} ctx
581 async
getHistorySVG(res
, ctx
) {
582 const _scope
= _fileScope('getHist');
583 this.logger
.debug(_scope
, 'called', { ctx
});
585 const days
= Math
.min(parseInt(ctx
.queryParams
.days
) || this.options
.manager
.publishHistoryDays
, 365);
586 const histOptions
= {
587 title: 'Topic Publish History',
588 description: 'Updates per Day',
589 labelZero: '^ Today',
597 barCaptionFn: Manager
._historyBarCaption
,
601 await
this.db
.context(async (dbCtx
) => {
602 publishHistory
= await
this.db
.topicPublishHistory(dbCtx
, ctx
.params
.topicId
, days
);
605 res
.end(Template
.histogramSVG(publishHistory
, histOptions
));
606 this.logger
.info(_scope
, 'finished', { ctx
});
610 * GET request for authorized /admin information.
611 * @param {http.ServerResponse} res
612 * @param {object} ctx
614 async
getAdminOverview(res
, ctx
) {
615 const _scope
= _fileScope('getAdminOverview');
616 this.logger
.debug(_scope
, 'called', { ctx
});
618 await
this.db
.context(async (dbCtx
) => {
619 ctx
.topics
= await
this.db
.topicGetAll(dbCtx
);
621 this.logger
.debug(_scope
, 'got topics', { topics: ctx
.topics
});
623 // Profile users can only see related topics.
624 if (ctx
.session
&& ctx
.session
.authenticatedProfile
) {
625 const profileUrlObj
= new URL(ctx
.session
.authenticatedProfile
);
626 ctx
.topics
= ctx
.topics
.filter((topic
) => {
627 const topicUrlObj
= new URL(topic
.url
);
628 return (topicUrlObj
.hostname
=== profileUrlObj
.hostname
);
632 res
.end(Template
.adminOverviewHTML(ctx
, this.options
));
633 this.logger
.info(_scope
, 'finished', { ctx
, topics: ctx
.topics
.length
});
638 * GET request for authorized /admin/topic/:topicId information.
639 * @param {http.ServerResponse} res
640 * @param {object} ctx
642 async
getTopicDetails(res
, ctx
) {
643 const _scope
= _fileScope('getTopicDetails');
644 this.logger
.debug(_scope
, 'called', { ctx
});
647 ctx
.publishSpan
= 60;
648 const topicId
= ctx
.params
.topicId
;
650 await
this.db
.context(async (dbCtx
) => {
651 ctx
.topic
= await
this.db
.topicGetById(dbCtx
, topicId
);
652 ctx
.subscriptions
= await
this.db
.subscriptionsByTopicId(dbCtx
, topicId
);
653 publishHistory
= await
this.db
.topicPublishHistory(dbCtx
, topicId
, ctx
.publishSpan
);
655 ctx
.publishCount
= publishHistory
.reduce((a
, b
) => a
+ b
, 0);
656 this.logger
.debug(_scope
, 'got topic details', { topic: ctx
.topic
, subscriptions: ctx
.subscriptions
, updates: ctx
.publishCount
});
658 // Profile users can only see related topics.
659 if (ctx
.session
&& ctx
.session
.authenticatedProfile
) {
660 const profileUrlObj
= new URL(ctx
.session
.authenticatedProfile
);
661 const topicUrlObj
= new URL(ctx
.topic
.url
);
662 if (topicUrlObj
.hostname
!== profileUrlObj
.hostname
) {
664 ctx
.subscriptions
= [];
668 res
.end(Template
.adminTopicDetailsHTML(ctx
, this.options
));
669 this.logger
.info(_scope
, 'finished', { ctx
, subscriptions: ctx
.subscriptions
.length
, topic: ctx
.topic
&& ctx
.topic
.id
|| ctx
.topic
});
674 * PATCH and DELETE for updating topic data.
675 * @param {http.ServerResponse} res
676 * @param {Object} ctx
678 async
updateTopic(res
, ctx
) {
679 const _scope
= _fileScope('updateTopic');
680 this.logger
.debug(_scope
, 'called', { ctx
});
682 const topicId
= ctx
.params
.topicId
;
684 await
this.db
.context(async (dbCtx
) => {
685 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
686 // Get topic without defaults filled in, to persist nulls
687 const topic
= await
this.db
.topicGetById(txCtx
, topicId
, false);
689 this.logger
.debug(_scope
, 'no topic', { ctx
});
690 throw new Errors
.ResponseError(Enum
.ErrorResponse
.NotFound
);
693 if (ctx
.method
=== 'DELETE') {
694 await
this.db
.topicDeleted(txCtx
, topicId
);
696 this.logger
.info(_scope
, 'topic set deleted', { ctx
, topicId
});
697 // Attempt to remove from db if no active subscriptions.
698 await
this.db
.topicPendingDelete(txCtx
, topicId
);
702 const updatableFields
= [
703 'leaseSecondsPreferred',
706 'publisherValidationUrl',
707 'contentHashAlgorithm',
710 const patchValues
= common
.pick({
716 'leaseSecondsPreferred',
719 ].filter((field
) => field
in patchValues
).forEach((field
) => {
720 // eslint-disable-next-line security/detect-object-injection
721 patchValues
[field
] = parseInt(patchValues
[field
], 10);
724 const patchKeys
= Object
.keys(patchValues
);
725 if (patchKeys
.length
=== 0
726 // eslint-disable-next-line security/detect-object-injection
727 || patchKeys
.every((k
) => patchValues
[k
] == topic
[k
])) {
728 res
.statusCode
= 204;
730 this.logger
.info(_scope
, 'empty topic update', { ctx
, topicId
});
733 const patchedTopic
= {
738 this.logger
.debug(_scope
, 'data', { topic
, patchValues
, patchedTopic
});
741 await
this.db
.topicUpdate(txCtx
, { topicId
, ...patchedTopic
});
743 if (e
instanceof DBErrors
.DataValidation
) {
744 this.logger
.debug(_scope
, 'validation error', { error: e
, ctx
, topicId
});
745 throw new Errors
.ResponseError(Enum
.ErrorResponse
.BadRequest
, e
.message
);
747 this.logger
.error(_scope
, 'failed', { error: e
, ctx
, topicId
});
751 this.logger
.info(_scope
, 'topic updated', { ctx
, topicId
, patchValues
});
758 * PATCH and DELETE for updating subscription data.
759 * @param {http.ServerResponse} res
760 * @param {Object} ctx
762 async
updateSubscription(res
, ctx
) {
763 const _scope
= _fileScope('updateSubscription');
764 this.logger
.debug(_scope
, 'called', { ctx
});
766 const subscriptionId
= ctx
.params
.subscriptionId
;
768 await
this.db
.context(async (dbCtx
) => {
769 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
770 const subscription
= await
this.db
.subscriptionGetById(txCtx
, subscriptionId
);
772 this.logger
.debug(_scope
, 'no subscription', { ctx
});
773 throw new Errors
.ResponseError(Enum
.ErrorResponse
.NotFound
);
776 if (ctx
.method
=== 'DELETE') {
777 const deleteFields
= common
.pick({
782 // Queue an unsubscription.
783 const verification
= {
784 topicId: subscription
.topicId
,
785 callback: subscription
.callback
,
786 mode: Enum
.Mode
.Denied
,
787 reason: 'subscription removed by administrative action',
788 isPublisherValidated: true,
789 requestId: ctx
.requestId
,
793 await
this.db
.verificationInsert(txCtx
, verification
);
794 this.logger
.info(_scope
, 'subscription removal initiated', { ctx
, verification
});
799 const updatableFields
= [
800 'signatureAlgorithm',
803 const patchValues
= common
.pick({
808 const patchKeys
= Object
.keys(patchValues
);
809 if (patchKeys
.length
=== 0
810 // eslint-disable-next-line security/detect-object-injection
811 || patchKeys
.every((k
) => patchValues
[k
] == subscription
[k
])) {
812 res
.statusCode
= 204;
816 const patchedSubscription
= {
822 await
this.db
.subscriptionUpdate(txCtx
, { subscriptionId
, ...patchedSubscription
});
824 if (e
instanceof DBErrors
.DataValidation
) {
825 this.logger
.debug(_scope
, 'validation error', { error: e
, ctx
, subscriptionId
});
826 throw new Errors
.ResponseError(Enum
.ErrorResponse
.BadRequest
, e
.message
);
828 this.logger
.info(_scope
, 'failed', { error: e
, ctx
, subscriptionId
});
832 this.logger
.info(_scope
, 'subscription updated', { ctx
, subscriptionId
, patchValues
});
838 * POST request for manually running worker.
839 * @param {http.ServerResponse} res
840 * @param {object} ctx
842 async
processTasks(res
, ctx
) {
843 const _scope
= _fileScope('processTasks');
844 this.logger
.debug(_scope
, 'called', { ctx
});
846 // N.B. no await on this
847 this.communication
.worker
.process().catch((e
) => {
848 this.logger
.error(_scope
, 'failed', { error: e
, ctx
});
852 this.logger
.info(_scope
, 'invoked worker process', { ctx
});
857 module
.exports
= Manager
;