Merge branch 'v1.3-dev' as v1.3.11
[websub-hub] / src / manager.js
1 'use strict';
2
3 /**
4 * Here we process all the incoming requests.
5 * Payload validation occurs here, before committing the pending work to the
6 * database and (usually) calling a processor to act upon it.
7 *
8 * As this is the mediator between server framework and actions, this would
9 * be where most of a rewrite for a new server framework would happen.
10 */
11
12 const common = require('./common');
13 const Communication = require('./communication');
14 const Enum = require('./enum');
15 const Errors = require('./errors');
16 const DBErrors = require('./db/errors');
17 const { ResponseError } = require('./errors');
18 const Template = require('./template');
19
20 const _fileScope = common.fileScope(__filename);
21
class Manager {
  /**
   * Mediator between the server framework and the actions taken on requests.
   * @param {object} logger - called here as logger.debug/info/error(scope, message, data)
   * @param {object} db - database interface
   * @param {object} options - application options; `options.manager` is read by this class
   */
  constructor(logger, db, options) {
    this.logger = logger;
    this.db = db;
    this.options = options;
    this.communication = new Communication(logger, db, options);
  }


  /**
   * GET request for healthcheck.
   * The database health result is only logged; the response body is always
   * the fixed health string. A rejected db check propagates to the caller.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getHealthcheck(res, ctx) {
    const _scope = _fileScope('getHealthcheck');
    const health = 'happy';

    // What else could we check...
    const dbHealth = await this.db.healthCheck();
    this.logger.debug(_scope, 'called', { health, dbHealth, ctx });
    res.end(health);
  }


  /**
   * GET request for root.
   * Responds with the rendered root page.
   * @param {http.ClientRequest} req - unused here
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getRoot(req, res, ctx) {
    const _scope = _fileScope('getRoot');
    this.logger.debug(_scope, 'called', { ctx });

    const content = Template.rootHTML(ctx, this.options);
    res.end(content);
    this.logger.info(_scope, 'finished', { ctx });
  }


  /** All the fields the root handler deals with.
   * @typedef {object} RootData
   * @property {string} callback - url
   * @property {string} mode - lower-cased hub.mode, or empty string
   * @property {string|string[]=} url - only present on publish requests
   * @property {string} topic
   * @property {number} topicId - set during validation
   * @property {number=} leaseSeconds - parsed from hub.lease_seconds when supplied
   * @property {string} secret
   * @property {string} httpRemoteAddr
   * @property {string} httpFrom
   * @property {boolean} isSecure
   * @property {boolean} isPublisherValidated
   */

  /**
   * Extract api parameters.
   * @param {http.ClientRequest} req
   * @param {Object} ctx
   * @returns {RootData}
   */
  static _getRootData(req, ctx) {
    const postData = ctx.parsedBody;
    // Only a string mode is usable; anything else (e.g. a repeated parameter
    // that the body parser turned into an array) becomes the empty mode, which
    // is later rejected as invalid rather than throwing here.
    const rawMode = postData['hub.mode'];
    const mode = (typeof rawMode === 'string') ? rawMode.toLowerCase() : '';
    return {
      callback: postData['hub.callback'],
      mode,
      ...(mode === Enum.Mode.Publish && { url: postData['hub.url'] }), // Publish accepts either hub.url or hub.topic
      topic: postData['hub.topic'],
      ...(postData['hub.lease_seconds'] && { leaseSeconds: parseInt(postData['hub.lease_seconds'], 10) }),
      secret: postData['hub.secret'],
      httpRemoteAddr: ctx.clientAddress,
      httpFrom: req.getHeader(Enum.Header.From),
      isSecure: ((ctx.clientProtocol || '').toLowerCase() === 'https'),
      isPublisherValidated: true, // Default to true. Will be set to false later, if topic has publisher validation url.
    };
  }


  /**
   * Run every validation step for a subscribe/unsubscribe request,
   * collecting problems into warn and err.
   * @param {*} dbCtx
   * @param {RootData} data
   * @param {String[]} warn
   * @param {String[]} err
   * @param {String} requestId
   */
  async _validateRootData(dbCtx, data, warn, err, requestId) {
    // These checks can modify data, so order matters.
    await this._checkTopic(dbCtx, data, warn, err, requestId);
    this._checkCallbackAndSecrets(data, warn, err, requestId);
    await this._checkMode(dbCtx, data, warn, err, requestId);
  }


