@anzerr/atium.broker
v1.0.42
Published
Light weight in memory message broker.
Downloads
25
Readme
Intro
Light weight in memory message broker. There are no stat logging for task. Tasks are run in the order they arrive/complete. There is still work that can be done at the moment it handles around 10k tasks a sec.
Features
- atomic task execution
- guarantee task being pooled
- sub/unsub event message system
- light weight in memory storage
- task chaining
- zero dependencies
How does it work?
- A worker connects to the broker.
- He tells the broker he can work.
- The server pushes a task to the worker it sends a acknowledge back.
- Runs the task and sends back the output if there's a next/chained task.
Install
npm install --save git+https://[email protected]/anzerr/atium.broker.gitExample of a task
const {Task, Server, Event} = require('atium.broker'),
Request = require('request.libary');
class TestTask extends Task {
constructor(c) {
super({
...c,
type: 'default'
});
this.store = {};
this.on('event:done', (msg) => { // sub to done event
this.store[msg.n] = false;
});
}
run(task) {
console.log('t', this.who, task);
this.event('done', {n: task.stuff}); // send event to everyone execpt me
return Promise.resolve();
}
}
(() => {
const config = {
socket: 'localhost:3001',
api: 'localhost:3002',
tasks: ['task_10001']
};
const send = (task) => {
return new Request(`http://${config.api}`).json(task).post('/add');
};
let t = null;
let s = new Server(config); // server
let e = new TestTask({ // client to receive events no tasks
socket: config.socket,
api: config.api,
tasks: []
});
let out = [];
for (let x = 0; x < 10; x++) {
e.store[x] = true;
e.store[x + 100] = true;
out.push({
tasks: [ // chain tasks
{
task: config.tasks[0], // run this task first
input: {
stuff: x
}
},
{
task: config.tasks[0], // run this task second
input: {
stuff: x + 100
}
}
]
});
}
const eventClient = new Event(config); // event client without tasks handling
eventClient.init().then(() => {
eventClient.subscribe('done');
eventClient.on('event:done', (msg) => {
console.log(msg);
});
});
e.on('connect', () => {
e.subscribe('done').then(() => {
console.log('subscribe to "done"');
t = new TestTask(config); // task client
Promise.all([
new Promise((resolve) => t.on('connect', () => resolve())), // wait for client to connect
send(out) // send out task creation
]).then(() => {
console.log('setup done');
setTimeout(() => { // dirty close
for (let i in e.store) {
if (e.store[i]) {
console.log('missing', i, 'did not recive event');
}
}
console.log('done task', JSON.stringify(e.store));
t.close();
e.close();
s.close();
}, 1000);
});
});
});
})();
