Compare commits

..

15 Commits
3.3.12 ... dev

Author SHA1 Message Date
Pan Qiancheng c080b15078 fix: 完善DbStore类型,在init时自动创建扩展 2026-01-23 10:53:10 +08:00
Xu Chang ec085ddfd1 3.3.14-dev 2026-01-21 09:55:09 +08:00
Xu Chang 411f4db18f 3.3.13-pub 2026-01-21 09:53:56 +08:00
Pan Qiancheng 62bc866606 Merge branch 'dev' of https://gitea.51mars.com/Oak-Team/oak-db into dev 2026-01-21 09:50:21 +08:00
Pan Qiancheng eee2f7c874 fix: 去除了多余的添加deleteAt字段的逻辑 2026-01-21 09:50:14 +08:00
Xu Chang cb8b0428b4 Merge branch 'dev' of gitea.51mars.com:Oak-Team/oak-db into dev 2026-01-21 09:42:31 +08:00
Xu Chang be10547065 漏了一根索引上的deleteAt 2026-01-21 09:42:23 +08:00
Pan Qiancheng 49dc9141de feat: 完善类型支持,在pg中提供部分ddl支持 2026-01-20 18:03:33 +08:00
Pan Qiancheng 5a5ac5c194 feat: 支持在init时自动创建地理和paser插件,以及将初始化包裹在事务中 2026-01-20 12:04:57 +08:00
Pan Qiancheng b4e0b08ba7 fix: 统一datatime类型的处理行为 2026-01-20 11:00:51 +08:00
Pan Qiancheng 195e97b3d9 fix: 修复了部分内置字段查询结果为string导致的上层转换问题 2026-01-20 10:52:53 +08:00
Pan Qiancheng c5456b3fcb test: 新增测试用例:嵌套跨节点表达式 2026-01-13 15:04:08 +08:00
Pan Qiancheng a5cb652468 Merge branch 'dev' of https://gitea.51mars.com/Oak-Team/oak-db into dev 2026-01-13 15:02:59 +08:00
Pan Qiancheng 8f0319c648 fix: 修复一处mysql的filter深层调用节点表达式时会出现的bug 2026-01-13 15:02:37 +08:00
Xu Chang 5ab7e9e43b 3.3.13-dev 2026-01-09 15:44:32 +08:00
20 changed files with 1620 additions and 191 deletions

17
lib/MySQL/store.d.ts vendored
View File

@ -1,4 +1,4 @@
import { EntityDict, OperateOption, OperationResult, TxnOption, StorageSchema, SelectOption, AggregationResult, Attribute, Index } from 'oak-domain/lib/types'; import { EntityDict, OperateOption, OperationResult, TxnOption, StorageSchema, SelectOption, AggregationResult } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { CascadeStore } from 'oak-domain/lib/store/CascadeStore'; import { CascadeStore } from 'oak-domain/lib/store/CascadeStore';
import { MySQLConfiguration } from './types/Configuration'; import { MySQLConfiguration } from './types/Configuration';
@ -7,7 +7,7 @@ import { MySqlTranslator, MySqlSelectOption, MysqlOperateOption } from './transl
import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore'; import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
import { SyncContext } from 'oak-domain/lib/store/SyncRowStore'; import { SyncContext } from 'oak-domain/lib/store/SyncRowStore';
import { CreateEntityOption } from '../types/Translator'; import { CreateEntityOption } from '../types/Translator';
import { DbStore } from '../types/dbStore'; import { DbStore, Plan } from '../types/dbStore';
export declare class MysqlStore<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> extends CascadeStore<ED> implements DbStore<ED, Cxt> { export declare class MysqlStore<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> extends CascadeStore<ED> implements DbStore<ED, Cxt> {
protected countAbjointRow<T extends keyof ED, OP extends SelectOption, Cxt extends SyncContext<ED>>(entity: T, selection: Pick<ED[T]['Selection'], 'filter' | 'count'>, context: Cxt, option: OP): number; protected countAbjointRow<T extends keyof ED, OP extends SelectOption, Cxt extends SyncContext<ED>>(entity: T, selection: Pick<ED[T]['Selection'], 'filter' | 'count'>, context: Cxt, option: OP): number;
protected aggregateAbjointRowSync<T extends keyof ED, OP extends SelectOption, Cxt extends SyncContext<ED>>(entity: T, aggregation: ED[T]['Aggregation'], context: Cxt, option: OP): AggregationResult<ED[T]['Schema']>; protected aggregateAbjointRowSync<T extends keyof ED, OP extends SelectOption, Cxt extends SyncContext<ED>>(entity: T, aggregation: ED[T]['Aggregation'], context: Cxt, option: OP): AggregationResult<ED[T]['Schema']>;
@ -48,16 +48,3 @@ export declare class MysqlStore<ED extends EntityDict & BaseEntityDict, Cxt exte
*/ */
diffSchema(schemaOld: StorageSchema<any>, schemaNew: StorageSchema<any>): Plan; diffSchema(schemaOld: StorageSchema<any>, schemaNew: StorageSchema<any>): Plan;
} }
type Plan = {
newTables: Record<string, {
attributes: Record<string, Attribute>;
}>;
newIndexes: Record<string, Index<any>[]>;
updatedTables: Record<string, {
attributes: Record<string, Attribute & {
isNew: boolean;
}>;
}>;
updatedIndexes: Record<string, Index<any>[]>;
};
export {};

View File

