fixed typo in logs
[websub-hub] / src / manager.js
1 'use strict';
2
3 /**
4 * Here we process all the incoming requests.
5 * Payload validation occurs here, before committing the pending work to the
6 * database and (usually) calling a processor to act upon it.
7 *
8 * As this is the mediator between server framework and actions, this would
9 * be where most of a rewrite for a new server framework would happen.
10 */
11
12 const common = require('./common');
13 const Communication = require('./communication');
14 const Enum = require('./enum');
15 const Errors = require('./errors');
16 const DBErrors = require('./db/errors');
17 const { ResponseError } = require('./errors');
18 const Template = require('./template');
19
20 const _fileScope = common.fileScope(__filename);
21
22 class Manager {
23 constructor(logger, db, options) {
24 this.logger = logger;
25 this.db = db;
26 this.options = options;
27 this.communication = new Communication(logger, db, options);
28
29 // Precalculate the invariant root GET metadata.
30 this.getRootContent = Template.rootHTML(undefined, options);
31 const now = new Date();
32 this.startTimeString = now.toGMTString();
33 this.startTimeMs = now.getTime();
34 this.getRootETag = common.generateETag(undefined, undefined, this.getRootContent);
35 }
36
37
38 /**
39 * GET request for healthcheck.
40 * @param {http.ServerResponse} res
41 * @param {object} ctx
42 */
43 async getHealthcheck(res, ctx) {
44 const _scope = _fileScope('getHealthcheck');
45 const health = 'happy';
46
47 // What else could we check...
48 const dbHealth = await this.db.healthCheck();
49 this.logger.debug(_scope, 'called', { health, dbHealth, ctx });
50 res.end(health);
51 }
52
53
54 /**
55 * GET request for root.
56 * @param {http.ClientRequest} req
57 * @param {http.ServerResponse} res
58 * @param {object} ctx
59 */
60 async getRoot(req, res, ctx) {
61 const _scope = _fileScope('getRoot');
62 this.logger.debug(_scope, 'called', { ctx });
63
64 res.setHeader(Enum.Header.LastModified, this.startTimeString);
65 res.setHeader(Enum.Header.ETag, this.getRootETag);
66
67 if (common.isClientCached(req, this.startTimeMs, this.getRootETag)) {
68 this.logger.debug(_scope, 'client cached response', { ctx });
69 res.statusCode = 304;
70 res.end();
71 return;
72 }
73 res.end(this.getRootContent);
74 this.logger.info(_scope, 'finished', { ctx });
75 }
76
77
78 /** All the fields the root handler deals with.
79 * @typedef {object} RootData
80 * @property {string} callback - url
81 * @property {string} mode
82 * @property {string} topic
83 * @property {number} topicId
84 * @property {string} leaseSeconds
85 * @property {string} secret
86 * @property {string} httpRemoteAddr
87 * @property {string} httpFrom
88 * @property {boolean} isSecure
89 * @property {boolean} isPublisherValidated
90 */
91
92 /**
93 * Extract api parameters.
94 * @param {http.ClientRequest} req
95 * @param {Object} ctx
96 * @returns {RootData}
97 */
98 static _getRootData(req, ctx) {
99 const postData = ctx.parsedBody;
100 const mode = (postData['hub.mode'] || '').toLowerCase();
101 return {
102 callback: postData['hub.callback'],
103 mode,
104 ...(mode === Enum.Mode.Publish && { url: postData['hub.url'] }), // Publish accepts either hub.url or hub.topic
105 topic: postData['hub.topic'],
106 ...(postData['hub.lease_seconds'] && { leaseSeconds: parseInt(postData['hub.lease_seconds'], 10) }),
107 secret: postData['hub.secret'],
108 httpRemoteAddr: ctx.clientAddress,
109 httpFrom: req.getHeader(Enum.Header.From),
110 isSecure: ((ctx.clientProtocol || '').toLowerCase() === 'https'),
111 isPublisherValidated: true, // Default to true. Will be set to false later, if topic has publisher validation url.
112 };
113 }
114
115
116 /**
117 *
118 * @param {*} dbCtx
119 * @param {RootData} data
120 * @param {String[]} warn
121 * @param {String[]} err
122 * @param {String} requestId
123 */
124 async _validateRootData(dbCtx, data, warn, err, requestId) {
125 // These checks can modify data, so order matters.
126 await this._checkTopic(dbCtx, data, warn, err, requestId);
127 this._checkCallbackAndSecrets(data, warn, err, requestId);
128 await this._checkMode(dbCtx, data, warn, err, requestId);
129 }
130
131
132 /**
133 * Check that requested topic exists and values are in range.
134 * Sets topic id, publisher validation state, and requested lease
135 * seconds on data.
136 * @param {*} dbCtx
137 * @param {RootData} data
138 * @param {String[]} warn
139 * @param {String[]} err
140 */
141 async _checkTopic(dbCtx, data, warn, err, requestId) {
142 const _scope = _fileScope('_checkTopic');
143 let topic;
144
145 if (data.topic) {
146 topic = await this.db.topicGetByUrl(dbCtx, data.topic);
147
148 if (!topic && this._newTopicCreationAllowed()) {
149 this.logger.info(_scope, 'new topic from subscribe request', { data, requestId });
150
151 try {
152 new URL(data.topic);
153 } catch (e) {
154 err.push('invalid topic url (failed to parse url)');
155 return;
156 }
157
158 await this.db.topicSet(dbCtx, {
159 url: data.topic,
160 });
161 topic = await this.db.topicGetByUrl(dbCtx, data.topic);
162 }
163 }
164
165 if (!topic || topic.isDeleted) {
166 err.push('not a supported topic');
167 return;
168 }
169
170 data.topicId = topic.id;
171
172 if (data.leaseSeconds === undefined || isNaN(data.leaseSeconds)) {
173 data.leaseSeconds = topic.leaseSecondsPreferred;
174 } else {
175 if (data.leaseSeconds > topic.leaseSecondsMax) {
176 data.leaseSeconds = topic.leaseSecondsMax;
177 warn.push(`requested lease too long, using ${data.leaseSeconds}`);
178 } else if (data.leaseSeconds < topic.leaseSecondsMin) {
179 data.leaseSeconds = topic.leaseSecondsMin;
180 warn.push(`requested lease too short, using ${data.leaseSeconds}`);
181 }
182 }
183
184 if (topic.publisherValidationUrl) {
185 data.isPublisherValidated = false;
186 }
187 }
188
189
190 /**
191 * Check data for valid callback url and scheme constraints.
192 * @param {RootData} data
193 * @param {String[]} warn
194 * @param {String[]} err
195 */
196 _checkCallbackAndSecrets(data, warn, err) {
197 let isCallbackSecure = false;
198
199 if (!data.callback) {
200 err.push('invalid callback url (empty)');
201 } else {
202 try {
203 const c = new URL(data.callback);
204 isCallbackSecure = (c.protocol.toLowerCase() === 'https:'); // Colon included because url module is weird
205 } catch (e) {
206 err.push('invalid callback url (failed to parse url');
207 return;
208 }
209 }
210
211 if (!isCallbackSecure) {
212 warn.push('insecure callback');
213 }
214
215 if (data.secret) {
216 const secretSeverity = this.options.manager.strictSecrets ? err : warn;
217 if (!data.isSecure) {
218 secretSeverity.push('secret not safe (insecure hub)');
219 }
220 if (!isCallbackSecure) {
221 secretSeverity.push('secret not safe (insecure callback)');
222 }
223 if (data.secret.length > 199) {
224 err.push('cannot keep a secret that big');
225 }
226 }
227 }
228
229 /**
230 * Check mode validity and subscription requirements.
231 * Publish mode is handled elsewhere in the flow.
232 * @param {*} dbCtx
233 * @param {RootData} data
234 * @param {String[]} warn
235 * @param {String[]} err
236 * @param {String} requestId
237 */
238 async _checkMode(dbCtx, data, warn, err) {
239 switch (data.mode) {
240 case Enum.Mode.Subscribe:
241 break;
242
243 case Enum.Mode.Unsubscribe: {
244 const currentEpoch = Date.now() / 1000;
245 let s;
246 if (data.callback && data.topicId) {
247 s = await this.db.subscriptionGet(dbCtx, data.callback, data.topicId);
248 }
249 if (s === undefined) {
250 err.push('not subscribed');
251 } else {
252 if (s.expires < currentEpoch) {
253 err.push('subscription already expired');
254 }
255 }
256 break;
257 }
258
259 default: {
260 err.push('invalid mode');
261 }
262 }
263 }
264
265
266 /**
267 * Determine if a topic url is allowed to be created.
268 * In the future, this may be more complicated.
269 * @returns {Boolean}
270 */
271 _newTopicCreationAllowed() {
272 return this.options.manager.publicHub;
273 }
274
275
276 /**
277 * Check that a publish request's topic(s) are valid and exist,
278 * returning an array with the results for each.
279 * For a public-hub publish request, creates topics if they do not exist.
280 * @param {*} dbCtx
281 * @param {RootData} data
282 * @param {String[]} warn
283 * @param {String[]} err
284 * @param {String} requestId
285 */
286 async _publishTopics(dbCtx, data, requestId) {
287 const _scope = _fileScope('_checkPublish');
288
289 // Publish requests may include multiple topics, consider them all, but deduplicate.
290 const publishUrls = Array.from(new Set([
291 ...common.ensureArray(data.url),
292 ...common.ensureArray(data.topic),
293 ]));
294
295 // Map the requested topics to their ids, creating if necessary.
296 return Promise.all(publishUrls.map(async (url) => {
297 const result = {
298 url,
299 warn: [],
300 err: [],
301 topicId: undefined,
302 };
303 let topic = await this.db.topicGetByUrl(dbCtx, url);
304 if (!topic && this._newTopicCreationAllowed()) {
305 try {
306 new URL(url);
307 } catch (e) {
308 result.err.push('invalid topic url (failed to parse url)');
309 return result;
310 }
311 await this.db.topicSet(dbCtx, {
312 // TODO: accept a publisherValidationUrl parameter
313 url,
314 });
315 topic = await this.db.topicGetByUrl(dbCtx, url);
316 this.logger.info(_scope, 'new topic from publish request', { url, requestId });
317 }
318 if (!topic || topic.isDeleted) {
319 result.err.push('topic not supported');
320 return result;
321 }
322 result.topicId = topic.id;
323 return result;
324 }));
325 }
326
327
328 /**
329 * Render response for multi-topic publish requests.
330 * @param {Object[]} publishTopics
331 */
332 static multiPublishContent(ctx, publishTopics) {
333 const responses = publishTopics.map((topic) => ({
334 href: topic.url,
335 status: topic.status,
336 statusMessage: topic.statusMessage,
337 errors: topic.err,
338 warnings: topic.warn,
339 }));
340 switch (ctx.responseType) {
341 case Enum.ContentType.ApplicationJson:
342 return JSON.stringify(responses);
343
344 case Enum.ContentType.TextPlain:
345 default: {
346 const textResponses = responses.map((response) => {
347 const details = Manager._prettyDetails(response.errors, response.warnings);
348 const textDetails = (details.length ? '\n' : '') + details.map((d) => `\t${d}`).join('\n');
349 return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`;
350 });
351 return textResponses.join('\n----\n');
352 }
353 }
354 }
355
356
357 /**
358 * Process a publish request.
359 * @param {*} dbCtx
360 * @param {Object} data
361 * @param {http.ServerResponse} res
362 * @param {Object} ctx
363 */
364 async _publishRequest(dbCtx, data, res, ctx) {
365 const _scope = _fileScope('_parsePublish');
366 this.logger.debug(_scope, 'called', { data });
367
368 const requestId = ctx.requestId;
369
370 // Parse and validate all the topics in the request.
371 data.publishTopics = await this._publishTopics(dbCtx, data, requestId);
372 if (!data.publishTopics || !data.publishTopics.length) {
373 const details = Manager._prettyDetails(['no valid topic urls to publish'], []);
374 throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
375 }
376
377 // Set status per topic
378 for (const topicResult of data.publishTopics) {
379 topicResult.status = topicResult.err.length ? 400 : 202;
380 topicResult.statusMessage = topicResult.err.length ? 'Bad Request' : 'Accepted';
381 }
382
383 // Process the valid publish notifications
384 const validPublishTopics = data.publishTopics.filter((topicResult) => !topicResult.err.length);
385 try {
386 await Promise.all(validPublishTopics.map(async (topicResult) => this.db.topicFetchRequested(dbCtx, topicResult.topicId)));
387 } catch (e) {
388 this.logger.error(_scope, 'topicFetchRequest failed', { error: e, ctx, data, requestId });
389 throw e;
390 }
391
392 this.logger.info(_scope, 'request accepted', { ctx, data, requestId });
393
394 if (data.publishTopics.length === 1) {
395 const soleTopic = data.publishTopics[0];
396 res.statusCode = soleTopic.status;
397 res.end(Manager._prettyDetails(soleTopic.err, soleTopic.warn).join('\n'));
398 } else {
399 res.statusCode = 207;
400 res.end(Manager.multiPublishContent(ctx, data.publishTopics));
401 }
402
403 if (this.options.manager.processImmediately
404 && validPublishTopics.length) {
405 try {
406 await Promise.all(validPublishTopics.map(async (topicResult) => this.communication.topicFetchClaimAndProcessById(dbCtx, topicResult.topicId, requestId)));
407 } catch (e) {
408 this.logger.error(_scope, 'topicFetchClaimAndProcessById failed', { data, validPublishTopics, requestId });
409 // Don't bother re-throwing, as we've already ended this response.
410 }
411 }
412 }
413
414
415 /**
416 * Annotate any encountered issues.
417 * @param {String[]} err
418 * @param {String[]} warn
419 * @returns {String[]}
420 */
421 static _prettyDetails(err, warn) {
422 return [
423 ...err.map((entry) => `error: ${entry}`),
424 ...warn.map((entry) => `warning: ${entry}`),
425 ];
426 }
427
428
429 /**
430 * POST request for root.
431 * @param {http.ClientRequest} req
432 * @param {http.ServerResponse} res
433 * @param {object} ctx
434 */
435 async postRoot(req, res, ctx) {
436 const _scope = _fileScope('postRoot');
437 this.logger.debug(_scope, 'called', { ctx });
438
439 res.statusCode = 202; // Presume success.
440
441 const warn = [];
442 const err = [];
443 const data = Manager._getRootData(req, ctx);
444 const requestId = ctx.requestId;
445
446 await this.db.context(async (dbCtx) => {
447
448 // Handle publish requests elsewhere
449 if (data.mode === Enum.Mode.Publish) {
450 return this._publishRequest(dbCtx, data, res, ctx);
451 }
452
453 await this._validateRootData(dbCtx, data, warn, err, requestId);
454
455 const details = Manager._prettyDetails(err, warn);
456
457 // Any errors are fatal. Stop and report anything that went wrong.
458 if (err.length) {
459 this.logger.debug(_scope, { msg: 'invalid request', data, err, warn, requestId });
460 throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
461 }
462
463 // Commit the request for later processing.
464 let id;
465 try {
466 id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
467 } catch (e) {
468 this.logger.error(_scope, 'verificationInsert failed', { error: e, data, warn, id, requestId });
469 throw e;
470 }
471
472 // If we committed to the db, we've succeeded as far as the client is concerned.
473 res.end(details.join('\n'));
474 this.logger.info(_scope, 'request accepted', { data, warn, requestId });
475
476 // Immediately attempt to claim and process the request.
477 if (this.options.manager.processImmediately
478 && id) {
479 try {
480 await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
481 } catch (e) {
482 this.logger.error(_scope, 'verificationClaimAndProcessById failed', { ...data, id, requestId });
483 // Don't bother re-throwing, as we've already ended this response.
484 }
485 }
486 }); // dbCtx
487 }
488
489
490 /**
491 * Render topic info content.
492 * @param {Object} ctx
493 * @param {String} ctx.responseType
494 * @param {String} ctx.topicUrl
495 * @param {Number} ctx.count
496 * @returns {String}
497 */
498 // eslint-disable-next-line class-methods-use-this
499 infoContent(ctx) {
500 // eslint-disable-next-line sonarjs/no-small-switch
501 switch (ctx.responseType) {
502 case Enum.ContentType.ApplicationJson:
503 return JSON.stringify({
504 topic: ctx.topicUrl,
505 count: ctx.count,
506 });
507
508 case Enum.ContentType.ImageSVG:
509 return Template.badgeSVG({}, ` ${ctx.topicUrl} `, ` ${ctx.count} subscribers `, `${ctx.topicUrl} has ${ctx.count} subscribers.`);
510
511 default:
512 return ctx.count.toString();
513 }
514 }
515
516
517 /**
518 * GET request for /info?topic=url&format=type
519 * @param {http.ServerResponse} res
520 * @param {object} ctx
521 */
522 async getInfo(res, ctx) {
523 const _scope = _fileScope('getInfo');
524 this.logger.debug(_scope, 'called', { ctx });
525
526 if (!ctx.queryParams.topic) {
527 throw new ResponseError(Enum.ErrorResponse.BadRequest, 'missing required parameter');
528 }
529 ctx.topicUrl = ctx.queryParams.topic;
530
531 switch ((ctx.queryParams.format || '').toLowerCase()) {
532 case 'svg':
533 ctx.responseType = Enum.ContentType.ImageSVG;
534 res.setHeader(Enum.Header.ContentType, ctx.responseType);
535 break;
536
537 case 'json':
538 ctx.responseType = Enum.ContentType.ApplicationJson;
539 res.setHeader(Enum.Header.ContentType, ctx.responseType);
540 break;
541
542 default:
543 break;
544 }
545
546 try {
547 new URL(ctx.topicUrl);
548 } catch (e) {
549 throw new ResponseError(Enum.ErrorResponse.BadRequest, 'invalid topic');
550 }
551
552 let count;
553 await this.db.context(async (dbCtx) => {
554 count = await this.db.subscriptionCountByTopicUrl(dbCtx, ctx.topicUrl);
555 if (!count) {
556 throw new ResponseError(Enum.ErrorResponse.NotFound, 'no such topic');
557 }
558 ctx.count = count.count;
559 });
560
561 res.end(this.infoContent(ctx));
562 this.logger.info(_scope, 'finished', { ...ctx });
563 }
564
565
566 /**
567 * GET request for authorized /admin information.
568 * @param {http.ServerResponse} res
569 * @param {object} ctx
570 */
571 async getAdminOverview(res, ctx) {
572 const _scope = _fileScope('getAdminOverview');
573 this.logger.debug(_scope, 'called', { ctx });
574
575 await this.db.context(async (dbCtx) => {
576 ctx.topics = await this.db.topicGetAll(dbCtx);
577 });
578 this.logger.debug(_scope, 'got topics', { topics: ctx.topics });
579
580 // Profile users can only see related topics.
581 if (ctx.session && ctx.session.authenticatedProfile) {
582 const profileUrlObj = new URL(ctx.session.authenticatedProfile);
583 ctx.topics = ctx.topics.filter((topic) => {
584 const topicUrlObj = new URL(topic.url);
585 return (topicUrlObj.hostname === profileUrlObj.hostname);
586 });
587 }
588
589 res.end(Template.adminOverviewHTML(ctx, this.options));
590 this.logger.info(_scope, 'finished', { ...ctx, topics: ctx.topics.length })
591 }
592
593
594 /**
595 * GET request for authorized /admin/topic/:topicId information.
596 * @param {http.ServerResponse} res
597 * @param {object} ctx
598 */
599 async getTopicDetails(res, ctx) {
600 const _scope = _fileScope('getTopicDetails');
601 this.logger.debug(_scope, 'called', { ctx });
602
603 const topicId = ctx.params.topicId;
604 await this.db.context(async (dbCtx) => {
605 ctx.topic = await this.db.topicGetById(dbCtx, topicId);
606 ctx.subscriptions = await this.db.subscriptionsByTopicId(dbCtx, topicId);
607 });
608 this.logger.debug(_scope, 'got topic details', { topic: ctx.topic, subscriptions: ctx.subscriptions });
609
610 // Profile users can only see related topics.
611 if (ctx.session && ctx.session.authenticatedProfile) {
612 const profileUrlObj = new URL(ctx.session.authenticatedProfile);
613 const topicUrlObj = new URL(ctx.topic.url);
614 if (topicUrlObj.hostname !== profileUrlObj.hostname) {
615 ctx.topic = null;
616 ctx.subscriptions = [];
617 }
618 }
619
620 res.end(Template.adminTopicDetailsHTML(ctx, this.options));
621 this.logger.info(_scope, 'finished', { ...ctx, subscriptions: ctx.subscriptions.length, topic: ctx.topic && ctx.topic.id || ctx.topic });
622 }
623
624
625 /**
626 * PATCH and DELETE for updating topic data.
627 * @param {http.ServerResponse} res
628 * @param {Object} ctx
629 */
630 async updateTopic(res, ctx) {
631 const _scope = _fileScope('updateTopic');
632 this.logger.debug(_scope, 'called', { ctx });
633
634 const topicId = ctx.params.topicId;
635
636 await this.db.context(async (dbCtx) => {
637 await this.db.transaction(dbCtx, async (txCtx) => {
638 // Get topic without defaults filled in, to persist nulls
639 const topic = await this.db.topicGetById(txCtx, topicId, false);
640 if (!topic) {
641 this.logger.debug(_scope, 'no topic', { ctx });
642 throw new Errors.ResponseError(Enum.ErrorResponse.NotFound);
643 }
644
645 if (ctx.method === 'DELETE') {
646 await this.db.topicDeleted(txCtx, topicId);
647 res.end();
648 this.logger.info(_scope, 'topic set deleted', { ctx, topicId });
649 // Attempt to remove from db if no active subscriptions.
650 await this.db.topicPendingDelete(txCtx, topicId);
651 return;
652 }
653
654 const updatableFields = [
655 'leaseSecondsPreferred',
656 'leaseSecondsMin',
657 'leaseSecondsMax',
658 'publisherValidationUrl',
659 'contentHashAlgorithm',
660 ];
661
662 const patchValues = common.pick({
663 ...ctx.queryParams,
664 ...ctx.parsedBody,
665 }, updatableFields);
666
667 [
668 'leaseSecondsPreferred',
669 'leaseSecondsMin',
670 'leaseSecondsMax',
671 ].filter((field) => field in patchValues).forEach((field) => {
672 // eslint-disable-next-line security/detect-object-injection
673 patchValues[field] = parseInt(patchValues[field], 10);
674 });
675
676 const patchKeys = Object.keys(patchValues);
677 if (patchKeys.length === 0
678 // eslint-disable-next-line security/detect-object-injection
679 || patchKeys.every((k) => patchValues[k] == topic[k])) {
680 res.statusCode = 204;
681 res.end();
682 this.logger.info(_scope, 'empty topic update', { ctx, topicId });
683 return;
684 }
685 const patchedTopic = {
686 ...topic,
687 ...patchValues,
688 };
689
690 this.logger.debug(_scope, 'data', { topic, patchValues, patchedTopic });
691
692 try {
693 await this.db.topicUpdate(txCtx, { topicId, ...patchedTopic });
694 } catch (e) {
695 if (e instanceof DBErrors.DataValidation) {
696 this.logger.debug(_scope, 'validation error', { error: e, ctx, topicId });
697 throw new Errors.ResponseError(Enum.ErrorResponse.BadRequest, e.message);
698 }
699 this.logger.error(_scope, 'failed', { error: e, ctx, topicId });
700 throw e;
701 }
702 res.end();
703 this.logger.info(_scope, 'topic updated', { ctx, topicId, patchValues });
704 }); // transaction
705 }); // context
706 }
707
708
709 /**
710 * PATCH and DELETE for updating subscription data.
711 * @param {http.ServerResponse} res
712 * @param {Object} ctx
713 */
714 async updateSubscription(res, ctx) {
715 const _scope = _fileScope('updateSubscription');
716 this.logger.debug(_scope, 'called', { ctx });
717
718 const subscriptionId = ctx.params.subscriptionId;
719
720 await this.db.context(async (dbCtx) => {
721 await this.db.transaction(dbCtx, async (txCtx) => {
722 const subscription = await this.db.subscriptionGetById(txCtx, subscriptionId);
723 if (!subscription) {
724 this.logger.debug(_scope, 'no subscription', { ctx });
725 throw new Errors.ResponseError(Enum.ErrorResponse.NotFound);
726 }
727
728 if (ctx.method === 'DELETE') {
729 const deleteFields = common.pick({
730 ...ctx.queryParams,
731 ...ctx.parsedBody,
732 }, ['reason']);
733
734 // Queue an unsubscription.
735 const verification = {
736 topicId: subscription.topicId,
737 callback: subscription.callback,
738 mode: Enum.Mode.Denied,
739 reason: 'subscription removed by administrative action',
740 isPublisherValidated: true,
741 requestId: ctx.requestId,
742 ...deleteFields,
743 };
744
745 await this.db.verificationInsert(txCtx, verification);
746 this.logger.info(_scope, 'subscription removal initiated', { ctx, verification });
747 res.end();
748 return;
749 }
750
751 const updatableFields = [
752 'signatureAlgorithm',
753 ];
754
755 const patchValues = common.pick({
756 ...ctx.queryParams,
757 ...ctx.parsedBody,
758 }, updatableFields);
759
760 const patchKeys = Object.keys(patchValues);
761 if (patchKeys.length === 0
762 // eslint-disable-next-line security/detect-object-injection
763 || patchKeys.every((k) => patchValues[k] == subscription[k])) {
764 res.statusCode = 204;
765 res.end();
766 return;
767 }
768 const patchedSubscription = {
769 ...subscription,
770 ...patchValues,
771 };
772
773 try {
774 await this.db.subscriptionUpdate(txCtx, { subscriptionId, ...patchedSubscription });
775 } catch (e) {
776 if (e instanceof DBErrors.DataValidation) {
777 this.logger.debug(_scope, 'validation error', { error: e, ctx, subscriptionId });
778 throw new Errors.ResponseError(Enum.ErrorResponse.BadRequest, e.message);
779 }
780 this.logger.info(_scope, 'failed', { error: e, ctx, subscriptionId });
781 throw e;
782 }
783 res.end();
784 this.logger.info(_scope, 'subscription updated', { ctx, subscriptionId, patchValues });
785 }); // transaction
786 }); // context
787 }
788
789 /**
790 * POST request for manually running worker.
791 * @param {http.ServerResponse} res
792 * @param {object} ctx
793 */
794 async processTasks(res, ctx) {
795 const _scope = _fileScope('processTasks');
796 this.logger.debug(_scope, 'called', { ctx });
797
798 // N.B. no await on this
799 this.communication.worker.process().catch((e) => {
800 this.logger.error(_scope, 'failed', { error: e, ctx });
801 });
802
803 res.end();
804 this.logger.info(_scope, 'invoked worker process', { ctx });
805 }
806
807 }
808
809 module.exports = Manager;