cae9e74ce734b6fec771f4c9ceb3927c8e1b9028
4 * Here we process all the incoming requests.
5 * Payload validation occurs here, before committing the pending work to the
6 * database and (usually) calling a processor to act upon it.
8 * As this is the mediator between server framework and actions, this would
9 * be where most of a rewrite for a new server framework would happen.
12 const common
= require('./common');
13 const Communication
= require('./communication');
14 const Enum
= require('./enum');
15 const Errors
= require('./errors');
16 const DBErrors
= require('./db/errors');
17 const { ResponseError
} = require('./errors');
18 const Template
= require('./template');
20 const _fileScope
= common
.fileScope(__filename
);
23 constructor(logger
, db
, options
) {
26 this.options
= options
;
27 this.communication
= new Communication(logger
, db
, options
);
29 // Precalculate the invariant root GET metadata.
30 this.getRootContent
= Template
.rootHTML(undefined, options
);
31 const now
= new Date();
32 this.startTimeString
= now
.toGMTString();
33 this.startTimeMs
= now
.getTime();
34 this.getRootETag
= common
.generateETag(undefined, undefined, this.getRootContent
);
39 * GET request for healthcheck.
40 * @param {http.ServerResponse} res
43 async
getHealthcheck(res
, ctx
) {
44 const _scope
= _fileScope('getHealthcheck');
45 const health
= 'happy';
47 // What else could we check...
48 const dbHealth
= await
this.db
.healthCheck();
49 this.logger
.debug(_scope
, 'called', { health
, dbHealth
, ctx
});
55 * GET request for root.
56 * @param {http.ServerResponse} res
59 async
getRoot(req
, res
, ctx
) {
60 const _scope
= _fileScope('getRoot');
61 this.logger
.debug(_scope
, 'called', { ctx
});
63 res
.setHeader(Enum
.Header
.LastModified
, this.startTimeString
);
64 res
.setHeader(Enum
.Header
.ETag
, this.getRootETag
);
66 if (common
.isClientCached(req
, this.startTimeMs
, this.getRootETag
)) {
67 this.logger
.debug(_scope
, 'client cached response', { ctx
});
72 res
.end(this.getRootContent
);
73 this.logger
.info(_scope
, 'finished', { ctx
});
77 /** All the fields the root handler deals with.
78 * @typedef {object} RootData
79 * @property {string} callback - url
80 * @property {string} mode
81 * @property {string} topic
82 * @property {number} topicId
83 * @property {string} leaseSeconds
84 * @property {string} secret
85 * @property {string} httpRemoteAddr
86 * @property {string} httpFrom
87 * @property {boolean} isSecure
88 * @property {boolean} isPublisherValidated
92 * Extract api parameters.
93 * @param {http.ClientRequest} req
97 static _getRootData(req
, ctx
) {
98 const postData
= ctx
.parsedBody
;
99 const mode
= (postData
['hub.mode'] || '').toLowerCase();
101 callback: postData
['hub.callback'],
103 ...(mode
=== Enum
.Mode
.Publish
&& { url: postData
['hub.url'] }), // Publish accepts either hub.url or hub.topic
104 topic: postData
['hub.topic'],
105 ...(postData
['hub.lease_seconds'] && { leaseSeconds: parseInt(postData
['hub.lease_seconds'], 10) }),
106 secret: postData
['hub.secret'],
107 httpRemoteAddr: ctx
.clientAddress
,
108 httpFrom: req
.getHeader(Enum
.Header
.From
),
109 isSecure: ((ctx
.clientProtocol
|| '').toLowerCase() === 'https'),
110 isPublisherValidated: true, // Default to true. Will be set to false later, if topic has publisher validation url.
118 * @param {RootData} data
119 * @param {String[]} warn
120 * @param {String[]} err
121 * @param {String} requestId
123 async
_validateRootData(dbCtx
, data
, warn
, err
, requestId
) {
124 // These checks can modify data, so order matters.
125 await
this._checkTopic(dbCtx
, data
, warn
, err
, requestId
);
126 this._checkCallbackAndSecrets(data
, warn
, err
, requestId
);
127 await
this._checkMode(dbCtx
, data
, warn
, err
, requestId
);
132 * Check that requested topic exists and values are in range.
133 * Sets topic id, publisher validation state, and requested lease
136 * @param {RootData} data
137 * @param {String[]} warn
138 * @param {String[]} err
140 async
_checkTopic(dbCtx
, data
, warn
, err
, requestId
) {
141 const _scope
= _fileScope('_checkTopic');
145 topic
= await
this.db
.topicGetByUrl(dbCtx
, data
.topic
);
147 if (!topic
&& this._newTopicCreationAllowed()) {
148 this.logger
.info(_scope
, 'new topic from subscribe request', { data
, requestId
});
153 err
.push('invalid topic url (failed to parse url)');
157 await
this.db
.topicSet(dbCtx
, {
160 topic
= await
this.db
.topicGetByUrl(dbCtx
, data
.topic
);
164 if (!topic
|| topic
.isDeleted
) {
165 err
.push('not a supported topic');
169 data
.topicId
= topic
.id
;
171 if (data
.leaseSeconds
=== undefined || isNaN(data
.leaseSeconds
)) {
172 data
.leaseSeconds
= topic
.leaseSecondsPreferred
;
174 if (data
.leaseSeconds
> topic
.leaseSecondsMax
) {
175 data
.leaseSeconds
= topic
.leaseSecondsMax
;
176 warn
.push(`requested lease too long, using ${data.leaseSeconds}`);
177 } else if (data
.leaseSeconds
< topic
.leaseSecondsMin
) {
178 data
.leaseSeconds
= topic
.leaseSecondsMin
;
179 warn
.push(`requested lease too short, using ${data.leaseSeconds}`);
183 if (topic
.publisherValidationUrl
) {
184 data
.isPublisherValidated
= false;
190 * Check data for valid callback url and scheme constraints.
191 * @param {RootData} data
192 * @param {String[]} warn
193 * @param {String[]} err
195 _checkCallbackAndSecrets(data
, warn
, err
) {
196 let isCallbackSecure
= false;
198 if (!data
.callback
) {
199 err
.push('invalid callback url (empty)');
202 const c
= new URL(data
.callback
);
203 isCallbackSecure
= (c
.protocol
.toLowerCase() === 'https:'); // Colon included because url module is weird
205 err
.push('invalid callback url (failed to parse url');
210 if (!isCallbackSecure
) {
211 warn
.push('insecure callback');
215 const secretSeverity
= this.options
.manager
.strictSecrets
? err : warn
;
216 if (!data
.isSecure
) {
217 secretSeverity
.push('secret not safe (insecure hub)');
219 if (!isCallbackSecure
) {
220 secretSeverity
.push('secret not safe (insecure callback)');
222 if (data
.secret
.length
> 199) {
223 err
.push('cannot keep a secret that big');
229 * Check mode validity and subscription requirements.
230 * Publish mode is handled elsewhere in the flow.
232 * @param {RootData} data
233 * @param {String[]} warn
234 * @param {String[]} err
235 * @param {String} requestId
237 async
_checkMode(dbCtx
, data
, warn
, err
) {
239 case Enum
.Mode
.Subscribe:
242 case Enum
.Mode
.Unsubscribe: {
243 const currentEpoch
= Date
.now() / 1000;
245 if (data
.callback
&& data
.topicId
) {
246 s
= await
this.db
.subscriptionGet(dbCtx
, data
.callback
, data
.topicId
);
248 if (s
=== undefined) {
249 err
.push('not subscribed');
251 if (s
.expires
< currentEpoch
) {
252 err
.push('subscription already expired');
259 err
.push('invalid mode');
266 * Determine if a topic url is allowed to be created.
267 * In the future, this may be more complicated.
270 _newTopicCreationAllowed() {
271 return this.options
.manager
.publicHub
;
276 * Check that a publish request's topic(s) are valid and exist,
277 * returning an array with the results for each.
278 * For a public-hub publish request, creates topics if they do not exist.
280 * @param {RootData} data
281 * @param {String[]} warn
282 * @param {String[]} err
283 * @param {String} requestId
285 async
_publishTopics(dbCtx
, data
, requestId
) {
286 const _scope
= _fileScope('_checkPublish');
288 // Publish requests may include multiple topics, consider them all, but deduplicate.
289 const publishUrls
= Array
.from(new Set([
290 ...common
.ensureArray(data
.url
),
291 ...common
.ensureArray(data
.topic
),
294 // Map the requested topics to their ids, creating if necessary.
295 return Promise
.all(publishUrls
.map(async (url
) => {
302 let topic
= await
this.db
.topicGetByUrl(dbCtx
, url
);
303 if (!topic
&& this._newTopicCreationAllowed()) {
307 result
.err
.push('invalid topic url (failed to parse url)');
310 await
this.db
.topicSet(dbCtx
, {
311 // TODO: accept a publisherValidationUrl parameter
314 topic
= await
this.db
.topicGetByUrl(dbCtx
, url
);
315 this.logger
.info(_scope
, 'new topic from publish request', { url
, requestId
});
317 if (!topic
|| topic
.isDeleted
) {
318 result
.err
.push('topic not supported');
321 result
.topicId
= topic
.id
;
328 * Render response for multi-topic publish requests.
329 * @param {Object[]} publishTopics
331 static multiPublishContent(ctx
, publishTopics
) {
332 const responses
= publishTopics
.map((topic
) => ({
334 status: topic
.status
,
335 statusMessage: topic
.statusMessage
,
337 warnings: topic
.warn
,
339 switch (ctx
.responseType
) {
340 case Enum
.ContentType
.ApplicationJson:
341 return JSON
.stringify(responses
);
343 case Enum
.ContentType
.TextPlain:
345 const textResponses
= responses
.map((response
) => {
346 const details
= Manager
._prettyDetails(response
.errors
, response
.warnings
);
347 const textDetails
= (details
.length
? '\n' : '') + details
.map((d
) => `\t${d}`).join('\n');
348 return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`;
350 return textResponses
.join('\n----\n');
357 * Process a publish request.
359 * @param {Object} data
360 * @param {http.ServerResponse} res
361 * @param {Object} ctx
363 async
_publishRequest(dbCtx
, data
, res
, ctx
) {
364 const _scope
= _fileScope('_parsePublish');
365 this.logger
.debug(_scope
, 'called', { data
});
367 const requestId
= ctx
.requestId
;
369 // Parse and validate all the topics in the request.
370 data
.publishTopics
= await
this._publishTopics(dbCtx
, data
, requestId
);
371 if (!data
.publishTopics
|| !data
.publishTopics
.length
) {
372 const details
= Manager
._prettyDetails(['no valid topic urls to publish'], []);
373 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, details
);
376 // Set status per topic
377 for (const topicResult
of data
.publishTopics
) {
378 topicResult
.status
= topicResult
.err
.length
? 400 : 202;
379 topicResult
.statusMessage
= topicResult
.err
.length
? 'Bad Request' : 'Accepted';
382 // Process the valid publish notifications
383 const validPublishTopics
= data
.publishTopics
.filter((topicResult
) => !topicResult
.err
.length
);
385 await Promise
.all(validPublishTopics
.map(async (topicResult
) => this.db
.topicFetchRequested(dbCtx
, topicResult
.topicId
)));
387 this.logger
.error(_scope
, 'topicFetchRequest failed', { error: e
, ctx
, data
, requestId
});
391 this.logger
.info(_scope
, 'request accepted', { ctx
, data
, requestId
});
393 if (data
.publishTopics
.length
=== 1) {
394 const soleTopic
= data
.publishTopics
[0];
395 res
.statusCode
= soleTopic
.status
;
396 res
.end(Manager
._prettyDetails(soleTopic
.err
, soleTopic
.warn
).join('\n'));
398 res
.statusCode
= 207;
399 res
.end(Manager
.multiPublishContent(ctx
, data
.publishTopics
));
402 if (this.options
.manager
.processImmediately
403 && validPublishTopics
.length
) {
405 await Promise
.all(validPublishTopics
.map(async (topicResult
) => this.communication
.topicFetchClaimAndProcessById(dbCtx
, topicResult
.topicId
, requestId
)));
407 this.logger
.error(_scope
, 'topicFetchClaimAndProcessById failed', { data
, validPublishTopics
, requestId
});
408 // Don't bother re-throwing, as we've already ended this response.
415 * Annotate any encountered issues.
416 * @param {String[]} err
417 * @param {String[]} warn
418 * @returns {String[]}
420 static _prettyDetails(err
, warn
) {
422 ...err
.map((entry
) => `error: ${entry}`),
423 ...warn
.map((entry
) => `warning: ${entry}`),
429 * POST request for root.
430 * @param {http.ClientRequest} req
431 * @param {http.ServerResponse} res
432 * @param {object} ctx
434 async
postRoot(req
, res
, ctx
) {
435 const _scope
= _fileScope('postRoot');
436 this.logger
.debug(_scope
, 'called', { ctx
});
438 res
.statusCode
= 202; // Presume success.
442 const data
= Manager
._getRootData(req
, ctx
);
443 const requestId
= ctx
.requestId
;
445 await
this.db
.context(async (dbCtx
) => {
447 // Handle publish requests elsewhere
448 if (data
.mode
=== Enum
.Mode
.Publish
) {
449 return this._publishRequest(dbCtx
, data
, res
, ctx
);
452 await
this._validateRootData(dbCtx
, data
, warn
, err
, requestId
);
454 const details
= Manager
._prettyDetails(err
, warn
);
456 // Any errors are fatal. Stop and report anything that went wrong.
458 this.logger
.debug(_scope
, { msg: 'invalid request', data
, err
, warn
, requestId
});
459 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, details
);
462 // Commit the request for later processing.
465 id
= await
this.db
.verificationInsert(dbCtx
, { ...data
, requestId
});
467 this.logger
.error(_scope
, 'verificationInsert failed', { error: e
, data
, warn
, id
, requestId
});
471 // If we committed to the db, we've succeeded as far as the client is concerned.
472 res
.end(details
.join('\n'));
473 this.logger
.info(_scope
, 'request accepted', { data
, warn
, requestId
});
475 // Immediately attempt to claim and process the request.
476 if (this.options
.manager
.processImmediately
479 await
this.communication
.verificationClaimAndProcessById(dbCtx
, id
, requestId
);
481 this.logger
.error(_scope
, 'verificationClaimAndProcessById failed', { ...data
, id
, requestId
});
482 // Don't bother re-throwing, as we've already ended this response.
490 * Render topic info content.
491 * @param {Object} ctx
492 * @param {String} ctx.responseType
493 * @param {String} ctx.topicUrl
494 * @param {Number} ctx.count
497 // eslint-disable-next-line class-methods-use-this
499 // eslint-disable-next-line sonarjs/no-small-switch
500 switch (ctx
.responseType
) {
501 case Enum
.ContentType
.ApplicationJson:
502 return JSON
.stringify({
507 case Enum
.ContentType
.ImageSVG:
508 return Template
.badgeSVG({}, ` ${ctx.topicUrl} `, ` ${ctx.count} subscribers `, `${ctx.topicUrl} has ${ctx.count} subscribers.`);
511 return ctx
.count
.toString();
517 * GET request for /info?topic=url&format=type
518 * @param {http.ServerResponse} res
519 * @param {object} ctx
521 async
getInfo(res
, ctx
) {
522 const _scope
= _fileScope('getInfo');
523 this.logger
.debug(_scope
, 'called', { ctx
});
525 if (!ctx
.queryParams
.topic
) {
526 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, 'missing required parameter');
528 ctx
.topicUrl
= ctx
.queryParams
.topic
;
530 switch ((ctx
.queryParams
.format
|| '').toLowerCase()) {
532 ctx
.responseType
= Enum
.ContentType
.ImageSVG
;
533 res
.setHeader(Enum
.Header
.ContentType
, ctx
.responseType
);
537 ctx
.responseType
= Enum
.ContentType
.ApplicationJson
;
538 res
.setHeader(Enum
.Header
.ContentType
, ctx
.responseType
);
546 new URL(ctx
.topicUrl
);
548 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, 'invalid topic');
552 await
this.db
.context(async (dbCtx
) => {
553 count
= await
this.db
.subscriptionCountByTopicUrl(dbCtx
, ctx
.topicUrl
);
555 throw new ResponseError(Enum
.ErrorResponse
.NotFound
, 'no such topic');
557 ctx
.count
= count
.count
;
560 res
.end(this.infoContent(ctx
));
561 this.logger
.info(_scope
, 'finished', { ...ctx
});
566 * GET request for authorized /admin information.
567 * @param {http.ServerResponse} res
568 * @param {object} ctx
570 async
getAdminOverview(res
, ctx
) {
571 const _scope
= _fileScope('getAdminOverview');
572 this.logger
.debug(_scope
, 'called', { ctx
});
574 await
this.db
.context(async (dbCtx
) => {
575 ctx
.topics
= await
this.db
.topicGetAll(dbCtx
);
577 this.logger
.debug(_scope
, 'got topics', { topics: ctx
.topics
});
579 res
.end(Template
.adminOverviewHTML(ctx
, this.options
));
580 this.logger
.info(_scope
, 'finished', { ...ctx
, topics: ctx
.topics
.length
})
585 * GET request for authorized /admin/topic/:topicId information.
586 * @param {http.ServerResponse} res
587 * @param {object} ctx
589 async
getTopicDetails(res
, ctx
) {
590 const _scope
= _fileScope('getTopicDetails');
591 this.logger
.debug(_scope
, 'called', { ctx
});
593 const topicId
= ctx
.params
.topicId
;
594 await
this.db
.context(async (dbCtx
) => {
595 ctx
.topic
= await
this.db
.topicGetById(dbCtx
, topicId
);
596 ctx
.subscriptions
= await
this.db
.subscriptionsByTopicId(dbCtx
, topicId
);
598 this.logger
.debug(_scope
, 'got topic details', { topic: ctx
.topic
, subscriptions: ctx
.subscriptions
});
600 res
.end(Template
.adminTopicDetailsHTML(ctx
, this.options
));
601 this.logger
.info(_scope
, 'finished', { ...ctx
, subscriptions: ctx
.subscriptions
.length
, topic: ctx
.topic
.id
});
606 * PATCH and DELETE for updating topic data.
607 * @param {http.ServerResponse} res
608 * @param {Object} ctx
610 async
updateTopic(res
, ctx
) {
611 const _scope
= _fileScope('updateTopic');
612 this.logger
.debug(_scope
, 'called', { ctx
});
614 const topicId
= ctx
.params
.topicId
;
616 await
this.db
.context(async (dbCtx
) => {
617 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
618 // Get topic without defaults filled in, to persist nulls
619 const topic
= await
this.db
.topicGetById(txCtx
, topicId
, false);
621 this.logger
.debug(_scope
, 'no topic', { ctx
});
622 throw new Errors
.ResponseError(Enum
.ErrorResponse
.NotFound
);
625 if (ctx
.method
=== 'DELETE') {
626 await
this.db
.topicDeleted(txCtx
, topicId
);
628 this.logger
.info(_scope
, 'topic set deleted', { ctx
, topicId
});
629 // Attempt to remove from db if no active subscriptions.
630 await
this.db
.topicPendingDelete(txCtx
, topicId
);
634 const updatableFields
= [
635 'leaseSecondsPreferred',
638 'publisherValidationUrl',
639 'contentHashAlgorithm',
642 const patchValues
= common
.pick({
648 'leaseSecondsPreferred',
651 ].filter((field
) => field
in patchValues
).forEach((field
) => {
652 // eslint-disable-next-line security/detect-object-injection
653 patchValues
[field
] = parseInt(patchValues
[field
], 10);
656 const patchKeys
= Object
.keys(patchValues
);
657 if (patchKeys
.length
=== 0
658 // eslint-disable-next-line security/detect-object-injection
659 || patchKeys
.every((k
) => patchValues
[k
] == topic
[k
])) {
660 res
.statusCode
= 204;
662 this.logger
.info(_scope
, 'empty topic update', { ctx
, topicId
});
665 const patchedTopic
= {
670 this.logger
.debug(_scope
, 'data', { topic
, patchValues
, patchedTopic
});
673 await
this.db
.topicUpdate(txCtx
, { topicId
, ...patchedTopic
});
675 if (e
instanceof DBErrors
.DataValidation
) {
676 this.logger
.debug(_scope
, 'validation error', { error: e
, ctx
, topicId
});
677 throw new Errors
.ResponseError(Enum
.ErrorResponse
.BadRequest
, e
.message
);
679 this.logger
.error(_scope
, 'failed', { error: e
, ctx
, topicId
});
683 this.logger
.info(_scope
, 'topic updated', { ctx
, topicId
, patchValues
});
690 * PATCH and DELETE for updating subscription data.
691 * @param {http.ServerResponse} res
692 * @param {Object} ctx
694 async
updateSubscription(res
, ctx
) {
695 const _scope
= _fileScope('updateSubscription');
696 this.logger
.debug(_scope
, 'called', { ctx
});
698 const subscriptionId
= ctx
.params
.subscriptionId
;
700 await
this.db
.context(async (dbCtx
) => {
701 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
702 const subscription
= await
this.db
.subscriptionGetById(txCtx
, subscriptionId
);
704 this.logger
.debug(_scope
, 'no subscription', { ctx
});
705 throw new Errors
.ResponseError(Enum
.ErrorResponse
.NotFound
);
708 if (ctx
.method
=== 'DELETE') {
709 const deleteFields
= common
.pick({
714 // Queue an unsubscription.
715 const verification
= {
716 topicId: subscription
.topicId
,
717 callback: subscription
.callback
,
718 mode: Enum
.Mode
.Denied
,
719 reason: 'subscription removed by administrative action',
720 isPublisherValidated: true,
721 requestId: ctx
.requestId
,
725 await
this.db
.verificationInsert(txCtx
, verification
);
726 this.logger
.info(_scope
, 'subscription removal initiated', { ctx
, verification
});
731 const updatableFields
= [
732 'signatureAlgorithm',
735 const patchValues
= common
.pick({
740 const patchKeys
= Object
.keys(patchValues
);
741 if (patchKeys
.length
=== 0
742 // eslint-disable-next-line security/detect-object-injection
743 || patchKeys
.every((k
) => patchValues
[k
] == subscription
[k
])) {
744 res
.statusCode
= 204;
748 const patchedSubscription
= {
754 await
this.db
.subscriptionUpdate(txCtx
, { subscriptionId
, ...patchedSubscription
});
756 if (e
instanceof DBErrors
.DataValidation
) {
757 this.logger
.debug(_scope
, 'validation error', { error: e
, ctx
, subscriptionId
});
758 throw new Errors
.ResponseError(Enum
.ErrorResponse
.BadRequest
, e
.message
);
760 this.logger
.info(_scope
, 'failed', { error: e
, ctx
, subscriptionId
});
764 this.logger
.info(_scope
, 'subscription updated', { ctx
, subscriptionId
, patchValues
});
770 * POST request for manually running worker.
771 * @param {http.ServerResponse} res
772 * @param {object} ctx
774 async
processTasks(res
, ctx
) {
775 const _scope
= _fileScope('getTopicDetails');
776 this.logger
.debug(_scope
, 'called', { ctx
});
778 // N.B. no await on this
779 this.communication
.worker
.process().catch((e
) => {
780 this.logger
.error(_scope
, 'failed', { error: e
, ctx
});
784 this.logger
.info(_scope
, 'invoked worker process', { ctx
});
789 module
.exports
= Manager
;