
refetcher v0.2.0

Fetch URL content into Redis

A microservice that streams HTTP requests and responses through Redis, to simplify and scale services that consume HTTP responses for async processing.

Since the state of HTTP requests and responses is stored in Redis, multiple response handlers can be deployed e.g. for improved reliability and rolling updates.

An external service can request a fetch via Redis as follows:

  • generate a new unique request id, e.g. 123 via INCR fetch:id:seq
  • set the hashes for that request, in particular url, e.g. HSET fetch:123:h url ${url}
  • push the id onto the request queue, e.g. LPUSH fetch:req:q 123
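The three steps above can be sketched in Node.js. Here FakeRedis is a minimal in-memory stand-in for a promisified Redis client (a real producer would issue INCR, HSET and LPUSH against an actual server), and requestFetch is a hypothetical helper, not part of this package:

```javascript
// Minimal in-memory stand-in for a promisified Redis client (illustration only).
class FakeRedis {
    constructor() {
        this.store = new Map();  // plain values and hashes
        this.lists = new Map();  // queue contents
    }
    async incr(key) {
        const next = (this.store.get(key) || 0) + 1;
        this.store.set(key, next);
        return next;
    }
    async hset(key, field, value) {
        if (!this.store.has(key)) this.store.set(key, {});
        this.store.get(key)[field] = value;
    }
    async lpush(key, value) {
        if (!this.lists.has(key)) this.lists.set(key, []);
        this.lists.get(key).unshift(value);
    }
}

// Hypothetical helper: generate a unique id, record the url in the request
// hashes, then queue the id for the service to pick up.
async function requestFetch(client, url, namespace = 'fetch') {
    const id = await client.incr(`${namespace}:id:seq`);
    await client.hset(`${namespace}:${id}:h`, 'url', url);
    await client.lpush(`${namespace}:req:q`, id);
    return id;
}
```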

This service performs the following operations:

  • pops the next request id from its Redis input queue, e.g. 123 from fetch:req:q
  • retrieves the url for that request from its Redis hashes, e.g. fetch:123:h
  • fetches that URL over HTTP using the node-fetch package
  • sets the response text in Redis, e.g. fetch:123:text as per res.text()
  • sets the response headers in Redis, e.g. the fetch:123:headers:h hashes
  • notifies subscribers via Redis pubsub, e.g. publishes 123 to the channel fetch:res
  • handles failures, errors and retries, e.g. via fetch:retry:q
  • pauses processing when request rate and concurrency limits are exceeded

Typically, sync services would subscribe to the channel fetch:res, whereas async services might pull responses from the :res:q output queue.
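The async consumption style can be sketched as follows: pop the next completed id from the output queue and read the stored response text. The FakeRedis class is an in-memory stand-in (not part of the package) so the example runs without a Redis server; a real consumer would use a Redis client and block with BRPOP:

```javascript
// In-memory stand-in for a promisified Redis client (illustration only).
class FakeRedis {
    constructor() {
        this.strings = new Map();
        this.lists = new Map();
    }
    async set(key, value) { this.strings.set(key, value); }
    async get(key) { return this.strings.has(key) ? this.strings.get(key) : null; }
    async lpush(key, value) {
        if (!this.lists.has(key)) this.lists.set(key, []);
        this.lists.get(key).unshift(value);
    }
    async rpop(key) {
        const list = this.lists.get(key) || [];
        const value = list.pop();
        return value === undefined ? null : value;
    }
}

// Hypothetical async consumer: pop the next completed id from the output
// queue and read the stored response text. A real consumer would block on
// BRPOP rather than polling RPOP.
async function consumeResponse(client, namespace = 'fetch') {
    const id = await client.rpop(`${namespace}:res:q`);
    if (id === null) return null;
    const text = await client.get(`${namespace}:${id}:text`);
    return {id, text};
}
```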

Configuration

All Redis keys are prefixed with the configured namespace, e.g. fetch.

Incidentally, we don't strictly rate limit the URL fetching; rather, we slow down the process as follows:

// start a fresh window once the previous minute has elapsed
if (Date.now() > counters.perMinute.timestamp + 60000) {
    counters.perMinute = new TimestampedCounter();
} else {
    counters.perMinute.count++;
}
// over the per-minute limit: pause rather than reject
if (counters.perMinute.count > config.perMinuteLimit) {
    await delay(config.rateDelayLimit);
}

where we pause this service for the configured rateDelayLimit, e.g. 2 seconds, before the next fetch operation.
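This throttle can be run standalone with plausible versions of the assumed helpers. TimestampedCounter and delay are not shown in the README, so the definitions below are sketches, and the window length and limits are made parameters rather than the fixed 60-second minute:

```javascript
// Plausible sketches of the helpers assumed above (not from the package).
class TimestampedCounter {
    constructor() {
        this.timestamp = Date.now();  // start of the current window
        this.count = 0;
    }
}

const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

// One throttle step before a fetch: roll the window over when it has
// elapsed, otherwise count the request; pause when over the limit.
async function throttle(counters, config, now = Date.now()) {
    if (now > counters.perMinute.timestamp + config.windowMs) {
        counters.perMinute = new TimestampedCounter();
    } else {
        counters.perMinute.count++;
    }
    if (counters.perMinute.count > config.perMinuteLimit) {
        await delay(config.rateDelayLimit);
    }
}
```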

Queues

const queueKeys = reduceKeys(
    ['req', 'res', 'busy', 'failed', 'errored', 'retry'],
    key => `${config.namespace}:${key}:q`
);

where queue keys are suffixed with :q

This service will BRPOPLPUSH the next id as follows.

let id = await client.brpoplpushAsync(queueKeys.req, queueKeys.busy, config.popTimeout);

where in-flight requests are pushed to the busy queue.

Note that the onus is on drivers of this service to ensure a unique ID for the request. Naturally Redis INCR is recommended on this Redis instance, e.g. on key fetch:id:seq to provide a unique sequence number.

If there are no new incoming requests, we might retry a previously failed request from the retry queue.

if (!id) {
    if (counters.concurrent.count < config.concurrentLimit) {
        id = await client.rpoplpushAsync(queueKeys.retry, queueKeys.busy);
    }
}

where we first check that we are not at the limit of our concurrent requests before taking on a retry.

Clearly we give retries lower priority than new requests, and ensure they are somewhat delayed, i.e. retried "later."
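The two pops combine into a single "next id" step, which can be sketched as below. FakeRedis and popNextId are illustrative stand-ins (not from the package), with a non-blocking rpoplpush in place of the real client's brpoplpushAsync:

```javascript
// In-memory stand-in for a promisified Redis client (illustration only).
class FakeRedis {
    constructor() { this.lists = new Map(); }
    list(key) {
        if (!this.lists.has(key)) this.lists.set(key, []);
        return this.lists.get(key);
    }
    async lpush(key, value) { this.list(key).unshift(value); }
    // Non-blocking stand-in for BRPOPLPUSH: move the tail of source to the head of dest.
    async rpoplpush(source, dest) {
        const value = this.list(source).pop();
        if (value === undefined) return null;
        this.list(dest).unshift(value);
        return value;
    }
}

// Hypothetical "next id" step: new requests take priority; a retry is
// only attempted when below the concurrency limit.
async function popNextId(client, queueKeys, counters, config) {
    let id = await client.rpoplpush(queueKeys.req, queueKeys.busy);
    if (!id && counters.concurrent.count < config.concurrentLimit) {
        id = await client.rpoplpush(queueKeys.retry, queueKeys.busy);
    }
    return id;
}
```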

After popping a request id, the service will retrieve the url from the hashes for that id.

const hashesKey = [config.namespace, id, 'h'].join(':');
const hashes = await client.hgetallAsync(hashesKey);
if (!hashes) {
    logger.info('hashes expired', hashesKey);
} else {
    logger.debug('url', hashes.url, hashesKey, config.messageExpire);
    client.expire(hashesKey, config.messageExpire);
    handle(id, hashesKey, hashes);
}

Note that the hashes of a request from retry:q may have expired, e.g. because of delays when the load exceeds the configured limits. Therefore persistent retries may require intervention by your application.

Handler

The url retrieved from the hashes for this id is fetched, i.e. an HTTP request is performed over the network.

const res = await fetch(hashes.url, {timeout: config.fetchTimeout});

where we use the node-fetch package for the HTTP request. Note that redirects are followed by default.

Reply

If a 200 OK HTTP response is received, then the response text is set in Redis, and the id is pushed to :res:q, i.e. to notify a reactive consumer that the response is ready for that id.

if (res.status === 200) {
    const text = await res.text();
    logger.debug('text', text.length, hashesKey);
    await multiExecAsync(client, multi => {
        multi.hset(hashesKey, 'status', res.status);
        multi.setex(`${config.namespace}:${id}:text`, config.messageExpire, text);
        res.headers.forEach((value, key) => {
            multi.hset(`${config.namespace}:${id}:headers:h`, key, value);
        });
        multi.expire(`${config.namespace}:${id}:headers:h`, config.messageExpire);
        multi.lpush(queueKeys.res, id);
        multi.ltrim(queueKeys.res, 0, config.queueLimit);
        multi.lrem(queueKeys.busy, 1, id);
        multi.publish(`${config.namespace}:res`, id);
    });
    ...
}

Error handling

Otherwise, for an error status, i.e. not 200, e.g. 500 or 404 or what have you, we increment a retry count and push the id to the failed queue.

multi.hincrby(hashesKey, 'retry', 1);
multi.hset(hashesKey, 'limit', config.retryLimit);
multi.hset(hashesKey, 'status', res.status);
multi.lpush(queueKeys.failed, id);
multi.ltrim(queueKeys.failed, 0, config.queueLimit);
multi.lrem(queueKeys.busy, 1, id);

If the retry count is within the limit then it will be retried later via the :retry:q queue.

if (retry < config.retryLimit) {
    await multiExecAsync(client, multi => {
        multi.lpush(queueKeys.retry, id);
        multi.ltrim(queueKeys.retry, 0, config.queueLimit);
    });
}

Note that if a network error occurs, e.g. a DNS lookup failure or request timeout, the id will be pushed to :errored:q rather than :failed:q.

As in the error-status case, the id will also be pushed to :retry:q if the retry count is within the limit.
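This error path can be sketched as follows, assuming a hypothetical handleFetchError helper and an in-memory Redis stand-in: the attempt is counted, the id is recorded on :errored:q, the in-flight slot is released from the busy queue, and the id is re-queued on :retry:q while under config.retryLimit:

```javascript
// In-memory stand-in for a promisified Redis client (illustration only).
class FakeRedis {
    constructor() {
        this.hashes = new Map();
        this.lists = new Map();
    }
    list(key) {
        if (!this.lists.has(key)) this.lists.set(key, []);
        return this.lists.get(key);
    }
    async hincrby(key, field, by) {
        if (!this.hashes.has(key)) this.hashes.set(key, {});
        const hash = this.hashes.get(key);
        hash[field] = (hash[field] || 0) + by;
        return hash[field];
    }
    async lpush(key, value) { this.list(key).unshift(value); }
    async lrem(key, count, value) {
        const list = this.list(key);
        const index = list.indexOf(value);
        if (index >= 0) list.splice(index, 1);
    }
}

// Hypothetical error-path handler: count the attempt, record the failure on
// the errored queue, release the in-flight slot, and re-queue on the retry
// queue while under the retry limit.
async function handleFetchError(client, queueKeys, hashesKey, id, config) {
    const retry = await client.hincrby(hashesKey, 'retry', 1);
    await client.lpush(queueKeys.errored, id);
    await client.lrem(queueKeys.busy, 1, id);
    if (retry < config.retryLimit) {
        await client.lpush(queueKeys.retry, id);
    }
    return retry;
}
```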