cae9e74ce734b6fec771f4c9ceb3927c8e1b9028
[websub-hub] / src / manager.js
1 'use strict';
2
3 /**
4 * Here we process all the incoming requests.
5 * Payload validation occurs here, before committing the pending work to the
6 * database and (usually) calling a processor to act upon it.
7 *
8 * As this is the mediator between server framework and actions, this would
9 * be where most of a rewrite for a new server framework would happen.
10 */
11
12 const common = require('./common');
13 const Communication = require('./communication');
14 const Enum = require('./enum');
15 const Errors = require('./errors');
16 const DBErrors = require('./db/errors');
17 const { ResponseError } = require('./errors');
18 const Template = require('./template');
19
20 const _fileScope = common.fileScope(__filename);
21
/**
 * Mediator between the server framework and the actions taken on requests.
 * Each handler validates its input, records pending work through the db
 * interface, writes the response, and (optionally) kicks off processing.
 */
class Manager {
  /**
   * @param {object} logger - logger exposing debug/info/error
   * @param {object} db - database interface
   * @param {object} options - application options; `options.manager` is consulted by handlers
   */
  constructor(logger, db, options) {
    this.logger = logger;
    this.db = db;
    this.options = options;
    this.communication = new Communication(logger, db, options);

    // Precalculate the invariant root GET metadata.
    this.getRootContent = Template.rootHTML(undefined, options);
    const now = new Date();
    // toUTCString() is the standard equivalent of the deprecated toGMTString().
    this.startTimeString = now.toUTCString();
    this.startTimeMs = now.getTime();
    this.getRootETag = common.generateETag(undefined, undefined, this.getRootContent);
  }


  /**
   * GET request for healthcheck.
   * Always responds with a fixed string; the db health result is only logged.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getHealthcheck(res, ctx) {
    const _scope = _fileScope('getHealthcheck');
    const health = 'happy';

    // What else could we check...
    const dbHealth = await this.db.healthCheck();
    this.logger.debug(_scope, 'called', { health, dbHealth, ctx });
    res.end(health);
  }


  /**
   * GET request for root.
   * Serves the precalculated root content, or a 304 when the client
   * already holds a current copy.
   * @param {http.ClientRequest} req
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getRoot(req, res, ctx) {
    const _scope = _fileScope('getRoot');
    this.logger.debug(_scope, 'called', { ctx });

    res.setHeader(Enum.Header.LastModified, this.startTimeString);
    res.setHeader(Enum.Header.ETag, this.getRootETag);

    if (common.isClientCached(req, this.startTimeMs, this.getRootETag)) {
      this.logger.debug(_scope, 'client cached response', { ctx });
      res.statusCode = 304;
      res.end();
      return;
    }
    res.end(this.getRootContent);
    this.logger.info(_scope, 'finished', { ctx });
  }


  /** All the fields the root handler deals with.
   * @typedef {object} RootData
   * @property {string} callback - url
   * @property {string} mode - lower-cased hub.mode
   * @property {*} [url] - hub.url value, only present for publish mode
   * @property {string} topic
   * @property {number} topicId - set during topic validation
   * @property {number} [leaseSeconds] - parsed from hub.lease_seconds when supplied
   * @property {string} secret
   * @property {string} httpRemoteAddr
   * @property {string} httpFrom
   * @property {boolean} isSecure
   * @property {boolean} isPublisherValidated
   */

  /**
   * Extract api parameters.
   * @param {http.ClientRequest} req
   * @param {Object} ctx
   * @returns {RootData}
   */
  static _getRootData(req, ctx) {
    const postData = ctx.parsedBody;
    const mode = (postData['hub.mode'] || '').toLowerCase();
    return {
      callback: postData['hub.callback'],
      mode,
      ...(mode === Enum.Mode.Publish && { url: postData['hub.url'] }), // Publish accepts either hub.url or hub.topic
      topic: postData['hub.topic'],
      ...(postData['hub.lease_seconds'] && { leaseSeconds: parseInt(postData['hub.lease_seconds'], 10) }),
      secret: postData['hub.secret'],
      httpRemoteAddr: ctx.clientAddress,
      httpFrom: req.getHeader(Enum.Header.From),
      isSecure: ((ctx.clientProtocol || '').toLowerCase() === 'https'),
      isPublisherValidated: true, // Default to true. Will be set to false later, if topic has publisher validation url.
    };
  }


