root html is no longer invariant, serve it normally
[websub-hub] / src / manager.js
1 'use strict';
2
3 /**
4 * Here we process all the incoming requests.
5 * Payload validation occurs here, before committing the pending work to the
6 * database and (usually) calling a processor to act upon it.
7 *
8 * As this is the mediator between server framework and actions, this would
9 * be where most of a rewrite for a new server framework would happen.
10 */
11
12 const common = require('./common');
13 const Communication = require('./communication');
14 const Enum = require('./enum');
15 const Errors = require('./errors');
16 const DBErrors = require('./db/errors');
17 const { ResponseError } = require('./errors');
18 const Template = require('./template');
19
20 const _fileScope = common.fileScope(__filename);
21
/**
 * Request manager: validates incoming payloads, commits the pending work to
 * the database, and (when configured) immediately hands it to the
 * communication layer for processing.
 */
class Manager {
  /**
   * @param {object} logger - logger exposing debug/info/error(scope, message, data)
   * @param {object} db - database interface used by every handler
   * @param {object} options - application options; `options.manager` is read by this class
   */
  constructor(logger, db, options) {
    this.logger = logger;
    this.db = db;
    this.options = options;
    this.communication = new Communication(logger, db, options);
  }


  /**
   * GET request for healthcheck.
   * The database health result is only logged; the response body is always
   * the fixed health string.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getHealthcheck(res, ctx) {
    const _scope = _fileScope('getHealthcheck');
    const health = 'happy';

    // What else could we check...
    const dbHealth = await this.db.healthCheck();
    this.logger.debug(_scope, 'called', { health, dbHealth, ctx });
    res.end(health);
  }


  /**
   * GET request for root.
   * Renders the root page from the request context and options.
   * @param {http.ClientRequest} req
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getRoot(req, res, ctx) {
    const _scope = _fileScope('getRoot');
    this.logger.debug(_scope, 'called', { ctx });

    const content = Template.rootHTML(ctx, this.options);
    res.end(content);
    this.logger.info(_scope, 'finished', { ctx });
  }


  /** All the fields the root handler deals with.
   * @typedef {object} RootData
   * @property {string} callback - url
   * @property {string} mode - lower-cased hub.mode
   * @property {*} [url] - hub.url value, only present for publish mode
   * @property {string} topic
   * @property {number} topicId - set during topic validation
   * @property {number} [leaseSeconds] - parsed from hub.lease_seconds when supplied
   * @property {string} secret
   * @property {string} httpRemoteAddr
   * @property {string} httpFrom
   * @property {boolean} isSecure
   * @property {boolean} isPublisherValidated
   */

  /**
   * Extract api parameters.
   * @param {http.ClientRequest} req
   * @param {Object} ctx
   * @param {Object} ctx.parsedBody - posted form fields
   * @returns {RootData}
   */
  static _getRootData(req, ctx) {
    const postData = ctx.parsedBody;
    const mode = (postData['hub.mode'] || '').toLowerCase();
    return {
      callback: postData['hub.callback'],
      mode,
      ...(mode === Enum.Mode.Publish && { url: postData['hub.url'] }), // Publish accepts either hub.url or hub.topic
      topic: postData['hub.topic'],
      // Only include leaseSeconds when the client supplied a value.
      ...(postData['hub.lease_seconds'] && { leaseSeconds: parseInt(postData['hub.lease_seconds'], 10) }),
      secret: postData['hub.secret'],
      httpRemoteAddr: ctx.clientAddress,
      httpFrom: req.getHeader(Enum.Header.From),
      isSecure: ((ctx.clientProtocol || '').toLowerCase() === 'https'),
      isPublisherValidated: true, // Default to true. Will be set to false later, if topic has publisher validation url.
    };
  }


