WBWatcher在select数据时默认不加forUpdate

This commit is contained in:
Xu Chang 2024-11-08 15:05:40 +08:00
parent fcbbf3d5e0
commit e4d3b31b39
6 changed files with 54 additions and 28 deletions

8
lib/AppLoader.d.ts vendored
View File

@ -15,15 +15,17 @@ export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt exten
protected dataSubscriber?: DataSubscriber<ED, Cxt>;
protected synchronizer?: Synchronizer<ED, Cxt>;
protected contextBuilder: (store: DbStore<ED, Cxt>) => Cxt;
private nsSocket;
private requireSth;
protected makeContext(cxtStr?: string, headers?: IncomingHttpHeaders): Promise<Cxt>;
/**
* configuration
*/
private getConfiguration;
constructor(path: string, ns?: Namespace, nsServer?: Namespace);
constructor(path: string, nsSubscribe: Namespace, nsSocket: Namespace, nsServer?: Namespace);
protected registerTrigger(trigger: Trigger<ED, keyof ED, Cxt>): void;
initTriggers(): void;
protected initTriggers(): void;
protected initSocket(): void;
mount(initialize?: true): Promise<void>;
unmount(): Promise<void>;
execAspect(name: string, headers?: IncomingHttpHeaders, contextString?: string, params?: any): Promise<{
@ -35,7 +37,7 @@ export declare class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt exten
getStore(): DbStore<ED, Cxt>;
getEndpoints(prefix: string): [string, "post" | "get" | "put" | "delete", string, (params: Record<string, string>, headers: IncomingHttpHeaders, req: IncomingMessage, body?: any) => Promise<any>][];
protected operateInWatcher<T extends keyof ED>(entity: T, operation: ED[T]['Update'], context: Cxt, singleton?: true): Promise<OperationResult<ED>>;
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, singleton?: true): Promise<Partial<ED[T]["Schema"]>[]>;
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, forUpdate?: true, singleton?: true): Promise<Partial<ED[T]["Schema"]>[]>;
protected execWatcher(watcher: Watcher<ED, keyof ED, Cxt>): Promise<OperationResult<ED> | undefined>;
protected getCheckpointTs(): number;
protected checkpoint(): Promise<number>;

View File