@ -354,7 +354,7 @@ class MysqlStore extends CascadeStore_1.CascadeStore {
}; };
for (const attr in attributesNew) { for (const attr in attributesNew) {
if (attributes[attr]) { if (attributes[attr]) {
// 因为反向无法复原原来定义的attribute类型这里就比较两次创建的sql是不是一致 // 因为反向无法复原原来定义的attribute类型这里就比较两次创建的sql是不是一致,不是太好的设计
const sql1 = this.translator.translateAttributeDef(attr, attributesNew[attr]); const sql1 = this.translator.translateAttributeDef(attr, attributesNew[attr]);
const sql2 = this.translator.translateAttributeDef(attr, attributes[attr]); const sql2 = this.translator.translateAttributeDef(attr, attributes[attr]);
if (!this.translator.compareSql(sql1, sql2)) { if (!this.translator.compareSql(sql1, sql2)) {

View File

@ -872,8 +872,10 @@ class MySqlTranslator extends sqlTranslator_1.SqlTranslator {
const refId = (expr)['#refId']; const refId = (expr)['#refId'];
const refAttr = (expr)['#refAttr']; const refAttr = (expr)['#refAttr'];
(0, assert_1.default)(refDict[refId]); (0, assert_1.default)(refDict[refId]);
const attrText = `\`${refDict[refId][0]}\`.\`${refAttr}\``; const [refAlias, refEntity] = refDict[refId];
result = this.translateAttrInExpression(entity, (expr)['#refAttr'], attrText); const attrText = `\`${refAlias}\`.\`${refAttr}\``;
// 这里必须使用refEntity否则在filter深层嵌套节点表达式时会出现entity不对应
result = this.translateAttrInExpression(refEntity, (expr)['#refAttr'], attrText);
} }
else { else {
(0, assert_1.default)(k.length === 1); (0, assert_1.default)(k.length === 1);

View File

@ -16,10 +16,6 @@ export declare class PostgreSQLConnector {
exec(sql: string, txn?: string): Promise<[QueryResultRow[], QueryResult]>; exec(sql: string, txn?: string): Promise<[QueryResultRow[], QueryResult]>;
commitTransaction(txn: string): Promise<void>; commitTransaction(txn: string): Promise<void>;
rollbackTransaction(txn: string): Promise<void>; rollbackTransaction(txn: string): Promise<void>;
/**
* SQL
*/
execBatch(sqls: string[], txn?: string): Promise<void>;
/** /**
* *
*/ */

View File

@ -133,16 +133,6 @@ class PostgreSQLConnector {
connection.release(); connection.release();
} }
} }
/**
* 执行多条 SQL 语句用于初始化等场景
*/
async execBatch(sqls, txn) {
for (const sql of sqls) {
if (sql.trim()) {
await this.exec(sql, txn);
}
}
}
/** /**
* 获取连接池状态 * 获取连接池状态
*/ */

View File

@ -7,7 +7,7 @@ import { PostgreSQLTranslator, PostgreSQLSelectOption, PostgreSQLOperateOption }
import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore'; import { AsyncContext } from 'oak-domain/lib/store/AsyncRowStore';
import { SyncContext } from 'oak-domain/lib/store/SyncRowStore'; import { SyncContext } from 'oak-domain/lib/store/SyncRowStore';
import { CreateEntityOption } from '../types/Translator'; import { CreateEntityOption } from '../types/Translator';
import { DbStore } from '../types/dbStore'; import { DbStore, Plan } from '../types/dbStore';
export declare class PostgreSQLStore<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> extends CascadeStore<ED> implements DbStore<ED, Cxt> { export declare class PostgreSQLStore<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> extends CascadeStore<ED> implements DbStore<ED, Cxt> {
protected countAbjointRow<T extends keyof ED, OP extends SelectOption, Cxt extends SyncContext<ED>>(entity: T, selection: Pick<ED[T]['Selection'], 'filter' | 'count'>, context: Cxt, option: OP): number; protected countAbjointRow<T extends keyof ED, OP extends SelectOption, Cxt extends SyncContext<ED>>(entity: T, selection: Pick<ED[T]['Selection'], 'filter' | 'count'>, context: Cxt, option: OP): number;
protected aggregateAbjointRowSync<T extends keyof ED, OP extends SelectOption, Cxt extends SyncContext<ED>>(entity: T, aggregation: ED[T]['Aggregation'], context: Cxt, option: OP): AggregationResult<ED[T]['Schema']>; protected aggregateAbjointRowSync<T extends keyof ED, OP extends SelectOption, Cxt extends SyncContext<ED>>(entity: T, aggregation: ED[T]['Aggregation'], context: Cxt, option: OP): AggregationResult<ED[T]['Schema']>;
@ -35,4 +35,16 @@ export declare class PostgreSQLStore<ED extends EntityDict & BaseEntityDict, Cxt
connect(): Promise<void>; connect(): Promise<void>;
disconnect(): Promise<void>; disconnect(): Promise<void>;
initialize(option: CreateEntityOption): Promise<void>; initialize(option: CreateEntityOption): Promise<void>;
readSchema(): Promise<StorageSchema<ED>>;
/**
* dataSchemaschemaupgrade
* plan分为两阶段
*/
makeUpgradePlan(): Promise<Plan>;
/**
* schema的不同new对old的增量
* @param schemaOld
* @param schemaNew
*/
diffSchema(schemaOld: StorageSchema<any>, schemaNew: StorageSchema<any>): Plan;
} }

View File

@ -8,6 +8,12 @@ const translator_1 = require("./translator");
const lodash_1 = require("lodash"); const lodash_1 = require("lodash");
const assert_1 = tslib_1.__importDefault(require("assert")); const assert_1 = tslib_1.__importDefault(require("assert"));
const relation_1 = require("oak-domain/lib/store/relation"); const relation_1 = require("oak-domain/lib/store/relation");
const ToNumberAttrs = new Set([
'$$seq$$',
'$$createAt$$',
'$$updateAt$$',
'$$deleteAt$$',
]);
function convertGeoTextToObject(geoText) { function convertGeoTextToObject(geoText) {
if (geoText.startsWith('POINT')) { if (geoText.startsWith('POINT')) {
const coord = geoText.match(/(-?\d+\.?\d*)/g); const coord = geoText.match(/(-?\d+\.?\d*)/g);
@ -121,12 +127,19 @@ class PostgreSQLStore extends CascadeStore_1.CascadeStore {
const { type } = attributes[attr]; const { type } = attributes[attr];
switch (type) { switch (type) {
case 'date': case 'date':
case 'time': { case 'time':
case 'datetime': {
if (value instanceof Date) { if (value instanceof Date) {
r[attr] = value.valueOf(); r[attr] = value.valueOf();
} }
else { else {
r[attr] = value; if (typeof value === 'string') {
r[attr] = parseInt(value, 10);
}
else {
(0, assert_1.default)(typeof value === 'number' || value === null);
r[attr] = value;
}
} }
break; break;
} }
@ -203,6 +216,14 @@ class PostgreSQLStore extends CascadeStore_1.CascadeStore {
// PostgreSQL count 返回字符串 // PostgreSQL count 返回字符串
r[attr] = parseInt(value, 10); r[attr] = parseInt(value, 10);
} }
else if (attr.startsWith("#sum") || attr.startsWith("#avg") || attr.startsWith("#min") || attr.startsWith("#max")) {
// PostgreSQL sum/avg/min/max 返回字符串
r[attr] = parseFloat(value);
}
else if (ToNumberAttrs.has(attr)) {
// PostgreSQL sum/avg/min/max 返回字符串
r[attr] = parseInt(value, 10);
}
else { else {
r[attr] = value; r[attr] = value;
} }
@ -316,14 +337,203 @@ class PostgreSQLStore extends CascadeStore_1.CascadeStore {
} }
async initialize(option) { async initialize(option) {
const schema = this.getSchema(); const schema = this.getSchema();
// 可选:先创建 PostGIS 扩展 // ===== 第一阶段:事务外创建扩展 =====
// await this.connector.exec('CREATE EXTENSION IF NOT EXISTS postgis;'); let hasGeoType = false;
let hasChineseTsConfig = false;
let chineseParser = null;
// 扫描 schema
for (const entity in schema) { for (const entity in schema) {
const sqls = this.translator.translateCreateEntity(entity, option); const { attributes, indexes } = schema[entity];
for (const sql of sqls) { for (const attr in attributes) {
await this.connector.exec(sql); if (attributes[attr].type === 'geometry') {
hasGeoType = true;
}
}
for (const index of indexes || []) {
if (index.config?.tsConfig === 'chinese' || index.config?.tsConfig?.includes('chinese')) {
hasChineseTsConfig = true;
}
if (index.config?.chineseParser) {
(0, assert_1.default)(!chineseParser || chineseParser === index.config.chineseParser, '当前定义了多个中文分词器,请保持一致');
chineseParser = index.config.chineseParser;
}
} }
} }
// 在事务外创建扩展
if (hasGeoType) {
console.log('Initializing PostGIS extension for geometry support...');
await this.connector.exec('CREATE EXTENSION IF NOT EXISTS postgis;');
}
if (hasChineseTsConfig) {
console.log('Initializing Chinese parser extension...');
await this.connector.exec(`CREATE EXTENSION IF NOT EXISTS ${chineseParser || 'zhparser'};`);
}
// ===== 第二阶段:事务内创建配置和表 =====
const txn = await this.connector.startTransaction({
isolationLevel: 'serializable',
});
try {
// 创建中文文本搜索配置
if (hasChineseTsConfig) {
console.log('Initializing Chinese text search configuration...');
const checkChineseConfigSql = `
SELECT COUNT(*) as cnt
FROM pg_catalog.pg_ts_config
WHERE cfgname = 'chinese';
`;
const result = await this.connector.exec(checkChineseConfigSql, txn);
const count = parseInt(result[0][0]?.cnt || '0', 10);
if (count === 0) {
const createChineseConfigSql = `
CREATE TEXT SEARCH CONFIGURATION chinese (PARSER = ${chineseParser || 'zhparser'});
ALTER TEXT SEARCH CONFIGURATION chinese ADD MAPPING FOR n,v,a,i,e,l WITH simple;
`;
await this.connector.exec(createChineseConfigSql, txn);
}
}
// 创建实体表
for (const entity in schema) {
const sqls = this.translator.translateCreateEntity(entity, option);
for (const sql of sqls) {
await this.connector.exec(sql, txn);
}
}
await this.connector.commitTransaction(txn);
}
catch (error) {
await this.connector.rollbackTransaction(txn);
throw error;
}
}
// 从数据库中读取当前schema
readSchema() {
return this.translator.readSchema((sql) => this.connector.exec(sql));
}
/**
* 根据载入的dataSchema和数据库中原来的schema决定如何来upgrade
* 制订出来的plan分为两阶段增加阶段和削减阶段在两个阶段之间由用户来修正数据
*/
async makeUpgradePlan() {
const originSchema = await this.readSchema();
const plan = this.diffSchema(originSchema, this.translator.schema);
return plan;
}
/**
* 比较两个schema的不同这里计算的是new对old的增量
* @param schemaOld
* @param schemaNew
*/
diffSchema(schemaOld, schemaNew) {
const plan = {
newTables: {},
newIndexes: {},
updatedIndexes: {},
updatedTables: {},
};
for (const table in schemaNew) {
// PostgreSQL 表名区分大小写(使用双引号时)
if (schemaOld[table]) {
const { attributes, indexes } = schemaOld[table];
const { attributes: attributesNew, indexes: indexesNew } = schemaNew[table];
const assignToUpdateTables = (attr, isNew) => {
const skipAttrs = ['$$seq$$', '$$createAt$$', '$$updateAt$$', '$$deleteAt$$', 'id'];
if (skipAttrs.includes(attr)) {
return;
}
if (!plan.updatedTables[table]) {
plan.updatedTables[table] = {
attributes: {
[attr]: {
...attributesNew[attr],
isNew,
}
}
};
}
else {
plan.updatedTables[table].attributes[attr] = {
...attributesNew[attr],
isNew,
};
}
};
for (const attr in attributesNew) {
if (attributes[attr]) {
// 比较两次创建的属性定义是否一致
const sql1 = this.translator.translateAttributeDef(attr, attributesNew[attr]);
const sql2 = this.translator.translateAttributeDef(attr, attributes[attr]);
if (!this.translator.compareSql(sql1, sql2)) {
assignToUpdateTables(attr, false);
}
}
else {
assignToUpdateTables(attr, true);
}
}
if (indexesNew) {
const assignToIndexes = (index, isNew) => {
if (isNew) {
if (plan.newIndexes[table]) {
plan.newIndexes[table].push(index);
}
else {
plan.newIndexes[table] = [index];
}
}
else {
if (plan.updatedIndexes[table]) {
plan.updatedIndexes[table].push(index);
}
else {
plan.updatedIndexes[table] = [index];
}
}
};
const compareConfig = (config1, config2) => {
const unique1 = config1?.unique || false;
const unique2 = config2?.unique || false;
if (unique1 !== unique2) {
return false;
}
const type1 = config1?.type || 'btree';
const type2 = config2?.type || 'btree';
// tsConfig 比较
const tsConfig1 = config1?.tsConfig;
const tsConfig2 = config2?.tsConfig;
if (JSON.stringify(tsConfig1) !== JSON.stringify(tsConfig2)) {
return false;
}
return type1 === type2;
};
for (const index of indexesNew) {
const { name, config, attributes: indexAttrs } = index;
const origin = indexes?.find(ele => ele.name === name);
if (origin) {
if (JSON.stringify(indexAttrs) !== JSON.stringify(origin.attributes)) {
assignToIndexes(index, false);
}
else {
if (!compareConfig(config, origin.config)) {
assignToIndexes(index, false);
}
}
}
else {
assignToIndexes(index, true);
}
}
}
}
else {
plan.newTables[table] = {
attributes: schemaNew[table].attributes,
};
if (schemaNew[table].indexes) {
plan.newIndexes[table] = schemaNew[table].indexes;
}
}
}
return plan;
} }
} }
exports.PostgreSQLStore = PostgreSQLStore; exports.PostgreSQLStore = PostgreSQLStore;

View File

@ -1,4 +1,4 @@
import { EntityDict, Q_FullTextValue, RefOrExpression, Ref, StorageSchema } from "oak-domain/lib/types"; import { EntityDict, Q_FullTextValue, RefOrExpression, Ref, StorageSchema, Attribute } from "oak-domain/lib/types";
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { DataType } from "oak-domain/lib/types/schema/DataTypes"; import { DataType } from "oak-domain/lib/types/schema/DataTypes";
import { SqlOperateOption, SqlSelectOption, SqlTranslator } from "../sqlTranslator"; import { SqlOperateOption, SqlSelectOption, SqlTranslator } from "../sqlTranslator";
@ -80,4 +80,24 @@ export declare class PostgreSQLTranslator<ED extends EntityDict & BaseEntityDict
translateUpsert<T extends keyof ED>(entity: T, data: ED[T]['CreateMulti']['data'], conflictKeys: string[], updateAttrs?: string[]): string; translateUpsert<T extends keyof ED>(entity: T, data: ED[T]['CreateMulti']['data'], conflictKeys: string[], updateAttrs?: string[]): string;
protected populateUpdateStmt(updateText: string, fromText: string, aliasDict: Record<string, string>, filterText: string, sorterText?: string, indexFrom?: number, count?: number, option?: PostgreSQLOperateOption): string; protected populateUpdateStmt(updateText: string, fromText: string, aliasDict: Record<string, string>, filterText: string, sorterText?: string, indexFrom?: number, count?: number, option?: PostgreSQLOperateOption): string;
protected populateRemoveStmt(updateText: string, fromText: string, aliasDict: Record<string, string>, filterText: string, sorterText?: string, indexFrom?: number, count?: number, option?: PostgreSQLOperateOption): string; protected populateRemoveStmt(updateText: string, fromText: string, aliasDict: Record<string, string>, filterText: string, sorterText?: string, indexFrom?: number, count?: number, option?: PostgreSQLOperateOption): string;
/**
* PostgreSQL Type oak populateDataTypeDef
* @param type PostgreSQL
*/
private reTranslateToAttribute;
/**
* PostgreSQL schema
*/
readSchema(execFn: (sql: string) => Promise<any>): Promise<StorageSchema<ED>>;
/**
* PostgreSQL DDL
* @param attr
* @param attrDef
*/
translateAttributeDef(attr: string, attrDef: Attribute): string;
/**
* SQL schema diff
*
*/
compareSql(sql1: string, sql2: string): boolean;
} }

View File

@ -741,6 +741,7 @@ class PostgreSQLTranslator extends sqlTranslator_1.SqlTranslator {
else if (typeof value === 'number') { else if (typeof value === 'number') {
return `${value}`; return `${value}`;
} }
(0, assert_1.default)(typeof value === 'string', 'Invalid date/time value');
return `'${(new Date(value)).valueOf()}'`; return `'${(new Date(value)).valueOf()}'`;
} }
case 'object': case 'object':
@ -967,12 +968,8 @@ class PostgreSQLTranslator extends sqlTranslator_1.SqlTranslator {
} }
indexSql += '('; indexSql += '(';
const indexColumns = []; const indexColumns = [];
let includeDeleteAt = false;
for (const indexAttr of indexAttrs) { for (const indexAttr of indexAttrs) {
const { name: attrName, direction } = indexAttr; const { name: attrName, direction } = indexAttr;
if (attrName === '$$deleteAt$$') {
includeDeleteAt = true;
}
if (indexType === 'fulltext') { if (indexType === 'fulltext') {
// 全文索引:使用 to_tsvector // 全文索引:使用 to_tsvector
indexColumns.push(`to_tsvector('${tsLang}', COALESCE("${attrName}", ''))`); indexColumns.push(`to_tsvector('${tsLang}', COALESCE("${attrName}", ''))`);
@ -986,10 +983,6 @@ class PostgreSQLTranslator extends sqlTranslator_1.SqlTranslator {
indexColumns.push(col); indexColumns.push(col);
} }
} }
// 非特殊索引自动包含 deleteAt
if (!includeDeleteAt && !indexType) {
indexColumns.push('"$$deleteAt$$"');
}
indexSql += indexColumns.join(', '); indexSql += indexColumns.join(', ');
indexSql += ');'; indexSql += ');';
sqls.push(indexSql); sqls.push(indexSql);
@ -1766,5 +1759,401 @@ class PostgreSQLTranslator extends sqlTranslator_1.SqlTranslator {
// 这个方法不应该被直接调用了因为translateRemove已经重写 // 这个方法不应该被直接调用了因为translateRemove已经重写
throw new Error('populateRemoveStmt should not be called directly in PostgreSQL. Use translateRemove instead.'); throw new Error('populateRemoveStmt should not be called directly in PostgreSQL. Use translateRemove instead.');
} }
/**
* PostgreSQL 返回的 Type 回译成 oak 的类型 populateDataTypeDef 的反函数
* @param type PostgreSQL 类型字符串
*/
reTranslateToAttribute(type) {
// 处理带长度的类型character varying(255), character(10)
const varcharMatch = /^character varying\((\d+)\)$/.exec(type);
if (varcharMatch) {
return {
type: 'varchar',
params: {
length: parseInt(varcharMatch[1], 10),
}
};
}
const charMatch = /^character\((\d+)\)$/.exec(type);
if (charMatch) {
return {
type: 'char',
params: {
length: parseInt(charMatch[1], 10),
}
};
}
// 处理带精度和小数位的类型numeric(10,2)
const numericWithScaleMatch = /^numeric\((\d+),(\d+)\)$/.exec(type);
if (numericWithScaleMatch) {
return {
type: 'decimal',
params: {
precision: parseInt(numericWithScaleMatch[1], 10),
scale: parseInt(numericWithScaleMatch[2], 10),
},
};
}
// 处理只带精度的类型numeric(10), timestamp(6)
const numericMatch = /^numeric\((\d+)\)$/.exec(type);
if (numericMatch) {
return {
type: 'decimal',
params: {
precision: parseInt(numericMatch[1], 10),
scale: 0,
},
};
}
const timestampMatch = /^timestamp\((\d+)\) without time zone$/.exec(type);
if (timestampMatch) {
return {
type: 'timestamp',
params: {
precision: parseInt(timestampMatch[1], 10),
},
};
}
const timeMatch = /^time\((\d+)\) without time zone$/.exec(type);
if (timeMatch) {
return {
type: 'time',
params: {
precision: parseInt(timeMatch[1], 10),
},
};
}
// PostgreSQL 类型映射到 oak 类型
const typeMap = {
'bigint': 'bigint',
'integer': 'integer',
'smallint': 'smallint',
'real': 'real',
'double precision': 'double precision',
'boolean': 'boolean',
'text': 'text',
'jsonb': 'object',
'json': 'object',
'bytea': 'bytea',
'character varying': 'varchar',
'character': 'char',
'timestamp without time zone': 'timestamp',
'time without time zone': 'time',
'date': 'date',
'uuid': 'uuid',
'geometry': 'geometry',
'numeric': 'decimal',
};
const mappedType = typeMap[type];
if (mappedType) {
return { type: mappedType };
}
// 如果是用户定义的枚举类型,返回 enum具体值需要额外查询
// 这里先返回基础类型,枚举值在 readSchema 中单独处理
return { type: type };
}
/**
* PostgreSQL 数据库读取当前的 schema 结构
*/
async readSchema(execFn) {
const result = {};
// 1. 获取所有表
const tablesSql = `
SELECT tablename
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY tablename;
`;
const [tablesResult] = await execFn(tablesSql);
for (const tableRow of tablesResult) {
const tableName = tableRow.tablename;
// 2. 获取表的列信息
const columnsSql = `
SELECT
column_name,
data_type,
character_maximum_length,
numeric_precision,
numeric_scale,
is_nullable,
column_default,
udt_name
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = '${tableName}'
ORDER BY ordinal_position;
`;
const [columnsResult] = await execFn(columnsSql);
const attributes = {};
for (const col of columnsResult) {
const { column_name: colName, data_type: dataType, character_maximum_length: maxLength, numeric_precision: precision, numeric_scale: scale, is_nullable: isNullable, column_default: defaultValue, udt_name: udtName, } = col;
let attr;
// 处理用户定义类型(枚举)
if (dataType === 'USER-DEFINED') {
const enumSql = `
SELECT e.enumlabel
FROM pg_type t
JOIN pg_enum e ON t.oid = e.enumtypid
WHERE t.typname = '${udtName}'
ORDER BY e.enumsortorder;
`;
const [enumResult] = await execFn(enumSql);
const enumeration = enumResult.map((r) => r.enumlabel);
attr = {
type: 'enum',
enumeration,
};
}
else {
// 构建完整的类型字符串
let fullType = dataType;
const integerTypes = ['bigint', 'integer', 'smallint', 'serial', 'bigserial', 'smallserial'];
if (maxLength && !integerTypes.includes(dataType)) {
fullType = `${dataType}(${maxLength})`;
}
else if (precision !== null && scale !== null && !integerTypes.includes(dataType)) {
fullType = `${dataType}(${precision},${scale})`;
}
else if (precision !== null && !integerTypes.includes(dataType)) {
fullType = `${dataType}(${precision})`;
}
attr = this.reTranslateToAttribute(fullType);
}
// ========== 类型还原逻辑 ==========
// 框架将某些语义类型存储为 bigint需要根据列名还原
if (attr.type === 'bigint') {
// 1. 检查是否是序列列
if (colName === '$$seq$$' || (defaultValue && defaultValue.includes('nextval'))) {
attr.type = 'sequence';
attr.sequenceStart = 10000; // 默认起始值
}
// 2. 检查是否是时间戳列
else if (['$$createAt$$', '$$updateAt$$', '$$deleteAt$$'].includes(colName)) {
attr.type = 'datetime';
}
// 3. 检查其他可能的时间类型列(根据命名约定)
else if (colName.endsWith('At') || colName.endsWith('Time')) {
// 可选:根据业务约定判断是否应该是 datetime
// 这里保守处理,只转换框架标准字段
}
}
// 处理约束 - 只在为 true 时添加
if (isNullable === 'NO') {
attr.notNull = true;
}
// 处理默认值
if (defaultValue && !defaultValue.includes('nextval')) {
let cleanDefault = defaultValue.replace(/::[a-z]+/gi, '').replace(/'/g, '');
if (cleanDefault === 'true') {
attr.default = true;
}
else if (cleanDefault === 'false') {
attr.default = false;
}
else if (!isNaN(Number(cleanDefault))) {
attr.default = Number(cleanDefault);
}
else if (cleanDefault !== '') {
attr.default = cleanDefault;
}
}
// 检查唯一约束
const uniqueSql = `
SELECT COUNT(*) as cnt
FROM pg_index ix
JOIN pg_class t ON t.oid = ix.indrelid
JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
WHERE t.relname = '${tableName}'
AND a.attname = '${colName}'
AND ix.indisunique = true
AND NOT ix.indisprimary
AND array_length(ix.indkey, 1) = 1;
`;
const [uniqueResult] = await execFn(uniqueSql);
const uniqueCount = parseInt(uniqueResult[0]?.cnt || '0', 10);
if (uniqueCount > 0) {
attr.unique = true;
}
attributes[colName] = attr;
}
// 3. 获取索引信息
const indexesSql = `
SELECT
i.relname as index_name,
ix.indisunique as is_unique,
am.amname as index_type,
pg_get_indexdef(ix.indexrelid) as index_def
FROM pg_class t
JOIN pg_index ix ON t.oid = ix.indrelid
JOIN pg_class i ON i.oid = ix.indexrelid
JOIN pg_am am ON i.relam = am.oid
WHERE t.relname = '${tableName}'
AND t.relkind = 'r'
AND i.relname NOT LIKE '%_pkey'
AND NOT ix.indisprimary
ORDER BY i.relname;
`;
const [indexesResult] = await execFn(indexesSql);
if (indexesResult.length > 0) {
const indexes = [];
for (const row of indexesResult) {
const { index_name: indexName, is_unique: isUnique, index_type: indexType, index_def: indexDef } = row;
// 解析索引定义以获取列名和配置
const index = {
name: indexName,
attributes: [],
};
// 解析索引定义字符串
// 示例: CREATE INDEX "user_index_fulltext_chinese" ON public."user" USING gin (to_tsvector('chinese'::regconfig, (COALESCE(name, ''::text) || ' '::text) || COALESCE(nickname, ''::text)))
if (indexType === 'gin' && indexDef.includes('to_tsvector')) {
// 全文索引
index.config = { type: 'fulltext' };
// 提取 tsConfig
const tsConfigMatch = indexDef.match(/to_tsvector\('([^']+)'/);
if (tsConfigMatch) {
const tsConfig = tsConfigMatch[1];
index.config.tsConfig = tsConfig;
}
// 提取列名(从 COALESCE 中)
const columnMatches = indexDef.matchAll(/COALESCE\("?([^",\s]+)"?/g);
const columns = Array.from(columnMatches, m => m[1]);
index.attributes = columns.map(col => ({ name: col }));
// 处理多语言索引的情况:移除语言后缀
// 例如: user_index_fulltext_chinese -> index_fulltext
const nameParts = indexName.split('_');
if (nameParts.length > 2) {
const possibleLang = nameParts[nameParts.length - 1];
// 如果最后一部分是语言代码,移除它
if (['chinese', 'english', 'simple', 'german', 'french', 'spanish', 'russian', 'japanese'].includes(possibleLang)) {
index.name = nameParts.slice(0, -1).join('_');
}
}
}
else if (indexType === 'gist') {
// 空间索引
index.config = { type: 'spatial' };
// 提取列名
const columnMatch = indexDef.match(/\(([^)]+)\)/);
if (columnMatch) {
const columns = columnMatch[1].split(',').map(c => c.trim().replace(/"/g, ''));
index.attributes = columns.map(col => ({ name: col }));
}
}
else if (indexType === 'hash') {
// 哈希索引
index.config = { type: 'hash' };
// 提取列名
const columnMatch = indexDef.match(/\(([^)]+)\)/);
if (columnMatch) {
const columns = columnMatch[1].split(',').map(c => c.trim().replace(/"/g, ''));
index.attributes = columns.map(col => ({ name: col }));
}
}
else {
// B-tree 索引(默认)
// 提取列名和排序方向
const columnMatch = indexDef.match(/\(([^)]+)\)/);
if (columnMatch) {
const columnDefs = columnMatch[1].split(',');
index.attributes = columnDefs.map(colDef => {
const trimmed = colDef.trim().replace(/"/g, '');
const parts = trimmed.split(/\s+/);
const attr = { name: parts[0] };
// 检查排序方向
if (parts.includes('DESC')) {
attr.direction = 'DESC';
}
else if (parts.includes('ASC')) {
attr.direction = 'ASC';
}
return attr;
});
}
// 如果是唯一索引
if (isUnique) {
index.config = { unique: true };
}
}
// 移除表名前缀(如果存在)
// 例如: user_index_fulltext -> index_fulltext
if (index.name.startsWith(`${tableName}_`)) {
index.name = index.name.substring(tableName.length + 1);
}
indexes.push(index);
}
Object.assign(result, {
[tableName]: {
attributes,
indexes,
}
});
}
else {
Object.assign(result, {
[tableName]: {
attributes,
}
});
}
}
return result;
}
/**
* 将属性定义转换为 PostgreSQL DDL 语句
* @param attr 属性名
* @param attrDef 属性定义
*/
translateAttributeDef(attr, attrDef) {
let sql = `"${attr}" `;
const { type, params, default: defaultValue, unique, notNull, sequenceStart, enumeration, } = attrDef;
// 处理序列类型IDENTITY
if (type === 'sequence' || (typeof sequenceStart === 'number')) {
sql += `bigint GENERATED BY DEFAULT AS IDENTITY (START WITH ${sequenceStart || 10000}) UNIQUE`;
return sql;
}
// 处理枚举类型
if (type === 'enum') {
(0, assert_1.default)(enumeration, 'Enum type requires enumeration values');
sql += `enum(${enumeration.map(v => `'${v}'`).join(',')})`;
}
else {
sql += this.populateDataTypeDef(type, params, enumeration);
}
// NOT NULL 约束
if (notNull || type === 'geometry') {
sql += ' NOT NULL';
}
// UNIQUE 约束
if (unique) {
sql += ' UNIQUE';
}
// 默认值
if (defaultValue !== undefined && !sequenceStart) {
(0, assert_1.default)(type !== 'ref', 'ref type should not have default value');
sql += ` DEFAULT ${this.translateAttrValue(type, defaultValue)}`;
}
// 主键
if (attr === 'id') {
sql += ' PRIMARY KEY';
}
return sql;
}
/**
* 比较两个 SQL 语句是否等价用于 schema diff
* 忽略空格大小写等格式差异
*/
compareSql(sql1, sql2) {
// 标准化 SQL移除多余空格统一大小写
const normalize = (sql) => {
return sql
.replace(/\s+/g, ' ') // 多个空格合并为一个
.replace(/\(\s+/g, '(') // 移除括号后的空格
.replace(/\s+\)/g, ')') // 移除括号前的空格
.replace(/,\s+/g, ',') // 移除逗号后的空格
.trim()
.toLowerCase();
};
return normalize(sql1) === normalize(sql2);
}
} }
exports.PostgreSQLTranslator = PostgreSQLTranslator; exports.PostgreSQLTranslator = PostgreSQLTranslator;

View File

@ -72,6 +72,8 @@ class SqlTranslator {
name: `${entity}_trigger_uuid_auto_create`, name: `${entity}_trigger_uuid_auto_create`,
attributes: [{ attributes: [{
name: types_1.TriggerUuidAttribute, name: types_1.TriggerUuidAttribute,
}, {
name: types_1.DeleteAtAttribute,
}] }]
}, },
]; ];
@ -84,7 +86,7 @@ class SqlTranslator {
attributes: [{ attributes: [{
name: attr, name: attr,
}, { }, {
name: '$$deleteAt$$', name: types_1.DeleteAtAttribute,
}] }]
}); });
} }
@ -100,7 +102,7 @@ class SqlTranslator {
}, { }, {
name: 'entityId', name: 'entityId',
}, { }, {
name: '$$deleteAt$$', name: types_1.DeleteAtAttribute,
}] }]
}); });
} }
@ -113,7 +115,7 @@ class SqlTranslator {
attributes: [{ attributes: [{
name: attr, name: attr,
}, { }, {
name: '$$deleteAt$$', name: types_1.DeleteAtAttribute,
}] }]
}); });
} }
@ -129,7 +131,7 @@ class SqlTranslator {
}, { }, {
name: 'expiresAt', name: 'expiresAt',
}, { }, {
name: '$$deleteAt$$', name: types_1.DeleteAtAttribute,
}] }]
}); });
} }

