npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

stepify

v0.1.5

Published

Executing nodejs asynchronous tasks by steps chain

Downloads

8

Readme

stepify

Build Status NPM version

stepify是一个简单易用的Node.js异步流程控制库,提供一种比较灵活的方式完成Node.js(多)任务。

目标是将复杂的任务进行拆分成多步完成,使得每一步的执行过程更加透明,化繁为简。

stepify侧重流程控制,甚至可以和其他异步类库配合使用。

stepify特点

  • 最基本的API的就3个:step()done()run(),简单容易理解。

  • 精细的粒度划分(同时支持单/多任务),执行顺序可定制化。

  • 每一个异步操作都经过特殊的封装,内部只需要关心这个异步的执行过程。

  • 链式(chain)调用,代码逻辑看起来比较清晰。

  • 灵活的回调函数定制和参数传递。

  • 统一处理单个异步操作的异常,也可根据需要单独处理某个任务的异常。

最简单的用法

简单实现基于oauth2授权获取用户基本资料的例子:

// Authorizing based on oauth2 workflow
Stepify()
    .step('getCode', function(appId, rUri) {
        var root = this;
        request.get('[authorize_uri]', function(err, res, body) {
            root.done(err, JSON.parse(body).code);
        });
    }, [appId], [redirectUri])
    .step('getToken', function(code) {
        var root = this;
        request.post('[token_uri]', function(err, res, body) {
            root.done(err, JSON.parse(body).access_token);
        });
    })
    .step('getInfo', function(token) {
        request.get('[info_uri]?token=' + token, function(err, res, body) {
            // got user info, pass it to client via http response
        });
    })
    .run();

多个step共用一个handle、静态参数、动态参数传递的例子:

Stepify()
    .step('read', __filename)
    .step(function(buf) {
        // buf is the buffer content of __filename
        var root = this;
        var writed = 'test.js';

        // do more stuff with buf
        // this demo just replace all spaces simply
        buf = buf.toString().replace(/\s+/g, '');
        fs.writeFile(writed, buf, function(err) {
            // writed is the name of target file,
            // it will be passed into next step as the first argument
            root.done(err, writed);
        });
    })
    .step('read')
    // `read` here is a common handle stored in workflow
    .read(function(p, encoding) {
        fs.readFile(p, encoding || null, this.done.bind(this));
    })
    .run();

这里多了一个read()方法,但read方法并不是stepify内置的方法。实际上,您可以任意“扩展”stepify链!它的奥妙在于step()方法的参数,详细请看step调用说明

可以看到,一个复杂的异步操作,通过stepify定制,每一步都是那么清晰可读!

安装

$ npm install stepify

运行测试

$ npm install
$ mocha

灵活使用

var Stepify = require('stepify');
var workflow1 = Stepify().step(fn).step(fn)...run();
// or
var workflow2 = new Stepify().step(fn).step(fn)...run();
// or
var workflow3 = Stepify().step(fn).step(fn);
// do some stuff ...
workflow3.run();
// or
var workflow4 = Stepify().task('foo').step(fn).step(fn).task('bar').step(fn).step(fn);
// do some stuff ...
workflow4.run(['foo', 'bar']);
var workflow5 = Stepify().step(fn).step(fn);
workflow5.debug = true;
workflow5.error = function(err) {};
workflow5.result = function(result) {};
...
workflow5.run();
// more ...

注:文档几乎所有的例子都是采用链式调用,但是拆开执行也是没有问题的。

原理

概念:

  • task:完成一件复杂的事情,可以把它拆分成一系列任务,这些个任务有可能它的执行需要依赖上一个任务的完成结果,它执行的同时也有可能可以和其他一些任务并行,串行并行相结合,这其实跟真实世界是很吻合的。

  • step:每一个task里边可再细分,可以理解成“一步一步完成一个任务(Finish a task step by step)”,正所谓“一步一个脚印”是也。

stepify内部实际上有两个主要的类,一个是Stepify,一个是Step。

Stepify()的调用会返回一个Stepify实例,在这里称之为workflow,用于调度所有task的执行。

step()的调用会创建一个Step实例,用于完成具体的异步操作(当然也可以是同步操作,不过意义不大),step之间使用简单的api(done方法和next方法)传递。

API 文档

Stepify类:

调用Stepify即可创建一个workflow。

Step类:

Step类只在Stepify实例调用step方法时创建,不必显式调用。


debug()

描述:开启debug模式,打印一些log,方便开发。

调用:debug(flag)

