oak-db/lib/PostgreSQL/store.js

322 lines
13 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.PostgreSQLStore = void 0;
const tslib_1 = require("tslib");
const CascadeStore_1 = require("oak-domain/lib/store/CascadeStore");
const connector_1 = require("./connector");
const translator_1 = require("./translator");
const lodash_1 = require("lodash");
const assert_1 = tslib_1.__importDefault(require("assert"));
const relation_1 = require("oak-domain/lib/store/relation");
/**
 * Converts a WKT (well-known text) geometry string into a plain geometry object.
 *
 * Supported inputs and the objects they produce:
 *  - `POINT(x y)`             -> `{ type: 'point',   coordinate: [x, y] }`
 *  - `LINESTRING(x y, ...)`   -> `{ type: 'path',    coordinate: [[x, y], ...] }`
 *  - `POLYGON((...), (...))`  -> `{ type: 'polygon', coordinate: [ring, ...] }`
 *    where every ring (exterior first, then any interior rings) is a list of
 *    `[x, y]` pairs.
 *
 * @param {string} geoText WKT text; must start with the geometry keyword.
 * @returns {{type: string, coordinate: Array}} The parsed geometry.
 * @throws {Error} If the geometry keyword is not one of the above, or if a
 *   LINESTRING / POLYGON carries no parenthesised coordinate list.
 *   A POINT that does not contain exactly two numbers fails an assertion.
 */
function convertGeoTextToObject(geoText) {
    // Parses "x y, x y, ..." into [[x, y], ...]; any further ordinates of a
    // point are dropped.
    const parsePointList = (text) => text.split(',').map((pair) => {
        const [x, y] = pair.trim().split(/\s+/).map((n) => parseFloat(n));
        return [x, y];
    });
    if (geoText.startsWith('POINT')) {
        // Accept exponent notation (e.g. 1e-7) so that such a value is not
        // split into two separate matches.
        const coord = geoText.match(/-?\d+\.?\d*(?:[eE][-+]?\d+)?/g);
        (0, assert_1.default)(coord && coord.length === 2);
        return {
            type: 'point',
            coordinate: coord.map((ele) => parseFloat(ele)),
        };
    }
    if (geoText.startsWith('LINESTRING')) {
        const coordsMatch = geoText.match(/\(([^)]+)\)/);
        if (coordsMatch) {
            return {
                type: 'path',
                coordinate: parsePointList(coordsMatch[1]),
            };
        }
    }
    else if (geoText.startsWith('POLYGON')) {
        // Match each innermost "(...)" group so that polygons with interior
        // rings, written as "((outer),(hole))", are parsed ring by ring.
        const ringsMatch = geoText.match(/\(([^()]+)\)/g);
        if (ringsMatch) {
            return {
                type: 'polygon',
                coordinate: ringsMatch.map((ring) => parsePointList(ring.replace(/[()]/g, ''))),
            };
        }
    }
    throw new Error(`Unsupported geometry type: ${geoText.slice(0, 50)}`);
}
/**
 * PostgreSQL-backed store built on CascadeStore.
 *
 * SQL text is produced by a PostgreSQLTranslator and executed through a
 * PostgreSQLConnector; rows coming back are reshaped by formResult().
 * Only the asynchronous row-level entry points are implemented; the
 * synchronous ones always throw.
 */