View File

@ -1,8 +1,33 @@
import { EntityDict } from "oak-domain/lib/base-app-domain"; import { Attribute, EntityDict, Index, OperateOption, OperationResult, StorageSchema, TxnOption } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { AsyncContext, AsyncRowStore } from "oak-domain/lib/store/AsyncRowStore"; import { AsyncContext, AsyncRowStore } from "oak-domain/lib/store/AsyncRowStore";
import { CreateEntityOption } from "./Translator"; import { CreateEntityOption } from "./Translator";
export interface DbStore<ED extends EntityDict, Cxt extends AsyncContext<ED>> extends AsyncRowStore<ED, Cxt> { import { AggregationResult, SelectOption } from "oak-domain/lib/types";
export type Plan = {
newTables: Record<string, {
attributes: Record<string, Attribute>;
}>;
newIndexes: Record<string, Index<any>[]>;
updatedTables: Record<string, {
attributes: Record<string, Attribute & {
isNew: boolean;
}>;
}>;
updatedIndexes: Record<string, Index<any>[]>;
};
export interface DbStore<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> extends AsyncRowStore<ED, Cxt> {
checkRelationAsync<T extends keyof ED, Cxt extends AsyncContext<ED>>(entity: T, operation: Omit<ED[T]['Operation'] | ED[T]['Selection'], 'id'>, context: Cxt): Promise<void>;
connect: () => Promise<void>; connect: () => Promise<void>;
disconnect: () => Promise<void>; disconnect: () => Promise<void>;
initialize(options: CreateEntityOption): Promise<void>; initialize(options: CreateEntityOption): Promise<void>;
aggregate<T extends keyof ED, OP extends SelectOption>(entity: T, aggregation: ED[T]['Aggregation'], context: Cxt, option: OP): Promise<AggregationResult<ED[T]['Schema']>>;
operate<T extends keyof ED>(entity: T, operation: ED[T]['Operation'], context: Cxt, option: OperateOption): Promise<OperationResult<ED>>;
select<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, option: SelectOption): Promise<Partial<ED[T]['Schema']>[]>;
count<T extends keyof ED>(entity: T, selection: Pick<ED[T]['Selection'], 'filter' | 'count'>, context: Cxt, option: SelectOption): Promise<number>;
begin(option?: TxnOption): Promise<string>;
commit(txnId: string): Promise<void>;
rollback(txnId: string): Promise<void>;
readSchema(): Promise<StorageSchema<ED>>;
makeUpgradePlan(): Promise<Plan>;
diffSchema(schemaOld: StorageSchema<any>, schemaNew: StorageSchema<any>): Plan;
} }

