oak-db/lib/PostgreSQL/store.js

528 lines
22 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.PostgreSQLStore = void 0;
const tslib_1 = require("tslib");
const CascadeStore_1 = require("oak-domain/lib/store/CascadeStore");
const connector_1 = require("./connector");
const translator_1 = require("./translator");
const lodash_1 = require("lodash");
const assert_1 = tslib_1.__importDefault(require("assert"));
const relation_1 = require("oak-domain/lib/store/relation");
// System columns whose string values formResult converts to integers with parseInt.
const ToNumberAttrs = new Set(['$$seq$$', '$$createAt$$', '$$updateAt$$', '$$deleteAt$$']);
/**
 * Converts a WKT (well-known text) geometry string into a geometry object.
 *
 * Recognised inputs and the value produced for each:
 *  - `POINT(x y)`             -> `{ type: 'point', coordinate: [x, y] }`
 *  - `LINESTRING(x y, ...)`   -> `{ type: 'path', coordinate: [[x, y], ...] }`
 *  - `POLYGON((...), (...))`  -> `{ type: 'polygon', coordinate: [ring, ...] }`,
 *    where every ring is a list of `[x, y]` pairs. Polygons with several rings
 *    (an outer ring plus holes) yield one entry per ring.
 *
 * Only the first two numbers of each vertex are kept.
 *
 * @param {string} geoText WKT text; it must start with the geometry keyword.
 * @returns {{type: string, coordinate: Array}} the parsed geometry.
 * @throws {Error} if the text starts with none of the three keywords, or if a
 *   LINESTRING / POLYGON contains no parenthesised coordinate list.
 * @throws an assertion error if a POINT does not contain exactly two numbers.
 */
function convertGeoTextToObject(geoText) {
    // Turns a comma separated vertex list ("x y, x y, ...") into [[x, y], ...].
    const parseVertices = (text) => text.split(',').map(p => {
        const [x, y] = p.trim().split(/\s+/).map(parseFloat);
        return [x, y];
    });
    if (geoText.startsWith('POINT')) {
        const coord = geoText.match(/(-?\d+\.?\d*)/g);
        (0, assert_1.default)(coord && coord.length === 2);
        return {
            type: 'point',
            coordinate: coord.map(ele => parseFloat(ele)),
        };
    }
    else if (geoText.startsWith('LINESTRING')) {
        const coordsMatch = geoText.match(/\(([^)]+)\)/);
        if (coordsMatch) {
            return {
                type: 'path',
                coordinate: parseVertices(coordsMatch[1]),
            };
        }
    }
    else if (geoText.startsWith('POLYGON')) {
        // Match every innermost "(...)" group so each ring is captured on its own.
        // Requiring "((" ... "))" around a single group would only ever match a
        // polygon that has exactly one ring.
        const ringsMatch = geoText.match(/\(([^()]+)\)/g);
        if (ringsMatch) {
            return {
                type: 'polygon',
                coordinate: ringsMatch.map(ring => parseVertices(ring.replace(/[()]/g, ''))),
            };
        }
    }
    throw new Error(`Unsupported geometry type: ${geoText.slice(0, 50)}`);
}
/**
 * CascadeStore implementation that runs against PostgreSQL.
 *
 * SQL text is produced by a PostgreSQLTranslator and executed through a
 * PostgreSQLConnector. Only the asynchronous row-level hooks are implemented;
 * the synchronous ones throw.
 */
