npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

wistroni40-dmc

v1.0.12

Published

DMC Template

Downloads

16

Readme

wistroni40-dmc

Install

npm i wistroni40-dmc --save

Table of Contents

Feature

  • Logger
    • 建立抽象日誌轉接器,提供客製日誌功能
    • 建立 Log4js 日誌轉接器
  • Storage
    • 建立備援儲存功能,提供資料保存
    • 建立 Local Storage 的備援儲存功能
  • CRON
    • 建立抽象排程執行器,提供客製設定排程工作
    • 建立 ElasticSearch 查詢排程執行器
  • Consumer
    • 建立 Consumer 抽象類別,提供客製 Consumer 轉接器
    • 建立 Kafka Consumer 轉接器
    • 建立 MQTT Consumer 轉接器
    • 建立 CRON Consumer 轉接器
    • 建立複合式 Consumer 轉接器,可插入多個轉接器,介接多個數據來源
    • 建立 Consumer 資料解析策略
      • 建立 Kafka Confluent Avro 資料解析策略
      • 建立 Kafka JSON 資料解析策略
      • 建立 MQTT JSON 資料解析策略
      • 建立 ElasticSearch Hits 資料解析策略
      • 建立 ElasticSearch 聚合資料解析策略
  • Producer
    • 建立 Producer 抽象類別,提供客製 Producer 轉接器
    • 建立 Kafka Producer 轉接器
    • 建立 HTTP Post 轉接器
    • 建立 MQTT Producer 轉接器
  • Initializer
    • 建立抽象初始化程序,實作該類可將要監控的數據加入至報警資料當中,同時也可以定 期更新,去添加或移除不再監控範圍的資料
  • Alarm
    • 建立抽象報警流程範本,只需繼承並實作(或覆寫)對應屬性及方法,即可發送報警
    • 建立抽象報警狀態物件,只需繼承並實作對應方法,即可讓報警自動升級或解除
    • 報警流程範本內建時間驅動(Timer-Driven)、資料驅動(Data-Driven)及混合驅動 (Mixin-Driven),三種驅動報警的類型
    • 報警流程範本內建失敗重新拋送機制,當上拋失敗,可設定重新拋送次數及間格時間, 進行重拋
    • 報警流程範本內建備援機制,當服務終止,會保存之前報警的升級紀錄,避免重新啟動 後,誤拋報警資料

Example

Alarm API

Alarm 物件提供報警狀態紀錄物件以及報警流程範本

Alarm State (Abstract Class)

Alarm State 提供報警升級條件的功能,各等級的報警僅需要實作其升級或解除的邏輯,繼 承 AlarmState,實作以下內容

  • Properties
    • level
  • Methods
    • change
import { Alarm, AlarmLevel, AlarmState } from 'wistroni40-dmc';
import { Enviroment } from '../models';
import { Level4State } from './level4.state';

/**
 * 未達報警狀態
 */
export class NoneState extends AlarmState {
  /**
   * 報警等級
   */
  public level: AlarmLevel = null;

  constructor() {
    super('NONE.STATE');
  }

  /**
   * 更新報警
   *
   * @method public
   * @param alarm 報警狀態
   */
  public change(alarm: Alarm<Enviroment>): void | Promise<void> {
    if (/** 達報警升級條件 **/)) {
      alarm.updateLevel(new Level4State());
    }
  }
}

Alarm Template (Abstract Class)

Alarm Template 提供報警流程的實作,僅需將特定 Properties 與 Methods 實作完成,即 可讓報警自動升級、自動保存與自動發送

Constructor

| 參數 | 型別 | 說明 | | ------- | :---------: | :------------------------------------------- | | id | string | 報警服務 ID,同時也作為備援資料保存的 Key 值 | | options | AlarmConfig | 報警流程配置 |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  /**
   * @param id      報警服務 ID
   * @param options 報警流程配置
   */
  constructor(
    protected id: string,
    protected options: AlarmConfig = new AlarmConfigEntity(),
  ) {
    super(id, options);
  }
}

Properties

alarmTrigger

報警觸發器,提供報警資料觸發機制

| 型別 | 實作 | | ------------------------------- | :--: | | AlarmTriggerStrategy<D> | ✔️ |

Timer Driven 時間驅動

作為 By Time 升級報警的驅動方式

import { AlarmTemplate, TimerTriggerStrategy } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  /**
   * 報警觸發器
   *
   * @description 建構值提供 CRON
   */
  protected alarmTrigger: AlarmTriggerStrategy = new TimerTriggerStrategy(
    '*/1 * * * * *',
  );
}
Data Driven 資料驅動

