database migration 1.0.4, store topic fetch etag/last-modified, provide these when...
[websub-hub] / src / communication.js
1 'use strict';
2
3 /**
4 * Here we wrangle all outgoing requests, as well as the
5 * worker which initiates most of them.
6 */
7
8 const axios = require('axios');
9 const common = require('./common');
10 const crypto = require('crypto');
11 const Enum = require('./enum');
12 const Errors = require('./errors');
13 const Worker = require('./worker');
14 const LinkHelper = require('./link-helper');
15 const { version: packageVersion, name: packageName } = require('../package.json'); // For default UA string
16
17 const { performance } = require('perf_hooks');
18
19 const _fileScope = common.fileScope(__filename);
20
/**
 * Wrangles all outgoing HTTP requests made by the hub (intent verification,
 * publisher validation, topic fetching, content delivery), and owns the
 * worker which periodically claims and runs those tasks.
 */
class Communication {
  /**
   * Sets up the shared axios instance (with request timing) and starts the worker.
   * @param {Object} logger object providing debug/info/error methods
   * @param {Object} db database interface
   * @param {Object} options
   * @param {Object} options.dingus
   * @param {String} options.dingus.selfBaseUrl advertised as rel="hub" on deliveries
   * @param {Object=} options.userAgent overrides for the User-Agent string
   */
  constructor(logger, db, options) {
    this.logger = logger;
    this.db = db;
    this.options = options;
    this.linkHelper = new LinkHelper(logger, options);

    if (this.options.dingus.selfBaseUrl) {
      // Pre-rendered fragment appended to the Link header of content deliveries.
      this.linkHub = `, <${this.options.dingus.selfBaseUrl}>; rel="hub"`;
    } else {
      this.linkHub = '';
      this.logger.error(_fileScope('constructor'), 'empty dingus.selfBaseUrl value, server responses will not be compliant');
    }

    // Set common options
    this.axios = axios.create({
      validateStatus: null, // Non-success responses are not exceptional
      headers: {
        [Enum.Header.UserAgent]: Communication.userAgentString(options.userAgent),
      },
    });

    // Stamp each request on the way out, so the elapsed time can be attached to its response.
    this.axios.interceptors.request.use((request) => {
      request.startTimestampMs = performance.now();
      return request;
    });
    this.axios.interceptors.response.use((response) => {
      response.elapsedTimeMs = performance.now() - response.config.startTimestampMs;
      return response;
    });

    this.worker = new Worker(logger, db, this.workFeed.bind(this), options);
    this.worker.start();
  }


  /**
   * Render the User-Agent header value as `product/version (implementation)`.
   * Any field missing from the supplied config falls back to the package
   * name, package version, and Enum.Specification respectively.
   * @param {Object=} userAgentConfig
   * @param {String=} userAgentConfig.product only the part after the last '/' is used
   * @param {String=} userAgentConfig.version
   * @param {String=} userAgentConfig.implementation a falsy value omits the parenthetical
   * @returns {String}
   */
  static userAgentString(userAgentConfig) {
    // eslint-disable-next-line security/detect-object-injection
    const _conf = (field, def) => (userAgentConfig && field in userAgentConfig) ? userAgentConfig[field] : def;
    const product = _conf('product', packageName).split('/').pop();
    const version = _conf('version', packageVersion);
    let implementation = _conf('implementation', Enum.Specification);
    if (implementation) {
      implementation = ` (${implementation})`;
    }
    return `${product}/${version}${implementation}`;
  }


  /**
   * Generate a random string.
   * @param {Integer} bytes count of random bytes to encode
   * @returns {Promise<String>} base64 encoding of the random bytes
   */
  static async generateChallenge(bytes = 30) {
    return (await common.randomBytesAsync(bytes)).toString('base64');
  }


  /**
   * Generate the signature string for content.
   * @param {Buffer} message
   * @param {Buffer} secret
   * @param {String} algorithm
   * @returns {String} `algorithm=hexdigest` of the HMAC of message keyed by secret
   */
  static signature(message, secret, algorithm) {
    const hmac = crypto.createHmac(algorithm, secret);
    hmac.update(message);
    return `${algorithm}=${hmac.digest('hex')}`;
  }