参数:

  • {Boolean} flag 默认是false

例子:

var work = Stepify().debug(true);
// or
var work = Stepify();
work.debug = true;

task()

描述:显式创建一个task,task()的调用是可选的。在新定制一个task时,如果没有显式调用task(),则这个task的第一个step()内部会先生成一个task,后续的step都是挂在这个task上面,每一个task内部会维持自己的step队列。多个task使用pend方法分割。

调用:task([taskName])

参数:

  • {String} taskName 可选参数,默认是_UNAMED_TASK_[index]。为这个task分配一个名字,如果有多个task实例并且执行顺序需要(使用run()方法)自定义,则设置下taskName方便一点。

例子:

var myWork1 = Stepify().task('foo').step('step1').step('step2').run();
// equal to
var myWork1 = Stepify().step('step1').step('step2').run();
// multiply tasks
var myWork2 = Stepify()
	.task('foo')
    	.step(fn)
        .step(fn)
    .task('bar')
    	.step(fn)
        .step(fn)
    .task('baz')
    	.step(fn)
        .step(fn)
    .run();

step()

描述:定义当前task的一个异步操作,每一次step()调用都会实例化一个Step推入task的step队列。这个方法是整个lib的核心所在。

调用:step(stepName, stepHandle, *args)

参数:

  • {String} stepName 可选参数,但在不传stepHandle时是必传参数。为这个step分配一个名称。当stepHandle没有传入时,会在Stepify原型上扩展一个以stepName命名的方法,而它具体的实现则在调用stepName方法时决定,这个方法详情请看stepName说明

  • {Function} stepHandle 可选参数,但在stepName不传时是必传参数。在里边具体定义一个异步操作的过程。stepHandle的执行分两步,先查找这个step所属的task上有没有stepHandle,找不到则查找Stepify实例上有没有stepHandle,再没有就抛异常。

  • {Mix} *args 可选参数,表示这个step的已知参数(即静态参数),在stepHandle执行的时候会把静态参与动态参数(通过done或者next传入)合并作为stepHandle的最终参数。

例子:

  • 参数传递
Stepify()
	.step(function() {
    	var root = this;
        setTimeout(function() {
        	// 这里done的第二个参数(100)即成为下一个stepHandle的动态参数
        	root.done(null, 100);
        }, 100);
    })
    .step(function(start, n) {
    	// start === 50
        // n === 100
        var root = this;
        setTimeout(function() {
        	root.done();
        }, start + n);
    }, 50)
    .run();
  • 扩展原型链
Stepify()
    .step('sleep')
    // more step ...
    .step('sleep', 50)
    .sleep(function(start, n) {
        var args = [].slice.call(arguments, 0);
        var root = this;

        n = args.length ? args.reduce(function(mem, arg) {return mem + arg;}) : 100;
        setTimeout(function() {
            root.done(null, n);
        }, n);
    })
    .run();

pend()

描述:结束一个task的定义,会影响扩展到Stepify原型链上的stepName方法的执行。

调用:pend()

参数: 无参数。

例子:见*stepName*部分

stepName()

描述:这是一个虚拟方法,它是通过动态扩展Stepify类原型链实现的,具体调用的名称由step方法的stepName参数决定。扩展原型链的stepName的必要条件是step方法传了stepName(stepName需要是一个可以通过.访问属性的js变量)但是stepHandle没有传,且stepName在原型链上没定义过,当workflow执行结束之后会删除已经扩展到原型链上的所有方法。当调用实例上的stepName方法时,会检测此时有没有在定义的task(使用pend方法结束一个task的定义),如果有则把传入的handle挂到到这个task的handles池里,没有则挂到Stepify的handles池。

调用:stepName(stepHandle)

参数:

  • {Function} stepHandle 必传参数,定义stepName对应的stepHandle,可以在多个task之间共享。

例子:

  • pend()的影响
Stepify()
    .step('mkdir', './foo')
    // 这样定义,在执行到sleep时会抛异常,
    // 因为这个task上面没定义过sleep的具体操作
    .step('sleep', 100)
    .pend()
    .step('sleep', 200)
    .step('mkdir', './bar')
    .sleep(function(n) {
        var root = this;
        setTimeout(function() {
            root.done();
        }, n);
    })
    // 这个pend的调用,使得mkdir方法传入的handle挂在了Stepify handles池中,
    // 所以第一个task调用mkdir方法不会抛异常
    .pend()
    .mkdir(function(p) {
        fs.mkdir(p, this.done.bind(this));
    })
    .run();
  • stepHandle的查找