作為 By Status 或 By Pcs 升級報警的驅動方式

import { AlarmTemplate, DataTriggerStrategy } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  /**
   * 報警觸發器
   */
  protected alarmTrigger: AlarmTriggerStrategy = new DataTriggerStrategy();
}
Mixin Driven 混合驅動

結合時間驅動(Timer-Driven )與資料驅動(Data-Driven)的報警驅動方式

import { AlarmTemplate, MixinTriggerStrategy } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  /**
   * 報警觸發器
   */
  protected alarmTrigger: AlarmTriggerStrategy = new MixinTriggerStrategy(
    '*/1 * * * * *',
  );
}

publishedLocation

報警發送位置

| 型別 | 實作 | | ------ | :--: | | string | ✔️ |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  /**
   * 報警發送位置
   */
  protected publishedLocation: string;

  /**
   * @param id 報警服務 ID
   */
  constructor(protected id = 'power-meter') {
    super(id);
    this.publishedLocation = 'http://localhost:4000';
  }
}

Methods

init()

覆寫: ✔️

初始化,提供報警開始時初始狀態的功能

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  /**
   * 初始化
   *
   * @override
   * @method public
   */
  public async init(): Promise<void> {
    // TODO
  }
}

consumer()

實作: ✔️

取得資料消費者

| 參數 | 型別 | 說明 | | ------ | :----------------------------------: | :------------- | | Return | Promise<Consumer<D>> | 回傳資料消費者 |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  /**
   * 取得資料消費者
   *
   * @method public
   * @return 回傳資料消費者
   */
  public async consumer(): Promise<Consumer<S>> {
    const client = mqtt.connect('mqtt://localhost:1883');
    const topic = 'wks/cim/t1/femii/alarm/dev/enviroment/#';
    return new MqttConsumerAdapter(client, topic);
  }
}

resolve()

實作: ✔️

解析消費資料,目前內建以下解析策略

  • MqttPayloadStrategy: MQTT 訂閱資料解析
  • ConfluentConsumeStrategy: Kafka Confluent Avro 資料解析
  • JsonConsumeStrategy: Kafka JSON 資料解析

| 參數 | 型別 | 說明 | | ------- | :----------------: | :------------------- | | message | <S> | 消費資料 | | Return | Promise<T> | 回傳解析後的消費資料 |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  /**
   * 解析消費資料
   *
   * @method public
   * @param message 消費資料
   * @return 回傳解析後的消費資料
   */
  public async resolve(message: S): Promise<T> {
    return new MqttPayloadStrategy().resolve(message);
  }
}

exclude()

覆寫: ✔️

該筆資料是否要排除

| 參數 | 型別 | 說明 | | ------ | :---------: | :---------------------------------------------- | | entity | <T> | 資料實體 | | Return | boolean | 回傳該筆資料是否要排除,true: 排除、false: 保留 |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  /**
   * 該筆資料是否要排除
   *
   * @override
   * @method public
   * @param entity 資料實體
   * @return 回傳該筆資料是否要排除
   */
  public exclude(entity: T): boolean {
    return /** 可根據資料或其他因素決定是否要保留該筆資料 */;
  }
}

keyBy()

實作: ✔️

取得資料 Key 值,該值可以做報警所需的 Sync ID

| 參數 | 型別 | 說明 | | ------ | :---------: | :-------------- | | entity | <T> | 資料實體 | | Return | string | 回傳資料 Key 值 |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  /**
   * 取得資料Key值
   *
   * @method public
   * @param entity 資料實體
   * @return 回傳資料Key值
   */
  public keyBy(entity: T): string {
    return `${entity.Plant}.${entity.Building}.${entity.NAME}`;
  }
}

defaultLevel()

實作: ✔️

取得預設的報警等級狀態

| 參數 | 型別 | 說明 | | ------ | :---------: | :--------------------- | | entity | <T> | 資料實體 | | Return | AlarmState | 回傳預設的報警等級狀態 |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  /**
   * 取得預設的報警等級狀態
   *
   * @method public
   * @param entity 資料實體
   * @return 回傳預設的報警等級狀態
   */
  public defaultLevel(entity: T): AlarmState {
    return new NoneState();
  }
}

level4()

實作: ✔️

取得L4報警等級狀態

