"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.PostgreSQLStore = void 0;
const tslib_1 = require("tslib");
const CascadeStore_1 = require("oak-domain/lib/store/CascadeStore");
const connector_1 = require("./connector");
const translator_1 = require("./translator");
const lodash_1 = require("lodash");
const assert_1 = tslib_1.__importDefault(require("assert"));
const relation_1 = require("oak-domain/lib/store/relation");
// Column names whose string values are converted to integers by formResult
// when the column is not declared in the entity's `attributes`.
const ToNumberAttrs = new Set([
    '$$seq$$',
    '$$createAt$$',
    '$$updateAt$$',
    '$$deleteAt$$',
]);
/**
 * Convert a WKT-style geometry text into the store's geometry object.
 *
 * Supported prefixes are POINT, LINESTRING and POLYGON:
 *   - POINT      -> { type: 'point',   coordinate: [x, y] }
 *   - LINESTRING -> { type: 'path',    coordinate: [[x, y], ...] }
 *   - POLYGON    -> { type: 'polygon', coordinate: [[[x, y], ...], ...] } (one array per ring)
 *
 * @param {string} geoText geometry text, e.g. `POINT(1 2)`
 * @returns {{type: string, coordinate: Array}} the parsed geometry
 * @throws {Error} when the prefix is not one of the above, or when no
 *   coordinate list can be extracted from a LINESTRING / POLYGON
 * @throws AssertionError when a POINT does not contain exactly two numbers
 */
function convertGeoTextToObject(geoText) {
    if (geoText.startsWith('POINT')) {
        // Accept an optional exponent so that values such as `1e-07` are read as
        // one number instead of being split into `1` and `-07`.
        const coord = geoText.match(/-?\d+\.?\d*(?:[eE][-+]?\d+)?/g);
        (0, assert_1.default)(coord && coord.length === 2);
        return {
            type: 'point',
            coordinate: coord.map(ele => parseFloat(ele)),
        };
    }
    else if (geoText.startsWith('LINESTRING')) {
        const coordsMatch = geoText.match(/\(([^)]+)\)/);
        if (coordsMatch) {
            const points = coordsMatch[1].split(',').map(p => {
                const [x, y] = p.trim().split(/\s+/).map(parseFloat);
                return [x, y];
            });
            return {
                type: 'path',
                coordinate: points,
            };
        }
    }
    else if (geoText.startsWith('POLYGON')) {
        // Match every innermost parenthesised group, i.e. each ring on its own.
        // The previous pattern required `((` ... `))` around a single group, so
        // a polygon with interior rings, `POLYGON((...),(...))`, matched nothing
        // and was rejected as unsupported.
        const ringsMatch = geoText.match(/\(([^()]+)\)/g);
        if (ringsMatch) {
            const rings = ringsMatch.map(ring => {
                const coordStr = ring.replace(/[()]/g, '');
                return coordStr.split(',').map(p => {
                    const [x, y] = p.trim().split(/\s+/).map(parseFloat);
                    return [x, y];
                });
            });
            return {
                type: 'polygon',
                coordinate: rings,
            };
        }
    }
    throw new Error(`Unsupported geometry type: ${geoText.slice(0, 50)}`);
}
/**
 * CascadeStore implementation that translates operations to SQL through
 * PostgreSQLTranslator and runs them through PostgreSQLConnector.
 * The synchronous row-level methods always throw; only the async ones are implemented.
 */