Stepify()
    .step('mkdir', './foo')
    // 定义当前task上的mkdirHandle,这里其实直接.step('mkdir', fn)更清晰
    .mkdir(function(p) {
        fs.mkdir(p, 0755, this.done.bind(this));
    })
    .step('sleep', 100)
    .pend()
    // 这个task上没定义mkdirHandle,会往Stepify类的handles池去找
    .step('mkdir', './bar')
    .step('sleep', 200)
    .pend()
    .sleep(function(n) {
        var root = this;
        setTimeout(function() {
            root.done();
        }, n);
    })
    .mkdir(function(p) {
        fs.mkdir(p, this.done.bind(this));
    })
    .run();

error()

描述:定制task的异常处理函数。

调用:error(errorHandle)

参数:

  • {Function} errorHandle 必传参数 默认会直接抛出异常并中断当前task的执行。每一个task都可以定制自己的errorHandle,亦可为所有task定制errorHandle。每个step执行如果出错会直接进入这个errorHandle,后面是否继续执行取决于errorHandle内部定义。errorHandle第一个参数便是具体异常信息。

注意:errorHandle的执行环境是发生异常所在的那个step,也就是说Step类定义的所有方法在errorHandle内部均可用,您可以在异常时决定是否继续执行下一步,或者使用this.taskNamethis.name分别访问所属task的名称和step的名称,进而得到更详细的异常信息。

例子:

Stepify()
    .step(fn)
    .step(fn)
    // 这个task的异常会走到这里
    .error(function(err) {
        console.error('Error occurs when running task %s\'s %s step!', this.taskName, this.name);
        if(err.message.match(/can_ignore/)) {
            // 继续执行下一步
            this.next();
        } else {
            throw err;
        }
    })
    .pend()
    .step(fn)
    .step(fn)
    .pend()
    // 所有没显式定义errorHandle的所有task异常都会走到这里
    .error(function(err) {
        console.error(err.stack);
        res.send(500, 'Server error!');
    })
    .run();

result()

描述:所有task执行完之后,输出结果。在Stepify内部,会保存一份结果数组,通过step的fulfill方法可以将结果push到这个数组里,result执行的时候将这个数组传入finishHandle。

调用:result(finishHandle)

参数:

  • {Function} finishHandle,result本身是可选调用的,如果调用了result,则finishHandle是必传参数。

例子:

Stepify()
    .step(function() {
        var root = this;
        setTimeout(function() {
            root.fulfill(100);
            root.done(null);
        }, 100);
    })
    .step(function() {
        var root = this;
        fs.readFile(__filename, function(err, buf) {
            if(err) return root.done(err);
            root.fulfill(buf.toString());
            root.done();
        });
    })
    .result(function(r) {
        console.log(r); // [100, fs.readFileSync(__filename).toString()]
    })
    .run();

run()

描述:开始执行所定制的整个workflow。这里比较灵活,执行顺序可自行定制,甚至可以定义一个workflow,分多种模式执行。

调用:run(*args)

参数:

  • {Mix} 可选参数,类型可以是字符串(taskName)、数字(task定义的顺序,从0开始)、数组(指定哪些tasks可以并行),也可以混合起来使用(数组不支持嵌套)。默认是按照定义的顺序串行执行所有tasks。

例子:

function createTask() {
    return Stepify()
        .task('task1')
            .step(function() {
                c1++;
                fs.readdir(__dirname, this.wrap());
            })
            .step('sleep')
            .step('exec', 'cat', __filename)
        .task('task2')
            .step('sleep')
            .step(function() {
                c2++;
                var root = this;
                setTimeout(function() {
                    root.done(null);
                }, 1500);
            })
            .step('exec', 'ls', '-l')
        .task('task3')
            .step('readFile', __filename)
            .step('timer', function() {
                c3++;
                var root = this;
                setTimeout(function() {
                    root.done();
                }, 1000);
            })
            .step('sleep')
            .readFile(function(p) {
                fs.readFile(p, this.done.bind(this));
            })
        .task('task4')
            .step('sleep')
            .step(function(p) {
                c4++;
                fs.readFile(p, this.wrap());
            }, __filename)
        .pend()
        .sleep(function() {
            console.log('Task %s sleep.', this.taskName);
            var root = this;
            setTimeout(function() {
                root.done(null);
            }, 2000);
        })
        .exec(function(cmd, args) {
            cmd = [].slice.call(arguments, 0);
            var root = this;
            exec(cmd.join(' '), this.wrap());
        });
};