  /**
   * Run all validation checks for a subscribe/unsubscribe request,
   * accumulating problems into the supplied arrays.
   * @param {*} dbCtx
   * @param {RootData} data
   * @param {String[]} warn
   * @param {String[]} err
   * @param {String} requestId
   */
  async _validateRootData(dbCtx, data, warn, err, requestId) {
    // These checks can modify data, so order matters.
    await this._checkTopic(dbCtx, data, warn, err, requestId);
    this._checkCallbackAndSecrets(data, warn, err);
    await this._checkMode(dbCtx, data, warn, err);
  }


  /**
   * Check that requested topic exists and values are in range.
   * Sets topic id, publisher validation state, and requested lease
   * seconds on data.
   * When new topic creation is allowed, an unknown but parsable topic url is
   * created first.
   * @param {*} dbCtx
   * @param {RootData} data
   * @param {String[]} warn
   * @param {String[]} err
   * @param {String} requestId - only used for logging
   */
  async _checkTopic(dbCtx, data, warn, err, requestId) {
    const _scope = _fileScope('_checkTopic');
    let topic;

    if (data.topic) {
      topic = await this.db.topicGetByUrl(dbCtx, data.topic);

      if (!topic && this._newTopicCreationAllowed()) {
        this.logger.info(_scope, 'new topic from subscribe request', { data, requestId });

        try {
          new URL(data.topic);
        } catch (e) {
          err.push('invalid topic url (failed to parse url)');
          return;
        }

        await this.db.topicSet(dbCtx, {
          url: data.topic,
        });
        topic = await this.db.topicGetByUrl(dbCtx, data.topic);
      }
    }

    if (!topic || topic.isDeleted) {
      err.push('not a supported topic');
      return;
    }

    data.topicId = topic.id;

    // leaseSeconds is either absent or the result of parseInt, so NaN means
    // the client sent something unparsable; fall back to the topic preference.
    if (data.leaseSeconds === undefined || Number.isNaN(data.leaseSeconds)) {
      data.leaseSeconds = topic.leaseSecondsPreferred;
    } else if (data.leaseSeconds > topic.leaseSecondsMax) {
      data.leaseSeconds = topic.leaseSecondsMax;
      warn.push(`requested lease too long, using ${data.leaseSeconds}`);
    } else if (data.leaseSeconds < topic.leaseSecondsMin) {
      data.leaseSeconds = topic.leaseSecondsMin;
      warn.push(`requested lease too short, using ${data.leaseSeconds}`);
    }

    if (topic.publisherValidationUrl) {
      data.isPublisherValidated = false;
    }
  }


  /**
   * Check data for valid callback url and scheme constraints.
   * Secret-over-insecure-transport problems are errors when
   * options.manager.strictSecrets is set, warnings otherwise.
   * @param {RootData} data
   * @param {String[]} warn
   * @param {String[]} err
   */
  _checkCallbackAndSecrets(data, warn, err) {
    let isCallbackSecure = false;

    if (!data.callback) {
      err.push('invalid callback url (empty)');
    } else {
      try {
        const c = new URL(data.callback);
        isCallbackSecure = (c.protocol.toLowerCase() === 'https:'); // Colon included because url module is weird
      } catch (e) {
        err.push('invalid callback url (failed to parse url)');
        return;
      }
    }

    if (!isCallbackSecure) {
      warn.push('insecure callback');
    }

    if (data.secret) {
      const secretSeverity = this.options.manager.strictSecrets ? err : warn;
      if (!data.isSecure) {
        secretSeverity.push('secret not safe (insecure hub)');
      }
      if (!isCallbackSecure) {
        secretSeverity.push('secret not safe (insecure callback)');
      }
      if (data.secret.length > 199) {
        err.push('cannot keep a secret that big');
      }
    }
  }

