c570671d94ada2bad567fc02f5598d923d90b04b
[websub-hub] / src / manager.js
1 'use strict';
2
3 /**
4 * Here we process all the incoming requests.
5 * Payload validation occurs here, before committing the pending work to the
6 * database and (usually) calling a processor to act upon it.
7 *
8 * As this is the mediator between server framework and actions, this would
9 * be where most of a rewrite for a new server framework would happen.
10 */
11
12 const common = require('./common');
13 const Communication = require('./communication');
14 const Enum = require('./enum');
15 const Errors = require('./errors');
16 const DBErrors = require('./db/errors');
17 const { ResponseError } = require('./errors');
18 const Template = require('./template');
19
// Scope-label helper for this file: each method below calls it with a name
// and passes the result as the first argument of its logger calls.
20 const _fileScope = common.fileScope(__filename);
21
class Manager {
  /**
   * @param {object} logger - logger exposing debug/info/error methods
   * @param {object} db - database interface used by all handlers
   * @param {object} options - configuration; `options.manager` is read here
   */
  constructor(logger, db, options) {
    this.logger = logger;
    this.db = db;
    this.options = options;
    this.communication = new Communication(logger, db, options);
  }


  /**
   * GET request for healthcheck.
   * Queries the database health check (for the debug log only) and always
   * responds with the fixed body 'happy'.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getHealthcheck(res, ctx) {
    const _scope = _fileScope('getHealthcheck');
    const health = 'happy';

    // What else could we check...
    const dbHealth = await this.db.healthCheck();
    this.logger.debug(_scope, 'called', { health, dbHealth, ctx });
    res.end(health);
  }


  /**
   * GET request for root.
   * Responds with the rendered root HTML template.
   * @param {http.ClientRequest} req
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getRoot(req, res, ctx) {
    const _scope = _fileScope('getRoot');
    this.logger.debug(_scope, 'called', { ctx });

    const content = Template.rootHTML(ctx, this.options);
    res.end(content);
    this.logger.info(_scope, 'finished', { ctx });
  }


  /** All the fields the root handler deals with.
   * @typedef {object} RootData
   * @property {string} callback - url
   * @property {string} mode - lower-cased hub.mode
   * @property {string=} url - only present for publish requests
   * @property {string} topic
   * @property {number} topicId - set during validation
   * @property {number=} leaseSeconds - parsed from hub.lease_seconds when provided
   * @property {string} secret
   * @property {string} httpRemoteAddr
   * @property {string} httpFrom
   * @property {boolean} isSecure
   * @property {boolean} isPublisherValidated
   */

  /**
   * Extract api parameters.
   * A missing parsed body is treated as an empty one, so that validation
   * reports the absent fields instead of this function throwing.
   * @param {http.ClientRequest} req
   * @param {Object} ctx
   * @returns {RootData}
   */
  static _getRootData(req, ctx) {
    const postData = ctx.parsedBody || {};
    const mode = (postData['hub.mode'] || '').toLowerCase();
    return {
      callback: postData['hub.callback'],
      mode,
      ...(mode === Enum.Mode.Publish && { url: postData['hub.url'] }), // Publish accepts either hub.url or hub.topic
      topic: postData['hub.topic'],
      ...(postData['hub.lease_seconds'] && { leaseSeconds: parseInt(postData['hub.lease_seconds'], 10) }),
      secret: postData['hub.secret'],
      httpRemoteAddr: ctx.clientAddress,
      httpFrom: req.getHeader(Enum.Header.From),
      isSecure: ((ctx.clientProtocol || '').toLowerCase() === 'https'),
      isPublisherValidated: true, // Default to true. Will be set to false later, if topic has publisher validation url.
    };
  }


  /**
   * Run every validation step for a subscribe/unsubscribe request,
   * accumulating problems into the supplied arrays.
   * @param {*} dbCtx
   * @param {RootData} data
   * @param {String[]} warn
   * @param {String[]} err
   * @param {String} requestId
   */
  async _validateRootData(dbCtx, data, warn, err, requestId) {
    // These checks can modify data, so order matters.
    await this._checkTopic(dbCtx, data, warn, err, requestId);
    this._checkCallbackAndSecrets(data, warn, err, requestId);
    await this._checkMode(dbCtx, data, warn, err, requestId);
  }