| 參數 | 型別 | 說明 | | ------ | :---------: | :--------------------- | | entity | <T> | 資料實體 | | Return | AlarmState | 回傳預設的報警等級狀態 |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  /**
   * 取得等級 4 報警等級狀態
   *
   * @method public
   * @param entity 資料實體
   * @return 回傳等級 4 報警等級狀態
   */
  public level4(entity: Enviroment): AlarmState {
    return new Level4State();
  }
}

level3()

實作: ✔️

取得L3報警等級狀態

| 參數 | 型別 | 說明 | | ------ | :---------: | :--------------------- | | entity | <T> | 資料實體 | | Return | AlarmState | 回傳預設的報警等級狀態 |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  /**
   * 取得等級 3 報警等級狀態
   *
   * @method public
   * @param entity 資料實體
   * @return 回傳等級 3 報警等級狀態
   */
  public level3(entity: Enviroment): AlarmState {
    return new Level3State();
  }
}

level2()

實作: ✔️

取得L2報警等級狀態

| 參數 | 型別 | 說明 | | ------ | :---------: | :--------------------- | | entity | <T> | 資料實體 | | Return | AlarmState | 回傳預設的報警等級狀態 |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  /**
   * 取得等級 2 報警等級狀態
   *
   * @method public
   * @param entity 資料實體
   * @return 回傳等級 2 報警等級狀態
   */
  public level2(entity: Enviroment): AlarmState {
    return new Level2State();
  }
}

level1()

實作: ✔️

取得L1報警等級狀態

| 參數 | 型別 | 說明 | | ------ | :---------: | :--------------------- | | entity | <T> | 資料實體 | | Return | AlarmState | 回傳預設的報警等級狀態 |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  /**
   * 取得等級 1 報警等級狀態
   *
   * @method public
   * @param entity 資料實體
   * @return 回傳等級 1 報警等級狀態
   */
  public level1(entity: Enviroment): AlarmState {
    return new Level1State();
  }
}

payload()

實作: ✔️

打包報警發送數據

| 參數 | 型別 | 說明 | | ------ | :----------------------: | :--------------- | | alarm | Alarm<T> | 報警等級狀態 | | Return | <P = AlarmModel> | 回傳報警發送數據 |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  /**
   * 打包報警發送數據
   *
   * @method public
   * @param alarm 報警等級狀態
   * @return 回傳報警發送數據
   */
  public payload(alarm: Alarm<T>): AlarmModel {
    return new AlarmEntity({
      site: alarm.data.site,
      plant: alarm.data.site,
      eventId: 'FEM001',
      eventType: 0,
      syncId: alarm.key,
      alertType: alarm.level === null ? 1 : 0,
      alertItem: 1,
      IssueType: 0,
      level: alarm.level,
      shortMessage: `${alarm.key} missing`,
      eventTime: new Date().getTime().toString(),
      evtvalue1: alarm.data.site,
      evtvalue2: alarm.data.building,
      evtvalue3: alarm.data.meterId,
      toDMC: 1,
      toNotify: 0,
    });
  }
}

producer()

覆寫: ✔️

取得資料生產者

| 參數 | 型別 | 說明 | | ------ | :-----------------------------------------------: | :------------- | | Return | Promise<Producer<P = AlarmModel>> | 回傳資料生產者 |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  /**
   * 取得資料生產者
   *
   * @override
   * @method public
   * @return 回傳資料生產者
   */
  public async producer(): Promise<Producer<P>> {
    return new HttpProducerAdapter(/** URL */);
  }
}

execute()

執行報警判定

| 參數 | 型別 | 說明 | | ------ | :------------------------------------------------------: | :--------------- | | Return | Observable<AlarmPayload<P = AlarmModel>> | 回傳報警發送結果 |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  ...
}

const alarmService = new AlarmService('alarm');
alarmService.execute().subscribe(res => console.log(res));

getAlarmEntity()

取得特定鍵值的報警資料

| 參數 | 型別 | 說明 | | ------ | :------------------------------: | :--------------------- | | entity | <T> | 資料實體 | | Return | Alarm<T>| undefined | 回傳特定鍵值的報警資料 |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  ...
}

const alarmService = new AlarmService('alarm');
const alarm = alarmService.getAlarmEntity(/** 要取得的資料 */)

getAllAlarmEntities()

取得所有報警資料

| 參數 | 型別 | 說明 | | ------ | :-----------------------------------: | :--------------- | | Return | Map<string, Alarm<T>> | 回傳所有報警資料 |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  ...
}

const alarmService = new AlarmService('alarm');
const alarm = alarmService.getAllAlarmEntities()

storeAlarmEntity()

保存報警資料