  /**
   * Generate the hash for content.
   * @param {Buffer} content
   * @param {String} algorithm
   * @returns {String} hex digest
   */
  static contentHash(content, algorithm) {
    const hash = crypto.createHash(algorithm);
    hash.update(content);
    return hash.digest('hex');
  }


  /**
   * A request skeleton config.
   * Any query string on requestUrl is preserved, with params merged over it.
   * @param {String} method
   * @param {String|URL} requestUrl
   * @param {*} body request payload, omitted from config when falsy
   * @param {Object} params additional query parameters, overriding same-named existing ones
   * @param {Object} headers request headers
   * @returns {Object} axios request config
   */
  static _axiosConfig(method, requestUrl, body, params = {}, headers = {}) {
    const urlObj = new URL(requestUrl);
    const config = {
      method,
      url: `${urlObj.origin}${urlObj.pathname}`,
      params: urlObj.searchParams,
      headers,
      ...(body && { data: body }),
      // Setting this does not appear to be enough to keep axios from parsing JSON response into object
      responseType: 'text',
      // So force the matter by eliding all response transformations
      transformResponse: [ (res) => res ],
    };
    for (const [k, v] of Object.entries(params)) {
      config.params.set(k, v);
    }
    return config;
  }


  /**
   * Create request config for verifying an intent.
   * @param {String|URL} requestUrl
   * @param {String} topicUrl
   * @param {String} mode
   * @param {Integer} leaseSeconds
   * @param {String} challenge
   * @returns {Object} axios request config
   */
  static _intentVerifyAxiosConfig(requestUrl, topicUrl, mode, leaseSeconds, challenge) {
    // Explicitly convert leaseSeconds to string, due to some DB backends. (Looking at you, sqlite..)
    leaseSeconds = leaseSeconds.toString();

    return Communication._axiosConfig('GET', requestUrl, undefined, {
      'hub.mode': mode,
      'hub.topic': topicUrl,
      'hub.challenge': challenge,
      'hub.lease_seconds': leaseSeconds,
    }, {});
  }


  /**
   * Create request config for denying an intent.
   * @param {String|URL} requestUrl
   * @param {String} topicUrl
   * @param {String=} reason included as hub.reason only when truthy
   * @returns {Object} axios request config
   */
  static _intentDenyAxiosConfig(requestUrl, topicUrl, reason) {
    return Communication._axiosConfig('GET', requestUrl, undefined, {
      'hub.mode': Enum.Mode.Denied,
      'hub.topic': topicUrl,
      ...(reason && { 'hub.reason': reason }),
    }, {});
  }


  /**
   * Create request config for querying publisher for subscription validation.
   * @param {Topic} topic
   * @param {Verification} verification
   * @returns {Object} axios request config
   */
  static _publisherValidationAxiosConfig(topic, verification) {
    const body = {
      callback: verification.callback,
      topic: topic.url,
      ...(verification.httpFrom && { from: verification.httpFrom }),
      ...(verification.httpRemoteAddr && { address: verification.httpRemoteAddr }),
    };
    return Communication._axiosConfig('POST', topic.publisherValidationUrl, body, {}, {
      [Enum.Header.ContentType]: Enum.ContentType.ApplicationJson,
    });
  }


  /**
   * Create request config for fetching topic content.
   * Prefer existing content-type, but accept anything.
   * Conditional-request headers are included when the topic carries
   * previously stored validators.
   * @param {Topic} topic
   * @returns {Object} axios request config
   */
  static _topicFetchAxiosConfig(topic) {
    const acceptWildcard = '*/*' + (topic.contentType ? ';q=0.9' : '');
    const acceptPreferred = [topic.contentType, acceptWildcard].filter((x) => x).join(', ');
    // NOTE(review): this reads `topic.httpEtag`, while topicFetchProcess stores the
    // value under `httpETag` — confirm the DB layer maps between these spellings.
    return Communication._axiosConfig('GET', topic.url, undefined, {}, {
      [Enum.Header.Accept]: acceptPreferred,
      ...(topic.httpEtag && { [Enum.Header.IfNoneMatch]: topic.httpEtag }),
      ...(topic.httpLastModified && { [Enum.Header.IfModifiedSince]: topic.httpLastModified }),
    });
  }