class PostgreSQLStore extends CascadeStore_1.CascadeStore {
    /** Synchronous count is not supported; always throws. */
    countAbjointRow(entity, selection, context, option) {
        throw new Error('PostgreSQL store 不支持同步取数据');
    }
    /** Synchronous aggregation is not supported; always throws. */
    aggregateAbjointRowSync(entity, aggregation, context, option) {
        throw new Error('PostgreSQL store 不支持同步取数据');
    }
    /** Synchronous select is not supported; always throws. */
    selectAbjointRow(entity, selection, context, option) {
        throw new Error('PostgreSQL store 不支持同步取数据');
    }
    /** Synchronous update is not supported; always throws. */
    updateAbjointRow(entity, operation, context, option) {
        throw new Error('PostgreSQL store 不支持同步更新数据');
    }
    /**
     * Run a script through the connector and discard its result.
     * @param script SQL text handed to `connector.exec`
     * @param txnId optional transaction id handed to `connector.exec`
     */
    async exec(script, txnId) {
        await this.connector.exec(script, txnId);
    }
    connector;
    translator;
    /**
     * @param storageSchema schema passed to the base class and to the translator
     * @param configuration configuration passed to the PostgreSQLConnector
     */
    constructor(storageSchema, configuration) {
        super(storageSchema);
        this.connector = new connector_1.PostgreSQLConnector(configuration);
        this.translator = new translator_1.PostgreSQLTranslator(storageSchema);
    }
    /** Not implemented; always throws. */
    checkRelationAsync(entity, operation, context) {
        throw new Error('Method not implemented.');
    }
    /**
     * Translate an aggregation to SQL, execute it in the context's current
     * transaction and shape the first element of the result with formResult.
     */
    async aggregateAbjointRowAsync(entity, aggregation, context, option) {
        const sql = this.translator.translateAggregate(entity, aggregation, option);
        const result = await this.connector.exec(sql, context.getCurrentTxnId());
        return this.formResult(entity, result[0]);
    }
    /** Delegates to `aggregateAsync` (expected to come from the base class). */
    aggregate(entity, aggregation, context, option) {
        return this.aggregateAsync(entity, aggregation, context, option);
    }
    supportManyToOneJoin() {
        return true;
    }
    supportMultipleCreate() {
        return true;
    }
    /**
     * Shape raw row(s) into nested result objects.
     *
     * Each key of a raw row is resolved against the schema: dotted keys are
     * expanded into nested objects, and values are coerced according to the
     * attribute type declared in the schema. Afterwards, joined sub-objects
     * whose `id` is null are removed.
     *
     * @param entity entity name the rows belong to
     * @param result a single raw row or an array of raw rows
     * @returns the shaped row, or an array of shaped rows when `result` is an array
     */
    formResult(entity, result) {
        const schema = this.getSchema();
        // Write `value` into `r` under `attr`, recursing through dotted paths.
        function resolveAttribute(entity2, r, attr, value) {
            const { attributes, view } = schema[entity2];
            if (!view) {
                const i = attr.indexOf(".");
                if (i !== -1) {
                    const attrHead = attr.slice(0, i);
                    const attrTail = attr.slice(i + 1);
                    const rel = (0, relation_1.judgeRelation)(schema, entity2, attrHead);
                    if (rel === 1) {
                        // The whole dotted path is written with lodash `set`.
                        (0, lodash_1.set)(r, attr, value);
                    }
                    else {
                        if (!r[attrHead]) {
                            r[attrHead] = {};
                        }
                        if (rel === 0) {
                            // Recurse while staying on the same entity.
                            resolveAttribute(entity2, r[attrHead], attrTail, value);
                        }
                        else if (rel === 2) {
                            // Recurse using the head of the path as the entity name.
                            resolveAttribute(attrHead, r[attrHead], attrTail, value);
                        }
                        else {
                            // Otherwise judgeRelation must have returned an entity name.
                            (0, assert_1.default)(typeof rel === 'string');
                            resolveAttribute(rel, r[attrHead], attrTail, value);
                        }
                    }
                }
                else if (attributes[attr]) {
                    const { type } = attributes[attr];
                    switch (type) {
                        case 'date':
                        case 'time':
                        case 'datetime': {
                            // Normalise to a number: Date -> valueOf(), string -> parseInt.
                            if (value instanceof Date) {
                                r[attr] = value.valueOf();
                            }
                            else {
                                if (typeof value === 'string') {
                                    r[attr] = parseInt(value, 10);
                                }
                                else {
                                    (0, assert_1.default)(typeof value === 'number' || value === null);
                                    r[attr] = value;
                                }
                            }
                            break;
                        }
                        case 'geometry': {
                            if (typeof value === 'string') {
                                r[attr] = convertGeoTextToObject(value);
                            }
                            else {
                                r[attr] = value;
                            }
                            break;
                        }
                        case 'object':
                        case 'array': {
                            // PostgreSQL jsonb comes back as an object already, so only strings need parsing
                            if (typeof value === 'string') {
                                r[attr] = JSON.parse(value);
                            }
                            else {
                                r[attr] = value;
                            }
                            break;
                        }
                        case 'function': {
                            // String values are base64-decoded and prefixed with `return `.
                            if (typeof value === 'string') {
                                r[attr] = `return ${Buffer.from(value, 'base64').toString()}`;
                            }
                            else {
                                r[attr] = value;
                            }
                            break;
                        }
                        case 'bool':
                        case 'boolean': {
                            // PostgreSQL returns a boolean directly
                            r[attr] = value;
                            break;
                        }
                        case 'decimal': {
                            // PostgreSQL numeric values may come back as strings
                            if (typeof value === 'string') {
                                r[attr] = parseFloat(value);
                            }
                            else {
                                (0, assert_1.default)(value === null || typeof value === 'number');
                                r[attr] = value;
                            }
                            break;
                        }
                        // TODO: kept consistent with the mysql store: trim leading/trailing whitespace of char/ref strings
                        case "char":
                        case "ref": {
                            if (value) {
                                (0, assert_1.default)(typeof value === 'string');
                                r[attr] = value.trim();
                            }
                            else {
                                r[attr] = value;
                            }
                            break;
                        }
                        default: {
                            r[attr] = value;
                        }
                    }
                }
                else {
                    // Key is not a declared attribute of the entity.
                    // TODO: kept consistent with the mysql store: when id is a char column, trim its padding
                    if (value && typeof value === 'string') {
                        if (attr === 'id') {
                            r[attr] = value.trim();
                        }
                        else if (attr.startsWith("#count")) {
                            // PostgreSQL returns count as a string
                            r[attr] = parseInt(value, 10);
                        }
                        else if (attr.startsWith("#sum") || attr.startsWith("#avg") || attr.startsWith("#min") || attr.startsWith("#max")) {
                            // PostgreSQL returns sum/avg/min/max as a string
                            r[attr] = parseFloat(value);
                        }
                        else if (ToNumberAttrs.has(attr)) {
                            // Columns listed in ToNumberAttrs are converted from string to integer
                            r[attr] = parseInt(value, 10);
                        }
                        else {
                            r[attr] = value;
                        }
                    }
                    else {
                        r[attr] = value;
                    }
                }
            }
            else {
                // Entities flagged as `view` get the raw value without any coercion.
                (0, lodash_1.assign)(r, { [attr]: value });
            }
        }
        // Recursively drop joined sub-objects whose `id` is null.
        function removeNullObjects(r, e) {
            for (let attr in r) {
                const rel = (0, relation_1.judgeRelation)(schema, e, attr);
                if (rel === 2) {
                    if (r[attr].id === null) {
                        (0, assert_1.default)(schema[e].toModi || r.entity !== attr);
                        delete r[attr];
                        continue;
                    }
                    removeNullObjects(r[attr], attr);
                }
                else if (typeof rel === 'string') {
                    if (r[attr].id === null) {
                        (0, assert_1.default)(schema[e].toModi || r[`${attr}Id`] === null, `对象${String(e)}取数据时,发现其外键找不到目标对象,rowId是${r.id},其外键${attr}Id值为${r[`${attr}Id`]}`);
                        delete r[attr];
                        continue;
                    }
                    removeNullObjects(r[attr], rel);
                }
            }
        }
        // Shape one raw row: resolve every key, then prune null sub-objects.
        function formSingleRow(r) {
            let result2 = {};
            for (let attr in r) {
                const value = r[attr];
                resolveAttribute(entity, result2, attr, value);
            }
            removeNullObjects(result2, entity);
            return result2;
        }
        if (result instanceof Array) {
            return result.map(r => formSingleRow(r));
        }
        return formSingleRow(result);
    }
    /**
     * Translate a selection to SQL, execute it in the context's current
     * transaction and shape the first element of the result with formResult.
     */
    async selectAbjointRowAsync(entity, selection, context, option) {
        const sql = this.translator.translateSelect(entity, selection, option);
        const result = await this.connector.exec(sql, context.getCurrentTxnId());
        return this.formResult(entity, result[0]);
    }
    /**
     * Execute a create / remove / update operation.
     * 'create' and 'remove' have dedicated translations; every other action is
     * translated as an update (select-like actions are rejected by assertion).
     * @returns `rowCount` of the second element of the exec result, or 0 when it is falsy
     */
    async updateAbjointRowAsync(entity, operation, context, option) {
        const { translator, connector } = this;
        const { action } = operation;
        const txn = context.getCurrentTxnId();
        switch (action) {
            case 'create': {
                const { data } = operation;
                const sql = translator.translateInsert(entity, data instanceof Array ? data : [data]);
                const result = await connector.exec(sql, txn);
                // PostgreSQL QueryResult.rowCount
                return result[1].rowCount || 0;
            }
            case 'remove': {
                const sql = translator.translateRemove(entity, operation, option);
                const result = await connector.exec(sql, txn);
                return result[1].rowCount || 0;
            }
            default: {
                (0, assert_1.default)(!['select', 'download', 'stat'].includes(action));
                const sql = translator.translateUpdate(entity, operation, option);
                const result = await connector.exec(sql, txn);
                return result[1].rowCount || 0;
            }
        }
    }
    /** Reject select-like actions, then delegate to the base class `operateAsync`. */
    async operate(entity, operation, context, option) {
        const { action } = operation;
        (0, assert_1.default)(!['select', 'download', 'stat'].includes(action), '不支持使用 select operation');
        return await super.operateAsync(entity, operation, context, option);
    }
    /** Delegate to the base class `selectAsync`. */
    async select(entity, selection, context, option) {
        return await super.selectAsync(entity, selection, context, option);
    }
    /**
     * Translate a count to SQL and execute it in the context's current transaction.
     * @returns the `cnt` column of the first row as a number (0 when absent)
     */
    async countAbjointRowAsync(entity, selection, context, option) {
        const sql = this.translator.translateCount(entity, selection, option);
        const result = await this.connector.exec(sql, context.getCurrentTxnId());
        // PostgreSQL returns count as a string (bigint)
        const cnt = result[0][0]?.cnt;
        return typeof cnt === 'string' ? parseInt(cnt, 10) : (cnt || 0);
    }
    /** Delegates to `countAsync` (expected to come from the base class). */
    async count(entity, selection, context, option) {
        return this.countAsync(entity, selection, context, option);
    }
    /** Start a transaction on the connector and return what it returns. */
    async begin(option) {
        return await this.connector.startTransaction(option);
    }
    async commit(txnId) {
        await this.connector.commitTransaction(txnId);
    }
    async rollback(txnId) {
        await this.connector.rollbackTransaction(txnId);
    }
    async connect() {
        await this.connector.connect();
    }
    async disconnect() {
        await this.connector.disconnect();
    }
    /**
     * Create the extensions, the text search configuration and the entity tables
     * required by the schema.
     *
     * Extensions are created without a transaction id; the text search
     * configuration and the tables are created inside one serializable
     * transaction that is rolled back if any statement fails.
     *
     * @param option passed through to `translator.translateCreateEntity`
     */
    async initialize(option) {
        const schema = this.getSchema();
        // ===== Phase 1: create extensions outside of the transaction =====
        let hasGeoType = false;
        let hasChineseTsConfig = false;
        let chineseParser = null;
        // Scan the schema
        for (const entity in schema) {
            const { attributes, indexes } = schema[entity];
            for (const attr in attributes) {
                if (attributes[attr].type === 'geometry') {
                    hasGeoType = true;
                }
            }
            for (const index of indexes || []) {
                if (index.config?.tsConfig === 'chinese' || index.config?.tsConfig?.includes('chinese')) {
                    hasChineseTsConfig = true;
                }
                if (index.config?.chineseParser) {
                    // Every index that names a parser must name the same one.
                    (0, assert_1.default)(!chineseParser || chineseParser === index.config.chineseParser, '当前定义了多个中文分词器,请保持一致');
                    chineseParser = index.config.chineseParser;
                }
            }
        }
        // Create the extensions outside of the transaction
        if (hasGeoType) {
            console.log('Initializing PostGIS extension for geometry support...');
            await this.connector.exec('CREATE EXTENSION IF NOT EXISTS postgis;');
        }
        if (hasChineseTsConfig) {
            console.log('Initializing Chinese parser extension...');
            await this.connector.exec(`CREATE EXTENSION IF NOT EXISTS ${chineseParser || 'zhparser'};`);
        }
        // ===== Phase 2: create the configuration and the tables inside a transaction =====
        const txn = await this.connector.startTransaction({
            isolationLevel: 'serializable',
        });
        try {
            // Create the Chinese text search configuration
            if (hasChineseTsConfig) {
                console.log('Initializing Chinese text search configuration...');
                const checkChineseConfigSql = ` SELECT COUNT(*) as cnt FROM pg_catalog.pg_ts_config WHERE cfgname = 'chinese'; `;
                const result = await this.connector.exec(checkChineseConfigSql, txn);
                const count = parseInt(result[0][0]?.cnt || '0', 10);
                if (count === 0) {
                    const createChineseConfigSql = ` CREATE TEXT SEARCH CONFIGURATION chinese (PARSER = ${chineseParser || 'zhparser'}); ALTER TEXT SEARCH CONFIGURATION chinese ADD MAPPING FOR n,v,a,i,e,l WITH simple; `;
                    await this.connector.exec(createChineseConfigSql, txn);
                }
            }
            // Create the entity tables
            for (const entity in schema) {
                const sqls = this.translator.translateCreateEntity(entity, option);
                for (const sql of sqls) {
                    await this.connector.exec(sql, txn);
                }
            }
            await this.connector.commitTransaction(txn);
        }
        catch (error) {
            await this.connector.rollbackTransaction(txn);
            throw error;
        }
    }
    // Read the current schema from the database
    readSchema() {
        return this.translator.readSchema((sql) => this.connector.exec(sql));
    }
    /**
     * Decide how to upgrade, based on the loaded data schema and the schema
     * currently in the database.
     * The resulting plan has two stages, an additive stage and a reductive
     * stage; the user is expected to fix up data between the two.
     */
    async makeUpgradePlan() {
        const originSchema = await this.readSchema();
        const plan = this.diffSchema(originSchema, this.translator.schema);
        return plan;
    }
    /**
     * Compare two schemas; the result describes what the new schema adds or
     * changes relative to the old one.
     * @param schemaOld
     * @param schemaNew
     * @returns plan with `newTables`, `newIndexes`, `updatedIndexes` and `updatedTables`, each keyed by table name
     */
    diffSchema(schemaOld, schemaNew) {
        const plan = {
            newTables: {},
            newIndexes: {},
            updatedIndexes: {},
            updatedTables: {},
        };
        for (const table in schemaNew) {
            // PostgreSQL table names are case sensitive (when double-quoted)
            if (schemaOld[table]) {
                const { attributes, indexes } = schemaOld[table];
                const { attributes: attributesNew, indexes: indexesNew } = schemaNew[table];
                // Record a new or changed attribute; built-in columns are never reported.
                const assignToUpdateTables = (attr, isNew) => {
                    const skipAttrs = ['$$seq$$', '$$createAt$$', '$$updateAt$$', '$$deleteAt$$', 'id'];
                    if (skipAttrs.includes(attr)) {
                        return;
                    }
                    if (!plan.updatedTables[table]) {
                        plan.updatedTables[table] = {
                            attributes: {
                                [attr]: {
                                    ...attributesNew[attr],
                                    isNew,
                                }
                            }
                        };
                    }
                    else {
                        plan.updatedTables[table].attributes[attr] = {
                            ...attributesNew[attr],
                            isNew,
                        };
                    }
                };
                for (const attr in attributesNew) {
                    if (attributes[attr]) {
                        // Compare the column definitions generated for the old and the new attribute
                        const sql1 = this.translator.translateAttributeDef(attr, attributesNew[attr]);
                        const sql2 = this.translator.translateAttributeDef(attr, attributes[attr]);
                        if (!this.translator.compareSql(sql1, sql2)) {
                            assignToUpdateTables(attr, false);
                        }
                    }
                    else {
                        assignToUpdateTables(attr, true);
                    }
                }
                if (indexesNew) {
                    // Append the index to newIndexes or updatedIndexes for this table.
                    const assignToIndexes = (index, isNew) => {
                        if (isNew) {
                            if (plan.newIndexes[table]) {
                                plan.newIndexes[table].push(index);
                            }
                            else {
                                plan.newIndexes[table] = [index];
                            }
                        }
                        else {
                            if (plan.updatedIndexes[table]) {
                                plan.updatedIndexes[table].push(index);
                            }
                            else {
                                plan.updatedIndexes[table] = [index];
                            }
                        }
                    };
                    // Two configs are equal when unique, tsConfig and type all agree
                    // (missing `unique` counts as false, missing `type` as 'btree').
                    const compareConfig = (config1, config2) => {
                        const unique1 = config1?.unique || false;
                        const unique2 = config2?.unique || false;
                        if (unique1 !== unique2) {
                            return false;
                        }
                        const type1 = config1?.type || 'btree';
                        const type2 = config2?.type || 'btree';
                        // Compare tsConfig
                        const tsConfig1 = config1?.tsConfig;
                        const tsConfig2 = config2?.tsConfig;
                        if (JSON.stringify(tsConfig1) !== JSON.stringify(tsConfig2)) {
                            return false;
                        }
                        return type1 === type2;
                    };
                    for (const index of indexesNew) {
                        const { name, config, attributes: indexAttrs } = index;
                        const origin = indexes?.find(ele => ele.name === name);
                        if (origin) {
                            if (JSON.stringify(indexAttrs) !== JSON.stringify(origin.attributes)) {
                                assignToIndexes(index, false);
                            }
                            else {
                                if (!compareConfig(config, origin.config)) {
                                    assignToIndexes(index, false);
                                }
                            }
                        }
                        else {
                            assignToIndexes(index, true);
                        }
                    }
                }
            }
            else {
                // Table does not exist in the old schema: report it and all its indexes as new.
                plan.newTables[table] = {
                    attributes: schemaNew[table].attributes,
                };
                if (schemaNew[table].indexes) {
                    plan.newIndexes[table] = schemaNew[table].indexes;
                }
            }
        }
        return plan;
    }
}
exports.PostgreSQLStore = PostgreSQLStore;