oak-db/src/MySQL/connector.ts

88 lines
2.8 KiB
TypeScript

import mysql, { Pool } from 'mysql2/promise';
import { v4 } from 'uuid';
import { TxnOption } from 'oak-domain/lib/types';
import { MySQLConfiguration } from './types/Configuration';
import assert from 'assert';
import { unset } from 'lodash';
/**
 * Wrapper around a mysql2 promise pool that additionally tracks open
 * transactions by a generated string id.
 *
 * Each transaction started through {@link MySqlConnector.startTransaction}
 * holds one pooled connection until it is committed or rolled back.
 */
export class MySqlConnector {
    /** Connection pool created from `configuration` in the constructor. */
    pool: Pool;
    /** Configuration object handed to `mysql.createPool`. */
    configuration: MySQLConfiguration;
    /** Open transactions: transaction id -> the pooled connection running it. */
    txnDict: Record<string, mysql.PoolConnection>;

    /**
     * @param configuration Pool configuration; the pool is created immediately.
     */
    constructor(configuration: MySQLConfiguration) {
        this.configuration = configuration;
        this.txnDict = {};
        this.pool = mysql.createPool(this.configuration);
    }

    /** No-op: the pool is already created by the constructor. */
    connect() {
    }

    /**
     * Ends the pool.
     *
     * @returns Whatever `pool.end()` returns.
     */
    disconnect() {
        return this.pool.end();
    }

    /**
     * Checks a connection out of the pool, starts a transaction on it and
     * registers it under a fresh id.
     *
     * @param option Optional transaction settings; when `isolationLevel` is
     *   set it is applied to this transaction only.
     * @returns The id to pass to `exec`, `commitTransaction` and
     *   `rollbackTransaction`.
     * @throws Whatever the driver throws while acquiring the connection,
     *   setting the isolation level or beginning the transaction. On failure
     *   after acquisition the connection is released back to the pool.
     */
    async startTransaction(option?: TxnOption): Promise<string> {
        // Guards against a configuration in which the pool can hand out a
        // connection that is still owned by a registered transaction.
        const startInner = async (): Promise<string> => {
            const connection = await this.pool.getConnection();

            // A connection handed out by the pool must not already belong to
            // another transaction; if it does, retry after the next release.
            const occupied = Object.values(this.txnDict).includes(connection);
            if (occupied) {
                return new Promise<string>((resolve) => {
                    // `once`, not `on`: a permanent listener would call
                    // startInner() again on every later release and open
                    // transactions whose ids nobody holds.
                    this.pool.once('release', () => resolve(startInner()));
                });
            }

            try {
                // `SET TRANSACTION` without GLOBAL/SESSION affects only the
                // next transaction and is rejected by MySQL once a transaction
                // is in progress, so it has to precede beginTransaction().
                if (option?.isolationLevel) {
                    await connection.query(`SET TRANSACTION ISOLATION LEVEL ${option.isolationLevel};`);
                }
                await connection.beginTransaction();
            } catch (err) {
                // Do not leak the checked-out connection when start-up fails.
                connection.release();
                throw err;
            }

            const id = v4();
            this.txnDict[id] = connection;
            return id;
        };
        return startInner();
    }

    /**
     * Runs a SQL statement.
     *
     * @param sql Statement text, passed to the driver's `query` unchanged.
     * @param txn Id of an open transaction; when given, the statement runs on
     *   that transaction's connection, otherwise directly on the pool.
     * @returns The value resolved by the driver's `query`.
     * @throws AssertionError when `txn` is given but not registered.
     */
    async exec(sql: string, txn?: string): Promise<any> {
        if (txn) {
            const connection = this.txnDict[txn];
            assert(connection, `unknown transaction id: ${txn}`);
            return await connection.query(sql);
        }
        return await this.pool.query(sql);
    }

    /**
     * Commits the transaction and returns its connection to the pool.
     *
     * The id is unregistered before the commit is issued, and the connection
     * is released even when the commit rejects.
     *
     * @param txn Id returned by `startTransaction`.
     * @throws AssertionError when `txn` is not registered; otherwise whatever
     *   the driver's `commit` rejects with.
     */
    async commitTransaction(txn: string): Promise<void> {
        const connection = this.txnDict[txn];
        assert(connection, `unknown transaction id: ${txn}`);
        delete this.txnDict[txn];
        try {
            await connection.commit();
        } finally {
            connection.release();
        }
    }

    /**
     * Rolls the transaction back and returns its connection to the pool.
     *
     * The id is unregistered before the rollback is issued, and the
     * connection is released even when the rollback rejects.
     *
     * @param txn Id returned by `startTransaction`.
     * @throws AssertionError when `txn` is not registered; otherwise whatever
     *   the driver's `rollback` rejects with.
     */
    async rollbackTransaction(txn: string): Promise<void> {
        const connection = this.txnDict[txn];
        assert(connection, `unknown transaction id: ${txn}`);
        delete this.txnDict[txn];
        try {
            await connection.rollback();
        } finally {
            connection.release();
        }
    }
}