  /**
   * Run all validations for a subscribe/unsubscribe request, collecting
   * issues into the supplied warn and err lists.
   * @param {*} dbCtx
   * @param {RootData} data
   * @param {String[]} warn
   * @param {String[]} err
   * @param {String} requestId
   */
  async _validateRootData(dbCtx, data, warn, err, requestId) {
    // These checks can modify data, so order matters.
    await this._checkTopic(dbCtx, data, warn, err, requestId);
    this._checkCallbackAndSecrets(data, warn, err, requestId);
    await this._checkMode(dbCtx, data, warn, err, requestId);
  }


  /**
   * Check that requested topic exists and values are in range.
   * Sets topic id, publisher validation state, and requested lease
   * seconds on data.
   * @param {*} dbCtx
   * @param {RootData} data
   * @param {String[]} warn
   * @param {String[]} err
   * @param {String} requestId
   */
  async _checkTopic(dbCtx, data, warn, err, requestId) {
    const _scope = _fileScope('_checkTopic');
    let topic;

    if (data.topic) {
      topic = await this.db.topicGetByUrl(dbCtx, data.topic);

      if (!topic && this._newTopicCreationAllowed()) {
        this.logger.info(_scope, 'new topic from subscribe request', { data, requestId });

        // Only create topics for urls which parse.
        try {
          new URL(data.topic);
        } catch (e) {
          err.push('invalid topic url (failed to parse url)');
          return;
        }

        await this.db.topicSet(dbCtx, {
          url: data.topic,
        });
        topic = await this.db.topicGetByUrl(dbCtx, data.topic);
      }
    }

    if (!topic || topic.isDeleted) {
      err.push('not a supported topic');
      return;
    }

    data.topicId = topic.id;

    // Fall back to the topic's preferred lease, otherwise clamp the request to the topic's limits.
    if (data.leaseSeconds === undefined || isNaN(data.leaseSeconds)) {
      data.leaseSeconds = topic.leaseSecondsPreferred;
    } else {
      if (data.leaseSeconds > topic.leaseSecondsMax) {
        data.leaseSeconds = topic.leaseSecondsMax;
        warn.push(`requested lease too long, using ${data.leaseSeconds}`);
      } else if (data.leaseSeconds < topic.leaseSecondsMin) {
        data.leaseSeconds = topic.leaseSecondsMin;
        warn.push(`requested lease too short, using ${data.leaseSeconds}`);
      }
    }

    if (topic.publisherValidationUrl) {
      data.isPublisherValidated = false;
    }
  }


  /**
   * Check data for valid callback url and scheme constraints.
   * Secret-safety issues are errors when options.manager.strictSecrets
   * is set, warnings otherwise.
   * @param {RootData} data
   * @param {String[]} warn
   * @param {String[]} err
   */
  _checkCallbackAndSecrets(data, warn, err) {
    let isCallbackSecure = false;

    if (!data.callback) {
      err.push('invalid callback url (empty)');
    } else {
      try {
        const c = new URL(data.callback);
        isCallbackSecure = (c.protocol.toLowerCase() === 'https:'); // Colon included because url module is weird
      } catch (e) {
        err.push('invalid callback url (failed to parse url)');
        return;
      }
    }

    if (!isCallbackSecure) {
      warn.push('insecure callback');
    }

    if (data.secret) {
      const secretSeverity = this.options.manager.strictSecrets ? err : warn;
      if (!data.isSecure) {
        secretSeverity.push('secret not safe (insecure hub)');
      }
      if (!isCallbackSecure) {
        secretSeverity.push('secret not safe (insecure callback)');
      }
      if (data.secret.length > 199) {
        err.push('cannot keep a secret that big');
      }
    }
  }

  /**
   * Check mode validity and subscription requirements.
   * Publish mode is handled elsewhere in the flow.
   * @param {*} dbCtx
   * @param {RootData} data
   * @param {String[]} warn
   * @param {String[]} err
   */
  async _checkMode(dbCtx, data, warn, err) {
    switch (data.mode) {
      case Enum.Mode.Subscribe:
        break;

      case Enum.Mode.Unsubscribe: {
        const currentEpoch = Date.now() / 1000;
        let s;
        if (data.callback && data.topicId) {
          s = await this.db.subscriptionGet(dbCtx, data.callback, data.topicId);
        }
        if (s === undefined) {
          err.push('not subscribed');
        } else {
          if (s.expires < currentEpoch) {
            err.push('subscription already expired');
          }
        }
        break;
      }

      default: {
        err.push('invalid mode');
      }
    }
  }