  /**
   * Check that requested topic exists and values are in range.
   * Sets topic id, publisher validation state, and requested lease
   * seconds on data.
   * When new topic creation is allowed, an unknown topic with a parsable
   * url is created first.
   * @param {*} dbCtx
   * @param {RootData} data
   * @param {String[]} warn
   * @param {String[]} err
   * @param {String} requestId
   */
  async _checkTopic(dbCtx, data, warn, err, requestId) {
    const _scope = _fileScope('_checkTopic');
    let topic;

    if (data.topic) {
      topic = await this.db.topicGetByUrl(dbCtx, data.topic);

      if (!topic && this._newTopicCreationAllowed()) {
        this.logger.info(_scope, 'new topic from subscribe request', { data, requestId });

        try {
          new URL(data.topic);
        } catch (e) {
          err.push('invalid topic url (failed to parse url)');
          return;
        }

        await this.db.topicSet(dbCtx, {
          url: data.topic,
        });
        topic = await this.db.topicGetByUrl(dbCtx, data.topic);
      }
    }

    if (!topic || topic.isDeleted) {
      err.push('not a supported topic');
      return;
    }

    data.topicId = topic.id;

    // Coercing isNaN is kept deliberately, so that any non-numeric value falls back to the preferred lease.
    if (data.leaseSeconds === undefined || isNaN(data.leaseSeconds)) {
      data.leaseSeconds = topic.leaseSecondsPreferred;
    } else if (data.leaseSeconds > topic.leaseSecondsMax) {
      data.leaseSeconds = topic.leaseSecondsMax;
      warn.push(`requested lease too long, using ${data.leaseSeconds}`);
    } else if (data.leaseSeconds < topic.leaseSecondsMin) {
      data.leaseSeconds = topic.leaseSecondsMin;
      warn.push(`requested lease too short, using ${data.leaseSeconds}`);
    }

    if (topic.publisherValidationUrl) {
      data.isPublisherValidated = false;
    }
  }


  /**
   * Check data for valid callback url and scheme constraints.
   * Secret-over-insecure-transport problems are errors when
   * options.manager.strictSecrets is set, otherwise warnings.
   * @param {RootData} data
   * @param {String[]} warn
   * @param {String[]} err
   */
  _checkCallbackAndSecrets(data, warn, err) {
    let isCallbackSecure = false;

    if (!data.callback) {
      err.push('invalid callback url (empty)');
    } else {
      try {
        const c = new URL(data.callback);
        isCallbackSecure = (c.protocol.toLowerCase() === 'https:'); // Colon included because url module is weird
      } catch (e) {
        err.push('invalid callback url (failed to parse url)');
        return;
      }
    }

    if (!isCallbackSecure) {
      warn.push('insecure callback');
    }

    if (data.secret) {
      const secretSeverity = this.options.manager.strictSecrets ? err : warn;
      if (!data.isSecure) {
        secretSeverity.push('secret not safe (insecure hub)');
      }
      if (!isCallbackSecure) {
        secretSeverity.push('secret not safe (insecure callback)');
      }
      // The limit is on bytes, not characters, so measure the encoded size.
      if (Buffer.byteLength(String(data.secret), 'utf8') > 199) {
        err.push('cannot keep a secret that big');
      }
    }
  }

  /**
   * Check mode validity and subscription requirements.
   * Publish mode is handled elsewhere in the flow.
   * @param {*} dbCtx
   * @param {RootData} data
   * @param {String[]} warn
   * @param {String[]} err
   */
  async _checkMode(dbCtx, data, warn, err) {
    switch (data.mode) {
      case Enum.Mode.Subscribe:
        break;

      case Enum.Mode.Unsubscribe: {
        const currentEpoch = Date.now() / 1000;
        let s;
        if (data.callback && data.topicId) {
          s = await this.db.subscriptionGet(dbCtx, data.callback, data.topicId);
        }
        // Treat any empty result (undefined or null) as no subscription.
        if (!s) {
          err.push('not subscribed');
        } else if (s.expires < currentEpoch) {
          err.push('subscription already expired');
        }
        break;
      }

      default: {
        err.push('invalid mode');
      }
    }
  }