class PostgreSQLStore extends CascadeStore_1.CascadeStore {
    /**
     * Synchronous counting is not available for this store.
     * @throws {Error} always.
     */
    countAbjointRow(entity, selection, context, option) {
        throw new Error('PostgreSQL store 不支持同步取数据');
    }
    /**
     * Synchronous aggregation is not available for this store.
     * @throws {Error} always.
     */
    aggregateAbjointRowSync(entity, aggregation, context, option) {
        throw new Error('PostgreSQL store 不支持同步取数据');
    }
    /**
     * Synchronous selection is not available for this store.
     * @throws {Error} always.
     */
    selectAbjointRow(entity, selection, context, option) {
        throw new Error('PostgreSQL store 不支持同步取数据');
    }
    /**
     * Synchronous update is not available for this store.
     * @throws {Error} always.
     */
    updateAbjointRow(entity, operation, context, option) {
        throw new Error('PostgreSQL store 不支持同步更新数据');
    }
    /**
     * Runs a SQL script through the connector and discards its result.
     * @param script SQL text to execute.
     * @param txnId transaction id forwarded to the connector (may be omitted).
     */
    async exec(script, txnId) {
        await this.connector.exec(script, txnId);
    }
    connector;
    translator;
    /**
     * @param storageSchema schema handed to the base class and to the translator.
     * @param configuration connection settings handed to the connector.
     */
    constructor(storageSchema, configuration) {
        super(storageSchema);
        this.connector = new connector_1.PostgreSQLConnector(configuration);
        this.translator = new translator_1.PostgreSQLTranslator(storageSchema);
    }
    /**
     * Relation checking is not implemented for this store.
     * @throws {Error} always.
     */
    checkRelationAsync(entity, operation, context) {
        throw new Error('Method not implemented.');
    }
    /**
     * Translates an aggregation to SQL, executes it in the context's current
     * transaction and returns the first element of the connector result
     * converted by formResult.
     */
    async aggregateAbjointRowAsync(entity, aggregation, context, option) {
        const sql = this.translator.translateAggregate(entity, aggregation, option);
        const result = await this.connector.exec(sql, context.getCurrentTxnId());
        return this.formResult(entity, result[0]);
    }
    /** Delegates to the inherited aggregateAsync. */
    aggregate(entity, aggregation, context, option) {
        return this.aggregateAsync(entity, aggregation, context, option);
    }
    /** @returns {boolean} always true. */
    supportManyToOneJoin() {
        return true;
    }
    /** @returns {boolean} always true. */
    supportMultipleCreate() {
        return true;
    }
    /**
     * Converts raw query rows into nested result objects.
     *
     * Column names containing "." are expanded into nested objects, values are
     * converted according to the attribute type declared in the schema, and
     * joined objects whose `id` is null are removed.
     *
     * @param entity name of the entity the rows belong to.
     * @param result a single raw row or an array of raw rows.
     * @returns the converted row, or an array of converted rows when an array was given.
     */
    formResult(entity, result) {
        const schema = this.getSchema();
        // Writes one column value into r, descending into nested objects for dotted names.
        function resolveAttribute(entity2, r, attr, value) {
            const { attributes, view } = schema[entity2];
            if (!view) {
                const i = attr.indexOf(".");
                if (i !== -1) {
                    const attrHead = attr.slice(0, i);
                    const attrTail = attr.slice(i + 1);
                    const rel = (0, relation_1.judgeRelation)(schema, entity2, attrHead);
                    if (rel === 1) {
                        // The whole dotted path is written in one go with lodash set.
                        (0, lodash_1.set)(r, attr, value);
                    }
                    else {
                        if (!r[attrHead]) {
                            r[attrHead] = {};
                        }
                        if (rel === 0) {
                            // The tail is resolved against the same entity.
                            resolveAttribute(entity2, r[attrHead], attrTail, value);
                        }
                        else if (rel === 2) {
                            // The head itself names the entity the tail belongs to.
                            resolveAttribute(attrHead, r[attrHead], attrTail, value);
                        }
                        else {
                            // Otherwise judgeRelation is expected to return the target entity name.
                            (0, assert_1.default)(typeof rel === 'string');
                            resolveAttribute(rel, r[attrHead], attrTail, value);
                        }
                    }
                }
                else if (attributes[attr]) {
                    const { type } = attributes[attr];
                    switch (type) {
                        case 'date':
                        case 'time':
                        case 'datetime': {
                            // Date objects become epoch milliseconds, strings are parsed as integers.
                            if (value instanceof Date) {
                                r[attr] = value.valueOf();
                            }
                            else {
                                if (typeof value === 'string') {
                                    r[attr] = parseInt(value, 10);
                                }
                                else {
                                    (0, assert_1.default)(typeof value === 'number' || value === null);
                                    r[attr] = value;
                                }
                            }
                            break;
                        }
                        case 'geometry': {
                            if (typeof value === 'string') {
                                r[attr] = convertGeoTextToObject(value);
                            }
                            else {
                                r[attr] = value;
                            }
                            break;
                        }
                        case 'object':
                        case 'array': {
                            // PostgreSQL jsonb comes back as an object already; only strings need parsing.
                            if (typeof value === 'string') {
                                r[attr] = JSON.parse(value);
                            }
                            else {
                                r[attr] = value;
                            }
                            break;
                        }
                        case 'function': {
                            // String values are base64 decoded and prefixed with "return ".
                            if (typeof value === 'string') {
                                r[attr] = `return ${Buffer.from(value, 'base64').toString()}`;
                            }
                            else {
                                r[attr] = value;
                            }
                            break;
                        }
                        case 'bool':
                        case 'boolean': {
                            // PostgreSQL returns a boolean directly.
                            r[attr] = value;
                            break;
                        }
                        case 'decimal': {
                            // PostgreSQL numeric values may come back as strings.
                            if (typeof value === 'string') {
                                r[attr] = parseFloat(value);
                            }
                            else {
                                (0, assert_1.default)(value === null || typeof value === 'number');
                                r[attr] = value;
                            }
                            break;
                        }
                        // TODO: kept in line with the MySQL store: char/ref strings are trimmed on both ends.
                        case "char":
                        case "ref": {
                            if (value) {
                                (0, assert_1.default)(typeof value === 'string');
                                r[attr] = value.trim();
                            }
                            else {
                                r[attr] = value;
                            }
                            break;
                        }
                        default: {
                            r[attr] = value;
                        }
                    }
                }
                else {
                    // Column is not a declared attribute: id, aggregate results and system columns.
                    // TODO: kept in line with the MySQL store: a char-typed id has its padding spaces removed.
                    if (value && typeof value === 'string') {
                        if (attr === 'id') {
                            r[attr] = value.trim();
                        }
                        else if (attr.startsWith("#count")) {
                            // PostgreSQL returns count as a string.
                            r[attr] = parseInt(value, 10);
                        }
                        else if (attr.startsWith("#sum") || attr.startsWith("#avg") || attr.startsWith("#min") || attr.startsWith("#max")) {
                            // PostgreSQL returns sum/avg/min/max as strings.
                            r[attr] = parseFloat(value);
                        }
                        else if (ToNumberAttrs.has(attr)) {
                            // Sequence / timestamp system columns are converted to integers.
                            r[attr] = parseInt(value, 10);
                        }
                        else {
                            r[attr] = value;
                        }
                    }
                    else {
                        r[attr] = value;
                    }
                }
            }
            else {
                // Views: the value is copied without any conversion.
                (0, lodash_1.assign)(r, { [attr]: value });
            }
        }
        // Drops joined objects whose id is null and recurses into the remaining ones.
        function removeNullObjects(r, e) {
            for (let attr in r) {
                const rel = (0, relation_1.judgeRelation)(schema, e, attr);
                if (rel === 2) {
                    if (r[attr].id === null) {
                        (0, assert_1.default)(schema[e].toModi || r.entity !== attr);
                        delete r[attr];
                        continue;
                    }
                    removeNullObjects(r[attr], attr);
                }
                else if (typeof rel === 'string') {
                    if (r[attr].id === null) {
                        // A null joined row is only acceptable when the foreign key itself is null
                        // (or the entity is flagged toModi).
                        (0, assert_1.default)(schema[e].toModi || r[`${attr}Id`] === null, `对象${String(e)}取数据时发现其外键找不到目标对象rowId是${r.id},其外键${attr}Id值为${r[`${attr}Id`]}`);
                        delete r[attr];
                        continue;
                    }
                    removeNullObjects(r[attr], rel);
                }
            }
        }
        // Converts one raw row.
        function formSingleRow(r) {
            let result2 = {};
            for (let attr in r) {
                const value = r[attr];
                resolveAttribute(entity, result2, attr, value);
            }
            removeNullObjects(result2, entity);
            return result2;
        }
        if (result instanceof Array) {
            return result.map(r => formSingleRow(r));
        }
        return formSingleRow(result);
    }
    /**
     * Translates a selection to SQL, executes it in the context's current
     * transaction and returns the first element of the connector result
     * converted by formResult.
     */
    async selectAbjointRowAsync(entity, selection, context, option) {
        const sql = this.translator.translateSelect(entity, selection, option);
        const result = await this.connector.exec(sql, context.getCurrentTxnId());
        return this.formResult(entity, result[0]);
    }
    /**
     * Executes a create / remove / update operation in the context's current transaction.
     *
     * 'create' is translated as an insert (a single data object is wrapped in an array),
     * 'remove' as a remove, and every other action as an update.
     *
     * @returns {Promise<number>} `rowCount` of the second element of the connector result, or 0 when it is falsy.
     * @throws an assertion error if the action is 'select', 'download' or 'stat'.
     */
    async updateAbjointRowAsync(entity, operation, context, option) {
        const { translator, connector } = this;
        const { action } = operation;
        const txn = context.getCurrentTxnId();
        let sql;
        switch (action) {
            case 'create': {
                const { data } = operation;
                sql = translator.translateInsert(entity, data instanceof Array ? data : [data]);
                break;
            }
            case 'remove': {
                sql = translator.translateRemove(entity, operation, option);
                break;
            }
            default: {
                (0, assert_1.default)(!['select', 'download', 'stat'].includes(action));
                sql = translator.translateUpdate(entity, operation, option);
            }
        }
        const result = await connector.exec(sql, txn);
        // PostgreSQL QueryResult.rowCount
        return result[1].rowCount || 0;
    }
    /**
     * Runs a write operation through the inherited operateAsync.
     * @throws an assertion error if the action is 'select', 'download' or 'stat'.
     */
    async operate(entity, operation, context, option) {
        const { action } = operation;
        (0, assert_1.default)(!['select', 'download', 'stat'].includes(action), '不支持使用 select operation');
        return await super.operateAsync(entity, operation, context, option);
    }
    /** Delegates to the inherited selectAsync. */
    async select(entity, selection, context, option) {
        return await super.selectAsync(entity, selection, context, option);
    }
    /**
     * Translates a count query, executes it in the context's current transaction
     * and returns the `cnt` column of the first row as a number (0 when absent).
     */
    async countAbjointRowAsync(entity, selection, context, option) {
        const sql = this.translator.translateCount(entity, selection, option);
        const result = await this.connector.exec(sql, context.getCurrentTxnId());
        // PostgreSQL returns count as a string (bigint).
        const cnt = result[0][0]?.cnt;
        return typeof cnt === 'string' ? parseInt(cnt, 10) : (cnt || 0);
    }
    /** Delegates to the inherited countAsync. */
    async count(entity, selection, context, option) {
        return this.countAsync(entity, selection, context, option);
    }
    /** Starts a transaction on the connector and returns what the connector returns. */
    async begin(option) {
        return await this.connector.startTransaction(option);
    }
    /** Commits the given transaction on the connector. */
    async commit(txnId) {
        await this.connector.commitTransaction(txnId);
    }
    /** Rolls back the given transaction on the connector. */
    async rollback(txnId) {
        await this.connector.rollbackTransaction(txnId);
    }
    /** Opens the connector. */
    async connect() {
        await this.connector.connect();
    }
    /** Closes the connector. */
    async disconnect() {
        await this.connector.disconnect();
    }
    /**
     * Creates everything the schema needs in the database.
     *
     * The whole initialization runs in one serializable transaction: the PostGIS
     * extension (when any attribute is of type 'geometry'), the 'chinese' text
     * search configuration (when any index asks for it and it does not exist yet)
     * and the statements produced by translateCreateEntity for every entity.
     * Every statement is sent with the transaction id, so the extension and
     * text-search setup are part of the same transaction as the entity DDL.
     * On any error the transaction is rolled back and the error is rethrown.
     *
     * @param option forwarded to translator.translateCreateEntity.
     */
    async initialize(option) {
        // PostgreSQL DDL is transactional, so one transaction wraps all initialization work.
        const txn = await this.connector.startTransaction({
            isolationLevel: 'serializable',
        });
        try {
            const schema = this.getSchema();
            let hasGeoType = false;
            let hasChineseTsConfig = false;
            for (const entity in schema) {
                const { attributes, indexes } = schema[entity];
                for (const attr in attributes) {
                    const { type } = attributes[attr];
                    if (type === 'geometry') {
                        hasGeoType = true;
                    }
                }
                for (const index of indexes || []) {
                    if (index.config?.tsConfig === 'chinese' || index.config?.tsConfig?.includes('chinese')) {
                        hasChineseTsConfig = true;
                    }
                }
            }
            if (hasGeoType) {
                console.log('Initializing PostGIS extension for geometry support...');
                await this.connector.exec('CREATE EXTENSION IF NOT EXISTS postgis;', txn);
            }
            if (hasChineseTsConfig) {
                console.log('Initializing Chinese text search configuration...');
                const checkChineseConfigSql = `
SELECT COUNT(*) as cnt
FROM pg_catalog.pg_ts_config
WHERE cfgname = 'chinese';
`;
                const result = await this.connector.exec(checkChineseConfigSql, txn);
                const count = parseInt(result[0][0]?.cnt || '0', 10);
                if (count === 0) {
                    const createChineseConfigSql = `
CREATE EXTENSION IF NOT EXISTS zhparser;
CREATE TEXT SEARCH CONFIGURATION chinese (PARSER = zhparser);
ALTER TEXT SEARCH CONFIGURATION chinese ADD MAPPING FOR n,v,a,i,e,l WITH simple;
`;
                    await this.connector.exec(createChineseConfigSql, txn);
                }
            }
            for (const entity in schema) {
                const sqls = this.translator.translateCreateEntity(entity, option);
                for (const sql of sqls) {
                    await this.connector.exec(sql, txn);
                }
            }
            await this.connector.commitTransaction(txn);
        }
        catch (error) {
            await this.connector.rollbackTransaction(txn);
            throw error;
        }
    }
    /**
     * Reads the current schema from the database by handing the translator a
     * callback that executes SQL through the connector.
     */
    readSchema() {
        return this.translator.readSchema((sql) => this.connector.exec(sql));
    }
    /**
     * Decides how to upgrade by comparing the schema read from the database with
     * the schema loaded into the translator.
     * The plan is meant to be applied in two phases, an additive one and a
     * reducing one, with the user fixing data in between.
     */
    async makeUpgradePlan() {
        const originSchema = await this.readSchema();
        const plan = this.diffSchema(originSchema, this.translator.schema);
        return plan;
    }
    /**
     * Compares two schemas and computes what the new one adds to or changes in the old one.
     *
     * @param schemaOld schema to compare against.
     * @param schemaNew target schema.
     * @returns a plan with four maps keyed by table name:
     *   `newTables` (tables missing from schemaOld),
     *   `updatedTables` (added or changed attributes, each flagged with `isNew`),
     *   `newIndexes` (indexes missing from schemaOld) and
     *   `updatedIndexes` (indexes whose attributes or config differ).
     */
    diffSchema(schemaOld, schemaNew) {
        const plan = {
            newTables: {},
            newIndexes: {},
            updatedIndexes: {},
            updatedTables: {},
        };
        for (const table in schemaNew) {
            // PostgreSQL table names are case-sensitive (when double-quoted).
            if (schemaOld[table]) {
                const { attributes, indexes } = schemaOld[table];
                const { attributes: attributesNew, indexes: indexesNew } = schemaNew[table];
                // Records an added (isNew) or changed attribute; system columns and id are never reported.
                const assignToUpdateTables = (attr, isNew) => {
                    const skipAttrs = ['$$seq$$', '$$createAt$$', '$$updateAt$$', '$$deleteAt$$', 'id'];
                    if (skipAttrs.includes(attr)) {
                        return;
                    }
                    if (!plan.updatedTables[table]) {
                        plan.updatedTables[table] = {
                            attributes: {
                                [attr]: {
                                    ...attributesNew[attr],
                                    isNew,
                                }
                            }
                        };
                    }
                    else {
                        plan.updatedTables[table].attributes[attr] = {
                            ...attributesNew[attr],
                            isNew,
                        };
                    }
                };
                for (const attr in attributesNew) {
                    if (attributes[attr]) {
                        // Compare the attribute definitions generated for both schemas.
                        const sql1 = this.translator.translateAttributeDef(attr, attributesNew[attr]);
                        const sql2 = this.translator.translateAttributeDef(attr, attributes[attr]);
                        if (!this.translator.compareSql(sql1, sql2)) {
                            assignToUpdateTables(attr, false);
                        }
                    }
                    else {
                        assignToUpdateTables(attr, true);
                    }
                }
                if (indexesNew) {
                    // Appends the index to newIndexes or updatedIndexes for this table.
                    const assignToIndexes = (index, isNew) => {
                        if (isNew) {
                            if (plan.newIndexes[table]) {
                                plan.newIndexes[table].push(index);
                            }
                            else {
                                plan.newIndexes[table] = [index];
                            }
                        }
                        else {
                            if (plan.updatedIndexes[table]) {
                                plan.updatedIndexes[table].push(index);
                            }
                            else {
                                plan.updatedIndexes[table] = [index];
                            }
                        }
                    };
                    // True when unique, type (default 'btree') and tsConfig all match.
                    const compareConfig = (config1, config2) => {
                        const unique1 = config1?.unique || false;
                        const unique2 = config2?.unique || false;
                        if (unique1 !== unique2) {
                            return false;
                        }
                        const type1 = config1?.type || 'btree';
                        const type2 = config2?.type || 'btree';
                        // tsConfig comparison
                        const tsConfig1 = config1?.tsConfig;
                        const tsConfig2 = config2?.tsConfig;
                        if (JSON.stringify(tsConfig1) !== JSON.stringify(tsConfig2)) {
                            return false;
                        }
                        return type1 === type2;
                    };
                    for (const index of indexesNew) {
                        const { name, config, attributes: indexAttrs } = index;
                        // Indexes are paired by name.
                        const origin = indexes?.find(ele => ele.name === name);
                        if (origin) {
                            if (JSON.stringify(indexAttrs) !== JSON.stringify(origin.attributes)) {
                                assignToIndexes(index, false);
                            }
                            else {
                                if (!compareConfig(config, origin.config)) {
                                    assignToIndexes(index, false);
                                }
                            }
                        }
                        else {
                            assignToIndexes(index, true);
                        }
                    }
                }
            }
            else {
                // Table does not exist in the old schema: all attributes and indexes are new.
                plan.newTables[table] = {
                    attributes: schemaNew[table].attributes,
                };
                if (schemaNew[table].indexes) {
                    plan.newIndexes[table] = schemaNew[table].indexes;
                }
            }
        }
        return plan;
    }
}
// Publish the store class on the CommonJS exports object.
exports.PostgreSQLStore = PostgreSQLStore;