@ -11,6 +11,7 @@ const uuid_1 = require("oak-domain/lib/utils/uuid");
const types_1 = require("oak-domain/lib/types");
const DbStore_1 = require("./DbStore");
const index_1 = tslib_1.__importStar(require("oak-common-aspect/lib/index"));
const assert_1 = tslib_1.__importDefault(require("assert"));
const dependencyBuilder_1 = require("oak-domain/lib/compiler/dependencyBuilder");
const DataSubscriber_1 = tslib_1.__importDefault(require("./cluster/DataSubscriber"));
const env_1 = require("./cluster/env");
@ -22,6 +23,7 @@ class AppLoader extends types_1.AppLoader {
dataSubscriber;
synchronizer;
contextBuilder;
nsSocket;
requireSth(filePath) {
const depFilePath = (0, path_1.join)(this.path, filePath);
let sth;
@ -65,7 +67,7 @@ class AppLoader extends types_1.AppLoader {
syncConfig: syncConfigs,
};
}
constructor(path, ns, nsServer) {
constructor(path, nsSubscribe, nsSocket, nsServer) {
super(path);
const { dbConfig } = this.getConfiguration();
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
@ -74,9 +76,10 @@ class AppLoader extends types_1.AppLoader {
const { authDeduceRelationMap, selectFreeEntities, updateFreeDict } = this.requireSth('lib/configuration/relation');
this.aspectDict = Object.assign({}, index_1.default, this.requireSth('lib/aspects/index'));
this.dbStore = new DbStore_1.DbStore(storageSchema, () => this.contextBuilder(this.dbStore), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
if (ns) {
this.dataSubscriber = new DataSubscriber_1.default(ns, nsServer);
if (nsSubscribe) {
this.dataSubscriber = new DataSubscriber_1.default(nsSubscribe, nsServer);
}
this.nsSocket = nsSocket;
const { BackendRuntimeContext } = require(`${path}/lib/context/BackendRuntimeContext`);
const loaderThis = this;
// 需要重载context上的构造和commit方法否则程序中执行context.restartToExecute这样的方法中new一个context出来是无法正确执行的
@ -123,6 +126,14 @@ class AppLoader extends types_1.AppLoader {
syncTriggers.forEach((trigger) => this.registerTrigger(trigger));
}
}
initSocket() {
// todo
const socketFilePath = (0, path_1.join)(this.path, 'lib/socket/index');
if ((0, fs_1.existsSync)(socketFilePath)) {
const { registerSocketEntry } = require(socketFilePath);
(0, assert_1.default)(typeof registerSocketEntry === 'function');
}
}
async mount(initialize) {
const { path } = this;
if (!initialize) {
@ -131,6 +142,7 @@ class AppLoader extends types_1.AppLoader {
this.synchronizer = new Synchronizer_1.default(syncConfig, this.dbStore.getSchema(), () => this.contextBuilder(this.dbStore));
}
this.initTriggers();
this.initSocket();
}
const { importations, exportations } = require(`${path}/lib/ports/index`);
(0, index_1.registerPorts)(importations || [], exportations || []);
@ -273,10 +285,10 @@ class AppLoader extends types_1.AppLoader {
operateInWatcher(entity, operation, context, singleton) {
return this.dbStore.operate(entity, operation, context, {});
}
selectInWatcher(entity, selection, context, singleton) {
selectInWatcher(entity, selection, context, forUpdate, singleton) {
return this.dbStore.select(entity, selection, context, {
blockTrigger: true,
forUpdate: true,
forUpdate,
});
}
async execWatcher(watcher) {
@ -295,13 +307,13 @@ class AppLoader extends types_1.AppLoader {
}, context, singleton);
}
else {
const { entity, projection, fn, filter, singleton } = watcher;
const { entity, projection, fn, filter, singleton, forUpdate } = watcher;
const filter2 = typeof filter === 'function' ? await filter() : (0, lodash_1.cloneDeep)(filter);
const projection2 = typeof projection === 'function' ? await projection() : (0, lodash_1.cloneDeep)(projection);
const rows = await this.selectInWatcher(entity, {
data: projection2,
filter: filter2,
}, context, singleton);
}, context, forUpdate, singleton);
if (rows.length > 0) {
result = await fn(context, rows);
}

View File

@ -10,10 +10,10 @@ export declare class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cx
protected socket: Socket;
private connectServerSocket;
private sub;
constructor(path: string, nsDs: Namespace, nsServer: Namespace, socketPath: string);
constructor(path: string, nsDs: Namespace, nsSocket: Namespace, nsServer: Namespace, socketPath: string);
protected registerTrigger(trigger: Trigger<ED, keyof ED, Cxt>): void;
protected operateInWatcher<T extends keyof ED>(entity: T, operation: ED[T]['Update'], context: Cxt, singleton?: true): Promise<OperationResult<ED>>;
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, singleton?: true): Promise<Partial<ED[T]['Schema']>[]>;
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, forUpdate?: true, singleton?: true): Promise<Partial<ED[T]['Schema']>[]>;
protected execWatcher(watcher: Watcher<ED, keyof ED, Cxt>): Promise<OperationResult<ED> | undefined>;
protected execFreeTimer(timer: FreeTimer<ED, Cxt>, context: Cxt): Promise<OperationResult<ED>> | undefined;
protected checkpoint(): Promise<number>;

View File

@ -48,8 +48,8 @@ class ClusterAppLoader extends AppLoader_1.AppLoader {
this.socket.connect();
}
}
constructor(path, nsDs, nsServer, socketPath) {
super(path, nsDs, nsServer);
constructor(path, nsDs, nsSocket, nsServer, socketPath) {
super(path, nsDs, nsSocket, nsServer);
this.dbStore.setOnVolatileTrigger(async (entity, trigger, ids, cxtStr, option) => {
const execLocal = async (ids2) => {
const context = await this.makeContext(cxtStr);
@ -145,7 +145,7 @@ class ClusterAppLoader extends AppLoader_1.AppLoader {
filter: filter2,
}, context);
}
selectInWatcher(entity, selection, context, singleton) {
selectInWatcher(entity, selection, context, forUpdate, singleton) {
const { instanceCount, instanceId } = (0, env_1.getClusterInfo)();
(0, assert_1.default)(instanceCount && typeof instanceId === 'number');
const { filter } = selection;
@ -157,7 +157,7 @@ class ClusterAppLoader extends AppLoader_1.AppLoader {
return super.selectInWatcher(entity, {
...selection,
filter: filter2,
}, context);
}, context, forUpdate);
}
async execWatcher(watcher) {
if (watcher.singleton && (0, env_1.getClusterInfo)().instanceId !== 0) {

View File

@ -29,6 +29,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
protected dataSubscriber?: DataSubscriber<ED, Cxt>;
protected synchronizer?: Synchronizer<ED, Cxt>;
protected contextBuilder: (store: DbStore<ED, Cxt>) => Cxt;
private nsSocket: Namespace;
private requireSth(filePath: string): any {
const depFilePath = join(this.path, filePath);
@ -84,7 +85,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
};
}
constructor(path: string, ns?: Namespace, nsServer?: Namespace) {
constructor(path: string, nsSubscribe: Namespace, nsSocket: Namespace, nsServer?: Namespace) {
super(path);
const { dbConfig } = this.getConfiguration();
const { storageSchema } = require(`${path}/lib/oak-app-domain/Storage`);
@ -93,9 +94,10 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
const { authDeduceRelationMap, selectFreeEntities, updateFreeDict } = this.requireSth('lib/configuration/relation')!;
this.aspectDict = Object.assign({}, generalAspectDict, this.requireSth('lib/aspects/index')!);
this.dbStore = new DbStore<ED, Cxt>(storageSchema, () => this.contextBuilder(this.dbStore), dbConfig, authDeduceRelationMap, selectFreeEntities, updateFreeDict);
if (ns) {
this.dataSubscriber = new DataSubscriber(ns, nsServer);
if (nsSubscribe) {
this.dataSubscriber = new DataSubscriber(nsSubscribe, nsServer);
}
this.nsSocket = nsSocket;
const { BackendRuntimeContext } = require(`${path}/lib/context/BackendRuntimeContext`);
const loaderThis = this;
@ -137,7 +139,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
this.dbStore.registerTrigger(trigger);
}
initTriggers() {
protected initTriggers() {
const triggers = this.requireSth('lib/triggers/index')!;
const checkers = this.requireSth('lib/checkers/index')!;
const { actionDefDict } = require(`${this.path}/lib/oak-app-domain/ActionDefDict`);
@ -170,6 +172,15 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
}
}
protected initSocket() {
// todo
const socketFilePath = join(this.path, 'lib/socket/index');
if (existsSync(socketFilePath)) {
const { registerSocketEntry } = require(socketFilePath);
assert(typeof registerSocketEntry === 'function');
}
}
async mount(initialize?: true) {
const { path } = this;
if (!initialize) {
@ -180,6 +191,7 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
}
this.initTriggers();
this.initSocket();
}
const { importations, exportations } = require(`${path}/lib/ports/index`);
registerPorts(importations || [], exportations || []);
@ -343,10 +355,10 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
});
}
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, singleton?: true) {
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, forUpdate?: true, singleton?: true) {
return this.dbStore.select(entity, selection, context, {
blockTrigger: true,
forUpdate: true,
forUpdate,
})
}
@ -366,13 +378,13 @@ export class AppLoader<ED extends EntityDict & BaseEntityDict, Cxt extends Backe
}, context, singleton);
}
else {
const { entity, projection, fn, filter, singleton } = <WBWatcher<ED, keyof ED, Cxt>>watcher;
const { entity, projection, fn, filter, singleton,forUpdate } = <WBWatcher<ED, keyof ED, Cxt>>watcher;
const filter2 = typeof filter === 'function' ? await filter() : cloneDeep(filter);
const projection2 = typeof projection === 'function' ? await projection() : cloneDeep(projection);
const rows = await this.selectInWatcher(entity, {
data: projection2,
filter: filter2,
}, context, singleton);
}, context, forUpdate, singleton);
if (rows.length > 0) {
result = await fn(context, rows);

View File

@ -54,8 +54,8 @@ export class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extend
}
}
constructor(path: string, nsDs: Namespace, nsServer: Namespace, socketPath: string) {
super(path, nsDs, nsServer);
constructor(path: string, nsDs: Namespace, nsSocket: Namespace, nsServer: Namespace, socketPath: string) {
super(path, nsDs, nsSocket, nsServer);
this.dbStore.setOnVolatileTrigger(
async (entity, trigger, ids, cxtStr, option) => {
const execLocal = async (ids2: string[]) => {
@ -157,7 +157,7 @@ export class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extend
}, context);
}
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, singleton?: true): Promise<Partial<ED[T]['Schema']>[]> {
protected selectInWatcher<T extends keyof ED>(entity: T, selection: ED[T]['Selection'], context: Cxt, forUpdate?: true, singleton?: true): Promise<Partial<ED[T]['Schema']>[]> {
const { instanceCount, instanceId } = getClusterInfo()!;
assert(instanceCount && typeof instanceId === 'number');
const { filter } = selection;
@ -169,7 +169,7 @@ export class ClusterAppLoader<ED extends EntityDict & BaseEntityDict, Cxt extend
return super.selectInWatcher(entity, {
...selection,
filter: filter2,
}, context);
}, context, forUpdate);
}
protected async execWatcher(watcher: Watcher<ED, keyof ED, Cxt>) {