  /**
   * Determine if a topic url is allowed to be created.
   * In the future, this may be more complicated.
   * @returns {Boolean}
   */
  _newTopicCreationAllowed() {
    return this.options.manager.publicHub;
  }


  /**
   * Check that a publish request's topic(s) are valid and exist,
   * returning an array with the results for each.
   * For a public-hub publish request, creates topics if they do not exist.
   * @param {*} dbCtx
   * @param {RootData} data
   * @param {String} requestId
   * @returns {Promise<Object[]>} one { url, warn, err, topicId } entry per distinct url
   */
  async _publishTopics(dbCtx, data, requestId) {
    const _scope = _fileScope('_publishTopics');

    // Publish requests may include multiple topics, consider them all, but deduplicate.
    const publishUrls = Array.from(new Set([
      ...common.ensureArray(data.url),
      ...common.ensureArray(data.topic),
    ]));

    // Map the requested topics to their ids, creating if necessary.
    return Promise.all(publishUrls.map(async (url) => {
      const result = {
        url,
        warn: [],
        err: [],
        topicId: undefined,
      };
      let topic = await this.db.topicGetByUrl(dbCtx, url);
      if (!topic && this._newTopicCreationAllowed()) {
        try {
          new URL(url);
        } catch (e) {
          result.err.push('invalid topic url (failed to parse url)');
          return result;
        }
        await this.db.topicSet(dbCtx, {
          // TODO: accept a publisherValidationUrl parameter
          url,
        });
        topic = await this.db.topicGetByUrl(dbCtx, url);
        this.logger.info(_scope, 'new topic from publish request', { url, requestId });
      }
      if (!topic || topic.isDeleted) {
        result.err.push('topic not supported');
        return result;
      }
      result.topicId = topic.id;
      return result;
    }));
  }