var modes = {
    'Default(serial)': [], // 10621 ms.
    'Customized-serial': ['task1', 'task3', 'task4', 'task2'], // 10624 ms.
    'Serial-mix-parallel-1': ['task1', ['task3', 'task4'], 'task2'], // 8622 ms.
    'Serial-mix-parallel-2': [['task1', 'task3', 'task4'], 'task2'], // 6570 ms.
    'Serial-mix-parallel-3': [['task1', 'task3'], ['task4', 'task2']], // 6576 ms.
    'All-parallel': [['task1', 'task3', 'task4', 'task2']], // 3552 ms.
    'Part-of': ['task2', 'task4'] // 5526 ms.
};

var test = function() {
    var t = Date.now();
    var task;

    Object.keys(modes).forEach(function(mode) {
        task = createTask();

        task.result = function() {
            console.log(mode + ' mode finished and took %d ms.', Date.now() - t);
        };

        task.run.apply(task, modes[mode]);
    });

    setTimeout(function() {
        log(c1, c2, c3 ,c4); // [6, 7, 6, 7]
    }, 15000);
};

test();

done()

描述:标识完成了一个异步操作(step)。

调用:done([err, callback, *args])

参数:

  • {String|Error|null} err 错误描述或Error对象实例。参数遵循Node.js的回调约定,可以不传参数,如果需要传递参数,则第一个参数必须是error对象。

  • {Function} callback 可选参数 自定义回调函数,默认是next,即执行下一步。

  • {Mix} *args 这个参数是传递给callback的参数,也就是作为下一步的动态参数。一般来说是将这一步的执行结果传递给下一步。

例子:

Stepify()
    .step(function() {
        var root = this;
        setTimeout(function() {
            root.done();
        }, 200);
    })
    .step(function() {
        var root = this;
        exec('curl "https://github.com/"', function(err, res) {
            // end this task in error occured
            if(err) root.end();
            else root.done(null, res);
        });
    })
    .step(function(res) {
        var root = this;
        setTimeout(function() {
            // do some stuff with res ...
            console.log(res);
            root.done();
        }, 100);
    })
    .run();

wrap()

描述:其实就是this.done.bind(this)的简写,包装done函数保证它的执行环境是当前step。比如原生的fs.readFile()的callback的执行环境被设置为nullfs.js#L91

调用:wrap()

参数:无

例子:

Stepify()
    .step(function() {
        fs.readdir(__dirname, this.done.bind(this));
    })
    .step(function() {
        fs.readFile(__filename, this.wrap());
    })
    .run();

fulfill()

描述:把step执行的结果推入结果队列,最终传入finishHandle。最终结果数组的元素顺序在传入给finishHandle时不做任何修改。

调用:fulfill(*args)

参数:

  • {Mix} 可选参数 可以是一个或者多个参数,会一一push到结果队列。

例子:

// Assuming retrieving user info
Stepify()
    .step(function() {
        var root = this;
        db.getBasic(function(err, basic) {
            root.fulfill(basic || null);
            root.done(err, basic.id);
        });
    })
    .step(function(id) {
        var root = this;
        db.getDetail(id, function(err, detail) {
            root.fulfill(detail || null);
            root.done(err);
        });
    })
    .error(function(err) {
        console.error(err);
        res.send(500, 'Get user info error.');
    })
    .result(function(r) {
        res.render('user', {basic: r[0], detail: r[1]});
    })
    .run();

vars()

描述:暂存临时变量,在整个workflow的运行期可用。如果不想在workflow之外使用var申明别的变量,可以考虑用vars()。

调用:vars(key[, value])

参数:

  • {String} key 变量名。访问临时变量。

  • {Mix} value 变量值。如果只传入key则是访问变量,如果传入两个值则是写入变量并返回这个value。

例子:

Stepify()
    .step(function() {
        this.vars('foo', 'bar');
        // todo
    })
    .pend()
    .step(function() {
        // todo
        console.log(this.vars('foo')); // bar
    })
    .run();

parallel()

描述:简单的并发支持。这里还可以考虑引用其他模块(如:async)完成并行任务。

调用:parallel(arr[, iterator, *args, callback])