  /**
   * Check that requested topic exists and values are in range.
   * Sets topic id, publisher validation state, and requested lease
   * seconds on data.
   * When new topic creation is allowed, an unknown but parseable topic url
   * is created first.
   * @param {*} dbCtx
   * @param {RootData} data
   * @param {String[]} warn
   * @param {String[]} err
   * @param {String} requestId
   */
  async _checkTopic(dbCtx, data, warn, err, requestId) {
    const _scope = _fileScope('_checkTopic');
    let topic;

    if (data.topic) {
      topic = await this.db.topicGetByUrl(dbCtx, data.topic);

      if (!topic && this._newTopicCreationAllowed()) {
        this.logger.info(_scope, 'new topic from subscribe request', { data, requestId });

        try {
          new URL(data.topic);
        } catch (e) {
          err.push('invalid topic url (failed to parse url)');
          return;
        }

        await this.db.topicSet(dbCtx, {
          url: data.topic,
        });
        topic = await this.db.topicGetByUrl(dbCtx, data.topic);
      }
    }

    if (!topic || topic.isDeleted) {
      err.push('not a supported topic');
      return;
    }

    data.topicId = topic.id;

    if (data.leaseSeconds === undefined || Number.isNaN(data.leaseSeconds)) {
      // Nothing usable was requested, take the topic's preference.
      data.leaseSeconds = topic.leaseSecondsPreferred;
    } else if (data.leaseSeconds > topic.leaseSecondsMax) {
      data.leaseSeconds = topic.leaseSecondsMax;
      warn.push(`requested lease too long, using ${data.leaseSeconds}`);
    } else if (data.leaseSeconds < topic.leaseSecondsMin) {
      data.leaseSeconds = topic.leaseSecondsMin;
      warn.push(`requested lease too short, using ${data.leaseSeconds}`);
    }

    if (topic.publisherValidationUrl) {
      data.isPublisherValidated = false;
    }
  }


  /**
   * Check data for valid callback url and scheme constraints.
   * Secret problems are errors when options.manager.strictSecrets is set,
   * otherwise warnings; an oversized secret is always an error.
   * @param {RootData} data
   * @param {String[]} warn
   * @param {String[]} err
   */
  _checkCallbackAndSecrets(data, warn, err) {
    let isCallbackSecure = false;

    if (!data.callback) {
      err.push('invalid callback url (empty)');
    } else {
      try {
        const c = new URL(data.callback);
        isCallbackSecure = (c.protocol.toLowerCase() === 'https:'); // Colon included because url module is weird
      } catch (e) {
        err.push('invalid callback url (failed to parse url)');
        return;
      }
    }

    if (!isCallbackSecure) {
      warn.push('insecure callback');
    }

    if (data.secret) {
      const secretSeverity = this.options.manager.strictSecrets ? err : warn;
      if (!data.isSecure) {
        secretSeverity.push('secret not safe (insecure hub)');
      }
      if (!isCallbackSecure) {
        secretSeverity.push('secret not safe (insecure callback)');
      }
      // The limit is on bytes, not characters, so measure the encoded size.
      if (Buffer.byteLength(String(data.secret), 'utf8') > 199) {
        err.push('cannot keep a secret that big');
      }
    }
  }

  /**
   * Check mode validity and subscription requirements.
   * Publish mode is handled elsewhere in the flow.
   * @param {*} dbCtx
   * @param {RootData} data
   * @param {String[]} warn
   * @param {String[]} err
   */
  async _checkMode(dbCtx, data, warn, err) {
    switch (data.mode) {
      case Enum.Mode.Subscribe:
        break;

      case Enum.Mode.Unsubscribe: {
        const currentEpoch = Date.now() / 1000;
        let s;
        if (data.callback && data.topicId) {
          s = await this.db.subscriptionGet(dbCtx, data.callback, data.topicId);
        }
        // Any empty lookup result (undefined or null) means there is nothing to unsubscribe.
        if (!s) {
          err.push('not subscribed');
        } else if (s.expires < currentEpoch) {
          err.push('subscription already expired');
        }
        break;
      }

      default: {
        err.push('invalid mode');
      }
    }
  }


