update dependencies and devDependencies, fix lint issues
[websub-hub] / src / manager.js
1 'use strict';
2
3 /**
4 * Here we process all the incoming requests.
5 * Payload validation occurs here, before committing the pending work to the
6 * database and (usually) calling a processor to act upon it.
7 *
8 * As this is the mediator between server framework and actions, this would
9 * be where most of a rewrite for a new server framework would happen.
10 */
11
12 const common = require('./common');
13 const Communication = require('./communication');
14 const Enum = require('./enum');
15 const Errors = require('./errors');
16 const DBErrors = require('./db/errors');
17 const { ResponseError } = require('./errors');
18 const Template = require('./template');
19
20 const _fileScope = common.fileScope(__filename);
21
22 class Manager {
23 constructor(logger, db, options) {
24 this.logger = logger;
25 this.db = db;
26 this.options = options;
27 this.communication = new Communication(logger, db, options);
28 }
29
30 /**
31 * @typedef {import('node:http')} http
32 */
33
34 /**
35 * GET request for healthcheck.
36 * @param {http.ServerResponse} res response
37 * @param {object} ctx context
38 */
39 async getHealthcheck(res, ctx) {
40 const _scope = _fileScope('getHealthcheck');
41 const health = 'happy';
42
43 // What else could we check...
44 const dbHealth = await this.db.healthCheck();
45 this.logger.debug(_scope, 'called', { health, dbHealth, ctx });
46 res.end(health);
47 }
48
49
50 /**
51 * GET request for root.
52 * @param {http.ClientRequest} req request
53 * @param {http.ServerResponse} res response
54 * @param {object} ctx context
55 */
56 async getRoot(req, res, ctx) {
57 const _scope = _fileScope('getRoot');
58 this.logger.debug(_scope, 'called', { ctx });
59
60 const content = Template.rootHTML(ctx, this.options);
61 res.end(content);
62 this.logger.info(_scope, 'finished', { ctx });
63 }
64
65
66 /**
67 * All the fields the root handler deals with.
68 * @typedef {object} RootData
69 * @property {string} callback url
70 * @property {string} mode mode
71 * @property {string} topic topic
72 * @property {number} topicId topic id
73 * @property {string} leaseSeconds lease seconds
74 * @property {string} secret secret
75 * @property {string} httpRemoteAddr remote address
76 * @property {string} httpFrom from
77 * @property {boolean} isSecure is secure
78 * @property {boolean} isPublisherValidated is published validated
79 */
80
81 /**
82 * Extract api parameters.
83 * @param {http.ClientRequest} req request
84 * @param {object} ctx context
85 * @returns {RootData} root data
86 */
87 static _getRootData(req, ctx) {
88 const postData = ctx.parsedBody;
89 const mode = (postData['hub.mode'] || '').toLowerCase();
90 return {
91 callback: postData['hub.callback'],
92 mode,
93 ...(mode === Enum.Mode.Publish && { url: postData['hub.url'] }), // Publish accepts either hub.url or hub.topic
94 topic: postData['hub.topic'],
95 ...(postData['hub.lease_seconds'] && { leaseSeconds: parseInt(postData['hub.lease_seconds'], 10) }),
96 secret: postData['hub.secret'],
97 httpRemoteAddr: ctx.clientAddress,
98 httpFrom: req.getHeader(Enum.Header.From),
99 isSecure: ((ctx.clientProtocol || '').toLowerCase() === 'https'),
100 isPublisherValidated: true, // Default to true. Will be set to false later, if topic has publisher validation url.
101 };
102 }
103
104
105 /**
106 *
107 * @param {*} dbCtx db context
108 * @param {RootData} data root data
109 * @param {string[]} warn warnings
110 * @param {string[]} err errors
111 * @param {string} requestId request id
112 * @returns {Promise<void>}
113 */
114 async _validateRootData(dbCtx, data, warn, err, requestId) {
115 // These checks can modify data, so order matters.
116 await this._checkTopic(dbCtx, data, warn, err, requestId);
117 this._checkCallbackAndSecrets(data, warn, err, requestId);
118 await this._checkMode(dbCtx, data, warn, err, requestId);
119 }
120
121
122 /**
123 * Check that requested topic exists and values are in range.
124 * Sets topic id, publisher validation state, and requested lease
125 * seconds on data.
126 * @param {*} dbCtx db context
127 * @param {RootData} data root data
128 * @param {string[]} warn warnings
129 * @param {string[]} err errors
130 * @param {string} requestId request id
131 * @returns {Promise<void>}
132 */
133 async _checkTopic(dbCtx, data, warn, err, requestId) {
134 const _scope = _fileScope('_checkTopic');
135 let topic;
136
137 if (data.topic) {
138 topic = await this.db.topicGetByUrl(dbCtx, data.topic);
139
140 if (!topic && this._newTopicCreationAllowed()) {
141 this.logger.info(_scope, 'new topic from subscribe request', { data, requestId });
142
143 try {
144 new URL(data.topic);
145 } catch (e) { // eslint-disable-line no-unused-vars
146 err.push('invalid topic url (failed to parse url)');
147 return;
148 }
149
150 await this.db.topicSet(dbCtx, {
151 url: data.topic,
152 });
153 topic = await this.db.topicGetByUrl(dbCtx, data.topic);
154 }
155 }
156
157 if (!topic || topic.isDeleted) {
158 err.push('not a supported topic');
159 return;
160 }
161
162 data.topicId = topic.id;
163
164 if (data.leaseSeconds === undefined || isNaN(data.leaseSeconds)) {
165 data.leaseSeconds = topic.leaseSecondsPreferred;
166 } else if (data.leaseSeconds > topic.leaseSecondsMax) {
167 data.leaseSeconds = topic.leaseSecondsMax;
168 warn.push(`requested lease too long, using ${data.leaseSeconds}`);
169 } else if (data.leaseSeconds < topic.leaseSecondsMin) {
170 data.leaseSeconds = topic.leaseSecondsMin;
171 warn.push(`requested lease too short, using ${data.leaseSeconds}`);
172 }
173
174 if (topic.publisherValidationUrl) {
175 data.isPublisherValidated = false;
176 }
177 }
178
179
180 /**
181 * Check data for valid callback url and scheme constraints.
182 * @param {RootData} data root data
183 * @param {string[]} warn warnings
184 * @param {string[]} err errors
185 */
186 _checkCallbackAndSecrets(data, warn, err) {
187 let isCallbackSecure = false;
188
189 if (!data.callback) {
190 err.push('invalid callback url (empty)');
191 } else {
192 try {
193 const c = new URL(data.callback);
194 isCallbackSecure = (c.protocol.toLowerCase() === 'https:'); // Colon included because url module is weird
195 } catch (e) { // eslint-disable-line no-unused-vars
196 err.push('invalid callback url (failed to parse url');
197 return;
198 }
199 }
200
201 if (!isCallbackSecure) {
202 warn.push('insecure callback');
203 }
204
205 if (data.secret) {
206 const secretSeverity = this.options.manager.strictSecrets ? err : warn;
207 if (!data.isSecure) {
208 secretSeverity.push('secret not safe (insecure hub)');
209 }
210 if (!isCallbackSecure) {
211 secretSeverity.push('secret not safe (insecure callback)');
212 }
213 if (data.secret.length > 199) {
214 err.push('cannot keep a secret that big');
215 }
216 }
217 }
218
219 /**
220 * Check mode validity and subscription requirements.
221 * Publish mode is handled elsewhere in the flow.
222 * @param {*} dbCtx db context
223 * @param {RootData} data root data
224 * @param {string[]} warn warnings
225 * @param {string[]} err errors
226 * @returns {Promise<void>}
227 */
228 async _checkMode(dbCtx, data, warn, err) {
229 switch (data.mode) {
230 case Enum.Mode.Subscribe:
231 break;
232
233 case Enum.Mode.Unsubscribe: {
234 const currentEpoch = Date.now() / 1000;
235 let s;
236 if (data.callback && data.topicId) {
237 s = await this.db.subscriptionGet(dbCtx, data.callback, data.topicId);
238 }
239 if (s === undefined) {
240 err.push('not subscribed');
241 } else if (s.expires < currentEpoch) {
242 err.push('subscription already expired');
243 }
244
245 break;
246 }
247
248 default: {
249 err.push('invalid mode');
250 }
251 }
252 }
253
254
255 /**
256 * Determine if a topic url is allowed to be created.
257 * In the future, this may be more complicated.
258 * @returns {boolean} is public hub
259 */
260 _newTopicCreationAllowed() {
261 return this.options.manager.publicHub;
262 }
263
264
265 /**
266 * Check that a publish request's topic(s) are valid and exist,
267 * returning an array with the results for each.
268 * For a public-hub publish request, creates topics if they do not exist.
269 * @param {*} dbCtx db context
270 * @param {RootData} data root data
271 * @param {string} requestId request id
272 * @returns {Promise<object[]>} results
273 */
274 async _publishTopics(dbCtx, data, requestId) {
275 const _scope = _fileScope('_checkPublish');
276
277 // Publish requests may include multiple topics, consider them all, but deduplicate.
278 const publishUrls = Array.from(new Set([
279 ...common.ensureArray(data.url),
280 ...common.ensureArray(data.topic),
281 ]));
282
283 // Map the requested topics to their ids, creating if necessary.
284 return Promise.all(publishUrls.map(async (url) => {
285 const result = {
286 url,
287 warn: [],
288 err: [],
289 topicId: undefined,
290 };
291 let topic = await this.db.topicGetByUrl(dbCtx, url);
292 if (!topic && this._newTopicCreationAllowed()) {
293 try {
294 new URL(url);
295 } catch (e) { // eslint-disable-line no-unused-vars
296 result.err.push('invalid topic url (failed to parse url)');
297 return result;
298 }
299 await this.db.topicSet(dbCtx, {
300 // TODO: accept a publisherValidationUrl parameter
301 url,
302 });
303 topic = await this.db.topicGetByUrl(dbCtx, url);
304 this.logger.info(_scope, 'new topic from publish request', { url, requestId });
305 }
306 if (!topic || topic.isDeleted) {
307 result.err.push('topic not supported');
308 return result;
309 }
310 result.topicId = topic.id;
311 return result;
312 }));
313 }
314
315
316 /**
317 * Render response for multi-topic publish requests.
318 * @param {object} ctx context
319 * @param {object[]} publishTopics topics
320 * @returns {string} response content
321 */
322 static multiPublishContent(ctx, publishTopics) {
323 const responses = publishTopics.map((topic) => ({
324 href: topic.url,
325 status: topic.status,
326 statusMessage: topic.statusMessage,
327 errors: topic.err,
328 warnings: topic.warn,
329 }));
330 switch (ctx.responseType) {
331 case Enum.ContentType.ApplicationJson:
332 return JSON.stringify(responses);
333
334 case Enum.ContentType.TextPlain:
335 default: {
336 const textResponses = responses.map((response) => {
337 const details = Manager._prettyDetails(response.errors, response.warnings);
338 const textDetails = (details.length ? '\n' : '') + details.map((d) => `\t${d}`).join('\n');
339 return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`;
340 });
341 return textResponses.join('\n----\n');
342 }
343 }
344 }
345
346
347 /**
348 * Process a publish request.
349 * @param {*} dbCtx db context
350 * @param {object} data data
351 * @param {http.ServerResponse} res response
352 * @param {object} ctx context
353 */
354 async _publishRequest(dbCtx, data, res, ctx) {
355 const _scope = _fileScope('_parsePublish');
356 this.logger.debug(_scope, 'called', { data });
357
358 const requestId = ctx.requestId;
359
360 // Parse and validate all the topics in the request.
361 data.publishTopics = await this._publishTopics(dbCtx, data, requestId);
362 if (!data?.publishTopics?.length) {
363 const details = Manager._prettyDetails(['no valid topic urls to publish'], []);
364 throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
365 }
366
367 // Set status per topic
368 for (const topicResult of data.publishTopics) {
369 topicResult.status = topicResult.err.length ? 400 : 202;
370 topicResult.statusMessage = topicResult.err.length ? 'Bad Request' : 'Accepted';
371 }
372
373 // Process the valid publish notifications
374 const validPublishTopics = data.publishTopics.filter((topicResult) => !topicResult.err.length);
375 try {
376 await Promise.all(validPublishTopics.map(async (topicResult) => this.db.topicFetchRequested(dbCtx, topicResult.topicId)));
377 } catch (e) {
378 this.logger.error(_scope, 'topicFetchRequest failed', { error: e, ctx, data, requestId });
379 throw e;
380 }
381
382 this.logger.info(_scope, 'request accepted', { ctx, data, requestId });
383
384 if (data.publishTopics.length === 1) {
385 const soleTopic = data.publishTopics[0];
386 res.statusCode = soleTopic.status;
387 res.end(Manager._prettyDetails(soleTopic.err, soleTopic.warn).join('\n'));
388 } else {
389 res.statusCode = 207;
390 res.end(Manager.multiPublishContent(ctx, data.publishTopics));
391 }
392
393 if (this.options.manager.processImmediately
394 && validPublishTopics.length) {
395 try {
396 await Promise.all(validPublishTopics.map(async (topicResult) => this.communication.topicFetchClaimAndProcessById(dbCtx, topicResult.topicId, requestId)));
397 } catch (e) { // eslint-disable-line no-unused-vars
398 this.logger.error(_scope, 'topicFetchClaimAndProcessById failed', { data, validPublishTopics, requestId });
399 // Don't bother re-throwing, as we've already ended this response.
400 }
401 }
402 }
403
404
405 /**
406 * Annotate any encountered issues.
407 * @param {string[]} err errors
408 * @param {string[]} warn warnings
409 * @returns {string[]} rendered list of errors and warnings
410 */
411 static _prettyDetails(err, warn) {
412 return [
413 ...err.map((entry) => `error: ${entry}`),
414 ...warn.map((entry) => `warning: ${entry}`),
415 ];
416 }
417
418
419 /**
420 * POST request for root.
421 * @param {http.ClientRequest} req request
422 * @param {http.ServerResponse} res response
423 * @param {object} ctx context
424 */
425 async postRoot(req, res, ctx) {
426 const _scope = _fileScope('postRoot');
427 this.logger.debug(_scope, 'called', { ctx });
428
429 res.statusCode = 202; // Presume success.
430
431 const warn = [];
432 const err = [];
433 const data = Manager._getRootData(req, ctx);
434 const requestId = ctx.requestId;
435
436 await this.db.context(async (dbCtx) => {
437
438 // Handle publish requests elsewhere
439 if (data.mode === Enum.Mode.Publish) {
440 return this._publishRequest(dbCtx, data, res, ctx);
441 }
442
443 await this._validateRootData(dbCtx, data, warn, err, requestId);
444
445 const details = Manager._prettyDetails(err, warn);
446
447 // Any errors are fatal. Stop and report anything that went wrong.
448 if (err.length) {
449 this.logger.debug(_scope, { msg: 'invalid request', data, err, warn, requestId });
450 throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
451 }
452
453 // Commit the request for later processing.
454 let id;
455 try {
456 id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
457 } catch (e) {
458 this.logger.error(_scope, 'verificationInsert failed', { error: e, data, warn, id, requestId });
459 throw e;
460 }
461
462 // If we committed to the db, we've succeeded as far as the client is concerned.
463 res.end(details.join('\n'));
464 this.logger.info(_scope, 'request accepted', { data, warn, requestId });
465
466 // Immediately attempt to claim and process the request.
467 if (this.options.manager.processImmediately
468 && id) {
469 try {
470 await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
471 } catch (e) { // eslint-disable-line no-unused-vars
472 this.logger.error(_scope, 'verificationClaimAndProcessById failed', { ...data, id, requestId });
473 // Don't bother re-throwing, as we've already ended this response.
474 }
475 }
476 }); // dbCtx
477 }
478
479
480 /**
481 * Render topic info content.
482 * @param {object} ctx context
483 * @param {string} ctx.responseType response type
484 * @param {string} ctx.topicUrl topic url
485 * @param {number} ctx.count count of subscribers
486 * @returns {string} response content
487 */
488 // eslint-disable-next-line class-methods-use-this
489 infoContent(ctx) {
490
491 switch (ctx.responseType) {
492 case Enum.ContentType.ApplicationJson:
493 return JSON.stringify({
494 topic: ctx.topicUrl,
495 count: ctx.count,
496 });
497
498 case Enum.ContentType.ImageSVG:
499 return Template.badgeSVG({}, ` ${ctx.topicUrl} `, ` ${ctx.count} subscribers `, `${ctx.topicUrl} has ${ctx.count} subscribers.`);
500
501 default:
502 return ctx.count.toString();
503 }
504 }
505
506
507 /**
508 * GET request for /info?topic=url&format=type
509 * @param {http.ServerResponse} res response
510 * @param {object} ctx context
511 */
512 async getInfo(res, ctx) {
513 const _scope = _fileScope('getInfo');
514 this.logger.debug(_scope, 'called', { ctx });
515
516 if (!ctx.queryParams.topic) {
517 throw new ResponseError(Enum.ErrorResponse.BadRequest, 'missing required parameter');
518 }
519 ctx.topicUrl = ctx.queryParams.topic;
520
521 switch ((ctx.queryParams.format || '').toLowerCase()) {
522 case 'svg':
523 ctx.responseType = Enum.ContentType.ImageSVG;
524 res.setHeader(Enum.Header.ContentType, ctx.responseType);
525 break;
526
527 case 'json':
528 ctx.responseType = Enum.ContentType.ApplicationJson;
529 res.setHeader(Enum.Header.ContentType, ctx.responseType);
530 break;
531
532 default:
533 break;
534 }
535
536 try {
537 new URL(ctx.topicUrl);
538 } catch (e) { // eslint-disable-line no-unused-vars
539 throw new ResponseError(Enum.ErrorResponse.BadRequest, 'invalid topic');
540 }
541
542 let count;
543 await this.db.context(async (dbCtx) => {
544 count = await this.db.subscriptionCountByTopicUrl(dbCtx, ctx.topicUrl);
545 if (!count) {
546 throw new ResponseError(Enum.ErrorResponse.NotFound, 'no such topic');
547 }
548 ctx.count = count.count;
549 });
550
551 const content = this.infoContent(ctx);
552 res.setHeader(Enum.Header.ETag, common.generateETag(undefined, undefined, content));
553 res.setHeader(Enum.Header.CacheControl, 'no-cache');
554 res.end(content);
555 this.logger.info(_scope, 'finished', { ctx });
556 }
557
558
559 /**
560 * label the bars of the topic update history graph
561 * @param {number} index index
562 * @param {number} value value
563 * @returns {string} caption
564 */
565 static _historyBarCaption(index, value) {
566 let when;
567 switch (index) {
568 case 0:
569 when ='today';
570 break;
571 case 1:
572 when = 'yesterday';
573 break;
574 default:
575 when = `${index} days ago`;
576 }
577 return `${when}, ${value || 'no'} update${value === 1 ? '': 's'}`;
578 }
579
580
581 /**
582 * GET SVG chart of topic update history
583 * @param {http.ServerResponse} res response
584 * @param {object} ctx context
585 */
586 async getHistorySVG(res, ctx) {
587 const _scope = _fileScope('getHistorySVG');
588 this.logger.debug(_scope, 'called', { ctx });
589
590 const days = Math.min(parseInt(ctx.queryParams.days) || this.options.manager.publishHistoryDays, 365);
591 const histOptions = {
592 title: 'Topic Publish History',
593 description: 'Updates per Day',
594 labelZero: '^ Today',
595 labelX: 'Days Ago',
596 maxItems: days,
597 minItems: days,
598 tickEvery: 7,
599 barWidth: 25,
600 barHeight: 40,
601 labelHeight: 12,
602 barCaptionFn: Manager._historyBarCaption,
603 };
604
605 let publishHistory;
606 await this.db.context(async (dbCtx) => {
607 publishHistory = await this.db.topicPublishHistory(dbCtx, ctx.params.topicId, days);
608 });
609
610 res.end(Template.histogramSVG(publishHistory, histOptions));
611 this.logger.info(_scope, 'finished', { ctx });
612 }
613
614
615 /**
616 * Determine if a profile url matches enough of a topic url to describe control over it.
617 * Topic must match hostname and start with the profile's path.
618 * @param {URL} profileUrlObj profile url
619 * @param {URL} topicUrlObj topic url
620 * @returns {boolean} profile is super-url of topic
621 */
622 static _profileControlsTopic(profileUrlObj, topicUrlObj) {
623 const hostnameMatches = profileUrlObj.hostname === topicUrlObj.hostname;
624 const pathIsPrefix = topicUrlObj.pathname.startsWith(profileUrlObj.pathname);
625 return hostnameMatches && pathIsPrefix;
626 }
627
628
629 /**
630 * GET request for authorized /admin information.
631 * @param {http.ServerResponse} res response
632 * @param {object} ctx context
633 */
634 async getAdminOverview(res, ctx) {
635 const _scope = _fileScope('getAdminOverview');
636 this.logger.debug(_scope, 'called', { ctx });
637
638 await this.db.context(async (dbCtx) => {
639 ctx.topics = await this.db.topicGetAll(dbCtx);
640 });
641 this.logger.debug(_scope, 'got topics', { topics: ctx.topics });
642
643 // Profile users can only see related topics.
644 if (ctx?.session?.authenticatedProfile) {
645 const profileUrlObj = new URL(ctx.session.authenticatedProfile);
646 ctx.topics = ctx.topics.filter((topic) => {
647 const topicUrlObj = new URL(topic.url);
648 return Manager._profileControlsTopic(profileUrlObj, topicUrlObj);
649 });
650 }
651
652 res.end(Template.adminOverviewHTML(ctx, this.options));
653 this.logger.info(_scope, 'finished', { ctx, topics: ctx.topics.length });
654 }
655
656
657 /**
658 * GET request for authorized /admin/topic/:topicId information.
659 * @param {http.ServerResponse} res response
660 * @param {object} ctx context
661 */
662 async getTopicDetails(res, ctx) {
663 const _scope = _fileScope('getTopicDetails');
664 this.logger.debug(_scope, 'called', { ctx });
665
666 ctx.publishSpan = 60; // FIXME: configurable
667 const topicId = ctx.params.topicId;
668 let publishHistory;
669 await this.db.context(async (dbCtx) => {
670 ctx.topic = await this.db.topicGetById(dbCtx, topicId);
671 ctx.subscriptions = await this.db.subscriptionsByTopicId(dbCtx, topicId);
672 publishHistory = await this.db.topicPublishHistory(dbCtx, topicId, ctx.publishSpan);
673 });
674 ctx.publishCount = publishHistory.reduce((a, b) => a + b, 0);
675 ctx.subscriptionsDelivered = ctx.subscriptions.filter((subscription) => {
676 return subscription.latestContentDelivered >= ctx.topic.contentUpdated;
677 }).length;
678 this.logger.debug(_scope, 'got topic details', { topic: ctx.topic, subscriptions: ctx.subscriptions, updates: ctx.publishCount });
679
680 // Profile users can only see related topics.
681 if (ctx?.session?.authenticatedProfile) {
682 const profileUrlObj = new URL(ctx.session.authenticatedProfile);
683 const topicUrlObj = new URL(ctx.topic.url);
684 if (!Manager._profileControlsTopic(profileUrlObj, topicUrlObj)) {
685 ctx.topic = null;
686 ctx.subscriptions = [];
687 }
688 }
689
690 res.end(Template.adminTopicDetailsHTML(ctx, this.options));
691 this.logger.info(_scope, 'finished', { ctx, subscriptions: ctx.subscriptions.length, topic: ctx?.topic?.id || ctx.topic });
692 }
693
694
695 /**
696 * PATCH and DELETE for updating topic data.
697 * @param {http.ServerResponse} res response
698 * @param {object} ctx context
699 */
700 async updateTopic(res, ctx) {
701 const _scope = _fileScope('updateTopic');
702 this.logger.debug(_scope, 'called', { ctx });
703
704 const topicId = ctx.params.topicId;
705
706 await this.db.context(async (dbCtx) => {
707 await this.db.transaction(dbCtx, async (txCtx) => {
708 // Get topic without defaults filled in, to persist nulls
709 const topic = await this.db.topicGetById(txCtx, topicId, false);
710 if (!topic) {
711 this.logger.debug(_scope, 'no topic', { ctx });
712 throw new Errors.ResponseError(Enum.ErrorResponse.NotFound);
713 }
714
715 if (ctx.method === 'DELETE') {
716 await this.db.topicDeleted(txCtx, topicId);
717 res.end();
718 this.logger.info(_scope, 'topic set deleted', { ctx, topicId });
719 // Attempt to remove from db if no active subscriptions.
720 await this.db.topicPendingDelete(txCtx, topicId);
721 return;
722 }
723
724 const updatableFields = [
725 'leaseSecondsPreferred',
726 'leaseSecondsMin',
727 'leaseSecondsMax',
728 'publisherValidationUrl',
729 'contentHashAlgorithm',
730 ];
731
732 const patchValues = common.pick({
733 ...ctx.queryParams,
734 ...ctx.parsedBody,
735 }, updatableFields);
736
737 [
738 'leaseSecondsPreferred',
739 'leaseSecondsMin',
740 'leaseSecondsMax',
741 ].filter((field) => field in patchValues).forEach((field) => {
742 // eslint-disable-next-line security/detect-object-injection
743 patchValues[field] = parseInt(patchValues[field], 10);
744 });
745
746 const patchKeys = Object.keys(patchValues);
747 if (patchKeys.length === 0
748 // eslint-disable-next-line security/detect-object-injection
749 || patchKeys.every((k) => patchValues[k] == topic[k])) {
750 res.statusCode = 204;
751 res.end();
752 this.logger.info(_scope, 'empty topic update', { ctx, topicId });
753 return;
754 }
755 const patchedTopic = {
756 ...topic,
757 ...patchValues,
758 };
759
760 this.logger.debug(_scope, 'data', { topic, patchValues, patchedTopic });
761
762 try {
763 await this.db.topicUpdate(txCtx, { topicId, ...patchedTopic });
764 } catch (e) {
765 if (e instanceof DBErrors.DataValidation) {
766 this.logger.debug(_scope, 'validation error', { error: e, ctx, topicId });
767 throw new Errors.ResponseError(Enum.ErrorResponse.BadRequest, e.message);
768 }
769 this.logger.error(_scope, 'failed', { error: e, ctx, topicId });
770 throw e;
771 }
772 res.end();
773 this.logger.info(_scope, 'topic updated', { ctx, topicId, patchValues });
774 }); // transaction
775 }); // context
776 }
777
778
779 /**
780 * PATCH and DELETE for updating subscription data.
781 * @param {http.ServerResponse} res response
782 * @param {object} ctx context
783 */
784 async updateSubscription(res, ctx) {
785 const _scope = _fileScope('updateSubscription');
786 this.logger.debug(_scope, 'called', { ctx });
787
788 const subscriptionId = ctx.params.subscriptionId;
789
790 await this.db.context(async (dbCtx) => {
791 await this.db.transaction(dbCtx, async (txCtx) => {
792 const subscription = await this.db.subscriptionGetById(txCtx, subscriptionId);
793 if (!subscription) {
794 this.logger.debug(_scope, 'no subscription', { ctx });
795 throw new Errors.ResponseError(Enum.ErrorResponse.NotFound);
796 }
797
798 if (ctx.method === 'DELETE') {
799 const deleteFields = common.pick({
800 ...ctx.queryParams,
801 ...ctx.parsedBody,
802 }, ['reason']);
803
804 // Queue an unsubscription.
805 const verification = {
806 topicId: subscription.topicId,
807 callback: subscription.callback,
808 mode: Enum.Mode.Denied,
809 reason: 'subscription removed by administrative action',
810 isPublisherValidated: true,
811 requestId: ctx.requestId,
812 ...deleteFields,
813 };
814
815 await this.db.verificationInsert(txCtx, verification);
816 this.logger.info(_scope, 'subscription removal initiated', { ctx, verification });
817 res.end();
818 return;
819 }
820
821 const updatableFields = [
822 'signatureAlgorithm',
823 ];
824
825 const patchValues = common.pick({
826 ...ctx.queryParams,
827 ...ctx.parsedBody,
828 }, updatableFields);
829
830 const patchKeys = Object.keys(patchValues);
831 if (patchKeys.length === 0
832 // eslint-disable-next-line security/detect-object-injection
833 || patchKeys.every((k) => patchValues[k] == subscription[k])) {
834 res.statusCode = 204;
835 res.end();
836 return;
837 }
838 const patchedSubscription = {
839 ...subscription,
840 ...patchValues,
841 };
842
843 try {
844 await this.db.subscriptionUpdate(txCtx, { subscriptionId, ...patchedSubscription });
845 } catch (e) {
846 if (e instanceof DBErrors.DataValidation) {
847 this.logger.debug(_scope, 'validation error', { error: e, ctx, subscriptionId });
848 throw new Errors.ResponseError(Enum.ErrorResponse.BadRequest, e.message);
849 }
850 this.logger.info(_scope, 'failed', { error: e, ctx, subscriptionId });
851 throw e;
852 }
853 res.end();
854 this.logger.info(_scope, 'subscription updated', { ctx, subscriptionId, patchValues });
855 }); // transaction
856 }); // context
857 }
858
859 /**
860 * POST request for manually running worker.
861 * @param {http.ServerResponse} res response
862 * @param {object} ctx context
863 */
864 async processTasks(res, ctx) {
865 const _scope = _fileScope('processTasks');
866 this.logger.debug(_scope, 'called', { ctx });
867
868 // N.B. no await on this
869 this.communication.worker.process().catch((e) => {
870 this.logger.error(_scope, 'failed', { error: e, ctx });
871 });
872
873 res.end();
874 this.logger.info(_scope, 'invoked worker process', { ctx });
875 }
876
877 }
878
879 module.exports = Manager;