  /**
   * Determine if a topic url is allowed to be created.
   * In the future, this may be more complicated.
   * @returns {Boolean}
   */
  _newTopicCreationAllowed() {
    return this.options.manager.publicHub;
  }


  /**
   * Check that a publish request's topic(s) are valid and exist,
   * returning an array with the results for each.
   * For a public-hub publish request, creates topics if they do not exist.
   * @param {*} dbCtx
   * @param {RootData} data
   * @param {String} requestId
   * @returns {Promise<Object[]>} one `{ url, warn, err, topicId }` entry per distinct url
   */
  async _publishTopics(dbCtx, data, requestId) {
    const _scope = _fileScope('_publishTopics');

    // Publish requests may include multiple topics, consider them all, but deduplicate.
    const publishUrls = Array.from(new Set([
      ...common.ensureArray(data.url),
      ...common.ensureArray(data.topic),
    ]));

    // Map the requested topics to their ids, creating if necessary.
    return Promise.all(publishUrls.map(async (url) => {
      const result = {
        url,
        warn: [],
        err: [],
        topicId: undefined,
      };
      let topic = await this.db.topicGetByUrl(dbCtx, url);
      if (!topic && this._newTopicCreationAllowed()) {
        try {
          new URL(url);
        } catch (e) {
          result.err.push('invalid topic url (failed to parse url)');
          return result;
        }
        await this.db.topicSet(dbCtx, {
          // TODO: accept a publisherValidationUrl parameter
          url,
        });
        topic = await this.db.topicGetByUrl(dbCtx, url);
        this.logger.info(_scope, 'new topic from publish request', { url, requestId });
      }
      if (!topic || topic.isDeleted) {
        result.err.push('topic not supported');
        return result;
      }
      result.topicId = topic.id;
      return result;
    }));
  }