  /**
   * Determine if a topic url is allowed to be created.
   * In the future, this may be more complicated.
   * @returns {Boolean}
   */
  _newTopicCreationAllowed() {
    return this.options.manager.publicHub;
  }


  /**
   * Check that a publish request's topic(s) are valid and exist,
   * returning an array with the results for each.
   * For a public-hub publish request, creates topics if they do not exist.
   * @param {*} dbCtx
   * @param {RootData} data
   * @param {String} requestId
   * @returns {Promise<Object[]>} one `{ url, warn, err, topicId }` entry per distinct url
   */
  async _publishTopics(dbCtx, data, requestId) {
    const _scope = _fileScope('_publishTopics');

    // Publish requests may include multiple topics, consider them all, but deduplicate.
    const publishUrls = Array.from(new Set([
      ...common.ensureArray(data.url),
      ...common.ensureArray(data.topic),
    ]));

    // Map the requested topics to their ids, creating if necessary.
    return Promise.all(publishUrls.map(async (url) => {
      const result = {
        url,
        warn: [],
        err: [],
        topicId: undefined,
      };
      let topic = await this.db.topicGetByUrl(dbCtx, url);
      if (!topic && this._newTopicCreationAllowed()) {
        try {
          new URL(url);
        } catch (e) {
          result.err.push('invalid topic url (failed to parse url)');
          return result;
        }
        await this.db.topicSet(dbCtx, {
          // TODO: accept a publisherValidationUrl parameter
          url,
        });
        topic = await this.db.topicGetByUrl(dbCtx, url);
        this.logger.info(_scope, 'new topic from publish request', { url, requestId });
      }
      if (!topic || topic.isDeleted) {
        result.err.push('topic not supported');
        return result;
      }
      result.topicId = topic.id;
      return result;
    }));
  }


