From: Justin Wind Date: Thu, 12 Jan 2023 18:16:05 +0000 (-0800) Subject: WIP X-Git-Url: https://git.squeep.com/?a=commitdiff_plain;h=acb5c9b4682e1a8a97f3446146c6e98a3294fb11;p=webmention-receiver WIP --- acb5c9b4682e1a8a97f3446146c6e98a3294fb11 diff --git a/.eslintrc.json b/.eslintrc.json new file mode 100644 index 0000000..b3ffbe2 --- /dev/null +++ b/.eslintrc.json @@ -0,0 +1,89 @@ +{ + "env": { + "browser": false, + "es6": true, + "node": true + }, + "extends": [ + "eslint:recommended", + "plugin:node/recommended", + "plugin:security/recommended", + "plugin:sonarjs/recommended" + ], + "parserOptions": { + "ecmaVersion": "latest" + }, + "plugins": [ + "node", + "security", + "sonarjs" + ], + "rules": { + "array-element-newline": [ + "error", + "consistent" + ], + "arrow-parens": [ + "error", + "always" + ], + "arrow-spacing": [ + "error", + { + "after": true, + "before": true + } + ], + "block-scoped-var": "error", + "block-spacing": "error", + "brace-style": "error", + "callback-return": "error", + "camelcase": "error", + "class-methods-use-this": "error", + "comma-dangle": [ + "error", + "always-multiline" + ], + "comma-spacing": [ + "error", + { + "after": true, + "before": false + } + ], + "comma-style": [ + "error", + "last" + ], + "indent": [ + "warn", + 2, + { + "SwitchCase": 1 + } + ], + "sonarjs/cognitive-complexity": "warn", + "sonarjs/no-duplicate-string": "warn", + "keyword-spacing": "error", + "linebreak-style": [ + "error", + "unix" + ], + "no-unused-vars": [ + "error", { + "varsIgnorePattern": "^_" + } + ], + "object-curly-spacing": [ + "error", + "always" + ], + "prefer-const": "error", + "quotes": [ + "error", + "single" + ], + "strict": "error", + "vars-on-top": "error" + } +} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1796b18 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.vscode +node_modules +.nyc_output +coverage diff --git a/.nycrc.json b/.nycrc.json new file mode 100644 index 0000000..497d8af --- /dev/null +++ b/.nycrc.json @@ -0,0 +1,6 @@ +{ + "reporter": [ + "lcov", + "text" + ] +} diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/config/default.js b/config/default.js new file mode 100644 index 0000000..f95b71f --- /dev/null +++ b/config/default.js @@ -0,0 +1,57 @@ +'use strict'; + +// Provide default values for all configuration. + +const common = require('../src/common'); +const roman = require('@squeep/roman'); +const { name: packageName } = require('../package.json'); + +const currentYear = (new Date()).getFullYear(); +const romanYearHTML = roman.toRoman(currentYear, true); + +const defaultOptions = { + // Uniquely identify this instance. + nodeId: common.requestId(), // Default to ephemeral ID: easiest for clustered deployments. + + // Dingus API Server Framework options. + dingus: { + // This needs to be the full externally accessible root URL, including any proxyPrefix component. + selfBaseUrl: '', + + // trustProxy: true, // If true, trust values of some headers regarding client IP address and protocol. + proxyPrefix: '', // Leading path parts to ignore when parsing routes, and include when constructing links, e.g. /indieauth + }, + + // Database options + db: { + connectionString: '', // e.g. sqlite://path/to/dbfile.sqlite + queryLogLevel: undefined, // Set to log queries + + // SQLite specific options + sqliteOptimizeAfterChanges: 0, // Number of changes before running pragma optimize, 0 for never + }, + + // Logging options + logger: { + ignoreBelowLevel: 'info', + }, + + manager: { + pageTitle: packageName, // title on html pages + logoUrl: '/static/logo.svg', // image to go with title + footerEntries: [ // common footers on all html pages + 'Development Repository', + `©`, + ], + }, + + authenticator: { + tokenIntrospectionUrl: '', + tokenIntrospectionIdentifier: '', + tokenIntrospectionSecret: '', + secureAuthOnly: true, // Require secure transport for authentication. + }, + +}; + +module.exports = defaultOptions; diff --git a/config/development.js b/config/development.js new file mode 100644 index 0000000..5df6a45 --- /dev/null +++ b/config/development.js @@ -0,0 +1,18 @@ +'use strict'; +module.exports = [ + { + logger: { + ignoreBelowLevel: 'debug', + }, + db: { + connectionString: 'sqlite://:memory:', + queryLogLevel: 'debug', + }, + authenticator: { + tokenIntrospectionUrl: 'https://ia.squeep.com/introspect', + tokenIntrospectionIdentifier: '735700b1-c5fe-4ada-b58a-5e506cafd9f3', + tokenIntrospectionSecret: 'yAITisZ5/gDK4XlG8C9Us1vniohtbKJcSNmlD3qVi1Swkvj6q12kyk9LLkcqIW7O', + secureAuthOnly: false, + }, + }, +]; \ No newline at end of file diff --git a/config/index.js b/config/index.js new file mode 100644 index 0000000..79410a6 --- /dev/null +++ b/config/index.js @@ -0,0 +1,24 @@ +'use strict'; + +const common = require('../src/common'); + +const defaultEnvironment = 'development'; +const testEnvironment = 'test'; + +function Config(environment, freeze = true) { + environment = environment || defaultEnvironment; + const defaultConfig = require('./default'); + let envConfig = require(`./${environment}`); // eslint-disable-line security/detect-non-literal-require + if (!Array.isArray(envConfig)) { + envConfig = Array(envConfig); + } + // We support arrays of config options in env to allow e.g. resetting an existing array + const combinedConfig = common.mergeDeep(defaultConfig, ...envConfig, { environment }); + if (freeze && !environment.includes(testEnvironment)) { + /* istanbul ignore next */ + common.freezeDeep(combinedConfig); + } + return combinedConfig; +} + +module.exports = Config; \ No newline at end of file diff --git a/package.json b/package.json new file mode 100644 index 0000000..412aa36 --- /dev/null +++ b/package.json @@ -0,0 +1,46 @@ +{ + "name": "@squeep/webmention-receiver", + "version": "0.0.0", + "description": "A Webmention server implementation.", + "main": "server.js", + "scripts": { + "coverage": "nyc npm test", + "coverage-check": "nyc check-coverage", + "eslint": "eslint *.js src", + "test": "mocha --recursive" + }, + "pre-commit": [ + "eslint", + "coverage", + "coverage-check" + ], + "engines": { + "node": ">=14.0" + }, + "author": "Justin Wind ", + "license": "ISC", + "dependencies": { + "@squeep/api-dingus": "file:../node-api-dingus", + "@squeep/html-template-helper": "file:../node-html-template-helper", + "@squeep/indieauth-helper": "file:../node-indieauth-helper", + "@squeep/logger-json-console": "file:../node-logger-json-console", + "@squeep/resource-authentication-module": "file:../node-resource-authentication-module", + "@squeep/roman": "^1.0.0", + "better-sqlite3": "^8.0.1", + "pg-promise": "^11.0.2", + "uuid": "^9.0.0" + }, + "devDependencies": { + "@squeep/test-helper": "git+https://git.squeep.com/squeep-test-helper#v1.0.0", + "eslint": "^8.31.0", + "eslint-plugin-node": "^11.1.0", + "eslint-plugin-promise": "^6.1.1", + "eslint-plugin-security": "^1.5.0", + "eslint-plugin-sonarjs": "^0.17.0", + "mocha": "^10.2.0", + "mocha-steps": "^1.3.0", + "nyc": "^15.1.0", + "pre-commit": "^1.2.2", + "sinon": "^15.0.1" + } +} diff --git a/server.js b/server.js new file mode 100644 index 0000000..2593fed --- /dev/null +++ b/server.js @@ -0,0 +1,39 @@ +'use strict'; + +const http = require('http'); + +const Config = require('./config'); +const DB = require('./src/db'); +const Service = require('./src/service'); +const Logger = require('./src/logger'); +const { fileScope } = require('./src/common'); +const _fileScope = fileScope(__filename); +const { version } = require('./package.json'); + +const PORT = process.env.PORT || 3004; +const ADDR = process.env.LISTEN_ADDR || '127.0.0.1'; + +(async function main () { + const _scope = _fileScope('main'); + let config, logger, db, service; + try { + config = new Config(process.env.NODE_ENV); + logger = new Logger(config); + db = new DB(logger, config); + await db.initialize(); + service = new Service(logger, db, config); + await service.initialize(); + + http.createServer((req, res) => { + service.dispatch(req, res); + }).listen(PORT, ADDR, (err) => { + if (err) { + logger.error(_scope, 'error starting server', err); + throw err; + } + logger.info(_scope, 'server started', { version, listenAddress: ADDR, listenPort: PORT }); + }); + } catch (e) { + (logger || console).error(_scope, 'error starting server', e); + } +})(); \ No newline at end of file diff --git a/src/authenticator.js b/src/authenticator.js new file mode 100644 index 0000000..46ac674 --- /dev/null +++ b/src/authenticator.js @@ -0,0 +1,161 @@ +'use strict'; + +const { Communication: IACommunication } = require('@squeep/indieauth-helper'); +const { ResourceAuthenticator } = require('@squeep/resource-authentication-module'); +const common = require('./common'); +const Enum = require('./enum'); +const Errors = require('./errors'); + +const _fileScope = common.fileScope(__filename); + +class Authenticator { +/** + * Proxy our authentication to an IndieAuth IdP. + * @param {*} logger + * @param {*} db + * @param {Object} options + * @param {Object} options.authenticator + * @param {String} options.authenticator.secureAuthOnly + * @param {String} options.authenticator.tokenIntrospectionUrl + * @param {String} options.authenticator.tokenIntrospectionIdentifier + * @param {String} options.authenticator.tokenIntrospectionSecret + */ + constructor(logger, db, options) { + this.logger = logger; + this.db = db; + this.secureAuthOnly = options.authenticator.secureAuthOnly; + this.tokenIntrospectionUrl = new URL(options.authenticator.tokenIntrospectionUrl); + this.tokenIntrospectionIdentifier = options.authenticator.tokenIntrospectionIdentifier; + this.tokenIntrospectionSecret = options.authenticator.tokenIntrospectionSecret; + + this.resourceAuthenticator = new ResourceAuthenticator(logger, db, options); + this.IACommunication = new IACommunication(logger, options); + } + + + /** + * Check for valid Bearer auth, updates ctx with identifier if valid. + * @param {String} token + * @param {Object} ctx + * @returns {Boolean} + */ + async isValidBearer(token, ctx) { + const _scope = _fileScope('isValidBearer'); + this.logger.debug(_scope, 'called', { ctx }); + + // TODO: cache valid tokens for a short while + + try { + // Prepare the authentication header for validating with our IdP + const authentication = await this.resourceAuthenticator.authenticate(this.tokenIntrospectionIdentifier, this.tokenIntrospectionSecret); + // Fetch details about token + ctx.bearer = await this.IACommunication.introspectToken(this.tokenIntrospectionUrl, authentication, token); + } catch (e) { + this.logger.error(_scope, 'introspection failed', { error: e, ctx }); + throw e; + } + if (!ctx.bearer) { + this.logger.error(_scope, 'failed to contact introspection endpoint', { ctx }); + return false; + } + + if (ctx.bearer.active) { + ctx.authenticationId = ctx.bearer.me; + } + + return !!ctx.bearer.active; + } + + + /** + * Determine which Authorization header is available, and if it is valid. + * @param {String} authorizationHeader + * @param {Object} ctx + */ + async isValidAuthorization(authorizationHeader, ctx) { + const _scope = _fileScope('isValidAuthorization'); + this.logger.debug(_scope, 'called', { authorizationHeader, ctx }); + + const [authMethod, authString] = common.splitFirst(authorizationHeader, ' ', '').map((x) => x.trim()); + switch (authMethod.toLowerCase()) { + case 'access_token': + case 'bearer': { + return this.isValidBearer(authString, ctx); + } + + default: + this.logger.debug(_scope, 'unknown authorization scheme', { ctx }); + return false; + } + } + + + /** + * Send a response requesting bearer auth. + * @param {http.ServerResponse} res + * @param {String} error + * @param {String} description + */ + static requestBearer(res, error, description) { + const headerParts = ['Bearer']; + let details; + if (error) { + headerParts.push(`error="${error}"`); + details = { error }; + if (description) { + headerParts.push(`error_description="${description}"`); + details['error_description'] = description; + } + } + res.setHeader(Enum.Header.WWWAuthenticate, headerParts.join(', ')); + throw new Errors.ResponseError(Enum.ErrorResponse.Unauthorized, details); + } + + + /** + * Send a response rejecting bearer auth. + * @param {http.ServerResponse} res + * @param {String} error + * @param {String} description + */ + static forbiddenBearer(res, error, description) { + const headerParts = ['Bearer']; + let details; + if (error) { + headerParts.push(`error="${error}"`); + details = { error }; + if (description) { + headerParts.push(`error_description="${description}"`); + details['error_description'] = description; + } + } + res.setHeader(Enum.Header.WWWAuthenticate, headerParts.join(', ')); + throw new Errors.ResponseError(Enum.ErrorResponse.Forbidden, details); + } + + /** + * Require that a request has valid auth over secure channel, requests if missing. + * @param {http.IncomingMessage} req + * @param {http.ServerResponse} res + * @param {Object} ctx + */ + async required(req, res, ctx) { + const _scope = _fileScope('required'); + this.logger.debug(_scope, 'called', { ctx }); + + if (this.secureAuthOnly && ctx.clientProtocol.toLowerCase() !== 'https') { + this.logger.debug(_scope, 'rejecting insecure auth', { ctx }); + return Authenticator.forbiddenBearer(res, 'invalid_token', 'insecure connection'); + } + + const authData = req.getHeader(Enum.Header.Authorization) || (ctx?.parsedBody?.['access_token'] && `access_token ${ctx.parsedBody['access_token']}`); + if (authData + && await this.isValidAuthorization(authData, ctx)) { + return true; + } + return Authenticator.requestBearer(res, authData ? Enum.ErrorCode.InvalidToken : undefined, authData ? 'Access token is not valid.' : undefined); + } + +} + +module.exports = Authenticator; \ No newline at end of file diff --git a/src/common.js b/src/common.js new file mode 100644 index 0000000..d6e24bd --- /dev/null +++ b/src/common.js @@ -0,0 +1,58 @@ +'use strict'; + +const { common } = require('@squeep/api-dingus'); + + +/** + * Return an array containing x if x is not already an array. + * @param {*} x + */ +const ensureArray = (x) => { + if (x === undefined) { + return []; + } + if (!Array.isArray(x)) { + return Array(x); + } + return x; +}; + + +/** + * Recursively freeze an object. + * @param {Object} o + * @returns {Object} + */ +const freezeDeep = (o) => { + Object.freeze(o); + Object.getOwnPropertyNames(o).forEach((prop) => { + if (Object.hasOwnProperty.call(o, prop) + && ['object', 'function'].includes(typeof o[prop]) + && !Object.isFrozen(o[prop])) { + return freezeDeep(o[prop]); + } + }); + return o; +} + + +/** + * Limit length of string to keep logs sane + * @param {String} str + * @param {Number} len + * @returns {String} + */ +const logTruncate = (str, len) => { + if (typeof str !== 'string' || str.toString().length <= len) { + return str; + } + return str.toString().slice(0, len) + `... (${str.toString().length} bytes)`; +}; + + +module.exports = { + ...common, + ensureArray, + freezeDeep, + logTruncate, +}; \ No newline at end of file diff --git a/src/db/base.js b/src/db/base.js new file mode 100644 index 0000000..e9a8861 --- /dev/null +++ b/src/db/base.js @@ -0,0 +1,269 @@ +'use strict'; + +const common = require('../common'); +const DatabaseErrors = require('./errors'); +const svh = require('./schema-version-helper'); + +// We reuse the resource auth uuid compaction for external postIds. +const { + embiggenIdentifier, + ensmallenIdentifier, +} = require('@squeep/resource-authentication-module'); + +const _fileScope = common.fileScope(__filename); + +class Database { + constructor(logger, options) { + this.logger = logger; + this.options = options; + } + + + /** + * Perform tasks needed to prepare database for use. Ensure this is called + * after construction, and before any other database activity. + * At the minimum, this will validate a compatible schema is present and usable. + * Some engines will also perform other initializations or async actions which + * are easier handled outside the constructor. + */ + async initialize() { + const _scope = _fileScope('initialize'); + + const currentSchema = await this._currentSchema(); + const current = svh.schemaVersionObjectToNumber(currentSchema); + const min = svh.schemaVersionObjectToNumber(this.schemaVersionsSupported.min); + const max = svh.schemaVersionObjectToNumber(this.schemaVersionsSupported.max); + if (current >= min && current <= max) { + this.logger.debug(_scope, 'schema supported', { currentSchema, schemaVersionsSupported: this.schemaVersionsSupported }); + } else { + this.logger.error(_scope, 'schema not supported', { currentSchema, schemaVersionsSupported: this.schemaVersionsSupported }); + throw new DatabaseErrors.MigrationNeeded(); + } + } + + + /** + * Query the current schema version. + * This is a standalone query function, as it is called before statements are loaded. + * @returns {Object} version + * @returns {Number} version.major + * @returns {Number} version.minor + * @returns {Number} version.patch + */ + async _currentSchema() { + this._notImplemented('_currentSchema', arguments); + } + + + /** + * Perform db connection health-check, if applicable. + * Throw something if a database situation should pull us out of a load-balancer. + */ + async healthCheck() { + this._notImplemented('healthCheck', arguments); + } + + + /** + * Wrap a function call in a database context. + * @param {Function} fn fn(ctx) + */ + async context(fn) { + this._notImplemented('context', arguments); + } + + + /** + * Wrap a function call in a transaction context. + * @param {*} dbCtx + * @param {Function} fn fn(txCtx) + */ + async transaction(dbCtx, fn) { + this._notImplemented('transaction', arguments); + } + + /** + * Turn a snake into a camel. + * Used when translating SQL column names to JS object style. + * @param {String} snakeCase + * @param {String|RegExp} delimiter + * @returns {String} + */ + static _camelfy(snakeCase, delimiter = '_') { + if (!snakeCase || typeof snakeCase.split !== 'function') { + return undefined; + } + const words = snakeCase.split(delimiter); + return [ + words.shift(), + ...words.map((word) => word.charAt(0).toUpperCase() + word.slice(1)), + ].join(''); + } + + + /** + * Basic type checking of object properties. + * @param {Object} object + * @param {String[]} properties + * @param {String[]} types + */ + _ensureTypes(object, properties, types) { + const _scope = _fileScope('_ensureTypes'); + + if (!(object && properties && types)) { + this.logger.error(_scope, 'undefined argument', { object, properties, types }); + throw new DatabaseErrors.DataValidation(); + } + properties.forEach((p) => { + // eslint-disable-next-line security/detect-object-injection + const pObj = object[p]; + const pType = typeof pObj; + if (!types.includes(pType) + && !(pObj instanceof Buffer && types.includes('buffer')) + && !(pObj === null && types.includes('null')) + && !(pType === 'bigint' && types.includes('number'))) { + const reason = `'${p}' is '${pType}', but must be ${types.length > 1 ? 'one of ' : ''}'${types}'`; + this.logger.error(_scope, reason, {}); + throw new DatabaseErrors.DataValidation(reason); + } + }); + } + + + /** + * Interface methods need implementations. Ensure the db-interaction + * methods on the base class call this, so they may be overridden by + * implementation classes. + * @param {String} method + * @param {arguments} args + */ + _notImplemented(method, args) { + this.logger.error(_fileScope(method), 'abstract method called', Array.from(args)); + throw new DatabaseErrors.NotImplemented(method); + } + + + /** + * Interface + */ + + + /** + * Recreate a proper URL from the components. + * We store decomposed urls in db to simplify sub-searching profiles prefixing target urls. + * @param {Object} parts + * @param {String} parts.protocol + * @param {String} parts.username + * @param {String} parts.password + * @param {String} parts.host + * @param {String} parts.pathname + * @param {String} parts.search + * @param {String} parts.hash + * @returns {URL} + */ + static _URLFromParts(parts) { + const { + protocol, + username, + password, + host, + pathname, + search, + hash, + } = parts; + const url = new URL('https://placeholder.'); + url.protocol = protocol; + url.username = username; + url.password = password; + url.host = host; + url.pathname = pathname; + url.search = search; + url.hash = hash; + return url; + } + + /** + * @typedef Account + * @property {String} accountId uuid + * @property {Date} created + * @property {String} profile uri + */ + + /** + * + * @param {*} dbCtx + * @param {String} profile uri + * @returns {Promise} + */ + accountGetByProfile(dbCtx, profile) { + this._notImplemented('accountGetByProfile', arguments); + } + + + /** + * Locate the account whose profile prefixes the most of the target URL + * @param {*} dbCtx + * @param {URL} targetURL + * @returns {Promise} + */ + accountGetByProfilePrefix(dbCtx, targetURL) { + this._notImplemented('accoutnGetByProfilePrefix', arguments); + } + + + /** + * + * @param {*} dbCtx + * @param {String} profile uri + * @returns {Promise} + */ + accountInsert(dbCtx, profile) { + this._notImplemented('accountInsert', arguments); + } + + + /** + * @typedef {Object} Mention + * @property {URL} profile account which controls the target url + * @property {String} mentionId uuid + * @property {Date} created + * @property {Date=} updated + * @property {URL} targetUrl + * @property {URL} sourceUrl + */ + /** + * + * @param {*} dbCtx + * @param {String} mentionId + * @returns {Promise} + */ + mentionGetById(dbCtx, mentionId) { + this._notImplemented('mentionGetById', arguments); + } + + + /** + * + * @param {*} dbCtx + * @param {URL} targetURL + * @returns {Promise} + */ + mentionsGetByTargetUrl(dbCtx, targetURL) { + this._notImplemented('mentionsGetByTargetUrl', arguments); + } + + + /** + * + * @param {*} dbCtx + * @param {URL} sourceURL + * @param {URL} targetURL + * @return {Promise} + */ + mentionUpsert(dbCtx, sourceURL, targetURL) { + this._notImplemented('mentionUpsert', arguments); + } + +} + +module.exports = Database; \ No newline at end of file diff --git a/src/db/errors.js b/src/db/errors.js new file mode 100644 index 0000000..cd43239 --- /dev/null +++ b/src/db/errors.js @@ -0,0 +1,56 @@ +'use strict'; + +class DatabaseError extends Error { + constructor(...args) { + super(...args); + Error.captureStackTrace(DatabaseError); + } + + get name() { + /* istanbul ignore next */ + return this.constructor.name; + } +} + +class DataValidation extends DatabaseError { + constructor(...args) { + super(...args); + Error.captureStackTrace(DataValidation); + } +} + +class NotImplemented extends DatabaseError { + constructor(...args) { + super(...args); + Error.captureStackTrace(NotImplemented); + } +} + +class UnexpectedResult extends DatabaseError { + constructor(...args) { + super(...args); + Error.captureStackTrace(UnexpectedResult); + } +} + +class UnsupportedEngine extends DatabaseError { + constructor(...args) { + super(...args); + Error.captureStackTrace(UnsupportedEngine); + } +} + +class MigrationNeeded extends DatabaseError { + constructor(...args) { + super(...args); + } +} + +module.exports = { + DatabaseError, + DataValidation, + MigrationNeeded, + NotImplemented, + UnexpectedResult, + UnsupportedEngine, +}; diff --git a/src/db/index.js b/src/db/index.js new file mode 100644 index 0000000..0d5ef16 --- /dev/null +++ b/src/db/index.js @@ -0,0 +1,42 @@ +'use strict'; + +const common = require('../common'); +const DatabaseErrors = require('./errors'); + +const _fileScope = common.fileScope(__filename); + +class DatabaseFactory { + constructor(logger, options, ...rest) { + const _scope = _fileScope('constructor'); + + const connectionString = options.db.connectionString || ''; + const protocol = connectionString.slice(0, connectionString.indexOf('://')).toLowerCase(); + + let Engine; + switch (protocol) { + case DatabaseFactory.Engines.PostgreSQL: + Engine = require('./postgres'); + break; + + case DatabaseFactory.Engines.SQLite: + Engine = require('./sqlite'); + break; + + default: + logger.error(_scope, 'unsupported connectionString', { protocol, options }); + throw new DatabaseErrors.UnsupportedEngine(protocol); + } + + return new Engine(logger, options, ...rest); + } + + static get Engines() { + return { + PostgreSQL: 'postgresql', + SQLite: 'sqlite', + }; + } + +} + +module.exports = DatabaseFactory; diff --git a/src/db/postgres/index.js b/src/db/postgres/index.js new file mode 100644 index 0000000..eeaa834 --- /dev/null +++ b/src/db/postgres/index.js @@ -0,0 +1,491 @@ +/* eslint-disable security/detect-object-injection */ +'use strict'; + +const pgpInitOptions = { + capSQL: true, +}; + +const path = require('path'); +const pgp = require('pg-promise')(pgpInitOptions); +const svh = require('../schema-version-helper'); +const Database = require('../base'); +const DBErrors = require('../errors'); +const Listener = require('./listener'); +const common = require('../../common'); + +const _fileScope = common.fileScope(__filename); + +const PGTypeIdINT8 = 20; // Type Id 20 == INT8 (BIGINT) +const PGTYpeIdINT8Array = 1016; //Type Id 1016 == INT8[] (BIGINT[]) +pgp.pg.types.setTypeParser(PGTypeIdINT8, BigInt); // Type Id 20 = INT8 (BIGINT) +const parseBigIntArray = pgp.pg.types.getTypeParser(PGTYpeIdINT8Array); // Type Id 1016 = INT8[] (BIGINT[]) +pgp.pg.types.setTypeParser(PGTYpeIdINT8Array, (a) => parseBigIntArray(a).map(BigInt)); + +const schemaVersionsSupported = { + min: { + major: 1, + minor: 0, + patch: 0, + }, + max: { + major: 1, + minor: 0, + patch: 0, + }, +}; + +class DatabasePostgres extends Database { + constructor(logger, options, _pgp = pgp) { + super(logger, options); + + this.db = _pgp(options.db.connectionString); + this.schemaVersionsSupported = schemaVersionsSupported; + + // Suppress QF warnings when running tests + this.noWarnings = options.db.noWarnings; + + if (options.db.cacheEnabled) { + this.listener = new Listener(logger, this.db, Object.assign({}, options.db.listener, { + channel: 'cache_invalidation', + // dataCallback: this._topicChanged.bind(this), + connectionEstablishedCallback: this._listenerEstablished.bind(this), + connectionLostCallback: this._listenerLost.bind(this), + })); + } + + // Log queries + const queryLogLevel = options.db.queryLogLevel; + if (queryLogLevel) { + pgpInitOptions.query = (event) => { + // Quell outgoing pings + if (event && event.query && event.query.startsWith('NOTIFY')) { + return; + } + this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) }); + }; + } + + // Log errors + pgpInitOptions.error = (err, event) => { + this.logger.error(_fileScope('pgp:error'), '', { err, event }); + }; + + // Deophidiate column names in-place, log results + pgpInitOptions.receive = (data, result, event) => { + const exemplaryRow = data[0]; + for (const prop in exemplaryRow) { + const camel = Database._camelfy(prop); + if (!(camel in exemplaryRow)) { + for (const d of data) { + d[camel] = d[prop]; + delete d[prop]; + } + } + } + if (queryLogLevel) { + // Quell outgoing pings + if (result && result.command === 'NOTIFY') { + return; + } + // Omitting .rows + const resultLog = common.pick(result, ['command', 'rowCount', 'duration']); + this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog }); + } + }; + + // Expose these for test coverage + this.pgpInitOptions = pgpInitOptions; + this._pgp = _pgp; + + this._initStatements(_pgp); + } + + + _queryFileHelper(_pgp) { + return (file) => { + const _scope = _fileScope('_queryFile'); + /* istanbul ignore next */ + const qfParams = { + minify: true, + ...(this.noWarnings && { noWarnings: this.noWarnings }), + }; + const qf = new _pgp.QueryFile(file, qfParams); + if (qf.error) { + this.logger.error(_scope, 'failed to create SQL statement', { error: qf.error, file }); + throw qf.error; + } + return qf; + }; + } + + + async initialize(applyMigrations = true) { + const _scope = _fileScope('initialize'); + this.logger.debug(_scope, 'called', { applyMigrations }); + if (applyMigrations) { + await this._initTables(); + } + await super.initialize(); + if (this.listener) { + await this.listener.start(); + } + } + + + async _initTables(_pgp) { + const _scope = _fileScope('_initTables'); + this.logger.debug(_scope, 'called', {}); + + const _queryFile = this._queryFileHelper(_pgp || this._pgp); + + // Migrations rely upon this table, ensure it exists. + const metaVersionTable = '_meta_schema_version'; + + const tableExists = async (name) => this.db.oneOrNone('SELECT table_name FROM information_schema.tables WHERE table_name=$(name)', { name }); + let metaExists = await tableExists(metaVersionTable); + if (!metaExists) { + const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql'); + const initSql = _queryFile(fPath); + const results = await this.db.multiResult(initSql); + this.logger.debug(_scope, 'executed init sql', { results }); + metaExists = await tableExists(metaVersionTable); + /* istanbul ignore if */ + if (!metaExists) { + throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`); + } + this.logger.info(_scope, 'created schema version table', { metaVersionTable }); + } + + // Apply migrations + const currentSchema = await this._currentSchema(); + const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported); + this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted }); + for (const v of migrationsWanted) { + const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql'); + const migrationSql = _queryFile(fPath); + const results = await this.db.multiResult(migrationSql); + this.logger.debug(_scope, 'executed migration sql', { version: v, results }); + this.logger.info(_scope, 'applied migration', { version: v }); + } + } + + + _initStatements(_pgp) { + const _scope = _fileScope('_initStatements'); + const _queryFile = this._queryFileHelper(_pgp); + this.statement = _pgp.utils.enumSql(path.join(__dirname, 'sql'), {}, _queryFile); + this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length }); + } + + + async healthCheck() { + const _scope = _fileScope('healthCheck'); + this.logger.debug(_scope, 'called', {}); + const c = await this.db.connect(); + c.done(); + return { serverVersion: c.client.serverVersion }; + } + + + async _currentSchema() { + return this.db.one('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1'); + } + + + async _closeConnection() { + const _scope = _fileScope('_closeConnection'); + try { + if (this.listener) { + await this.listener.stop(); + } + await this._pgp.end(); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e }); + throw e; + } + } + + + /* istanbul ignore next */ + async _purgeTables(really = false) { + const _scope = _fileScope('_purgeTables'); + try { + if (really) { + await this.db.tx(async (t) => { + await t.batch([ + 'topic', + // 'topic_fetch_in_progress', + // 'verification', + // 'verification_in_progress', + // 'subscription', + // 'subscription_delivery_in_progress', + ].map(async (table) => t.query('TRUNCATE TABLE $(table:name) CASCADE', { table }))); + }); + } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e }); + throw e; + } + } + + + // eslint-disable-next-line class-methods-use-this + _resultLog(result) { + return common.pick(result, ['command', 'rowCount', 'duration']); + } + + + /** + * Receive notices when topic entry is updated. + * Clear relevant cache entry. + * @param {String} payload + */ + // _topicChanged(payload) { + // const _scope = _fileScope('_topicChanged'); + // if (payload !== 'ping') { + // this.logger.debug(_scope, 'called', { payload }); + // this.cache.delete(payload); + // } + // } + + + /** + * Called when a listener connection is opened. + * Enable cache. + */ + _listenerEstablished() { + const _scope = _fileScope('_listenerEstablished'); + this.logger.debug(_scope, 'called', {}); + this.cache = new Map(); + } + + + /** + * Called when a listener connection is closed. + * Disable cache. + */ + _listenerLost() { + const _scope = _fileScope('_listenerLost'); + this.logger.debug(_scope, 'called', {}); + delete this.cache; + } + + + /** + * Return a cached entry, if available. + * @param {*} key + */ + // _cacheGet(key) { + // const _scope = _fileScope('_cacheGet'); + // if (this.cache && this.cache.has(key)) { + // const cacheEntry = this.cache.get(key); + // this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) }); + // cacheEntry.hits += 1; + // cacheEntry.lastHit = new Date(); + // return cacheEntry.data; + // } + // } + + + /** + * Store an entry in cache, if available. + * @param {*} key + * @param {*} data + */ + // _cacheSet(key, data) { + // const _scope = _fileScope('_cacheSet'); + // if (this.cache) { + // this.cache.set(key, { + // added: new Date(), + // hits: 0, + // lastHit: undefined, + // data, + // }); + // this.logger.debug(_scope, 'added cache entry', { key }); + // } + // } + + + async context(fn) { + return this.db.task(async (t) => fn(t)); + } + + + // eslint-disable-next-line class-methods-use-this + async transaction(dbCtx, fn) { + return dbCtx.txIf(async (t) => fn(t)); + } + + + async accountGetByProfile(dbCtx, profile) { + const _scope = _fileScope('accountGetByProfile'); + this.logger.debug(_scope, 'called', { profile }); + + try { + return await dbCtx.oneOrNone(this.statement.accountGetByProfile, { profile }); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, profile }); + throw e; + } + } + + + async accountInsert(dbCtx, profile) { + const _scope = _fileScope('accountInsert'); + this.logger.debug(_scope, 'called', { profile }); + + try { + const result = await dbCtx.result(this.statement.accountInsert, { profile }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not insert account'); + } + const account = result.rows[0]; + this.logger.debug(_scope, 'success', { ...account }); + return account; + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, profile }); + throw e; + } + } + + + async channelInsert(dbCtx, accountId, name, uid) { + const _scope = _fileScope('channelInsert'); + this.logger.debug(_scope, 'called', { accountId, name, uid }); + + try { + return await this.transaction(dbCtx, async (txCtx) => { + const insertResult = await txCtx.result(this.statement.channelInsert, { accountId, name, uid }); + if (insertResult.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not insert channel'); + } + const channel = insertResult.rows[0]; + const rankResult = await txCtx.result(this.statement.channelRankInsert, { channelId: channel.channelId }); + if (rankResult.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not insert channel rank'); + } + const { rank } = rankResult.rows[0]; + this.logger.debug(_scope, 'success', { ...channel, rank }); + return { ...channel, rank }; + }); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, accountId, name, uid }); + throw e; + } + } + + + async channelUpdate(dbCtx, accountId, uid, name) { + const _scope = _fileScope('channelUpdate'); + this.logger.debug(_scope, 'called', { accountId, uid, name }); + + try { + const result = await dbCtx.result(this.statement.channelUpdate, { accountId, uid, name }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not update channel'); + } + return result.rows[0]; + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, accountId, uid, name }); + } + } + + + async channelDelete(dbCtx, accountId, uid) { + const _scope = _fileScope('channelDelete'); + this.logger.debug(_scope, 'called', { accountId, uid }); + + try { + const result = await dbCtx.result(this.statement.channelDelete, { accountId, uid }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not delete channel'); + } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, accountId, uid }); + throw e; + } + } + + + async channelRankUpdate(dbCtx, channelId, rank) { + const _scope = _fileScope('channelRankUpdate'); + this.logger.debug(_scope, 'called', { channelId, rank }); + + try { + const result = await dbCtx.result(this.statement.channelRankUpdate, { channelId, rank }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not update channel rank'); + } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, channelId, rank }); + throw e; + } + } + + + async channelsGetByAccountId(dbCtx, accountId) { + const _scope = _fileScope('channelsGetByAccountId'); + this.logger.debug(_scope, 'called', { accountId }); + + try { + const channels = await dbCtx.manyOrNone(this.statement.channelsGetByAccountId, { accountId }); + return channels || []; + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, accountId }); + throw e; + } + } + + + /** + * + * @param {*} dbCtx + * @param {String} accountId + * @returns {Object[]} + */ + async channelsUnreadGetByAccountId(dbCtx, accountId) { + const _scope = _fileScope('channelsUnreadGetByAccountId'); + this.logger.debug(_scope, 'called', { accountId }); + + try { + const channels = await dbCtx.manyOrNone(this.statement.channelsUnreadGetByAccountId, { accountId }); + return channels || []; + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, accountId }); + throw e; + } + } + + /* + async feedGetById(dbCtx, feedId) { + const _scope = _fileScope('feedGetById'); + this.logger.debug(_scope, 'called', { feedId }); + + try { + return await dbCtx.oneOrNone(this.statement.feedGetById, { feedId }); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, feedId }); + throw e; + } + } + + async feedInsert(dbCtx, feedData) { + const _scope = _fileScope('feedInsert'); + this.logger.debug(_scope, 'called', { feedData }); + + try { + const result = await dbCtx.result(this.statement.feedInsert, feedData); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not insert feed'); + } + return result.rows[0].feedId; + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, feedData }); + throw e; + } + } + */ + +} + +module.exports = DatabasePostgres; diff --git a/src/db/postgres/listener.js b/src/db/postgres/listener.js new file mode 100644 index 0000000..6ad387a --- /dev/null +++ b/src/db/postgres/listener.js @@ -0,0 +1,169 @@ +'use strict'; + +const common = require('../../common'); + +const _fileScope = common.fileScope(__filename); + + +const defaultOptions = { + channel: 'cache_invalidation', + dataCallback: common.nop, + connectionLostCallback: common.nop, + connectionEstablishedCallback: common.nop, + pingDelayMs: 5000, + reconnectDelayMs: 6000, + reconnectTimes: 10, +}; + +/** + * Create a robust connection which listens to a notification channel. + */ +class PostgresListener { + constructor(logger, db, options) { + this.logger = logger; + this.db = db; + + this.options = Object.assign({}, defaultOptions, options); + this.notificationEventName = 'notification'; + + this.connection = null; + this.nextPingTimeout = undefined; + + this._onConnectionLostBound = this._onConnectionLost.bind(this); + this._onNotificationBound = this._onNotification.bind(this); + } + + + /** + * Establish the listener connection. + */ + async start() { + await this._reconnect(0, 1); + this._sendPing(); + } + + + /** + * Shut down the listener connection. + */ + async stop() { + const _scope = _fileScope('stop'); + if (this.reconnectPending) { + this.logger.debug(_scope, 'overriding existing reconnect retry'); + clearTimeout(this.reconnectPending); + delete this.reconnectPending; + } + if (this.connection) { + this.connection.client.removeListener(this.notificationEventName, this.onNotificationBound); + this.connection.done(); + this.connection = null; + await this.options.connectionLostCallback(); + } + } + + + /** + * Begin sending connection pings. + */ + _sendPing() { + const _scope = _fileScope('_sendPing'); + this.nextPingTimeout = setTimeout(async () => { + try { + if (this.connection) { + await this.connection.none('NOTIFY $(channel:name), $(payload)', { channel: this.options.channel, payload: 'ping' }); + } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e }); + } finally { + this._sendPing(); + } + }, this.options.pingDelayMs); + } + + + /** + * Notify callback. + * @param {Object} data + */ + async _onNotification(data) { + const _scope = _fileScope('_onNotification'); + // Ignore our own messages + if (data.payload === 'ping') { + return; + } + this.logger.debug(_scope, 'called', data); + await this.options.dataCallback(data.payload); + } + + + /** + * Notify callback and attempt to reconnect. + * @param {*} error + * @param {*} event + */ + async _onConnectionLost(error, event) { + const _scope = _fileScope('_onConnectionLost'); + this.logger.error(_scope, 'listener connection lost', { error, event }); + this.connection = null; + try { + event.client.removeListener(this.notificationEventName, this.onNotificationBound); + } catch (e) { + this.logger.error(_scope, 'failed to remove listener', { error: e }); + // That's okay, it was probably just gone anyhow. + } + await this.options.connectionLostCallback(); + try { + await this._reconnect(this.options.reconnectDelayMs, this.options.reconnectTimes); + } catch (e) { + this.logger.error(_scope, 'failed to reconnect listener', { error: e }); + } + } + + + /** + * Schedule an attempt to establish a connection. + * @param {Number} delay + * @param {Number} retriesRemaining + */ + async _reconnect(delay, retriesRemaining) { + const _scope = _fileScope('_reconnect'); + if (this.connection) { + this.logger.debug(_scope, 'closing existing connection'); + this.connection.done(); + this.connection = null; + } + if (this.reconnectPending) { + this.logger.debug(_scope, 'overriding existing reconnect retry'); + clearTimeout(this.reconnectPending); + } + return new Promise((resolve, reject) => { + this.reconnectPending = setTimeout(async () => { + try { + delete this.reconnectPending; + this.connection = await this.db.connect({ + direct: true, + onLost: this._onConnectionLostBound, + }); + this.connection.client.on(this.notificationEventName, this._onNotificationBound); + await this.connection.none('LISTEN $(channel:name)', { channel: this.options.channel }); + this.logger.debug(_scope, 'listener connection established'); + await this.options.connectionEstablishedCallback(); + resolve(); + } catch (e) { + if (retriesRemaining <= 0) { + return reject(e); + } + try { + await this._reconnect(delay, retriesRemaining - 1); + resolve(); + } catch (e2) { + reject(e2); + } + } + }, delay); + }); + } + +} + +module.exports = PostgresListener; \ No newline at end of file diff --git a/src/db/postgres/sql/channel-delete.sql b/src/db/postgres/sql/channel-delete.sql new file mode 100644 index 0000000..a906337 --- /dev/null +++ b/src/db/postgres/sql/channel-delete.sql @@ -0,0 +1,4 @@ +-- +DELETE FROM channel +WHERE + account_id = $(accountId) AND uid = $(uid) diff --git a/src/db/postgres/sql/channel-insert.sql b/src/db/postgres/sql/channel-insert.sql new file mode 100644 index 0000000..f547279 --- /dev/null +++ b/src/db/postgres/sql/channel-insert.sql @@ -0,0 +1,11 @@ +-- +WITH new_id AS ( + SELECT uuid_generate_v1() AS id +) +INSERT INTO channel (account_id, channel_id, name, uid) VALUES ( + $(accountId), + (SELECT id FROM new_id), + $(name), + COALESCE($(uid), (SELECT id FROM new_id)::text) +) +RETURNING * \ No newline at end of file diff --git a/src/db/postgres/sql/channel-rank-insert.sql b/src/db/postgres/sql/channel-rank-insert.sql new file mode 100644 index 0000000..8dcb720 --- /dev/null +++ b/src/db/postgres/sql/channel-rank-insert.sql @@ -0,0 +1,11 @@ +-- +WITH next_channel_rank AS ( + SELECT COALESCE(MAX(rank) + 1, 0) AS rank + FROM channel_rank + WHERE channel_id = $(channelId) +) +INSERT INTO channel_rank (channel_id, rank) VALUES ( + $(channelId), + (SELECT rank FROM next_channel_rank) +) +RETURNING * diff --git a/src/db/postgres/sql/channel-rank-update.sql b/src/db/postgres/sql/channel-rank-update.sql new file mode 100644 index 0000000..620720f --- /dev/null +++ b/src/db/postgres/sql/channel-rank-update.sql @@ -0,0 +1,6 @@ +-- +UPDATE channel_rank +SET + rank = $(rank) +WHERE + channel_id = (SELECT channel_id FROM channel WHERE account_id = $(accountId) AND uid = $(uid)) diff --git a/src/db/postgres/sql/channel-update.sql b/src/db/postgres/sql/channel-update.sql new file mode 100644 index 0000000..de72d8b --- /dev/null +++ b/src/db/postgres/sql/channel-update.sql @@ -0,0 +1,5 @@ +-- +UPDATE channel + SET name = $(name) +WHERE + account_id = $(accountId) AND uid = $(uid) diff --git a/src/db/postgres/sql/channels-get-by-account-id.sql b/src/db/postgres/sql/channels-get-by-account-id.sql new file mode 100644 index 0000000..90deec9 --- /dev/null +++ b/src/db/postgres/sql/channels-get-by-account-id.sql @@ -0,0 +1,6 @@ +-- +SELECT c.*, cr.rank +FROM channel c +INNER JOIN channel_rank cr USING (channel_id) +WHERE c.account_id = $(accountId) +ORDER BY cr.rank ASC diff --git a/src/db/postgres/sql/channels-unread-get-by-account-id.sql b/src/db/postgres/sql/channels-unread-get-by-account-id.sql new file mode 100644 index 0000000..00f9a58 --- /dev/null +++ b/src/db/postgres/sql/channels-unread-get-by-account-id.sql @@ -0,0 +1,7 @@ +-- +SELECT c.*, ciuc.unread +FROM channel c +INNER JOIN channel_rank cr USING (channel_id) +INNER JOIN channel_item_unread_count ciuc USING (channel_id) +WHERE c.account_id = $(accountId)::uuid +ORDER BY cr.rank ASC diff --git a/src/db/postgres/sql/feed-get-by-id.sql b/src/db/postgres/sql/feed-get-by-id.sql new file mode 100644 index 0000000..f8fc062 --- /dev/null +++ b/src/db/postgres/sql/feed-get-by-id.sql @@ -0,0 +1,5 @@ +-- +SELECT * +FROM feed +WHERE feed_id = $(feedId) + diff --git a/src/db/postgres/sql/feed-insert.sql b/src/db/postgres/sql/feed-insert.sql new file mode 100644 index 0000000..d0a711c --- /dev/null +++ b/src/db/postgres/sql/feed-insert.sql @@ -0,0 +1,4 @@ +-- +INSERT INTO feed (url) +VALUES ($(url)) +RETURNING feed_id diff --git a/src/db/postgres/sql/schema/1.0.0/apply.sql b/src/db/postgres/sql/schema/1.0.0/apply.sql new file mode 100644 index 0000000..a52f1d8 --- /dev/null +++ b/src/db/postgres/sql/schema/1.0.0/apply.sql @@ -0,0 +1,175 @@ +BEGIN; + CREATE TABLE account ( + account_id UUID PRIMARY KEY DEFAULT uuid_generate_v1(), + created TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), + profile_url TEXT NOT NULL UNIQUE + ); + COMMENT ON TABLE account IS $docstring$ +Persistant mapping of IndieAuth profile to local account identifier. +$docstring$; + + CREATE TABLE channel ( + channel_id UUID PRIMARY KEY DEFAULT uuid_generate_v1(), + account_id UUID NOT NULL REFERENCES account(account_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED, + uid TEXT NOT NULL CHECK (uid != 'global'), + name TEXT NOT NULL, + CONSTRAINT account_uid_unique UNIQUE (account_id, uid) + ); + CREATE INDEX channel_account_id_idx ON channel(account_id); + CREATE INDEX channel_uid_idx ON channel(uid); + COMMENT ON TABLE channel IS $docstring$ +N.B. uid will be a duplicate of channel_id except for system channels (e.g. 'notifications'). +$docstring$; + + CREATE TABLE channel_rank ( + channel_id UUID NOT NULL REFERENCES channel(channel_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED, + rank INTEGER NOT NULL, + PRIMARY KEY (channel_id, rank) + ); + COMMENT ON TABLE channel_rank IS $docstring$ +$docstring$; + + CREATE TABLE channel_item_unread ( + channel_id UUID NOT NULL REFERENCES channel(channel_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED, + item_id UUID NOT NULL, -- REFERENCES ... + PRIMARY KEY (channel_id, item_id) + ); + COMMENT ON TABLE channel_item_unread IS $docstring$ +$docstring$; + + CREATE VIEW channel_item_unread_count AS + SELECT channel_id, COUNT(*) AS unread + FROM channel_item_unread + GROUP BY channel_id; + + -- CREATE TABLE channel_source ... + +-- + +-- CREATE TABLE feed ( +-- feed_id UUID PRIMARY KEY DEFAULT uuid_generate_v1(), +-- created TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), +-- is_active BOOLEAN NOT NULL DEFAULT true, + +-- url TEXT NOT NULL UNIQUE, +-- site_url TEXT, +-- hub_url TEXT, + +-- title TEXT, +-- description TEXT, +-- image_url TEXT, + +-- last_fetch_success TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT '-infinity'::timestamptz, +-- last_fetch_attempt TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT '-infinity'::timestamptz, +-- next_fetch_attempt TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT '-infinity'::timestamptz, +-- fetch_attempts_since_success INTEGER NOT NULL DEFAULT 0, +-- last_fetch_failure_reason TEXT, +-- content_last_modified TEXT, +-- content_etag TEXT, + +-- websub_enabled BOOLEAN NOT NULL DEFAULT true, +-- websub_lease_expires TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT '-infinity'::timestamptz, +-- websub_last_delivery TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT '-infinity'::timestamptz, +-- websub_secret TEXT +-- ); +-- CREATE INDEX feed_next_fetch_attempt_idx ON feed(next_fetch_attempt); + +-- CREATE TABLE feed_fetch_in_progress ( +-- feed_id UUID PRIMARY KEY NOT NULL REFERENCES feed(feed_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED, +-- claimant UUID NOT NULL, +-- claimed TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), +-- claim_expires TIMESTAMP WITH TIME ZONE NOT NULL +-- ); +-- CREATE INDEX feed_fetch_in_progress_claim_expires_idx ON feed_fetch_in_progress(claim_expires); + +-- CREATE VIEW feed_fetch_in_progress_active AS +-- SELECT * +-- FROM feed_fetch_in_progress +-- WHERE claim_expires >= now() +-- ; + +-- CREATE VIEW feed_fetch_needed AS +-- SELECT * +-- FROM feed +-- WHERE +-- is_active = true +-- AND +-- next_fetch_attempt <= now() +-- AND +-- feed_id NOT IN (SELECT feed_id FROM feed_fetch_in_progress_active) +-- ; + +-- -- + +-- CREATE TABLE account_feed_override ( +-- account_id UUID PRIMARY KEY NOT NULL REFERENCES account(account_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED, +-- feed_id UUID NOT NULL REFERENCES feed(feed_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED, + +-- title TEXT, +-- image_url TEXT +-- ); + + +-- -- + +-- CREATE TABLE category ( +-- category_id UUID PRIMARY KEY NOT NULL DEFAULT uuid_generate_v1(), +-- name TEXT NOT NULL +-- ); +-- CREATE INDEX category_name_idx ON category(name); + +-- CREATE TABLE account_feed_category ( +-- account_id UUID PRIMARY KEY NOT NULL REFERENCES account(account_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED, +-- feed_id UUID NOT NULL REFERENCES feed(feed_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED, +-- category_id UUID NOT NULL REFERENCES category(category_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED +-- ); +-- CREATE INDEX account_feed_category_feed_id_idx ON account_feed_category(feed_id); +-- CREATE INDEX account_feed_category_category_id_idx ON account_feed_category(category_id); +-- CREATE UNIQUE INDEX account_feed_category_unique_idx ON account_feed_category(account_id, feed_id, category_id); + +-- -- + +-- CREATE TABLE entry ( +-- entry_id UUID PRIMARY KEY NOT NULL DEFAULT uuid_generate_v1(), +-- feed_id UUID NOT NULL REFERENCES feed(feed_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED, +-- ingested TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), +-- published TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT '-infinity'::timestamptz, +-- updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT '-infinity'::timestamptz, + +-- guid TEXT NOT NULL, +-- link TEXT, +-- title TEXT, +-- author TEXT, +-- content TEXT +-- ); +-- CREATE INDEX entry_feed_id_idx ON entry(feed_id); +-- CREATE INDEX entry_published_idx ON entry(published); +-- CREATE INDEX entry_updated_idx ON entry(updated) WHERE updated != '-infinity'::timestamptz; +-- CREATE INDEX entry_feed_id_guid_idx ON entry(feed_id, guid); + +-- -- + +-- CREATE TABLE account_entry_unread ( +-- account_id UUID PRIMARY KEY NOT NULL REFERENCES account(account_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED, +-- entry_id UUID NOT NULL REFERENCES entry(entry_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED +-- ); + +-- -- + +-- CREATE TABLE tag ( +-- tag_id UUID PRIMARY KEY DEFAULT uuid_generate_v1(), +-- name TEXT NOT NULL +-- ); +-- CREATE INDEX tag_name ON tag(name); + +-- CREATE TABLE account_entry_tag ( +-- account_id UUID PRIMARY KEY NOT NULL REFERENCES account(account_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED, +-- entry_id UUID NOT NULL REFERENCES entry(entry_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED, +-- tag_id UUID NOT NULL REFERENCES tag(tag_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED +-- ); +-- CREATE INDEX account_entry_tag_entry_id_idx ON account_entry_tag(entry_id); +-- CREATE INDEX account_entry_tag_tag_id_idx ON account_entry_tag(tag_id); + +-- + INSERT INTO _meta_schema_version (major, minor, patch) VALUES (1, 0, 0); +COMMIT; diff --git a/src/db/postgres/sql/schema/1.0.0/revert.sql b/src/db/postgres/sql/schema/1.0.0/revert.sql new file mode 100644 index 0000000..f2ab0fa --- /dev/null +++ b/src/db/postgres/sql/schema/1.0.0/revert.sql @@ -0,0 +1,6 @@ +BEGIN; + DROP TABLE authentication; + +-- + DELETE FROM _meta_schema_version WHERE major = 1 AND minor = 0 AND patch = 0; +COMMIT; diff --git a/src/db/postgres/sql/schema/init.sql b/src/db/postgres/sql/schema/init.sql new file mode 100644 index 0000000..c4dd8e7 --- /dev/null +++ b/src/db/postgres/sql/schema/init.sql @@ -0,0 +1,15 @@ +-- +CREATE EXTENSION IF NOT EXISTS pg_stat_statements; +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; + +-- +BEGIN; + CREATE TABLE _meta_schema_version ( + major BIGINT NOT NULL, + minor BIGINT NOT NULL, + patch BIGINT NOT NULL, + applied TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), + PRIMARY KEY (major, minor, patch) + ); + INSERT INTO _meta_schema_version (major, minor, patch) VALUES (0, 0, 0); +COMMIT; diff --git a/src/db/schema-version-helper.js b/src/db/schema-version-helper.js new file mode 100644 index 0000000..65a1e39 --- /dev/null +++ b/src/db/schema-version-helper.js @@ -0,0 +1,131 @@ +'use strict'; + +const fs = require('fs'); +const path = require('path'); + +/** + * Utility functions for wrangling schema migrations. + * This mostly just deals with sorting and comparing 'x.y.z' version + * strings, with some presumptions about directory layouts and whatnot. + */ + +/** + * @typedef {Object} SchemaVersionObject + * @property {Number} major + * @property {Number} minor + * @property {Number} patch + */ + + +/** + * Split a dotted version string into parts. + * @param {String} v + * @returns {SchemaVersionObject} + */ +function schemaVersionStringToObject(v) { + const [ major, minor, patch ] = v.split('.', 3).map((x) => parseInt(x, 10)); + return { major, minor, patch }; +} + + +/** + * Render a version object numerically. + * @param {SchemaVersionObject} v + * @returns {Number} + */ +function schemaVersionObjectToNumber(v) { + const vScale = 1000; + return parseInt(v.major) * vScale * vScale + parseInt(v.minor) * vScale + parseInt(v.patch); +} + + +/** + * Convert dotted version string into number. + * @param {String} v + * @returns {Number} + */ +function schemaVersionStringToNumber(v) { + return schemaVersionObjectToNumber(schemaVersionStringToObject(v)); +} + + +/** + * Version string comparison, for sorting. + * @param {String} a + * @param {String} b + * @returns {Number} + */ +function schemaVersionStringCmp(a, b) { + return schemaVersionStringToNumber(a) - schemaVersionStringToNumber(b); +} + + +/** + * Check if an entry in a directory is a directory containing a migration file. + * @param {String} schemaDir + * @param {String} name + * @returns {Boolean} + */ +function isSchemaMigrationDirectory(schemaDir, name, migrationFile = 'apply.sql') { + // eslint-disable-next-line security/detect-non-literal-fs-filename + const nameStat = fs.statSync(path.join(schemaDir, name)); + if (nameStat.isDirectory()) { + let applyStat; + try { + // eslint-disable-next-line security/detect-non-literal-fs-filename + applyStat = fs.statSync(path.join(schemaDir, name, migrationFile)); + return applyStat.isFile(); + } catch (e) { + return false; + } + } + return false; +} + + +/** + * Return an array of schema migration directory names within engineDir, + * sorted in increasing order. + * @param {String} engineDir + * @returns {String[]} + */ +function allSchemaVersions(engineDir) { + const schemaDir = path.join(engineDir, 'sql', 'schema'); + // eslint-disable-next-line security/detect-non-literal-fs-filename + const availableVersions = fs.readdirSync(schemaDir).filter((d) => isSchemaMigrationDirectory(schemaDir, d)); + availableVersions.sort(schemaVersionStringCmp); + return availableVersions; +} + + +/** + * Return an array of schema migration directory names within engineDir, + * which are within supported range, and are greater than the current + * @param {String} engineDir + * @param {SchemaVersionObject} current + * @param {Object} supported + * @param {SchemaVersionObject} supported.min + * @param {SchemaVersionObject} supported.max + * @returns {String[]} + */ +function unappliedSchemaVersions(engineDir, current, supported) { + const min = schemaVersionObjectToNumber(supported.min); + const max = schemaVersionObjectToNumber(supported.max); + const cur = schemaVersionObjectToNumber(current); + const available = allSchemaVersions(engineDir); + return available.filter((a) => { + a = schemaVersionStringToNumber(a); + return a >= min && a <= max && a > cur; + }); +} + + +module.exports = { + schemaVersionStringToObject, + schemaVersionObjectToNumber, + schemaVersionStringToNumber, + schemaVersionStringCmp, + isSchemaMigrationDirectory, + allSchemaVersions, + unappliedSchemaVersions, +}; \ No newline at end of file diff --git a/src/db/sqlite/index.js b/src/db/sqlite/index.js new file mode 100644 index 0000000..b40ea9d --- /dev/null +++ b/src/db/sqlite/index.js @@ -0,0 +1,326 @@ +'use strict'; + +const common = require('../../common'); +const Database = require('../base'); +const DBErrors = require('../errors'); +const svh = require('../schema-version-helper'); +const SQLite = require('better-sqlite3'); +const fs = require('fs'); +const path = require('path'); +const { performance } = require('perf_hooks'); +const uuid = require('uuid'); + +const _fileScope = common.fileScope(__filename); + +const schemaVersionsSupported = { + min: { + major: 1, + minor: 0, + patch: 0, + }, + max: { + major: 1, + minor: 0, + patch: 0, + }, +}; + +// max of signed int64 (2^63 - 1), should be enough +// const EPOCH_FOREVER = BigInt('9223372036854775807'); + +class DatabaseSQLite extends Database { + constructor(logger, options) { + super(logger, options); + + const connectionString = options.db.connectionString || 'sqlite://:memory:'; + const csDelim = '://'; + const dbFilename = connectionString.slice(connectionString.indexOf(csDelim) + csDelim.length); + + const queryLogLevel = options.db.queryLogLevel; + + const sqliteOptions = { + ...(queryLogLevel && { + // eslint-disable-next-line security/detect-object-injection + verbose: (query) => this.logger[queryLogLevel](_fileScope('SQLite:verbose'), '', { query }), + }), + }; + this.db = new SQLite(dbFilename, sqliteOptions); + this.schemaVersionsSupported = schemaVersionsSupported; + this.changesSinceLastOptimize = BigInt(0); + this.optimizeAfterChanges = options.db.connectionString.optimizeAfterChanges; + this.db.pragma('foreign_keys = on'); // Enforce consistency. + this.db.pragma('journal_mode = WAL'); // Be faster, expect local filesystem. + this.db.defaultSafeIntegers(true); // This probably isn't necessary, but by using these BigInts we keep weird floats out of the query logs. + + this._initTables(); + this._initStatements(); + } + + + /** + * SQLite cannot prepare its statements without a schema, ensure such exists. + */ + _initTables() { + const _scope = _fileScope('_initTables'); + + // Migrations rely upon this table, ensure it exists. + const metaVersionTable = '_meta_schema_version'; + const tableExists = this.db.prepare('SELECT name FROM sqlite_master WHERE type=:type AND name=:name').pluck(true).bind({ type: 'table', name: metaVersionTable }); + let metaExists = tableExists.get(); + if (metaExists === undefined) { + const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql'); + // eslint-disable-next-line security/detect-non-literal-fs-filename + const fSql = fs.readFileSync(fPath, { encoding: 'utf8' }); + this.db.exec(fSql); + metaExists = tableExists.get(); + /* istanbul ignore if */ + if (metaExists === undefined) { + throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`); + } + this.logger.info(_scope, 'created schema version table', { metaVersionTable }); + } + + // Apply migrations + const currentSchema = this._currentSchema(); + const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported); + this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted }); + migrationsWanted.forEach((v) => { + const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql'); + // eslint-disable-next-line security/detect-non-literal-fs-filename + const fSql = fs.readFileSync(fPath, { encoding: 'utf8' }); + this.logger.info(_scope, 'applying migration', { version: v }); + this.db.exec(fSql); + }); + } + + + _initStatements() { + const _scope = _fileScope('_initStatements'); + const sqlDir = path.join(__dirname, 'sql'); + this.statement = {}; + + // Decorate the statement calls we use with timing and logging. + const wrapFetch = (logName, statementName, fn) => { + const _wrapScope = _fileScope(logName); + return (...args) => { + const startTimestampMs = performance.now(); + const rows = fn(...args); + DatabaseSQLite._deOphidiate(rows); + const elapsedTimeMs = performance.now() - startTimestampMs; + this.logger.debug(_wrapScope, 'complete', { statementName, elapsedTimeMs }); + return rows; + }; + }; + const wrapRun = (logName, statementName, fn) => { + const _wrapScope = _fileScope(logName); + return (...args) => { + const startTimestampMs = performance.now(); + const result = fn(...args); + const elapsedTimeMs = performance.now() - startTimestampMs; + this.logger.debug(_wrapScope, 'complete', { ...result, statementName, elapsedTimeMs }); + result.duration = elapsedTimeMs; + return result; + }; + }; + + // eslint-disable-next-line security/detect-non-literal-fs-filename + for (const f of fs.readdirSync(sqlDir)) { + const fPath = path.join(sqlDir, f); + const { name: fName, ext: fExt } = path.parse(f); + // eslint-disable-next-line security/detect-non-literal-fs-filename + const stat = fs.statSync(fPath); + if (!stat.isFile() + || fExt.toLowerCase() !== '.sql') { + continue; + } + // eslint-disable-next-line security/detect-non-literal-fs-filename + const fSql = fs.readFileSync(fPath, { encoding: 'utf8' }); + const statementName = Database._camelfy(fName.toLowerCase(), '-'); + let statement; + try { + statement = this.db.prepare(fSql); + } catch (e) { + /* istanbul ignore next */ + this.logger.error(_scope, 'failed to prepare statement', { error: e, file: f }); + /* istanbul ignore next */ + throw e; + } + // eslint-disable-next-line security/detect-object-injection + this.statement[statementName] = statement; + const { get: origGet, all: origAll, run: origRun } = statement; + statement.get = wrapFetch('SQLite:get', statementName, origGet.bind(statement)); + statement.all = wrapFetch('SQLite:all', statementName, origAll.bind(statement)); + statement.run = wrapRun('SQLite:run', statementName, origRun.bind(statement)); + } + this.statement._optimize = this.db.prepare('SELECT * FROM pragma_optimize(0x03)'); + + this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length }); + } + + + static _deOphidiate(rows) { + const rowsIsArray = Array.isArray(rows); + if (!rowsIsArray) { + rows = [rows]; + } + const exemplaryRow = rows[0]; + for (const prop in exemplaryRow) { + const camel = Database._camelfy(prop); + if (!(camel in exemplaryRow)) { + for (const d of rows) { + // eslint-disable-next-line security/detect-object-injection + d[camel] = d[prop]; + // eslint-disable-next-line security/detect-object-injection + delete d[prop]; + } + } + } + return rowsIsArray ? rows : rows[0]; + } + + + _currentSchema() { + return this.db.prepare('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1').get(); + } + + + healthCheck() { + const _scope = _fileScope('healthCheck'); + this.logger.debug(_scope, 'called', {}); + if (!this.db.open) { + throw new DBErrors.UnexpectedResult('database is not open'); + } + return { open: this.db.open }; + } + + + _closeConnection() { + this.db.close(); + } + + + _optimize() { + const _scope = _fileScope('_optimize'); + + if (this.optimizeAfterChanges + && this.changesSinceLastOptimize >= this.optimizeAfterChanges) { + const optimize = this.statement._optimize.all(); + this.logger.debug(_scope, 'optimize', { optimize }); + this.db.pragma('optimize'); + this.changesSinceLastOptimize = BigInt(0); + } + } + + + _purgeTables(really) { + if (really) { + [ + 'account', + 'post', + ].map((table) => { + const result = this.db.prepare(`DELETE FROM ${table}`).run(); + this.logger.debug(_fileScope('_purgeTables'), 'success', { table, result }); + }); + } + } + + + context(fn) { + return fn(this.db); + } + + + transaction(dbCtx, fn) { + dbCtx = dbCtx || this.db; + return dbCtx.transaction(fn)(); + } + + + static _dateFromEpoch(epoch) { + return new Date(Number(epoch * 1000n)); + } + + + static _accountToNative(account) { + const result = { + created: DatabaseSQLite._dateFromEpoch(account.created), + accountId: Number(account.accountId), + profile: Database._URLFromParts({ + protocol: account.profile_protocol, + username: account.profile_username, + password: account.profile_password, + host: account.profile_host, + pathname: account.profile_pathname, + search: account.profile_search, + hash: account.profile_hash, + }), + }; + return result; + } + + + accountGetByProfile(dbCtx, profile) { + const _scope = _fileScope('accountGetByProfile'); + this.logger.debug(_scope, 'called', { profile }); + + try { + const { + protocol: profileProtocol, + username: profileUsername, + password: profilePassword, + host: profileHost, + pathname: profilePathname, + search: profileSearch, + hash: profileHash, + } = new URL(profile); + const account = this.statement.accountGetByProfile.get({ + profileProtocol, + profileUsername, + profilePassword, + profileHost, + profilePathname, + profileSearch, + profileHash, + }); + return DatabaseSQLite._accountToNative(account); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, profile }); + throw e; + } + } + + + accountGetByProfilePrefix(dbCtx, targetURL) { + const _scope = _fileScope('accountGetByProfilePrefix'); + this.logger.debug(_scope, 'called', { targetURL }); + + try { + + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, targetURL }); + throw e; + } + } + + accountInsert(dbCtx, profile) { + const _scope = _fileScope('authenticationUpsert'); + this.logger.debug(_scope, 'called', { profile }); + + try { + const accountId = uuid.v4(); + const result = this.statement.accountInsert.run({ accountId, profile }); + if (result.changes != 1) { + throw new DBErrors.UnexpectedResult('did not insert account'); + } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, profile }) + throw e; + } + } + + + + +} + +module.exports = DatabaseSQLite; \ No newline at end of file diff --git a/src/db/sqlite/sql/account-get-by-profile.sql b/src/db/sqlite/sql/account-get-by-profile.sql new file mode 100644 index 0000000..b87132f --- /dev/null +++ b/src/db/sqlite/sql/account-get-by-profile.sql @@ -0,0 +1,15 @@ +-- +SELECT * FROM account WHERE + profile_protocol = :profileProtocol +AND + profile_username = :profileUsername +AND + profile_password = :profilePassword +AND + profile_host = :profileHost +AND + profile_pathname = :profilePathname +AND + profile_search = :profileSearch +AND + profile_hash = :profileHash diff --git a/src/db/sqlite/sql/account-insert.sql b/src/db/sqlite/sql/account-insert.sql new file mode 100644 index 0000000..322c007 --- /dev/null +++ b/src/db/sqlite/sql/account-insert.sql @@ -0,0 +1,3 @@ +-- +INSERT INTO account (account_id, profile) VALUES (:accountId, :profile) + diff --git a/src/db/sqlite/sql/mention-get-by-id.sql b/src/db/sqlite/sql/mention-get-by-id.sql new file mode 100644 index 0000000..27a5901 --- /dev/null +++ b/src/db/sqlite/sql/mention-get-by-id.sql @@ -0,0 +1,3 @@ +-- +SELECT * FROM mention_detail +WHERE mention_id = :mention_id diff --git a/src/db/sqlite/sql/schema/1.0.0/apply.sql b/src/db/sqlite/sql/schema/1.0.0/apply.sql new file mode 100644 index 0000000..d0be869 --- /dev/null +++ b/src/db/sqlite/sql/schema/1.0.0/apply.sql @@ -0,0 +1,40 @@ +BEGIN; + CREATE TABLE account ( + account_id INTEGER NOT NULL PRIMARY KEY CHECK (typeof(account_id) = 'integer'), + created INTEGER NOT NULL DEFAULT (strftime('%s', 'now')) CHECK (typeof(created) = 'integer'), + + profile_protocol TEXT NOT NULL DEFAULT '' CHECK (typeof(profile_protocol) = 'text'), + profile_username TEXT NOT NULL DEFAULT '' CHECK (typeof(profile_username) = 'text'), + profile_password TEXT NOT NULL DEFAULT '' CHECK (typeof(profile_password) = 'text'), + profile_host TEXT NOT NULL DEFAULT '' CHECK (typeof(profile_host) = 'text'), + profile_pathname TEXT NOT NULL DEFAULT '' CHECK (typeof(profile_pathname) = 'text'), + profile_search TEXT NOT NULL DEFAULT '' CHECK (typeof(profile_search) = 'text'), + profile_hash TEXT NOT NULL DEFAULT '' CHECK (typeof(profile_hash) = 'text') + ); + CREATE UNIQUE INDEX account_profile_idx ON account(profile_protocol, profile_username, profile_password, profile_host, profile_pathname, profile_search, profile_hash); + + CREATE TABLE mention ( + mention_id TEXT NOT NULL PRIMARY KEY CHECK (typeof(mention_id) = 'text'), + account_id INTEGER NOT NULL CHECK (typeof(account_id) = 'integer') REFERENCES account(account_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED, + created INTEGER NOT NULL DEFAULT (strftime('%s', 'now')) CHECK (typeof(created) = 'integer'), + updated INTEGER CHECK (typeof(created) IN ('integer', 'null')), + target_url TEXT NOT NULL CHECK (typeof(target_url) = 'text'), + source_url TEXT NOT NULL CHECK (typeof(source_url) = 'text'), + data TEXT NOT NULL + ); + CREATE UNIQUE INDEX mention_target_url_source_url_unq ON mention(target_url, source_url); + + CREATE VIEW mention_detail AS + SELECT + a.profile_*, + m.mention_id, + m.created, + m.updated, + m.target_url, + m.source_url + FROM mention m JOIN account a USING (account_id) + ; + +-- + INSERT INTO _meta_schema_version (major, minor, patch) VALUES (1, 0, 0); +COMMIT; diff --git a/src/db/sqlite/sql/schema/1.0.0/revert.sql b/src/db/sqlite/sql/schema/1.0.0/revert.sql new file mode 100644 index 0000000..f2ab0fa --- /dev/null +++ b/src/db/sqlite/sql/schema/1.0.0/revert.sql @@ -0,0 +1,6 @@ +BEGIN; + DROP TABLE authentication; + +-- + DELETE FROM _meta_schema_version WHERE major = 1 AND minor = 0 AND patch = 0; +COMMIT; diff --git a/src/db/sqlite/sql/schema/init.sql b/src/db/sqlite/sql/schema/init.sql new file mode 100644 index 0000000..29dbe97 --- /dev/null +++ b/src/db/sqlite/sql/schema/init.sql @@ -0,0 +1,11 @@ +-- +BEGIN; + CREATE TABLE _meta_schema_version ( + major INTEGER NOT NULL, + minor INTEGER NOT NULL, + patch INTEGER NOT NULL, + applied INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + PRIMARY KEY (major, minor, patch) + ) WITHOUT ROWID; + INSERT INTO _meta_schema_version (major, minor, patch) VALUES (0, 0, 0); +COMMIT; diff --git a/src/enum.js b/src/enum.js new file mode 100644 index 0000000..49d7684 --- /dev/null +++ b/src/enum.js @@ -0,0 +1,33 @@ +'use strict'; + +const common = require('./common'); +const { Enum: DingusEnum } = require('@squeep/api-dingus'); + +const Enum = common.mergeDeep(DingusEnum, { + Specification: 'draft-2017-01-12', + + ContentType: { + ApplicationAtom: 'application/atom+xml', + ApplicationOctetStream: 'application/octet-stream', + ApplicationRDF: 'application/rdf+xml', + ApplicationRSS: 'application/rss+xml', + ApplicationXML: 'application/xml', + ImageSVG: 'image/svg+xml', + TextXML: 'text/xml', + }, + + Header: { + Authorization: 'Authorization', + WWWAuthenticate: 'WWW-Authenticate', + }, + + ErrorCode: { + Forbidden: 'forbidden', + InvalidRequest: 'invalid_request', + InvalidToken: 'invalid_token', + Unauthorized: 'unauthorized', + }, + +}); + +module.exports = common.freezeDeep(Enum); \ No newline at end of file diff --git a/src/errors.js b/src/errors.js new file mode 100644 index 0000000..ea5425a --- /dev/null +++ b/src/errors.js @@ -0,0 +1,20 @@ +'use strict'; + +const { Errors } = require('@squeep/api-dingus'); + +class InternalInconsistencyError extends Error { + constructor(...args) { + super(...args); + Error.captureStackTrace(InternalInconsistencyError); + } + + get name() { + /* istanbul ignore next */ + return this.constructor.name; + } +} + +module.exports = { + ...Errors, + InternalInconsistencyError, +}; \ No newline at end of file diff --git a/src/logger/data-sanitizers.js b/src/logger/data-sanitizers.js new file mode 100644 index 0000000..5df3b86 --- /dev/null +++ b/src/logger/data-sanitizers.js @@ -0,0 +1,25 @@ +'use strict'; + +/** + * Scrub credential from POST login body data. + * @param {Object} data + * @param {Boolean} sanitize + * @returns {Boolean} + */ +function sanitizePostAccessToken(data, sanitize = true) { + let unclean = false; + + const accessTokenLength = data?.ctx?.parsedBody?.['access_token']?.length; + if (accessTokenLength) { + unclean = true; + } + if (unclean && sanitize) { + data.ctx.parsedBody['access_token'] = '*'.repeat(accessTokenLength); + } + + return unclean; +} + +module.exports = { + sanitizePostAccessToken, +}; \ No newline at end of file diff --git a/src/logger/index.js b/src/logger/index.js new file mode 100644 index 0000000..7944cf6 --- /dev/null +++ b/src/logger/index.js @@ -0,0 +1,13 @@ +'use strict'; + +const BaseLogger = require('@squeep/logger-json-console'); +const dataSanitizers = require('./data-sanitizers'); + +class Logger extends BaseLogger { + constructor(options, ...args) { + super(options, ...args); + Array.prototype.push.apply(this.dataSanitizers, Object.values(dataSanitizers)); + } +} + +module.exports = Logger; \ No newline at end of file diff --git a/src/manager.js b/src/manager.js new file mode 100644 index 0000000..356c6a9 --- /dev/null +++ b/src/manager.js @@ -0,0 +1,145 @@ +'use strict'; + +/** + * Here we process all the incoming requests. + */ + +const common = require('./common'); +const Enum = require('./enum'); +const Errors = require('./errors'); +const Template = require('./template'); + +const _fileScope = common.fileScope(__filename); + +class Manager { + /** + * @param {Console} logger + * @param {*} db + * @param {Object} options + */ + constructor(logger, db, options) { + this.logger = logger; + this.db = db; + this.options = options; + } + + + // eslint-disable-next-line class-methods-use-this + async initialize() { + /* */ + } + + + /** + * GET request for healthcheck. + * @param {http.ServerResponse} res + * @param {Object} ctx + */ + async getHealthcheck(res, ctx) { + const _scope = _fileScope('getHealthcheck'); + const health = 'happy'; + + // What else could we check... + const dbHealth = await this.db.healthCheck(); + this.logger.debug(_scope, 'called', { health, dbHealth, ctx }); + res.end(health); + } + + + /** + * GET request for root. + * @param {http.ServerResponse} res + * @param {Object} ctx + */ + async getRoot(res, ctx) { + const _scope = _fileScope('getRoot'); + this.logger.debug(_scope, 'called', { ctx }); + + res.end(Template.rootHTML(ctx, this.options)); + this.logger.info(_scope, 'finished', { ctx }); + } + + + /** + * + * @param {http.ServerResponse} res + * @param {Object} ctx + */ + async getMention(res, ctx) { + const _scope = _fileScope('getMention'); + this.logger.debug(_scope, 'called', { ctx }); + + const { mentionId } = ctx.params['mentionId']; + + await this.db.context(async (dbCtx) => { + ctx.mentionData = this.db.mentionGetById(dbCtx, mentionId); + }); // dbCtx + + if (!ctx.mentionData) { + this.logger.debug(_scope, 'invalid mentionId', { ctx }); + throw new Errors.ResponseError(Enum.ErrorResponse.NotFound); + } + + res.end(Template.mentionHTML(ctx, this.options)); + + this.logger.info(_scope, 'finished', { ctx }); + } + + + /** + * + * @param {String} mentionId uuid + * @returns {String} + */ + _mentionLocationUrl(mentionId) { + return `${this.selfBaseUrl}/mention/${mentionId}`; + } + + + /** + * + * @param {} res + * @param {*} ctx + */ + async postMention(res, ctx) { + const _scope = _fileScope('postMention'); + this.logger.debug(_scope, 'called', { ctx }); + + let sourceURL, targetURL; + try { + sourceURL = new URL(ctx.parsedBody['source']); + targetURL = new URL(ctx.parsedBody['target']); + } catch (error) { + this.logger.debug(_scope, 'failed to parse parameters', { error, ctx, sourceURL, targetURL }); + throw new Errors.ResponseError(Enum.ErrorResponse.BadRequest); + } + + sourceURL.hash = ''; + targetURL.hash = ''; + if (sourceURL.href === targetURL.href) { + this.logger.debug(_scope, 'duplicate source and target', { ctx, sourceURL, targetURL }); + throw new Errors.ResponseError(Enum.ErrorResponse.BadRequest); + } + + await this.db.context(async (dbCtx) => { + const account = this.db.accountGetByProfilePrefix(dbCtx, targetURL); + if (!account) { + this.logger.debug(_scope, 'no account matches target', { ctx, targetURL }); + throw new Errors.ResponseError(Enum.ErrorResponse.BadRequest); + } + + ctx.mentionId = this.db.mentionUpsert(dbCtx, sourceURL, targetURL); + }); // dbCtx + + res.statusCode = 201; // Created + res.setHeader(Enum.Header.Location, this._mentionLocationUrl(ctx.mentionId)); + res.end(); + + // queue for processing + + this.logger.info(_scope, 'finished', { ctx }); + } + +} + +module.exports = Manager; \ No newline at end of file diff --git a/src/service.js b/src/service.js new file mode 100644 index 0000000..fd8219c --- /dev/null +++ b/src/service.js @@ -0,0 +1,152 @@ +'use strict'; + +/** + * Here we extend the base API server to define our routes and any route-specific + * behavior (middlewares) before handing off to the manager. + */ + +const { Dingus } = require('@squeep/api-dingus'); +const common = require('./common'); +const Enum = require('./enum'); +const Manager = require('./manager'); +const Authenticator = require('./authenticator'); +const path = require('path'); + +const _fileScope = common.fileScope(__filename); + +class Service extends Dingus { + constructor(logger, db, options) { + super(logger, { + ...options.dingus, + ignoreTrailingSlash: false, + }); + + // We are primarily an API service. + this.responseTypes = [ + Enum.ContentType.ApplicationJson, + Enum.ContentType.TextPlain, + ]; + + this.manager = new Manager(logger, db, options); + this.authenticator = new Authenticator(logger, db, options); + + this.staticPath = path.normalize(path.join(__dirname, '..', 'static')); + + // Information about this service + this.on(['GET', 'HEAD'], '/', this.handlerGetRoot.bind(this)); + + // Primary API + this.on(['GET', 'HEAD'], '/mention/:mentionId', this.handlerGetMention.bind(this)); + this.on(['POST'], '/mention', this.handlerPostMention.bind(this)); + + // Give load-balancers something to check + this.on(['GET', 'HEAD'], '/healthcheck', this.handlerGetHealthcheck.bind(this)); + + // These routes are intended for accessing static content during development. + // In production, a proxy server would likely handle these first. + this.on(['GET', 'HEAD'], '/static', this.handlerRedirect.bind(this), `${options.dingus.proxyPrefix}/static/`); + this.on(['GET', 'HEAD'], '/static/', this.handlerGetStaticFile.bind(this), 'index.html'); + this.on(['GET', 'HEAD'], '/static/:file', this.handlerGetStaticFile.bind(this)); + this.on(['GET', 'HEAD'], '/favicon.ico', this.handlerGetStaticFile.bind(this), 'favicon.ico'); + this.on(['GET', 'HEAD'], '/robots.txt', this.handlerGetStaticFile.bind(this), 'robots.txt'); + } + + + async initialize() { + await this.manager.initialize(); + } + + /** + * Only include error details for JSON response. + * @param {String} contentType + * @param {Object} err + * @param {String} err.details + */ + renderError(contentType, err) { + if (contentType === Enum.ContentType.ApplicationJson + && err.details) { + return JSON.stringify(err.details); + } + return super.renderError(contentType, err); + } + + + /** + * @param {http.IncomingMessage} req + * @param {http.ServerResponse} res + * @param {Object} ctx + */ + async handlerGetRoot(req, res, ctx) { + const _scope = _fileScope('handlerGetRoot'); + const responseTypes = [ + Enum.ContentType.TextHTML, + Enum.ContentType.TextPlain, + ]; + this.logger.debug(_scope, 'called', { req, ctx }); + + Dingus.setHeadHandler(req, res, ctx); + + this.setResponseType(responseTypes, req, res, ctx); + + await this.manager.getRoot(res, ctx); + } + + + /** + * @param {http.IncomingMessage} req + * @param {http.ServerResponse} res + * @param {Object} ctx + */ + async handlerGetMention(req, res, ctx) { + const _scope = _fileScope('handlerGetMicropub'); + this.logger.debug(_scope, 'called', { req, ctx }); + + Dingus.setHeadHandler(req, res, ctx); + + this.setResponseType(this.responseTypes, req, res, ctx); + + await this.authenticator.required(req, res, ctx); + + await this.manager.getMention(res, ctx); + } + + + /** + * @param {http.IncomingMessage} req + * @param {http.ServerResponse} res + * @param {Object} ctx + */ + async handlerPostMention(req, res, ctx) { + const _scope = _fileScope('handlerPostMicropub'); + this.logger.debug(_scope, 'called', { req, ctx }); + + this.setResponseType(this.responseTypes, req, res, ctx); + + await this.ingestBody(req, res, ctx); + + await this.authenticator.required(req, res, ctx); + + await this.manager.postMention(res, ctx); + } + + + /** + * @param {http.IncomingMessage} req + * @param {http.ServerResponse} res + * @param {Object} ctx + */ + async handlerGetHealthcheck(req, res, ctx) { + const _scope = _fileScope('handlerGetHealthcheck'); + this.logger.debug(_scope, 'called', { req, ctx }); + + Dingus.setHeadHandler(req, res, ctx); + + this.setResponseType(this.responseTypes, req, res, ctx); + + await this.manager.getHealthcheck(res, ctx); + } + + +} + +module.exports = Service; diff --git a/src/template/index.js b/src/template/index.js new file mode 100644 index 0000000..ed19a6a --- /dev/null +++ b/src/template/index.js @@ -0,0 +1,6 @@ +'use strict'; + +module.exports = { + rootHTML: require('./root-html'), + mentionHTML: require('./mention-html'), +}; \ No newline at end of file diff --git a/src/template/mention-html.js b/src/template/mention-html.js new file mode 100644 index 0000000..f74c3d1 --- /dev/null +++ b/src/template/mention-html.js @@ -0,0 +1,23 @@ +'use strict'; + +const th = require('./template-helper'); + +function renderMention(mentionData) { + return `
+${JSON.stringify(mentionData)} +
`; +} + +module.exports = (ctx, options) => { + const htmlOptions = { + pageTitle: options.manager.pageTitle, + logoUrl: options.manager.logoUrl, + footerEntries: options.manager.footerEntries, + navLinks: [], + }; + const content = [ + renderMention(ctx.mentionData), + ]; + + return th.htmlPage(0, ctx, htmlOptions, content); +}; \ No newline at end of file diff --git a/src/template/root-html.js b/src/template/root-html.js new file mode 100644 index 0000000..d131858 --- /dev/null +++ b/src/template/root-html.js @@ -0,0 +1,49 @@ +'use strict'; + +const th = require('./template-helper'); + +function aboutSection() { + return ` +\t
+\t\t