View File

@ -1,6 +1,6 @@
{ {
"name": "oak-db", "name": "oak-db",
"version": "3.3.12", "version": "3.3.14",
"description": "oak-db", "description": "oak-db",
"main": "lib/index", "main": "lib/index",
"author": { "author": {
@ -18,7 +18,7 @@
"lodash": "^4.17.21", "lodash": "^4.17.21",
"mysql": "^2.18.1", "mysql": "^2.18.1",
"mysql2": "^2.3.3", "mysql2": "^2.3.3",
"oak-domain": "^5.1.33", "oak-domain": "file:../oak-domain",
"pg": "^8.16.3", "pg": "^8.16.3",
"uuid": "^8.3.2" "uuid": "^8.3.2"
}, },

View File

@ -11,7 +11,7 @@ import { AsyncContext, AsyncRowStore } from 'oak-domain/lib/store/AsyncRowStore'
import { SyncContext } from 'oak-domain/lib/store/SyncRowStore'; import { SyncContext } from 'oak-domain/lib/store/SyncRowStore';
import { CreateEntityOption } from '../types/Translator'; import { CreateEntityOption } from '../types/Translator';
import { FieldPacket, ResultSetHeader, RowDataPacket } from 'mysql2'; import { FieldPacket, ResultSetHeader, RowDataPacket } from 'mysql2';
import { DbStore } from '../types/dbStore'; import { DbStore, Plan } from '../types/dbStore';
function convertGeoTextToObject(geoText: string): Geo { function convertGeoTextToObject(geoText: string): Geo {
@ -391,7 +391,7 @@ export class MysqlStore<ED extends EntityDict & BaseEntityDict, Cxt extends Asyn
}; };
for (const attr in attributesNew) { for (const attr in attributesNew) {
if (attributes[attr]) { if (attributes[attr]) {
// 因为反向无法复原原来定义的attribute类型这里就比较两次创建的sql是不是一致 // 因为反向无法复原原来定义的attribute类型这里就比较两次创建的sql是不是一致,不是太好的设计
const sql1 = this.translator.translateAttributeDef(attr, attributesNew[attr]); const sql1 = this.translator.translateAttributeDef(attr, attributesNew[attr]);
const sql2 = this.translator.translateAttributeDef(attr, attributes[attr]); const sql2 = this.translator.translateAttributeDef(attr, attributes[attr]);
if (!this.translator.compareSql(sql1, sql2)) { if (!this.translator.compareSql(sql1, sql2)) {
@ -466,16 +466,4 @@ export class MysqlStore<ED extends EntityDict & BaseEntityDict, Cxt extends Asyn
return plan; return plan;
} }
} }
type Plan = {
newTables: Record<string, {
attributes: Record<string, Attribute>;
}>;
newIndexes: Record<string, Index<any>[]>;
updatedTables: Record<string, {
attributes: Record<string, Attribute & { isNew: boolean }>;
}>;
updatedIndexes: Record<string, Index<any>[]>;
};

View File

@ -971,8 +971,10 @@ export class MySqlTranslator<ED extends EntityDict & BaseEntityDict> extends Sql
const refAttr = (expr)['#refAttr']; const refAttr = (expr)['#refAttr'];
assert(refDict[refId]); assert(refDict[refId]);
const attrText = `\`${refDict[refId][0]}\`.\`${refAttr}\``; const [refAlias, refEntity] = refDict[refId];
result = this.translateAttrInExpression(entity, (expr)['#refAttr'], attrText); const attrText = `\`${refAlias}\`.\`${refAttr}\``;
// 这里必须使用refEntity否则在filter深层嵌套节点表达式时会出现entity不对应
result = this.translateAttrInExpression(refEntity as T, (expr)['#refAttr'], attrText);
} }
else { else {
assert(k.length === 1); assert(k.length === 1);

View File

@ -155,17 +155,6 @@ export class PostgreSQLConnector {
} }
} }
/**
* SQL
*/
async execBatch(sqls: string[], txn?: string): Promise<void> {
for (const sql of sqls) {
if (sql.trim()) {
await this.exec(sql, txn);
}
}
}
/** /**
* *
*/ */

View File

@ -1,12 +1,15 @@
import { import {
EntityDict, EntityDict,
OperateOption, OperateOption,
OperationResult, OperationResult,
TxnOption, TxnOption,
StorageSchema, StorageSchema,
SelectOption, SelectOption,
AggregationResult, AggregationResult,
Geo Geo,
IndexConfig,
Index,
Attribute
} from 'oak-domain/lib/types'; } from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { CascadeStore } from 'oak-domain/lib/store/CascadeStore'; import { CascadeStore } from 'oak-domain/lib/store/CascadeStore';
@ -20,7 +23,14 @@ import { AsyncContext, AsyncRowStore } from 'oak-domain/lib/store/AsyncRowStore'
import { SyncContext } from 'oak-domain/lib/store/SyncRowStore'; import { SyncContext } from 'oak-domain/lib/store/SyncRowStore';
import { CreateEntityOption } from '../types/Translator'; import { CreateEntityOption } from '../types/Translator';
import { QueryResult } from 'pg'; import { QueryResult } from 'pg';
import { DbStore } from '../types/dbStore'; import { DbStore, Plan } from '../types/dbStore';
const ToNumberAttrs = new Set([
'$$seq$$',
'$$createAt$$',
'$$updateAt$$',
'$$deleteAt$$',
]);
function convertGeoTextToObject(geoText: string): Geo { function convertGeoTextToObject(geoText: string): Geo {
if (geoText.startsWith('POINT')) { if (geoText.startsWith('POINT')) {
@ -59,119 +69,119 @@ function convertGeoTextToObject(geoText: string): Geo {
}; };
} }
} }
throw new Error(`Unsupported geometry type: ${geoText.slice(0, 50)}`); throw new Error(`Unsupported geometry type: ${geoText.slice(0, 50)}`);
} }
export class PostgreSQLStore< export class PostgreSQLStore<
ED extends EntityDict & BaseEntityDict, ED extends EntityDict & BaseEntityDict,
Cxt extends AsyncContext<ED> Cxt extends AsyncContext<ED>
> extends CascadeStore<ED> implements DbStore<ED, Cxt> { > extends CascadeStore<ED> implements DbStore<ED, Cxt> {
protected countAbjointRow<T extends keyof ED, OP extends SelectOption, Cxt extends SyncContext<ED>>( protected countAbjointRow<T extends keyof ED, OP extends SelectOption, Cxt extends SyncContext<ED>>(
entity: T, entity: T,
selection: Pick<ED[T]['Selection'], 'filter' | 'count'>, selection: Pick<ED[T]['Selection'], 'filter' | 'count'>,
context: Cxt, context: Cxt,
option: OP option: OP
): number { ): number {
throw new Error('PostgreSQL store 不支持同步取数据'); throw new Error('PostgreSQL store 不支持同步取数据');
} }
protected aggregateAbjointRowSync<T extends keyof ED, OP extends SelectOption, Cxt extends SyncContext<ED>>( protected aggregateAbjointRowSync<T extends keyof ED, OP extends SelectOption, Cxt extends SyncContext<ED>>(
entity: T, entity: T,
aggregation: ED[T]['Aggregation'], aggregation: ED[T]['Aggregation'],
context: Cxt, context: Cxt,
option: OP option: OP
): AggregationResult<ED[T]['Schema']> { ): AggregationResult<ED[T]['Schema']> {
throw new Error('PostgreSQL store 不支持同步取数据'); throw new Error('PostgreSQL store 不支持同步取数据');
} }
protected selectAbjointRow<T extends keyof ED, OP extends SelectOption>( protected selectAbjointRow<T extends keyof ED, OP extends SelectOption>(
entity: T, entity: T,
selection: ED[T]['Selection'], selection: ED[T]['Selection'],
context: SyncContext<ED>, context: SyncContext<ED>,
option: OP option: OP
): Partial<ED[T]['Schema']>[] { ): Partial<ED[T]['Schema']>[] {
throw new Error('PostgreSQL store 不支持同步取数据'); throw new Error('PostgreSQL store 不支持同步取数据');
} }
protected updateAbjointRow<T extends keyof ED, OP extends OperateOption>( protected updateAbjointRow<T extends keyof ED, OP extends OperateOption>(
entity: T, entity: T,
operation: ED[T]['Operation'], operation: ED[T]['Operation'],
context: SyncContext<ED>, context: SyncContext<ED>,
option: OP option: OP
): number { ): number {
throw new Error('PostgreSQL store 不支持同步更新数据'); throw new Error('PostgreSQL store 不支持同步更新数据');
} }
async exec(script: string, txnId?: string) { async exec(script: string, txnId?: string) {
await this.connector.exec(script, txnId); await this.connector.exec(script, txnId);
} }
connector: PostgreSQLConnector; connector: PostgreSQLConnector;
translator: PostgreSQLTranslator<ED>; translator: PostgreSQLTranslator<ED>;
constructor(storageSchema: StorageSchema<ED>, configuration: PostgreSQLConfiguration) { constructor(storageSchema: StorageSchema<ED>, configuration: PostgreSQLConfiguration) {
super(storageSchema); super(storageSchema);
this.connector = new PostgreSQLConnector(configuration); this.connector = new PostgreSQLConnector(configuration);
this.translator = new PostgreSQLTranslator(storageSchema); this.translator = new PostgreSQLTranslator(storageSchema);
} }
checkRelationAsync<T extends keyof ED, Cxt extends AsyncContext<ED>>( checkRelationAsync<T extends keyof ED, Cxt extends AsyncContext<ED>>(
entity: T, entity: T,
operation: Omit<ED[T]['Operation'] | ED[T]['Selection'], 'id'>, operation: Omit<ED[T]['Operation'] | ED[T]['Selection'], 'id'>,
context: Cxt context: Cxt
): Promise<void> { ): Promise<void> {
throw new Error('Method not implemented.'); throw new Error('Method not implemented.');
} }
protected async aggregateAbjointRowAsync<T extends keyof ED, OP extends SelectOption, Cxt extends AsyncContext<ED>>( protected async aggregateAbjointRowAsync<T extends keyof ED, OP extends SelectOption, Cxt extends AsyncContext<ED>>(
entity: T, entity: T,
aggregation: ED[T]['Aggregation'], aggregation: ED[T]['Aggregation'],
context: Cxt, context: Cxt,
option: OP option: OP
): Promise<AggregationResult<ED[T]['Schema']>> { ): Promise<AggregationResult<ED[T]['Schema']>> {
const sql = this.translator.translateAggregate(entity, aggregation, option); const sql = this.translator.translateAggregate(entity, aggregation, option);
const result = await this.connector.exec(sql, context.getCurrentTxnId()); const result = await this.connector.exec(sql, context.getCurrentTxnId());
return this.formResult(entity, result[0]); return this.formResult(entity, result[0]);
} }
aggregate<T extends keyof ED, OP extends SelectOption>( aggregate<T extends keyof ED, OP extends SelectOption>(
entity: T, entity: T,
aggregation: ED[T]['Aggregation'], aggregation: ED[T]['Aggregation'],
context: Cxt, context: Cxt,
option: OP option: OP
): Promise<AggregationResult<ED[T]['Schema']>> { ): Promise<AggregationResult<ED[T]['Schema']>> {
return this.aggregateAsync(entity, aggregation, context, option); return this.aggregateAsync(entity, aggregation, context, option);
} }
protected supportManyToOneJoin(): boolean { protected supportManyToOneJoin(): boolean {
return true; return true;
} }
protected supportMultipleCreate(): boolean { protected supportMultipleCreate(): boolean {
return true; return true;
} }
private formResult<T extends keyof ED>(entity: T, result: any): any { private formResult<T extends keyof ED>(entity: T, result: any): any {
const schema = this.getSchema(); const schema = this.getSchema();
function resolveAttribute<E extends keyof ED>( function resolveAttribute<E extends keyof ED>(
entity2: E, entity2: E,
r: Record<string, any>, r: Record<string, any>,
attr: string, attr: string,
value: any value: any
) { ) {
const { attributes, view } = schema[entity2]; const { attributes, view } = schema[entity2];
if (!view) { if (!view) {
const i = attr.indexOf("."); const i = attr.indexOf(".");
if (i !== -1) { if (i !== -1) {
const attrHead = attr.slice(0, i); const attrHead = attr.slice(0, i);
const attrTail = attr.slice(i + 1); const attrTail = attr.slice(i + 1);
const rel = judgeRelation(schema, entity2, attrHead); const rel = judgeRelation(schema, entity2, attrHead);
if (rel === 1) { if (rel === 1) {
set(r, attr, value); set(r, attr, value);
} else { } else {
@ -190,14 +200,20 @@ export class PostgreSQLStore<
} }
} else if (attributes[attr]) { } else if (attributes[attr]) {
const { type } = attributes[attr]; const { type } = attributes[attr];
switch (type) { switch (type) {
case 'date': case 'date':
case 'time': { case 'time':
case 'datetime': {
if (value instanceof Date) { if (value instanceof Date) {
r[attr] = value.valueOf(); r[attr] = value.valueOf();
} else { } else {
r[attr] = value; if (typeof value === 'string') {
r[attr] = parseInt(value, 10);
} else {
assert(typeof value === 'number' || value === null);
r[attr] = value;
}
} }
break; break;
} }
@ -266,6 +282,14 @@ export class PostgreSQLStore<
} else if (attr.startsWith("#count")) { } else if (attr.startsWith("#count")) {
// PostgreSQL count 返回字符串 // PostgreSQL count 返回字符串
r[attr] = parseInt(value, 10); r[attr] = parseInt(value, 10);
}
else if (attr.startsWith("#sum") || attr.startsWith("#avg") || attr.startsWith("#min") || attr.startsWith("#max")) {
// PostgreSQL sum/avg/min/max 返回字符串
r[attr] = parseFloat(value);
}
else if (ToNumberAttrs.has(attr)) {
// PostgreSQL sum/avg/min/max 返回字符串
r[attr] = parseInt(value, 10);
} else { } else {
r[attr] = value; r[attr] = value;
} }
@ -281,7 +305,7 @@ export class PostgreSQLStore<
function removeNullObjects<E extends keyof ED>(r: Record<string, any>, e: E) { function removeNullObjects<E extends keyof ED>(r: Record<string, any>, e: E) {
for (let attr in r) { for (let attr in r) {
const rel = judgeRelation(schema, e, attr); const rel = judgeRelation(schema, e, attr);
if (rel === 2) { if (rel === 2) {
if (r[attr].id === null) { if (r[attr].id === null) {
assert(schema[e].toModi || r.entity !== attr); assert(schema[e].toModi || r.entity !== attr);
@ -305,7 +329,7 @@ export class PostgreSQLStore<
function formSingleRow(r: any): any { function formSingleRow(r: any): any {
let result2: Record<string, any> = {}; let result2: Record<string, any> = {};
for (let attr in r) { for (let attr in r) {
const value = r[attr]; const value = r[attr];
resolveAttribute(entity, result2, attr, value); resolveAttribute(entity, result2, attr, value);
@ -320,7 +344,7 @@ export class PostgreSQLStore<
} }
return formSingleRow(result); return formSingleRow(result);
} }
protected async selectAbjointRowAsync<T extends keyof ED>( protected async selectAbjointRowAsync<T extends keyof ED>(
entity: T, entity: T,
selection: ED[T]['Selection'], selection: ED[T]['Selection'],
@ -331,7 +355,7 @@ export class PostgreSQLStore<
const result = await this.connector.exec(sql, context.getCurrentTxnId()); const result = await this.connector.exec(sql, context.getCurrentTxnId());
return this.formResult(entity, result[0]); return this.formResult(entity, result[0]);
} }
protected async updateAbjointRowAsync<T extends keyof ED>( protected async updateAbjointRowAsync<T extends keyof ED>(
entity: T, entity: T,
operation: ED[T]['Operation'], operation: ED[T]['Operation'],
@ -363,81 +387,282 @@ export class PostgreSQLStore<
} }
} }
} }
async operate<T extends keyof ED>( async operate<T extends keyof ED>(
entity: T, entity: T,
operation: ED[T]['Operation'], operation: ED[T]['Operation'],
context: Cxt, context: Cxt,
option: OperateOption option: OperateOption
): Promise<OperationResult<ED>> { ): Promise<OperationResult<ED>> {
const { action } = operation; const { action } = operation;
assert(!['select', 'download', 'stat'].includes(action), '不支持使用 select operation'); assert(!['select', 'download', 'stat'].includes(action), '不支持使用 select operation');
return await super.operateAsync(entity, operation as any, context, option); return await super.operateAsync(entity, operation as any, context, option);
} }
async select<T extends keyof ED>( async select<T extends keyof ED>(
entity: T, entity: T,
selection: ED[T]['Selection'], selection: ED[T]['Selection'],
context: Cxt, context: Cxt,
option: SelectOption option: SelectOption
): Promise<Partial<ED[T]['Schema']>[]> { ): Promise<Partial<ED[T]['Schema']>[]> {
return await super.selectAsync(entity, selection, context, option); return await super.selectAsync(entity, selection, context, option);
} }
protected async countAbjointRowAsync<T extends keyof ED>( protected async countAbjointRowAsync<T extends keyof ED>(
entity: T, entity: T,
selection: Pick<ED[T]['Selection'], 'filter' | 'count'>, selection: Pick<ED[T]['Selection'], 'filter' | 'count'>,
context: AsyncContext<ED>, context: AsyncContext<ED>,
option: SelectOption option: SelectOption
): Promise<number> { ): Promise<number> {
const sql = this.translator.translateCount(entity, selection, option); const sql = this.translator.translateCount(entity, selection, option);
const result = await this.connector.exec(sql, context.getCurrentTxnId()); const result = await this.connector.exec(sql, context.getCurrentTxnId());
// PostgreSQL 返回的 count 是 string 类型bigint // PostgreSQL 返回的 count 是 string 类型bigint
const cnt = result[0][0]?.cnt; const cnt = result[0][0]?.cnt;
return typeof cnt === 'string' ? parseInt(cnt, 10) : (cnt || 0); return typeof cnt === 'string' ? parseInt(cnt, 10) : (cnt || 0);
} }
async count<T extends keyof ED>( async count<T extends keyof ED>(
entity: T, entity: T,
selection: Pick<ED[T]['Selection'], 'filter' | 'count'>, selection: Pick<ED[T]['Selection'], 'filter' | 'count'>,
context: Cxt, context: Cxt,
option: SelectOption option: SelectOption
) { ) {
return this.countAsync(entity, selection, context, option); return this.countAsync(entity, selection, context, option);
} }
async begin(option?: TxnOption): Promise<string> { async begin(option?: TxnOption): Promise<string> {
return await this.connector.startTransaction(option); return await this.connector.startTransaction(option);
} }
async commit(txnId: string): Promise<void> { async commit(txnId: string): Promise<void> {
await this.connector.commitTransaction(txnId); await this.connector.commitTransaction(txnId);
} }
async rollback(txnId: string): Promise<void> { async rollback(txnId: string): Promise<void> {
await this.connector.rollbackTransaction(txnId); await this.connector.rollbackTransaction(txnId);
} }
async connect() { async connect() {
await this.connector.connect(); await this.connector.connect();
} }
async disconnect() { async disconnect() {
await this.connector.disconnect(); await this.connector.disconnect();
} }
async initialize(option: CreateEntityOption) { async initialize(option: CreateEntityOption) {
const schema = this.getSchema(); const schema = this.getSchema();
// 可选:先创建 PostGIS 扩展 // ===== 第一阶段:事务外创建扩展 =====
// await this.connector.exec('CREATE EXTENSION IF NOT EXISTS postgis;'); let hasGeoType = false;
let hasChineseTsConfig = false;
let chineseParser = null;
// 扫描 schema
for (const entity in schema) { for (const entity in schema) {
const sqls = this.translator.translateCreateEntity(entity, option); const { attributes, indexes } = schema[entity];
for (const sql of sqls) { for (const attr in attributes) {
await this.connector.exec(sql); if (attributes[attr].type === 'geometry') {
hasGeoType = true;
}
}
for (const index of indexes || []) {
if (index.config?.tsConfig === 'chinese' || index.config?.tsConfig?.includes('chinese')) {
hasChineseTsConfig = true;
}
if (index.config?.chineseParser) {
assert(!chineseParser || chineseParser === index.config.chineseParser,
'当前定义了多个中文分词器,请保持一致');
chineseParser = index.config.chineseParser;
}
} }
} }
// 在事务外创建扩展
if (hasGeoType) {
console.log('Initializing PostGIS extension for geometry support...');
await this.connector.exec('CREATE EXTENSION IF NOT EXISTS postgis;');
}
if (hasChineseTsConfig) {
console.log('Initializing Chinese parser extension...');
await this.connector.exec(`CREATE EXTENSION IF NOT EXISTS ${chineseParser || 'zhparser'};`);
}
// ===== 第二阶段:事务内创建配置和表 =====
const txn = await this.connector.startTransaction({
isolationLevel: 'serializable',
});
try {
// 创建中文文本搜索配置
if (hasChineseTsConfig) {
console.log('Initializing Chinese text search configuration...');
const checkChineseConfigSql = `
SELECT COUNT(*) as cnt
FROM pg_catalog.pg_ts_config
WHERE cfgname = 'chinese';
`;
const result = await this.connector.exec(checkChineseConfigSql, txn);
const count = parseInt(result[0][0]?.cnt || '0', 10);
if (count === 0) {
const createChineseConfigSql = `
CREATE TEXT SEARCH CONFIGURATION chinese (PARSER = ${chineseParser || 'zhparser'});
ALTER TEXT SEARCH CONFIGURATION chinese ADD MAPPING FOR n,v,a,i,e,l WITH simple;
`;
await this.connector.exec(createChineseConfigSql, txn);
}
}
// 创建实体表
for (const entity in schema) {
const sqls = this.translator.translateCreateEntity(entity, option);
for (const sql of sqls) {
await this.connector.exec(sql, txn);
}
}
await this.connector.commitTransaction(txn);
} catch (error) {
await this.connector.rollbackTransaction(txn);
throw error;
}
}
// 从数据库中读取当前schema
readSchema() {
return this.translator.readSchema((sql) => this.connector.exec(sql));
}
/**
* dataSchemaschemaupgrade
* plan分为两阶段
*/
async makeUpgradePlan() {
const originSchema = await this.readSchema();
const plan = this.diffSchema(originSchema, this.translator.schema);
return plan;
}
/**
* schema的不同new对old的增量
* @param schemaOld
* @param schemaNew
*/
diffSchema(schemaOld: StorageSchema<any>, schemaNew: StorageSchema<any>) {
const plan: Plan = {
newTables: {},
newIndexes: {},
updatedIndexes: {},
updatedTables: {},
};
for (const table in schemaNew) {
// PostgreSQL 表名区分大小写(使用双引号时)
if (schemaOld[table]) {
const { attributes, indexes } = schemaOld[table];
const { attributes: attributesNew, indexes: indexesNew } = schemaNew[table];
const assignToUpdateTables = (attr: string, isNew: boolean) => {
const skipAttrs = ['$$seq$$', '$$createAt$$', '$$updateAt$$', '$$deleteAt$$', 'id'];
if (skipAttrs.includes(attr)) {
return;
}
if (!plan.updatedTables[table]) {
plan.updatedTables[table] = {
attributes: {
[attr]: {
...attributesNew[attr],
isNew,
}
}
};
} else {
plan.updatedTables[table].attributes[attr] = {
...attributesNew[attr],
isNew,
};
}
};
for (const attr in attributesNew) {
if (attributes[attr]) {
// 比较两次创建的属性定义是否一致
const sql1 = this.translator.translateAttributeDef(attr, attributesNew[attr]);
const sql2 = this.translator.translateAttributeDef(attr, attributes[attr]);
if (!this.translator.compareSql(sql1, sql2)) {
assignToUpdateTables(attr, false);
}
} else {
assignToUpdateTables(attr, true);
}
}
if (indexesNew) {
const assignToIndexes = (index: Index<any>, isNew: boolean) => {
if (isNew) {
if (plan.newIndexes[table]) {
plan.newIndexes[table].push(index);
} else {
plan.newIndexes[table] = [index];
}
} else {
if (plan.updatedIndexes[table]) {
plan.updatedIndexes[table].push(index);
} else {
plan.updatedIndexes[table] = [index];
}
}
};
const compareConfig = (config1?: IndexConfig, config2?: IndexConfig) => {
const unique1 = config1?.unique || false;
const unique2 = config2?.unique || false;
if (unique1 !== unique2) {
return false;
}
const type1 = config1?.type || 'btree';
const type2 = config2?.type || 'btree';
// tsConfig 比较
const tsConfig1 = config1?.tsConfig;
const tsConfig2 = config2?.tsConfig;
if (JSON.stringify(tsConfig1) !== JSON.stringify(tsConfig2)) {
return false;
}
return type1 === type2;
};
for (const index of indexesNew) {
const { name, config, attributes: indexAttrs } = index;
const origin = indexes?.find(ele => ele.name === name);
if (origin) {
if (JSON.stringify(indexAttrs) !== JSON.stringify(origin.attributes)) {
assignToIndexes(index, false);
} else {
if (!compareConfig(config, origin.config)) {
assignToIndexes(index, false);
}
}
} else {
assignToIndexes(index, true);
}
}
}
} else {
plan.newTables[table] = {
attributes: schemaNew[table].attributes,
};
if (schemaNew[table].indexes) {
plan.newIndexes[table] = schemaNew[table].indexes!;
}
}
}
return plan;
} }
} }

View File

@ -1,7 +1,7 @@
import assert from 'assert'; import assert from 'assert';
import { format } from 'util'; import { format } from 'util';
import { assign, difference } from 'lodash'; import { assign, difference } from 'lodash';
import { EntityDict, Geo, Q_FullTextValue, RefOrExpression, Ref, StorageSchema, Index, RefAttr, DeleteAtAttribute, TriggerDataAttribute, TriggerUuidAttribute, UpdateAtAttribute } from "oak-domain/lib/types"; import { EntityDict, Geo, Q_FullTextValue, RefOrExpression, Ref, StorageSchema, Index, RefAttr, DeleteAtAttribute, TriggerDataAttribute, TriggerUuidAttribute, UpdateAtAttribute, Attribute, Attributes } from "oak-domain/lib/types";
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain'; import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { DataType, DataTypeParams } from "oak-domain/lib/types/schema/DataTypes"; import { DataType, DataTypeParams } from "oak-domain/lib/types/schema/DataTypes";
import { SqlOperateOption, SqlSelectOption, SqlTranslator } from "../sqlTranslator"; import { SqlOperateOption, SqlSelectOption, SqlTranslator } from "../sqlTranslator";
@ -796,6 +796,7 @@ export class PostgreSQLTranslator<ED extends EntityDict & BaseEntityDict> extend
else if (typeof value === 'number') { else if (typeof value === 'number') {
return `${value}`; return `${value}`;
} }
assert(typeof value === 'string', 'Invalid date/time value');
return `'${(new Date(value)).valueOf()}'`; return `'${(new Date(value)).valueOf()}'`;
} }
case 'object': case 'object':
@ -1066,15 +1067,10 @@ export class PostgreSQLTranslator<ED extends EntityDict & BaseEntityDict> extend
indexSql += '('; indexSql += '(';
const indexColumns: string[] = []; const indexColumns: string[] = [];
let includeDeleteAt = false;
for (const indexAttr of indexAttrs) { for (const indexAttr of indexAttrs) {
const { name: attrName, direction } = indexAttr; const { name: attrName, direction } = indexAttr;
if (attrName === '$$deleteAt$$') {
includeDeleteAt = true;
}
if (indexType === 'fulltext') { if (indexType === 'fulltext') {
// 全文索引:使用 to_tsvector // 全文索引:使用 to_tsvector
indexColumns.push(`to_tsvector('${tsLang}', COALESCE("${attrName as string}", ''))`); indexColumns.push(`to_tsvector('${tsLang}', COALESCE("${attrName as string}", ''))`);
@ -1088,11 +1084,6 @@ export class PostgreSQLTranslator<ED extends EntityDict & BaseEntityDict> extend
} }
} }
// 非特殊索引自动包含 deleteAt
if (!includeDeleteAt && !indexType) {
indexColumns.push('"$$deleteAt$$"');
}
indexSql += indexColumns.join(', '); indexSql += indexColumns.join(', ');
indexSql += ');'; indexSql += ');';
@ -2038,4 +2029,465 @@ export class PostgreSQLTranslator<ED extends EntityDict & BaseEntityDict> extend
// 这个方法不应该被直接调用了因为translateRemove已经重写 // 这个方法不应该被直接调用了因为translateRemove已经重写
throw new Error('populateRemoveStmt should not be called directly in PostgreSQL. Use translateRemove instead.'); throw new Error('populateRemoveStmt should not be called directly in PostgreSQL. Use translateRemove instead.');
} }
/**
* PostgreSQL Type oak populateDataTypeDef
* @param type PostgreSQL
*/
private reTranslateToAttribute(type: string): Attribute {
// 处理带长度的类型character varying(255), character(10)
const varcharMatch = /^character varying\((\d+)\)$/.exec(type);
if (varcharMatch) {
return {
type: 'varchar',
params: {
length: parseInt(varcharMatch[1], 10),
}
};
}
const charMatch = /^character\((\d+)\)$/.exec(type);
if (charMatch) {
return {
type: 'char',
params: {
length: parseInt(charMatch[1], 10),
}
};
}
// 处理带精度和小数位的类型numeric(10,2)
const numericWithScaleMatch = /^numeric\((\d+),(\d+)\)$/.exec(type);
if (numericWithScaleMatch) {
return {
type: 'decimal',
params: {
precision: parseInt(numericWithScaleMatch[1], 10),
scale: parseInt(numericWithScaleMatch[2], 10),
},
};
}
// 处理只带精度的类型numeric(10), timestamp(6)
const numericMatch = /^numeric\((\d+)\)$/.exec(type);
if (numericMatch) {
return {
type: 'decimal',
params: {
precision: parseInt(numericMatch[1], 10),
scale: 0,
},
};
}
const timestampMatch = /^timestamp\((\d+)\) without time zone$/.exec(type);
if (timestampMatch) {
return {
type: 'timestamp',
params: {
precision: parseInt(timestampMatch[1], 10),
},
};
}
const timeMatch = /^time\((\d+)\) without time zone$/.exec(type);
if (timeMatch) {
return {
type: 'time',
params: {
precision: parseInt(timeMatch[1], 10),
},
};
}
// PostgreSQL 类型映射到 oak 类型
const typeMap: Record<string, DataType> = {
'bigint': 'bigint',
'integer': 'integer',
'smallint': 'smallint',
'real': 'real',
'double precision': 'double precision',
'boolean': 'boolean',
'text': 'text',
'jsonb': 'object',
'json': 'object',
'bytea': 'bytea',
'character varying': 'varchar',
'character': 'char',
'timestamp without time zone': 'timestamp',
'time without time zone': 'time',
'date': 'date',
'uuid': 'uuid',
'geometry': 'geometry',
'numeric': 'decimal',
};
const mappedType = typeMap[type];
if (mappedType) {
return { type: mappedType };
}
// 如果是用户定义的枚举类型,返回 enum具体值需要额外查询
// 这里先返回基础类型,枚举值在 readSchema 中单独处理
return { type: type as DataType };
}
/**
* PostgreSQL schema
*/
async readSchema(execFn: (sql: string) => Promise<any>): Promise<StorageSchema<ED>> {
const result: StorageSchema<ED> = {} as StorageSchema<ED>;
// 1. 获取所有表
const tablesSql = `
SELECT tablename
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY tablename;
`;
const [tablesResult] = await execFn(tablesSql);
for (const tableRow of tablesResult) {
const tableName = tableRow.tablename;
// 2. 获取表的列信息
const columnsSql = `
SELECT
column_name,
data_type,
character_maximum_length,
numeric_precision,
numeric_scale,
is_nullable,
column_default,
udt_name
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = '${tableName}'
ORDER BY ordinal_position;
`;
const [columnsResult] = await execFn(columnsSql);
const attributes: Attributes<any> = {};
for (const col of columnsResult) {
const {
column_name: colName,
data_type: dataType,
character_maximum_length: maxLength,
numeric_precision: precision,
numeric_scale: scale,
is_nullable: isNullable,
column_default: defaultValue,
udt_name: udtName,
} = col;
let attr: Attribute;
// 处理用户定义类型(枚举)
if (dataType === 'USER-DEFINED') {
const enumSql = `
SELECT e.enumlabel
FROM pg_type t
JOIN pg_enum e ON t.oid = e.enumtypid
WHERE t.typname = '${udtName}'
ORDER BY e.enumsortorder;
`;
const [enumResult] = await execFn(enumSql);
const enumeration = enumResult.map((r: any) => r.enumlabel);
attr = {
type: 'enum',
enumeration,
};
} else {
// 构建完整的类型字符串
let fullType = dataType;
const integerTypes = ['bigint', 'integer', 'smallint', 'serial', 'bigserial', 'smallserial'];
if (maxLength && !integerTypes.includes(dataType)) {
fullType = `${dataType}(${maxLength})`;
} else if (precision !== null && scale !== null && !integerTypes.includes(dataType)) {
fullType = `${dataType}(${precision},${scale})`;
} else if (precision !== null && !integerTypes.includes(dataType)) {
fullType = `${dataType}(${precision})`;
}
attr = this.reTranslateToAttribute(fullType);
}
// ========== 类型还原逻辑 ==========
// 框架将某些语义类型存储为 bigint需要根据列名还原
if (attr.type === 'bigint') {
// 1. 检查是否是序列列
if (colName === '$$seq$$' || (defaultValue && defaultValue.includes('nextval'))) {
attr.type = 'sequence';
attr.sequenceStart = 10000; // 默认起始值
}
// 2. 检查是否是时间戳列
else if (['$$createAt$$', '$$updateAt$$', '$$deleteAt$$'].includes(colName)) {
attr.type = 'datetime';
}
// 3. 检查其他可能的时间类型列(根据命名约定)
else if (colName.endsWith('At') || colName.endsWith('Time')) {
// 可选:根据业务约定判断是否应该是 datetime
// 这里保守处理,只转换框架标准字段
}
}
// 处理约束 - 只在为 true 时添加
if (isNullable === 'NO') {
attr.notNull = true;
}
// 处理默认值
if (defaultValue && !defaultValue.includes('nextval')) {
let cleanDefault = defaultValue.replace(/::[a-z]+/gi, '').replace(/'/g, '');
if (cleanDefault === 'true') {
attr.default = true;
} else if (cleanDefault === 'false') {
attr.default = false;
} else if (!isNaN(Number(cleanDefault))) {
attr.default = Number(cleanDefault);
} else if (cleanDefault !== '') {
attr.default = cleanDefault;
}
}
// 检查唯一约束
const uniqueSql = `
SELECT COUNT(*) as cnt
FROM pg_index ix
JOIN pg_class t ON t.oid = ix.indrelid
JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
WHERE t.relname = '${tableName}'
AND a.attname = '${colName}'
AND ix.indisunique = true
AND NOT ix.indisprimary
AND array_length(ix.indkey, 1) = 1;
`;
const [uniqueResult] = await execFn(uniqueSql);
const uniqueCount = parseInt(uniqueResult[0]?.cnt || '0', 10);
if (uniqueCount > 0) {
attr.unique = true;
}
attributes[colName] = attr;
}
// 3. 获取索引信息
const indexesSql = `
SELECT
i.relname as index_name,
ix.indisunique as is_unique,
am.amname as index_type,
pg_get_indexdef(ix.indexrelid) as index_def
FROM pg_class t
JOIN pg_index ix ON t.oid = ix.indrelid
JOIN pg_class i ON i.oid = ix.indexrelid
JOIN pg_am am ON i.relam = am.oid
WHERE t.relname = '${tableName}'
AND t.relkind = 'r'
AND i.relname NOT LIKE '%_pkey'
AND NOT ix.indisprimary
ORDER BY i.relname;
`;
const [indexesResult] = await execFn(indexesSql) as [{
index_name: string;
is_unique: boolean;
index_type: string;
index_def: string;
}[]]
if (indexesResult.length > 0) {
const indexes: Index<any>[] = [];
for (const row of indexesResult) {
const { index_name: indexName, is_unique: isUnique, index_type: indexType, index_def: indexDef } = row;
// 解析索引定义以获取列名和配置
const index: Index<any> = {
name: indexName,
attributes: [],
};
// 解析索引定义字符串
// 示例: CREATE INDEX "user_index_fulltext_chinese" ON public."user" USING gin (to_tsvector('chinese'::regconfig, (COALESCE(name, ''::text) || ' '::text) || COALESCE(nickname, ''::text)))
if (indexType === 'gin' && indexDef.includes('to_tsvector')) {
// 全文索引
index.config = { type: 'fulltext' };
// 提取 tsConfig
const tsConfigMatch = indexDef.match(/to_tsvector\('([^']+)'/);
if (tsConfigMatch) {
const tsConfig = tsConfigMatch[1];
index.config.tsConfig = tsConfig;
}
// 提取列名(从 COALESCE 中)
const columnMatches = indexDef.matchAll(/COALESCE\("?([^",\s]+)"?/g);
const columns = Array.from(columnMatches, m => m[1]);
index.attributes = columns.map(col => ({ name: col }));
// 处理多语言索引的情况:移除语言后缀
// 例如: user_index_fulltext_chinese -> index_fulltext
const nameParts = indexName.split('_');
if (nameParts.length > 2) {
const possibleLang = nameParts[nameParts.length - 1];
// 如果最后一部分是语言代码,移除它
if (['chinese', 'english', 'simple', 'german', 'french', 'spanish', 'russian', 'japanese'].includes(possibleLang)) {
index.name = nameParts.slice(0, -1).join('_');
}
}
} else if (indexType === 'gist') {
// 空间索引
index.config = { type: 'spatial' };
// 提取列名
const columnMatch = indexDef.match(/\(([^)]+)\)/);
if (columnMatch) {
const columns = columnMatch[1].split(',').map(c => c.trim().replace(/"/g, ''));
index.attributes = columns.map(col => ({ name: col }));
}
} else if (indexType === 'hash') {
// 哈希索引
index.config = { type: 'hash' };
// 提取列名
const columnMatch = indexDef.match(/\(([^)]+)\)/);
if (columnMatch) {
const columns = columnMatch[1].split(',').map(c => c.trim().replace(/"/g, ''));
index.attributes = columns.map(col => ({ name: col }));
}
} else {
// B-tree 索引(默认)
// 提取列名和排序方向
const columnMatch = indexDef.match(/\(([^)]+)\)/);
if (columnMatch) {
const columnDefs = columnMatch[1].split(',');
index.attributes = columnDefs.map(colDef => {
const trimmed = colDef.trim().replace(/"/g, '');
const parts = trimmed.split(/\s+/);
const attr: any = { name: parts[0] };
// 检查排序方向
if (parts.includes('DESC')) {
attr.direction = 'DESC';
} else if (parts.includes('ASC')) {
attr.direction = 'ASC';
}
return attr;
});
}
// 如果是唯一索引
if (isUnique) {
index.config = { unique: true };
}
}
// 移除表名前缀(如果存在)
// 例如: user_index_fulltext -> index_fulltext
if (index.name.startsWith(`${tableName}_`)) {
index.name = index.name.substring(tableName.length + 1);
}
indexes.push(index);
}
Object.assign(result, {
[tableName]: {
attributes,
indexes,
}
});
} else {
Object.assign(result, {
[tableName]: {
attributes,
}
});
}
}
return result;
}
/**
* PostgreSQL DDL
* @param attr
* @param attrDef
*/
translateAttributeDef(attr: string, attrDef: Attribute): string {
let sql = `"${attr}" `;
const {
type,
params,
default: defaultValue,
unique,
notNull,
sequenceStart,
enumeration,
} = attrDef;
// 处理序列类型IDENTITY
if (type === 'sequence' || (typeof sequenceStart === 'number')) {
sql += `bigint GENERATED BY DEFAULT AS IDENTITY (START WITH ${sequenceStart || 10000}) UNIQUE`;
return sql;
}
// 处理枚举类型
if (type === 'enum') {
assert(enumeration, 'Enum type requires enumeration values');
sql += `enum(${enumeration.map(v => `'${v}'`).join(',')})`;
} else {
sql += this.populateDataTypeDef(type, params, enumeration);
}
// NOT NULL 约束
if (notNull || type === 'geometry') {
sql += ' NOT NULL';
}
// UNIQUE 约束
if (unique) {
sql += ' UNIQUE';
}
// 默认值
if (defaultValue !== undefined && !sequenceStart) {
assert(type !== 'ref', 'ref type should not have default value');
sql += ` DEFAULT ${this.translateAttrValue(type, defaultValue)}`;
}
// 主键
if (attr === 'id') {
sql += ' PRIMARY KEY';
}
return sql;
}
/**
* SQL schema diff
*
*/
compareSql(sql1: string, sql2: string): boolean {
// 标准化 SQL移除多余空格统一大小写
const normalize = (sql: string): string => {
return sql
.replace(/\s+/g, ' ') // 多个空格合并为一个
.replace(/\(\s+/g, '(') // 移除括号后的空格
.replace(/\s+\)/g, ')') // 移除括号前的空格
.replace(/,\s+/g, ',') // 移除逗号后的空格
.trim()
.toLowerCase();
};
return normalize(sql1) === normalize(sql2);
}
} }

View File

@ -82,6 +82,8 @@ export abstract class SqlTranslator<ED extends EntityDict & BaseEntityDict> {
name: `${entity}_trigger_uuid_auto_create`, name: `${entity}_trigger_uuid_auto_create`,
attributes: [{ attributes: [{
name: TriggerUuidAttribute, name: TriggerUuidAttribute,
}, {
name: DeleteAtAttribute,
}] }]
}, },
]; ];
@ -97,7 +99,7 @@ export abstract class SqlTranslator<ED extends EntityDict & BaseEntityDict> {
attributes: [{ attributes: [{
name: attr, name: attr,
}, { }, {
name: '$$deleteAt$$', name: DeleteAtAttribute,
}] }]
}); });
} }
@ -116,7 +118,7 @@ export abstract class SqlTranslator<ED extends EntityDict & BaseEntityDict> {
}, { }, {
name: 'entityId', name: 'entityId',
}, { }, {
name: '$$deleteAt$$', name: DeleteAtAttribute,
}] }]
}); });
} }
@ -132,7 +134,7 @@ export abstract class SqlTranslator<ED extends EntityDict & BaseEntityDict> {
attributes: [{ attributes: [{
name: attr, name: attr,
}, { }, {
name: '$$deleteAt$$', name: DeleteAtAttribute,
}] }]
}); });
} }
@ -151,7 +153,7 @@ export abstract class SqlTranslator<ED extends EntityDict & BaseEntityDict> {
}, { }, {
name: 'expiresAt', name: 'expiresAt',
}, { }, {
name: '$$deleteAt$$', name: DeleteAtAttribute,
}] }]
}); });
} }

