03cc670dd7154ccc64a50b87f4d7bb1d5eddafb3
[websub-hub] / src / communication.js
1 'use strict';
2
3 /**
4 * Here we wrangle all outgoing requests, as well as the
5 * worker which initiates most of them.
6 */
7
8 const axios = require('axios');
9 const common = require('./common');
10 const crypto = require('crypto');
11 const Enum = require('./enum');
12 const Errors = require('./errors');
13 const Worker = require('./worker');
14 const LinkHelper = require('./link-helper');
15 const { version: packageVersion, name: packageName } = require('../package.json'); // For default UA string
16
17 const { performance } = require('perf_hooks');
18
19 const _fileScope = common.fileScope(__filename);
20
21 class Communication {
22 constructor(logger, db, options) {
23 this.logger = logger;
24 this.db = db;
25 this.options = options;
26 this.linkHelper = new LinkHelper(logger, options);
27
28 if (this.options.dingus.selfBaseUrl) {
29 this.linkHub = `, <${this.options.dingus.selfBaseUrl}>; rel="hub"`;
30 } else {
31 this.linkHub = '';
32 this.logger.error(_fileScope('constructor'), 'empty dingus.selfBaseUrl value, server responses will not be compliant');
33 }
34
35 // Set common options
36 this.axios = axios.create({
37 validateStatus: null, // Non-success responses are not exceptional
38 headers: {
39 [Enum.Header.UserAgent]: Communication.userAgentString(options.userAgent),
40 },
41 });
42
43 this.axios.interceptors.request.use((request) => {
44 request.startTimestampMs = performance.now();
45 return request;
46 });
47 this.axios.interceptors.response.use((response) => {
48 response.elapsedTimeMs = performance.now() - response.config.startTimestampMs;
49 return response;
50 });
51
52 this.worker = new Worker(logger, db, this.workFeed.bind(this), options);
53 this.worker.start();
54 }
55
56
57 static userAgentString(userAgentConfig) {
58 // eslint-disable-next-line security/detect-object-injection
59 const _conf = (field, def) => (userAgentConfig && field in userAgentConfig) ? userAgentConfig[field] : def;
60 const product = _conf('product', packageName).split('/').pop();
61 const version = _conf('version', packageVersion);
62 let implementation = _conf('implementation', Enum.Specification);
63 if (implementation) {
64 implementation = ` (${implementation})`;
65 }
66 return `${product}/${version}${implementation}`;
67 }
68
69
70 /**
71 * Generate a random string.
72 * @param {Integer} bytes
73 * @returns {String}
74 */
75 static async generateChallenge(bytes = 30) {
76 return (await common.randomBytesAsync(bytes)).toString('base64');
77 }
78
79
80 /**
81 * Generate the signature string for content.
82 * @param {Buffer} message
83 * @param {Buffer} secret
84 * @param {String} algorithm
85 * @returns {String}
86 */
87 static signature(message, secret, algorithm) {
88 const hmac = crypto.createHmac(algorithm, secret);
89 hmac.update(message);
90 return `${algorithm}=${hmac.digest('hex')}`;
91 }
92
93
94 /**
95 * Generate the hash for content.
96 * @param {Buffer} content
97 * @param {String} algorithm
98 * @returns
99 */
100 static contentHash(content, algorithm) {
101 const hash = crypto.createHash(algorithm);
102 hash.update(content);
103 return hash.digest('hex');
104 }
105
106
107 /**
108 * A request skeleton config.
109 * @param {String} method
110 * @param {String} requestUrl
111 * @param {String} body
112 * @param {Object} params
113 */
114 static _axiosConfig(method, requestUrl, body, params = {}, headers = {}) {
115 const urlObj = new URL(requestUrl);
116 const config = {
117 method,
118 url: `${urlObj.origin}${urlObj.pathname}`,
119 params: urlObj.searchParams,
120 headers,
121 ...(body && { data: body }),
122 // Setting this does not appear to be enough to keep axios from parsing JSON response into object
123 responseType: 'text',
124 // So force the matter by eliding all response transformations
125 transformResponse: [ (res) => res ],
126 };
127 Object.entries(params).map(([k, v]) => config.params.set(k, v));
128 return config;
129 }
130
131
132 /**
133 * Create request config for verifying an intent.
134 * @param {URL} requestUrl
135 * @param {String} topicUrl
136 * @param {String} mode
137 * @param {Integer} leaseSeconds
138 * @param {String} challenge
139 */
140 static _intentVerifyAxiosConfig(requestUrl, topicUrl, mode, leaseSeconds, challenge) {
141 // Explicitly convert leaseSeconds to string, due to some DB backends. (Looking at you, sqlite..)
142 leaseSeconds = leaseSeconds.toString();
143
144 return Communication._axiosConfig('GET', requestUrl, undefined, {
145 'hub.mode': mode,
146 'hub.topic': topicUrl,
147 'hub.challenge': challenge,
148 'hub.lease_seconds': leaseSeconds,
149 }, {});
150 }
151
152
153 /**
154 * Create request config for denying an intent.
155 * @param {String} requestUrl
156 * @param {String} topicUrl
157 * @param {String} reason
158 * @returns {String}
159 */
160 static _intentDenyAxiosConfig(requestUrl, topicUrl, reason) {
161 return Communication._axiosConfig('GET', requestUrl, undefined, {
162 'hub.mode': Enum.Mode.Denied,
163 'hub.topic': topicUrl,
164 ...(reason && { 'hub.reason': reason }),
165 }, {});
166 }
167
168
169 /**
170 * Create request config for querying publisher for subscription validation.
171 * @param {Topic} topic
172 * @param {Verification} verification
173 * @returns {String}
174 */
175 static _publisherValidationAxiosConfig(topic, verification) {
176 const body = {
177 callback: verification.callback,
178 topic: topic.url,
179 ...(verification.httpFrom && { from: verification.httpFrom }),
180 ...(verification.httpRemoteAddr && { address: verification.httpRemoteAddr }),
181 };
182 return Communication._axiosConfig('POST', topic.publisherValidationUrl, body, {}, {
183 [Enum.Header.ContentType]: Enum.ContentType.ApplicationJson,
184 });
185 }
186
187
188 /**
189 * Create request config for fetching topic content.
190 * Prefer existing content-type, but accept anything.
191 * @param {Topic} topic
192 * @returns {String}
193 */
194 static _topicFetchAxiosConfig(topic) {
195 const acceptWildcard = '*/*' + (topic.contentType ? ';q=0.9' : '');
196 const acceptPreferred = [topic.contentType, acceptWildcard].filter((x) => x).join(', ');
197 return Communication._axiosConfig('GET', topic.url, undefined, {}, {
198 [Enum.Header.Accept]: acceptPreferred,
199 });
200 }
201
202
203 /**
204 * Attempt to verify a requested intent with client callback endpoint.
205 * @param {*} dbCtx
206 * @param {*} verificationId
207 * @param {String} requestId
208 * @returns {Boolean} whether to subsequently attempt next task if verification succeeds
209 */
210 async verificationProcess(dbCtx, verificationId, requestId) {
211 const _scope = _fileScope('verificationProcess');
212
213 const verification = await this.db.verificationGetById(dbCtx, verificationId);
214 if (!verification) {
215 this.logger.error(_scope, 'no such verification', { verificationId, requestId });
216 throw new Errors.InternalInconsistencyError('no such verification id');
217 }
218
219 const topic = await this.db.topicGetById(dbCtx, verification.topicId);
220 if (!topic) {
221 this.logger.error(_scope, 'no such topic id', { verification, requestId });
222 throw new Errors.InternalInconsistencyError('no such topic id');
223 }
224
225 if (!topic.isActive) {
226 // These should be filtered out when selecting verification tasks to process.
227 this.logger.debug(_scope, 'topic not active, skipping verification', { verification, requestId });
228 await this.db.verificationRelease(dbCtx, verificationId);
229 return;
230 }
231
232 // If topic is deleted, deny any subscriptions.
233 // Un-subscriptions can continue to be verified.
234 if (topic.isDeleted && verification.mode === Enum.Mode.Subscribe) {
235 this.logger.info(_scope, 'topic is deleted, verification becomes denial', { verification, requestId });
236
237 verification.mode = Enum.Mode.Denied;
238 verification.reason = 'Gone: topic no longer valid on this hub.';
239 verification.isPublisherValidated = true;
240 await this.db.verificationUpdate(dbCtx, verification);
241 }
242
243 // If verification needs publisher validation, this delivery is for publisher.
244 if (verification.mode === Enum.Mode.Subscribe && verification.isPublisherValidated === false) {
245 this.logger.debug(_scope, 'attempting publisher validation', { verification, requestId });
246 const continueVerification = await this.publisherValidate(dbCtx, topic, verification);
247
248 // If publisher validation completed, verification will proceed.
249 // If not, we're done for now and shall try again later.
250 if (!continueVerification) {
251 this.logger.debug(_scope, 'publisher validation did not complete, belaying verification', { verification });
252 await this.db.verificationIncomplete(dbCtx, verificationId, this.options.communication.retryBackoffSeconds);
253 return;
254 }
255 }
256
257 const u = new URL(verification.callback);
258 let callbackRequestConfig, challenge;
259 if (verification.mode === Enum.Mode.Denied) {
260 // Denials don't have a challenge.
261 callbackRequestConfig = Communication._intentDenyAxiosConfig(u, topic.url, verification.reason);
262 } else {
263 // Subscriptions and unsubscriptions require challenge matching.
264 challenge = await Communication.generateChallenge();
265 callbackRequestConfig = Communication._intentVerifyAxiosConfig(u, topic.url, verification.mode, verification.leaseSeconds, challenge);
266 }
267
268 const logInfoData = {
269 callbackUrl: u.href,
270 topicUrl: topic.url,
271 mode: verification.mode,
272 originalRequestId: verification.requestId,
273 requestId,
274 verificationId,
275 };
276
277 this.logger.info(_scope, 'verification request', logInfoData);
278
279 let response;
280 try {
281 response = await this.axios(callbackRequestConfig);
282 } catch (e) {
283 this.logger.error(_scope, 'verification request failed', { ...logInfoData, error: e });
284 await this.db.verificationIncomplete(dbCtx, verificationId, this.options.communication.retryBackoffSeconds);
285 return;
286 }
287 logInfoData.response = common.axiosResponseLogData(response);
288 this.logger.debug(_scope, 'verification response', logInfoData );
289
290 let verificationAccepted = true; // Presume success.
291
292 switch (common.httpStatusCodeClass(response.status)) {
293 case 2:
294 // Success, fall out of switch.
295 break;
296
297 case 5:
298 // Retry
299 this.logger.info(_scope, 'verification remote server error', logInfoData );
300 await this.db.verificationIncomplete(dbCtx, verificationId, this.options.communication.retryBackoffSeconds);
301 return;
302
303 default:
304 // Anything else is unsuccessful.
305 this.logger.info(_scope, 'verification rejected by status', logInfoData );
306 verificationAccepted = false;
307 }
308
309 // Any denial is not accepted.
310 if (verification.mode === Enum.Mode.Denied) {
311 this.logger.info(_scope, 'verification denial accepted', logInfoData );
312 verificationAccepted = false;
313 }
314
315 if ([Enum.Mode.Subscribe, Enum.Mode.Unsubscribe].includes(verification.mode)
316 && response.data !== challenge) {
317 this.logger.info(_scope, 'verification rejected by challenge', logInfoData);
318 verificationAccepted = false;
319 }
320
321 await this.db.transaction(dbCtx, async (txCtx) => {
322 switch (verification.mode) {
323 case Enum.Mode.Subscribe:
324 if (verificationAccepted) {
325 await this.db.subscriptionUpsert(txCtx, verification);
326 }
327 break;
328
329 case Enum.Mode.Unsubscribe:
330 if (verificationAccepted) {
331 await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
332 if (topic.isDeleted) {
333 // Remove a deleted topic after the last subscription is notified.
334 await this.db.topicPendingDelete(txCtx, topic.id);
335 }
336 }
337 break;
338
339 case Enum.Mode.Denied:
340 await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
341 if (topic.isDeleted) {
342 // Remove a deleted topic after he last subscription is notified.
343 await this.db.topicPendingDelete(txCtx, topic.id);
344 }
345 break;
346
347 default:
348 this.logger.error(_scope, 'unanticipated mode', { logInfoData });
349 throw new Errors.InternalInconsistencyError(verification.mode);
350 }
351
352 await this.db.verificationComplete(txCtx, verificationId, verification.callback, verification.topicId);
353 }); // txCtx
354
355 this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted });
356 }
357
358
359 /**
360 * Attempt to verify a pending subscription request with publisher.
361 * Updates (and persists) verification.
362 * Returns boolean of status of publisher contact, and hence
363 * whether to continue verification with client.
364 * @param {*} dbCtx
365 * @param {TopicData} topic
366 * @param {VerificationData} verification
367 * @returns {Boolean}
368 */
369 async publisherValidate(dbCtx, topic, verification) {
370 const _scope = _fileScope('publisherValidate');
371 const publisherValidationRequestConfig = Communication._publisherValidationAxiosConfig(topic, verification);
372 const logInfoData = {
373 topicUrl: topic.url,
374 callbackUrl: verification.callback,
375 requestId: verification.requestId,
376 };
377 let response;
378
379 this.logger.info(_scope, 'publisher validation request', logInfoData);
380
381 try {
382 response = await this.axios(publisherValidationRequestConfig);
383 } catch (e) {
384 this.logger.error(_scope, 'publisher validation failed', { ...logInfoData, error: e });
385 return false; // Do not continue with client verification.
386 }
387
388 logInfoData.response = common.axiosResponseLogData(response);
389 this.logger.debug(_scope, 'validation response', logInfoData);
390
391 let verificationNeedsUpdate = false;
392 switch (common.httpStatusCodeClass(response.status)) {
393 case 2:
394 this.logger.info(_scope, 'publisher validation complete, allowed', logInfoData);
395 break;
396
397 case 5:
398 this.logger.info(_scope, 'publisher validation remote server error', logInfoData);
399 return false; // Do not continue with client verification.
400
401 default:
402 this.logger.info(_scope, 'publisher validation complete, denied', logInfoData);
403 // Change client verification
404 verification.mode = Enum.Mode.Denied;
405 verification.reason = 'publisher rejected request'; // TODO: details from response?
406 verificationNeedsUpdate = true;
407 }
408
409 // Success from publisher, either accepted or denied.
410 // Set validated flag, and allow client verification to continue.
411 await this.db.transaction(dbCtx, async (txCtx) => {
412 if (verificationNeedsUpdate) {
413 await this.db.verificationUpdate(txCtx, verification.id, verification);
414 }
415 await this.db.verificationValidated(txCtx, verification.id);
416 });
417 return true;
418 }
419
420
421 /**
422 * Retrieve content from a topic.
423 * @param {*} dbCtx
424 * @param {*} topicId
425 * @param {String} requestId
426 * @returns
427 */
428 async topicFetchProcess(dbCtx, topicId, requestId) {
429 const _scope = _fileScope('topicFetchProcess');
430 const logInfoData = {
431 topicId,
432 requestId,
433 };
434
435 this.logger.debug(_scope, 'called', logInfoData);
436
437 const topic = await this.db.topicGetById(dbCtx, topicId);
438 if (topic === undefined) {
439 this.logger.error(_scope, 'no such topic id', logInfoData);
440 throw new Errors.InternalInconsistencyError('no such topic id');
441 }
442
443 // Cull any expired subscriptions
444 await this.db.subscriptionDeleteExpired(dbCtx, topicId);
445
446 logInfoData.url = topic.url;
447
448 if (topic.isDeleted) {
449 this.logger.debug(_scope, 'topic deleted, skipping update request', logInfoData);
450 return;
451 }
452
453 const updateRequestConfig = Communication._topicFetchAxiosConfig(topic);
454
455 this.logger.info(_scope, 'topic update request', logInfoData);
456
457 let response;
458 try {
459 response = await this.axios(updateRequestConfig);
460 } catch (e) {
461 this.logger.error(_scope, 'update request failed', logInfoData);
462 await this.db.topicFetchIncomplete(dbCtx, topicId, this.options.communication.retryBackoffSeconds);
463 return;
464 }
465 logInfoData.response = common.axiosResponseLogData(response);
466 this.logger.debug(_scope, 'fetch response', logInfoData);
467
468 switch (common.httpStatusCodeClass(response.status)) {
469 case 2:
470 // Fall out of switch on success
471 break;
472
473 case 5:
474 this.logger.info(_scope, 'update remote server error', logInfoData);
475 await this.db.topicFetchIncomplete(dbCtx, topicId, this.options.communication.retryBackoffSeconds);
476 return;
477
478 default:
479 this.logger.info(_scope, 'fetch failed by status', logInfoData);
480 await this.db.topicFetchIncomplete(dbCtx, topicId, this.options.communication.retryBackoffSeconds);
481 return;
482 }
483
484 const contentHash = Communication.contentHash(response.data, topic.contentHashAlgorithm);
485 logInfoData.contentHash = contentHash;
486 if (topic.contentHash === contentHash) {
487 this.logger.info(_scope, 'content has not changed', logInfoData);
488 await this.db.topicFetchComplete(dbCtx, topicId);
489 return;
490 }
491
492 const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.data);
493 if (!validHub) {
494 this.logger.info(_scope, 'retrieved topic does not list us as hub', { logInfoData });
495 if (this.options.communication.strictTopicHubLink) {
496 await this.db.transaction(dbCtx, async (txCtx) => {
497 // Set as deleted and set content_updated so subscriptions are notified.
498 await this.db.topicDeleted(txCtx, topicId);
499 await this.db.topicFetchComplete(txCtx, topicId);
500 });
501 // Attempt to remove from db, if no active subscriptions.
502 await this.db.topicPendingDelete(dbCtx, topicId);
503 return;
504 }
505 }
506
507 const contentType = response.headers[Enum.Header.ContentType.toLowerCase()];
508
509 await this.db.transaction(dbCtx, async (txCtx) => {
510 await this.db.topicSetContent(txCtx, {
511 topicId,
512 content: Buffer.from(response.data),
513 contentHash,
514 ...(contentType && { contentType }),
515 });
516
517 await this.db.topicFetchComplete(txCtx, topicId);
518 });
519 this.logger.info(_scope, 'content updated', logInfoData);
520 }
521
522
523 /**
524 * Attempt to deliver a topic's content to a subscription.
525 * @param {*} dbCtx
526 * @param {String} callback
527 * @param {*} topicId
528 * @param {String} requestId
529 */
530 async subscriptionDeliveryProcess(dbCtx, subscriptionId, requestId) {
531 const _scope = _fileScope('subscriptionDeliveryProcess');
532
533 const logInfoData = {
534 subscriptionId,
535 requestId,
536 };
537
538 this.logger.debug(_scope, 'called', logInfoData);
539
540 const subscription = await this.db.subscriptionGetById(dbCtx, subscriptionId);
541 if (!subscription) {
542 this.logger.error(_scope, 'no such subscription', logInfoData);
543 throw new Errors.InternalInconsistencyError('no such subscription');
544 }
545
546 logInfoData.callback = subscription.callback;
547
548 const topic = await this.db.topicGetContentById(dbCtx, subscription.topicId);
549 if (!topic) {
550 this.logger.error(_scope, 'no such topic', logInfoData);
551 throw new Errors.InternalInconsistencyError('no such topic');
552 }
553
554 if (topic.isDeleted) {
555 // If a topic has been set deleted, it does not list us as a valid hub.
556 // Queue an unsubscription.
557 const verification = {
558 topicId: subscription.topicId,
559 callback: subscription.callback,
560 mode: Enum.Mode.Denied,
561 reason: 'Gone: topic no longer valid on this hub.',
562 isPublisherValidated: true,
563 requestId,
564 };
565
566 await this.db.transaction(dbCtx, async (txCtx) => {
567 await this.db.verificationInsert(txCtx, verification);
568 await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId);
569 });
570 this.logger.info(_scope, 'update unsubscription for deleted topic', logInfoData);
571 return;
572 }
573
574 logInfoData.contentLength = topic.content.length;
575 logInfoData.contentHash = topic.contentHash;
576
577 const updateAxiosConfig = Communication._axiosConfig('POST', subscription.callback, topic.content, {}, {
578 [Enum.Header.Link]: `<${topic.url}>; rel="self"${this.linkHub}`,
579 [Enum.Header.ContentType]: topic.contentType || Enum.ContentType.TextPlain,
580 ...(subscription.secret && { [Enum.Header.XHubSignature]: Communication.signature(topic.content, subscription.secret, subscription.signatureAlgorithm) }),
581 });
582
583 this.logger.info(_scope, 'update request', logInfoData);
584
585 let response;
586 try {
587 response = await this.axios(updateAxiosConfig);
588 } catch (e) {
589 this.logger.error(_scope, 'update request failed', { ...logInfoData, error: e });
590 await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, this.options.communication.retryBackoffSeconds);
591 return;
592 }
593 logInfoData.response = common.axiosResponseLogData(response);
594 this.logger.debug(_scope, 'update response', logInfoData);
595
596 switch (common.httpStatusCodeClass(response.status)) {
597 case 2:
598 // Fall out of switch on success.
599 break;
600
601 case 5:
602 this.logger.info(_scope, 'update remote server error', logInfoData);
603 await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, this.options.communication.retryBackoffSeconds);
604 return;
605
606 case 4:
607 if (response.status === 410) { // GONE
608 this.logger.info(_scope, 'client declined further updates', logInfoData);
609 await this.db.subscriptionDeliveryGone(dbCtx, subscription.callback, subscription.topicId);
610 return;
611 }
612 // All other 4xx falls through as failure
613
614 default:
615 this.logger.info(_scope, 'update failed with non-2xx status code', logInfoData);
616 await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, this.options.communication.retryBackoffSeconds);
617 return;
618 }
619
620 await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId);
621 this.logger.info(_scope, 'update success', logInfoData);
622 }
623
624
625 /**
626 * Claim and work a specific topic fetch task.
627 * @param {*} dbCtx
628 * @param {*} id
629 * @param {String} requestId
630 */
631 async topicFetchClaimAndProcessById(dbCtx, topicId, requestId) {
632 const _scope = _fileScope('topicFetchClaimAndProcessById');
633
634 const claimResult = await this.db.topicFetchClaimById(dbCtx, topicId, this.options.communication.claimTimeoutSeconds, this.options.nodeId);
635 if (claimResult.changes != 1) {
636 this.logger.debug(_scope, 'did not claim topic fetch', { topicId, requestId });
637 return;
638 }
639 await this.topicFetchProcess(dbCtx, topicId, requestId);
640 }
641
642
643 /**
644 * Claim and work a specific verification confirmation task.
645 * @param {*} dbCtx
646 * @param {*} verificationId
647 * @param {String} requestId
648 * @returns
649 */
650 async verificationClaimAndProcessById(dbCtx, verificationId, requestId) {
651 const _scope = _fileScope('verificationClaimAndProcessById');
652
653 const claimResult = await this.db.verificationClaimById(dbCtx, verificationId, this.options.communication.claimTimeoutSeconds, this.options.nodeId);
654 if (claimResult.changes != 1) {
655 this.logger.debug(_scope, 'did not claim verification', { verificationId, requestId });
656 return;
657 }
658 await this.verificationProcess(dbCtx, verificationId, requestId);
659 }
660
661
662 /**
663 *
664 * @param {*} dbCtx
665 * @param {Number} wanted maximum tasks to claim
666 * @returns {Promise<void>[]}
667 */
668 async workFeed(dbCtx, wanted) {
669 const _scope = _fileScope('workFeed');
670 const inProgress = [];
671 const requestId = common.requestId();
672 const claimTimeoutSeconds = this.options.communication.claimTimeoutSeconds;
673 const nodeId = this.options.nodeId;
674 let topicFetchPromises = [], verificationPromises = [], updatePromises = [];
675
676 this.logger.debug(_scope, 'called', { wanted });
677
678 try {
679 if (wanted > 0) {
680 // Update topics before anything else.
681 const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
682 topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId)));
683 inProgress.push(...topicFetchPromises);
684 wanted -= topicFetchPromises.length;
685 }
686
687 if (wanted > 0) {
688 // Then any pending verifications.
689 const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
690 verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId)));
691 inProgress.push(...verificationPromises);
692 wanted -= verificationPromises.length;
693 }
694
695 if (wanted > 0) {
696 // Finally dole out content.
697 const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
698 updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId)));
699 inProgress.push(...updatePromises);
700 wanted -= updatePromises.length;
701 }
702 } catch (e) {
703 this.logger.error(_scope, 'failed', { error: e });
704 // do not re-throw, return what we've claimed so far
705 }
706 this.logger.debug(_scope, 'searched for work', { topics: topicFetchPromises.length, verifications: verificationPromises.length, updates: updatePromises.length, wantedRemaining: wanted, requestId });
707
708 return inProgress;
709 }
710
711
712 }
713
714 module.exports = Communication;