What

+\t\t

+This is a Webmention service. +\t\t

+\t
`; +} + +function contactSection(contactHTML) { + if (contactHTML) { + return ` +\t
+${contactHTML} +\t
`; + } + return ''; +} + +/** + * @param {Object} ctx + * @param {Object} options + * @param {Object} options.manager + * @param {String} options.adminContactHTML + * @param {String} options.manager.pageTitle + * @param {String} options.manager.logoUrl + * @param {String[]} options.manager.footerEntries + * @returns {String} + */ +module.exports = (ctx, options) => { + const contactHTML = options.adminContactHTML; + const htmlOptions = { + pageTitle: options.manager.pageTitle, + logoUrl: options.manager.logoUrl, + footerEntries: options.manager.footerEntries, + navLinks: [], + }; + const content = [ + aboutSection(), + contactSection(contactHTML), + ]; + + return th.htmlPage(0, ctx, htmlOptions, content); +}; \ No newline at end of file diff --git a/src/template/template-helper.js b/src/template/template-helper.js new file mode 100644 index 0000000..e1d2dde --- /dev/null +++ b/src/template/template-helper.js @@ -0,0 +1,7 @@ +'use strict'; + +const { TemplateHelper } = require('@squeep/html-template-helper'); + +module.exports = { + ...TemplateHelper, +}; \ No newline at end of file diff --git a/static/.external-link.svg.meta b/static/.external-link.svg.meta new file mode 100644 index 0000000..5be8006 --- /dev/null +++ b/static/.external-link.svg.meta @@ -0,0 +1,2 @@ +Source: https://commons.wikimedia.org/wiki/File:VisualEditor_-_Icon_-_External-link.svg +License: https://commons.wikimedia.org/wiki/Category:Expat/MIT_License diff --git a/static/custom.css b/static/custom.css new file mode 100644 index 0000000..7a7a218 --- /dev/null +++ b/static/custom.css @@ -0,0 +1,6 @@ +header { + background: linear-gradient(0deg, rgba(255,255,255,0) 0%, rgb(230, 230, 230) 100%); +} +footer { + background: linear-gradient(180deg, rgba(255,255,255,0) 0%, rgb(230, 230, 230) 100%); +} diff --git a/static/external-link.svg b/static/external-link.svg new file mode 100644 index 0000000..ae7d45a --- /dev/null +++ b/static/external-link.svg @@ -0,0 +1,15 @@ + + + + + + + + + + + + diff --git a/static/index.html b/static/index.html new file mode 100644 index 0000000..9437f20 --- /dev/null +++ b/static/index.html @@ -0,0 +1,16 @@ + + + + + Static Assets + + + +
+

Static Assets

+
+
+ welcome to my static +
+ + diff --git a/static/robots.txt b/static/robots.txt new file mode 100644 index 0000000..1f53798 --- /dev/null +++ b/static/robots.txt @@ -0,0 +1,2 @@ +User-agent: * +Disallow: / diff --git a/static/theme.css b/static/theme.css new file mode 100644 index 0000000..3d079c2 --- /dev/null +++ b/static/theme.css @@ -0,0 +1,172 @@ +html { + height: 100vh; +} +body { + background-color: #fff; + font-family: Helvetica, Verdana, sans-serif; + margin: 0 1em 0 1em; + min-height: 100vh; + display: flex; + flex-direction: column; +} +header {} +header nav { + margin-bottom: 1em; +} +header nav ol { + list-style-type: none; + margin: 0; + padding: 0; +} +header nav ol li { + display: inline; + text-align: center; + border-top: 2px solid #666; + border-bottom: 2px solid #666; + border-left: 1px solid #666; + border-right: 1px solid #666; + padding: .3em .5em .2em .5em; +} +header nav ol li:hover { + background-color: #ddd; +} +header nav ol > li:first-child { + border-left: 2px solid #666; +} +header nav ol > li:last-child { + border-right: 2px solid #666; +} +header nav ol a { + font-variant-caps: small-caps; + text-decoration: none; + font-weight: bold; +} +h1 { + margin-top: 1em; + margin-bottom: 1.25em; + text-align: center; +} +h2 { + background-color: #ddd; + padding: .25em 0 .1em 0.25em; +} +main { + flex-grow: 1; +} +section {} +.logo { + vertical-align: middle; + height: 2em; +} +.about {} +.usage {} +.copyright { + font-size: small; +} +.error ul { + font-weight: bolder; + border: 2px solid red; + padding: 1em; + background-color: lightsalmon; +} +.notification ul { + background-color: aliceblue; + border: 1px solid slateblue; + padding: 1em; +} +.external { + background-image: url("external-link.svg"); + background-position: right center; + background-repeat: no-repeat; + padding-right: 13px; +} +.information img { + max-width: 4em; + max-height: 4em; + width: 100%; + height: auto; + vertical-align: middle; +} +.uri { + font-family: Courier, monospace, serif; + font-size: 1em; + background-color: lavender; + padding: .16em; +} +.code { + font-family: Courier, monospace, serif; + font-size: .75em; + white-space: nowrap; + overflow-x: hidden; +} +.client-identifier { + display: inline-block; + height: max-content; + padding: .5em; + border: 1px dotted #999; + margin-bottom: .5em; +} +.scope { + list-style-type: none; +} +.scope label { + font-variant: small-caps; + font-weight: bold; +} +.scope .description { + font-size: smaller; + font-style: italic; +} +.scope .disabled { + color: grey; + background-color: #eee; +} +.form-consent button { + border-width: thick; + font-size: x-large; + padding: .5em; + margin-left: .75em; + margin-right: .75em; +} +.button-accept { + background-color: lightgreen; + border-color: lightgreen; +} +.button-decline { + background-color: salmon; + border-color: salmon; +} +.vertical { + writing-mode: vertical-lr; + vertical-align: bottom; +} +table { + border: 0; + width: 100%; +} +thead tr th { + background-color: #ddd; + vertical-align: bottom; + text-align: start; +} +tbody tr th { + text-align: start; +} +tbody tr:nth-child(even) td, tbody tr:nth-child(even) th { + background-color: #eee; +} +tbody tr:nth-child(odd) td, tbody tr:nth-child(odd) th {} +footer { + text-align: center; + width: 100%; + border-top: .33em dotted #666; + margin-top: 1em; +} +footer ol { + list-style-type: none; + margin: .5em; + padding: 0; +} +.centered { + text-align: center; +} \ No newline at end of file