667fb301ef83d1abffb53b53470a25588f11d10d
4 * Here we process all the incoming requests.
5 * Payload validation occurs here, before committing the pending work to the
6 * database and (usually) calling a processor to act upon it.
8 * As this is the mediator between server framework and actions, this would
9 * be where most of a rewrite for a new server framework would happen.
12 const common
= require('./common');
13 const Communication
= require('./communication');
14 const Enum
= require('./enum');
15 const Errors
= require('./errors');
16 const DBErrors
= require('./db/errors');
17 const { ResponseError
} = require('./errors');
18 const Template
= require('./template');
20 const _fileScope
= common
.fileScope(__filename
);
23 constructor(logger
, db
, options
) {
26 this.options
= options
;
27 this.communication
= new Communication(logger
, db
, options
);
29 // Precalculate the invariant root GET metadata.
30 this.getRootContent
= Template
.rootHTML(undefined, options
);
31 const now
= new Date();
32 this.startTimeString
= now
.toGMTString();
33 this.startTimeMs
= now
.getTime();
34 this.getRootETag
= common
.generateETag(undefined, undefined, this.getRootContent
);
39 * GET request for healthcheck.
40 * @param {http.ServerResponse} res
43 async
getHealthcheck(res
, ctx
) {
44 const _scope
= _fileScope('getHealthcheck');
45 const health
= 'happy';
47 // What else could we check...
48 const dbHealth
= await
this.db
.healthCheck();
49 this.logger
.debug(_scope
, 'called', { health
, dbHealth
, ctx
});
55 * GET request for root.
56 * @param {http.ClientRequest} req
57 * @param {http.ServerResponse} res
60 async
getRoot(req
, res
, ctx
) {
61 const _scope
= _fileScope('getRoot');
62 this.logger
.debug(_scope
, 'called', { ctx
});
64 res
.setHeader(Enum
.Header
.LastModified
, this.startTimeString
);
65 res
.setHeader(Enum
.Header
.ETag
, this.getRootETag
);
67 if (common
.isClientCached(req
, this.startTimeMs
, this.getRootETag
)) {
68 this.logger
.debug(_scope
, 'client cached response', { ctx
});
73 res
.end(this.getRootContent
);
74 this.logger
.info(_scope
, 'finished', { ctx
});
78 /** All the fields the root handler deals with.
79 * @typedef {object} RootData
80 * @property {string} callback - url
81 * @property {string} mode
82 * @property {string} topic
83 * @property {number} topicId
84 * @property {string} leaseSeconds
85 * @property {string} secret
86 * @property {string} httpRemoteAddr
87 * @property {string} httpFrom
88 * @property {boolean} isSecure
89 * @property {boolean} isPublisherValidated
93 * Extract api parameters.
94 * @param {http.ClientRequest} req
98 static _getRootData(req
, ctx
) {
99 const postData
= ctx
.parsedBody
;
100 const mode
= (postData
['hub.mode'] || '').toLowerCase();
102 callback: postData
['hub.callback'],
104 ...(mode
=== Enum
.Mode
.Publish
&& { url: postData
['hub.url'] }), // Publish accepts either hub.url or hub.topic
105 topic: postData
['hub.topic'],
106 ...(postData
['hub.lease_seconds'] && { leaseSeconds: parseInt(postData
['hub.lease_seconds'], 10) }),
107 secret: postData
['hub.secret'],
108 httpRemoteAddr: ctx
.clientAddress
,
109 httpFrom: req
.getHeader(Enum
.Header
.From
),
110 isSecure: ((ctx
.clientProtocol
|| '').toLowerCase() === 'https'),
111 isPublisherValidated: true, // Default to true. Will be set to false later, if topic has publisher validation url.
119 * @param {RootData} data
120 * @param {String[]} warn
121 * @param {String[]} err
122 * @param {String} requestId
124 async
_validateRootData(dbCtx
, data
, warn
, err
, requestId
) {
125 // These checks can modify data, so order matters.
126 await
this._checkTopic(dbCtx
, data
, warn
, err
, requestId
);
127 this._checkCallbackAndSecrets(data
, warn
, err
, requestId
);
128 await
this._checkMode(dbCtx
, data
, warn
, err
, requestId
);
133 * Check that requested topic exists and values are in range.
134 * Sets topic id, publisher validation state, and requested lease
137 * @param {RootData} data
138 * @param {String[]} warn
139 * @param {String[]} err
141 async
_checkTopic(dbCtx
, data
, warn
, err
, requestId
) {
142 const _scope
= _fileScope('_checkTopic');
146 topic
= await
this.db
.topicGetByUrl(dbCtx
, data
.topic
);
148 if (!topic
&& this._newTopicCreationAllowed()) {
149 this.logger
.info(_scope
, 'new topic from subscribe request', { data
, requestId
});
154 err
.push('invalid topic url (failed to parse url)');
158 await
this.db
.topicSet(dbCtx
, {
161 topic
= await
this.db
.topicGetByUrl(dbCtx
, data
.topic
);
165 if (!topic
|| topic
.isDeleted
) {
166 err
.push('not a supported topic');
170 data
.topicId
= topic
.id
;
172 if (data
.leaseSeconds
=== undefined || isNaN(data
.leaseSeconds
)) {
173 data
.leaseSeconds
= topic
.leaseSecondsPreferred
;
175 if (data
.leaseSeconds
> topic
.leaseSecondsMax
) {
176 data
.leaseSeconds
= topic
.leaseSecondsMax
;
177 warn
.push(`requested lease too long, using ${data.leaseSeconds}`);
178 } else if (data
.leaseSeconds
< topic
.leaseSecondsMin
) {
179 data
.leaseSeconds
= topic
.leaseSecondsMin
;
180 warn
.push(`requested lease too short, using ${data.leaseSeconds}`);
184 if (topic
.publisherValidationUrl
) {
185 data
.isPublisherValidated
= false;
191 * Check data for valid callback url and scheme constraints.
192 * @param {RootData} data
193 * @param {String[]} warn
194 * @param {String[]} err
196 _checkCallbackAndSecrets(data
, warn
, err
) {
197 let isCallbackSecure
= false;
199 if (!data
.callback
) {
200 err
.push('invalid callback url (empty)');
203 const c
= new URL(data
.callback
);
204 isCallbackSecure
= (c
.protocol
.toLowerCase() === 'https:'); // Colon included because url module is weird
206 err
.push('invalid callback url (failed to parse url');
211 if (!isCallbackSecure
) {
212 warn
.push('insecure callback');
216 const secretSeverity
= this.options
.manager
.strictSecrets
? err : warn
;
217 if (!data
.isSecure
) {
218 secretSeverity
.push('secret not safe (insecure hub)');
220 if (!isCallbackSecure
) {
221 secretSeverity
.push('secret not safe (insecure callback)');
223 if (data
.secret
.length
> 199) {
224 err
.push('cannot keep a secret that big');
230 * Check mode validity and subscription requirements.
231 * Publish mode is handled elsewhere in the flow.
233 * @param {RootData} data
234 * @param {String[]} warn
235 * @param {String[]} err
236 * @param {String} requestId
238 async
_checkMode(dbCtx
, data
, warn
, err
) {
240 case Enum
.Mode
.Subscribe:
243 case Enum
.Mode
.Unsubscribe: {
244 const currentEpoch
= Date
.now() / 1000;
246 if (data
.callback
&& data
.topicId
) {
247 s
= await
this.db
.subscriptionGet(dbCtx
, data
.callback
, data
.topicId
);
249 if (s
=== undefined) {
250 err
.push('not subscribed');
252 if (s
.expires
< currentEpoch
) {
253 err
.push('subscription already expired');
260 err
.push('invalid mode');
267 * Determine if a topic url is allowed to be created.
268 * In the future, this may be more complicated.
271 _newTopicCreationAllowed() {
272 return this.options
.manager
.publicHub
;
277 * Check that a publish request's topic(s) are valid and exist,
278 * returning an array with the results for each.
279 * For a public-hub publish request, creates topics if they do not exist.
281 * @param {RootData} data
282 * @param {String[]} warn
283 * @param {String[]} err
284 * @param {String} requestId
286 async
_publishTopics(dbCtx
, data
, requestId
) {
287 const _scope
= _fileScope('_checkPublish');
289 // Publish requests may include multiple topics, consider them all, but deduplicate.
290 const publishUrls
= Array
.from(new Set([
291 ...common
.ensureArray(data
.url
),
292 ...common
.ensureArray(data
.topic
),
295 // Map the requested topics to their ids, creating if necessary.
296 return Promise
.all(publishUrls
.map(async (url
) => {
303 let topic
= await
this.db
.topicGetByUrl(dbCtx
, url
);
304 if (!topic
&& this._newTopicCreationAllowed()) {
308 result
.err
.push('invalid topic url (failed to parse url)');
311 await
this.db
.topicSet(dbCtx
, {
312 // TODO: accept a publisherValidationUrl parameter
315 topic
= await
this.db
.topicGetByUrl(dbCtx
, url
);
316 this.logger
.info(_scope
, 'new topic from publish request', { url
, requestId
});
318 if (!topic
|| topic
.isDeleted
) {
319 result
.err
.push('topic not supported');
322 result
.topicId
= topic
.id
;
329 * Render response for multi-topic publish requests.
330 * @param {Object[]} publishTopics
332 static multiPublishContent(ctx
, publishTopics
) {
333 const responses
= publishTopics
.map((topic
) => ({
335 status: topic
.status
,
336 statusMessage: topic
.statusMessage
,
338 warnings: topic
.warn
,
340 switch (ctx
.responseType
) {
341 case Enum
.ContentType
.ApplicationJson:
342 return JSON
.stringify(responses
);
344 case Enum
.ContentType
.TextPlain:
346 const textResponses
= responses
.map((response
) => {
347 const details
= Manager
._prettyDetails(response
.errors
, response
.warnings
);
348 const textDetails
= (details
.length
? '\n' : '') + details
.map((d
) => `\t${d}`).join('\n');
349 return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`;
351 return textResponses
.join('\n----\n');
358 * Process a publish request.
360 * @param {Object} data
361 * @param {http.ServerResponse} res
362 * @param {Object} ctx
364 async
_publishRequest(dbCtx
, data
, res
, ctx
) {
365 const _scope
= _fileScope('_parsePublish');
366 this.logger
.debug(_scope
, 'called', { data
});
368 const requestId
= ctx
.requestId
;
370 // Parse and validate all the topics in the request.
371 data
.publishTopics
= await
this._publishTopics(dbCtx
, data
, requestId
);
372 if (!data
.publishTopics
|| !data
.publishTopics
.length
) {
373 const details
= Manager
._prettyDetails(['no valid topic urls to publish'], []);
374 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, details
);
377 // Set status per topic
378 for (const topicResult
of data
.publishTopics
) {
379 topicResult
.status
= topicResult
.err
.length
? 400 : 202;
380 topicResult
.statusMessage
= topicResult
.err
.length
? 'Bad Request' : 'Accepted';
383 // Process the valid publish notifications
384 const validPublishTopics
= data
.publishTopics
.filter((topicResult
) => !topicResult
.err
.length
);
386 await Promise
.all(validPublishTopics
.map(async (topicResult
) => this.db
.topicFetchRequested(dbCtx
, topicResult
.topicId
)));
388 this.logger
.error(_scope
, 'topicFetchRequest failed', { error: e
, ctx
, data
, requestId
});
392 this.logger
.info(_scope
, 'request accepted', { ctx
, data
, requestId
});
394 if (data
.publishTopics
.length
=== 1) {
395 const soleTopic
= data
.publishTopics
[0];
396 res
.statusCode
= soleTopic
.status
;
397 res
.end(Manager
._prettyDetails(soleTopic
.err
, soleTopic
.warn
).join('\n'));
399 res
.statusCode
= 207;
400 res
.end(Manager
.multiPublishContent(ctx
, data
.publishTopics
));
403 if (this.options
.manager
.processImmediately
404 && validPublishTopics
.length
) {
406 await Promise
.all(validPublishTopics
.map(async (topicResult
) => this.communication
.topicFetchClaimAndProcessById(dbCtx
, topicResult
.topicId
, requestId
)));
408 this.logger
.error(_scope
, 'topicFetchClaimAndProcessById failed', { data
, validPublishTopics
, requestId
});
409 // Don't bother re-throwing, as we've already ended this response.
416 * Annotate any encountered issues.
417 * @param {String[]} err
418 * @param {String[]} warn
419 * @returns {String[]}
421 static _prettyDetails(err
, warn
) {
423 ...err
.map((entry
) => `error: ${entry}`),
424 ...warn
.map((entry
) => `warning: ${entry}`),
430 * POST request for root.
431 * @param {http.ClientRequest} req
432 * @param {http.ServerResponse} res
433 * @param {object} ctx
435 async
postRoot(req
, res
, ctx
) {
436 const _scope
= _fileScope('postRoot');
437 this.logger
.debug(_scope
, 'called', { ctx
});
439 res
.statusCode
= 202; // Presume success.
443 const data
= Manager
._getRootData(req
, ctx
);
444 const requestId
= ctx
.requestId
;
446 await
this.db
.context(async (dbCtx
) => {
448 // Handle publish requests elsewhere
449 if (data
.mode
=== Enum
.Mode
.Publish
) {
450 return this._publishRequest(dbCtx
, data
, res
, ctx
);
453 await
this._validateRootData(dbCtx
, data
, warn
, err
, requestId
);
455 const details
= Manager
._prettyDetails(err
, warn
);
457 // Any errors are fatal. Stop and report anything that went wrong.
459 this.logger
.debug(_scope
, { msg: 'invalid request', data
, err
, warn
, requestId
});
460 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, details
);
463 // Commit the request for later processing.
466 id
= await
this.db
.verificationInsert(dbCtx
, { ...data
, requestId
});
468 this.logger
.error(_scope
, 'verificationInsert failed', { error: e
, data
, warn
, id
, requestId
});
472 // If we committed to the db, we've succeeded as far as the client is concerned.
473 res
.end(details
.join('\n'));
474 this.logger
.info(_scope
, 'request accepted', { data
, warn
, requestId
});
476 // Immediately attempt to claim and process the request.
477 if (this.options
.manager
.processImmediately
480 await
this.communication
.verificationClaimAndProcessById(dbCtx
, id
, requestId
);
482 this.logger
.error(_scope
, 'verificationClaimAndProcessById failed', { ...data
, id
, requestId
});
483 // Don't bother re-throwing, as we've already ended this response.
491 * Render topic info content.
492 * @param {Object} ctx
493 * @param {String} ctx.responseType
494 * @param {String} ctx.topicUrl
495 * @param {Number} ctx.count
498 // eslint-disable-next-line class-methods-use-this
500 // eslint-disable-next-line sonarjs/no-small-switch
501 switch (ctx
.responseType
) {
502 case Enum
.ContentType
.ApplicationJson:
503 return JSON
.stringify({
508 case Enum
.ContentType
.ImageSVG:
509 return Template
.badgeSVG({}, ` ${ctx.topicUrl} `, ` ${ctx.count} subscribers `, `${ctx.topicUrl} has ${ctx.count} subscribers.`);
512 return ctx
.count
.toString();
518 * GET request for /info?topic=url&format=type
519 * @param {http.ServerResponse} res
520 * @param {object} ctx
522 async
getInfo(res
, ctx
) {
523 const _scope
= _fileScope('getInfo');
524 this.logger
.debug(_scope
, 'called', { ctx
});
526 if (!ctx
.queryParams
.topic
) {
527 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, 'missing required parameter');
529 ctx
.topicUrl
= ctx
.queryParams
.topic
;
531 switch ((ctx
.queryParams
.format
|| '').toLowerCase()) {
533 ctx
.responseType
= Enum
.ContentType
.ImageSVG
;
534 res
.setHeader(Enum
.Header
.ContentType
, ctx
.responseType
);
538 ctx
.responseType
= Enum
.ContentType
.ApplicationJson
;
539 res
.setHeader(Enum
.Header
.ContentType
, ctx
.responseType
);
547 new URL(ctx
.topicUrl
);
549 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, 'invalid topic');
553 await
this.db
.context(async (dbCtx
) => {
554 count
= await
this.db
.subscriptionCountByTopicUrl(dbCtx
, ctx
.topicUrl
);
556 throw new ResponseError(Enum
.ErrorResponse
.NotFound
, 'no such topic');
558 ctx
.count
= count
.count
;
561 res
.end(this.infoContent(ctx
));
562 this.logger
.info(_scope
, 'finished', { ...ctx
});
567 * GET request for authorized /admin information.
568 * @param {http.ServerResponse} res
569 * @param {object} ctx
571 async
getAdminOverview(res
, ctx
) {
572 const _scope
= _fileScope('getAdminOverview');
573 this.logger
.debug(_scope
, 'called', { ctx
});
575 await
this.db
.context(async (dbCtx
) => {
576 ctx
.topics
= await
this.db
.topicGetAll(dbCtx
);
578 this.logger
.debug(_scope
, 'got topics', { topics: ctx
.topics
});
580 // Profile users can only see related topics.
581 if (ctx
.session
&& ctx
.session
.authenticatedProfile
) {
582 const profileUrlObj
= new URL(ctx
.session
.authenticatedProfile
);
583 ctx
.topics
= ctx
.topics
.filter((topic
) => {
584 const topicUrlObj
= new URL(topic
.url
);
585 return (topicUrlObj
.hostname
=== profileUrlObj
.hostname
);
589 res
.end(Template
.adminOverviewHTML(ctx
, this.options
));
590 this.logger
.info(_scope
, 'finished', { ...ctx
, topics: ctx
.topics
.length
})
595 * GET request for authorized /admin/topic/:topicId information.
596 * @param {http.ServerResponse} res
597 * @param {object} ctx
599 async
getTopicDetails(res
, ctx
) {
600 const _scope
= _fileScope('getTopicDetails');
601 this.logger
.debug(_scope
, 'called', { ctx
});
603 const topicId
= ctx
.params
.topicId
;
604 await
this.db
.context(async (dbCtx
) => {
605 ctx
.topic
= await
this.db
.topicGetById(dbCtx
, topicId
);
606 ctx
.subscriptions
= await
this.db
.subscriptionsByTopicId(dbCtx
, topicId
);
608 this.logger
.debug(_scope
, 'got topic details', { topic: ctx
.topic
, subscriptions: ctx
.subscriptions
});
610 // Profile users can only see related topics.
611 if (ctx
.session
&& ctx
.session
.authenticatedProfile
) {
612 const profileUrlObj
= new URL(ctx
.session
.authenticatedProfile
);
613 const topicUrlObj
= new URL(ctx
.topic
.url
);
614 if (topicUrlObj
.hostname
!== profileUrlObj
.hostname
) {
616 ctx
.subscriptions
= [];
620 res
.end(Template
.adminTopicDetailsHTML(ctx
, this.options
));
621 this.logger
.info(_scope
, 'finished', { ...ctx
, subscriptions: ctx
.subscriptions
.length
, topic: ctx
.topic
&& ctx
.topic
.id
|| ctx
.topic
});
626 * PATCH and DELETE for updating topic data.
627 * @param {http.ServerResponse} res
628 * @param {Object} ctx
630 async
updateTopic(res
, ctx
) {
631 const _scope
= _fileScope('updateTopic');
632 this.logger
.debug(_scope
, 'called', { ctx
});
634 const topicId
= ctx
.params
.topicId
;
636 await
this.db
.context(async (dbCtx
) => {
637 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
638 // Get topic without defaults filled in, to persist nulls
639 const topic
= await
this.db
.topicGetById(txCtx
, topicId
, false);
641 this.logger
.debug(_scope
, 'no topic', { ctx
});
642 throw new Errors
.ResponseError(Enum
.ErrorResponse
.NotFound
);
645 if (ctx
.method
=== 'DELETE') {
646 await
this.db
.topicDeleted(txCtx
, topicId
);
648 this.logger
.info(_scope
, 'topic set deleted', { ctx
, topicId
});
649 // Attempt to remove from db if no active subscriptions.
650 await
this.db
.topicPendingDelete(txCtx
, topicId
);
654 const updatableFields
= [
655 'leaseSecondsPreferred',
658 'publisherValidationUrl',
659 'contentHashAlgorithm',
662 const patchValues
= common
.pick({
668 'leaseSecondsPreferred',
671 ].filter((field
) => field
in patchValues
).forEach((field
) => {
672 // eslint-disable-next-line security/detect-object-injection
673 patchValues
[field
] = parseInt(patchValues
[field
], 10);
676 const patchKeys
= Object
.keys(patchValues
);
677 if (patchKeys
.length
=== 0
678 // eslint-disable-next-line security/detect-object-injection
679 || patchKeys
.every((k
) => patchValues
[k
] == topic
[k
])) {
680 res
.statusCode
= 204;
682 this.logger
.info(_scope
, 'empty topic update', { ctx
, topicId
});
685 const patchedTopic
= {
690 this.logger
.debug(_scope
, 'data', { topic
, patchValues
, patchedTopic
});
693 await
this.db
.topicUpdate(txCtx
, { topicId
, ...patchedTopic
});
695 if (e
instanceof DBErrors
.DataValidation
) {
696 this.logger
.debug(_scope
, 'validation error', { error: e
, ctx
, topicId
});
697 throw new Errors
.ResponseError(Enum
.ErrorResponse
.BadRequest
, e
.message
);
699 this.logger
.error(_scope
, 'failed', { error: e
, ctx
, topicId
});
703 this.logger
.info(_scope
, 'topic updated', { ctx
, topicId
, patchValues
});
710 * PATCH and DELETE for updating subscription data.
711 * @param {http.ServerResponse} res
712 * @param {Object} ctx
714 async
updateSubscription(res
, ctx
) {
715 const _scope
= _fileScope('updateSubscription');
716 this.logger
.debug(_scope
, 'called', { ctx
});
718 const subscriptionId
= ctx
.params
.subscriptionId
;
720 await
this.db
.context(async (dbCtx
) => {
721 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
722 const subscription
= await
this.db
.subscriptionGetById(txCtx
, subscriptionId
);
724 this.logger
.debug(_scope
, 'no subscription', { ctx
});
725 throw new Errors
.ResponseError(Enum
.ErrorResponse
.NotFound
);
728 if (ctx
.method
=== 'DELETE') {
729 const deleteFields
= common
.pick({
734 // Queue an unsubscription.
735 const verification
= {
736 topicId: subscription
.topicId
,
737 callback: subscription
.callback
,
738 mode: Enum
.Mode
.Denied
,
739 reason: 'subscription removed by administrative action',
740 isPublisherValidated: true,
741 requestId: ctx
.requestId
,
745 await
this.db
.verificationInsert(txCtx
, verification
);
746 this.logger
.info(_scope
, 'subscription removal initiated', { ctx
, verification
});
751 const updatableFields
= [
752 'signatureAlgorithm',
755 const patchValues
= common
.pick({
760 const patchKeys
= Object
.keys(patchValues
);
761 if (patchKeys
.length
=== 0
762 // eslint-disable-next-line security/detect-object-injection
763 || patchKeys
.every((k
) => patchValues
[k
] == subscription
[k
])) {
764 res
.statusCode
= 204;
768 const patchedSubscription
= {
774 await
this.db
.subscriptionUpdate(txCtx
, { subscriptionId
, ...patchedSubscription
});
776 if (e
instanceof DBErrors
.DataValidation
) {
777 this.logger
.debug(_scope
, 'validation error', { error: e
, ctx
, subscriptionId
});
778 throw new Errors
.ResponseError(Enum
.ErrorResponse
.BadRequest
, e
.message
);
780 this.logger
.info(_scope
, 'failed', { error: e
, ctx
, subscriptionId
});
784 this.logger
.info(_scope
, 'subscription updated', { ctx
, subscriptionId
, patchValues
});
790 * POST request for manually running worker.
791 * @param {http.ServerResponse} res
792 * @param {object} ctx
794 async
processTasks(res
, ctx
) {
795 const _scope
= _fileScope('getTopicDetails');
796 this.logger
.debug(_scope
, 'called', { ctx
});
798 // N.B. no await on this
799 this.communication
.worker
.process().catch((e
) => {
800 this.logger
.error(_scope
, 'failed', { error: e
, ctx
});
804 this.logger
.info(_scope
, 'invoked worker process', { ctx
});
809 module
.exports
= Manager
;