oak-db/lib/PostgreSQL/store.js

398 lines
16 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.PostgreSQLStore = void 0;
const tslib_1 = require("tslib");
const CascadeStore_1 = require("oak-domain/lib/store/CascadeStore");
const connector_1 = require("./connector");
const translator_1 = require("./translator");
const lodash_1 = require("lodash");
const assert_1 = tslib_1.__importDefault(require("assert"));
const relation_1 = require("oak-domain/lib/store/relation");
// Column names that formResult converts from string to integer (parseInt, radix 10)
// when the column is not declared in the entity's schema attributes.
const ToNumberAttrs = new Set(['$$seq$$', '$$createAt$$', '$$updateAt$$', '$$deleteAt$$']);
/**
 * Converts a WKT (well-known text) geometry string into a plain geometry object.
 *
 * Supported inputs and the objects they produce:
 *  - `POINT(x y)`              -> `{ type: 'point',   coordinate: [x, y] }`
 *  - `LINESTRING(x y, ...)`    -> `{ type: 'path',    coordinate: [[x, y], ...] }`
 *  - `POLYGON((...), (...))`   -> `{ type: 'polygon', coordinate: [ring, ...] }`
 *    where every ring is a list of `[x, y]` pairs (the first ring is followed by
 *    any interior rings in the order they appear in the text).
 *
 * @param {string} geoText WKT text of the geometry.
 * @returns {{type: string, coordinate: Array}} The parsed geometry.
 * @throws {Error} When the text is not a POINT / LINESTRING / POLYGON, or when a
 *   LINESTRING / POLYGON contains no parenthesised coordinate list.
 * @throws AssertionError when a POINT does not contain exactly two numbers.
 */
function convertGeoTextToObject(geoText) {
    // Parses one "x y" coordinate pair; extra ordinates (if any) are ignored.
    const parsePair = (pairText) => {
        const [x, y] = pairText.trim().split(/\s+/).map(parseFloat);
        return [x, y];
    };
    if (geoText.startsWith('POINT')) {
        // Signed decimal with an optional exponent, so that values rendered in
        // scientific notation (e.g. 1e-07) count as ONE number instead of two.
        const coord = geoText.match(/[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?/g);
        (0, assert_1.default)(coord && coord.length === 2);
        return {
            type: 'point',
            coordinate: coord.map(ele => parseFloat(ele)),
        };
    }
    else if (geoText.startsWith('LINESTRING')) {
        const coordsMatch = geoText.match(/\(([^)]+)\)/);
        if (coordsMatch) {
            return {
                type: 'path',
                coordinate: coordsMatch[1].split(',').map(parsePair),
            };
        }
    }
    else if (geoText.startsWith('POLYGON')) {
        // Match every innermost "(...)" group: each one is a ring. Matching
        // "((...))" as a whole would only work for polygons with a single ring
        // and reject polygons that have interior rings (holes).
        const ringsMatch = geoText.match(/\(([^()]+)\)/g);
        if (ringsMatch) {
            const rings = ringsMatch.map(ring => {
                const coordStr = ring.replace(/[()]/g, '');
                return coordStr.split(',').map(parsePair);
            });
            return {
                type: 'polygon',
                coordinate: rings,
            };
        }
    }
    throw new Error(`Unsupported geometry type: ${geoText.slice(0, 50)}`);
}
/**
 * CascadeStore implementation that executes its operations against PostgreSQL.
 *
 * SQL text is produced by a PostgreSQLTranslator and executed through a
 * PostgreSQLConnector. Only the asynchronous code paths are implemented; the
 * synchronous row-level hooks throw.
 */