| 參數 | 型別 | 說明 | | ------ | :---------: | :------- | | entity | <T> | 資料實體 |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  ...
}

const alarmService = new AlarmService('alarm');
alarmService.storeAlarmEntity(/** 要保存的資料 */)

isAlarmEntityExist()

該報警資料是否存在

| 參數 | 型別 | 說明 | | ------ | :---------: | :--------------------- | | entity | <T> | 資料實體 | | Return | boolean | 回傳該報警資料是否存在 |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  ...
}

const alarmService = new AlarmService('alarm');
alarmService.isAlarmEntityExist(/** 要檢視的資料 */)

deleteAlarmEntity()

刪除特定報警資料

| 參數 | 型別 | 說明 | | ------ | :---------: | :------- | | entity | <T> | 資料實體 |

import { AlarmTemplate } from 'wistroni40-dmc';

class AlarmService extends AlarmTemplate {
  ...
}

const alarmService = new AlarmService('alarm');
alarmService.deleteAlarmEntity(/** 要刪除的資料 */)

Consumer API

Consumer 物件作為介接報警獲取所需的資料來源,目前提供以下轉接器,若所需要的 Consumer 不包含在以下,可實作 ConsumerAdapter

  • Kafka: 支持 kafka-node 套件的轉接器,該套件使用方式可參閱 https://www.npmjs.com/package/kafka-node
  • MQTT: 支持 mqtt 套件的轉接器,該套件使用方式可參閱 https://www.npmjs.com/package/mqtt
  • CRON: 支持使用排程定期呼叫資料,可作為 Batch 或轉換為 Streaming 的方式使用,目 前提供以下幾種 datasource 進行串接
    • ElasticSearch: 支持 elasticsearch 套件的執行器,該套件使用方式可參閱 https://www.npmjs.com/package/elasticsearch

Kafka Adapter

支持 Kafka 資料介接,並提供 Confluent Avro 與 JSON 的資料解析方式

KafkaConsumerAdapter

| 參數 | 型別 | 說明 | | -------- | :-----------: | :--------------- | | consumer | ConsumerGroup | Kafka 資料消費者 |

kafka 轉接器範例,使用 JSON 解析策略

import { ConsumerGroup } from 'kafka-node';
import { mergeMap } from 'rxjs/operators';
import { KafkaConsumerAdapter, JsonConsumeStrategy } from 'wistroni40-dmc';

// 初始化 Kafka Consumer
const options = { kafkaHost: 'localhost:1883' };
const topic = 'your.kafka.topic';
const consumer = new ConsumerGroup(options, topic);

// Kafka 轉接器
const consumerAdapter = new KafkaConsumerAdapter(consumer);

// Kafka JSON 解析策略
const strategy = new JsonConsumeStrategy();

// 訂閱資料
consumerAdapter
  .consume()
  .pipe(mergeMap(res => strategy.resolve(res)))
  .subscribe(res => console.log(res));

kafka 轉接器範例,使用 Confluent Avro 解析策略

import { ConsumerGroup } from 'kafka-node';
import { mergeMap } from 'rxjs/operators';
import { KafkaConsumerAdapter, ConfluentConsumeStrategy } from 'wistroni40-dmc';

// 初始化 Kafka Consumer
const options = { kafkaHost: 'localhost:1883' };
const topic = 'your.kafka.topic';
const consumer = new ConsumerGroup(options, topic);

// Kafka 轉接器
const consumerAdapter = new KafkaConsumerAdapter(consumer);

// Kafka JSON 解析策略,須提供 Schema Registry 位置
const strategy = new ConfluentConsumeStrategy('http://localhost:8080');

// 訂閱資料
consumerAdapter
  .consume()
  .pipe(mergeMap(res => strategy.resolve(res)))
  .subscribe(res => console.log(res));

MQTT Adapter

支持 MQTT 資料介接,並提供 JSON 的資料解析方式

MqttConsumerAdapter

| 參數 | 型別 | 說明 | | -------- | :--------: | :---------------- | | consumer | MqttClient | MQTT 資料消費者 | | topic | string | MQTT 要訂閱的主題 |

import * as mqtt from 'mqtt';
import { mergeMap } from 'rxjs/operators';
import { MqttConsumerAdapter, MqttPayloadStrategy } from 'wistroni40-dmc';

// 初始化 MQTT Client
const topic = 'your/mqtt/topic/#';
const client = mqtt.connect('mqtt://localhost:1883');

// MQTT 轉接器
const consumerAdapter = new MqttConsumerAdapter(client, topic);