  /**
   * Check mode validity and subscription requirements.
   * Publish mode is handled elsewhere in the flow.
   * @param {*} dbCtx
   * @param {RootData} data
   * @param {String[]} warn - currently unused by this check
   * @param {String[]} err
   */
  async _checkMode(dbCtx, data, warn, err) {
    switch (data.mode) {
      case Enum.Mode.Subscribe:
        break;

      case Enum.Mode.Unsubscribe: {
        const currentEpoch = Date.now() / 1000;
        let s;
        if (data.callback && data.topicId) {
          s = await this.db.subscriptionGet(dbCtx, data.callback, data.topicId);
        }
        // Treat any empty result (undefined or null) as "no subscription",
        // so a null row never reaches the property access below.
        if (!s) {
          err.push('not subscribed');
        } else if (s.expires < currentEpoch) {
          err.push('subscription already expired');
        }
        break;
      }

      default: {
        err.push('invalid mode');
      }
    }
  }


  /**
   * Determine if a topic url is allowed to be created.
   * In the future, this may be more complicated.
   * @returns {Boolean}
   */
  _newTopicCreationAllowed() {
    return this.options.manager.publicHub;
  }


  /**
   * Check that a publish request's topic(s) are valid and exist,
   * returning an array with the results for each.
   * For a public-hub publish request, creates topics if they do not exist.
   * @param {*} dbCtx
   * @param {RootData} data
   * @param {String} requestId - only used for logging
   * @returns {Promise<Object[]>} one `{ url, warn, err, topicId }` entry per distinct url
   */
  async _publishTopics(dbCtx, data, requestId) {
    const _scope = _fileScope('_publishTopics');

    // Publish requests may include multiple topics, consider them all, but deduplicate.
    const publishUrls = Array.from(new Set([
      ...common.ensureArray(data.url),
      ...common.ensureArray(data.topic),
    ]));

    // Map the requested topics to their ids, creating if necessary.
    return Promise.all(publishUrls.map(async (url) => {
      const result = {
        url,
        warn: [],
        err: [],
        topicId: undefined,
      };
      let topic = await this.db.topicGetByUrl(dbCtx, url);
      if (!topic && this._newTopicCreationAllowed()) {
        try {
          new URL(url);
        } catch (e) {
          result.err.push('invalid topic url (failed to parse url)');
          return result;
        }
        await this.db.topicSet(dbCtx, {
          // TODO: accept a publisherValidationUrl parameter
          url,
        });
        topic = await this.db.topicGetByUrl(dbCtx, url);
        this.logger.info(_scope, 'new topic from publish request', { url, requestId });
      }
      if (!topic || topic.isDeleted) {
        result.err.push('topic not supported');
        return result;
      }
      result.topicId = topic.id;
      return result;
    }));
  }


