4 * Here we process all the incoming requests.
5 * Payload validation occurs here, before committing the pending work to the
6 * database and (usually) calling a processor to act upon it.
8 * As this is the mediator between server framework and actions, this would
9 * be where most of a rewrite for a new server framework would happen.
12 const common
= require('./common');
13 const Communication
= require('./communication');
14 const Enum
= require('./enum');
15 const Errors
= require('./errors');
16 const DBErrors
= require('./db/errors');
17 const { ResponseError
} = require('./errors');
18 const Template
= require('./template');
20 const _fileScope
= common
.fileScope(__filename
);
23 constructor(logger
, db
, options
) {
26 this.options
= options
;
27 this.communication
= new Communication(logger
, db
, options
);
29 // Precalculate the invariant root GET metadata.
30 this.getRootContent
= Template
.rootHTML(undefined, options
);
31 const now
= new Date();
32 this.startTimeString
= now
.toGMTString();
33 this.startTimeMs
= now
.getTime();
34 this.getRootETag
= common
.generateETag(undefined, undefined, this.getRootContent
);
39 * GET request for healthcheck.
40 * @param {http.ServerResponse} res
43 async
getHealthcheck(res
, ctx
) {
44 const _scope
= _fileScope('getHealthcheck');
45 const health
= 'happy';
47 // What else could we check...
48 const dbHealth
= await
this.db
.healthCheck();
49 this.logger
.debug(_scope
, 'called', { health
, dbHealth
, ctx
});
55 * GET request for root.
56 * @param {http.ServerResponse} res
59 async
getRoot(req
, res
, ctx
) {
60 const _scope
= _fileScope('getRoot');
61 this.logger
.debug(_scope
, 'called', { ctx
});
63 res
.setHeader(Enum
.Header
.LastModified
, this.startTimeString
);
64 res
.setHeader(Enum
.Header
.ETag
, this.getRootETag
);
66 if (common
.isClientCached(req
, this.startTimeMs
, this.getRootETag
)) {
67 this.logger
.debug(_scope
, 'client cached response', { ctx
});
72 res
.end(this.getRootContent
);
73 this.logger
.info(_scope
, 'finished', { ctx
});
77 /** All the fields the root handler deals with.
78 * @typedef {object} RootData
79 * @property {string} callback - url
80 * @property {string} mode
81 * @property {string} topic
82 * @property {number} topicId
83 * @property {string} leaseSeconds
84 * @property {string} secret
85 * @property {string} httpRemoteAddr
86 * @property {string} httpFrom
87 * @property {boolean} isSecure
88 * @property {boolean} isPublisherValidated
92 * Extract api parameters.
93 * @param {http.ClientRequest} req
97 static _getRootData(req
, ctx
) {
98 const postData
= ctx
.parsedBody
;
99 const mode
= (postData
['hub.mode'] || '').toLowerCase();
101 callback: postData
['hub.callback'],
103 ...(mode
=== Enum
.Mode
.Publish
&& { url: postData
['hub.url'] }), // Publish accepts either hub.url or hub.topic
104 topic: postData
['hub.topic'],
105 ...(postData
['hub.lease_seconds'] && { leaseSeconds: parseInt(postData
['hub.lease_seconds'], 10) }),
106 secret: postData
['hub.secret'],
107 httpRemoteAddr: ctx
.clientAddress
,
108 httpFrom: req
.getHeader(Enum
.Header
.From
),
109 isSecure: ((ctx
.clientProtocol
|| '').toLowerCase() === 'https'),
110 isPublisherValidated: true, // Default to true. Will be set to false later, if topic has publisher validation url.
118 * @param {RootData} data
119 * @param {String[]} warn
120 * @param {String[]} err
121 * @param {String} requestId
123 async
_validateRootData(dbCtx
, data
, warn
, err
, requestId
) {
124 // These checks can modify data, so order matters.
125 await
this._checkTopic(dbCtx
, data
, warn
, err
, requestId
);
126 this._checkCallbackAndSecrets(data
, warn
, err
, requestId
);
127 await
this._checkMode(dbCtx
, data
, warn
, err
, requestId
);
132 * Check that requested topic exists and values are in range.
133 * Sets topic id, publisher validation state, and requested lease
136 * @param {RootData} data
137 * @param {String[]} warn
138 * @param {String[]} err
140 async
_checkTopic(dbCtx
, data
, warn
, err
, requestId
) {
141 const _scope
= _fileScope('_checkTopic');
145 topic
= await
this.db
.topicGetByUrl(dbCtx
, data
.topic
);
147 if (!topic
&& this.options
.manager
.publicHub
) {
148 this.logger
.info(_scope
, 'new topic from subscribe request', { data
, requestId
});
153 err
.push('invalid topic url (failed to parse url)');
157 await
this.db
.topicSet(dbCtx
, {
160 topic
= await
this.db
.topicGetByUrl(dbCtx
, data
.topic
);
164 if (!topic
|| topic
.isDeleted
) {
165 err
.push('not a supported topic');
169 data
.topicId
= topic
.id
;
171 if (data
.leaseSeconds
=== undefined || isNaN(data
.leaseSeconds
)) {
172 data
.leaseSeconds
= topic
.leaseSecondsPreferred
;
174 if (data
.leaseSeconds
> topic
.leaseSecondsMax
) {
175 data
.leaseSeconds
= topic
.leaseSecondsMax
;
176 warn
.push(`requested lease too long, using ${data.leaseSeconds}`);
177 } else if (data
.leaseSeconds
< topic
.leaseSecondsMin
) {
178 data
.leaseSeconds
= topic
.leaseSecondsMin
;
179 warn
.push(`requested lease too short, using ${data.leaseSeconds}`);
183 if (topic
.publisherValidationUrl
) {
184 data
.isPublisherValidated
= false;
190 * Check data for valid callback url and scheme constraints.
191 * @param {RootData} data
192 * @param {String[]} warn
193 * @param {String[]} err
195 _checkCallbackAndSecrets(data
, warn
, err
) {
196 let isCallbackSecure
= false;
198 if (!data
.callback
) {
199 err
.push('invalid callback url (empty)');
202 const c
= new URL(data
.callback
);
203 isCallbackSecure
= (c
.protocol
.toLowerCase() === 'https:'); // Colon included because url module is weird
205 err
.push('invalid callback url (failed to parse url');
210 if (!isCallbackSecure
) {
211 warn
.push('insecure callback');
215 const secretSeverity
= this.options
.manager
.strictSecrets
? err : warn
;
216 if (!data
.isSecure
) {
217 secretSeverity
.push('secret not safe (insecure hub)');
219 if (!isCallbackSecure
) {
220 secretSeverity
.push('secret not safe (insecure callback)');
222 if (data
.secret
.length
> 199) {
223 err
.push('cannot keep a secret that big');
229 * Check mode validity and subscription requirements.
230 * Publish mode is handled elsewhere in the flow.
232 * @param {RootData} data
233 * @param {String[]} warn
234 * @param {String[]} err
235 * @param {String} requestId
237 async
_checkMode(dbCtx
, data
, warn
, err
) {
239 case Enum
.Mode
.Subscribe:
242 case Enum
.Mode
.Unsubscribe: {
243 const currentEpoch
= Date
.now() / 1000;
245 if (data
.callback
&& data
.topicId
) {
246 s
= await
this.db
.subscriptionGet(dbCtx
, data
.callback
, data
.topicId
);
248 if (s
=== undefined) {
249 err
.push('not subscribed');
251 if (s
.expires
< currentEpoch
) {
252 err
.push('subscription already expired');
259 err
.push('invalid mode');
266 * Check that a publish request topic is valid and exists,
267 * and if it is, add topicId to data.
268 * For a public publish request, create topic if not exists.
270 * @param {RootData} data
271 * @param {String[]} warn
272 * @param {String[]} err
273 * @param {String} requestId
275 async
_checkPublish(dbCtx
, data
, warn
, err
, requestId
) {
276 const _scope
= _fileScope('_checkPublish');
278 const publishUrl
= data
.url
|| data
.topic
;
280 let topic
= await
this.db
.topicGetByUrl(dbCtx
, publishUrl
);
281 if (!topic
&& this.options
.manager
.publicHub
) {
282 this.logger
.info(_scope
, 'new topic from publish request', { data
, requestId
});
287 err
.push('invalid topic url (failed to parse url)');
291 await
this.db
.topicSet(dbCtx
, {
294 topic
= await
this.db
.topicGetByUrl(dbCtx
, publishUrl
);
297 if (!topic
|| topic
.isDeleted
) {
298 err
.push('not a supported topic');
302 data
.topicId
= topic
.id
;
307 * POST request for root.
308 * @param {http.ClientRequest} req
309 * @param {http.ServerResponse} res
310 * @param {object} ctx
312 async
postRoot(req
, res
, ctx
) {
313 const _scope
= _fileScope('postRoot');
314 this.logger
.debug(_scope
, 'called', { ctx
});
316 res
.statusCode
= 202; // Presume success.
320 const data
= Manager
._getRootData(req
, ctx
);
321 const requestId
= ctx
.requestId
;
323 await
this.db
.context(async (dbCtx
) => {
325 if (data
.mode
=== Enum
.Mode
.Publish
) {
326 await
this._checkPublish(dbCtx
, data
, warn
, err
, requestId
);
328 await
this._validateRootData(dbCtx
, data
, warn
, err
, requestId
);
331 const prettyErr
= err
.map((entry
) => `error: ${entry}`);
332 const prettyWarn
= warn
.map((entry
) => `warning: ${entry}`);
333 const details
= prettyErr
.concat(prettyWarn
);
335 // Any errors are fatal. Stop and report anything that went wrong.
337 this.logger
.debug(_scope
, { msg: 'invalid request', data
, err
, warn
, requestId
});
338 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, details
);
341 // Commit the request for later processing.
344 if (data
.mode
=== Enum
.Mode
.Publish
) {
345 fn
= 'topicFetchRequested';
346 info
= await
this.db
.topicFetchRequested(dbCtx
, data
.topicId
);
349 fn
= 'verificationInsert';
350 id
= await
this.db
.verificationInsert(dbCtx
, { ...data
, requestId
});
353 this.logger
.error(_scope
, `${fn} failed`, { e
, info
, data
, warn
, id
, requestId
});
357 // If we committed to the db, we've succeeded as far as the client is concerned.
358 res
.end(details
.join('\n'));
359 this.logger
.info(_scope
, 'request accepted', { data
, warn
, requestId
});
361 // Immediately attempt to claim and process the request.
362 if (this.options
.manager
.processImmediately
365 if (data
.mode
=== Enum
.Mode
.Publish
) {
366 fn
= 'topicFetchClaimAndProcessById';
367 await
this.communication
.topicFetchClaimAndProcessById(dbCtx
, id
, requestId
);
369 fn
= 'verificationClaimAndProcessById';
370 await
this.communication
.verificationClaimAndProcessById(dbCtx
, id
, requestId
);
373 this.logger
.error(_scope
, `${fn} failed`, { ...data
, id
, requestId
});
374 // Don't bother re-throwing, as we've already ended this response.
382 * Render topic info content.
383 * @param {Object} ctx
384 * @param {String} ctx.responseType
385 * @param {String} ctx.topicUrl
386 * @param {Number} ctx.count
389 // eslint-disable-next-line class-methods-use-this
391 // eslint-disable-next-line sonarjs/no-small-switch
392 switch (ctx
.responseType
) {
393 case Enum
.ContentType
.ApplicationJson:
394 return JSON
.stringify({
399 case Enum
.ContentType
.ImageSVG:
400 return Template
.badgeSVG({}, ` ${ctx.topicUrl} `, ` ${ctx.count} subscribers `, `${ctx.topicUrl} has ${ctx.count} subscribers.`);
403 return ctx
.count
.toString();
409 * GET request for /info?topic=url&format=type
410 * @param {http.ServerResponse} res
411 * @param {object} ctx
413 async
getInfo(res
, ctx
) {
414 const _scope
= _fileScope('getInfo');
415 this.logger
.debug(_scope
, 'called', { ctx
});
417 if (!ctx
.queryParams
.topic
) {
418 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, 'missing required parameter');
420 ctx
.topicUrl
= ctx
.queryParams
.topic
;
422 switch ((ctx
.queryParams
.format
|| '').toLowerCase()) {
424 ctx
.responseType
= Enum
.ContentType
.ImageSVG
;
425 res
.setHeader(Enum
.Header
.ContentType
, ctx
.responseType
);
429 ctx
.responseType
= Enum
.ContentType
.ApplicationJson
;
430 res
.setHeader(Enum
.Header
.ContentType
, ctx
.responseType
);
438 new URL(ctx
.topicUrl
);
440 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, 'invalid topic');
444 await
this.db
.context(async (dbCtx
) => {
445 count
= await
this.db
.subscriptionCountByTopicUrl(dbCtx
, ctx
.topicUrl
);
447 throw new ResponseError(Enum
.ErrorResponse
.NotFound
, 'no such topic');
449 ctx
.count
= count
.count
;
452 res
.end(this.infoContent(ctx
));
453 this.logger
.info(_scope
, 'finished', { ...ctx
});
458 * GET request for authorized /admin information.
459 * @param {http.ServerResponse} res
460 * @param {object} ctx
462 async
getAdminOverview(res
, ctx
) {
463 const _scope
= _fileScope('getAdminOverview');
464 this.logger
.debug(_scope
, 'called', { ctx
});
466 await
this.db
.context(async (dbCtx
) => {
467 ctx
.topics
= await
this.db
.topicGetAll(dbCtx
);
469 this.logger
.debug(_scope
, 'got topics', { topics: ctx
.topics
});
471 res
.end(Template
.adminOverviewHTML(ctx
, this.options
));
472 this.logger
.info(_scope
, 'finished', { ...ctx
, topics: ctx
.topics
.length
})
477 * GET request for authorized /admin/topic/:topicId information.
478 * @param {http.ServerResponse} res
479 * @param {object} ctx
481 async
getTopicDetails(res
, ctx
) {
482 const _scope
= _fileScope('getTopicDetails');
483 this.logger
.debug(_scope
, 'called', { ctx
});
485 const topicId
= ctx
.params
.topicId
;
486 await
this.db
.context(async (dbCtx
) => {
487 ctx
.topic
= await
this.db
.topicGetById(dbCtx
, topicId
);
488 ctx
.subscriptions
= await
this.db
.subscriptionsByTopicId(dbCtx
, topicId
);
490 this.logger
.debug(_scope
, 'got topic details', { topic: ctx
.topic
, subscriptions: ctx
.subscriptions
});
492 res
.end(Template
.adminTopicDetailsHTML(ctx
, this.options
));
493 this.logger
.info(_scope
, 'finished', { ...ctx
, subscriptions: ctx
.subscriptions
.length
, topic: ctx
.topic
.id
});
498 * PATCH and DELETE for updating topic data.
499 * @param {http.ServerResponse} res
500 * @param {Object} ctx
502 async
updateTopic(res
, ctx
) {
503 const _scope
= _fileScope('updateTopic');
504 this.logger
.debug(_scope
, 'called', { ctx
});
506 const topicId
= ctx
.params
.topicId
;
508 await
this.db
.context(async (dbCtx
) => {
509 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
510 // Get topic without defaults filled in, to persist nulls
511 const topic
= await
this.db
.topicGetById(txCtx
, topicId
, false);
513 this.logger
.debug(_scope
, 'no topic', { ctx
});
514 throw new Errors
.ResponseError(Enum
.ErrorResponse
.NotFound
);
517 if (ctx
.method
=== 'DELETE') {
518 await
this.db
.topicDeleted(txCtx
, topicId
);
520 this.logger
.info(_scope
, 'topic set deleted', { ctx
, topicId
});
524 const updatableFields
= [
525 'leaseSecondsPreferred',
528 'publisherValidationUrl',
529 'contentHashAlgorithm',
532 const patchValues
= common
.pick({
538 'leaseSecondsPreferred',
541 ].filter((field
) => field
in patchValues
).forEach((field
) => {
542 // eslint-disable-next-line security/detect-object-injection
543 patchValues
[field
] = parseInt(patchValues
[field
], 10);
546 const patchKeys
= Object
.keys(patchValues
);
547 if (patchKeys
.length
=== 0
548 // eslint-disable-next-line security/detect-object-injection
549 || patchKeys
.every((k
) => patchValues
[k
] == topic
[k
])) {
550 res
.statusCode
= 204;
552 this.logger
.info(_scope
, 'empty topic update', { ctx
, topicId
});
555 const patchedTopic
= {
560 this.logger
.debug(_scope
, 'data', { topic
, patchValues
, patchedTopic
});
563 await
this.db
.topicUpdate(txCtx
, { topicId
, ...patchedTopic
});
565 if (e
instanceof DBErrors
.DataValidation
) {
566 this.logger
.debug(_scope
, 'validation error', { error: e
, ctx
, topicId
});
567 throw new Errors
.ResponseError(Enum
.ErrorResponse
.BadRequest
, e
.message
);
569 this.logger
.error(_scope
, 'failed', { error: e
, ctx
, topicId
});
573 this.logger
.info(_scope
, 'topic updated', { ctx
, topicId
, patchValues
});
580 * PATCH and DELETE for updating subscription data.
581 * @param {http.ServerResponse} res
582 * @param {Object} ctx
584 async
updateSubscription(res
, ctx
) {
585 const _scope
= _fileScope('updateSubscription');
586 this.logger
.debug(_scope
, 'called', { ctx
});
588 const subscriptionId
= ctx
.params
.subscriptionId
;
590 await
this.db
.context(async (dbCtx
) => {
591 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
592 const subscription
= await
this.db
.subscriptionGetById(txCtx
, subscriptionId
);
594 this.logger
.debug(_scope
, 'no subscription', { ctx
});
595 throw new Errors
.ResponseError(Enum
.ErrorResponse
.NotFound
);
598 if (ctx
.method
=== 'DELETE') {
599 const deleteFields
= common
.pick({
604 // Queue an unsubscription.
605 const verification
= {
606 topicId: subscription
.topicId
,
607 callback: subscription
.callback
,
608 mode: Enum
.Mode
.Denied
,
609 reason: 'subscription removed by administrative action',
610 isPublisherValidated: true,
611 requestId: ctx
.requestId
,
615 await
this.db
.verificationInsert(txCtx
, verification
);
616 this.logger
.info(_scope
, 'subscription removal initiated', { ctx
, verification
});
621 const updatableFields
= [
622 'signatureAlgorithm',
625 const patchValues
= common
.pick({
630 const patchKeys
= Object
.keys(patchValues
);
631 if (patchKeys
.length
=== 0
632 // eslint-disable-next-line security/detect-object-injection
633 || patchKeys
.every((k
) => patchValues
[k
] == subscription
[k
])) {
634 res
.statusCode
= 204;
638 const patchedSubscription
= {
644 await
this.db
.subscriptionUpdate(txCtx
, { subscriptionId
, ...patchedSubscription
});
646 if (e
instanceof DBErrors
.DataValidation
) {
647 this.logger
.debug(_scope
, 'validation error', { error: e
, ctx
, subscriptionId
});
648 throw new Errors
.ResponseError(Enum
.ErrorResponse
.BadRequest
, e
.message
);
650 this.logger
.info(_scope
, 'failed', { error: e
, ctx
, subscriptionId
});
654 this.logger
.info(_scope
, 'subscription updated', { ctx
, subscriptionId
, patchValues
});
660 * POST request for manually running worker.
661 * @param {http.ServerResponse} res
662 * @param {object} ctx
664 async
processTasks(res
, ctx
) {
665 const _scope
= _fileScope('getTopicDetails');
666 this.logger
.debug(_scope
, 'called', { ctx
});
668 // N.B. no await on this
669 this.communication
.worker
.process().catch((e
) => {
670 this.logger
.error(_scope
, 'failed', { error: e
, ctx
});
674 this.logger
.info(_scope
, 'invoked worker process', { ctx
});
679 module
.exports
= Manager
;