View File

@ -1,9 +1,61 @@
import { EntityDict } from "oak-domain/lib/base-app-domain"; import {
Attribute,
EntityDict,
Index,
OperateOption,
OperationResult,
StorageSchema,
TxnOption,
} from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { AsyncContext,AsyncRowStore } from "oak-domain/lib/store/AsyncRowStore"; import { AsyncContext,AsyncRowStore } from "oak-domain/lib/store/AsyncRowStore";
import { CreateEntityOption } from "./Translator"; import { CreateEntityOption } from "./Translator";
import { AggregationResult, SelectOption } from "oak-domain/lib/types";
export interface DbStore<ED extends EntityDict, Cxt extends AsyncContext<ED>> extends AsyncRowStore<ED, Cxt> { export type Plan = {
newTables: Record<string, {
attributes: Record<string, Attribute>;
}>;
newIndexes: Record<string, Index<any>[]>;
updatedTables: Record<string, {
attributes: Record<string, Attribute & { isNew: boolean }>;
}>;
updatedIndexes: Record<string, Index<any>[]>;
};
export interface DbStore<ED extends EntityDict & BaseEntityDict, Cxt extends AsyncContext<ED>> extends AsyncRowStore<ED, Cxt> {
checkRelationAsync<T extends keyof ED, Cxt extends AsyncContext<ED>>(entity: T, operation: Omit<ED[T]['Operation'] | ED[T]['Selection'], 'id'>, context: Cxt): Promise<void>;
connect: () => Promise<void>; connect: () => Promise<void>;
disconnect: () => Promise<void>; disconnect: () => Promise<void>;
initialize(options: CreateEntityOption): Promise<void>; initialize(options: CreateEntityOption): Promise<void>;
aggregate<T extends keyof ED, OP extends SelectOption>(
entity: T,
aggregation: ED[T]['Aggregation'],
context: Cxt,
option: OP
): Promise<AggregationResult<ED[T]['Schema']>>;
operate<T extends keyof ED>(
entity: T,
operation: ED[T]['Operation'],
context: Cxt,
option: OperateOption
): Promise<OperationResult<ED>>;
select<T extends keyof ED>(
entity: T,
selection: ED[T]['Selection'],
context: Cxt,
option: SelectOption
): Promise<Partial<ED[T]['Schema']>[]>;
count<T extends keyof ED>(
entity: T,
selection: Pick<ED[T]['Selection'], 'filter' | 'count'>,
context: Cxt,
option: SelectOption
): Promise<number>;
begin(option?: TxnOption): Promise<string>;
commit(txnId: string): Promise<void>;
rollback(txnId: string): Promise<void>;
readSchema(): Promise<StorageSchema<ED>>;
makeUpgradePlan(): Promise<Plan>;
diffSchema(schemaOld: StorageSchema<any>, schemaNew: StorageSchema<any>): Plan;
}; };