  /**
   * Render response for multi-topic publish requests.
   * JSON when ctx.responseType is application/json, otherwise plain text
   * with one section per topic.
   * @param {Object} ctx
   * @param {String} ctx.responseType
   * @param {Object[]} publishTopics
   * @returns {String}
   */
  static multiPublishContent(ctx, publishTopics) {
    const responses = publishTopics.map((topic) => ({
      href: topic.url,
      status: topic.status,
      statusMessage: topic.statusMessage,
      errors: topic.err,
      warnings: topic.warn,
    }));
    switch (ctx.responseType) {
      case Enum.ContentType.ApplicationJson:
        return JSON.stringify(responses);

      case Enum.ContentType.TextPlain:
      default: {
        const textResponses = responses.map((response) => {
          const details = Manager._prettyDetails(response.errors, response.warnings);
          const textDetails = (details.length ? '\n' : '') + details.map((d) => `\t${d}`).join('\n');
          return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`;
        });
        return textResponses.join('\n----\n');
      }
    }
  }


  /**
   * Process a publish request.
   * Responds with the single topic's status when one url was given, or 207
   * with per-topic results when several were.
   * @param {*} dbCtx
   * @param {Object} data
   * @param {http.ServerResponse} res
   * @param {Object} ctx
   * @throws {ResponseError} BadRequest when the request names no topic urls
   */
  async _publishRequest(dbCtx, data, res, ctx) {
    const _scope = _fileScope('_publishRequest');
    this.logger.debug(_scope, 'called', { data });

    const requestId = ctx.requestId;

    // Parse and validate all the topics in the request.
    data.publishTopics = await this._publishTopics(dbCtx, data, requestId);
    if (!data.publishTopics || !data.publishTopics.length) {
      const details = Manager._prettyDetails(['no valid topic urls to publish'], []);
      throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
    }

    // Set status per topic
    for (const topicResult of data.publishTopics) {
      topicResult.status = topicResult.err.length ? 400 : 202;
      topicResult.statusMessage = topicResult.err.length ? 'Bad Request' : 'Accepted';
    }

    // Process the valid publish notifications
    const validPublishTopics = data.publishTopics.filter((topicResult) => !topicResult.err.length);
    try {
      await Promise.all(validPublishTopics.map(async (topicResult) => this.db.topicFetchRequested(dbCtx, topicResult.topicId)));
    } catch (e) {
      this.logger.error(_scope, 'topicFetchRequest failed', { error: e, ctx, data, requestId });
      throw e;
    }

    this.logger.info(_scope, 'request accepted', { ctx, data, requestId });

    if (data.publishTopics.length === 1) {
      const soleTopic = data.publishTopics[0];
      res.statusCode = soleTopic.status;
      res.end(Manager._prettyDetails(soleTopic.err, soleTopic.warn).join('\n'));
    } else {
      res.statusCode = 207;
      res.end(Manager.multiPublishContent(ctx, data.publishTopics));
    }

    if (this.options.manager.processImmediately
    &&  validPublishTopics.length) {
      try {
        await Promise.all(validPublishTopics.map(async (topicResult) => this.communication.topicFetchClaimAndProcessById(dbCtx, topicResult.topicId, requestId)));
      } catch (e) {
        this.logger.error(_scope, 'topicFetchClaimAndProcessById failed', { error: e, data, validPublishTopics, requestId });
        // Don't bother re-throwing, as we've already ended this response.
      }
    }
  }


  /**
   * Annotate any encountered issues.
   * Errors are listed before warnings.
   * @param {String[]} err
   * @param {String[]} warn
   * @returns {String[]}
   */
  static _prettyDetails(err, warn) {
    return [
      ...err.map((entry) => `error: ${entry}`),
      ...warn.map((entry) => `warning: ${entry}`),
    ];
  }


  /**
   * POST request for root.
   * Publish requests are delegated; subscribe/unsubscribe requests are
   * validated and stored as a pending verification.
   * @param {http.ClientRequest} req
   * @param {http.ServerResponse} res
   * @param {object} ctx
   * @throws {ResponseError} BadRequest when validation reports any error
   */
  async postRoot(req, res, ctx) {
    const _scope = _fileScope('postRoot');
    this.logger.debug(_scope, 'called', { ctx });

    res.statusCode = 202; // Presume success.

    const warn = [];
    const err = [];
    const data = Manager._getRootData(req, ctx);
    const requestId = ctx.requestId;

    await this.db.context(async (dbCtx) => {

      // Handle publish requests elsewhere
      if (data.mode === Enum.Mode.Publish) {
        return this._publishRequest(dbCtx, data, res, ctx);
      }

      await this._validateRootData(dbCtx, data, warn, err, requestId);

      const details = Manager._prettyDetails(err, warn);

      // Any errors are fatal.  Stop and report anything that went wrong.
      if (err.length) {
        this.logger.debug(_scope, 'invalid request', { data, err, warn, requestId });
        throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
      }

      // Commit the request for later processing.
      let id;
      try {
        id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
      } catch (e) {
        this.logger.error(_scope, 'verificationInsert failed', { error: e, data, warn, id, requestId });
        throw e;
      }

      // If we committed to the db, we've succeeded as far as the client is concerned.
      res.end(details.join('\n'));
      this.logger.info(_scope, 'request accepted', { data, warn, requestId });

      // Immediately attempt to claim and process the request.
      if (this.options.manager.processImmediately
      &&  id) {
        try {
          await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
        } catch (e) {
          this.logger.error(_scope, 'verificationClaimAndProcessById failed', { error: e, ...data, id, requestId });
          // Don't bother re-throwing, as we've already ended this response.
        }
      }
    }); // dbCtx
  }


  /**
   * Render topic info content.
   * @param {Object} ctx
   * @param {String} ctx.responseType
   * @param {String} ctx.topicUrl
   * @param {Number} ctx.count
   * @returns {String}
   */
  // eslint-disable-next-line class-methods-use-this
  infoContent(ctx) {
    // eslint-disable-next-line sonarjs/no-small-switch
    switch (ctx.responseType) {
      case Enum.ContentType.ApplicationJson:
        return JSON.stringify({
          topic: ctx.topicUrl,
          count: ctx.count,
        });

      case Enum.ContentType.ImageSVG:
        return Template.badgeSVG({}, ` ${ctx.topicUrl} `, ` ${ctx.count} subscribers `, `${ctx.topicUrl} has ${ctx.count} subscribers.`);

      default:
        return ctx.count.toString();
    }
  }


  /**
   * GET request for /info?topic=url&format=type
   * @param {http.ServerResponse} res
   * @param {object} ctx
   * @throws {ResponseError} BadRequest for a missing or unparsable topic,
   *   NotFound when the db returns no count for the topic
   */
  async getInfo(res, ctx) {
    const _scope = _fileScope('getInfo');
    this.logger.debug(_scope, 'called', { ctx });

    if (!ctx.queryParams.topic) {
      throw new ResponseError(Enum.ErrorResponse.BadRequest, 'missing required parameter');
    }
    ctx.topicUrl = ctx.queryParams.topic;

    // An explicit format parameter overrides the response type.
    switch ((ctx.queryParams.format || '').toLowerCase()) {
      case 'svg':
        ctx.responseType = Enum.ContentType.ImageSVG;
        res.setHeader(Enum.Header.ContentType, ctx.responseType);
        break;

      case 'json':
        ctx.responseType = Enum.ContentType.ApplicationJson;
        res.setHeader(Enum.Header.ContentType, ctx.responseType);
        break;

      default:
        break;
    }

    try {
      new URL(ctx.topicUrl);
    } catch (e) {
      throw new ResponseError(Enum.ErrorResponse.BadRequest, 'invalid topic');
    }

    let count;
    await this.db.context(async (dbCtx) => {
      count = await this.db.subscriptionCountByTopicUrl(dbCtx, ctx.topicUrl);
      if (!count) {
        throw new ResponseError(Enum.ErrorResponse.NotFound, 'no such topic');
      }
      ctx.count = count.count;
    });

    res.end(this.infoContent(ctx));
    this.logger.info(_scope, 'finished', { ...ctx });
  }


  /**
   * GET request for authorized /admin information.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getAdminOverview(res, ctx) {
    const _scope = _fileScope('getAdminOverview');
    this.logger.debug(_scope, 'called', { ctx });

    await this.db.context(async (dbCtx) => {
      ctx.topics = await this.db.topicGetAll(dbCtx);
    });
    this.logger.debug(_scope, 'got topics', { topics: ctx.topics });

    // Profile users can only see related topics.
    if (ctx.session && ctx.session.authenticatedProfile) {
      const profileUrlObj = new URL(ctx.session.authenticatedProfile);
      ctx.topics = ctx.topics.filter((topic) => {
        const topicUrlObj = new URL(topic.url);
        return (topicUrlObj.hostname === profileUrlObj.hostname);
      });
    }

    res.end(Template.adminOverviewHTML(ctx, this.options));
    this.logger.info(_scope, 'finished', { ...ctx, topics: ctx.topics.length });
  }


  /**
   * GET request for authorized /admin/topic/:topicId information.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getTopicDetails(res, ctx) {
    const _scope = _fileScope('getTopicDetails');
    this.logger.debug(_scope, 'called', { ctx });

    const topicId = ctx.params.topicId;
    await this.db.context(async (dbCtx) => {
      ctx.topic = await this.db.topicGetById(dbCtx, topicId);
      ctx.subscriptions = await this.db.subscriptionsByTopicId(dbCtx, topicId);
    });
    this.logger.debug(_scope, 'got topic details', { topic: ctx.topic, subscriptions: ctx.subscriptions });

    // Profile users can only see related topics.
    if (ctx.session && ctx.session.authenticatedProfile) {
      const profileUrlObj = new URL(ctx.session.authenticatedProfile);
      // A topic which was not found is handled the same as one on another
      // host, rather than failing on the url property access.
      const isRelated = Boolean(ctx.topic)
        && (new URL(ctx.topic.url).hostname === profileUrlObj.hostname);
      if (!isRelated) {
        ctx.topic = null;
        ctx.subscriptions = [];
      }
    }

    res.end(Template.adminTopicDetailsHTML(ctx, this.options));
    this.logger.info(_scope, 'finished', { ...ctx, subscriptions: ctx.subscriptions.length, topic: ctx.topic && ctx.topic.id || ctx.topic });
  }


  /**
   * PATCH and DELETE for updating topic data.
   * Responds 204 when a PATCH would change nothing.
   * @param {http.ServerResponse} res
   * @param {Object} ctx
   * @throws {ResponseError} NotFound for an unknown topic, BadRequest when
   *   the db rejects the update as invalid data
   */
  async updateTopic(res, ctx) {
    const _scope = _fileScope('updateTopic');
    this.logger.debug(_scope, 'called', { ctx });

    const topicId = ctx.params.topicId;

    await this.db.context(async (dbCtx) => {
      await this.db.transaction(dbCtx, async (txCtx) => {
        // Get topic without defaults filled in, to persist nulls
        const topic = await this.db.topicGetById(txCtx, topicId, false);
        if (!topic) {
          this.logger.debug(_scope, 'no topic', { ctx });
          throw new Errors.ResponseError(Enum.ErrorResponse.NotFound);
        }

        if (ctx.method === 'DELETE') {
          await this.db.topicDeleted(txCtx, topicId);
          res.end();
          this.logger.info(_scope, 'topic set deleted', { ctx, topicId });
          // Attempt to remove from db if no active subscriptions.
          await this.db.topicPendingDelete(txCtx, topicId);
          return;
        }

        const updatableFields = [
          'leaseSecondsPreferred',
          'leaseSecondsMin',
          'leaseSecondsMax',
          'publisherValidationUrl',
          'contentHashAlgorithm',
        ];

        // Body values take precedence over query parameters.
        const patchValues = common.pick({
          ...ctx.queryParams,
          ...ctx.parsedBody,
        }, updatableFields);

        [
          'leaseSecondsPreferred',
          'leaseSecondsMin',
          'leaseSecondsMax',
        ].filter((field) => field in patchValues).forEach((field) => {
          // eslint-disable-next-line security/detect-object-injection
          patchValues[field] = parseInt(patchValues[field], 10);
        });

        // Loose equality is kept as in the original comparison.
        const patchKeys = Object.keys(patchValues);
        if (patchKeys.length === 0
        // eslint-disable-next-line security/detect-object-injection
        ||  patchKeys.every((k) => patchValues[k] == topic[k])) {
          res.statusCode = 204;
          res.end();
          this.logger.info(_scope, 'empty topic update', { ctx, topicId });
          return;
        }
        const patchedTopic = {
          ...topic,
          ...patchValues,
        };

        this.logger.debug(_scope, 'data', { topic, patchValues, patchedTopic });

        try {
          await this.db.topicUpdate(txCtx, { topicId, ...patchedTopic });
        } catch (e) {
          if (e instanceof DBErrors.DataValidation) {
            this.logger.debug(_scope, 'validation error', { error: e, ctx, topicId });
            throw new Errors.ResponseError(Enum.ErrorResponse.BadRequest, e.message);
          }
          this.logger.error(_scope, 'failed', { error: e, ctx, topicId });
          throw e;
        }
        res.end();
        this.logger.info(_scope, 'topic updated', { ctx, topicId, patchValues });
      }); // transaction
    }); // context
  }


  /**
   * PATCH and DELETE for updating subscription data.
   * DELETE does not remove the subscription directly; it inserts a
   * denied-mode verification for it.
   * @param {http.ServerResponse} res
   * @param {Object} ctx
   * @throws {ResponseError} NotFound for an unknown subscription, BadRequest
   *   when the db rejects the update as invalid data
   */
  async updateSubscription(res, ctx) {
    const _scope = _fileScope('updateSubscription');
    this.logger.debug(_scope, 'called', { ctx });

    const subscriptionId = ctx.params.subscriptionId;

    await this.db.context(async (dbCtx) => {
      await this.db.transaction(dbCtx, async (txCtx) => {
        const subscription = await this.db.subscriptionGetById(txCtx, subscriptionId);
        if (!subscription) {
          this.logger.debug(_scope, 'no subscription', { ctx });
          throw new Errors.ResponseError(Enum.ErrorResponse.NotFound);
        }

        if (ctx.method === 'DELETE') {
          const deleteFields = common.pick({
            ...ctx.queryParams,
            ...ctx.parsedBody,
          }, ['reason']);

          // Queue an unsubscription.
          const verification = {
            topicId: subscription.topicId,
            callback: subscription.callback,
            mode: Enum.Mode.Denied,
            reason: 'subscription removed by administrative action',
            isPublisherValidated: true,
            requestId: ctx.requestId,
            ...deleteFields, // A supplied reason overrides the default above.
          };

          await this.db.verificationInsert(txCtx, verification);
          this.logger.info(_scope, 'subscription removal initiated', { ctx, verification });
          res.end();
          return;
        }

        const updatableFields = [
          'signatureAlgorithm',
        ];

        const patchValues = common.pick({
          ...ctx.queryParams,
          ...ctx.parsedBody,
        }, updatableFields);

        const patchKeys = Object.keys(patchValues);
        if (patchKeys.length === 0
        // eslint-disable-next-line security/detect-object-injection
        ||  patchKeys.every((k) => patchValues[k] == subscription[k])) {
          res.statusCode = 204;
          res.end();
          return;
        }
        const patchedSubscription = {
          ...subscription,
          ...patchValues,
        };

        try {
          await this.db.subscriptionUpdate(txCtx, { subscriptionId, ...patchedSubscription });
        } catch (e) {
          if (e instanceof DBErrors.DataValidation) {
            this.logger.debug(_scope, 'validation error', { error: e, ctx, subscriptionId });
            throw new Errors.ResponseError(Enum.ErrorResponse.BadRequest, e.message);
          }
          // Unexpected failure: log at error level, as updateTopic does.
          this.logger.error(_scope, 'failed', { error: e, ctx, subscriptionId });
          throw e;
        }
        res.end();
        this.logger.info(_scope, 'subscription updated', { ctx, subscriptionId, patchValues });
      }); // transaction
    }); // context
  }

  /**
   * POST request for manually running worker.
   * The worker run is started but not awaited; a failure is only logged.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async processTasks(res, ctx) {
    const _scope = _fileScope('processTasks');
    this.logger.debug(_scope, 'called', { ctx });

    // N.B. no await on this
    this.communication.worker.process().catch((e) => {
      this.logger.error(_scope, 'failed', { error: e, ctx });
    });

    res.end();
    this.logger.info(_scope, 'invoked worker process', { ctx });
  }

}
793
794 module.exports = Manager;