a7f9f55b49cc9cbd4e7db76fac5d0c0b6c23889c
4 * Here we process all the incoming requests.
5 * Payload validation occurs here, before committing the pending work to the
6 * database and (usually) calling a processor to act upon it.
8 * As this is the mediator between server framework and actions, this would
9 * be where most of a rewrite for a new server framework would happen.
12 const common
= require('./common');
13 const Communication
= require('./communication');
14 const Enum
= require('./enum');
15 const Errors
= require('./errors');
16 const DBErrors
= require('./db/errors');
17 const { ResponseError
} = require('./errors');
18 const Template
= require('./template');
20 const _fileScope
= common
.fileScope(__filename
);
23 constructor(logger
, db
, options
) {
26 this.options
= options
;
27 this.communication
= new Communication(logger
, db
, options
);
31 * @typedef {import('node:http')} http
35 * GET request for healthcheck.
36 * @param {http.ServerResponse} res response
37 * @param {object} ctx context
39 async
getHealthcheck(res
, ctx
) {
40 const _scope
= _fileScope('getHealthcheck');
41 const health
= 'happy';
43 // What else could we check...
44 const dbHealth
= await
this.db
.healthCheck();
45 this.logger
.debug(_scope
, 'called', { health
, dbHealth
, ctx
});
51 * GET request for root.
52 * @param {http.ClientRequest} req request
53 * @param {http.ServerResponse} res response
54 * @param {object} ctx context
56 async
getRoot(req
, res
, ctx
) {
57 const _scope
= _fileScope('getRoot');
58 this.logger
.debug(_scope
, 'called', { ctx
});
60 const content
= Template
.rootHTML(ctx
, this.options
);
62 this.logger
.info(_scope
, 'finished', { ctx
});
67 * All the fields the root handler deals with.
68 * @typedef {object} RootData
69 * @property {string} callback url
70 * @property {string} mode mode
71 * @property {string} topic topic
72 * @property {number} topicId topic id
73 * @property {string} leaseSeconds lease seconds
74 * @property {string} secret secret
75 * @property {string} httpRemoteAddr remote address
76 * @property {string} httpFrom from
77 * @property {boolean} isSecure is secure
78 * @property {boolean} isPublisherValidated is published validated
82 * Extract api parameters.
83 * @param {http.ClientRequest} req request
84 * @param {object} ctx context
85 * @returns {RootData} root data
87 static _getRootData(req
, ctx
) {
88 const postData
= ctx
.parsedBody
;
89 const mode
= (postData
['hub.mode'] || '').toLowerCase();
91 callback: postData
['hub.callback'],
93 ...(mode
=== Enum
.Mode
.Publish
&& { url: postData
['hub.url'] }), // Publish accepts either hub.url or hub.topic
94 topic: postData
['hub.topic'],
95 ...(postData
['hub.lease_seconds'] && { leaseSeconds: parseInt(postData
['hub.lease_seconds'], 10) }),
96 secret: postData
['hub.secret'],
97 httpRemoteAddr: ctx
.clientAddress
,
98 httpFrom: req
.getHeader(Enum
.Header
.From
),
99 isSecure: ((ctx
.clientProtocol
|| '').toLowerCase() === 'https'),
100 isPublisherValidated: true, // Default to true. Will be set to false later, if topic has publisher validation url.
107 * @param {*} dbCtx db context
108 * @param {RootData} data root data
109 * @param {string[]} warn warnings
110 * @param {string[]} err errors
111 * @param {string} requestId request id
112 * @returns {Promise<void>}
114 async
_validateRootData(dbCtx
, data
, warn
, err
, requestId
) {
115 // These checks can modify data, so order matters.
116 await
this._checkTopic(dbCtx
, data
, warn
, err
, requestId
);
117 this._checkCallbackAndSecrets(data
, warn
, err
, requestId
);
118 await
this._checkMode(dbCtx
, data
, warn
, err
, requestId
);
123 * Check that requested topic exists and values are in range.
124 * Sets topic id, publisher validation state, and requested lease
126 * @param {*} dbCtx db context
127 * @param {RootData} data root data
128 * @param {string[]} warn warnings
129 * @param {string[]} err errors
130 * @param {string} requestId request id
131 * @returns {Promise<void>}
133 async
_checkTopic(dbCtx
, data
, warn
, err
, requestId
) {
134 const _scope
= _fileScope('_checkTopic');
138 topic
= await
this.db
.topicGetByUrl(dbCtx
, data
.topic
);
140 if (!topic
&& this._newTopicCreationAllowed()) {
141 this.logger
.info(_scope
, 'new topic from subscribe request', { data
, requestId
});
145 } catch (e
) { // eslint-disable-line no-unused-vars
146 err
.push('invalid topic url (failed to parse url)');
150 await
this.db
.topicSet(dbCtx
, {
153 topic
= await
this.db
.topicGetByUrl(dbCtx
, data
.topic
);
157 if (!topic
|| topic
.isDeleted
) {
158 err
.push('not a supported topic');
162 data
.topicId
= topic
.id
;
164 if (data
.leaseSeconds
=== undefined || isNaN(data
.leaseSeconds
)) {
165 data
.leaseSeconds
= topic
.leaseSecondsPreferred
;
166 } else if (data
.leaseSeconds
> topic
.leaseSecondsMax
) {
167 data
.leaseSeconds
= topic
.leaseSecondsMax
;
168 warn
.push(`requested lease too long, using ${data.leaseSeconds}`);
169 } else if (data
.leaseSeconds
< topic
.leaseSecondsMin
) {
170 data
.leaseSeconds
= topic
.leaseSecondsMin
;
171 warn
.push(`requested lease too short, using ${data.leaseSeconds}`);
174 if (topic
.publisherValidationUrl
) {
175 data
.isPublisherValidated
= false;
181 * Check data for valid callback url and scheme constraints.
182 * @param {RootData} data root data
183 * @param {string[]} warn warnings
184 * @param {string[]} err errors
186 _checkCallbackAndSecrets(data
, warn
, err
) {
187 let isCallbackSecure
= false;
189 if (!data
.callback
) {
190 err
.push('invalid callback url (empty)');
193 const c
= new URL(data
.callback
);
194 isCallbackSecure
= (c
.protocol
.toLowerCase() === 'https:'); // Colon included because url module is weird
195 } catch (e
) { // eslint-disable-line no-unused-vars
196 err
.push('invalid callback url (failed to parse url');
201 if (!isCallbackSecure
) {
202 warn
.push('insecure callback');
206 const secretSeverity
= this.options
.manager
.strictSecrets
? err : warn
;
207 if (!data
.isSecure
) {
208 secretSeverity
.push('secret not safe (insecure hub)');
210 if (!isCallbackSecure
) {
211 secretSeverity
.push('secret not safe (insecure callback)');
213 if (data
.secret
.length
> 199) {
214 err
.push('cannot keep a secret that big');
220 * Check mode validity and subscription requirements.
221 * Publish mode is handled elsewhere in the flow.
222 * @param {*} dbCtx db context
223 * @param {RootData} data root data
224 * @param {string[]} warn warnings
225 * @param {string[]} err errors
226 * @returns {Promise<void>}
228 async
_checkMode(dbCtx
, data
, warn
, err
) {
230 case Enum
.Mode
.Subscribe:
233 case Enum
.Mode
.Unsubscribe: {
234 const currentEpoch
= Date
.now() / 1000;
236 if (data
.callback
&& data
.topicId
) {
237 s
= await
this.db
.subscriptionGet(dbCtx
, data
.callback
, data
.topicId
);
239 if (s
=== undefined) {
240 err
.push('not subscribed');
241 } else if (s
.expires
< currentEpoch
) {
242 err
.push('subscription already expired');
249 err
.push('invalid mode');
256 * Determine if a topic url is allowed to be created.
257 * In the future, this may be more complicated.
258 * @returns {boolean} is public hub
260 _newTopicCreationAllowed() {
261 return this.options
.manager
.publicHub
;
266 * Check that a publish request's topic(s) are valid and exist,
267 * returning an array with the results for each.
268 * For a public-hub publish request, creates topics if they do not exist.
269 * @param {*} dbCtx db context
270 * @param {RootData} data root data
271 * @param {string} requestId request id
272 * @returns {Promise<object[]>} results
274 async
_publishTopics(dbCtx
, data
, requestId
) {
275 const _scope
= _fileScope('_checkPublish');
277 // Publish requests may include multiple topics, consider them all, but deduplicate.
278 const publishUrls
= Array
.from(new Set([
279 ...common
.ensureArray(data
.url
),
280 ...common
.ensureArray(data
.topic
),
283 // Map the requested topics to their ids, creating if necessary.
284 return Promise
.all(publishUrls
.map(async (url
) => {
291 let topic
= await
this.db
.topicGetByUrl(dbCtx
, url
);
292 if (!topic
&& this._newTopicCreationAllowed()) {
295 } catch (e
) { // eslint-disable-line no-unused-vars
296 result
.err
.push('invalid topic url (failed to parse url)');
299 await
this.db
.topicSet(dbCtx
, {
300 // TODO: accept a publisherValidationUrl parameter
303 topic
= await
this.db
.topicGetByUrl(dbCtx
, url
);
304 this.logger
.info(_scope
, 'new topic from publish request', { url
, requestId
});
306 if (!topic
|| topic
.isDeleted
) {
307 result
.err
.push('topic not supported');
310 result
.topicId
= topic
.id
;
317 * Render response for multi-topic publish requests.
318 * @param {object} ctx context
319 * @param {object[]} publishTopics topics
320 * @returns {string} response content
322 static multiPublishContent(ctx
, publishTopics
) {
323 const responses
= publishTopics
.map((topic
) => ({
325 status: topic
.status
,
326 statusMessage: topic
.statusMessage
,
328 warnings: topic
.warn
,
330 switch (ctx
.responseType
) {
331 case Enum
.ContentType
.ApplicationJson:
332 return JSON
.stringify(responses
);
334 case Enum
.ContentType
.TextPlain:
336 const textResponses
= responses
.map((response
) => {
337 const details
= Manager
._prettyDetails(response
.errors
, response
.warnings
);
338 const textDetails
= (details
.length
? '\n' : '') + details
.map((d
) => `\t${d}`).join('\n');
339 return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`;
341 return textResponses
.join('\n----\n');
348 * Process a publish request.
349 * @param {*} dbCtx db context
350 * @param {object} data data
351 * @param {http.ServerResponse} res response
352 * @param {object} ctx context
354 async
_publishRequest(dbCtx
, data
, res
, ctx
) {
355 const _scope
= _fileScope('_parsePublish');
356 this.logger
.debug(_scope
, 'called', { data
});
358 const requestId
= ctx
.requestId
;
360 // Parse and validate all the topics in the request.
361 data
.publishTopics
= await
this._publishTopics(dbCtx
, data
, requestId
);
362 if (!data
?.publishTopics
?.length
) {
363 const details
= Manager
._prettyDetails(['no valid topic urls to publish'], []);
364 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, details
);
367 // Set status per topic
368 for (const topicResult
of data
.publishTopics
) {
369 topicResult
.status
= topicResult
.err
.length
? 400 : 202;
370 topicResult
.statusMessage
= topicResult
.err
.length
? 'Bad Request' : 'Accepted';
373 // Process the valid publish notifications
374 const validPublishTopics
= data
.publishTopics
.filter((topicResult
) => !topicResult
.err
.length
);
376 await Promise
.all(validPublishTopics
.map(async (topicResult
) => this.db
.topicFetchRequested(dbCtx
, topicResult
.topicId
)));
378 this.logger
.error(_scope
, 'topicFetchRequest failed', { error: e
, ctx
, data
, requestId
});
382 this.logger
.info(_scope
, 'request accepted', { ctx
, data
, requestId
});
384 if (data
.publishTopics
.length
=== 1) {
385 const soleTopic
= data
.publishTopics
[0];
386 res
.statusCode
= soleTopic
.status
;
387 res
.end(Manager
._prettyDetails(soleTopic
.err
, soleTopic
.warn
).join('\n'));
389 res
.statusCode
= 207;
390 res
.end(Manager
.multiPublishContent(ctx
, data
.publishTopics
));
393 if (this.options
.manager
.processImmediately
394 && validPublishTopics
.length
) {
396 await Promise
.all(validPublishTopics
.map(async (topicResult
) => this.communication
.topicFetchClaimAndProcessById(dbCtx
, topicResult
.topicId
, requestId
)));
397 } catch (e
) { // eslint-disable-line no-unused-vars
398 this.logger
.error(_scope
, 'topicFetchClaimAndProcessById failed', { data
, validPublishTopics
, requestId
});
399 // Don't bother re-throwing, as we've already ended this response.
406 * Annotate any encountered issues.
407 * @param {string[]} err errors
408 * @param {string[]} warn warnings
409 * @returns {string[]} rendered list of errors and warnings
411 static _prettyDetails(err
, warn
) {
413 ...err
.map((entry
) => `error: ${entry}`),
414 ...warn
.map((entry
) => `warning: ${entry}`),
420 * POST request for root.
421 * @param {http.ClientRequest} req request
422 * @param {http.ServerResponse} res response
423 * @param {object} ctx context
425 async
postRoot(req
, res
, ctx
) {
426 const _scope
= _fileScope('postRoot');
427 this.logger
.debug(_scope
, 'called', { ctx
});
429 res
.statusCode
= 202; // Presume success.
433 const data
= Manager
._getRootData(req
, ctx
);
434 const requestId
= ctx
.requestId
;
436 await
this.db
.context(async (dbCtx
) => {
438 // Handle publish requests elsewhere
439 if (data
.mode
=== Enum
.Mode
.Publish
) {
440 return this._publishRequest(dbCtx
, data
, res
, ctx
);
443 await
this._validateRootData(dbCtx
, data
, warn
, err
, requestId
);
445 const details
= Manager
._prettyDetails(err
, warn
);
447 // Any errors are fatal. Stop and report anything that went wrong.
449 this.logger
.debug(_scope
, { msg: 'invalid request', data
, err
, warn
, requestId
});
450 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, details
);
453 // Commit the request for later processing.
456 id
= await
this.db
.verificationInsert(dbCtx
, { ...data
, requestId
});
458 this.logger
.error(_scope
, 'verificationInsert failed', { error: e
, data
, warn
, id
, requestId
});
462 // If we committed to the db, we've succeeded as far as the client is concerned.
463 res
.end(details
.join('\n'));
464 this.logger
.info(_scope
, 'request accepted', { data
, warn
, requestId
});
466 // Immediately attempt to claim and process the request.
467 if (this.options
.manager
.processImmediately
470 await
this.communication
.verificationClaimAndProcessById(dbCtx
, id
, requestId
);
471 } catch (e
) { // eslint-disable-line no-unused-vars
472 this.logger
.error(_scope
, 'verificationClaimAndProcessById failed', { ...data
, id
, requestId
});
473 // Don't bother re-throwing, as we've already ended this response.
481 * Render topic info content.
482 * @param {object} ctx context
483 * @param {string} ctx.responseType response type
484 * @param {string} ctx.topicUrl topic url
485 * @param {number} ctx.count count of subscribers
486 * @returns {string} response content
488 // eslint-disable-next-line class-methods-use-this
491 switch (ctx
.responseType
) {
492 case Enum
.ContentType
.ApplicationJson:
493 return JSON
.stringify({
498 case Enum
.ContentType
.ImageSVG:
499 return Template
.badgeSVG({}, ` ${ctx.topicUrl} `, ` ${ctx.count} subscribers `, `${ctx.topicUrl} has ${ctx.count} subscribers.`);
502 return ctx
.count
.toString();
508 * GET request for /info?topic=url&format=type
509 * @param {http.ServerResponse} res response
510 * @param {object} ctx context
512 async
getInfo(res
, ctx
) {
513 const _scope
= _fileScope('getInfo');
514 this.logger
.debug(_scope
, 'called', { ctx
});
516 if (!ctx
.queryParams
.topic
) {
517 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, 'missing required parameter');
519 ctx
.topicUrl
= ctx
.queryParams
.topic
;
521 switch ((ctx
.queryParams
.format
|| '').toLowerCase()) {
523 ctx
.responseType
= Enum
.ContentType
.ImageSVG
;
524 res
.setHeader(Enum
.Header
.ContentType
, ctx
.responseType
);
528 ctx
.responseType
= Enum
.ContentType
.ApplicationJson
;
529 res
.setHeader(Enum
.Header
.ContentType
, ctx
.responseType
);
537 new URL(ctx
.topicUrl
);
538 } catch (e
) { // eslint-disable-line no-unused-vars
539 throw new ResponseError(Enum
.ErrorResponse
.BadRequest
, 'invalid topic');
543 await
this.db
.context(async (dbCtx
) => {
544 count
= await
this.db
.subscriptionCountByTopicUrl(dbCtx
, ctx
.topicUrl
);
546 throw new ResponseError(Enum
.ErrorResponse
.NotFound
, 'no such topic');
548 ctx
.count
= count
.count
;
551 const content
= this.infoContent(ctx
);
552 res
.setHeader(Enum
.Header
.ETag
, common
.generateETag(undefined, undefined, content
));
553 res
.setHeader(Enum
.Header
.CacheControl
, 'no-cache');
555 this.logger
.info(_scope
, 'finished', { ctx
});
560 * label the bars of the topic update history graph
561 * @param {number} index index
562 * @param {number} value value
563 * @returns {string} caption
565 static _historyBarCaption(index
, value
) {
575 when
= `${index} days ago`;
577 return `${when}, ${value || 'no'} update${value === 1 ? '': 's'}`;
582 * GET SVG chart of topic update history
583 * @param {http.ServerResponse} res response
584 * @param {object} ctx context
586 async
getHistorySVG(res
, ctx
) {
587 const _scope
= _fileScope('getHistorySVG');
588 this.logger
.debug(_scope
, 'called', { ctx
});
590 const days
= Math
.min(parseInt(ctx
.queryParams
.days
) || this.options
.manager
.publishHistoryDays
, 365);
591 const histOptions
= {
592 title: 'Topic Publish History',
593 description: 'Updates per Day',
594 labelZero: '^ Today',
602 barCaptionFn: Manager
._historyBarCaption
,
606 await
this.db
.context(async (dbCtx
) => {
607 publishHistory
= await
this.db
.topicPublishHistory(dbCtx
, ctx
.params
.topicId
, days
);
610 res
.end(Template
.histogramSVG(publishHistory
, histOptions
));
611 this.logger
.info(_scope
, 'finished', { ctx
});
616 * Determine if a profile url matches enough of a topic url to describe control over it.
617 * Topic must match hostname and start with the profile's path.
618 * @param {URL} profileUrlObj profile url
619 * @param {URL} topicUrlObj topic url
620 * @returns {boolean} profile is super-url of topic
622 static _profileControlsTopic(profileUrlObj
, topicUrlObj
) {
623 const hostnameMatches
= profileUrlObj
.hostname
=== topicUrlObj
.hostname
;
624 const pathIsPrefix
= topicUrlObj
.pathname
.startsWith(profileUrlObj
.pathname
);
625 return hostnameMatches
&& pathIsPrefix
;
630 * GET request for authorized /admin information.
631 * @param {http.ServerResponse} res response
632 * @param {object} ctx context
634 async
getAdminOverview(res
, ctx
) {
635 const _scope
= _fileScope('getAdminOverview');
636 this.logger
.debug(_scope
, 'called', { ctx
});
638 await
this.db
.context(async (dbCtx
) => {
639 ctx
.topics
= await
this.db
.topicGetAll(dbCtx
);
641 this.logger
.debug(_scope
, 'got topics', { topics: ctx
.topics
});
643 // Profile users can only see related topics.
644 if (ctx
?.session
?.authenticatedProfile
) {
645 const profileUrlObj
= new URL(ctx
.session
.authenticatedProfile
);
646 ctx
.topics
= ctx
.topics
.filter((topic
) => {
647 const topicUrlObj
= new URL(topic
.url
);
648 return Manager
._profileControlsTopic(profileUrlObj
, topicUrlObj
);
652 res
.end(Template
.adminOverviewHTML(ctx
, this.options
));
653 this.logger
.info(_scope
, 'finished', { ctx
, topics: ctx
.topics
.length
});
658 * GET request for authorized /admin/topic/:topicId information.
659 * @param {http.ServerResponse} res response
660 * @param {object} ctx context
662 async
getTopicDetails(res
, ctx
) {
663 const _scope
= _fileScope('getTopicDetails');
664 this.logger
.debug(_scope
, 'called', { ctx
});
666 ctx
.publishSpan
= 60; // FIXME: configurable
667 const topicId
= ctx
.params
.topicId
;
669 await
this.db
.context(async (dbCtx
) => {
670 ctx
.topic
= await
this.db
.topicGetById(dbCtx
, topicId
);
671 ctx
.subscriptions
= await
this.db
.subscriptionsByTopicId(dbCtx
, topicId
);
672 publishHistory
= await
this.db
.topicPublishHistory(dbCtx
, topicId
, ctx
.publishSpan
);
674 ctx
.publishCount
= publishHistory
.reduce((a
, b
) => a
+ b
, 0);
675 ctx
.subscriptionsDelivered
= ctx
.subscriptions
.filter((subscription
) => {
676 return subscription
.latestContentDelivered
>= ctx
.topic
.contentUpdated
;
678 this.logger
.debug(_scope
, 'got topic details', { topic: ctx
.topic
, subscriptions: ctx
.subscriptions
, updates: ctx
.publishCount
});
680 // Profile users can only see related topics.
681 if (ctx
?.session
?.authenticatedProfile
) {
682 const profileUrlObj
= new URL(ctx
.session
.authenticatedProfile
);
683 const topicUrlObj
= new URL(ctx
.topic
.url
);
684 if (!Manager
._profileControlsTopic(profileUrlObj
, topicUrlObj
)) {
686 ctx
.subscriptions
= [];
690 res
.end(Template
.adminTopicDetailsHTML(ctx
, this.options
));
691 this.logger
.info(_scope
, 'finished', { ctx
, subscriptions: ctx
.subscriptions
.length
, topic: ctx
?.topic
?.id
|| ctx
.topic
});
696 * PATCH and DELETE for updating topic data.
697 * @param {http.ServerResponse} res response
698 * @param {object} ctx context
700 async
updateTopic(res
, ctx
) {
701 const _scope
= _fileScope('updateTopic');
702 this.logger
.debug(_scope
, 'called', { ctx
});
704 const topicId
= ctx
.params
.topicId
;
706 await
this.db
.context(async (dbCtx
) => {
707 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
708 // Get topic without defaults filled in, to persist nulls
709 const topic
= await
this.db
.topicGetById(txCtx
, topicId
, false);
711 this.logger
.debug(_scope
, 'no topic', { ctx
});
712 throw new Errors
.ResponseError(Enum
.ErrorResponse
.NotFound
);
715 if (ctx
.method
=== 'DELETE') {
716 await
this.db
.topicDeleted(txCtx
, topicId
);
718 this.logger
.info(_scope
, 'topic set deleted', { ctx
, topicId
});
719 // Attempt to remove from db if no active subscriptions.
720 await
this.db
.topicPendingDelete(txCtx
, topicId
);
724 const updatableFields
= [
725 'leaseSecondsPreferred',
728 'publisherValidationUrl',
729 'contentHashAlgorithm',
732 const patchValues
= common
.pick({
738 'leaseSecondsPreferred',
741 ].filter((field
) => field
in patchValues
).forEach((field
) => {
742 // eslint-disable-next-line security/detect-object-injection
743 patchValues
[field
] = parseInt(patchValues
[field
], 10);
746 const patchKeys
= Object
.keys(patchValues
);
747 if (patchKeys
.length
=== 0
748 // eslint-disable-next-line security/detect-object-injection
749 || patchKeys
.every((k
) => patchValues
[k
] == topic
[k
])) {
750 res
.statusCode
= 204;
752 this.logger
.info(_scope
, 'empty topic update', { ctx
, topicId
});
755 const patchedTopic
= {
760 this.logger
.debug(_scope
, 'data', { topic
, patchValues
, patchedTopic
});
763 await
this.db
.topicUpdate(txCtx
, { topicId
, ...patchedTopic
});
765 if (e
instanceof DBErrors
.DataValidation
) {
766 this.logger
.debug(_scope
, 'validation error', { error: e
, ctx
, topicId
});
767 throw new Errors
.ResponseError(Enum
.ErrorResponse
.BadRequest
, e
.message
);
769 this.logger
.error(_scope
, 'failed', { error: e
, ctx
, topicId
});
773 this.logger
.info(_scope
, 'topic updated', { ctx
, topicId
, patchValues
});
780 * PATCH and DELETE for updating subscription data.
781 * @param {http.ServerResponse} res response
782 * @param {object} ctx context
784 async
updateSubscription(res
, ctx
) {
785 const _scope
= _fileScope('updateSubscription');
786 this.logger
.debug(_scope
, 'called', { ctx
});
788 const subscriptionId
= ctx
.params
.subscriptionId
;
790 await
this.db
.context(async (dbCtx
) => {
791 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
792 const subscription
= await
this.db
.subscriptionGetById(txCtx
, subscriptionId
);
794 this.logger
.debug(_scope
, 'no subscription', { ctx
});
795 throw new Errors
.ResponseError(Enum
.ErrorResponse
.NotFound
);
798 if (ctx
.method
=== 'DELETE') {
799 const deleteFields
= common
.pick({
804 // Queue an unsubscription.
805 const verification
= {
806 topicId: subscription
.topicId
,
807 callback: subscription
.callback
,
808 mode: Enum
.Mode
.Denied
,
809 reason: 'subscription removed by administrative action',
810 isPublisherValidated: true,
811 requestId: ctx
.requestId
,
815 await
this.db
.verificationInsert(txCtx
, verification
);
816 this.logger
.info(_scope
, 'subscription removal initiated', { ctx
, verification
});
821 const updatableFields
= [
822 'signatureAlgorithm',
825 const patchValues
= common
.pick({
830 const patchKeys
= Object
.keys(patchValues
);
831 if (patchKeys
.length
=== 0
832 // eslint-disable-next-line security/detect-object-injection
833 || patchKeys
.every((k
) => patchValues
[k
] == subscription
[k
])) {
834 res
.statusCode
= 204;
838 const patchedSubscription
= {
844 await
this.db
.subscriptionUpdate(txCtx
, { subscriptionId
, ...patchedSubscription
});
846 if (e
instanceof DBErrors
.DataValidation
) {
847 this.logger
.debug(_scope
, 'validation error', { error: e
, ctx
, subscriptionId
});
848 throw new Errors
.ResponseError(Enum
.ErrorResponse
.BadRequest
, e
.message
);
850 this.logger
.info(_scope
, 'failed', { error: e
, ctx
, subscriptionId
});
854 this.logger
.info(_scope
, 'subscription updated', { ctx
, subscriptionId
, patchValues
});
860 * POST request for manually running worker.
861 * @param {http.ServerResponse} res response
862 * @param {object} ctx context
864 async
processTasks(res
, ctx
) {
865 const _scope
= _fileScope('processTasks');
866 this.logger
.debug(_scope
, 'called', { ctx
});
868 // N.B. no await on this
869 this.communication
.worker
.process().catch((e
) => {
870 this.logger
.error(_scope
, 'failed', { error: e
, ctx
});
874 this.logger
.info(_scope
, 'invoked worker process', { ctx
});
879 module
.exports
= Manager
;