oak-db/lib/PostgreSQL/connector.js

148 lines
4.9 KiB
JavaScript

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.PostgreSQLConnector = void 0;
const tslib_1 = require("tslib");
const pg_1 = require("pg");
const uuid_1 = require("uuid");
const assert_1 = tslib_1.__importDefault(require("assert"));
class PostgreSQLConnector {
pool;
configuration;
txnDict;
constructor(configuration) {
this.configuration = configuration;
this.txnDict = {};
this.pool = new pg_1.Pool(configuration);
// 错误处理
this.pool.on('error', (err) => {
console.error('PostgreSQL pool error:', err);
});
}
connect() {
// pg Pool 是惰性连接,不需要显式连接
// 但可以在这里进行连接测试
}
async disconnect() {
// 先获取所有事务ID的快照
const txnIds = Object.keys(this.txnDict);
for (const txnId of txnIds) {
try {
await this.rollbackTransaction(txnId);
}
catch (e) {
console.error(`Failed to rollback transaction ${txnId}:`, e);
}
}
await this.pool.end();
}
async startTransaction(option) {
const startInner = async () => {
const connection = await this.pool.connect();
// 添加:检测连接是否已被其他事务占用
for (const txn2 in this.txnDict) {
if (this.txnDict[txn2] === connection) {
return new Promise((resolve) => {
this.pool.on('release', () => resolve(startInner()));
});
}
}
try {
let beginStmt = 'BEGIN';
if (option?.isolationLevel) {
// PostgreSQL 隔离级别:
// READ UNCOMMITTED, READ COMMITTED, REPEATABLE READ, SERIALIZABLE
// 注意: PostgreSQL 的 READ UNCOMMITTED 行为等同于 READ COMMITTED
const level = this.mapIsolationLevel(option.isolationLevel);
beginStmt = `BEGIN ISOLATION LEVEL ${level}`;
}
await connection.query(beginStmt);
const id = (0, uuid_1.v4)();
this.txnDict[id] = connection;
return id;
}
catch (error) {
// 如果启动事务失败,释放连接
connection.release();
throw error;
}
};
return startInner();
}
/**
* 映射隔离级别到 PostgreSQL 语法
*/
mapIsolationLevel(level) {
const levelUpper = level.toUpperCase().replace(/[-_]/g, ' '); // 同时处理 - 和 _
const validLevels = [
'READ UNCOMMITTED',
'READ COMMITTED',
'REPEATABLE READ',
'SERIALIZABLE'
];
if (validLevels.includes(levelUpper)) {
return levelUpper;
}
// 默认使用 READ COMMITTED
console.warn(`Unknown isolation level "${level}", using READ COMMITTED`);
return 'READ COMMITTED';
}
async exec(sql, txn) {
if (process.env.NODE_ENV === 'development') {
// console.log(`SQL: ${sql}; \n`);
}
try {
let result;
if (txn) {
const connection = this.txnDict[txn];
(0, assert_1.default)(connection, `Transaction ${txn} not found`);
result = await connection.query(sql);
}
else {
result = await this.pool.query(sql);
}
// 返回格式与 mysql2 兼容: [rows, fields/result]
return [result.rows, result];
}
catch (error) {
// 增强错误信息
const enhancedError = new Error(`PostgreSQL query failed: ${error.message}\nSQL: ${sql.slice(0, 500)}${sql.length > 500 ? '...' : ''}`);
enhancedError.originalError = error;
enhancedError.sql = sql;
throw enhancedError;
}
}
async commitTransaction(txn) {
const connection = this.txnDict[txn];
(0, assert_1.default)(connection, `Transaction ${txn} not found`);
try {
await connection.query('COMMIT');
}
finally {
delete this.txnDict[txn];
connection.release();
}
}
async rollbackTransaction(txn) {
const connection = this.txnDict[txn];
(0, assert_1.default)(connection, `Transaction ${txn} not found`);
try {
await connection.query('ROLLBACK');
}
finally {
delete this.txnDict[txn];
connection.release();
}
}
/**
* 获取连接池状态
*/
getPoolStatus() {
return {
total: this.pool.totalCount,
idle: this.pool.idleCount,
waiting: this.pool.waitingCount
};
}
}
exports.PostgreSQLConnector = PostgreSQLConnector;