update dependencies and devDependencies, fix lint issues
[websub-hub] / src / communication.js
1 'use strict';
2
3 /**
4 * Here we wrangle all outgoing requests, as well as the
5 * worker which initiates most of them.
6 */
7
8 const common = require('./common');
9 const crypto = require('crypto');
10 const Enum = require('./enum');
11 const Errors = require('./errors');
12 const Worker = require('./worker');
13 const LinkHelper = require('./link-helper');
14 const { version: packageVersion, name: packageName } = require('../package.json'); // For default UA string
15
16 const _fileScope = common.fileScope(__filename);
17
18 class Communication {
19 constructor(logger, db, options) {
20 this.logger = logger;
21 this.db = db;
22 this.options = options;
23 this.linkHelper = new LinkHelper(logger, options);
24
25 if (this.options.dingus.selfBaseUrl) {
26 this.linkHub = `, <${this.options.dingus.selfBaseUrl}>; rel="hub"`;
27 } else {
28 this.linkHub = '';
29 this.logger.error(_fileScope('constructor'), 'empty dingus.selfBaseUrl value, server responses will not be compliant');
30 }
31
32 this.Got = undefined; // Will become the async imported got.
33 this.got = this._init; // First invocation imports got and replaces this.
34
35 this.worker = new Worker(logger, db, this.workFeed.bind(this), options);
36 this.worker.start();
37 }
38
39
40 /**
41 * Do a little dance to cope with ESM dynamic import.
42 * @param {...any} args arguments
43 * @returns {Promise<any>} got response
44 */
45 async _init(...args) {
46 if (!this.Got) {
47 // For some reason eslint is confused about import being supported here.
48
49 this.Got = await import('got');
50 this.got = this.Got.got.extend({
51 followRedirect: false, // Outgoing API calls should not encounter redirects
52 throwHttpErrors: false, // We will be checking status codes explicitly
53 headers: {
54 [Enum.Header.UserAgent]: Communication.userAgentString(this.options.userAgent),
55 },
56 timeout: {
57 request: this.options.communication.requestTimeoutMs || 120000,
58 },
59 hooks: {
60 beforeRetry: [
61 this._onRetry,
62 ],
63 },
64 });
65 }
66
67 /* istanbul ignore if */
68 if (args.length) {
69 /* istanbul ignore next */
70 return this.got(...args);
71 }
72 }
73
74
75 /**
76 * Take note of transient retries.
77 * @param {*} error error
78 * @param {*} retryCount retry count
79 */
80 _onRetry(error, retryCount) {
81 const _scope = _fileScope('_onRetry');
82 this.logger.debug(_scope, 'retry', { retryCount, error });
83 }
84
85
86 /**
87 * Construct a user-agent value.
88 * @param {object} userAgentConfig user agent config
89 * @param {string=} userAgentConfig.product product name (default package name)
90 * @param {string=} userAgentConfig.version version (default package version)
91 * @param {string=} userAgentConfig.implementation implementation (default spec supported)
92 * @returns {string} user agent string 'product/version (implementation)'
93 */
94 static userAgentString(userAgentConfig) {
95 // eslint-disable-next-line security/detect-object-injection
96 const _conf = (field, def) => (userAgentConfig && field in userAgentConfig) ? userAgentConfig[field] : def;
97 const product = _conf('product', packageName).split('/').pop();
98 const version = _conf('version', packageVersion);
99 let implementation = _conf('implementation', Enum.Specification);
100 if (implementation) {
101 implementation = ` (${implementation})`;
102 }
103 return `${product}/${version}${implementation}`;
104 }
105
106
107 /**
108 * @alias {number} Integer
109 */
110 /**
111 * Generate a random string.
112 * @param {Integer} bytes size of challenge
113 * @returns {Promise<string>} base64 randomness
114 */
115 static async generateChallenge(bytes = 30) {
116 return (await common.randomBytesAsync(bytes)).toString('base64');
117 }
118
119
120 /**
121 * Generate the signature string for content.
122 * @param {Buffer} message message to sign
123 * @param {Buffer} secret secret to sign with
124 * @param {string} algorithm algorithm to sign with
125 * @returns {string} signature string
126 */
127 static signature(message, secret, algorithm) {
128 const hmac = crypto.createHmac(algorithm, secret)
129 .update(message)
130 .digest('hex');
131 return `${algorithm}=${hmac}`;
132 }
133
134
135 /**
136 * Generate the hash for content.
137 * @param {Buffer} content content
138 * @param {string} algorithm algorithm
139 * @returns {string} hash of content
140 */
141 static contentHash(content, algorithm) {
142 return crypto.createHash(algorithm)
143 .update(content)
144 .digest('hex');
145 }
146
147
148 /**
149 * Attempt to verify a requested intent with client callback endpoint.
150 * @param {*} dbCtx db context
151 * @param {*} verificationId verification id
152 * @param {string} requestId request id
153 * @returns {Promise<boolean>} whether to subsequently attempt next task if verification succeeds
154 */
155 async verificationProcess(dbCtx, verificationId, requestId) {
156 const _scope = _fileScope('verificationProcess');
157
158 const verification = await this.db.verificationGetById(dbCtx, verificationId);
159 if (!verification) {
160 this.logger.error(_scope, 'no such verification', { verificationId, requestId });
161 throw new Errors.InternalInconsistencyError('no such verification id');
162 }
163
164 const topic = await this.db.topicGetById(dbCtx, verification.topicId);
165 if (!topic) {
166 this.logger.error(_scope, Enum.Message.NoSuchTopicId, { verification, requestId });
167 throw new Errors.InternalInconsistencyError(Enum.Message.NoSuchTopicId);
168 }
169
170 if (!topic.isActive) {
171 // These should be filtered out when selecting verification tasks to process.
172 this.logger.debug(_scope, 'topic not active, skipping verification', { verification, requestId });
173 await this.db.verificationRelease(dbCtx, verificationId);
174 return;
175 }
176
177 // If topic is deleted, deny any subscriptions.
178 // Un-subscriptions can continue to be verified.
179 if (topic.isDeleted && verification.mode === Enum.Mode.Subscribe) {
180 this.logger.info(_scope, 'topic is deleted, verification becomes denial', { verification, requestId });
181
182 verification.mode = Enum.Mode.Denied;
183 verification.reason = 'Gone: topic no longer valid on this hub.';
184 verification.isPublisherValidated = true;
185 await this.db.verificationUpdate(dbCtx, verification);
186 }
187
188 // If verification needs publisher validation, this delivery is for publisher.
189 if (verification.mode === Enum.Mode.Subscribe && verification.isPublisherValidated === false) {
190 this.logger.debug(_scope, 'attempting publisher validation', { verification, requestId });
191 const continueVerification = await this.publisherValidate(dbCtx, topic, verification);
192
193 // If publisher validation completed, verification will proceed.
194 // If not, we're done for now and shall try again later.
195 if (!continueVerification) {
196 this.logger.debug(_scope, 'publisher validation did not complete, belaying verification', { verification });
197 await this.db.verificationIncomplete(dbCtx, verificationId, this.options.communication.retryBackoffSeconds);
198 return;
199 }
200 }
201
202 const callbackRequestConfig = {
203 method: 'GET',
204 url: new URL(verification.callback),
205 responseType: 'text',
206 };
207 const callbackParams = {
208 'hub.topic': topic.url,
209 'hub.mode': verification.mode,
210 };
211
212 let challenge;
213 if (verification.mode === Enum.Mode.Denied) {
214 // Denials don't have a challenge, but might have a reason.
215 if (verification.reason) {
216 callbackParams['hub.reason'] = verification.reason;
217 }
218 } else {
219 // Subscriptions and unsubscriptions require challenge matching.
220 challenge = await Communication.generateChallenge();
221 Object.assign(callbackParams, {
222 'hub.challenge': challenge,
223 // Explicitly convert leaseSeconds to string, due to some DB backends. (Looking at you, sqlite..)
224 'hub.lease_seconds': verification.leaseSeconds.toString(),
225 });
226 }
227 Object.entries(callbackParams)
228 .forEach(([k, v]) => callbackRequestConfig.url.searchParams.set(k, v))
229 ;
230
231 const logInfoData = {
232 callbackUrl: callbackRequestConfig.url.href,
233 topicUrl: topic.url,
234 mode: verification.mode,
235 originalRequestId: verification.requestId,
236 requestId,
237 verificationId,
238 };
239
240 this.logger.info(_scope, 'verification request', logInfoData);
241
242 let response;
243 try {
244 response = await this.got(callbackRequestConfig);
245 } catch (e) {
246 this.logger.error(_scope, 'verification request failed', { ...logInfoData, error: e });
247 await this.db.verificationIncomplete(dbCtx, verificationId, this.options.communication.retryBackoffSeconds);
248 return;
249 }
250 logInfoData.response = common.gotResponseLogData(response);
251 this.logger.debug(_scope, 'verification response', logInfoData );
252
253 let verificationAccepted = true; // Presume success.
254
255 switch (common.httpStatusCodeClass(response.statusCode)) {
256 case 2:
257 // Success, fall out of switch.
258 break;
259
260 case 5:
261 // Retry
262 this.logger.info(_scope, 'verification remote server error', logInfoData );
263 await this.db.verificationIncomplete(dbCtx, verificationId, this.options.communication.retryBackoffSeconds);
264 return;
265
266 default:
267 // Anything else is unsuccessful.
268 this.logger.info(_scope, 'verification rejected by status', logInfoData );
269 verificationAccepted = false;
270 }
271
272 // Any denial is not accepted.
273 if (verification.mode === Enum.Mode.Denied) {
274 this.logger.info(_scope, 'verification denial accepted', logInfoData );
275 verificationAccepted = false;
276 }
277
278 if ([Enum.Mode.Subscribe, Enum.Mode.Unsubscribe].includes(verification.mode)
279 && response.body !== challenge) {
280 this.logger.info(_scope, 'verification rejected by challenge', logInfoData);
281 verificationAccepted = false;
282 }
283
284 await this.db.transaction(dbCtx, async (txCtx) => {
285 switch (verification.mode) {
286 case Enum.Mode.Subscribe:
287 if (verificationAccepted) {
288 await this.db.subscriptionUpsert(txCtx, verification);
289 }
290 break;
291
292 case Enum.Mode.Unsubscribe:
293 if (verificationAccepted) {
294 await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
295 if (topic.isDeleted) {
296 // Remove a deleted topic after the last subscription is notified.
297 await this.db.topicPendingDelete(txCtx, topic.id);
298 }
299 }
300 break;
301
302 case Enum.Mode.Denied:
303 await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
304 if (topic.isDeleted) {
305 // Remove a deleted topic after the last subscription is notified.
306 await this.db.topicPendingDelete(txCtx, topic.id);
307 }
308 break;
309
310 default:
311 this.logger.error(_scope, 'unanticipated mode', { logInfoData });
312 throw new Errors.InternalInconsistencyError(verification.mode);
313 }
314
315 await this.db.verificationComplete(txCtx, verificationId, verification.callback, verification.topicId);
316 }); // txCtx
317
318 this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted });
319 }
320
321
322 /**
323 * @alias {object} TopicData
324 * @alias {object} VerificationData
325 */
326 /**
327 * Attempt to verify a pending subscription request with publisher.
328 * Updates (and persists) verification.
329 * Returns boolean of status of publisher contact, and hence
330 * whether to continue verification with client.
331 *
332 * This is not defined by the spec. We opt to speak JSON here.
333 * @param {*} dbCtx db context
334 * @param {TopicData} topic topic
335 * @param {VerificationData} verification verification
336 * @returns {Promise<boolean>} true if successful contact with publisher
337 */
338 async publisherValidate(dbCtx, topic, verification) {
339 const _scope = _fileScope('publisherValidate');
340 const logInfoData = {
341 topicUrl: topic.url,
342 callbackUrl: verification.callback,
343 requestId: verification.requestId,
344 };
345 let response;
346
347 this.logger.info(_scope, 'publisher validation request', logInfoData);
348
349 const publisherValidationRequestConfig = {
350 method: 'POST',
351 url: topic.publisherValidationUrl,
352 json: {
353 callback: verification.callback,
354 topic: topic.url,
355 ...(verification.httpFrom && { from: verification.httpFrom }),
356 ...(verification.httpRemoteAddr && { address: verification.httpRemoteAddr }),
357 },
358 responseType: 'json',
359 };
360 try {
361 response = await this.got(publisherValidationRequestConfig);
362 } catch (e) {
363 this.logger.error(_scope, 'publisher validation failed', { ...logInfoData, error: e });
364 return false; // Do not continue with client verification.
365 }
366
367 logInfoData.response = common.gotResponseLogData(response);
368 this.logger.debug(_scope, 'validation response', logInfoData);
369
370 let verificationNeedsUpdate = false;
371 switch (common.httpStatusCodeClass(response.statusCode)) {
372 case 2:
373 this.logger.info(_scope, 'publisher validation complete, allowed', logInfoData);
374 break;
375
376 case 5:
377 this.logger.info(_scope, 'publisher validation remote server error', logInfoData);
378 return false; // Do not continue with client verification.
379
380 default:
381 this.logger.info(_scope, 'publisher validation complete, denied', logInfoData);
382 // Change client verification
383 verification.mode = Enum.Mode.Denied;
384 verification.reason = 'publisher rejected request'; // TODO: details from response?
385 verificationNeedsUpdate = true;
386 }
387
388 // Success from publisher, either accepted or denied.
389 // Set validated flag, and allow client verification to continue.
390 await this.db.transaction(dbCtx, async (txCtx) => {
391 if (verificationNeedsUpdate) {
392 await this.db.verificationUpdate(txCtx, verification.id, verification);
393 }
394 await this.db.verificationValidated(txCtx, verification.id);
395 });
396 return true;
397 }
398
399
400 /**
401 * Retrieve content from a topic.
402 * @param {*} dbCtx db context
403 * @param {*} topicId topic id
404 * @param {string} requestId request id
405 * @returns {Promise<void>}
406 */
407 async topicFetchProcess(dbCtx, topicId, requestId) {
408 const _scope = _fileScope('topicFetchProcess');
409 const logInfoData = {
410 topicId,
411 requestId,
412 };
413
414 this.logger.debug(_scope, 'called', logInfoData);
415
416 const topic = await this.db.topicGetById(dbCtx, topicId);
417 if (topic === undefined) {
418 this.logger.error(_scope, Enum.Message.NoSuchTopicId, logInfoData);
419 throw new Errors.InternalInconsistencyError(Enum.Message.NoSuchTopicId);
420 }
421
422 // Cull any expired subscriptions
423 await this.db.subscriptionDeleteExpired(dbCtx, topicId);
424
425 logInfoData.url = topic.url;
426
427 if (topic.isDeleted) {
428 this.logger.debug(_scope, 'topic deleted, skipping update request', logInfoData);
429 return;
430 }
431
432 const updateRequestConfig = {
433 followRedirect: true,
434 method: 'GET',
435 url: topic.url,
436 headers: {
437 [Enum.Header.Accept]: [topic.contentType, `*/*${topic.contentType ? ';q=0.9' : ''}`].filter((x) => x).join(', '),
438 ...(topic.httpEtag && { [Enum.Header.IfNoneMatch]: topic.httpEtag }),
439 ...(topic.httpLastModified && { [Enum.Header.IfModifiedSince]: topic.httpLastModified }),
440 },
441 responseType: 'buffer',
442 };
443
444 this.logger.info(_scope, 'topic update request', logInfoData);
445
446 let response;
447 try {
448 response = await this.got(updateRequestConfig);
449 } catch (e) {
450 this.logger.error(_scope, 'update request failed', { ...logInfoData, error: e });
451 await this.db.topicFetchIncomplete(dbCtx, topicId, this.options.communication.retryBackoffSeconds);
452 return;
453 }
454 logInfoData.response = common.gotResponseLogData(response);
455 this.logger.debug(_scope, 'fetch response', logInfoData);
456
457 switch (common.httpStatusCodeClass(response.statusCode)) {
458 case 2:
459 case 3:
460 // Fall out of switch on success
461 break;
462
463 case 5:
464 this.logger.info(_scope, 'update remote server error', logInfoData);
465 await this.db.topicFetchIncomplete(dbCtx, topicId, this.options.communication.retryBackoffSeconds);
466 return;
467
468 default:
469 this.logger.info(_scope, 'fetch failed by status', logInfoData);
470 await this.db.topicFetchIncomplete(dbCtx, topicId, this.options.communication.retryBackoffSeconds);
471 return;
472 }
473
474 if (response.statusCode === 304) {
475 this.logger.info(_scope, 'content has not changed, per server', logInfoData);
476 await this.db.topicFetchComplete(dbCtx, topicId);
477 return;
478 }
479
480 const contentHash = Communication.contentHash(response.body, topic.contentHashAlgorithm);
481 logInfoData.contentHash = contentHash;
482 if (topic.contentHash === contentHash) {
483 this.logger.info(_scope, 'content has not changed', logInfoData);
484 await this.db.topicFetchComplete(dbCtx, topicId);
485 return;
486 }
487
488 const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.body);
489 if (!validHub) {
490 this.logger.info(_scope, 'retrieved topic does not list us as hub', { logInfoData });
491 if (this.options.communication.strictTopicHubLink) {
492 await this.db.transaction(dbCtx, async (txCtx) => {
493 // Set as deleted and set content_updated so subscriptions are notified.
494 await this.db.topicDeleted(txCtx, topicId);
495 await this.db.topicFetchComplete(txCtx, topicId);
496 });
497 // Attempt to remove from db, if no active subscriptions.
498 await this.db.topicPendingDelete(dbCtx, topicId);
499 return;
500 }
501 }
502
503 const contentType = response.headers[Enum.Header.ContentType.toLowerCase()];
504 const httpETag = response.headers[Enum.Header.ETag.toLowerCase()];
505 const httpLastModified = response.headers[Enum.Header.LastModified.toLowerCase()];
506
507 await this.db.transaction(dbCtx, async (txCtx) => {
508 await this.db.topicSetContent(txCtx, {
509 topicId,
510 content: Buffer.from(response.body),
511 contentHash,
512 ...(contentType && { contentType }),
513 ...(httpETag && { httpETag }),
514 ...(httpLastModified && { httpLastModified }),
515 });
516
517 await this.db.topicFetchComplete(txCtx, topicId);
518 });
519 this.logger.info(_scope, 'content updated', logInfoData);
520 }
521
522
523 /**
524 * Attempt to deliver a topic's content to a subscription.
525 * @param {*} dbCtx db context
526 * @param {string} subscriptionId subscription id
527 * @param {string} requestId request id
528 * @returns {Promise<void>}
529 */
530 async subscriptionDeliveryProcess(dbCtx, subscriptionId, requestId) {
531 const _scope = _fileScope('subscriptionDeliveryProcess');
532
533 const logInfoData = {
534 subscriptionId,
535 requestId,
536 };
537
538 this.logger.debug(_scope, 'called', logInfoData);
539
540 const subscription = await this.db.subscriptionGetById(dbCtx, subscriptionId);
541 if (!subscription) {
542 this.logger.error(_scope, 'no such subscription', logInfoData);
543 throw new Errors.InternalInconsistencyError('no such subscription');
544 }
545
546 logInfoData.callback = subscription.callback;
547
548 const topic = await this.db.topicGetContentById(dbCtx, subscription.topicId);
549 if (!topic) {
550 this.logger.error(_scope, 'no such topic', logInfoData);
551 throw new Errors.InternalInconsistencyError('no such topic');
552 }
553
554 if (topic.isDeleted) {
555 // If a topic has been set deleted, it does not list us as a valid hub.
556 // Queue an unsubscription.
557 const verification = {
558 topicId: subscription.topicId,
559 callback: subscription.callback,
560 mode: Enum.Mode.Denied,
561 reason: 'Gone: topic no longer valid on this hub.',
562 isPublisherValidated: true,
563 requestId,
564 };
565
566 await this.db.transaction(dbCtx, async (txCtx) => {
567 await this.db.verificationInsert(txCtx, verification);
568 await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId, topic.contentUpdated);
569 });
570 this.logger.info(_scope, 'update unsubscription for deleted topic', logInfoData);
571 return;
572 }
573
574 logInfoData.contentLength = topic.content.length;
575 logInfoData.contentHash = topic.contentHash;
576
577 const updateConfig = {
578 method: 'POST',
579 url: subscription.callback,
580 body: topic.content,
581 headers: {
582 [Enum.Header.Link]: `<${topic.url}>; rel="self"${this.linkHub}`,
583 [Enum.Header.ContentType]: topic.contentType || Enum.ContentType.TextPlain,
584 ...(subscription.secret && { [Enum.Header.XHubSignature]: Communication.signature(topic.content, subscription.secret, subscription.signatureAlgorithm) }),
585 },
586 responseType: 'text',
587 };
588
589 this.logger.info(_scope, 'update request', logInfoData);
590
591 let response;
592 try {
593 response = await this.got(updateConfig);
594 } catch (e) {
595 this.logger.error(_scope, 'update request failed', { ...logInfoData, error: e });
596 await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, this.options.communication.retryBackoffSeconds);
597 return;
598 }
599 logInfoData.response = common.gotResponseLogData(response);
600 this.logger.debug(_scope, 'update response', logInfoData);
601
602 switch (common.httpStatusCodeClass(response.statusCode)) {
603 case 2:
604 // Fall out of switch on success.
605 break;
606
607 case 5:
608 this.logger.info(_scope, 'update remote server error', logInfoData);
609 await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, this.options.communication.retryBackoffSeconds);
610 return;
611
612 case 4:
613 if (response.statusCode === 410) { // GONE
614 this.logger.info(_scope, 'client declined further updates', logInfoData);
615 await this.db.subscriptionDeliveryGone(dbCtx, subscription.callback, subscription.topicId);
616 return;
617 }
618 // All other 4xx falls through as failure
619
620 default:
621 this.logger.info(_scope, 'update failed with non-2xx status code', logInfoData);
622 await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, this.options.communication.retryBackoffSeconds);
623 return;
624 }
625
626 await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId, topic.contentUpdated);
627 this.logger.info(_scope, 'update success', logInfoData);
628 }
629
630
631 /**
632 * Claim and work a specific topic fetch task.
633 * @param {*} dbCtx db context
634 * @param {string} topicId topic id
635 * @param {string} requestId request id
636 * @returns {Promise<void>}
637 */
638 async topicFetchClaimAndProcessById(dbCtx, topicId, requestId) {
639 const _scope = _fileScope('topicFetchClaimAndProcessById');
640
641 const claimResult = await this.db.topicFetchClaimById(dbCtx, topicId, this.options.communication.claimTimeoutSeconds, this.options.nodeId);
642 if (claimResult.changes != 1) {
643 this.logger.debug(_scope, 'did not claim topic fetch', { topicId, requestId });
644 return;
645 }
646 await this.topicFetchProcess(dbCtx, topicId, requestId);
647 }
648
649
650 /**
651 * Claim and work a specific verification confirmation task.
652 * @param {*} dbCtx db context
653 * @param {*} verificationId verification id
654 * @param {string} requestId request id
655 * @returns {Promise<boolean>} whether to subsequently attempt next task if verification succeeds
656 */
657 async verificationClaimAndProcessById(dbCtx, verificationId, requestId) {
658 const _scope = _fileScope('verificationClaimAndProcessById');
659
660 const claimResult = await this.db.verificationClaimById(dbCtx, verificationId, this.options.communication.claimTimeoutSeconds, this.options.nodeId);
661 if (claimResult.changes != 1) {
662 this.logger.debug(_scope, 'did not claim verification', { verificationId, requestId });
663 return;
664 }
665 await this.verificationProcess(dbCtx, verificationId, requestId);
666 }
667
668
669 /**
670 *
671 * @param {*} dbCtx db context
672 * @param {number} wanted maximum tasks to claim
673 * @returns {Promise<void>[]} array of promises processing work
674 */
675 async workFeed(dbCtx, wanted) {
676 const _scope = _fileScope('workFeed');
677 const inProgress = [];
678 const requestId = common.requestId();
679 const claimTimeoutSeconds = this.options.communication.claimTimeoutSeconds;
680 const nodeId = this.options.nodeId;
681 let topicFetchPromises = [], verificationPromises = [], updatePromises = [];
682
683 this.logger.debug(_scope, 'called', { wanted });
684
685 try {
686 if (wanted > 0) {
687 // Update topics before anything else.
688 const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
689 topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId)));
690 inProgress.push(...topicFetchPromises);
691 wanted -= topicFetchPromises.length;
692 }
693
694 if (wanted > 0) {
695 // Then any pending verifications.
696 const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
697 verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId)));
698 inProgress.push(...verificationPromises);
699 wanted -= verificationPromises.length;
700 }
701
702 if (wanted > 0) {
703 // Finally dole out content.
704 const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
705 updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId)));
706 inProgress.push(...updatePromises);
707 wanted -= updatePromises.length;
708 }
709 } catch (e) {
710 this.logger.error(_scope, 'failed', { error: e });
711 // do not re-throw, return what we've claimed so far
712 }
713 this.logger.debug(_scope, 'searched for work', { topics: topicFetchPromises.length, verifications: verificationPromises.length, updates: updatePromises.length, wantedRemaining: wanted, requestId });
714
715 return inProgress;
716 }
717
718
719 }
720
721 module.exports = Communication;