class PostgreSQLStore extends CascadeStore_1.CascadeStore {
    /** PostgreSQLConnector used to run SQL and to manage transactions. */
    connector;
    /** PostgreSQLTranslator used to turn operations/selections into SQL. */
    translator;
    /**
     * @param storageSchema Storage schema handed to the base store and to the translator.
     * @param configuration Configuration handed to the PostgreSQLConnector.
     */
    constructor(storageSchema, configuration) {
        super(storageSchema);
        this.connector = new connector_1.PostgreSQLConnector(configuration);
        this.translator = new translator_1.PostgreSQLTranslator(storageSchema);
    }
    /** Synchronous count is not supported by this store; always throws. */
    countAbjointRow(entity, selection, context, option) {
        throw new Error('PostgreSQL store 不支持同步取数据');
    }
    /** Synchronous aggregation is not supported by this store; always throws. */
    aggregateAbjointRowSync(entity, aggregation, context, option) {
        throw new Error('PostgreSQL store 不支持同步取数据');
    }
    /** Synchronous selection is not supported by this store; always throws. */
    selectAbjointRow(entity, selection, context, option) {
        throw new Error('PostgreSQL store 不支持同步取数据');
    }
    /** Synchronous update is not supported by this store; always throws. */
    updateAbjointRow(entity, operation, context, option) {
        throw new Error('PostgreSQL store 不支持同步更新数据');
    }
    /**
     * Executes a raw SQL script through the connector and discards the result.
     * @param script SQL text to execute.
     * @param txnId Optional transaction id forwarded to the connector.
     */
    async exec(script, txnId) {
        await this.connector.exec(script, txnId);
    }
    /** Not implemented; always throws. */
    checkRelationAsync(entity, operation, context) {
        throw new Error('Method not implemented.');
    }
    /**
     * Translates the aggregation to SQL, runs it in the context's current
     * transaction and returns the rows shaped by formResult.
     */
    async aggregateAbjointRowAsync(entity, aggregation, context, option) {
        const sql = this.translator.translateAggregate(entity, aggregation, option);
        const result = await this.connector.exec(sql, context.getCurrentTxnId());
        return this.formResult(entity, result[0]);
    }
    /** Delegates to the inherited aggregateAsync. */
    aggregate(entity, aggregation, context, option) {
        return this.aggregateAsync(entity, aggregation, context, option);
    }
    /** @returns {boolean} Always true. */
    supportManyToOneJoin() {
        return true;
    }
    /** @returns {boolean} Always true. */
    supportMultipleCreate() {
        return true;
    }
    /**
     * Converts raw database rows into nested result objects.
     *
     * Every column name of a row is treated as a (possibly dotted) attribute
     * path. Values are converted according to the attribute type declared in
     * the schema, nested objects are built for dotted paths, and joined
     * objects whose `id` is null are removed afterwards.
     *
     * @param entity Name of the entity the rows belong to.
     * @param result A single raw row or an array of raw rows.
     * @returns A converted object, or an array of them when `result` is an array.
     */
    formResult(entity, result) {
        const schema = this.getSchema();
        /**
         * Writes `value` into `r` at the path `attr`, interpreting the path
         * relative to `entity2` and converting the value by attribute type.
         */
        function resolveAttribute(entity2, r, attr, value) {
            const { attributes, view } = schema[entity2];
            if (!view) {
                const i = attr.indexOf(".");
                if (i !== -1) {
                    // Dotted path: resolve the head, then recurse into the tail.
                    const attrHead = attr.slice(0, i);
                    const attrTail = attr.slice(i + 1);
                    const rel = (0, relation_1.judgeRelation)(schema, entity2, attrHead);
                    if (rel === 1) {
                        // The head is handled as a plain path: lodash `set` creates the nesting.
                        (0, lodash_1.set)(r, attr, value);
                    }
                    else {
                        if (!r[attrHead]) {
                            r[attrHead] = {};
                        }
                        if (rel === 0) {
                            // Tail is still resolved against the same entity.
                            resolveAttribute(entity2, r[attrHead], attrTail, value);
                        }
                        else if (rel === 2) {
                            // The head itself names the target entity.
                            resolveAttribute(attrHead, r[attrHead], attrTail, value);
                        }
                        else {
                            // Otherwise judgeRelation is expected to return the target entity name.
                            (0, assert_1.default)(typeof rel === 'string');
                            resolveAttribute(rel, r[attrHead], attrTail, value);
                        }
                    }
                }
                else if (attributes[attr]) {
                    const { type } = attributes[attr];
                    switch (type) {
                        case 'date':
                        case 'time':
                        case 'datetime': {
                            // Normalise to a number: Date -> epoch ms, string -> parsed integer.
                            if (value instanceof Date) {
                                r[attr] = value.valueOf();
                            }
                            else {
                                if (typeof value === 'string') {
                                    r[attr] = parseInt(value, 10);
                                }
                                else {
                                    (0, assert_1.default)(typeof value === 'number' || value === null);
                                    r[attr] = value;
                                }
                            }
                            break;
                        }
                        case 'geometry': {
                            // Text values are parsed as WKT; anything else is passed through.
                            if (typeof value === 'string') {
                                r[attr] = convertGeoTextToObject(value);
                            }
                            else {
                                r[attr] = value;
                            }
                            break;
                        }
                        case 'object':
                        case 'array': {
                            // PostgreSQL jsonb comes back as an object already; only strings need parsing.
                            if (typeof value === 'string') {
                                r[attr] = JSON.parse(value);
                            }
                            else {
                                r[attr] = value;
                            }
                            break;
                        }
                        case 'function': {
                            // Stored as base64 text; decoded and prefixed with "return ".
                            if (typeof value === 'string') {
                                r[attr] = `return ${Buffer.from(value, 'base64').toString()}`;
                            }
                            else {
                                r[attr] = value;
                            }
                            break;
                        }
                        case 'bool':
                        case 'boolean': {
                            // PostgreSQL returns a boolean directly.
                            r[attr] = value;
                            break;
                        }
                        case 'decimal': {
                            // PostgreSQL numeric values may come back as strings.
                            if (typeof value === 'string') {
                                r[attr] = parseFloat(value);
                            }
                            else {
                                (0, assert_1.default)(value === null || typeof value === 'number');
                                r[attr] = value;
                            }
                            break;
                        }
                        // TODO: aligned with the MySQL store: char/ref strings are trimmed on both ends.
                        case "char":
                        case "ref": {
                            if (value) {
                                (0, assert_1.default)(typeof value === 'string');
                                r[attr] = value.trim();
                            }
                            else {
                                r[attr] = value;
                            }
                            break;
                        }
                        default: {
                            r[attr] = value;
                        }
                    }
                }
                else {
                    // Column is not a declared attribute of the entity.
                    // TODO: aligned with the MySQL store: a char-typed id has its padding trimmed.
                    if (value && typeof value === 'string') {
                        if (attr === 'id') {
                            r[attr] = value.trim();
                        }
                        else if (attr.startsWith("#count")) {
                            // PostgreSQL returns count as a string.
                            r[attr] = parseInt(value, 10);
                        }
                        else if (attr.startsWith("#sum") || attr.startsWith("#avg") || attr.startsWith("#min") || attr.startsWith("#max")) {
                            // PostgreSQL returns sum/avg/min/max as strings.
                            r[attr] = parseFloat(value);
                        }
                        else if (ToNumberAttrs.has(attr)) {
                            // Columns listed in ToNumberAttrs arrive as strings and are stored as integers.
                            r[attr] = parseInt(value, 10);
                        }
                        else {
                            r[attr] = value;
                        }
                    }
                    else {
                        r[attr] = value;
                    }
                }
            }
            else {
                // View entities: copy the column as-is.
                (0, lodash_1.assign)(r, { [attr]: value });
            }
        }
        /**
         * Recursively deletes joined sub-objects whose `id` is null, i.e. joins
         * that produced no target row.
         */
        function removeNullObjects(r, e) {
            for (const attr in r) {
                const rel = (0, relation_1.judgeRelation)(schema, e, attr);
                if (rel === 2) {
                    if (r[attr].id === null) {
                        (0, assert_1.default)(schema[e].toModi || r.entity !== attr);
                        delete r[attr];
                        continue;
                    }
                    removeNullObjects(r[attr], attr);
                }
                else if (typeof rel === 'string') {
                    if (r[attr].id === null) {
                        (0, assert_1.default)(schema[e].toModi || r[`${attr}Id`] === null, `对象${String(e)}取数据时发现其外键找不到目标对象rowId是${r.id},其外键${attr}Id值为${r[`${attr}Id`]}`);
                        delete r[attr];
                        continue;
                    }
                    removeNullObjects(r[attr], rel);
                }
            }
        }
        /** Converts one raw row into its nested result object. */
        function formSingleRow(r) {
            const result2 = {};
            for (const attr in r) {
                const value = r[attr];
                resolveAttribute(entity, result2, attr, value);
            }
            removeNullObjects(result2, entity);
            return result2;
        }
        if (result instanceof Array) {
            return result.map(r => formSingleRow(r));
        }
        return formSingleRow(result);
    }
    /**
     * Translates the selection to SQL, runs it in the context's current
     * transaction and returns the rows shaped by formResult.
     */
    async selectAbjointRowAsync(entity, selection, context, option) {
        const sql = this.translator.translateSelect(entity, selection, option);
        const result = await this.connector.exec(sql, context.getCurrentTxnId());
        return this.formResult(entity, result[0]);
    }
    /**
     * Executes a create / remove / update operation in the context's current
     * transaction.
     * @returns {Promise<number>} `rowCount` reported by the query result, or 0 when it is falsy.
     */
    async updateAbjointRowAsync(entity, operation, context, option) {
        const { translator, connector } = this;
        const { action } = operation;
        const txn = context.getCurrentTxnId();
        switch (action) {
            case 'create': {
                const { data } = operation;
                // translateInsert always receives an array of rows.
                const sql = translator.translateInsert(entity, data instanceof Array ? data : [data]);
                const result = await connector.exec(sql, txn);
                // PostgreSQL QueryResult.rowCount
                return result[1].rowCount || 0;
            }
            case 'remove': {
                const sql = translator.translateRemove(entity, operation, option);
                const result = await connector.exec(sql, txn);
                return result[1].rowCount || 0;
            }
            default: {
                // Every other action is translated as an update; read-only actions are rejected.
                (0, assert_1.default)(!['select', 'download', 'stat'].includes(action));
                const sql = translator.translateUpdate(entity, operation, option);
                const result = await connector.exec(sql, txn);
                return result[1].rowCount || 0;
            }
        }
    }
    /** Rejects read-only actions, then delegates to the inherited operateAsync. */
    async operate(entity, operation, context, option) {
        const { action } = operation;
        (0, assert_1.default)(!['select', 'download', 'stat'].includes(action), '不支持使用 select operation');
        return await super.operateAsync(entity, operation, context, option);
    }
    /** Delegates to the inherited selectAsync. */
    async select(entity, selection, context, option) {
        return await super.selectAsync(entity, selection, context, option);
    }
    /**
     * Runs the translated count query in the context's current transaction.
     * @returns {Promise<number>} The `cnt` column of the first row, or 0 when absent.
     */
    async countAbjointRowAsync(entity, selection, context, option) {
        const sql = this.translator.translateCount(entity, selection, option);
        const result = await this.connector.exec(sql, context.getCurrentTxnId());
        // PostgreSQL returns count (bigint) as a string.
        const cnt = result[0][0]?.cnt;
        return typeof cnt === 'string' ? parseInt(cnt, 10) : (cnt || 0);
    }
    /** Delegates to the inherited countAsync. */
    async count(entity, selection, context, option) {
        return this.countAsync(entity, selection, context, option);
    }
    /** Starts a transaction on the connector and returns what it returns. */
    async begin(option) {
        return await this.connector.startTransaction(option);
    }
    /** Commits the given transaction. */
    async commit(txnId) {
        await this.connector.commitTransaction(txnId);
    }
    /** Rolls back the given transaction. */
    async rollback(txnId) {
        await this.connector.rollbackTransaction(txnId);
    }
    /** Opens the connector. */
    async connect() {
        await this.connector.connect();
    }
    /** Closes the connector. */
    async disconnect() {
        await this.connector.disconnect();
    }
    /**
     * Creates the database objects for every entity in the schema.
     *
     * All statements, including the optional PostGIS / Chinese text search
     * setup, are executed inside ONE serializable transaction, which is
     * committed at the end and rolled back when any statement fails.
     *
     * @param option Option forwarded to translator.translateCreateEntity.
     * @throws Rethrows the error that made initialization fail.
     */
    async initialize(option) {
        // PostgreSQL DDL is transactional, so a single transaction wraps the whole initialization.
        const txn = await this.connector.startTransaction({
            isolationLevel: 'serializable',
        });
        try {
            const schema = this.getSchema();
            // Scan the schema once to find out which optional extensions are needed.
            let hasGeoType = false;
            let hasChineseTsConfig = false;
            for (const entity in schema) {
                const { attributes, indexes } = schema[entity];
                for (const attr in attributes) {
                    const { type } = attributes[attr];
                    if (type === 'geometry') {
                        hasGeoType = true;
                    }
                }
                for (const index of indexes || []) {
                    if (index.config?.tsConfig === 'chinese' || index.config?.tsConfig?.includes('chinese')) {
                        hasChineseTsConfig = true;
                    }
                }
            }
            // The setup statements below are given `txn` so that they run on the
            // transaction's connection and are rolled back together with it,
            // instead of executing (and committing) outside the transaction.
            if (hasGeoType) {
                console.log('Initializing PostGIS extension for geometry support...');
                await this.connector.exec('CREATE EXTENSION IF NOT EXISTS postgis;', txn);
            }
            if (hasChineseTsConfig) {
                console.log('Initializing Chinese text search configuration...');
                const checkChineseConfigSql = `
SELECT COUNT(*) as cnt
FROM pg_catalog.pg_ts_config
WHERE cfgname = 'chinese';
`;
                const result = await this.connector.exec(checkChineseConfigSql, txn);
                const count = parseInt(result[0][0]?.cnt || '0', 10);
                if (count === 0) {
                    const createChineseConfigSql = `
CREATE EXTENSION IF NOT EXISTS zhparser;
CREATE TEXT SEARCH CONFIGURATION chinese (PARSER = zhparser);
ALTER TEXT SEARCH CONFIGURATION chinese ADD MAPPING FOR n,v,a,i,e,l WITH simple;
`;
                    await this.connector.exec(createChineseConfigSql, txn);
                }
            }
            for (const entity in schema) {
                const sqls = this.translator.translateCreateEntity(entity, option);
                for (const sql of sqls) {
                    await this.connector.exec(sql, txn);
                }
            }
            await this.connector.commitTransaction(txn);
        }
        catch (error) {
            try {
                await this.connector.rollbackTransaction(txn);
            }
            catch (rollbackError) {
                // Report the rollback failure, but keep the original error as the
                // one that propagates: it is the actual cause of the failure.
                console.error('Failed to roll back the initialization transaction:', rollbackError);
            }
            throw error;
        }
    }
}
exports.PostgreSQLStore = PostgreSQLStore;