和trigger相关的数据结构

This commit is contained in:
Xu Chang 2022-04-11 18:51:37 +08:00
parent 99b6e45a8e
commit f39eb755a8
12 changed files with 408 additions and 22 deletions

View File

@ -10,7 +10,7 @@ import { EntityDef as System } from "./System/Schema";
import { EntityDef as Token } from "./Token/Schema";
import { EntityDef as User } from "./User/Schema";
import { EntityDef as WechatUser } from "./WechatUser/Schema";
export declare type EntityDict = {
export declare type BaseEntityDict = {
address: Address;
application: Application;
area: Area;

1
lib/triggers/address.d.ts vendored Normal file
View File

@ -0,0 +1 @@
export {};

14
lib/triggers/address.js Normal file
View File

@ -0,0 +1,14 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const triggers = [
{
entity: 'address',
action: 'create',
when: 'before',
fn: async ({ operation }, context) => {
const { action, data } = operation;
return 0;
},
name: '建立新area前检查数据',
}
];

View File

@ -7,3 +7,8 @@ export {
data,
aspectDict,
};
export * from './types/Trigger';
export * from './types/Aspect';
export * from './utils/TriggerExecutor';
export * from './types/RuntimeContext';

15
src/triggers/address.ts Normal file
View File

@ -0,0 +1,15 @@
import { CreateTriggerInTxn, Trigger } from '../types/Trigger';
import { EntityDict } from '../base-ed/EntityDict';
const triggers: CreateTriggerInTxn<EntityDict, 'address'>[] = [
{
entity: 'address',
action: 'create',
when: 'before',
fn: async ({ operation }, context) => {
const { action, data } = operation;
return 0;
},
name: '建立新area前检查数据',
}
];

View File

@ -1,5 +0,0 @@
import { EntityDict } from "oak-domain/lib/types/Entity";
import { RuntimeContext } from './RuntimeContext';
export interface Aspect<ED extends EntityDict> {
(params: any, context: RuntimeContext<ED>): Promise<any>;
}

View File

@ -1,3 +0,0 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
;

View File

@ -1,8 +0,0 @@
import { Context } from 'oak-domain/lib/types/Context';
import { EntityDict } from 'oak-domain/lib/types/Entity';
import { Schema as Application } from '../base-ed/Application/Schema';
import { Schema as Token } from '../base-ed/Token/Schema';
export interface RuntimeContext<ED extends EntityDict> extends Context<ED> {
getApplication: () => Application;
getToken: () => Token | undefined;
}

View File

@ -1,3 +0,0 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
;

View File

@ -6,6 +6,7 @@ import { Schema as Token } from '../base-ed/Token/Schema';
export interface RuntimeContext<ED extends EntityDict> extends Context<ED> {
getApplication: () => Application;
getToken: () => Token | undefined;
getApplication: () => Application | undefined;
getToken: () => Token | undefined;
on(event: 'commit' | 'rollback', callback: (context: RuntimeContext<ED>) => Promise<void>): void;
};

121
src/types/Trigger.ts Normal file
View File

@ -0,0 +1,121 @@
import { GenericAction } from "oak-domain/lib/actions/action";
import { DeduceCreateOperation, DeduceRemoveOperation, DeduceSelection, DeduceUpdateOperation, EntityDict } from "oak-domain/lib/types/Entity";
import { EntityDef, EntityShape, OperationResult, SelectionResult, TriggerDataAttribute, TriggerTimestampAttribute } from "oak-domain/src/types/Entity";
import { RuntimeContext } from "./RuntimeContext";
export interface CreateTriggerBase<ED extends EntityDict, T extends keyof ED> {
entity: T;
name: string;
action: 'create',
check?: (operation: DeduceCreateOperation<ED[T]['Schema']>) => boolean;
fn: (event: { operation: DeduceCreateOperation<ED[T]['Schema']>; }, context: RuntimeContext<ED>, params?: Object) => Promise<number>;
};
export interface CreateTriggerInTxn<ED extends EntityDict, T extends keyof ED> extends CreateTriggerBase<ED, T> {
when: 'before' | 'after',
};
export interface CreateTriggerCrossTxn<ED extends EntityDict, T extends keyof ED> extends CreateTriggerBase<ED, T> {
when: 'commit',
strict?: 'takeEasy' | 'makeSure';
};
export type CreateTrigger<ED extends EntityDict, T extends keyof ED> = CreateTriggerInTxn<ED, T> | CreateTriggerCrossTxn<ED, T>;
export interface UpdateTriggerBase<ED extends EntityDict, T extends keyof ED> {
entity: T;
name: string;
action: Exclude<ED[T]['Action'], GenericAction> | 'update',
attributes?: keyof ED[T]['OpSchema'] | Array<keyof ED[T]['OpSchema']>;
check?: (operation: DeduceUpdateOperation<ED[T]['Schema']>) => boolean;
fn: (event: { operation: DeduceUpdateOperation<ED[T]['Schema']> }, context: RuntimeContext<ED>, params?: Object) => Promise<number>;
};
export interface UpdateTriggerInTxn<ED extends EntityDict, T extends keyof ED> extends UpdateTriggerBase<ED, T> {
when: 'before' | 'after',
};
export interface UpdateTriggerCrossTxn<ED extends EntityDict, T extends keyof ED> extends UpdateTriggerBase<ED, T> {
when: 'commit',
strict?: 'takeEasy' | 'makeSure';
};
export type UpdateTrigger<ED extends EntityDict, T extends keyof ED> = UpdateTriggerInTxn<ED, T> | UpdateTriggerCrossTxn<ED, T>;
export interface RemoveTriggerBase<ED extends EntityDict, T extends keyof ED> {
entity: T;
name: string;
action: 'remove',
check?: (operation: DeduceRemoveOperation<ED[T]['Schema']>) => boolean;
fn: (event: { operation: DeduceRemoveOperation<ED[T]['Schema']> }, context: RuntimeContext<ED>, params?: Object) => Promise<number>;
};
export interface RemoveTriggerInTxn<ED extends EntityDict, T extends keyof ED> extends RemoveTriggerBase<ED, T> {
when: 'before' | 'after',
};
export interface RemoveTriggerCrossTxn<ED extends EntityDict, T extends keyof ED> extends RemoveTriggerBase<ED, T> {
when: 'commit',
strict?: 'takeEasy' | 'makeSure';
};
export type RemoveTrigger<ED extends EntityDict, T extends keyof ED> = RemoveTriggerInTxn<ED, T> | RemoveTriggerCrossTxn<ED, T>;
export interface SelectTriggerBase<ED extends EntityDict, T extends keyof ED> {
entity: T;
name: string;
action: 'select';
};
/**
* selection似乎不需要支持跨事务
* todo by Xc
*/
export interface SelectTriggerBefore<ED extends EntityDict, T extends keyof ED> extends SelectTriggerBase<ED, T> {
when: 'before';
fn: (event: { operation: DeduceSelection<ED[T]['Schema']> }, context: RuntimeContext<ED>, params?: Object) => Promise<number>;
};
export interface SelectTriggerAfter<ED extends EntityDict, T extends keyof ED> extends SelectTriggerBase<ED, T> {
when: 'after',
fn: (event: {
operation: DeduceSelection<ED[T]['Schema']>;
result: SelectionResult<ED, T>;
}, context: RuntimeContext<ED>, params?: Object) => Promise<number>;
};
export type SelectTrigger<ED extends EntityDict, T extends keyof ED> = SelectTriggerBefore<ED, T> | SelectTriggerAfter<ED, T>;
export type Trigger<ED extends EntityDict, T extends keyof ED> = CreateTrigger<ED, T> | UpdateTrigger<ED, T> | RemoveTrigger<ED, T> | SelectTrigger<ED, T>;
export interface TriggerEntityShape extends EntityShape {
$$triggerData$$?: {
name: string;
operation: object;
};
$$triggerTimestamp$$?: number;
};
export abstract class Executor<ED extends EntityDict> {
static dataAttr: TriggerDataAttribute = '$$triggerData$$';
static timestampAttr: TriggerTimestampAttribute = '$$triggerTimestamp$$';
abstract registerTrigger<T extends keyof ED>(trigger: Trigger<ED, T>): void;
abstract preOperation<T extends keyof ED>(
entity: T,
operation: ED[T]['Operation'],
context: RuntimeContext<ED>
): Promise<void>;
abstract postOperation<T extends keyof ED>(
entity: T,
operation: ED[T]['Operation'],
context: RuntimeContext<ED>
): Promise<void>;
abstract checkpoint(context: RuntimeContext<ED>, timestamp: number): Promise<number>; // 将所有在timestamp之前存在不一致的数据进行恢复
}

View File

@ -0,0 +1,248 @@
import assert from 'assert';
import { assign } from "lodash";
import { addFilterSegment } from "oak-domain/lib/store/filter";
import { DeduceCreateOperation, DeduceCreateOperationData, EntityDict } from "oak-domain/lib/types/Entity";
import { Logger } from "oak-domain/lib/types/Logger";
import { RuntimeContext } from '../types/RuntimeContext';
import { Trigger, Executor, CreateTriggerCrossTxn, CreateTrigger } from "../types/Trigger";
export class TriggerExecutor<ED extends EntityDict> extends Executor<ED> {
private triggerMap: {
[T in keyof ED]?: {
[A: string]: Array<Trigger<ED, T>>;
};
};
private triggerNameMap: {
[N: string]: Trigger<ED, keyof ED>;
};
private volatileEntities: Array<keyof ED>;
private logger: Logger;
constructor(logger: Logger = console) {
super();
this.logger = logger;
this.triggerMap = {};
this.triggerNameMap = {};
this.volatileEntities = [];
}
registerTrigger<T extends keyof ED>(trigger: Trigger<ED, T>): void {
// trigger的两种访问方式: by name, by entity/action
if (this.triggerNameMap.hasOwnProperty(trigger.name)) {
throw new Error(`不可有同名的触发器「${trigger.name}`);
}
assign(this.triggerNameMap, {
[trigger.name]: trigger,
});
const triggers = this.triggerMap[trigger.entity] && this.triggerMap[trigger.entity]![trigger.action];
if (triggers) {
triggers.push(trigger);
}
else if (this.triggerMap[trigger.entity]) {
assign(this.triggerMap[trigger.entity], {
[trigger.action]: [trigger],
});
}
else {
assign(this.triggerMap, {
[trigger.entity]: {
[trigger.action]: [trigger],
}
});
}
if (trigger.when === 'commit' && trigger.strict === 'makeSure') {
if (this.volatileEntities.indexOf(trigger.entity) === -1) {
this.volatileEntities.push(trigger.entity);
}
}
}
private async preCommitTrigger<T extends keyof ED>(
entity: T,
operation: ED[T]['Operation'],
trigger: Trigger<ED, T>,
context: RuntimeContext<ED>,
) {
assert(trigger.action !== 'select');
if ((trigger as CreateTriggerCrossTxn<ED, T>).strict === 'makeSure') {
switch (operation.action) {
case 'create': {
if (operation.data.hasOwnProperty(Executor.dataAttr) || operation.data.hasOwnProperty(Executor.timestampAttr)) {
throw new Error('同一行数据上不能存在两个跨事务约束');
}
break;
}
default: {
const { filter } = operation;
// 此时要保证更新或者删除的行上没有跨事务约束
const filter2 = addFilterSegment({
$or: [
{
$$triggerData$$: {
$exists: true,
},
},
{
$$triggerTimestamp$$: {
$exists: true,
},
}
],
}, filter);
const { rowStore } = context;
const count = await rowStore.count(entity, {
filter: filter2
} as Omit<ED[T]['Selection'], 'action' | 'sorter' | 'data'>, context);
if (count > 0) {
throw new Error(`对象${entity}的行「${JSON.stringify(operation)}」上已经存在未完成的跨事务约束`);
}
break;
}
}
assign(operation.data, {
[Executor.dataAttr]: {
name: trigger.name,
operation,
},
[Executor.timestampAttr]: Date.now(),
});
}
}
async preOperation<T extends keyof ED>(
entity: T,
operation: ED[T]['Operation'],
context: RuntimeContext<ED>
): Promise<void> {
const triggers = this.triggerMap[entity] && this.triggerMap[entity]![operation.action];
if (triggers) {
const preTriggers = triggers.filter(
ele => ele.when === 'before' && (!(ele as CreateTrigger<ED, T>).check || (ele as CreateTrigger<ED, T>).check!(operation as DeduceCreateOperation<ED[T]['Schema']>))
);
for (const trigger of preTriggers) {
const number = await (trigger as CreateTrigger<ED, T>).fn({ operation: operation as DeduceCreateOperation<ED[T]['Schema']> }, context);
if (number > 0) {
this.logger.info(`触发器「${trigger.name}」成功触发了「${number}」行数据更改`);
}
}
const commitTriggers = triggers.filter(
ele => ele.when === 'commit' && (!(ele as CreateTrigger<ED, T>).check || (ele as CreateTrigger<ED, T>).check!(operation as DeduceCreateOperation<ED[T]['Schema']>))
);
for (const trigger of commitTriggers) {
await this.preCommitTrigger(entity, operation, trigger, context);
}
}
}
private onCommit<T extends keyof ED>(
trigger: Trigger<ED, T>, operation: ED[T]['Operation']) {
return async (context: RuntimeContext<ED>) => {
await context.begin();
const number = await (trigger as CreateTrigger<ED, T>).fn({
operation: operation as DeduceCreateOperation<ED[T]['Schema']>,
}, context);
const { rowStore } = context;
if ((trigger as CreateTriggerCrossTxn<ED, T>).strict === 'makeSure') {
// 如果是必须完成的trigger在完成成功后要把trigger相关的属性置null;
let filter = {};
if (operation.action === 'create') {
filter = operation.data instanceof Array ? {
filter: {
id: {
$in: operation.data.map(ele => (ele.id as string)),
},
},
} : {
filter: {
id: (operation.data.id as string),
}
};
}
else if (operation.filter) {
assign(filter, { filter: operation.filter });
}
await rowStore.operate(trigger.entity, {
action: 'update',
data: {
$$triggerTimestamp$$: null,
$$triggerData$$: null,
} as any,
...filter /** as Filter<'update', DeduceFilter<ED[T]['Schema']>> */,
}, context);
}
await context.commit();
return;
};
}
private async postCommitTrigger<T extends keyof ED>(
operation: ED[T]['Operation'],
trigger: Trigger<ED, T>,
context: RuntimeContext<ED>
) {
context.on('commit', this.onCommit(trigger, operation));
}
async postOperation<T extends keyof ED>(
entity: T,
operation: ED[T]['Operation'],
context: RuntimeContext<ED>
): Promise<void> {
const triggers = this.triggerMap[entity] && this.triggerMap[entity]![operation.action];
if (triggers) {
const postTriggers = triggers.filter(
ele => ele.when === 'after' && (!(ele as CreateTrigger<ED, T>).check || (ele as CreateTrigger<ED, T>).check!(operation as DeduceCreateOperation<ED[T]['Schema']>))
);
for (const trigger of postTriggers) {
const number = await (trigger as CreateTrigger<ED, T>).fn({ operation: operation as DeduceCreateOperation<ED[T]['Schema']> }, context);
if (number > 0) {
this.logger.info(`触发器「${trigger.name}」成功触发了「${number}」行数据更改`);
}
}
const commitTriggers = (<Array<CreateTrigger<ED, T>>>triggers).filter(
ele => ele.when === 'commit' && (!ele.check || ele.check(operation as DeduceCreateOperation<ED[T]['Schema']>))
);
for (const trigger of commitTriggers) {
await this.postCommitTrigger(operation, trigger, context);
}
}
}
async checkpoint(context: RuntimeContext<ED>, timestamp: number): Promise<number> {
let result = 0;
const { rowStore } = context;
for (const entity of this.volatileEntities) {
const { result: rows } = await rowStore.select(entity, {
data: {
id: 1,
$$triggerData$$: 1,
},
filter: {
$$triggerTimestamp$$: {
$gt: timestamp,
}
},
} as any, context);
for (const row of rows) {
const { $$triggerData$$ } = row;
const { name, operation } = $$triggerData$$!;
const trigger = this.triggerNameMap[name];
await this.onCommit(trigger, operation as ED[typeof entity]['Operation'])(context);
}
}
return result;
}
}