  /**
   * Render response for multi-topic publish requests.
   * Emits JSON when ctx.responseType is the JSON content type, otherwise
   * one plain-text entry per topic, separated by '----' lines.
   * @param {Object} ctx
   * @param {Object[]} publishTopics
   * @returns {String}
   */
  static multiPublishContent(ctx, publishTopics) {
    const responses = publishTopics.map((topic) => ({
      href: topic.url,
      status: topic.status,
      statusMessage: topic.statusMessage,
      errors: topic.err,
      warnings: topic.warn,
    }));
    switch (ctx.responseType) {
      case Enum.ContentType.ApplicationJson:
        return JSON.stringify(responses);

      case Enum.ContentType.TextPlain:
      default: {
        const textResponses = responses.map((response) => {
          const details = Manager._prettyDetails(response.errors, response.warnings);
          const textDetails = (details.length ? '\n' : '') + details.map((d) => `\t${d}`).join('\n');
          return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`;
        });
        return textResponses.join('\n----\n');
      }
    }
  }


  /**
   * Process a publish request.
   * Responds with the sole topic's status for a single topic, or 207 with
   * per-topic results for several.
   * @param {*} dbCtx
   * @param {Object} data
   * @param {http.ServerResponse} res
   * @param {Object} ctx
   * @throws {ResponseError} bad request when no topic urls were supplied
   */
  async _publishRequest(dbCtx, data, res, ctx) {
    const _scope = _fileScope('_publishRequest');
    this.logger.debug(_scope, 'called', { data });

    const requestId = ctx.requestId;

    // Parse and validate all the topics in the request.
    data.publishTopics = await this._publishTopics(dbCtx, data, requestId);
    if (!data.publishTopics || !data.publishTopics.length) {
      const details = Manager._prettyDetails(['no valid topic urls to publish'], []);
      throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
    }

    // Set status per topic
    for (const topicResult of data.publishTopics) {
      topicResult.status = topicResult.err.length ? 400 : 202;
      topicResult.statusMessage = topicResult.err.length ? 'Bad Request' : 'Accepted';
    }

    // Process the valid publish notifications
    const validPublishTopics = data.publishTopics.filter((topicResult) => !topicResult.err.length);
    try {
      await Promise.all(validPublishTopics.map(async (topicResult) => this.db.topicFetchRequested(dbCtx, topicResult.topicId)));
    } catch (e) {
      this.logger.error(_scope, 'topicFetchRequest failed', { error: e, ctx, data, requestId });
      throw e;
    }

    this.logger.info(_scope, 'request accepted', { ctx, data, requestId });

    if (data.publishTopics.length === 1) {
      const soleTopic = data.publishTopics[0];
      res.statusCode = soleTopic.status;
      res.end(Manager._prettyDetails(soleTopic.err, soleTopic.warn).join('\n'));
    } else {
      res.statusCode = 207;
      res.end(Manager.multiPublishContent(ctx, data.publishTopics));
    }

    if (this.options.manager.processImmediately
    &&  validPublishTopics.length) {
      try {
        await Promise.all(validPublishTopics.map(async (topicResult) => this.communication.topicFetchClaimAndProcessById(dbCtx, topicResult.topicId, requestId)));
      } catch (e) {
        this.logger.error(_scope, 'topicFetchClaimAndProcessById failed', { error: e, data, validPublishTopics, requestId });
        // Don't bother re-throwing, as we've already ended this response.
      }
    }
  }


  /**
   * Annotate any encountered issues.
   * Errors are listed first, then warnings, each with a prefix.
   * @param {String[]} err
   * @param {String[]} warn
   * @returns {String[]}
   */
  static _prettyDetails(err, warn) {
    return [
      ...err.map((entry) => `error: ${entry}`),
      ...warn.map((entry) => `warning: ${entry}`),
    ];
  }


  /**
   * POST request for root.
   * Publish requests are delegated to the publish handler; any other mode is
   * validated and stored as a pending verification.
   * @param {http.ClientRequest} req
   * @param {http.ServerResponse} res
   * @param {object} ctx
   * @throws {ResponseError} bad request when validation reports any error
   */
  async postRoot(req, res, ctx) {
    const _scope = _fileScope('postRoot');
    this.logger.debug(_scope, 'called', { ctx });

    res.statusCode = 202; // Presume success.

    const warn = [];
    const err = [];
    const data = Manager._getRootData(req, ctx);
    const requestId = ctx.requestId;

    await this.db.context(async (dbCtx) => {

      // Handle publish requests elsewhere
      if (data.mode === Enum.Mode.Publish) {
        return this._publishRequest(dbCtx, data, res, ctx);
      }

      await this._validateRootData(dbCtx, data, warn, err, requestId);

      const details = Manager._prettyDetails(err, warn);

      // Any errors are fatal.  Stop and report anything that went wrong.
      if (err.length) {
        this.logger.debug(_scope, 'invalid request', { data, err, warn, requestId });
        throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
      }

      // Commit the request for later processing.
      let id;
      try {
        id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
      } catch (e) {
        this.logger.error(_scope, 'verificationInsert failed', { error: e, data, warn, id, requestId });
        throw e;
      }

      // If we committed to the db, we've succeeded as far as the client is concerned.
      res.end(details.join('\n'));
      this.logger.info(_scope, 'request accepted', { data, warn, requestId });

      // Immediately attempt to claim and process the request.
      if (this.options.manager.processImmediately
      &&  id) {
        try {
          await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
        } catch (e) {
          this.logger.error(_scope, 'verificationClaimAndProcessById failed', { error: e, ...data, id, requestId });
          // Don't bother re-throwing, as we've already ended this response.
        }
      }
    }); // dbCtx
  }


  /**
   * Render topic info content.
   * JSON and SVG badge renderings are selected by ctx.responseType; anything
   * else yields the bare count as text.
   * @param {Object} ctx
   * @param {String} ctx.responseType
   * @param {String} ctx.topicUrl
   * @param {Number} ctx.count
   * @returns {String}
   */
  // eslint-disable-next-line class-methods-use-this
  infoContent(ctx) {
    // eslint-disable-next-line sonarjs/no-small-switch
    switch (ctx.responseType) {
      case Enum.ContentType.ApplicationJson:
        return JSON.stringify({
          topic: ctx.topicUrl,
          count: ctx.count,
        });

      case Enum.ContentType.ImageSVG:
        return Template.badgeSVG({}, ` ${ctx.topicUrl} `, ` ${ctx.count} subscribers `, `${ctx.topicUrl} has ${ctx.count} subscribers.`);

      default:
        return ctx.count.toString();
    }
  }


  /**
   * GET request for /info?topic=url&format=type
   * @param {http.ServerResponse} res
   * @param {object} ctx
   * @throws {ResponseError} bad request for a missing or unparsable topic,
   * not found when the database returns no count for the topic
   */
  async getInfo(res, ctx) {
    const _scope = _fileScope('getInfo');
    this.logger.debug(_scope, 'called', { ctx });

    if (!ctx.queryParams.topic) {
      throw new ResponseError(Enum.ErrorResponse.BadRequest, 'missing required parameter');
    }
    ctx.topicUrl = ctx.queryParams.topic;

    switch ((ctx.queryParams.format || '').toLowerCase()) {
      case 'svg':
        ctx.responseType = Enum.ContentType.ImageSVG;
        res.setHeader(Enum.Header.ContentType, ctx.responseType);
        break;

      case 'json':
        ctx.responseType = Enum.ContentType.ApplicationJson;
        res.setHeader(Enum.Header.ContentType, ctx.responseType);
        break;

      default:
        break;
    }

    try {
      new URL(ctx.topicUrl);
    } catch (e) {
      throw new ResponseError(Enum.ErrorResponse.BadRequest, 'invalid topic');
    }

    let count;
    await this.db.context(async (dbCtx) => {
      count = await this.db.subscriptionCountByTopicUrl(dbCtx, ctx.topicUrl);
      if (!count) {
        throw new ResponseError(Enum.ErrorResponse.NotFound, 'no such topic');
      }
      ctx.count = count.count;
    });

    res.end(this.infoContent(ctx));
    this.logger.info(_scope, 'finished', { ctx });
  }


  /**
   * label the bars of the topic update history graph
   * @param {Number} index - how many days back the bar represents
   * @param {Number} value - update count for that day
   * @returns {String}
   */
  static _historyBarCaption(index, value) {
    let when;
    switch (index) {
      case 0:
        when = 'today';
        break;
      case 1:
        when = 'yesterday';
        break;
      default:
        when = `${index} days ago`;
    }
    return `${when}, ${value || 'no'} update${value === 1 ? '' : 's'}`;
  }


  /**
   * GET SVG chart of topic update history
   * The span comes from the `days` query parameter, capped at 365; a missing,
   * unparsable, or non-positive value falls back to
   * options.manager.publishHistoryDays.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getHistorySVG(res, ctx) {
    const _scope = _fileScope('getHistorySVG');
    this.logger.debug(_scope, 'called', { ctx });

    const requestedDays = parseInt(ctx.queryParams.days, 10);
    // NaN, zero, and negative requests all fail this comparison and use the default.
    const wantedDays = (requestedDays > 0) ? requestedDays : this.options.manager.publishHistoryDays;
    const days = Math.min(wantedDays, 365);
    const histOptions = {
      title: 'Topic Publish History',
      description: 'Updates per Day',
      labelZero: '^ Today',
      labelX: 'Days Ago',
      maxItems: days,
      minItems: days,
      tickEvery: 7,
      barWidth: 25,
      barHeight: 40,
      labelHeight: 12,
      barCaptionFn: Manager._historyBarCaption,
    };

    let publishHistory;
    await this.db.context(async (dbCtx) => {
      publishHistory = await this.db.topicPublishHistory(dbCtx, ctx.params.topicId, days);
    });

    res.end(Template.histogramSVG(publishHistory, histOptions));
    this.logger.info(_scope, 'finished', { ctx });
  }

  /**
   * GET request for authorized /admin information.
   * When the session carries an authenticated profile, only topics whose
   * url hostname matches the profile's hostname are listed.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getAdminOverview(res, ctx) {
    const _scope = _fileScope('getAdminOverview');
    this.logger.debug(_scope, 'called', { ctx });

    await this.db.context(async (dbCtx) => {
      ctx.topics = await this.db.topicGetAll(dbCtx);
    });
    this.logger.debug(_scope, 'got topics', { topics: ctx.topics });

    // Profile users can only see related topics.
    if (ctx.session && ctx.session.authenticatedProfile) {
      const profileUrlObj = new URL(ctx.session.authenticatedProfile);
      ctx.topics = ctx.topics.filter((topic) => {
        const topicUrlObj = new URL(topic.url);
        return (topicUrlObj.hostname === profileUrlObj.hostname);
      });
    }

    res.end(Template.adminOverviewHTML(ctx, this.options));
    this.logger.info(_scope, 'finished', { ctx, topics: ctx.topics.length });
  }


  /**
   * GET request for authorized /admin/topic/:topicId information.
   * When the session carries an authenticated profile, a topic on another
   * hostname (or a topic that does not exist) is blanked out before rendering.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getTopicDetails(res, ctx) {
    const _scope = _fileScope('getTopicDetails');
    this.logger.debug(_scope, 'called', { ctx });


    ctx.publishSpan = 60;
    const topicId = ctx.params.topicId;
    let publishHistory;
    await this.db.context(async (dbCtx) => {
      ctx.topic = await this.db.topicGetById(dbCtx, topicId);
      ctx.subscriptions = await this.db.subscriptionsByTopicId(dbCtx, topicId);
      publishHistory = await this.db.topicPublishHistory(dbCtx, topicId, ctx.publishSpan);
    });
    ctx.publishCount = publishHistory.reduce((a, b) => a + b, 0);
    this.logger.debug(_scope, 'got topic details', { topic: ctx.topic, subscriptions: ctx.subscriptions, updates: ctx.publishCount });

    // Profile users can only see related topics.
    if (ctx.session && ctx.session.authenticatedProfile) {
      const profileUrlObj = new URL(ctx.session.authenticatedProfile);
      // A missing topic has no hostname, and so never matches the profile.
      const topicHostname = ctx.topic ? new URL(ctx.topic.url).hostname : undefined;
      if (topicHostname !== profileUrlObj.hostname) {
        ctx.topic = null;
        ctx.subscriptions = [];
      }
    }

    res.end(Template.adminTopicDetailsHTML(ctx, this.options));
    this.logger.info(_scope, 'finished', { ctx, subscriptions: ctx.subscriptions.length, topic: ctx.topic && ctx.topic.id || ctx.topic });
  }


  /**
   * PATCH and DELETE for updating topic data.
   * DELETE marks the topic deleted and attempts its removal; otherwise the
   * updatable fields present in the query or body are applied, responding
   * 204 when nothing would change.
   * @param {http.ServerResponse} res
   * @param {Object} ctx
   * @throws {ResponseError} not found for an unknown topic, bad request when
   * the database rejects the new values
   */
  async updateTopic(res, ctx) {
    const _scope = _fileScope('updateTopic');
    this.logger.debug(_scope, 'called', { ctx });

    const topicId = ctx.params.topicId;

    await this.db.context(async (dbCtx) => {
      await this.db.transaction(dbCtx, async (txCtx) => {
        // Get topic without defaults filled in, to persist nulls
        const topic = await this.db.topicGetById(txCtx, topicId, false);
        if (!topic) {
          this.logger.debug(_scope, 'no topic', { ctx });
          throw new Errors.ResponseError(Enum.ErrorResponse.NotFound);
        }

        if (ctx.method === 'DELETE') {
          await this.db.topicDeleted(txCtx, topicId);
          res.end();
          this.logger.info(_scope, 'topic set deleted', { ctx, topicId });
          // Attempt to remove from db if no active subscriptions.
          await this.db.topicPendingDelete(txCtx, topicId);
          return;
        }

        const updatableFields = [
          'leaseSecondsPreferred',
          'leaseSecondsMin',
          'leaseSecondsMax',
          'publisherValidationUrl',
          'contentHashAlgorithm',
        ];

        const patchValues = common.pick({
          ...ctx.queryParams,
          ...ctx.parsedBody,
        }, updatableFields);

        // Lease values arrive as text; store them as integers.
        [
          'leaseSecondsPreferred',
          'leaseSecondsMin',
          'leaseSecondsMax',
        ].filter((field) => field in patchValues).forEach((field) => {
          // eslint-disable-next-line security/detect-object-injection
          patchValues[field] = parseInt(patchValues[field], 10);
        });

        const patchKeys = Object.keys(patchValues);
        // Loose equality is kept from the original comparison of request values against stored values.
        if (patchKeys.length === 0
        // eslint-disable-next-line security/detect-object-injection
        ||  patchKeys.every((k) => patchValues[k] == topic[k])) {
          res.statusCode = 204;
          res.end();
          this.logger.info(_scope, 'empty topic update', { ctx, topicId });
          return;
        }
        const patchedTopic = {
          ...topic,
          ...patchValues,
        };

        this.logger.debug(_scope, 'data', { topic, patchValues, patchedTopic });

        try {
          await this.db.topicUpdate(txCtx, { topicId, ...patchedTopic });
        } catch (e) {
          if (e instanceof DBErrors.DataValidation) {
            this.logger.debug(_scope, 'validation error', { error: e, ctx, topicId });
            throw new Errors.ResponseError(Enum.ErrorResponse.BadRequest, e.message);
          }
          this.logger.error(_scope, 'failed', { error: e, ctx, topicId });
          throw e;
        }
        res.end();
        this.logger.info(_scope, 'topic updated', { ctx, topicId, patchValues });
      }); // transaction
    }); // context
  }


  /**
   * PATCH and DELETE for updating subscription data.
   * DELETE queues a denial verification for the subscription; otherwise the
   * updatable fields present in the query or body are applied, responding
   * 204 when nothing would change.
   * @param {http.ServerResponse} res
   * @param {Object} ctx
   * @throws {ResponseError} not found for an unknown subscription, bad
   * request when the database rejects the new values
   */
  async updateSubscription(res, ctx) {
    const _scope = _fileScope('updateSubscription');
    this.logger.debug(_scope, 'called', { ctx });

    const subscriptionId = ctx.params.subscriptionId;

    await this.db.context(async (dbCtx) => {
      await this.db.transaction(dbCtx, async (txCtx) => {
        const subscription = await this.db.subscriptionGetById(txCtx, subscriptionId);
        if (!subscription) {
          this.logger.debug(_scope, 'no subscription', { ctx });
          throw new Errors.ResponseError(Enum.ErrorResponse.NotFound);
        }

        if (ctx.method === 'DELETE') {
          const deleteFields = common.pick({
            ...ctx.queryParams,
            ...ctx.parsedBody,
          }, ['reason']);

          // Queue an unsubscription.
          const verification = {
            topicId: subscription.topicId,
            callback: subscription.callback,
            mode: Enum.Mode.Denied,
            reason: 'subscription removed by administrative action',
            isPublisherValidated: true,
            requestId: ctx.requestId,
            ...deleteFields, // A supplied reason overrides the default above.
          };

          await this.db.verificationInsert(txCtx, verification);
          this.logger.info(_scope, 'subscription removal initiated', { ctx, verification });
          res.end();
          return;
        }

        const updatableFields = [
          'signatureAlgorithm',
        ];

        const patchValues = common.pick({
          ...ctx.queryParams,
          ...ctx.parsedBody,
        }, updatableFields);

        const patchKeys = Object.keys(patchValues);
        // Loose equality is kept from the original comparison of request values against stored values.
        if (patchKeys.length === 0
        // eslint-disable-next-line security/detect-object-injection
        ||  patchKeys.every((k) => patchValues[k] == subscription[k])) {
          res.statusCode = 204;
          res.end();
          return;
        }
        const patchedSubscription = {
          ...subscription,
          ...patchValues,
        };

        try {
          await this.db.subscriptionUpdate(txCtx, { subscriptionId, ...patchedSubscription });
        } catch (e) {
          if (e instanceof DBErrors.DataValidation) {
            this.logger.debug(_scope, 'validation error', { error: e, ctx, subscriptionId });
            throw new Errors.ResponseError(Enum.ErrorResponse.BadRequest, e.message);
          }
          this.logger.error(_scope, 'failed', { error: e, ctx, subscriptionId });
          throw e;
        }
        res.end();
        this.logger.info(_scope, 'subscription updated', { ctx, subscriptionId, patchValues });
      }); // transaction
    }); // context
  }

  /**
   * POST request for manually running worker.
   * The worker run is started but not awaited; its failure is only logged.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async processTasks(res, ctx) {
    const _scope = _fileScope('processTasks');
    this.logger.debug(_scope, 'called', { ctx });

    // N.B. no await on this
    this.communication.worker.process().catch((e) => {
      this.logger.error(_scope, 'failed', { error: e, ctx });
    });

    res.end();
    this.logger.info(_scope, 'invoked worker process', { ctx });
  }

}
853
// The Manager class is this module's sole export.
854 module.exports = Manager;