参数:

  • {Array} arr 必传参数。需要并行执行的一个数组,对于数组元素只有一个要求,就是如果有函数则所有元素都必须是一个函数。

  • {Function} iterator 如果arr参数是一个函数数组,这个参数是不用传的,否则是必传参数,它迭代运行arr的每一个元素。iterator的第一个参数是arr中的某一个元素,第二个参数是回调函数(callback),当异步执行完之后需要调用callback(err, data)

  • {Mix} *args 传递给iterator的参数,在迭代器执行的时候,arr数组的每一个元素作为iterator的第一个参数,*args则作为剩下的传入。

  • {Function} callback 可选参数(约定当最后一个参数是函数时认为它是回调函数) 默认是next。这个并行任务的执行结果会作为一个数组按arr中定义的顺序传入callback,如果执行遇到错误,则直接交给errHandle处理。

例子:

传入一个非函数数组(parallel(arr, iterator[, *arg, callback]))

Stepify()
    .step(function() {
        fs.readdir(path.resolve('./test'), this.wrap());
    })
    .step(function(list) {
        list = list.filter(function(p) {return path.extname(p).match('js');});
        list.forEach(function(file, i) {list[i] = path.resolve('./test', file);});
        // 注释部分就相当于默认的this.next
        this.parallel(list, fs.readFile, {encoding: 'utf8'}/*, function(bufArr) {this.next(bufArr);}*/);
    })
    .step(function(bufArr) {
        // fs.writeFile('./combiled.js', Buffer.concat(bufArr), this.done.bind(this));
        // or
        this.parallel(bufArr, fs.writeFile.bind(this, './combiled.js'));
    })
    .run();

传入函数数组(parallel(fnArr[]))

Stepify()
    .step(function() {
        this.parallel([
            function(callback) {
                fs.readFile(__filename, callback);
            },
            function(callback) {
                setTimeout(function() {
                    callback(null, 'some string...');
                }, 500);
            }
        ]);
    })
    .step(function(list) {
        console.log(list); // [fs.readFileSync(__filename), 'some string...']
        // todo...
    })
    .run();

下面是一个应用到某项目里的例子:

...
.step('fetch-images', function() {
    var root = this;
    var localQ = [];
    var remoteQ = [];

    // do dome stuff to get localQ and remoteQ

    this.parallel([
        function(callback) {
            root.parallel(localQ, function(frameData, cb) {
                // ...
                db.getContentType(frameData.fileName, function(type) {
                    var imgPath = frameData.fileName + '.' + type;
                    // ...
                    db.fetchByFileName(imgPath).pipe(fs.createWriteStream(targetUrl));
                    cb(null);
                });
            }, function(r) {callback(null, r);});
        },
        function(callback) {
            root.parallel(remoteQ, function(frameData, cb) {
                var prop = frames[frameData['frame']].children[frameData['elem']]['property'];
                // ...
                request({url: prop.src}, function(err, res, body) {
                    // ...
                    cb(null);
                }).pipe(fs.createWriteStream(targetUrl));
            }, function(r) {callback(null, r);});
        },
    ]);
})
...

jump()

描述:在step之间跳转。这样会打乱step的执行顺序,谨慎使用jump,以免导致死循环

调用:jump(index|stepName)

参数:

  • {Number} index 要跳转的step索引。在step创建的时候会自建一个索引属性,使用this._index可以访问它。

  • {String} stepName step创建时传入的名称。

例子:

Stepify()
    .step('a', fn)
    .step('b', fn)
    .step(function() {
        if(!this.vars('flag')) {
            this.jump('a');
            this.vars('flag', 1)
        } else {
            this.next();
        }

        // 其他的异步操作
    })
    .step('c', fn)
    .run();

next()

描述:显式调用下一个step,并将数据传给下一step(即下一个step的动态参数)。其实等同于done(null, *args)。

调用:next([*args])

参数:

  • {Mix} *args 可选参数 类型不限,数量不限。

例子:

Stepify()
    .step(function() {
        // do some stuff ...
        this.next('foo', 'bar');
    })
    .step(function(a, b, c) {
        a.should.equal('test');
        b.should.equal('foo');
        c.should.equal('bar');
    }, 'test')
    .run();

end()

描述:终止当前task的执行。如果遇到异常并传递给end,则直接交给errorHandle,和done一样。不传或者传null则跳出所在task执行下一个task,没有则走到result,没有定义result则退出进程。

调用:end(err)

参数:

  • {Error|null} err 可选参数, 默认null。

例子:

Stepify()
    .step(fn)
    .step(function() {
        if(Math.random() > 0.5) {
            this.end();
        } else {
            // todo ...
        }
    })
    .step(fn)
    .run();

最后,欢迎fork或者提交bug

License

MIT http://rem.mit-license.org