View File

@ -1835,4 +1835,90 @@ export default (storeGetter: () => DbStore<EntityDict, TestContext>) => {
assert(r1.length === 1, `Deeply nested query failed`); assert(r1.length === 1, `Deeply nested query failed`);
assert(r1[0].system?.platform?.name === 'test_platform', `Nested projection failed`); assert(r1[0].system?.platform?.name === 'test_platform', `Nested projection failed`);
}); });
it('[1.4.1]嵌套跨节点表达式', async () => {
const store = storeGetter();
const context = new TestContext(store);
await context.begin();
const platformId = v4();
await store.operate('platform', {
id: v4(),
action: 'create',
data: {
id: platformId,
name: 'test2'
}
}, context, {})
await store.operate('application', {
id: v4(),
action: 'create',
data: [{
id: v4(),
name: 'test',
description: 'ttttt',
type: 'web',
system: {
id: v4(),
action: 'create',
data: {
id: v4(),
name: 'systest',
folder: 'systest',
platformId,
}
},
}, {
id: v4(),
name: 'test222',
description: 'ttttt2',
type: 'web',
system: {
id: v4(),
action: 'create',
data: {
id: v4(),
name: 'test2222',
folder: 'test2',
platformId,
}
},
}]
}, context, {});
process.env.NODE_ENV = 'development';
// 查询所有的application过滤条件是application->system.name = application->system->platform.name
const apps = await store.select('application', {
data: {
id: 1,
name: 1,
system: {
id: 1,
name: 1,
platform: {
id: 1,
name: 1,
}
}
},
filter: {
system: {
"#id": 'node-1',
platform: {
$expr: {
$eq: [
{ "#attr": 'name' },
{ "#refId": 'node-1', "#refAttr": 'folder' }
]
}
}
}
}
}, context, {});
process.env.NODE_ENV = undefined;
assert(apps.length === 1 && apps[0].name === 'test222');
await context.commit();
});
} }