class PostgreSQLStore extends CascadeStore_1.CascadeStore {
    connector;
    translator;
    /**
     * @param storageSchema Schema handed to the base store and to the translator.
     * @param configuration Configuration handed to the PostgreSQLConnector.
     */
    constructor(storageSchema, configuration) {
        super(storageSchema);
        this.connector = new connector_1.PostgreSQLConnector(configuration);
        this.translator = new translator_1.PostgreSQLTranslator(storageSchema);
    }
    /** Synchronous count is not supported by this store; always throws. */
    countAbjointRow(entity, selection, context, option) {
        throw new Error('PostgreSQL store 不支持同步取数据');
    }
    /** Synchronous aggregation is not supported by this store; always throws. */
    aggregateAbjointRowSync(entity, aggregation, context, option) {
        throw new Error('PostgreSQL store 不支持同步取数据');
    }
    /** Synchronous select is not supported by this store; always throws. */
    selectAbjointRow(entity, selection, context, option) {
        throw new Error('PostgreSQL store 不支持同步取数据');
    }
    /** Synchronous update is not supported by this store; always throws. */
    updateAbjointRow(entity, operation, context, option) {
        throw new Error('PostgreSQL store 不支持同步更新数据');
    }
    /**
     * Runs a raw script through the connector. The connector's result is
     * awaited but not returned.
     */
    async exec(script, txnId) {
        await this.connector.exec(script, txnId);
    }
    /** Not implemented; always throws. */
    checkRelationAsync(entity, operation, context) {
        throw new Error('Method not implemented.');
    }
    /**
     * Translates the aggregation to SQL, runs it in the context's current
     * transaction and returns the first element of the connector result
     * after passing it through formResult().
     */
    async aggregateAbjointRowAsync(entity, aggregation, context, option) {
        const sql = this.translator.translateAggregate(entity, aggregation, option);
        const result = await this.connector.exec(sql, context.getCurrentTxnId());
        return this.formResult(entity, result[0]);
    }
    /** Delegates to aggregateAsync(). */
    aggregate(entity, aggregation, context, option) {
        return this.aggregateAsync(entity, aggregation, context, option);
    }
    supportManyToOneJoin() {
        return true;
    }
    supportMultipleCreate() {
        return true;
    }
    /**
     * Reshapes raw result rows into nested objects for `entity`.
     *
     * Column names containing "." are expanded into nested objects, values are
     * converted according to the attribute type declared in the schema, and
     * joined objects whose `id` is null are removed.
     *
     * @param entity Entity the rows belong to.
     * @param result A single raw row or an array of raw rows.
     * @returns A reshaped row, or an array of reshaped rows when given an array.
     */
    formResult(entity, result) {
        const schema = this.getSchema();
        // Writes `value` into `r` under `attr`, descending into nested objects
        // for dotted names and converting the value by attribute type.
        function resolveAttribute(entity2, r, attr, value) {
            const { attributes, view } = schema[entity2];
            if (!view) {
                const i = attr.indexOf(".");
                if (i !== -1) {
                    const attrHead = attr.slice(0, i);
                    const attrTail = attr.slice(i + 1);
                    // NOTE(review): the numeric codes come from judgeRelation;
                    // presumably 1 = plain attribute, 0 = expression,
                    // 2 = entity/entityId reference, string = target entity of
                    // a foreign key - confirm against oak-domain.
                    const rel = (0, relation_1.judgeRelation)(schema, entity2, attrHead);
                    if (rel === 1) {
                        // The whole dotted name is used as a path into `r`.
                        (0, lodash_1.set)(r, attr, value);
                    }
                    else {
                        if (!r[attrHead]) {
                            r[attrHead] = {};
                        }
                        if (rel === 0) {
                            resolveAttribute(entity2, r[attrHead], attrTail, value);
                        }
                        else if (rel === 2) {
                            // The head of the name is itself the entity to descend into.
                            resolveAttribute(attrHead, r[attrHead], attrTail, value);
                        }
                        else {
                            (0, assert_1.default)(typeof rel === 'string');
                            resolveAttribute(rel, r[attrHead], attrTail, value);
                        }
                    }
                }
                else if (attributes[attr]) {
                    const { type } = attributes[attr];
                    switch (type) {
                        case 'date':
                        case 'time': {
                            // Date instances are stored as their numeric valueOf().
                            if (value instanceof Date) {
                                r[attr] = value.valueOf();
                            }
                            else {
                                r[attr] = value;
                            }
                            break;
                        }
                        case 'geometry': {
                            // Geometry delivered as text is parsed into an object.
                            if (typeof value === 'string') {
                                r[attr] = convertGeoTextToObject(value);
                            }
                            else {
                                r[attr] = value;
                            }
                            break;
                        }
                        case 'object':
                        case 'array': {
                            // PostgreSQL jsonb already arrives as an object and
                            // needs no parsing; only parse when a string is received.
                            if (typeof value === 'string') {
                                r[attr] = JSON.parse(value);
                            }
                            else {
                                r[attr] = value;
                            }
                            break;
                        }
                        case 'function': {
                            // String values are base64-decoded and prefixed with "return ".
                            if (typeof value === 'string') {
                                r[attr] = `return ${Buffer.from(value, 'base64').toString()}`;
                            }
                            else {
                                r[attr] = value;
                            }
                            break;
                        }
                        case 'bool':
                        case 'boolean': {
                            // PostgreSQL returns a boolean directly.
                            r[attr] = value;
                            break;
                        }
                        case 'decimal': {
                            // A PostgreSQL numeric column may come back as a string.
                            if (typeof value === 'string') {
                                r[attr] = parseFloat(value);
                            }
                            else {
                                (0, assert_1.default)(value === null || typeof value === 'number');
                                r[attr] = value;
                            }
                            break;
                        }
                        // TODO: aligned with the MySQL store - strings of char/ref
                        // type have their leading and trailing whitespace removed.
                        case "char":
                        case "ref": {
                            if (value) {
                                (0, assert_1.default)(typeof value === 'string');
                                r[attr] = value.trim();
                            }
                            else {
                                r[attr] = value;
                            }
                            break;
                        }
                        default: {
                            r[attr] = value;
                        }
                    }
                }
                else {
                    // TODO: aligned with the MySQL store - an id column of char
                    // type has its padding whitespace removed.
                    if (value && typeof value === 'string' && attr === 'id') {
                        r[attr] = value.trim();
                    }
                    else {
                        r[attr] = value;
                    }
                }
            }
            else {
                // Entities flagged as views get the value assigned verbatim.
                (0, lodash_1.assign)(r, { [attr]: value });
            }
        }
        // Recursively deletes joined sub-objects whose `id` is null.
        function removeNullObjects(r, e) {
            for (const attr in r) {
                const rel = (0, relation_1.judgeRelation)(schema, e, attr);
                if (rel === 2) {
                    if (r[attr].id === null) {
                        (0, assert_1.default)(schema[e].toModi || r.entity !== attr);
                        delete r[attr];
                        continue;
                    }
                    removeNullObjects(r[attr], attr);
                }
                else if (typeof rel === 'string') {
                    if (r[attr].id === null) {
                        (0, assert_1.default)(schema[e].toModi || r[`${attr}Id`] === null, `对象${String(e)}取数据时发现其外键找不到目标对象rowId是${r.id},其外键${attr}Id值为${r[`${attr}Id`]}`);
                        delete r[attr];
                        continue;
                    }
                    removeNullObjects(r[attr], rel);
                }
            }
        }
        // Builds one reshaped row from one raw row.
        function formSingleRow(r) {
            const result2 = {};
            for (const attr in r) {
                const value = r[attr];
                resolveAttribute(entity, result2, attr, value);
            }
            removeNullObjects(result2, entity);
            return result2;
        }
        if (Array.isArray(result)) {
            return result.map(r => formSingleRow(r));
        }
        return formSingleRow(result);
    }
    /**
     * Translates the selection to SQL, runs it in the context's current
     * transaction and returns the first element of the connector result
     * after passing it through formResult().
     */
    async selectAbjointRowAsync(entity, selection, context, option) {
        const sql = this.translator.translateSelect(entity, selection, option);
        const result = await this.connector.exec(sql, context.getCurrentTxnId());
        return this.formResult(entity, result[0]);
    }
    /**
     * Executes a create / remove / update operation in the context's current
     * transaction.
     *
     * @returns The `rowCount` reported in the second element of the connector
     *   result, or 0 when it is missing.
     */
    async updateAbjointRowAsync(entity, operation, context, option) {
        const { translator, connector } = this;
        const { action } = operation;
        const txn = context.getCurrentTxnId();
        switch (action) {
            case 'create': {
                const { data } = operation;
                // The translator always receives an array of rows.
                const sql = translator.translateInsert(entity, Array.isArray(data) ? data : [data]);
                const result = await connector.exec(sql, txn);
                // PostgreSQL QueryResult.rowCount
                return result[1].rowCount || 0;
            }
            case 'remove': {
                const sql = translator.translateRemove(entity, operation, option);
                const result = await connector.exec(sql, txn);
                return result[1].rowCount || 0;
            }
            default: {
                // Every other action is treated as an update; read-only
                // actions must never reach this point.
                (0, assert_1.default)(!['select', 'download', 'stat'].includes(action));
                const sql = translator.translateUpdate(entity, operation, option);
                const result = await connector.exec(sql, txn);
                return result[1].rowCount || 0;
            }
        }
    }
    /** Rejects read-only actions, then delegates to the base operateAsync(). */
    async operate(entity, operation, context, option) {
        const { action } = operation;
        (0, assert_1.default)(!['select', 'download', 'stat'].includes(action), '不支持使用 select operation');
        return await super.operateAsync(entity, operation, context, option);
    }
    /** Delegates to the base selectAsync(). */
    async select(entity, selection, context, option) {
        return await super.selectAsync(entity, selection, context, option);
    }
    /**
     * Runs the translated count query and returns the `cnt` column of the
     * first row as a number, or 0 when there is no such value.
     */
    async countAbjointRowAsync(entity, selection, context, option) {
        const sql = this.translator.translateCount(entity, selection, option);
        const result = await this.connector.exec(sql, context.getCurrentTxnId());
        // PostgreSQL returns count as a bigint, which arrives as a string.
        const cnt = result[0][0]?.cnt;
        return typeof cnt === 'string' ? parseInt(cnt, 10) : (cnt || 0);
    }
    /** Delegates to countAsync(). */
    async count(entity, selection, context, option) {
        return this.countAsync(entity, selection, context, option);
    }
    /** Starts a transaction on the connector and returns what it returns. */
    async begin(option) {
        return await this.connector.startTransaction(option);
    }
    /** Commits the given transaction on the connector. */
    async commit(txnId) {
        await this.connector.commitTransaction(txnId);
    }
    /** Rolls back the given transaction on the connector. */
    async rollback(txnId) {
        await this.connector.rollbackTransaction(txnId);
    }
    /** Opens the connector. */
    async connect() {
        await this.connector.connect();
    }
    /** Closes the connector. */
    async disconnect() {
        await this.connector.disconnect();
    }
    /**
     * Executes, one statement at a time and outside any transaction id, the
     * statements the translator generates for every entity in the schema.
     */
    async initialize(option) {
        const schema = this.getSchema();
        // Optional: the PostGIS extension could be created first, e.g. with
        // CREATE EXTENSION IF NOT EXISTS postgis;
        for (const entity in schema) {
            const sqls = this.translator.translateCreateEntity(entity, option);
            for (const sql of sqls) {
                await this.connector.exec(sql);
            }
        }
    }
}
exports.PostgreSQLStore = PostgreSQLStore;