oak-db/src/PostgreSQL/store.ts

444 lines
16 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import {
EntityDict,
OperateOption,
OperationResult,
TxnOption,
StorageSchema,
SelectOption,
AggregationResult,
Geo
} from 'oak-domain/lib/types';
import { EntityDict as BaseEntityDict } from 'oak-domain/lib/base-app-domain';
import { CascadeStore } from 'oak-domain/lib/store/CascadeStore';
import { PostgreSQLConfiguration } from './types/Configuration';
import { PostgreSQLConnector } from './connector';
import { PostgreSQLTranslator, PostgreSQLSelectOption, PostgreSQLOperateOption } from './translator';
import { assign, set } from 'lodash';
import assert from 'assert';
import { judgeRelation } from 'oak-domain/lib/store/relation';
import { AsyncContext, AsyncRowStore } from 'oak-domain/lib/store/AsyncRowStore';
import { SyncContext } from 'oak-domain/lib/store/SyncRowStore';
import { CreateEntityOption } from '../types/Translator';
import { QueryResult } from 'pg';
import { DbStore } from '../types/dbStore';
/**
 * Converts a WKT (well-known text) geometry string into a `Geo` object.
 *
 * Supported inputs:
 * - `POINT(x y)`                -> `{ type: 'point', coordinate: [x, y] }`
 * - `LINESTRING(x y, x y, ...)` -> `{ type: 'path', coordinate: [[x, y], ...] }`
 * - `POLYGON((ring), (ring)…)`  -> `{ type: 'polygon', coordinate: [ring, ...] }`,
 *   where every ring (the outer boundary and any holes) is a list of `[x, y]` pairs.
 *
 * Numbers may be written with a leading minus sign, a fractional part and an
 * exponent (e.g. `1e-07`).
 *
 * @param geoText the WKT text to convert; it must start with the geometry keyword
 * @returns the parsed geometry
 * @throws an assertion error when a POINT does not contain exactly two numbers
 * @throws Error when the text is not a parsable POINT, LINESTRING or POLYGON
 */
function convertGeoTextToObject(geoText: string): Geo {
    // Parses "x y, x y, ..." into a list of [x, y] pairs.
    const parsePoints = (text: string): [number, number][] =>
        text.split(',').map(pair => {
            const [x, y] = pair.trim().split(/\s+/).map(token => parseFloat(token));
            return [x, y] as [number, number];
        });

    if (geoText.startsWith('POINT')) {
        // Accept plain decimals, leading-dot decimals and exponents so that a value
        // such as "1e-07" is read as one number instead of two.
        const coord = geoText.match(/-?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?/g) as string[];
        assert(coord && coord.length === 2);
        return {
            type: 'point',
            coordinate: coord.map(ele => parseFloat(ele)) as [number, number],
        };
    }

    if (geoText.startsWith('LINESTRING')) {
        const coordsMatch = geoText.match(/\(([^)]+)\)/);
        if (coordsMatch) {
            return {
                type: 'path',
                coordinate: parsePoints(coordsMatch[1]),
            };
        }
    } else if (geoText.startsWith('POLYGON')) {
        // Every innermost "(...)" group is one ring. Matching "((...))" as a whole
        // would only recognise single-ring polygons and reject polygons with holes.
        const ringsMatch = geoText.match(/\(([^()]+)\)/g);
        if (ringsMatch) {
            return {
                type: 'polygon',
                // slice(1, -1) strips the enclosing parentheses of each ring
                coordinate: ringsMatch.map(ring => parsePoints(ring.slice(1, -1))),
            };
        }
    }

    throw new Error(`Unsupported geometry type: ${geoText.slice(0, 50)}`);
}
/**
 * PostgreSQL-backed store built on top of `CascadeStore`.
 *
 * SQL text is produced by a `PostgreSQLTranslator` and executed through a
 * `PostgreSQLConnector`. Only the asynchronous row-level hooks are implemented;
 * every synchronous hook throws.
 */
export class PostgreSQLStore<
    ED extends EntityDict & BaseEntityDict,
    Cxt extends AsyncContext<ED>
