promise-queue-observable

v1.0.6

Build Observables composed of producers pushing args and consumers handling Promises

A Queue implemented on top of the simple SetQueue package. It implements a two-sided queue using promises where any number of publishers can dispatch events to any number of consumers.

import PromiseQueue from 'promise-queue-observable'

const observer = new PromiseQueue({
  /* Default Configuration */
  // Feel free to provide a different Promise implementation.
  promise: Promise,
  /* Optional callback invoked when the queue is cancelled through observer.cancel().
     The callback is called with `this` bound to the PromiseQueue instance.
  */
  onCancel: undefined,
  /* queueStyle allows you to adjust how the queue is handled.  When 'next' (default)
     any calls to observer.next() will return the same promise until a new value has 
     been resolved.  
     
     If queueStyle is 'shift' then each call to observer.next() will return a new 
     promise that will resolve after all other promises ahead of it have been 
     resolved.
  */
  queueStyle: 'next',
  /* If promiseFactory is given, every newly created promise is passed to it as
     promiseFactory(type, promise), where type is 'pull' or 'push'.  The factory is
     expected to return the promise.  This is used to modify the promise so that it
     works with 3rd party libraries, such as redux-saga's cancellation.
  */
  promiseFactory: undefined,
  /* Log Errors ?
  */
  log: false,
})
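
To make the "two-sided queue" idea concrete, here is a minimal sketch in plain JavaScript. It is not this package's implementation: `TwoSidedQueue` and its fields are hypothetical names, and the buffering of values that arrive before any consumer is an assumption made for the illustration.

```javascript
// Illustration only: TwoSidedQueue is a hypothetical class, not the package's code.
class TwoSidedQueue {
  constructor() {
    this.values = []   // published args that no consumer has asked for yet
    this.waiters = []  // resolve functions of consumers still waiting for a value
  }

  // Consumer side: returns a promise for the next published value.
  next() {
    if (this.values.length > 0) {
      return Promise.resolve(this.values.shift())
    }
    return new Promise(resolve => this.waiters.push(resolve))
  }

  // Producer side: hand the args to the oldest waiting consumer, or buffer them.
  publish(...args) {
    const resolve = this.waiters.shift()
    if (resolve) {
      resolve(args)
    } else {
      this.values.push(args)
    }
  }
}

const queue = new TwoSidedQueue()
const received = []

queue.next().then(args => received.push(args)) // consumer arrives first and waits
queue.publish(1, 2, 3)                         // resolves the waiting consumer
queue.publish(4, 5, 6)                         // no consumer waiting, so it is buffered
queue.next().then(args => received.push(args)) // resolves from the buffer
```

Either side can arrive first: a consumer that asks early waits for a publisher, and a publisher that pushes early leaves its args for the next consumer.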

Simple Example

import PromiseQueue from 'promise-queue-observable'

// Create an Observable
const observable = new PromiseQueue()

// consumers request promises - this promise will resolve when a 
// publisher provides a value.
observable.next()
  .then(result => {
    console.log(result) ; // [1, 2, 3]
  })
  .catch(e => {
    console.log(e) ; 
  })
  
observable.next()
  .then(result => {
    console.log(result) ; // [1, 2, 3]
  })
  .catch(e => {
    console.log(e) ; 
  })

// ... sometime later
observable.publish(1, 2, 3)
observable.publish(2, 3, 4)
observable.throw(new Error('some error'))

observable.next()
  .then(result => {
    console.log(result) ; // [2, 3, 4]
  })
  .catch(e => {
    console.log(e) ;
  })
  
observable.next()
  .then(result => {
    console.log(result)
  })
  .catch(e => {
    console.log(e.message) ; // some error
  })

Shift Example

import PromiseQueue from 'promise-queue-observable'

// Create an Observable
const observable = new PromiseQueue({
  queueStyle: 'shift'
})


// consumers request promises - this promise will resolve when a 
// publisher provides a value.
observable.next()
  .then(result => {
    console.log(result) ; // [1, 2, 3]
  })
  .catch(e => {
    console.log(e) ; 
  })
  
observable.next()
  .then(result => {
    console.log(result) ; // [2, 3, 4]
  })
  .catch(e => {
    console.log(e) ; 
  })

// ... sometime later
observable.publish(1, 2, 3)
observable.publish(2, 3, 4)
observable.throw(new Error('some error'))

observable.next()
  .then(result => {
    console.log(result) ;
  })
  .catch(e => {
    console.log(e) ; // some error
  })
  
observable.next()
  .then(result => {
    console.log(result) ; // [ 5, 6, 7 ]
  })
  .catch(e => {
    console.log(e.message) ;
  })
  
setTimeout(() => observable.publish(5, 6, 7), 5000)
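
The difference between the two queueStyle values can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the package's code: `createQueue` is a hypothetical helper, and unlike a real queue it simply drops values published while nobody is waiting.

```javascript
// Illustration only: createQueue is a hypothetical helper, not the package's API.
function createQueue(queueStyle = 'next') {
  let shared = null   // 'next' style: the one promise every caller receives
  const waiters = []  // 'shift' style: one resolver per next() call, oldest first

  return {
    next() {
      if (queueStyle === 'shift') {
        // Every call gets its own promise, resolved in call order.
        return new Promise(resolve => waiters.push(resolve))
      }
      if (!shared) {
        let resolve
        const promise = new Promise(r => { resolve = r })
        shared = { promise, resolve }
      }
      return shared.promise
    },
    publish(...args) {
      if (queueStyle === 'shift') {
        const resolve = waiters.shift()
        if (resolve) resolve(args)
        return
      }
      if (shared) {
        shared.resolve(args)
        shared = null // the following next() call starts a fresh promise
      }
    },
  }
}

// 'next': both consumers share a single promise.
const nextQueue = createQueue('next')
const sameA = nextQueue.next()
const sameB = nextQueue.next()
console.log(sameA === sameB) // true

// 'shift': each consumer gets its own promise, resolved in order.
const shiftQueue = createQueue('shift')
const first = shiftQueue.next()
const second = shiftQueue.next()
console.log(first === second) // false

const results = []
first.then(args => results.push(args))
second.then(args => results.push(args))
shiftQueue.publish(1, 2, 3)
shiftQueue.publish(2, 3, 4)
```

With 'next', one publish satisfies every consumer that asked since the last value. With 'shift', each publish satisfies exactly one consumer.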

Promisifying Callbacks

The example below iterates over the observable's values and accumulates them over time until the observable is cancelled (or until we stop returning getNextEvent). The initial caller receives the final result.

import PromiseQueue from 'promise-queue-observable'

// Create an Observable
const observable = new PromiseQueue({ 
  log: true,
  onCancel: function handlePromiseQueueCancellation() {
    // handle cancellation however needed
  }
})

const getNextEvent = prev => (
  observable.cancelled() 
    ? prev
    : observable.next().then(value => {
      console.log('value: ', value)
      console.log('prev:  ', prev)
      // value is the array of published args; append it to what we have so far
      return getNextEvent([ ...(prev || []), ...value ])
    }).catch(e => {
      console.log('Error! ', e)
    })
)

getNextEvent().then(r => {
  console.log('result ', r)
  if ( ! observable.cancelled() ) {
    observable.cancel() ; // cleanup the queues and prevent publishers from 
                          // continually publishing to a queue that no one
                          // will ever consume by accident.
  }
})

// works great for things like window.addEventListener, etc.
setTimeout(() => observable.publish(1), 1000)
setTimeout(() => observable.publish(2), 3000)
setTimeout(() => observable.publish(3), 2000)
setTimeout(() => observable.publish(4), 6000)
setTimeout(() => observable.cancel(), 3000)

/*
value:  [ 1 ]
prev:   undefined
value:  [ 3 ]
prev:   [ 1 ]
value:  [ 2 ]
prev:   [ 1, 3 ]
result  [ 1, 3, 2 ]
[SagaObservable]: Publish Received after Cancellation  undefined [ 4 ]
*/
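
As a rough idea of how an addEventListener-style callback source can be consumed through promises, here is a standalone sketch that does not use this package. `eventQueue` is a hypothetical helper, and it assumes an environment with the standard `EventTarget` and `Event` globals (browsers and recent Node.js versions).

```javascript
// Illustration only: eventQueue is a hypothetical helper, not part of the package.
function eventQueue(target, type) {
  const buffered = []  // events that arrived before anyone asked for them
  const waiters = []   // resolvers for consumers that asked before an event arrived

  const listener = event => {
    const resolve = waiters.shift()
    if (resolve) resolve(event)
    else buffered.push(event)
  }
  target.addEventListener(type, listener)

  return {
    // Pull the next event as a promise.
    next: () =>
      buffered.length > 0
        ? Promise.resolve(buffered.shift())
        : new Promise(resolve => waiters.push(resolve)),
    // Stop listening so the source no longer feeds a queue nobody reads.
    cancel: () => target.removeEventListener(type, listener),
  }
}

const target = new EventTarget()
const clicks = eventQueue(target, 'click')
const seen = []

clicks.next().then(event => seen.push(event.type))
target.dispatchEvent(new Event('click')) // delivered to the waiting consumer
clicks.cancel()
target.dispatchEvent(new Event('click')) // ignored: the listener was removed
```

Removing the listener on cancel mirrors the cleanup concern in the example above: once no one consumes the queue, the source should stop publishing to it.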