  /**
   * Attempt to verify a requested intent with client callback endpoint.
   * Transport failures and 5xx responses reschedule the verification;
   * any other outcome completes it, accepted or not.
   * @param {*} dbCtx
   * @param {*} verificationId
   * @param {String} requestId
   * @returns {Promise<void>}
   * @throws {Errors.InternalInconsistencyError} on unknown verification, topic, or mode
   */
  async verificationProcess(dbCtx, verificationId, requestId) {
    const _scope = _fileScope('verificationProcess');

    const verification = await this.db.verificationGetById(dbCtx, verificationId);
    if (!verification) {
      this.logger.error(_scope, 'no such verification', { verificationId, requestId });
      throw new Errors.InternalInconsistencyError('no such verification id');
    }

    const topic = await this.db.topicGetById(dbCtx, verification.topicId);
    if (!topic) {
      this.logger.error(_scope, 'no such topic id', { verification, requestId });
      throw new Errors.InternalInconsistencyError('no such topic id');
    }

    if (!topic.isActive) {
      // These should be filtered out when selecting verification tasks to process.
      this.logger.debug(_scope, 'topic not active, skipping verification', { verification, requestId });
      await this.db.verificationRelease(dbCtx, verificationId);
      return;
    }

    // If topic is deleted, deny any subscriptions.
    // Un-subscriptions can continue to be verified.
    if (topic.isDeleted && verification.mode === Enum.Mode.Subscribe) {
      this.logger.info(_scope, 'topic is deleted, verification becomes denial', { verification, requestId });

      verification.mode = Enum.Mode.Denied;
      verification.reason = 'Gone: topic no longer valid on this hub.';
      verification.isPublisherValidated = true;
      // Same (ctx, id, data) argument order as the update issued by publisherValidate.
      await this.db.verificationUpdate(dbCtx, verificationId, verification);
    }

    // If verification needs publisher validation, this delivery is for publisher.
    if (verification.mode === Enum.Mode.Subscribe && verification.isPublisherValidated === false) {
      this.logger.debug(_scope, 'attempting publisher validation', { verification, requestId });
      const continueVerification = await this.publisherValidate(dbCtx, topic, verification);

      // If publisher validation completed, verification will proceed.
      // If not, we're done for now and shall try again later.
      if (!continueVerification) {
        this.logger.debug(_scope, 'publisher validation did not complete, belaying verification', { verification });
        await this.db.verificationIncomplete(dbCtx, verificationId, this.options.communication.retryBackoffSeconds);
        return;
      }
    }

    const u = new URL(verification.callback);
    let callbackRequestConfig, challenge;
    if (verification.mode === Enum.Mode.Denied) {
      // Denials don't have a challenge.
      callbackRequestConfig = Communication._intentDenyAxiosConfig(u, topic.url, verification.reason);
    } else {
      // Subscriptions and unsubscriptions require challenge matching.
      challenge = await Communication.generateChallenge();
      callbackRequestConfig = Communication._intentVerifyAxiosConfig(u, topic.url, verification.mode, verification.leaseSeconds, challenge);
    }

    const logInfoData = {
      callbackUrl: u.href,
      topicUrl: topic.url,
      mode: verification.mode,
      originalRequestId: verification.requestId,
      requestId,
      verificationId,
    };

    this.logger.info(_scope, 'verification request', logInfoData);

    let response;
    try {
      response = await this.axios(callbackRequestConfig);
    } catch (e) {
      this.logger.error(_scope, 'verification request failed', { ...logInfoData, error: e });
      await this.db.verificationIncomplete(dbCtx, verificationId, this.options.communication.retryBackoffSeconds);
      return;
    }
    logInfoData.response = common.axiosResponseLogData(response);
    this.logger.debug(_scope, 'verification response', logInfoData );

    let verificationAccepted = true; // Presume success.

    switch (common.httpStatusCodeClass(response.status)) {
      case 2:
        // Success, fall out of switch.
        break;

      case 5:
        // Retry
        this.logger.info(_scope, 'verification remote server error', logInfoData );
        await this.db.verificationIncomplete(dbCtx, verificationId, this.options.communication.retryBackoffSeconds);
        return;

      default:
        // Anything else is unsuccessful.
        this.logger.info(_scope, 'verification rejected by status', logInfoData );
        verificationAccepted = false;
    }

    // Any denial is not accepted.
    if (verification.mode === Enum.Mode.Denied) {
      this.logger.info(_scope, 'verification denial accepted', logInfoData );
      verificationAccepted = false;
    }

    // The callback must echo the challenge back verbatim as the response body.
    if ([Enum.Mode.Subscribe, Enum.Mode.Unsubscribe].includes(verification.mode)
    &&  response.data !== challenge) {
      this.logger.info(_scope, 'verification rejected by challenge', logInfoData);
      verificationAccepted = false;
    }

    await this.db.transaction(dbCtx, async (txCtx) => {
      switch (verification.mode) {
        case Enum.Mode.Subscribe:
          if (verificationAccepted) {
            await this.db.subscriptionUpsert(txCtx, verification);
          }
          break;

        case Enum.Mode.Unsubscribe:
          if (verificationAccepted) {
            await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
            if (topic.isDeleted) {
              // Remove a deleted topic after the last subscription is notified.
              await this.db.topicPendingDelete(txCtx, topic.id);
            }
          }
          break;

        case Enum.Mode.Denied:
          // A denial removes the subscription regardless of how the callback responded.
          await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
          if (topic.isDeleted) {
            // Remove a deleted topic after the last subscription is notified.
            await this.db.topicPendingDelete(txCtx, topic.id);
          }
          break;

        default:
          this.logger.error(_scope, 'unanticipated mode', logInfoData);
          throw new Errors.InternalInconsistencyError(verification.mode);
      }

      await this.db.verificationComplete(txCtx, verificationId, verification.callback, verification.topicId);
    }); // txCtx

    this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted });
  }


  /**
   * Attempt to verify a pending subscription request with publisher.
   * Updates (and persists) verification.
   * Returns boolean of status of publisher contact, and hence
   * whether to continue verification with client.
   * A non-2xx, non-5xx response turns the verification into a denial.
   * @param {*} dbCtx
   * @param {TopicData} topic
   * @param {VerificationData} verification
   * @returns {Promise<Boolean>} false on transport failure or 5xx, true otherwise
   */
  async publisherValidate(dbCtx, topic, verification) {
    const _scope = _fileScope('publisherValidate');
    const publisherValidationRequestConfig = Communication._publisherValidationAxiosConfig(topic, verification);
    const logInfoData = {
      topicUrl: topic.url,
      callbackUrl: verification.callback,
      requestId: verification.requestId,
    };
    let response;

    this.logger.info(_scope, 'publisher validation request', logInfoData);

    try {
      response = await this.axios(publisherValidationRequestConfig);
    } catch (e) {
      this.logger.error(_scope, 'publisher validation failed', { ...logInfoData, error: e });
      return false; // Do not continue with client verification.
    }

    logInfoData.response = common.axiosResponseLogData(response);
    this.logger.debug(_scope, 'validation response', logInfoData);

    let verificationNeedsUpdate = false;
    switch (common.httpStatusCodeClass(response.status)) {
      case 2:
        this.logger.info(_scope, 'publisher validation complete, allowed', logInfoData);
        break;

      case 5:
        this.logger.info(_scope, 'publisher validation remote server error', logInfoData);
        return false; // Do not continue with client verification.

      default:
        this.logger.info(_scope, 'publisher validation complete, denied', logInfoData);
        // Change client verification
        verification.mode = Enum.Mode.Denied;
        verification.reason = 'publisher rejected request'; // TODO: details from response?
        verificationNeedsUpdate = true;
    }

    // Success from publisher, either accepted or denied.
    // Set validated flag, and allow client verification to continue.
    await this.db.transaction(dbCtx, async (txCtx) => {
      if (verificationNeedsUpdate) {
        await this.db.verificationUpdate(txCtx, verification.id, verification);
      }
      await this.db.verificationValidated(txCtx, verification.id);
    });
    return true;
  }


  /**
   * Retrieve content from a topic.
   * Expired subscriptions are culled first. Transport failures and
   * non-2xx/3xx responses reschedule the fetch; unchanged content (by 304
   * or by matching hash) completes the fetch without storing anything.
   * @param {*} dbCtx
   * @param {*} topicId
   * @param {String} requestId
   * @returns {Promise<void>}
   * @throws {Errors.InternalInconsistencyError} on unknown topic
   */
  async topicFetchProcess(dbCtx, topicId, requestId) {
    const _scope = _fileScope('topicFetchProcess');
    const logInfoData = {
      topicId,
      requestId,
    };

    this.logger.debug(_scope, 'called', logInfoData);

    const topic = await this.db.topicGetById(dbCtx, topicId);
    // Any missing result (undefined or null) is treated as inconsistency, as in the other processors.
    if (!topic) {
      this.logger.error(_scope, 'no such topic id', logInfoData);
      throw new Errors.InternalInconsistencyError('no such topic id');
    }

    // Cull any expired subscriptions
    await this.db.subscriptionDeleteExpired(dbCtx, topicId);

    logInfoData.url = topic.url;

    if (topic.isDeleted) {
      this.logger.debug(_scope, 'topic deleted, skipping update request', logInfoData);
      return;
    }

    const updateRequestConfig = Communication._topicFetchAxiosConfig(topic);

    this.logger.info(_scope, 'topic update request', logInfoData);

    let response;
    try {
      response = await this.axios(updateRequestConfig);
    } catch (e) {
      this.logger.error(_scope, 'update request failed', { ...logInfoData, error: e });
      await this.db.topicFetchIncomplete(dbCtx, topicId, this.options.communication.retryBackoffSeconds);
      return;
    }
    logInfoData.response = common.axiosResponseLogData(response);
    this.logger.debug(_scope, 'fetch response', logInfoData);

    switch (common.httpStatusCodeClass(response.status)) {
      case 2:
      case 3:
        // Fall out of switch on success
        break;

      case 5:
        this.logger.info(_scope, 'update remote server error', logInfoData);
        await this.db.topicFetchIncomplete(dbCtx, topicId, this.options.communication.retryBackoffSeconds);
        return;

      default:
        this.logger.info(_scope, 'fetch failed by status', logInfoData);
        await this.db.topicFetchIncomplete(dbCtx, topicId, this.options.communication.retryBackoffSeconds);
        return;
    }

    if (response.status === 304) {
      this.logger.info(_scope, 'content has not changed, per server', logInfoData);
      await this.db.topicFetchComplete(dbCtx, topicId);
      return;
    }

    const contentHash = Communication.contentHash(response.data, topic.contentHashAlgorithm);
    logInfoData.contentHash = contentHash;
    if (topic.contentHash === contentHash) {
      this.logger.info(_scope, 'content has not changed', logInfoData);
      await this.db.topicFetchComplete(dbCtx, topicId);
      return;
    }

    const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.data);
    if (!validHub) {
      this.logger.info(_scope, 'retrieved topic does not list us as hub', logInfoData);
      // Without strictTopicHubLink this is only logged, and the content is stored as usual.
      if (this.options.communication.strictTopicHubLink) {
        await this.db.transaction(dbCtx, async (txCtx) => {
          // Set as deleted and set content_updated so subscriptions are notified.
          await this.db.topicDeleted(txCtx, topicId);
          await this.db.topicFetchComplete(txCtx, topicId);
        });
        // Attempt to remove from db, if no active subscriptions.
        await this.db.topicPendingDelete(dbCtx, topicId);
        return;
      }
    }

    const contentType = response.headers[Enum.Header.ContentType.toLowerCase()];
    const httpETag = response.headers[Enum.Header.ETag.toLowerCase()];
    const httpLastModified = response.headers[Enum.Header.LastModified.toLowerCase()];

    await this.db.transaction(dbCtx, async (txCtx) => {
      await this.db.topicSetContent(txCtx, {
        topicId,
        content: Buffer.from(response.data),
        contentHash,
        ...(contentType && { contentType }),
        ...(httpETag && { httpETag }),
        ...(httpLastModified && { httpLastModified }),
      });

      await this.db.topicFetchComplete(txCtx, topicId);
    });
    this.logger.info(_scope, 'content updated', logInfoData);
  }


  /**
   * Attempt to deliver a topic's content to a subscription.
   * For a deleted topic, a denial verification is queued instead of
   * delivering content. A 410 response ends the subscription's deliveries;
   * any other non-2xx response or transport failure reschedules delivery.
   * @param {*} dbCtx
   * @param {*} subscriptionId
   * @param {String} requestId
   * @returns {Promise<void>}
   * @throws {Errors.InternalInconsistencyError} on unknown subscription or topic
   */
  async subscriptionDeliveryProcess(dbCtx, subscriptionId, requestId) {
    const _scope = _fileScope('subscriptionDeliveryProcess');

    const logInfoData = {
      subscriptionId,
      requestId,
    };

    this.logger.debug(_scope, 'called', logInfoData);

    const subscription = await this.db.subscriptionGetById(dbCtx, subscriptionId);
    if (!subscription) {
      this.logger.error(_scope, 'no such subscription', logInfoData);
      throw new Errors.InternalInconsistencyError('no such subscription');
    }

    logInfoData.callback = subscription.callback;

    const topic = await this.db.topicGetContentById(dbCtx, subscription.topicId);
    if (!topic) {
      this.logger.error(_scope, 'no such topic', logInfoData);
      throw new Errors.InternalInconsistencyError('no such topic');
    }

    if (topic.isDeleted) {
      // If a topic has been set deleted, it does not list us as a valid hub.
      // Queue an unsubscription.
      const verification = {
        topicId: subscription.topicId,
        callback: subscription.callback,
        mode: Enum.Mode.Denied,
        reason: 'Gone: topic no longer valid on this hub.',
        isPublisherValidated: true,
        requestId,
      };

      await this.db.transaction(dbCtx, async (txCtx) => {
        await this.db.verificationInsert(txCtx, verification);
        await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId, topic.contentUpdated);
      });
      this.logger.info(_scope, 'update unsubscription for deleted topic', logInfoData);
      return;
    }

    logInfoData.contentLength = topic.content.length;
    logInfoData.contentHash = topic.contentHash;

    // Deliveries are signed only for subscriptions which supplied a secret.
    const updateAxiosConfig = Communication._axiosConfig('POST', subscription.callback, topic.content, {}, {
      [Enum.Header.Link]: `<${topic.url}>; rel="self"${this.linkHub}`,
      [Enum.Header.ContentType]: topic.contentType || Enum.ContentType.TextPlain,
      ...(subscription.secret && { [Enum.Header.XHubSignature]: Communication.signature(topic.content, subscription.secret, subscription.signatureAlgorithm) }),
    });

    this.logger.info(_scope, 'update request', logInfoData);

    let response;
    try {
      response = await this.axios(updateAxiosConfig);
    } catch (e) {
      this.logger.error(_scope, 'update request failed', { ...logInfoData, error: e });
      await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, this.options.communication.retryBackoffSeconds);
      return;
    }
    logInfoData.response = common.axiosResponseLogData(response);
    this.logger.debug(_scope, 'update response', logInfoData);

    switch (common.httpStatusCodeClass(response.status)) {
      case 2:
        // Fall out of switch on success.
        break;

      case 5:
        this.logger.info(_scope, 'update remote server error', logInfoData);
        await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, this.options.communication.retryBackoffSeconds);
        return;

      case 4:
        if (response.status === 410) { // GONE
          this.logger.info(_scope, 'client declined further updates', logInfoData);
          await this.db.subscriptionDeliveryGone(dbCtx, subscription.callback, subscription.topicId);
          return;
        }
        // All other 4xx falls through as failure

      default:
        this.logger.info(_scope, 'update failed with non-2xx status code', logInfoData);
        await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, this.options.communication.retryBackoffSeconds);
        return;
    }

    await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId, topic.contentUpdated);
    this.logger.info(_scope, 'update success', logInfoData);
  }


  /**
   * Claim and work a specific topic fetch task.
   * Does nothing further when the claim is not obtained.
   * @param {*} dbCtx
   * @param {*} topicId
   * @param {String} requestId
   * @returns {Promise<void>}
   */
  async topicFetchClaimAndProcessById(dbCtx, topicId, requestId) {
    const _scope = _fileScope('topicFetchClaimAndProcessById');

    const claimResult = await this.db.topicFetchClaimById(dbCtx, topicId, this.options.communication.claimTimeoutSeconds, this.options.nodeId);
    // NOTE(review): loose comparison retained; type of `changes` depends on the DB backend — confirm before tightening.
    if (claimResult.changes != 1) {
      this.logger.debug(_scope, 'did not claim topic fetch', { topicId, requestId });
      return;
    }
    await this.topicFetchProcess(dbCtx, topicId, requestId);
  }


  /**
   * Claim and work a specific verification confirmation task.
   * Does nothing further when the claim is not obtained.
   * @param {*} dbCtx
   * @param {*} verificationId
   * @param {String} requestId
   * @returns {Promise<void>}
   */
  async verificationClaimAndProcessById(dbCtx, verificationId, requestId) {
    const _scope = _fileScope('verificationClaimAndProcessById');

    const claimResult = await this.db.verificationClaimById(dbCtx, verificationId, this.options.communication.claimTimeoutSeconds, this.options.nodeId);
    // NOTE(review): loose comparison retained; type of `changes` depends on the DB backend — confirm before tightening.
    if (claimResult.changes != 1) {
      this.logger.debug(_scope, 'did not claim verification', { verificationId, requestId });
      return;
    }
    await this.verificationProcess(dbCtx, verificationId, requestId);
  }


  /**
   * Claim up to `wanted` pending tasks and begin processing them, in
   * priority order: topic fetches, then verifications, then content
   * deliveries. Each task runs in its own db context. A failure while
   * claiming is logged, and whatever was already started is still returned.
   * @param {*} dbCtx
   * @param {Number} wanted maximum tasks to claim
   * @returns {Promise<Promise<void>[]>} the in-progress task promises
   */
  async workFeed(dbCtx, wanted) {
    const _scope = _fileScope('workFeed');
    const inProgress = [];
    const requestId = common.requestId();
    const claimTimeoutSeconds = this.options.communication.claimTimeoutSeconds;
    const nodeId = this.options.nodeId;
    let topicFetchPromises = [], verificationPromises = [], updatePromises = [];

    this.logger.debug(_scope, 'called', { wanted });

    try {
      if (wanted > 0) {
        // Update topics before anything else.
        const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
        topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId)));
        inProgress.push(...topicFetchPromises);
        wanted -= topicFetchPromises.length;
      }

      if (wanted > 0) {
        // Then any pending verifications.
        const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
        verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId)));
        inProgress.push(...verificationPromises);
        wanted -= verificationPromises.length;
      }

      if (wanted > 0) {
        // Finally dole out content.
        const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
        updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId)));
        inProgress.push(...updatePromises);
        wanted -= updatePromises.length;
      }
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      // do not re-throw, return what we've claimed so far
    }
    this.logger.debug(_scope, 'searched for work', { topics: topicFetchPromises.length, verifications: verificationPromises.length, updates: updatePromises.length, wantedRemaining: wanted, requestId });

    return inProgress;
  }


}
726
727 module.exports = Communication;