> extends CascadeStore<ED> implements DbStore<ED, Cxt> {
    /** Executes SQL and drives connection / transaction handling. */
    connector: PostgreSQLConnector;

    /** Turns selections, aggregations and operations into SQL text. */
    translator: PostgreSQLTranslator<ED>;

    /**
     * @param storageSchema schema handed to both the base store and the translator
     * @param configuration settings handed to the connector
     */
    constructor(storageSchema: StorageSchema<ED>, configuration: PostgreSQLConfiguration) {
        super(storageSchema);
        this.connector = new PostgreSQLConnector(configuration);
        this.translator = new PostgreSQLTranslator(storageSchema);
    }

    // ---------------------------------------------------------------------
    // Synchronous hooks: not supported by this store, each one always throws.
    // ---------------------------------------------------------------------

    /** Synchronous count is not supported; always throws. */
    protected countAbjointRow<T extends keyof ED, OP extends SelectOption, Cxt extends SyncContext<ED>>(
        entity: T,
        selection: Pick<ED[T]['Selection'], 'filter' | 'count'>,
        context: Cxt,
        option: OP
    ): number {
        throw new Error('PostgreSQL store 不支持同步取数据');
    }

    /** Synchronous aggregation is not supported; always throws. */
    protected aggregateAbjointRowSync<T extends keyof ED, OP extends SelectOption, Cxt extends SyncContext<ED>>(
        entity: T,
        aggregation: ED[T]['Aggregation'],
        context: Cxt,
        option: OP
    ): AggregationResult<ED[T]['Schema']> {
        throw new Error('PostgreSQL store 不支持同步取数据');
    }

    /** Synchronous select is not supported; always throws. */
    protected selectAbjointRow<T extends keyof ED, OP extends SelectOption>(
        entity: T,
        selection: ED[T]['Selection'],
        context: SyncContext<ED>,
        option: OP
    ): Partial<ED[T]['Schema']>[] {
        throw new Error('PostgreSQL store 不支持同步取数据');
    }

    /** Synchronous update is not supported; always throws. */
    protected updateAbjointRow<T extends keyof ED, OP extends OperateOption>(
        entity: T,
        operation: ED[T]['Operation'],
        context: SyncContext<ED>,
        option: OP
    ): number {
        throw new Error('PostgreSQL store 不支持同步更新数据');
    }

    /** Not implemented for this store; throws synchronously when called. */
    checkRelationAsync<T extends keyof ED, Cxt extends AsyncContext<ED>>(
        entity: T,
        operation: Omit<ED[T]['Operation'] | ED[T]['Selection'], 'id'>,
        context: Cxt
    ): Promise<void> {
        throw new Error('Method not implemented.');
    }

    // ---------------------------------------------------------------------
    // Capability flags
    // ---------------------------------------------------------------------

    /** Reports that many-to-one joins are supported. */
    protected supportManyToOneJoin(): boolean {
        return true;
    }

    /** Reports that creating several rows in one operation is supported. */
    protected supportMultipleCreate(): boolean {
        return true;
    }

    // ---------------------------------------------------------------------
    // Raw SQL
    // ---------------------------------------------------------------------

    /**
     * Runs a SQL script through the connector and discards its result.
     *
     * @param script SQL text to execute
     * @param txnId optional transaction id forwarded to the connector
     */
    async exec(script: string, txnId?: string) {
        await this.connector.exec(script, txnId);
    }

    // ---------------------------------------------------------------------
    // Result shaping
    // ---------------------------------------------------------------------

    /**
     * Reshapes raw rows into nested entity objects.
     *
     * Dotted column names are expanded into nested objects, values are normalised
     * according to the declared attribute type, and joined objects whose `id` is
     * `null` are removed.
     *
     * @param entity entity the rows were selected from
     * @param result a single raw row or an array of raw rows
     * @returns one shaped object, or an array of them when `result` is an array
     */
    private formResult<T extends keyof ED>(entity: T, result: any): any {
        const schema = this.getSchema();

        // Writes one raw column value into `r`, descending along the dotted path.
        function resolveAttribute<E extends keyof ED>(
            entity2: E,
            r: Record<string, any>,
            attr: string,
            value: any
        ) {
            const { attributes, view } = schema[entity2];

            // Views are copied through without any interpretation.
            if (view) {
                assign(r, { [attr]: value });
                return;
            }

            const dotAt = attr.indexOf(".");
            if (dotAt !== -1) {
                const attrHead = attr.slice(0, dotAt);
                const attrTail = attr.slice(dotAt + 1);
                const rel = judgeRelation(schema, entity2, attrHead);

                if (rel === 1) {
                    // The whole dotted path is written below the current object.
                    set(r, attr, value);
                    return;
                }

                if (!r[attrHead]) {
                    r[attrHead] = {};
                }
                const nested = r[attrHead];
                if (rel === 0) {
                    // Continue on the same entity.
                    resolveAttribute(entity2, nested, attrTail, value);
                } else if (rel === 2) {
                    // The head itself names the target entity.
                    resolveAttribute(attrHead, nested, attrTail, value);
                } else {
                    // Otherwise the relation result is the target entity name.
                    assert(typeof rel === 'string');
                    resolveAttribute(rel, nested, attrTail, value);
                }
                return;
            }

            if (!attributes[attr]) {
                // Column that is not a declared attribute of the entity.
                // TODO: aligned with the mysql store - trailing spaces are stripped
                // from an id column of char type.
                if (value && typeof value === 'string') {
                    if (attr === 'id') {
                        r[attr] = value.trim();
                    } else if (attr.startsWith("#count")) {
                        // PostgreSQL returns count as a string
                        r[attr] = parseInt(value, 10);
                    } else {
                        r[attr] = value;
                    }
                } else {
                    r[attr] = value;
                }
                return;
            }

            const { type } = attributes[attr];
            switch (type) {
                case 'date':
                case 'time': {
                    // Date instances are stored as their numeric time value.
                    r[attr] = value instanceof Date ? value.valueOf() : value;
                    break;
                }
                case 'geometry': {
                    r[attr] = typeof value === 'string' ? convertGeoTextToObject(value) : value;
                    break;
                }
                case 'object':
                case 'array': {
                    // PostgreSQL jsonb comes back as an object already, so only
                    // string values need parsing.
                    r[attr] = typeof value === 'string' ? JSON.parse(value) : value;
                    break;
                }
                case 'function': {
                    r[attr] = typeof value === 'string'
                        ? `return ${Buffer.from(value, 'base64').toString()}`
                        : value;
                    break;
                }
                case 'bool':
                case 'boolean': {
                    // PostgreSQL returns a real boolean
                    r[attr] = value;
                    break;
                }
                case 'decimal': {
                    // A PostgreSQL numeric column may come back as a string
                    if (typeof value === 'string') {
                        r[attr] = parseFloat(value);
                    } else {
                        assert(value === null || typeof value === 'number');
                        r[attr] = value;
                    }
                    break;
                }
                // TODO: aligned with the mysql store - surrounding whitespace is
                // stripped from ref-type strings.
                case "char":
                case "ref": {
                    if (value) {
                        assert(typeof value === 'string');
                        r[attr] = value.trim();
                    } else {
                        r[attr] = value;
                    }
                    break;
                }
                default: {
                    r[attr] = value;
                }
            }
        }

        // Drops nested relation objects whose id is null, recursing into the rest.
        function removeNullObjects<E extends keyof ED>(r: Record<string, any>, e: E) {
            for (const attr in r) {
                const rel = judgeRelation(schema, e, attr);
                if (rel === 2) {
                    if (r[attr].id !== null) {
                        removeNullObjects(r[attr], attr);
                        continue;
                    }
                    assert(schema[e].toModi || r.entity !== attr);
                    delete r[attr];
                } else if (typeof rel === 'string') {
                    if (r[attr].id !== null) {
                        removeNullObjects(r[attr], rel);
                        continue;
                    }
                    assert(
                        schema[e].toModi || r[`${attr}Id`] === null,
                        `对象${String(e)}取数据时发现其外键找不到目标对象rowId是${r.id},其外键${attr}Id值为${r[`${attr}Id`]}`
                    );
                    delete r[attr];
                }
            }
        }

        // Shapes one raw row: resolve every column, then prune null joins.
        const formSingleRow = (row: any): any => {
            const shaped: Record<string, any> = {};
            for (const column in row) {
                resolveAttribute(entity, shaped, column, row[column]);
            }
            removeNullObjects(shaped, entity);
            return shaped;
        };

        return result instanceof Array
            ? result.map(row => formSingleRow(row))
            : formSingleRow(result);
    }

    // ---------------------------------------------------------------------
    // Asynchronous row-level hooks
    // ---------------------------------------------------------------------

    /** Translates and runs a select, then shapes the first element of the connector result. */
    protected async selectAbjointRowAsync<T extends keyof ED>(
        entity: T,
        selection: ED[T]['Selection'],
        context: AsyncContext<ED>,
        option?: PostgreSQLSelectOption
    ): Promise<Partial<ED[T]['Schema']>[]> {
        const sql = this.translator.translateSelect(entity, selection, option);
        const execResult = await this.connector.exec(sql, context.getCurrentTxnId());
        const rows = execResult[0];
        return this.formResult(entity, rows);
    }

    /** Translates and runs an aggregation, then shapes the first element of the connector result. */
    protected async aggregateAbjointRowAsync<T extends keyof ED, OP extends SelectOption, Cxt extends AsyncContext<ED>>(
        entity: T,
        aggregation: ED[T]['Aggregation'],
        context: Cxt,
        option: OP
    ): Promise<AggregationResult<ED[T]['Schema']>> {
        const sql = this.translator.translateAggregate(entity, aggregation, option);
        const execResult = await this.connector.exec(sql, context.getCurrentTxnId());
        const rows = execResult[0];
        return this.formResult(entity, rows);
    }

    /**
     * Translates and runs a count query.
     *
     * @returns the `cnt` field of the first returned row, parsed when it is a
     *          string, or 0 when it is missing or falsy
     */
    protected async countAbjointRowAsync<T extends keyof ED>(
        entity: T,
        selection: Pick<ED[T]['Selection'], 'filter' | 'count'>,
        context: AsyncContext<ED>,
        option: SelectOption
    ): Promise<number> {
        const sql = this.translator.translateCount(entity, selection, option);
        const execResult = await this.connector.exec(sql, context.getCurrentTxnId());
        // PostgreSQL returns count (bigint) as a string
        const cnt = execResult[0][0]?.cnt;
        if (typeof cnt === 'string') {
            return parseInt(cnt, 10);
        }
        return cnt || 0;
    }

    /**
     * Runs a create, remove or update operation in the context's current transaction.
     *
     * @returns the `rowCount` of the query result, or 0 when it is falsy
     */
    protected async updateAbjointRowAsync<T extends keyof ED>(
        entity: T,
        operation: ED[T]['Operation'],
        context: AsyncContext<ED>,
        option?: PostgreSQLOperateOption
    ): Promise<number> {
        const { translator, connector } = this;
        const { action } = operation;
        const txn = context.getCurrentTxnId();

        // Executes one statement and reads rowCount from the PostgreSQL QueryResult,
        // which is the second element of the connector result.
        const runAndCount = async (sql: string): Promise<number> => {
            const execResult = await connector.exec(sql, txn);
            return (execResult[1] as QueryResult).rowCount || 0;
        };

        if (action === 'create') {
            const { data } = operation as ED[T]['Create'];
            const rows = data instanceof Array ? data : [data];
            return runAndCount(translator.translateInsert(entity, rows));
        }
        if (action === 'remove') {
            return runAndCount(translator.translateRemove(entity, operation as ED[T]['Remove'], option));
        }

        // Everything else is treated as an update; read-type actions are rejected.
        assert(!['select', 'download', 'stat'].includes(action));
        return runAndCount(translator.translateUpdate(entity, operation as ED[T]['Update'], option));
    }

    // ---------------------------------------------------------------------
    // Public store API
    // ---------------------------------------------------------------------

    /** Delegates to the base class `aggregateAsync`. */
    aggregate<T extends keyof ED, OP extends SelectOption>(
        entity: T,
        aggregation: ED[T]['Aggregation'],
        context: Cxt,
        option: OP
    ): Promise<AggregationResult<ED[T]['Schema']>> {
        return this.aggregateAsync(entity, aggregation, context, option);
    }

    /** Rejects read-type actions, then delegates to the base class `operateAsync`. */
    async operate<T extends keyof ED>(
        entity: T,
        operation: ED[T]['Operation'],
        context: Cxt,
        option: OperateOption
    ): Promise<OperationResult<ED>> {
        const isReadAction = ['select', 'download', 'stat'].includes(operation.action);
        assert(!isReadAction, '不支持使用 select operation');
        const outcome = await super.operateAsync(entity, operation as any, context, option);
        return outcome;
    }

    /** Delegates to the base class `selectAsync`. */
    async select<T extends keyof ED>(
        entity: T,
        selection: ED[T]['Selection'],
        context: Cxt,
        option: SelectOption
    ): Promise<Partial<ED[T]['Schema']>[]> {
        const rows = await super.selectAsync(entity, selection, context, option);
        return rows;
    }

    /** Delegates to the base class `countAsync`. */
    async count<T extends keyof ED>(
        entity: T,
        selection: Pick<ED[T]['Selection'], 'filter' | 'count'>,
        context: Cxt,
        option: SelectOption
    ) {
        return this.countAsync(entity, selection, context, option);
    }

    // ---------------------------------------------------------------------
    // Transactions and lifecycle
    // ---------------------------------------------------------------------

    /** Starts a transaction on the connector and returns what it returns as the id. */
    async begin(option?: TxnOption): Promise<string> {
        const txnId = await this.connector.startTransaction(option);
        return txnId;
    }

    /** Commits the given transaction through the connector. */
    async commit(txnId: string): Promise<void> {
        await this.connector.commitTransaction(txnId);
    }

    /** Rolls the given transaction back through the connector. */
    async rollback(txnId: string): Promise<void> {
        await this.connector.rollbackTransaction(txnId);
    }

    /** Opens the connector. */
    async connect() {
        await this.connector.connect();
    }

    /** Closes the connector. */
    async disconnect() {
        await this.connector.disconnect();
    }

    /**
     * Executes, one statement at a time and outside any transaction id, the
     * create-entity SQL the translator produces for every entity in the schema.
     *
     * @param option passed through to `translateCreateEntity`
     */
    async initialize(option: CreateEntityOption) {
        const schema = this.getSchema();
        // Optional: create the PostGIS extension first
        // await this.connector.exec('CREATE EXTENSION IF NOT EXISTS postgis;');
        for (const entity in schema) {
            const statements = this.translator.translateCreateEntity(entity, option);
            for (const statement of statements) {
                await this.connector.exec(statement);
            }
        }
    }
}