mm_base_service
v1.0.2
Published
轻量级模块化服务基类,支持插件扩展
Maintainers
Readme
mm_base_service
轻量级模块化服务基类,支持插件扩展系统
特性
- 轻量级设计:专注于核心功能,避免不必要的复杂性
- 插件系统:通过
use方法按需引入功能 - 钩子机制:支持在关键生命周期点执行自定义代码
- 事件系统:基于 EventEmitter,支持事件监听和触发
- 配置管理:支持默认配置、配置合并和验证
- 生命周期管理:完整的初始化和销毁流程
安装
npm install mm_base_service基本用法
创建服务类
const { BaseService } = require('mm_base_service');
class Mysql extends BaseService {
// 默认配置
static default_config = {
name: 'my_service',
timeout: 30000
};
constructor(config = {}) {
super(config);
// 初始化状态
this.is_ready = false;
}
}
// 实现初始化方法
Mysql.prototype._initService = async function() {
// 执行服务特定的初始化逻辑
this.is_ready = true;
$.log.debug(`服务 ${this.config.name} 初始化完成`);
};
// 导出服务类
module.exports = Mysql;使用插件
const { BaseService } = require('mm_base_service');
const { CircuitBreakerPlugin } = require('./plugins');
class Mysql extends BaseService {
constructor(config = {}) {
super(config);
// 使用断路器插件
this.use(CircuitBreakerPlugin, 'circuitBreaker');
}
async performOperation() {
// 获取插件实例
const circuitBreaker = this.getPlugin('circuitBreaker');
// 使用插件功能
return await circuitBreaker.execute(async () => {
// 执行受保护的操作
return '操作结果';
}, 'myOperation');
}
}使用钩子
const { BaseService } = require('mm_base_service');
class Mysql extends BaseService {
constructor(config = {}) {
super(config);
// 注册钩子
this.hook('init:before', this._beforeInit.bind(this));
this.hook('init:after', this._afterInit.bind(this));
}
async _beforeInit(ctx) {
$.log.debug('初始化前执行');
return ctx;
}
async _afterInit(ctx) {
$.log.debug('初始化后执行');
return ctx;
}
}MySQL 服务类示例
下面是一个使用 BaseService 创建 MySQL 数据库服务类的完整示例:
const { BaseService } = require('mm_base_service');
const mysql = require('mysql2/promise');
const { CircuitBreakerPlugin, MonitoringPlugin, HealthCheckPlugin } = require('./plugins');
class Mysql extends BaseService {
// 默认配置
static default_config = {
host: 'localhost',
port: 3306,
user: 'root',
password: '',
database: 'test',
connectionLimit: 10,
connectTimeout: 10000,
acquireTimeout: 10000,
timeout: 30000,
// 插件配置
circuit_breaker: {
enabled: true,
failure_threshold: 3,
reset_timeout: 30000
},
monitoring: {
enabled: true,
track_performance: true
},
health_check: {
enabled: true,
interval: 30000
}
};
/**
* 构造函数
* @param {Object} config - MySQL连接配置
*/
constructor(config = {}) {
super(config);
// 初始化状态
this.is_connected = false;
this.pool = null;
// 使用插件
this.use(CircuitBreakerPlugin, 'circuitBreaker');
this.use(MonitoringPlugin, 'monitoring');
this.use(HealthCheckPlugin, 'healthCheck');
// 注册钩子
this.hook('init:after', this._onInitComplete.bind(this));
this.hook('destroy:before', this._onBeforeDestroy.bind(this));
}
}
/**
* 服务初始化方法
* @private
* @returns {Promise<void>}
*/
Mysql.prototype._initService = async function() {
try {
// 创建连接池
this.pool = mysql.createPool({
host: this.config.host,
port: this.config.port,
user: this.config.user,
password: this.config.password,
database: this.config.database,
connectionLimit: this.config.connectionLimit,
connectTimeout: this.config.connectTimeout,
acquireTimeout: this.config.acquireTimeout,
waitForConnections: true,
queueLimit: 0
});
// 测试连接
await this.pool.getConnection(async (err, connection) => {
if (err) {
throw new Error(`数据库连接失败: ${err.message}`);
}
connection.release();
});
this.is_connected = true;
$.log.debug(`[Mysql] 成功连接到数据库 ${this.config.database}@${this.config.host}:${this.config.port}`);
} catch (error) {
$.log.error(`[Mysql] 初始化失败: ${error.message}`);
throw error;
}
};
/**
* 初始化完成后的钩子处理
* @private
* @param {Object} ctx - 上下文对象
* @returns {Promise<Object>}
*/
Mysql.prototype._onInitComplete = async function(ctx) {
// 启动健康检查
const healthCheck = this.getPlugin('healthCheck');
if (healthCheck && this.config.health_check.enabled) {
healthCheck.startPeriodicCheck(this.config.health_check.interval);
}
return ctx;
};
/**
* 销毁前的钩子处理
* @private
* @param {Object} ctx - 上下文对象
* @returns {Promise<Object>}
*/
Mysql.prototype._onBeforeDestroy = async function(ctx) {
$.log.debug('[Mysql] 准备关闭数据库连接...');
return ctx;
};
/**
* 执行SQL查询
* @param {string} sql - SQL查询语句
* @param {Array} params - 查询参数
* @returns {Promise<Object>} 查询结果
*/
Mysql.prototype.query = async function(sql, params = []) {
const startTime = Date.now();
const monitoring = this.getPlugin('monitoring');
const circuitBreaker = this.getPlugin('circuitBreaker');
try {
// 定义查询操作
const performQuery = async () => {
if (!this.pool || !this.is_connected) {
throw new Error('数据库未连接');
}
const [rows, fields] = await this.pool.execute(sql, params);
const duration = Date.now() - startTime;
// 记录成功操作
if (monitoring) {
monitoring.recordOperation('query', true, duration);
}
return { rows, fields };
};
// 使用断路器保护查询
if (circuitBreaker && this.config.circuit_breaker.enabled) {
return await circuitBreaker.execute(performQuery, 'mysql_query');
} else {
return await performQuery();
}
} catch (error) {
const duration = Date.now() - startTime;
// 记录失败操作
if (monitoring) {
monitoring.recordOperation('query', false, duration);
}
$.log.error(`[Mysql] 查询执行失败: ${error.message}`, { sql, params });
throw error;
}
};
/**
* 执行事务
* @param {Function} transactionCallback - 事务回调函数
* @returns {Promise<any>} 事务结果
*/
Mysql.prototype.transaction = async function(transactionCallback) {
const startTime = Date.now();
const monitoring = this.getPlugin('monitoring');
let connection;
let result;
try {
// 获取连接
connection = await this.pool.getConnection();
// 开始事务
await connection.beginTransaction();
// 执行事务回调
result = await transactionCallback(connection);
// 提交事务
await connection.commit();
const duration = Date.now() - startTime;
// 记录成功事务
if (monitoring) {
monitoring.recordOperation('transaction', true, duration);
}
return result;
} catch (error) {
// 回滚事务
if (connection) {
await connection.rollback().catch(rollbackErr => {
$.log.error(`[Mysql] 事务回滚失败: ${rollbackErr.message}`);
});
}
const duration = Date.now() - startTime;
// 记录失败事务
if (monitoring) {
monitoring.recordOperation('transaction', false, duration);
}
$.log.error(`[Mysql] 事务执行失败: ${error.message}`);
throw error;
} finally {
// 释放连接
if (connection) {
connection.release();
}
}
};
/**
* 健康检查实现
* @private
* @returns {Promise<boolean>} 健康状态
*/
Mysql.prototype._performHealthCheck = async function() {
try {
await this.query('SELECT 1');
return true;
} catch (error) {
$.log.error(`[Mysql] 健康检查失败: ${error.message}`);
return false;
}
};
/**
* 销毁服务
* @returns {Promise<Object>} 销毁结果
*/
Mysql.prototype.destroy = async function() {
try {
// 触发销毁前钩子
await this.triggerHook('destroy:before');
// 关闭连接池
if (this.pool) {
await this.pool.end();
this.pool = null;
$.log.debug('[Mysql] 数据库连接池已关闭');
}
// 销毁插件实例
if (this.plugin_instances && Object.keys(this.plugin_instances).length > 0) {
for (const plugin of Object.values(this.plugin_instances)) {
if (plugin && typeof plugin.destroy === 'function') {
try {
await plugin.destroy();
} catch (err) {
$.log.error(`[Mysql] 插件销毁失败`, { error: err.message });
}
}
}
}
this.is_connected = false;
// 触发销毁后钩子
await this.triggerHook('destroy:after');
return { success: true };
} catch (error) {
$.log.error(`[Mysql] 销毁失败: ${error.message}`);
throw error;
}
};
// 使用示例
async function example() {
const Mysql = new Mysql({
host: 'localhost',
user: 'root',
password: 'password',
database: 'test_db'
});
try {
// 初始化服务
await Mysql.init();
// 执行查询
const result = await Mysql.query('SELECT * FROM users WHERE id = ?', [1]);
$.log.debug('查询结果:', result.rows);
// 执行事务
const transactionResult = await Mysql.transaction(async (connection) => {
// 执行多个操作
await connection.execute('UPDATE users SET status = ? WHERE id = ?', ['active', 1]);
await connection.execute('INSERT INTO logs (action, user_id) VALUES (?, ?)', ['update_status', 1]);
return { success: true, message: '事务执行成功' };
});
$.log.debug('事务结果:', transactionResult);
// 查看监控指标
const monitoring = Mysql.getPlugin('monitoring');
if (monitoring) {
const metrics = monitoring.getMetrics();
$.log.debug('监控指标:', metrics);
}
// 执行健康检查
const healthCheck = Mysql.getPlugin('healthCheck');
if (healthCheck) {
const healthStatus = await healthCheck.checkHealth();
$.log.debug('健康状态:', healthStatus ? '健康' : '不健康');
}
} catch (error) {
$.log.error('示例运行失败:', error);
} finally {
// 销毁服务
await Mysql.destroy();
}
}
module.exports = Mysql;内置插件
断路器插件 (CircuitBreakerPlugin)
提供断路器模式功能,防止系统因连续失败而过载:
const { CircuitBreakerPlugin } = require('./plugins');
// 配置选项
const circuitBreakerConfig = {
enabled: true,
failure_threshold: 5, // 失败阈值
reset_timeout: 30000 // 重置超时时间(毫秒)
};
// 使用插件
this.use(CircuitBreakerPlugin, 'circuitBreaker');监控插件 (MonitoringPlugin)
提供性能监控功能,跟踪操作执行情况:
const { MonitoringPlugin } = require('./plugins');
// 配置选项
const monitoringConfig = {
enabled: true,
track_performance: true
};
// 使用插件
this.use(MonitoringPlugin, 'monitoring');健康检查插件 (HealthCheckPlugin)
提供服务健康状态检查功能:
const { HealthCheckPlugin } = require('./plugins');
// 配置选项
const healthCheckConfig = {
enabled: true,
interval: 30000 // 检查间隔(毫秒)
};
// 使用插件
this.use(HealthCheckPlugin, 'healthCheck');创建自定义插件
插件需要实现 install 和 create 方法:
const MyCustomPlugin = {
/**
* 插件安装方法
* @param {BaseService} service - 服务实例
*/
install(service) {
// 初始化插件配置
service.config.myPlugin = service.config.myPlugin || {};
// 注册钩子
service.hook('init:after', () => {
$.log.debug('自定义插件已安装');
});
},
/**
* 创建插件实例
* @param {BaseService} service - 服务实例
* @returns {Object} - 插件实例
*/
create(service) {
return {
// 插件方法
doSomething() {
return '插件功能';
},
// 销毁方法(可选)
destroy() {
$.log.debug('插件已销毁');
}
};
}
};
// 使用自定义插件
this.use(MyCustomPlugin, 'myPlugin');示例
运行内置示例:
npm run exampleAPI 文档
BaseService 类
构造函数
const service = new BaseService(config);config: 配置对象,将与默认配置合并
核心方法
- init(): 初始化服务
- destroy(): 销毁服务,清理资源
- use(plugin, name): 注册插件
- getPlugin(name): 获取插件实例
- hook(hookName, handler): 注册钩子处理函数
- triggerHook(hookName, context): 触发钩子
- setConfig(config): 更新配置
事件
- init:before: 初始化前触发
- init:after: 初始化后触发
- destroy:before: 销毁前触发
- destroy:after: 销毁后触发
运行示例
- 克隆仓库
- 安装依赖:
npm install - 运行示例:
npm run example
许可证
MIT