// Kafka JSON 解析策略
const strategy = new MqttPayloadStrategy();

// 訂閱資料
consumerAdapter
  .consume()
  .pipe(mergeMap(res => strategy.resolve(res)))
  .subscribe(res => console.log(res));

CRON Adapter

支持排程定期呼叫資料,使用 ElasticSearch 作為資料調用執行器

CronConsumerAdapter

| 參數 | 型別 | 說明 | | -------- | :------------: | :---------------------------- | | _cron | string | 排程, ex: */5 * * * * * | | executor | CronExecutor | 排程執行器 | | batch | boolean = true | 是否已批量(Array)方式送出數據 |

import { Client } from 'elasticsearch';
import { ElasticsearchSearchExecutor } from 'wistroni40-dmc';

// 初始化 ElasticSearch Client
const client = new Client({ host: ['http://localhost:9200/'] });
const index = 'your_es_index';
const type = 'your_es_type';
// 使用 ElasticBuilder 建立 ElasticSearch 查詢語句
const builder = new QueryBuilder();
const executor = new ElasticsearchSearchExecutor(
  client,
  index,
  type,
  builder,
  'hits',
);

// CRON 轉接器
const cron = '*/5 * * * * *';
const consumerAdapter = new CronConsumerAdapter(cron, executor, false);

// 訂閱資料
consumerAdapter.consume().subscribe(res => console.log(res));

Composition Adapter

支持插入多個資料消費者轉接器,讓資料可以重多個來源獲取

CompositionConsumerAdapter

方法說明

添加資料消費者轉接器

addConsumer

| 參數 | 型別 | 說明 | | -------- | :---------------------: | :------------------------------------------------- | | name | string | 資料消費者名稱,讓獲取資料時,可以知道其來源自何處 | | consumer | ConsumerAdapter | 資料消費者轉接器 | | resolver | ConsumerResolveStrategy | 消費資料解析策略 | | 回傳值 | this | 回傳物件本身 |

keyBy

設定鍵值合成方法

| 參數 | 型別 | 說明 | | ------ | :-----------------: | :----------- | | keyFn | (data: T) => string | 鍵值合成方法 | | 回傳值 | this | 回傳物件本身 |

process

實作資料合成方法

| 參數 | 型別 | 說明 | | ------- | :-------------------------------------------------------: | :----------- | | process | (name: string, cache: D | undefined, data: any) => D | 資料合成方法 | | 回傳值 | this | 回傳物件本身 |

// 資料來源1
const client1 = mqtt.connect('mqtt://localhost:1883');
const topic1 = 'your/mqtt/topic1/#';
const consumer1 = new MqttConsumerAdapter(client1, topic1);
const resolver1 = new MqttPayloadStrategy();

// 資料來源2
const client2 = mqtt.connect('mqtt://localhost:1883');
const topic2 = 'your/mqtt/topic2/#';
const consumer2 = new MqttConsumerAdapter(client2, topic2);
const resolver2 = new MqttPayloadStrategy();

// 複合式轉接器
const consumerAdapter = new CompositionConsumerAdapter()
  .addConsumer('source1', consumer1, resolver1)
  .addConsumer('source2', consumer2, resolver2)
  .keyBy(data => `${data.field1}.${data.field2}`)
  .process((n, c, d) => /** TODO */);

// 訂閱資料
consumerAdapter.consume().subscribe(res => console.log(res));

Custom Adapter

若無匹配的轉接器,可透過以下方式自行客製,以 MQTT 為例

import { MqttClient } from 'mqtt';
import { IPublishPacket } from 'mqtt-packet';
import { Observable } from 'rxjs';
import { ConsumerAdapter } from 'wistroni40-dmc';

export class MqttConsumerAdapter extends ConsumerAdapter<
  MqttClient,
  IPublishPacket
> {
  /**
   * @param consumer 資料消費者
   * @param topic    要訂閱的主題
   */
  constructor(protected consumer: MqttClient, protected topic: string) {
    super(consumer);
  }

  /**
   * 消費資料
   *
   * @method public
   * @return 取得要消費的資料
   */
  public consume(): Observable<IPublishPacket> {
    return new Observable(sub => {
      this.consumer.on('connect', () => this.consumer.subscribe(this.topic));
      this.consumer.on('message', (topic, payload, packet) => sub.next(packet));
    });
  }
}

Initializer API

初始化程序主要提供初始報警紀錄的載入,常見的使用情境是,當特定的資料要進行報警, 非全部的資料要監控,