  /**
   * Render response for multi-topic publish requests.
   * JSON when ctx.responseType asks for it, otherwise plain text with one
   * section per topic.
   * @param {Object} ctx
   * @param {Object[]} publishTopics
   * @returns {String}
   */
  static multiPublishContent(ctx, publishTopics) {
    const responses = publishTopics.map((topic) => ({
      href: topic.url,
      status: topic.status,
      statusMessage: topic.statusMessage,
      errors: topic.err,
      warnings: topic.warn,
    }));
    switch (ctx.responseType) {
      case Enum.ContentType.ApplicationJson:
        return JSON.stringify(responses);

      case Enum.ContentType.TextPlain:
      default: {
        const textResponses = responses.map((response) => {
          const details = Manager._prettyDetails(response.errors, response.warnings);
          const textDetails = (details.length ? '\n' : '') + details.map((d) => `\t${d}`).join('\n');
          return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`;
        });
        return textResponses.join('\n----\n');
      }
    }
  }


  /**
   * Process a publish request.
   * Responds with the sole topic's status for a single-topic request, or 207
   * with per-topic results for several topics.
   * @param {*} dbCtx
   * @param {Object} data
   * @param {http.ServerResponse} res
   * @param {Object} ctx
   */
  async _publishRequest(dbCtx, data, res, ctx) {
    const _scope = _fileScope('_publishRequest');
    this.logger.debug(_scope, 'called', { data });

    const requestId = ctx.requestId;

    // Parse and validate all the topics in the request.
    data.publishTopics = await this._publishTopics(dbCtx, data, requestId);
    if (!data.publishTopics || !data.publishTopics.length) {
      const details = Manager._prettyDetails(['no valid topic urls to publish'], []);
      throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
    }

    // Set status per topic
    for (const topicResult of data.publishTopics) {
      topicResult.status = topicResult.err.length ? 400 : 202;
      topicResult.statusMessage = topicResult.err.length ? 'Bad Request' : 'Accepted';
    }

    // Process the valid publish notifications
    const validPublishTopics = data.publishTopics.filter((topicResult) => !topicResult.err.length);
    try {
      await Promise.all(validPublishTopics.map(async (topicResult) => this.db.topicFetchRequested(dbCtx, topicResult.topicId)));
    } catch (e) {
      this.logger.error(_scope, 'topicFetchRequest failed', { error: e, ctx, data, requestId });
      throw e;
    }

    this.logger.info(_scope, 'request accepted', { ctx, data, requestId });

    if (data.publishTopics.length === 1) {
      const soleTopic = data.publishTopics[0];
      res.statusCode = soleTopic.status;
      res.end(Manager._prettyDetails(soleTopic.err, soleTopic.warn).join('\n'));
    } else {
      res.statusCode = 207;
      res.end(Manager.multiPublishContent(ctx, data.publishTopics));
    }

    if (this.options.manager.processImmediately
    &&  validPublishTopics.length) {
      try {
        await Promise.all(validPublishTopics.map(async (topicResult) => this.communication.topicFetchClaimAndProcessById(dbCtx, topicResult.topicId, requestId)));
      } catch (e) {
        this.logger.error(_scope, 'topicFetchClaimAndProcessById failed', { error: e, data, validPublishTopics, requestId });
        // Don't bother re-throwing, as we've already ended this response.
      }
    }
  }


  /**
   * Annotate any encountered issues.
   * Errors are listed before warnings.
   * @param {String[]} err
   * @param {String[]} warn
   * @returns {String[]}
   */
  static _prettyDetails(err, warn) {
    return [
      ...err.map((entry) => `error: ${entry}`),
      ...warn.map((entry) => `warning: ${entry}`),
    ];
  }


  /**
   * POST request for root.
   * Publish requests are delegated; subscribe/unsubscribe requests are
   * validated, stored as a pending verification, and optionally processed
   * right away.
   * @param {http.ClientRequest} req
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async postRoot(req, res, ctx) {
    const _scope = _fileScope('postRoot');
    this.logger.debug(_scope, 'called', { ctx });

    res.statusCode = 202; // Presume success.

    const warn = [];
    const err = [];
    const data = Manager._getRootData(req, ctx);
    const requestId = ctx.requestId;

    await this.db.context(async (dbCtx) => {

      // Handle publish requests elsewhere
      if (data.mode === Enum.Mode.Publish) {
        return this._publishRequest(dbCtx, data, res, ctx);
      }

      await this._validateRootData(dbCtx, data, warn, err, requestId);

      const details = Manager._prettyDetails(err, warn);

      // Any errors are fatal.  Stop and report anything that went wrong.
      if (err.length) {
        this.logger.debug(_scope, 'invalid request', { data, err, warn, requestId });
        throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
      }

      // Commit the request for later processing.
      let id;
      try {
        id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
      } catch (e) {
        this.logger.error(_scope, 'verificationInsert failed', { error: e, data, warn, requestId });
        throw e;
      }

      // If we committed to the db, we've succeeded as far as the client is concerned.
      res.end(details.join('\n'));
      this.logger.info(_scope, 'request accepted', { data, warn, requestId });

      // Immediately attempt to claim and process the request.
      if (this.options.manager.processImmediately
      &&  id) {
        try {
          await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
        } catch (e) {
          this.logger.error(_scope, 'verificationClaimAndProcessById failed', { ...data, id, requestId, error: e });
          // Don't bother re-throwing, as we've already ended this response.
        }
      }
    }); // dbCtx
  }


  /**
   * Render topic info content.
   * @param {Object} ctx
   * @param {String} ctx.responseType
   * @param {String} ctx.topicUrl
   * @param {Number} ctx.count
   * @returns {String}
   */
  // eslint-disable-next-line class-methods-use-this
  infoContent(ctx) {
    // eslint-disable-next-line sonarjs/no-small-switch
    switch (ctx.responseType) {
      case Enum.ContentType.ApplicationJson:
        return JSON.stringify({
          topic: ctx.topicUrl,
          count: ctx.count,
        });

      case Enum.ContentType.ImageSVG:
        return Template.badgeSVG({}, ` ${ctx.topicUrl} `, ` ${ctx.count} subscribers `, `${ctx.topicUrl} has ${ctx.count} subscribers.`);

      default:
        return ctx.count.toString();
    }
  }


  /**
   * GET request for /info?topic=url&format=type
   * Responds with the subscriber count of a topic as text, json, or svg.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getInfo(res, ctx) {
    const _scope = _fileScope('getInfo');
    this.logger.debug(_scope, 'called', { ctx });

    if (!ctx.queryParams.topic) {
      throw new ResponseError(Enum.ErrorResponse.BadRequest, 'missing required parameter');
    }
    ctx.topicUrl = ctx.queryParams.topic;

    switch ((ctx.queryParams.format || '').toLowerCase()) {
      case 'svg':
        ctx.responseType = Enum.ContentType.ImageSVG;
        res.setHeader(Enum.Header.ContentType, ctx.responseType);
        break;

      case 'json':
        ctx.responseType = Enum.ContentType.ApplicationJson;
        res.setHeader(Enum.Header.ContentType, ctx.responseType);
        break;

      default:
        break;
    }

    try {
      new URL(ctx.topicUrl);
    } catch (e) {
      throw new ResponseError(Enum.ErrorResponse.BadRequest, 'invalid topic');
    }

    let count;
    await this.db.context(async (dbCtx) => {
      count = await this.db.subscriptionCountByTopicUrl(dbCtx, ctx.topicUrl);
      if (!count) {
        throw new ResponseError(Enum.ErrorResponse.NotFound, 'no such topic');
      }
      ctx.count = count.count;
    });

    const content = this.infoContent(ctx);
    res.setHeader(Enum.Header.ETag, common.generateETag(undefined, undefined, content));
    res.setHeader(Enum.Header.CacheControl, 'no-cache');
    res.end(content);
    this.logger.info(_scope, 'finished', { ctx });
  }


  /**
   * label the bars of the topic update history graph
   * @param {Number} index - how many days back the bar represents
   * @param {Number} value - number of updates that day
   * @returns {String}
   */
  static _historyBarCaption(index, value) {
    let when;
    switch (index) {
      case 0:
        when = 'today';
        break;
      case 1:
        when = 'yesterday';
        break;
      default:
        when = `${index} days ago`;
    }
    return `${when}, ${value || 'no'} update${value === 1 ? '' : 's'}`;
  }


  /**
   * GET SVG chart of topic update history
   * The number of days comes from the `days` query parameter, capped at 365;
   * a missing, unparseable, or non-positive value uses
   * options.manager.publishHistoryDays.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getHistorySVG(res, ctx) {
    const _scope = _fileScope('getHistorySVG');
    this.logger.debug(_scope, 'called', { ctx });

    const requestedDays = parseInt(ctx.queryParams.days, 10);
    const days = Math.min((requestedDays > 0) ? requestedDays : this.options.manager.publishHistoryDays, 365);
    const histOptions = {
      title: 'Topic Publish History',
      description: 'Updates per Day',
      labelZero: '^ Today',
      labelX: 'Days Ago',
      maxItems: days,
      minItems: days,
      tickEvery: 7,
      barWidth: 25,
      barHeight: 40,
      labelHeight: 12,
      barCaptionFn: Manager._historyBarCaption,
    };

    let publishHistory;
    await this.db.context(async (dbCtx) => {
      publishHistory = await this.db.topicPublishHistory(dbCtx, ctx.params.topicId, days);
    });

    res.end(Template.histogramSVG(publishHistory, histOptions));
    this.logger.info(_scope, 'finished', { ctx });
  }


  /**
   * Determine if a profile url matches enough of a topic url to describe control over it.
   * Topic must match hostname and start with the profile's path.
   * NOTE(review): the path test is a plain string prefix, so a profile path of
   * `/alice` also matches a topic path of `/alice-other/feed`; scheme and port
   * are not compared either. Confirm whether a path-segment boundary is wanted.
   * @param {URL} profileUrlObj
   * @param {URL} topicUrlObj
   * @returns {Boolean}
   */
  static _profileControlsTopic(profileUrlObj, topicUrlObj) {
    const hostnameMatches = profileUrlObj.hostname === topicUrlObj.hostname;
    const pathIsPrefix = topicUrlObj.pathname.startsWith(profileUrlObj.pathname);
    return hostnameMatches && pathIsPrefix;
  }


  /**
   * GET request for authorized /admin information.
   * Lists all topics, or only the topics controlled by the session's
   * authenticated profile when there is one.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getAdminOverview(res, ctx) {
    const _scope = _fileScope('getAdminOverview');
    this.logger.debug(_scope, 'called', { ctx });

    await this.db.context(async (dbCtx) => {
      ctx.topics = await this.db.topicGetAll(dbCtx);
    });
    this.logger.debug(_scope, 'got topics', { topics: ctx.topics });

    // Profile users can only see related topics.
    if (ctx.session && ctx.session.authenticatedProfile) {
      const profileUrlObj = new URL(ctx.session.authenticatedProfile);
      ctx.topics = ctx.topics.filter((topic) => {
        const topicUrlObj = new URL(topic.url);
        return Manager._profileControlsTopic(profileUrlObj, topicUrlObj);
      });
    }

    res.end(Template.adminOverviewHTML(ctx, this.options));
    this.logger.info(_scope, 'finished', { ctx, topics: ctx.topics.length });
  }


  /**
   * GET request for authorized /admin/topic/:topicId information.
   * A topic the session's profile does not control is rendered as no topic
   * with no subscriptions.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async getTopicDetails(res, ctx) {
    const _scope = _fileScope('getTopicDetails');
    this.logger.debug(_scope, 'called', { ctx });

    ctx.publishSpan = 60; // FIXME: configurable
    const topicId = ctx.params.topicId;
    let publishHistory;
    await this.db.context(async (dbCtx) => {
      ctx.topic = await this.db.topicGetById(dbCtx, topicId);
      ctx.subscriptions = await this.db.subscriptionsByTopicId(dbCtx, topicId);
      publishHistory = await this.db.topicPublishHistory(dbCtx, topicId, ctx.publishSpan);
    });
    ctx.publishCount = publishHistory.reduce((a, b) => a + b, 0);
    // Without a topic there is no content to have been delivered.
    ctx.subscriptionsDelivered = ctx.topic
      ? ctx.subscriptions.filter((subscription) => subscription.latestContentDelivered >= ctx.topic.contentUpdated).length
      : 0;
    this.logger.debug(_scope, 'got topic details', { topic: ctx.topic, subscriptions: ctx.subscriptions, updates: ctx.publishCount });

    // Profile users can only see related topics.
    // A missing topic has nothing to hide, and no url to compare.
    if (ctx.topic && ctx.session && ctx.session.authenticatedProfile) {
      const profileUrlObj = new URL(ctx.session.authenticatedProfile);
      const topicUrlObj = new URL(ctx.topic.url);
      if (!Manager._profileControlsTopic(profileUrlObj, topicUrlObj)) {
        ctx.topic = null;
        ctx.subscriptions = [];
      }
    }

    res.end(Template.adminTopicDetailsHTML(ctx, this.options));
    this.logger.info(_scope, 'finished', { ctx, subscriptions: ctx.subscriptions.length, topic: ctx.topic && ctx.topic.id || ctx.topic });
  }


  /**
   * PATCH and DELETE for updating topic data.
   * DELETE marks the topic deleted and attempts its removal; otherwise the
   * updatable fields found in the query or body are applied. Responds 204
   * when nothing would change.
   * @param {http.ServerResponse} res
   * @param {Object} ctx
   */
  async updateTopic(res, ctx) {
    const _scope = _fileScope('updateTopic');
    this.logger.debug(_scope, 'called', { ctx });

    const topicId = ctx.params.topicId;

    await this.db.context(async (dbCtx) => {
      await this.db.transaction(dbCtx, async (txCtx) => {
        // Get topic without defaults filled in, to persist nulls
        const topic = await this.db.topicGetById(txCtx, topicId, false);
        if (!topic) {
          this.logger.debug(_scope, 'no topic', { ctx });
          throw new Errors.ResponseError(Enum.ErrorResponse.NotFound);
        }

        if (ctx.method === 'DELETE') {
          await this.db.topicDeleted(txCtx, topicId);
          this.logger.info(_scope, 'topic set deleted', { ctx, topicId });
          // Attempt to remove from db if no active subscriptions.
          await this.db.topicPendingDelete(txCtx, topicId);
          // Only report success once every statement in this transaction has run,
          // so a failure above is not preceded by a successful response.
          res.end();
          return;
        }

        const integerFields = [
          'leaseSecondsPreferred',
          'leaseSecondsMin',
          'leaseSecondsMax',
        ];
        const updatableFields = [
          ...integerFields,
          'publisherValidationUrl',
          'contentHashAlgorithm',
        ];

        // Body values take precedence over query values.
        const patchValues = common.pick({
          ...ctx.queryParams,
          ...ctx.parsedBody,
        }, updatableFields);

        integerFields.filter((field) => field in patchValues).forEach((field) => {
          // eslint-disable-next-line security/detect-object-injection
          patchValues[field] = parseInt(patchValues[field], 10);
        });

        const patchKeys = Object.keys(patchValues);
        if (patchKeys.length === 0
        // eslint-disable-next-line security/detect-object-injection
        ||  patchKeys.every((k) => patchValues[k] == topic[k])) {
          res.statusCode = 204;
          res.end();
          this.logger.info(_scope, 'empty topic update', { ctx, topicId });
          return;
        }
        const patchedTopic = {
          ...topic,
          ...patchValues,
        };

        this.logger.debug(_scope, 'data', { topic, patchValues, patchedTopic });

        try {
          await this.db.topicUpdate(txCtx, { topicId, ...patchedTopic });
        } catch (e) {
          if (e instanceof DBErrors.DataValidation) {
            this.logger.debug(_scope, 'validation error', { error: e, ctx, topicId });
            throw new Errors.ResponseError(Enum.ErrorResponse.BadRequest, e.message);
          }
          this.logger.error(_scope, 'failed', { error: e, ctx, topicId });
          throw e;
        }
        res.end();
        this.logger.info(_scope, 'topic updated', { ctx, topicId, patchValues });
      }); // transaction
    }); // context
  }


  /**
   * PATCH and DELETE for updating subscription data.
   * DELETE queues a denial verification for the subscription; otherwise the
   * updatable fields found in the query or body are applied. Responds 204
   * when nothing would change.
   * @param {http.ServerResponse} res
   * @param {Object} ctx
   */
  async updateSubscription(res, ctx) {
    const _scope = _fileScope('updateSubscription');
    this.logger.debug(_scope, 'called', { ctx });

    const subscriptionId = ctx.params.subscriptionId;

    await this.db.context(async (dbCtx) => {
      await this.db.transaction(dbCtx, async (txCtx) => {
        const subscription = await this.db.subscriptionGetById(txCtx, subscriptionId);
        if (!subscription) {
          this.logger.debug(_scope, 'no subscription', { ctx });
          throw new Errors.ResponseError(Enum.ErrorResponse.NotFound);
        }

        if (ctx.method === 'DELETE') {
          const deleteFields = common.pick({
            ...ctx.queryParams,
            ...ctx.parsedBody,
          }, ['reason']);

          // Queue an unsubscription.
          const verification = {
            topicId: subscription.topicId,
            callback: subscription.callback,
            mode: Enum.Mode.Denied,
            reason: 'subscription removed by administrative action',
            isPublisherValidated: true,
            requestId: ctx.requestId,
            ...deleteFields, // a supplied reason replaces the default one
          };

          await this.db.verificationInsert(txCtx, verification);
          this.logger.info(_scope, 'subscription removal initiated', { ctx, verification });
          res.end();
          return;
        }

        const updatableFields = [
          'signatureAlgorithm',
        ];

        const patchValues = common.pick({
          ...ctx.queryParams,
          ...ctx.parsedBody,
        }, updatableFields);

        const patchKeys = Object.keys(patchValues);
        if (patchKeys.length === 0
        // eslint-disable-next-line security/detect-object-injection
        ||  patchKeys.every((k) => patchValues[k] == subscription[k])) {
          res.statusCode = 204;
          res.end();
          return;
        }
        const patchedSubscription = {
          ...subscription,
          ...patchValues,
        };

        try {
          await this.db.subscriptionUpdate(txCtx, { subscriptionId, ...patchedSubscription });
        } catch (e) {
          if (e instanceof DBErrors.DataValidation) {
            this.logger.debug(_scope, 'validation error', { error: e, ctx, subscriptionId });
            throw new Errors.ResponseError(Enum.ErrorResponse.BadRequest, e.message);
          }
          this.logger.error(_scope, 'failed', { error: e, ctx, subscriptionId });
          throw e;
        }
        res.end();
        this.logger.info(_scope, 'subscription updated', { ctx, subscriptionId, patchValues });
      }); // transaction
    }); // context
  }

  /**
   * POST request for manually running worker.
   * The worker run is started but not waited for; its failure is only logged.
   * @param {http.ServerResponse} res
   * @param {object} ctx
   */
  async processTasks(res, ctx) {
    const _scope = _fileScope('processTasks');
    this.logger.debug(_scope, 'called', { ctx });

    // N.B. no await on this
    this.communication.worker.process().catch((e) => {
      this.logger.error(_scope, 'failed', { error: e, ctx });
    });

    res.end();
    this.logger.info(_scope, 'invoked worker process', { ctx });
  }

}
873
874 module.exports = Manager;