  /**
   * Render response for multi-topic publish requests.
   * JSON when ctx.responseType asks for it, otherwise plain text with one
   * section per topic.
   * @param {Object} ctx
   * @param {String} ctx.responseType
   * @param {Object[]} publishTopics
   * @returns {String}
   */
  static multiPublishContent(ctx, publishTopics) {
    const responses = publishTopics.map((topic) => ({
      href: topic.url,
      status: topic.status,
      statusMessage: topic.statusMessage,
      errors: topic.err,
      warnings: topic.warn,
    }));
    switch (ctx.responseType) {
      case Enum.ContentType.ApplicationJson:
        return JSON.stringify(responses);

      case Enum.ContentType.TextPlain:
      default: {
        const textResponses = responses.map((response) => {
          const details = Manager._prettyDetails(response.errors, response.warnings);
          const textDetails = (details.length ? '\n' : '') + details.map((d) => `\t${d}`).join('\n');
          return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`;
        });
        return textResponses.join('\n----\n');
      }
    }
  }


  /**
   * Process a publish request.
   * @param {*} dbCtx
   * @param {Object} data
   * @param {http.ServerResponse} res
   * @param {Object} ctx
   * @throws {ResponseError} BadRequest when the request names no topic urls
   */
  async _publishRequest(dbCtx, data, res, ctx) {
    const _scope = _fileScope('_publishRequest');
    this.logger.debug(_scope, 'called', { data });

    const requestId = ctx.requestId;

    // Parse and validate all the topics in the request.
    data.publishTopics = await this._publishTopics(dbCtx, data, requestId);
    if (!data.publishTopics || !data.publishTopics.length) {
      const details = Manager._prettyDetails(['no valid topic urls to publish'], []);
      throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
    }

    // Set status per topic
    for (const topicResult of data.publishTopics) {
      topicResult.status = topicResult.err.length ? 400 : 202;
      topicResult.statusMessage = topicResult.err.length ? 'Bad Request' : 'Accepted';
    }

    // Process the valid publish notifications
    const validPublishTopics = data.publishTopics.filter((topicResult) => !topicResult.err.length);
    try {
      await Promise.all(validPublishTopics.map(async (topicResult) => this.db.topicFetchRequested(dbCtx, topicResult.topicId)));
    } catch (e) {
      this.logger.error(_scope, 'topicFetchRequest failed', { error: e, ctx, data, requestId });
      throw e;
    }

    this.logger.info(_scope, 'request accepted', { ctx, data, requestId });

    // A single topic answers with its own status; several topics get a multi-status body.
    if (data.publishTopics.length === 1) {
      const soleTopic = data.publishTopics[0];
      res.statusCode = soleTopic.status;
      res.end(Manager._prettyDetails(soleTopic.err, soleTopic.warn).join('\n'));
    } else {
      res.statusCode = 207;
      res.end(Manager.multiPublishContent(ctx, data.publishTopics));
    }

    if (this.options.manager.processImmediately
    &&  validPublishTopics.length) {
      try {
        await Promise.all(validPublishTopics.map(async (topicResult) => this.communication.topicFetchClaimAndProcessById(dbCtx, topicResult.topicId, requestId)));
      } catch (e) {
        this.logger.error(_scope, 'topicFetchClaimAndProcessById failed', { error: e, data, validPublishTopics, requestId });
        // Don't bother re-throwing, as we've already ended this response.
      }
    }
  }


  /**
   * Annotate any encountered issues.
   * Errors are listed before warnings.
   * @param {String[]} err
   * @param {String[]} warn
   * @returns {String[]}
   */
  static _prettyDetails(err, warn) {
    return [
      ...err.map((entry) => `error: ${entry}`),
      ...warn.map((entry) => `warning: ${entry}`),
    ];
  }


  /**
   * POST request for root.
   * Publish requests are delegated; subscribe/unsubscribe requests are
   * validated and recorded as a pending verification.
   * @param {http.ClientRequest} req
   * @param {http.ServerResponse} res
   * @param {object} ctx
   * @throws {ResponseError} BadRequest when validation reports any error
   */
  async postRoot(req, res, ctx) {
    const _scope = _fileScope('postRoot');
    this.logger.debug(_scope, 'called', { ctx });

    res.statusCode = 202; // Presume success.

    const warn = [];
    const err = [];
    const data = Manager._getRootData(req, ctx);
    const requestId = ctx.requestId;

    await this.db.context(async (dbCtx) => {

      // Handle publish requests elsewhere
      if (data.mode === Enum.Mode.Publish) {
        return this._publishRequest(dbCtx, data, res, ctx);
      }

      await this._validateRootData(dbCtx, data, warn, err, requestId);

      const details = Manager._prettyDetails(err, warn);

      // Any errors are fatal.  Stop and report anything that went wrong.
      if (err.length) {
        this.logger.debug(_scope, 'invalid request', { data, err, warn, requestId });
        throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
      }

      // Commit the request for later processing.
      let id;
      try {
        id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
      } catch (e) {
        this.logger.error(_scope, 'verificationInsert failed', { error: e, data, warn, requestId });
        throw e;
      }

      // If we committed to the db, we've succeeded as far as the client is concerned.
      res.end(details.join('\n'));
      this.logger.info(_scope, 'request accepted', { data, warn, requestId });

      // Immediately attempt to claim and process the request.
      if (this.options.manager.processImmediately
      &&  id) {
        try {
          await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
        } catch (e) {
          this.logger.error(_scope, 'verificationClaimAndProcessById failed', { error: e, ...data, id, requestId });
          // Don't bother re-throwing, as we've already ended this response.
        }
      }
    }); // dbCtx
  }


  /**
   * Render topic info content.
   * @param {Object} ctx
   * @param {String} ctx.responseType
   * @param {String} ctx.topicUrl
   * @param {Number} ctx.count
   * @returns {String}
   */
  // eslint-disable-next-line class-methods-use-this
  infoContent(ctx) {
    // eslint-disable-next-line sonarjs/no-small-switch
    switch (ctx.responseType) {
      case Enum.ContentType.ApplicationJson:
        return JSON.stringify({
          topic: ctx.topicUrl,
          count: ctx.count,
        });

      case Enum.ContentType.ImageSVG:
        return Template.badgeSVG({}, ` ${ctx.topicUrl} `, ` ${ctx.count} subscribers `, `${ctx.topicUrl} has ${ctx.count} subscribers.`);

      default:
        return ctx.count.toString();
    }
  }


  /**
   * GET request for /info?topic=url&format=type
   * @param {http.ServerResponse} res
   * @param {object} ctx
   * @throws {ResponseError} BadRequest for a missing or unparsable topic, NotFound when no count is returned
   */
  async getInfo(res, ctx) {
    const _scope = _fileScope('getInfo');
    this.logger.debug(_scope, 'called', { ctx });

    if (!ctx.queryParams.topic) {
      throw new ResponseError(Enum.ErrorResponse.BadRequest, 'missing required parameter');
    }
    ctx.topicUrl = ctx.queryParams.topic;

    // The format parameter overrides the response type; anything else leaves it untouched.
    switch ((ctx.queryParams.format || '').toLowerCase()) {
      case 'svg':
        ctx.responseType = Enum.ContentType.ImageSVG;
        res.setHeader(Enum.Header.ContentType, ctx.responseType);
        break;

      case 'json':
        ctx.responseType = Enum.ContentType.ApplicationJson;
        res.setHeader(Enum.Header.ContentType, ctx.responseType);
        break;

      default:
        break;
    }

    try {
      new URL(ctx.topicUrl);
    } catch (e) {
      throw new ResponseError(Enum.ErrorResponse.BadRequest, 'invalid topic');
    }

    let count;
    await this.db.context(async (dbCtx) => {
      count = await this.db.subscriptionCountByTopicUrl(dbCtx, ctx.topicUrl);
      if (!count) {
        throw new ResponseError(Enum.ErrorResponse.NotFound, 'no such topic');
      }
      ctx.count = count.count;
    });

    res.end(this.infoContent(ctx));
    this.logger.info(_scope, 'finished', { ...ctx });
  }


  /**
   * GET request for authorized /admin information.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getAdminOverview(res, ctx) {
    const _scope = _fileScope('getAdminOverview');
    this.logger.debug(_scope, 'called', { ctx });

    await this.db.context(async (dbCtx) => {
      ctx.topics = await this.db.topicGetAll(dbCtx);
    });
    this.logger.debug(_scope, 'got topics', { topics: ctx.topics });

    res.end(Template.adminOverviewHTML(ctx, this.options));
    // Log only the number of topics, not the full list.
    this.logger.info(_scope, 'finished', { ...ctx, topics: ctx.topics.length });
  }


  /**
   * GET request for authorized /admin/topic/:topicId information.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getTopicDetails(res, ctx) {
    const _scope = _fileScope('getTopicDetails');
    this.logger.debug(_scope, 'called', { ctx });

    const topicId = ctx.params.topicId;
    await this.db.context(async (dbCtx) => {
      ctx.topic = await this.db.topicGetById(dbCtx, topicId);
      ctx.subscriptions = await this.db.subscriptionsByTopicId(dbCtx, topicId);
    });
    this.logger.debug(_scope, 'got topic details', { topic: ctx.topic, subscriptions: ctx.subscriptions });

    res.end(Template.adminTopicDetailsHTML(ctx, this.options));
    this.logger.info(_scope, 'finished', { ...ctx, subscriptions: ctx.subscriptions.length, topic: ctx.topic.id });
  }


  /**
   * PATCH and DELETE for updating topic data.
   * @param {http.ServerResponse} res
   * @param {Object} ctx
   * @throws {ResponseError} NotFound when the topic does not exist, BadRequest on db data validation failure
   */
  async updateTopic(res, ctx) {
    const _scope = _fileScope('updateTopic');
    this.logger.debug(_scope, 'called', { ctx });

    const topicId = ctx.params.topicId;

    await this.db.context(async (dbCtx) => {
      await this.db.transaction(dbCtx, async (txCtx) => {
        // Get topic without defaults filled in, to persist nulls
        const topic = await this.db.topicGetById(txCtx, topicId, false);
        if (!topic) {
          this.logger.debug(_scope, 'no topic', { ctx });
          throw new Errors.ResponseError(Enum.ErrorResponse.NotFound);
        }

        if (ctx.method === 'DELETE') {
          await this.db.topicDeleted(txCtx, topicId);
          res.end();
          this.logger.info(_scope, 'topic set deleted', { ctx, topicId });
          // Attempt to remove from db if no active subscriptions.
          await this.db.topicPendingDelete(txCtx, topicId);
          return;
        }

        const updatableFields = [
          'leaseSecondsPreferred',
          'leaseSecondsMin',
          'leaseSecondsMax',
          'publisherValidationUrl',
          'contentHashAlgorithm',
        ];

        // Body values take precedence over query parameters.
        const patchValues = common.pick({
          ...ctx.queryParams,
          ...ctx.parsedBody,
        }, updatableFields);

        // Lease values are integers.
        [
          'leaseSecondsPreferred',
          'leaseSecondsMin',
          'leaseSecondsMax',
        ].filter((field) => field in patchValues).forEach((field) => {
          // eslint-disable-next-line security/detect-object-injection
          patchValues[field] = parseInt(patchValues[field], 10);
        });

        // Nothing supplied, or nothing differs from the stored topic: no update needed.
        const patchKeys = Object.keys(patchValues);
        if (patchKeys.length === 0
        // eslint-disable-next-line security/detect-object-injection
        ||  patchKeys.every((k) => patchValues[k] == topic[k])) {
          res.statusCode = 204;
          res.end();
          this.logger.info(_scope, 'empty topic update', { ctx, topicId });
          return;
        }
        const patchedTopic = {
          ...topic,
          ...patchValues,
        };

        this.logger.debug(_scope, 'data', { topic, patchValues, patchedTopic });

        try {
          await this.db.topicUpdate(txCtx, { topicId, ...patchedTopic });
        } catch (e) {
          if (e instanceof DBErrors.DataValidation) {
            this.logger.debug(_scope, 'validation error', { error: e, ctx, topicId });
            throw new Errors.ResponseError(Enum.ErrorResponse.BadRequest, e.message);
          }
          this.logger.error(_scope, 'failed', { error: e, ctx, topicId });
          throw e;
        }
        res.end();
        this.logger.info(_scope, 'topic updated', { ctx, topicId, patchValues });
      }); // transaction
    }); // context
  }


  /**
   * PATCH and DELETE for updating subscription data.
   * DELETE records a denial verification for the subscription rather than
   * removing it directly.
   * @param {http.ServerResponse} res
   * @param {Object} ctx
   * @throws {ResponseError} NotFound when the subscription does not exist, BadRequest on db data validation failure
   */
  async updateSubscription(res, ctx) {
    const _scope = _fileScope('updateSubscription');
    this.logger.debug(_scope, 'called', { ctx });

    const subscriptionId = ctx.params.subscriptionId;

    await this.db.context(async (dbCtx) => {
      await this.db.transaction(dbCtx, async (txCtx) => {
        const subscription = await this.db.subscriptionGetById(txCtx, subscriptionId);
        if (!subscription) {
          this.logger.debug(_scope, 'no subscription', { ctx });
          throw new Errors.ResponseError(Enum.ErrorResponse.NotFound);
        }

        if (ctx.method === 'DELETE') {
          const deleteFields = common.pick({
            ...ctx.queryParams,
            ...ctx.parsedBody,
          }, ['reason']);

          // Queue an unsubscription.
          const verification = {
            topicId: subscription.topicId,
            callback: subscription.callback,
            mode: Enum.Mode.Denied,
            reason: 'subscription removed by administrative action',
            isPublisherValidated: true,
            requestId: ctx.requestId,
            ...deleteFields, // A supplied reason replaces the default one.
          };

          await this.db.verificationInsert(txCtx, verification);
          this.logger.info(_scope, 'subscription removal initiated', { ctx, verification });
          res.end();
          return;
        }

        const updatableFields = [
          'signatureAlgorithm',
        ];

        const patchValues = common.pick({
          ...ctx.queryParams,
          ...ctx.parsedBody,
        }, updatableFields);

        // Nothing supplied, or nothing differs from the stored subscription: no update needed.
        const patchKeys = Object.keys(patchValues);
        if (patchKeys.length === 0
        // eslint-disable-next-line security/detect-object-injection
        ||  patchKeys.every((k) => patchValues[k] == subscription[k])) {
          res.statusCode = 204;
          res.end();
          return;
        }
        const patchedSubscription = {
          ...subscription,
          ...patchValues,
        };

        try {
          await this.db.subscriptionUpdate(txCtx, { subscriptionId, ...patchedSubscription });
        } catch (e) {
          if (e instanceof DBErrors.DataValidation) {
            this.logger.debug(_scope, 'validation error', { error: e, ctx, subscriptionId });
            throw new Errors.ResponseError(Enum.ErrorResponse.BadRequest, e.message);
          }
          // Unexpected failures are errors, same as in updateTopic.
          this.logger.error(_scope, 'failed', { error: e, ctx, subscriptionId });
          throw e;
        }
        res.end();
        this.logger.info(_scope, 'subscription updated', { ctx, subscriptionId, patchValues });
      }); // transaction
    }); // context
  }

  /**
   * POST request for manually running worker.
   * The response is ended without waiting for the worker to finish.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async processTasks(res, ctx) {
    const _scope = _fileScope('processTasks');
    this.logger.debug(_scope, 'called', { ctx });

    // N.B. no await on this
    this.communication.worker.process().catch((e) => {
      this.logger.error(_scope, 'failed', { error: e, ctx });
    });

    res.end();
    this.logger.info(_scope, 'invoked worker process', { ctx });
  }

}
788
789 module.exports = Manager;