d7a07f117b9eae96aebb808382625d237409732f
[websub-hub] / src / manager.js
1 'use strict';
2
3 /**
4 * Here we process all the incoming requests.
5 * Payload validation occurs here, before committing the pending work to the
6 * database and (usually) calling a processor to act upon it.
7 *
8 * As this is the mediator between server framework and actions, this would
9 * be where most of a rewrite for a new server framework would happen.
10 */
11
12 const common = require('./common');
13 const Communication = require('./communication');
14 const Enum = require('./enum');
15 const Errors = require('./errors');
16 const DBErrors = require('./db/errors');
17 const { ResponseError } = require('./errors');
18 const Template = require('./template');
19
20 const _fileScope = common.fileScope(__filename);
21
22 class Manager {
23 constructor(logger, db, options) {
24 this.logger = logger;
25 this.db = db;
26 this.options = options;
27 this.communication = new Communication(logger, db, options);
28
29 // Precalculate the invariant root GET metadata.
30 this.getRootContent = Template.rootHTML(undefined, options);
31 const now = new Date();
32 this.startTimeString = now.toGMTString();
33 this.startTimeMs = now.getTime();
34 this.getRootETag = common.generateETag(undefined, undefined, this.getRootContent);
35 }
36
37
38 /**
39 * GET request for healthcheck.
40 * @param {http.ServerResponse} res
41 * @param {object} ctx
42 */
43 async getHealthcheck(res, ctx) {
44 const _scope = _fileScope('getHealthcheck');
45 const health = 'happy';
46
47 // What else could we check...
48 const dbHealth = await this.db.healthCheck();
49 this.logger.debug(_scope, 'called', { health, dbHealth, ctx });
50 res.end(health);
51 }
52
53
54 /**
55 * GET request for root.
56 * @param {http.ServerResponse} res
57 * @param {object} ctx
58 */
59 async getRoot(req, res, ctx) {
60 const _scope = _fileScope('getRoot');
61 this.logger.debug(_scope, 'called', { ctx });
62
63 res.setHeader(Enum.Header.LastModified, this.startTimeString);
64 res.setHeader(Enum.Header.ETag, this.getRootETag);
65
66 if (common.isClientCached(req, this.startTimeMs, this.getRootETag)) {
67 this.logger.debug(_scope, 'client cached response', { ctx });
68 res.statusCode = 304;
69 res.end();
70 return;
71 }
72 res.end(this.getRootContent);
73 this.logger.info(_scope, 'finished', { ctx });
74 }
75
76
77 /** All the fields the root handler deals with.
78 * @typedef {object} RootData
79 * @property {string} callback - url
80 * @property {string} mode
81 * @property {string} topic
82 * @property {number} topicId
83 * @property {string} leaseSeconds
84 * @property {string} secret
85 * @property {string} httpRemoteAddr
86 * @property {string} httpFrom
87 * @property {boolean} isSecure
88 * @property {boolean} isPublisherValidated
89 */
90
91 /**
92 * Extract api parameters.
93 * @param {http.ClientRequest} req
94 * @param {Object} ctx
95 * @returns {RootData}
96 */
97 static _getRootData(req, ctx) {
98 const postData = ctx.parsedBody;
99 const mode = (postData['hub.mode'] || '').toLowerCase();
100 return {
101 callback: postData['hub.callback'],
102 mode,
103 ...(mode === Enum.Mode.Publish && { url: postData['hub.url'] }), // Publish accepts either hub.url or hub.topic
104 topic: postData['hub.topic'],
105 ...(postData['hub.lease_seconds'] && { leaseSeconds: parseInt(postData['hub.lease_seconds'], 10) }),
106 secret: postData['hub.secret'],
107 httpRemoteAddr: ctx.clientAddress,
108 httpFrom: req.getHeader(Enum.Header.From),
109 isSecure: ((ctx.clientProtocol || '').toLowerCase() === 'https'),
110 isPublisherValidated: true, // Default to true. Will be set to false later, if topic has publisher validation url.
111 };
112 }
113
114
115 /**
116 *
117 * @param {*} dbCtx
118 * @param {RootData} data
119 * @param {String[]} warn
120 * @param {String[]} err
121 * @param {String} requestId
122 */
123 async _validateRootData(dbCtx, data, warn, err, requestId) {
124 // These checks can modify data, so order matters.
125 await this._checkTopic(dbCtx, data, warn, err, requestId);
126 this._checkCallbackAndSecrets(data, warn, err, requestId);
127 await this._checkMode(dbCtx, data, warn, err, requestId);
128 }
129
130
131 /**
132 * Check that requested topic exists and values are in range.
133 * Sets topic id, publisher validation state, and requested lease
134 * seconds on data.
135 * @param {*} dbCtx
136 * @param {RootData} data
137 * @param {String[]} warn
138 * @param {String[]} err
139 */
140 async _checkTopic(dbCtx, data, warn, err, requestId) {
141 const _scope = _fileScope('_checkTopic');
142 let topic;
143
144 if (data.topic) {
145 topic = await this.db.topicGetByUrl(dbCtx, data.topic);
146
147 if (!topic && this.options.manager.publicHub) {
148 this.logger.info(_scope, 'new topic from subscribe request', { data, requestId });
149
150 try {
151 new URL(data.topic);
152 } catch (e) {
153 err.push('invalid topic url (failed to parse url)');
154 return;
155 }
156
157 await this.db.topicSet(dbCtx, {
158 url: data.topic,
159 });
160 topic = await this.db.topicGetByUrl(dbCtx, data.topic);
161 }
162 }
163
164 if (!topic || topic.isDeleted) {
165 err.push('not a supported topic');
166 return;
167 }
168
169 data.topicId = topic.id;
170
171 if (data.leaseSeconds === undefined || isNaN(data.leaseSeconds)) {
172 data.leaseSeconds = topic.leaseSecondsPreferred;
173 } else {
174 if (data.leaseSeconds > topic.leaseSecondsMax) {
175 data.leaseSeconds = topic.leaseSecondsMax;
176 warn.push(`requested lease too long, using ${data.leaseSeconds}`);
177 } else if (data.leaseSeconds < topic.leaseSecondsMin) {
178 data.leaseSeconds = topic.leaseSecondsMin;
179 warn.push(`requested lease too short, using ${data.leaseSeconds}`);
180 }
181 }
182
183 if (topic.publisherValidationUrl) {
184 data.isPublisherValidated = false;
185 }
186 }
187
188
189 /**
190 * Check data for valid callback url and scheme constraints.
191 * @param {RootData} data
192 * @param {String[]} warn
193 * @param {String[]} err
194 */
195 _checkCallbackAndSecrets(data, warn, err) {
196 let isCallbackSecure = false;
197
198 if (!data.callback) {
199 err.push('invalid callback url (empty)');
200 } else {
201 try {
202 const c = new URL(data.callback);
203 isCallbackSecure = (c.protocol.toLowerCase() === 'https:'); // Colon included because url module is weird
204 } catch (e) {
205 err.push('invalid callback url (failed to parse url');
206 return;
207 }
208 }
209
210 if (!isCallbackSecure) {
211 warn.push('insecure callback');
212 }
213
214 if (data.secret) {
215 const secretSeverity = this.options.manager.strictSecrets ? err : warn;
216 if (!data.isSecure) {
217 secretSeverity.push('secret not safe (insecure hub)');
218 }
219 if (!isCallbackSecure) {
220 secretSeverity.push('secret not safe (insecure callback)');
221 }
222 if (data.secret.length > 199) {
223 err.push('cannot keep a secret that big');
224 }
225 }
226 }
227
228 /**
229 * Check mode validity and subscription requirements.
230 * Publish mode is handled elsewhere in the flow.
231 * @param {*} dbCtx
232 * @param {RootData} data
233 * @param {String[]} warn
234 * @param {String[]} err
235 * @param {String} requestId
236 */
237 async _checkMode(dbCtx, data, warn, err) {
238 switch (data.mode) {
239 case Enum.Mode.Subscribe:
240 break;
241
242 case Enum.Mode.Unsubscribe: {
243 const currentEpoch = Date.now() / 1000;
244 let s;
245 if (data.callback && data.topicId) {
246 s = await this.db.subscriptionGet(dbCtx, data.callback, data.topicId);
247 }
248 if (s === undefined) {
249 err.push('not subscribed');
250 } else {
251 if (s.expires < currentEpoch) {
252 err.push('subscription already expired');
253 }
254 }
255 break;
256 }
257
258 default: {
259 err.push('invalid mode');
260 }
261 }
262 }
263
264
265 /**
266 * Check that a publish request topic is valid and exists,
267 * and if it is, add topicId to data.
268 * For a public publish request, create topic if not exists.
269 * @param {*} dbCtx
270 * @param {RootData} data
271 * @param {String[]} warn
272 * @param {String[]} err
273 * @param {String} requestId
274 */
275 async _checkPublish(dbCtx, data, warn, err, requestId) {
276 const _scope = _fileScope('_checkPublish');
277
278 const publishUrl = data.url || data.topic;
279
280 let topic = await this.db.topicGetByUrl(dbCtx, publishUrl);
281 if (!topic && this.options.manager.publicHub) {
282 this.logger.info(_scope, 'new topic from publish request', { data, requestId });
283
284 try {
285 new URL(publishUrl);
286 } catch (e) {
287 err.push('invalid topic url (failed to parse url)');
288 return;
289 }
290
291 await this.db.topicSet(dbCtx, {
292 url: publishUrl,
293 });
294 topic = await this.db.topicGetByUrl(dbCtx, publishUrl);
295 }
296
297 if (!topic || topic.isDeleted) {
298 err.push('not a supported topic');
299 return;
300 }
301
302 data.topicId = topic.id;
303 }
304
305
306 /**
307 * POST request for root.
308 * @param {http.ClientRequest} req
309 * @param {http.ServerResponse} res
310 * @param {object} ctx
311 */
312 async postRoot(req, res, ctx) {
313 const _scope = _fileScope('postRoot');
314 this.logger.debug(_scope, 'called', { ctx });
315
316 res.statusCode = 202; // Presume success.
317
318 const warn = [];
319 const err = [];
320 const data = Manager._getRootData(req, ctx);
321 const requestId = ctx.requestId;
322
323 await this.db.context(async (dbCtx) => {
324
325 if (data.mode === Enum.Mode.Publish) {
326 await this._checkPublish(dbCtx, data, warn, err, requestId);
327 } else {
328 await this._validateRootData(dbCtx, data, warn, err, requestId);
329 }
330
331 const prettyErr = err.map((entry) => `error: ${entry}`);
332 const prettyWarn = warn.map((entry) => `warning: ${entry}`);
333 const details = prettyErr.concat(prettyWarn);
334
335 // Any errors are fatal. Stop and report anything that went wrong.
336 if (err.length) {
337 this.logger.debug(_scope, { msg: 'invalid request', data, err, warn, requestId });
338 throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
339 }
340
341 // Commit the request for later processing.
342 let fn, info, id;
343 try {
344 if (data.mode === Enum.Mode.Publish) {
345 fn = 'topicFetchRequested';
346 info = await this.db.topicFetchRequested(dbCtx, data.topicId);
347 id = data.topicId;
348 } else {
349 fn = 'verificationInsert';
350 id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
351 }
352 } catch (e) {
353 this.logger.error(_scope, `${fn} failed`, { e, info, data, warn, id, requestId });
354 throw e;
355 }
356
357 // If we committed to the db, we've succeeded as far as the client is concerned.
358 res.end(details.join('\n'));
359 this.logger.info(_scope, 'request accepted', { data, warn, requestId });
360
361 // Immediately attempt to claim and process the request.
362 if (this.options.manager.processImmediately
363 && id) {
364 try {
365 if (data.mode === Enum.Mode.Publish) {
366 fn = 'topicFetchClaimAndProcessById';
367 await this.communication.topicFetchClaimAndProcessById(dbCtx, id, requestId);
368 } else {
369 fn = 'verificationClaimAndProcessById';
370 await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
371 }
372 } catch (e) {
373 this.logger.error(_scope, `${fn} failed`, { ...data, id, requestId });
374 // Don't bother re-throwing, as we've already ended this response.
375 }
376 }
377 }); // dbCtx
378 }
379
380
381 /**
382 * Render topic info content.
383 * @param {Object} ctx
384 * @param {String} ctx.responseType
385 * @param {String} ctx.topicUrl
386 * @param {Number} ctx.count
387 * @returns {String}
388 */
389 // eslint-disable-next-line class-methods-use-this
390 infoContent(ctx) {
391 // eslint-disable-next-line sonarjs/no-small-switch
392 switch (ctx.responseType) {
393 case Enum.ContentType.ApplicationJson:
394 return JSON.stringify({
395 topic: ctx.topicUrl,
396 count: ctx.count,
397 });
398
399 case Enum.ContentType.ImageSVG:
400 return Template.badgeSVG({}, ` ${ctx.topicUrl} `, ` ${ctx.count} subscribers `, `${ctx.topicUrl} has ${ctx.count} subscribers.`);
401
402 default:
403 return ctx.count.toString();
404 }
405 }
406
407
408 /**
409 * GET request for /info?topic=url&format=type
410 * @param {http.ServerResponse} res
411 * @param {object} ctx
412 */
413 async getInfo(res, ctx) {
414 const _scope = _fileScope('getInfo');
415 this.logger.debug(_scope, 'called', { ctx });
416
417 if (!ctx.queryParams.topic) {
418 throw new ResponseError(Enum.ErrorResponse.BadRequest, 'missing required parameter');
419 }
420 ctx.topicUrl = ctx.queryParams.topic;
421
422 switch ((ctx.queryParams.format || '').toLowerCase()) {
423 case 'svg':
424 ctx.responseType = Enum.ContentType.ImageSVG;
425 res.setHeader(Enum.Header.ContentType, ctx.responseType);
426 break;
427
428 case 'json':
429 ctx.responseType = Enum.ContentType.ApplicationJson;
430 res.setHeader(Enum.Header.ContentType, ctx.responseType);
431 break;
432
433 default:
434 break;
435 }
436
437 try {
438 new URL(ctx.topicUrl);
439 } catch (e) {
440 throw new ResponseError(Enum.ErrorResponse.BadRequest, 'invalid topic');
441 }
442
443 let count;
444 await this.db.context(async (dbCtx) => {
445 count = await this.db.subscriptionCountByTopicUrl(dbCtx, ctx.topicUrl);
446 if (!count) {
447 throw new ResponseError(Enum.ErrorResponse.NotFound, 'no such topic');
448 }
449 ctx.count = count.count;
450 });
451
452 res.end(this.infoContent(ctx));
453 this.logger.info(_scope, 'finished', { ...ctx });
454 }
455
456
457 /**
458 * GET request for authorized /admin information.
459 * @param {http.ServerResponse} res
460 * @param {object} ctx
461 */
462 async getAdminOverview(res, ctx) {
463 const _scope = _fileScope('getAdminOverview');
464 this.logger.debug(_scope, 'called', { ctx });
465
466 await this.db.context(async (dbCtx) => {
467 ctx.topics = await this.db.topicGetAll(dbCtx);
468 });
469 this.logger.debug(_scope, 'got topics', { topics: ctx.topics });
470
471 res.end(Template.adminOverviewHTML(ctx, this.options));
472 this.logger.info(_scope, 'finished', { ...ctx, topics: ctx.topics.length })
473 }
474
475
476 /**
477 * GET request for authorized /admin/topic/:topicId information.
478 * @param {http.ServerResponse} res
479 * @param {object} ctx
480 */
481 async getTopicDetails(res, ctx) {
482 const _scope = _fileScope('getTopicDetails');
483 this.logger.debug(_scope, 'called', { ctx });
484
485 const topicId = ctx.params.topicId;
486 await this.db.context(async (dbCtx) => {
487 ctx.topic = await this.db.topicGetById(dbCtx, topicId);
488 ctx.subscriptions = await this.db.subscriptionsByTopicId(dbCtx, topicId);
489 });
490 this.logger.debug(_scope, 'got topic details', { topic: ctx.topic, subscriptions: ctx.subscriptions });
491
492 res.end(Template.adminTopicDetailsHTML(ctx, this.options));
493 this.logger.info(_scope, 'finished', { ...ctx, subscriptions: ctx.subscriptions.length, topic: ctx.topic.id });
494 }
495
496
497 /**
498 * PATCH and DELETE for updating topic data.
499 * @param {http.ServerResponse} res
500 * @param {Object} ctx
501 */
502 async updateTopic(res, ctx) {
503 const _scope = _fileScope('updateTopic');
504 this.logger.debug(_scope, 'called', { ctx });
505
506 const topicId = ctx.params.topicId;
507
508 await this.db.context(async (dbCtx) => {
509 await this.db.transaction(dbCtx, async (txCtx) => {
510 // Get topic without defaults filled in, to persist nulls
511 const topic = await this.db.topicGetById(txCtx, topicId, false);
512 if (!topic) {
513 this.logger.debug(_scope, 'no topic', { ctx });
514 throw new Errors.ResponseError(Enum.ErrorResponse.NotFound);
515 }
516
517 if (ctx.method === 'DELETE') {
518 await this.db.topicDeleted(txCtx, topicId);
519 res.end();
520 this.logger.info(_scope, 'topic set deleted', { ctx, topicId });
521 return;
522 }
523
524 const updatableFields = [
525 'leaseSecondsPreferred',
526 'leaseSecondsMin',
527 'leaseSecondsMax',
528 'publisherValidationUrl',
529 'contentHashAlgorithm',
530 ];
531
532 const patchValues = common.pick({
533 ...ctx.queryParams,
534 ...ctx.parsedBody,
535 }, updatableFields);
536
537 [
538 'leaseSecondsPreferred',
539 'leaseSecondsMin',
540 'leaseSecondsMax',
541 ].filter((field) => field in patchValues).forEach((field) => {
542 // eslint-disable-next-line security/detect-object-injection
543 patchValues[field] = parseInt(patchValues[field], 10);
544 });
545
546 const patchKeys = Object.keys(patchValues);
547 if (patchKeys.length === 0
548 // eslint-disable-next-line security/detect-object-injection
549 || patchKeys.every((k) => patchValues[k] == topic[k])) {
550 res.statusCode = 204;
551 res.end();
552 this.logger.info(_scope, 'empty topic update', { ctx, topicId });
553 return;
554 }
555 const patchedTopic = {
556 ...topic,
557 ...patchValues,
558 };
559
560 this.logger.debug(_scope, 'data', { topic, patchValues, patchedTopic });
561
562 try {
563 await this.db.topicUpdate(txCtx, { topicId, ...patchedTopic });
564 } catch (e) {
565 if (e instanceof DBErrors.DataValidation) {
566 this.logger.debug(_scope, 'validation error', { error: e, ctx, topicId });
567 throw new Errors.ResponseError(Enum.ErrorResponse.BadRequest, e.message);
568 }
569 this.logger.error(_scope, 'failed', { error: e, ctx, topicId });
570 throw e;
571 }
572 res.end();
573 this.logger.info(_scope, 'topic updated', { ctx, topicId, patchValues });
574 }); // transaction
575 }); // context
576 }
577
578
579 /**
580 * PATCH and DELETE for updating subscription data.
581 * @param {http.ServerResponse} res
582 * @param {Object} ctx
583 */
584 async updateSubscription(res, ctx) {
585 const _scope = _fileScope('updateSubscription');
586 this.logger.debug(_scope, 'called', { ctx });
587
588 const subscriptionId = ctx.params.subscriptionId;
589
590 await this.db.context(async (dbCtx) => {
591 await this.db.transaction(dbCtx, async (txCtx) => {
592 const subscription = await this.db.subscriptionGetById(txCtx, subscriptionId);
593 if (!subscription) {
594 this.logger.debug(_scope, 'no subscription', { ctx });
595 throw new Errors.ResponseError(Enum.ErrorResponse.NotFound);
596 }
597
598 if (ctx.method === 'DELETE') {
599 const deleteFields = common.pick({
600 ...ctx.queryParams,
601 ...ctx.parsedBody,
602 }, ['reason']);
603
604 // Queue an unsubscription.
605 const verification = {
606 topicId: subscription.topicId,
607 callback: subscription.callback,
608 mode: Enum.Mode.Denied,
609 reason: 'subscription removed by administrative action',
610 isPublisherValidated: true,
611 requestId: ctx.requestId,
612 ...deleteFields,
613 };
614
615 await this.db.verificationInsert(txCtx, verification);
616 this.logger.info(_scope, 'subscription removal initiated', { ctx, verification });
617 res.end();
618 return;
619 }
620
621 const updatableFields = [
622 'signatureAlgorithm',
623 ];
624
625 const patchValues = common.pick({
626 ...ctx.queryParams,
627 ...ctx.parsedBody,
628 }, updatableFields);
629
630 const patchKeys = Object.keys(patchValues);
631 if (patchKeys.length === 0
632 // eslint-disable-next-line security/detect-object-injection
633 || patchKeys.every((k) => patchValues[k] == subscription[k])) {
634 res.statusCode = 204;
635 res.end();
636 return;
637 }
638 const patchedSubscription = {
639 ...subscription,
640 ...patchValues,
641 };
642
643 try {
644 await this.db.subscriptionUpdate(txCtx, { subscriptionId, ...patchedSubscription });
645 } catch (e) {
646 if (e instanceof DBErrors.DataValidation) {
647 this.logger.debug(_scope, 'validation error', { error: e, ctx, subscriptionId });
648 throw new Errors.ResponseError(Enum.ErrorResponse.BadRequest, e.message);
649 }
650 this.logger.info(_scope, 'failed', { error: e, ctx, subscriptionId });
651 throw e;
652 }
653 res.end();
654 this.logger.info(_scope, 'subscription updated', { ctx, subscriptionId, patchValues });
655 }); // transaction
656 }); // context
657 }
658
659 /**
660 * POST request for manually running worker.
661 * @param {http.ServerResponse} res
662 * @param {object} ctx
663 */
664 async processTasks(res, ctx) {
665 const _scope = _fileScope('getTopicDetails');
666 this.logger.debug(_scope, 'called', { ctx });
667
668 // N.B. no await on this
669 this.communication.worker.process();
670
671 res.end();
672 this.logger.info(_scope, 'invoked worker process', { ctx });
673 }
674
675 }
676
677 module.exports = Manager;