How to use a Node.js queue library

Bee-Queue is meant to power a distributed worker pool and was built with short, real-time jobs in mind. A web server can enqueue a job, wait for a worker process to complete it, and return its results within an HTTP request. Scaling is as simple as running more workers.

Thanks to the folks at Mixmax, Bee-Queue is once again being regularly maintained!

Celery, Resque, Kue, and Bull operate similarly, but are generally focused on longer background jobs, supporting things like job prioritization and repeatable jobs, which Bee-Queue currently does not. Bee-Queue can handle longer background jobs just fine, but they aren't the primary focus.

  • Create, save, and process jobs
  • Concurrent processing
  • Job timeouts, retries, and retry strategies
  • Scheduled jobs
  • Pass events via Pub/Sub
    • Progress reporting
    • Send job results back to producers
  • Robust design
    • Strives for all atomic operations
    • Retries stalled jobs for at-least-once delivery
    • High code coverage
  • Performance-focused
    • Minimizes Redis usage
    • Uses Lua scripting and pipelining to minimize network overhead
    • Benchmarks favorably against similar libraries
  • Fully callback- and Promise-compatible API

Installation

$ npm install bee-queue

You'll also need Redis 2.8+* running somewhere.

* We've noticed that some jobs get delayed due to an issue in Redis versions below 3.2, and therefore recommend the use of Redis 3.2+.

Table of Contents
  • Motivation
  • Why Bees?
  • Benchmarks
  • Web Interface
  • Overview
  • API Reference
  • License
Motivation

Celery is for Python, and Resque is for Ruby, but Kue and Bull already exist for Node, and they're good at what they do, so why does Bee-Queue also need to exist?

In short: we needed to mix and match things that Kue does well with things that Bull does well, and we needed to squeeze out more performance. There's also a long version with more details.

Bee-Queue starts by combining Bull's simplicity and robustness with Kue's ability to send events back to job creators, then focuses heavily on minimizing overhead, and finishes by being strict about code quality and testing. It compromises on breadth of features, so there are certainly cases where Kue or Bull might be preferable.

Bull and Kue do things really well and deserve a lot of credit. Bee-Queue borrows ideas from both, and Bull was an especially invaluable reference during initial development.

Why Bees?

Bee-Queue is like a bee because it:

  • is small and simple
  • is fast (bees can fly 20mph!)
  • carries pollen (messages) between flowers (servers)
  • something something "worker bees"
Benchmarks

These basic benchmarks ran 10,000 jobs through each library, at varying levels of concurrency, with Node.js (v6.9.1, v6.11.2, v7.6.0, v7.10.1, v8.2.1, v8.3.0) and Redis (v3.2.10, v4.0.1) running directly on an Amazon AWS EC2 m4.large. The numbers shown are averages of 36 runs, 3 for each combination of the aforementioned Redis and Node versions. The raw data collected and code used are available in the benchmark folder.

Web Interface

Check out the Arena web interface to manage jobs and inspect queue health.

Overview

Creating Queues

Queue objects are the starting point to everything this library does. To make one, we just need to give it a name, typically indicating the sort of job it will process:

const Queue = require('bee-queue');
const addQueue = new Queue('addition');

Queues are very lightweight — the only significant overhead is connecting to Redis — so if you need to handle different types of jobs, just instantiate a queue for each:

const subQueue = new Queue('subtraction', {
  redis: {
    host: 'somewhereElse',
  },
  isWorker: false,
});

Here, we pass a settings object to specify an alternate Redis host and to indicate that this queue will only add jobs (not process them). See Queue Settings for more options.

Creating Jobs

Jobs are created using Queue#createJob(data), which returns a Job object storing arbitrary data.

Jobs have a chaining API for configuring the Job, and a save(cb) method to save the job into Redis and enqueue it for processing:

const job = addQueue.createJob({x: 2, y: 3});
job
  .timeout(3000)
  .retries(2)
  .save()
  .then((job) => {
    // job enqueued, job.id populated
  });

The Job's save method returns a Promise in addition to calling the optional callback.

Each Job can be configured with the commands .setId(id), .retries(n), .backoff(strategy, delayFactor), .delayUntil(date|timestamp), and .timeout(ms) for setting options.

Jobs can later be retrieved from Redis using Queue#getJob, but most use cases won't need this, and can instead use Job and Queue Events.

Advanced: Bulk-Creating Jobs

Normally, creating and saving jobs blocks the underlying redis client for the full duration of an RTT to the Redis server. This can reduce throughput in cases where many operations should occur without delay - particularly when there are many jobs that need to be created quickly. Use Queue#saveAll to save an iterable (e.g. an Array) containing jobs in a pipelined network request, thus pushing all the work out on the wire before hearing back from the Redis server.

addQueue
  .saveAll([addQueue.createJob({x: 3, y: 4}), addQueue.createJob({x: 4, y: 5})])
  .then((errors) => {
    // The errors value is a Map associating Jobs with Errors. This will often be an empty Map.
  });

Each job in the array provided to saveAll will be mutated with the ID it gets assigned.

Processing Jobs

To start processing jobs, call Queue#process and provide a handler function:

addQueue.process(function (job, done) {
  console.log(`Processing job ${job.id}`);
  return done(null, job.data.x + job.data.y);
});

Instead of calling the provided callback, the handler function can return a Promise. This enables the intuitive use of async/await:

addQueue.process(async (job) => {
  console.log(`Processing job ${job.id}`);
  return job.data.x + job.data.y;
});

The handler function is given the job it needs to process, including job.data from when the job was created. It should then pass results either by returning a Promise or by calling the done callback. For more on handlers, see Queue#process.

Queue#process can only be called once per Queue instance, but we can process on as many instances as we like, spanning multiple processes or servers, as long as they all connect to the same Redis instance. From this, we can easily make a worker pool of machines who all run the same code and spend their lives processing our jobs, no matter where those jobs are created.

Queue#process can also take a concurrency parameter. If your jobs spend most of their time just waiting on external resources, you might want each processor instance to handle at most 10 at a time:

const baseUrl = 'http://www.google.com/search?q=';
subQueue.process(10, function (job, done) {
  http.get(`${baseUrl}${job.data.x}-${job.data.y}`, function (res) {
    // parse the difference out of the response...
    return done(null, difference);
  });
});

Progress Reporting

Handlers can send progress reports, which will be received as events on the original job instance:

const job = addQueue.createJob({x: 2, y: 3}).save();
job.on('progress', (progress) => {
  console.log(
    `Job ${job.id} reported progress: page ${progress.page} / ${progress.totalPages}`
  );
});

addQueue.process(async (job) => {
  // do some work
  job.reportProgress({page: 3, totalPages: 11});
  // do more work
  job.reportProgress({page: 9, totalPages: 11});
  // do the rest
});

Just like Queue#process, these progress events work across multiple processes or servers; the job instance will receive the progress event no matter where processing happens. The data passed through can be any JSON-serializable value. Note that this mechanism depends on Pub/Sub, and thus will incur additional overhead for each additional worker node.

Job and Queue Events

There are three classes of events emitted by Bee-Queue objects: Queue Local events, Queue PubSub events, and Job events. The API Reference sections below provide a more complete overview of each.

Progress reporting, demonstrated above, happens via Job events. Jobs also emit succeeded events, which we've seen above, and retrying and failed events.

Queue PubSub events correspond directly to Job events: job succeeded, job retrying, job failed, and job progress. These events fire from all queue instances and for all jobs on the queue.

Queue local events include ready and error on all queue instances, and succeeded, failed, and retrying on worker queues corresponding to the PubSub events being sent out.

Note that Job events become unreliable across process restarts, since the queue's reference to the associated job object will be lost. Queue-level events are thus potentially more reliable, but Job events are more convenient in places like HTTP requests where a process restart loses state anyway.

Stalling Jobs

Bee-Queue attempts to provide "at least once delivery". Any job enqueued should be processed at least once - and if a worker crashes, gets disconnected, or otherwise fails to confirm completion of the job, the job will be dispatched to another worker for processing.

To make this happen, workers periodically phone home to Redis about each job they're working on, just to say "I'm still working on this and I haven't stalled, so you don't need to retry it." The Queue#checkStalledJobs method finds any active jobs whose workers have gone silent (not phoned home for at least stallInterval ms), assumes they have stalled, emits a stalled event with the job id, and re-enqueues them to be picked up by another worker.
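The bookkeeping above can be illustrated with a small, hypothetical sketch (not Bee-Queue's actual implementation): given each active job's last check-in time, any job unseen for longer than the stall interval counts as stalled.

```javascript
// Hypothetical sketch of stall detection, not Bee-Queue internals:
// a job is stalled if its worker hasn't checked in within stallInterval ms.
function findStalled(activeJobs, now, stallInterval) {
  return Object.keys(activeJobs).filter(
    (id) => now - activeJobs[id].lastCheckIn > stallInterval
  );
}

const active = {
  'job-1': {lastCheckIn: 9000}, // checked in 1s ago
  'job-2': {lastCheckIn: 4000}, // silent for 6s
};
console.log(findStalled(active, 10000, 5000)); // [ 'job-2' ]
```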

Optimizing Redis Connections

By default, every time you create a queue instance with new Queue(), a new redis connection will be created. If you have a small number of queues across a large number of servers, this will probably be fine. If you have a large number of queues with a small number of servers, this will probably be fine too. If your deployment gets a bit larger, you will likely need to optimize the Redis connections.

Let's say for example you have a web application with 30 producer queues and you run 10 webservers & 10 worker servers, each one with 4 processes/server. With the default settings this is going to add up to a lot of Redis connections. Each Redis connection consumes a fairly large chunk of memory, and it adds up quickly!

The producer queues are the ones that run on the webserver and push jobs into the queue. These queues do not need to receive events, so they can all share one redis connection by passing in an existing node_redis client instance.

Example:

const Queue = require('bee-queue');
const redis = require('redis');

// Producer-only queues: no events, no processing, one shared connection.
const sharedConfig = {
  getEvents: false,
  isWorker: false,
  redis: redis.createClient(process.env.REDIS_URL),
};

const emailQueue = new Queue('EMAIL_DELIVERY', sharedConfig);
const facebookUpdateQueue = new Queue('FACEBOOK_UPDATE', sharedConfig);

Note that these "producer queues" above are only relevant for the processes that have to put jobs into the queue, not for the workers that need to actually process the jobs.

In your worker process where you define how to process the job with queue.process, you will have to run "worker queues" instead of "producer queues". In the example below, even though you are passing in the shared config with the same redis instance, because this is a worker queue Bee-Queue will duplicate the client, since it needs the blocking commands for PubSub subscriptions. This will result in a new connection for each queue.

const Queue = require('bee-queue');
const redis = require('redis');

const sharedConfig = {
  redis: redis.createClient(process.env.REDIS_URL),
};

// Worker queues: Bee-Queue duplicates the shared client for blocking
// commands and PubSub, so each queue gets its own connection.
const emailQueue = new Queue('EMAIL_DELIVERY', sharedConfig);
const facebookUpdateQueue = new Queue('FACEBOOK_UPDATE', sharedConfig);

// handleEmailJob / handleFacebookJob are illustrative handler functions.
emailQueue.process(handleEmailJob);
facebookUpdateQueue.process(handleFacebookJob);

For a more detailed example and explanation see #96

API Reference

Queue

Settings

The default Queue settings are:

const queue = new Queue('test', {
  prefix: 'bq',
  stallInterval: 5000,
  nearTermWindow: 1200000,
  delayedDebounce: 1000,
  redis: {
    host: '127.0.0.1',
    port: 6379,
    db: 0,
    options: {},
  },
  isWorker: true,
  getEvents: true,
  sendEvents: true,
  storeJobs: true,
  ensureScripts: true,
  activateDelayedJobs: false,
  removeOnSuccess: false,
  removeOnFailure: false,
  redisScanCount: 100,
});

The settings fields are:

  • prefix: string, default bq. Useful if the bq: namespace is, for whatever reason, unavailable or problematic on your redis instance.

  • stallInterval: number, ms; the length of the window in which workers must report that they aren't stalling. Higher values will reduce Redis/network overhead, but if a worker stalls, it will take longer before its stalled job(s) will be retried. A higher value will also result in a lower probability of false-positives during stall detection.

  • nearTermWindow: number, ms; the window during which delayed jobs will be specifically scheduled using setTimeout - if all delayed jobs are further out than this window, the Queue will double-check that it hasn't missed any jobs after the window elapses.

  • delayedDebounce: number, ms; to avoid unnecessary churn for several jobs in short succession, the Queue may delay individual jobs by up to this amount.

  • redis: object or string, specifies how to connect to Redis. See redis.createClient() for the full set of options.

    • host: string, Redis host.
    • port: number, Redis port.
    • socket: string, Redis socket to be used instead of a host and port.

    Note that this can also be a node_redis RedisClient instance, in which case Bee-Queue will issue normal commands over it. It will duplicate the client for blocking commands and PubSub subscriptions, if enabled. This is advanced usage.

  • isWorker: boolean. Disable if this queue will not process jobs.

  • getEvents: boolean. Disable if this queue does not need to receive job events.

  • sendEvents: boolean. Disable if this worker does not need to send job events back to other queues.

  • storeJobs: boolean. Disable if this worker does not need to associate events with specific Job instances. This normally improves memory usage, as the storage of jobs is unnecessary for many use-cases.

  • ensureScripts: boolean. Ensure that the Lua scripts exist in redis before running any commands against redis.

  • activateDelayedJobs: boolean. Activate delayed jobs once they've passed their delayUntil timestamp. Note that this must be enabled on at least one Queue instance for the delayed retry strategies (exponential and fixed) - this will reactivate them after their computed delay.

  • removeOnSuccess: boolean. Enable to have this worker automatically remove its successfully completed jobs from Redis, so as to keep memory usage down.

  • removeOnFailure: boolean. Enable to have this worker automatically remove its failed jobs from Redis, so as to keep memory usage down. This will not remove jobs that are set to retry unless they fail all their retries.

  • quitCommandClient: boolean. Whether to QUIT the redis command client (the client it sends normal operations over) when Queue#close is called. This defaults to true for normal usage, and false if an existing RedisClient object was provided to the redis option.

  • redisScanCount: number. For setting the value of the SSCAN Redis command used in Queue#getJobs for succeeded and failed job types.
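To make the two delayed retry strategies concrete, here is a hypothetical sketch of how such delays are typically computed (the function name and exact formula are illustrative assumptions, not Bee-Queue's internals): a fixed strategy always waits the same delay, while an exponential strategy doubles the delay for each retry already attempted.

```javascript
// Illustrative backoff computation: 'fixed' waits a constant delay,
// 'exponential' doubles the delay for each completed retry.
function backoffDelay(strategy, delayFactor, retriesDone) {
  if (strategy === 'fixed') return delayFactor;
  if (strategy === 'exponential') return delayFactor * 2 ** retriesDone;
  throw new Error(`unknown strategy: ${strategy}`);
}

console.log(backoffDelay('fixed', 1000, 3)); // 1000
console.log(backoffDelay('exponential', 1000, 3)); // 8000
```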

Properties

  • name: string, the name passed to the constructor.
  • keyPrefix: string, the prefix used for all Redis keys associated with this queue.
  • jobs: a Map associating the currently tracked jobs (when storeJobs and getEvents are enabled).
  • paused: boolean, whether the queue instance is paused. Only true if the queue is in the process of closing.
  • settings: object, the settings determined between those passed and the defaults.

Queue Local Events

ready

Instead of listening to this event, consider calling Queue#ready, which returns a Promise that resolves once the Queue is ready. If the Queue is already ready, then the Promise will be already resolved.

addQueue.on('ready', () => {
  console.log('queue now ready to start doing things');
});

The queue has connected to Redis and ensured that the Lua scripts are cached. You can often get away without checking for this event, but it's a good idea to wait for it in case the Redis host didn't have the scripts cached beforehand; if you try to enqueue jobs when the scripts are not yet cached, you may run into a Redis error.

error

addQueue.on('error', (err) => {
  console.log(`A queue error happened: ${err.message}`);
});

Any Redis errors are re-emitted from the Queue. Note that this event will not be emitted for failed jobs.

succeeded

addQueue.on('succeeded', (job, result) => {
  console.log(`Job ${job.id} succeeded with result: ${result}`);
});

This queue has successfully processed job. If result is defined, the handler called done(null, result).

retrying

addQueue.on('retrying', (job, err) => {
  console.log(
    `Job ${job.id} failed with error ${err.message} but is being retried!`
  );
});

This queue has processed job, but it reported a failure and has been re-enqueued for another attempt. job.options.retries has been decremented, and the stack trace (or error message) has been added to its job.options.stacktraces array.

failed

addQueue.on('failed', (job, err) => {
  console.log(`Job ${job.id} failed with error ${err.message}`);
});

This queue has processed job, but its handler reported a failure either by rejecting its returned Promise, or by calling done(err). Note that if you pass an async function to process, you must reject it by returning Promise.reject(...) or throwing an exception (done does not apply).

stalled

addQueue.on('stalled', (jobId) => {
  console.log(`Job ${jobId} stalled and will be reprocessed`);
});

This queue detected that a job stalled. Note that this might not be the same queue instance that processed the job and ultimately stalled; instead, it's the queue instance that happened to detect the stalled job.

Queue PubSub Events

These events are all reported by some worker queue (with sendEvents enabled) and sent as Redis Pub/Sub messages back to any queues listening for them (with getEvents enabled). This means that listening for these events is effectively a monitor for all activity by all workers on the queue.

If the jobId of an event is for a job that was created by that queue instance, a corresponding Job event will be emitted from that job object.

Note that Queue PubSub events pass the jobId, but do not have a reference to the job object, since that job might have originally been created by some other queue in some other process. Job events are emitted only in the process that created the job, and are emitted from the job object itself.

job succeeded

addQueue.on('job succeeded', (jobId, result) => {
  console.log(`Job ${jobId} succeeded with result: ${result}`);
});

Some worker has successfully processed job jobId. If result is defined, the handler called done(null, result).

job retrying

addQueue.on('job retrying', (jobId, err) => {
  console.log(
    `Job ${jobId} failed with error ${err.message} but is being retried!`
  );
});

Some worker has processed job jobId, but it reported a failure and has been re-enqueued for another attempt.

job failed

addQueue.on('job failed', (jobId, err) => {
  console.log(`Job ${jobId} failed with error ${err.message}`);
});

Some worker has processed job jobId, but its handler reported a failure with done(err).

job progress

addQueue.on('job progress', (jobId, progress) => {
  console.log(`Job ${jobId} reported progress: ${progress}%`);
});

Some worker is processing job jobId, and it sent a progress report of progress percent.

Queue Delayed Job activation

The

addQueue.process(function (job, done) {
  console.log(`Processing job ${job.id}`);
  return done(null, job.data.x + job.data.y);
});
4 will activate no delayed jobs unless
$ npm install bee-queue
08 is set to
$ npm install bee-queue
18.

The promptness of the job activation is controlled with the `delayedDebounce` setting on the Queue. This setting defines a window across which to group delayed jobs. If three jobs are enqueued for 10s, 10.5s, and 12s in the future, then a `delayedDebounce` of `1000` will cause the first two jobs to activate when the timestamp of the second job passes.
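The windowing behavior can be sketched in plain JavaScript. This is illustrative only, not Bee-Queue's internal implementation; `groupDelayed` is a hypothetical helper:

```javascript
// Sketch of the delayedDebounce grouping idea: starting from the earliest
// delayed timestamp, every job whose timestamp falls within `debounce` ms of
// the batch's first timestamp activates in the same batch.
function groupDelayed(timestamps, debounce) {
  const sorted = [...timestamps].sort((a, b) => a - b);
  const batches = [];
  for (const ts of sorted) {
    const batch = batches[batches.length - 1];
    if (batch && ts - batch[0] <= debounce) {
      batch.push(ts);
    } else {
      batches.push([ts]);
    }
  }
  return batches;
}

// Jobs due at 10s, 10.5s, and 12s with a 1000ms window:
// the first two land in one batch.
console.log(groupDelayed([10000, 10500, 12000], 1000));
// → [ [ 10000, 10500 ], [ 12000 ] ]
```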

The `nearTermWindow` setting on the Queue determines the maximum duration the Queue should wait before attempting to activate any of the elapsed delayed jobs in Redis. This setting is to control for network failures in the delivery of the `earlierDelayed` event in conjunction with the death of the publishing Queue.

Methods

Queue(name, [settings])

Used to instantiate a new queue; opens connections to Redis.

Queue#createJob(data)

```js
const job = queue.createJob({...});
```

Returns a new Job object with the associated user data.

Queue#getJob(jobId, [cb])

```js
queue.getJob(3, function (err, job) {
  console.log(`Job 3 has status ${job.status}`);
});

queue.getJob(3).then((job) => console.log(`Job 3 has status ${job.status}`));
```

Looks up a job by its `jobId`. The returned job will emit events if `getEvents` and `storeJobs` are true.

Be careful with this method; most potential uses would be better served by job events on already-existing job instances. Using this method indiscriminately can lead to increasing memory usage when the `storeJobs` setting is `true`, as each queue maintains a table of all associated jobs in order to dispatch events.

Queue#getJobs(type, page, [cb])

```js
queue.getJobs('waiting', {start: 0, end: 25}).then((jobs) => {
  const jobIds = jobs.map((job) => job.id);
  console.log(`Job ids: ${jobIds.join(' ')}`);
});

queue.getJobs('failed', {size: 100}).then((jobs) => {
  const jobIds = jobs.map((job) => job.id);
  console.log(`Job ids: ${jobIds.join(' ')}`);
});
```

Looks up jobs by their queue type. When looking up jobs of type `waiting`, `active`, or `delayed`, `page` should be configured with `start` and `end` attributes to specify a range of job indices to return. Jobs of type `failed` and `succeeded` will return an arbitrary subset of the queue of size `page.size`. Note: this is because failed and succeeded job types are represented by a Redis SET, which does not maintain a job ordering.

Note that large values of the attributes of `page` may cause excess load on the Redis server.

Queue#process([concurrency], handler(job, done))

Begins processing jobs with the provided handler function.

The `process` method should only be called once, and should never be called on a queue where `isWorker` is false.

The optional `concurrency` parameter sets the maximum number of simultaneously active jobs for this processor. It defaults to 1.

The handler function should either:

  • Return a Promise that eventually resolves or rejects, or
  • Call `done` exactly once
    • Use `done(err)` to indicate job failure
    • Use `done()` or `done(null, result)` to indicate job success
      • `result` must be JSON-serializable (for `JSON.stringify`)
  • Never ever block the event loop (for very long). If you do, the stall detection might think the job stalled, when it was really just blocking the event loop.

N.B. If the handler returns a Promise, calls to the `done` callback will be ignored.
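As a sketch, a Promise-based handler is simply an async function of the job: its resolution value becomes the job result, and a rejection marks the job as failed. The names below (`addHandler`, the `{data: {x, y}}` job shape) are illustrative, echoing the addition example used earlier:

```javascript
// Illustrative Promise-based handler: resolves with the job's result,
// rejects to mark the job as failed.
const addHandler = async (job) => {
  if (typeof job.data.x !== 'number' || typeof job.data.y !== 'number') {
    throw new Error('x and y must be numbers'); // rejection -> job failure
  }
  return job.data.x + job.data.y; // resolution value -> job result
};

// In a real worker this would be registered via queue.process(addHandler);
// here we invoke it directly with a job-shaped object to show the contract.
addHandler({data: {x: 2, y: 3}}).then((sum) => console.log(sum)); // 5
```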

Queue#checkStalledJobs([interval], [cb])

Checks for jobs that appear to be stalling and thus need to be retried, then re-enqueues them.

```js
queue.checkStalledJobs(5000, (err, numStalled) => {
  // prints the number of stalled jobs detected every 5000 ms
  console.log('Checked stalled jobs', numStalled);
});
```

What happens after the check is determined by the parameters provided:

  • `cb` only: `cb` is called
  • `interval` only: a timeout is set to call the method again in `interval` ms
  • `cb` and `interval`: a timeout is set, then `cb` is called

Bee-Queue automatically calls this method once when a worker begins processing, so it will check once if a worker process restarts. You should also make your own call with an interval parameter to make the check happen repeatedly over time; see Under the hood for an explanation why.

The maximum delay from when a job stalls until it will be retried is roughly `stallInterval + interval`, so to minimize that delay without calling `checkStalledJobs` unnecessarily often, set `interval` to be the same or a bit shorter than `stallInterval`. A good system-wide average frequency for the check is every 0.5-10 seconds, depending on how time-sensitive your jobs are in case of failure. Larger deployments, or deployments where processing has higher CPU variance, may need even higher intervals.

Note that for calls that specify an interval, you must provide a callback if you want results from each subsequent check - the returned Promise can and will only resolve for the first check. If and only if you specify an `interval` and no `cb`, then errors encountered after the first check will be emitted as `error` events.

Queue#checkHealth([cb])

Check the "health" of the queue. Returns a promise that resolves to the number of jobs in each state (`waiting`, `active`, `succeeded`, `failed`, `delayed`), and the newest job ID (if using the default ID behavior) in `newestJob`. You can periodically query the `newestJob` ID to estimate the job creation throughput, and can infer the job processing throughput by incorporating the `waiting` and `active` counts.

```js
const counts = await queue.checkHealth();
console.log(`Jobs waiting: ${counts.waiting}, active: ${counts.active}`);
```

Queue#close([timeout], [cb])

Closes the queue's connections to Redis. Idempotent.

The recommended pattern for gracefully shutting down your worker is:

```js
// Give your jobs a reasonable window to finish processing; any job that does
// not finish in time will stall and be retried, so jobs should be idempotent.
const TIMEOUT = 30 * 1000;

process.on('uncaughtException', async () => {
  // Queue#close is idempotent - no need to guard against duplicate calls.
  try {
    await queue.close(TIMEOUT);
  } catch (err) {
    console.error('bee-queue failed to shut down gracefully', err);
  }
  process.exit(1);
});
```

Queue#isRunning()

Returns `true` unless the Queue is shutting down due to a call to `Queue#close()`.

Queue#ready([cb])

Promise resolves to the queue (or callback is called with a null error argument) when the queue (and Redis) are ready for jobs. Learn more about `ready` in Queue Local Events.

```js
const queue = new Queue('example');
await queue.ready();
```

Queue#removeJob(jobId, [cb])

```js
queue.removeJob(3, function (err) {
  if (!err) {
    console.log('Job 3 was removed');
  }
});

queue.removeJob(3).then(() => console.log('Job 3 was removed'));
```

Removes a job by its `jobId`. Idempotent.

This may have unintended side-effects, e.g. if the job is currently being processed by another worker, so only use this method when you know it's safe.

Returns the `Queue` instance it was called on.

Queue#destroy([cb])

```js
queue.destroy(function (err) {
  if (!err) {
    console.log('Queue was destroyed');
  }
});

queue.destroy().then(() => console.log('Queue was destroyed'));
```

Removes all Redis keys belonging to this queue (see Under the hood). Idempotent.

It goes without saying that this should be used with great care.

Returns the number of keys removed.

Job

Properties

  • `id`: string; Job ID unique to each job. Not populated until `.save` calls back. Can be overridden with `Job#setId`.
  • `data`: object; user data associated with the job. It should:
    • Be JSON-serializable (for `JSON.stringify`)
    • Never be used to pass large pieces of data (100kB+)
    • Ideally be as small as possible (1kB or less)
  • `options`: object used by Bee-Queue to store timeout, retries, stack traces, etc.
    • Do not modify directly; use job methods instead.
  • `queue`: the Queue responsible for this instance of the job. This is either:
    • the queue that called `createJob` to make the job,
    • the queue that ran `getJob` to fetch the job from redis, or
    • the queue that called `process` to process it
  • `progress`: number; progress between 0 and 100, as reported by `reportProgress`.

Job Events

These are all Pub/Sub events like Queue PubSub events and are disabled when `getEvents` is false.

succeeded

```js
const job = await queue.createJob({...}).save();
job.on('succeeded', (result) => {
  console.log(`Received result for job ${job.id}: ${result}`);
});
```

The job has succeeded. If `result` is defined, the handler called `done(null, result)`.

retrying

```js
job.on('retrying', (err) => {
  console.log(`Job ${job.id} failed with error ${err.message} but is being retried!`);
});
```

The job has failed, but it is being automatically re-enqueued for another attempt. `job.options.retries` has been decremented accordingly.

failed

```js
job.on('failed', (err) => {
  console.log(`Job ${job.id} failed with error ${err.message}`);
});
```

The job has failed, and is not being retried.

progress

```js
job.on('progress', (progress) => {
  console.log(`Job ${job.id} now ${progress}% complete`);
});
```

The job has sent a progress report of `progress` percent.

Methods

Each Job can be configured with the chainable commands `.setId(id)`, `.retries(n)`, `.backoff(strategy, delayFactor)`, `.delayUntil(date|timestamp)`, and `.timeout(ms)`.

Job#setId(id)

```js
const job = await queue.createJob({...}).setId('bulk').save();
```

Explicitly sets the ID of the job. If a job with the given ID already exists, the Job will not be created, and `job.id` will be set to `null`. This method can be used to run a job once for each of an external resource by passing that resource's ID. For instance, you might run the setup job for a user only once by setting the job ID to the ID of the user. Furthermore, when this feature is used with queue settings `removeOnSuccess: true` and `removeOnFailure: true`, it will allow that job to be re-run again, effectively ensuring that the job ID will have a global concurrency of 1.

Avoid passing a numeric job ID, as it may conflict with an auto-generated ID.

Job#retries(n)

```js
const job = await queue.createJob({...}).retries(3).save();
```

Sets how many times the job should be automatically retried in case of failure.

Stored in `job.options.retries` and decremented each time the job is retried.

Defaults to 0.

Job#backoff(strategy, delayFactor)

```js
// When the job fails, retry it immediately.
const job = await queue.createJob({...}).backoff('immediate').save();

// When the job fails, wait the given number of milliseconds before retrying.
job.backoff('fixed', 1000);

// When the job fails, retry with an exponential backoff.
job.backoff('exponential', 500);
```

Sets the backoff policy when handling retries.

This setting is stored in `job.options.backoff` as `{strategy, delay}`.

Defaults to `'immediate'`.
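As a rough sketch of what the strategies mean for the wait before each retry: the doubling rule for `'exponential'` below is our reading of the usual exponential-backoff scheme, not a quote of Bee-Queue's internals, and `retryDelay` is a hypothetical helper:

```javascript
// Hypothetical helper mapping a backoff strategy to a retry delay in ms.
// `attempt` is 1 for the first retry, 2 for the second, and so on.
function retryDelay(strategy, delayFactor, attempt) {
  switch (strategy) {
    case 'immediate':
      return 0; // retry right away
    case 'fixed':
      return delayFactor; // same wait before every retry
    case 'exponential':
      return delayFactor * 2 ** (attempt - 1); // doubles each attempt
    default:
      throw new Error(`unknown strategy: ${strategy}`);
  }
}

console.log(retryDelay('exponential', 500, 3)); // 2000
```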

Job#delayUntil(date|timestamp)

```js
const job = await queue
  .createJob({...})
  .delayUntil(Date.parse('2038-01-19T03:14:08.000Z'))
  .save();
```

Delay the job until the given Date/timestamp passes. See the Queue settings section for information on controlling the activation of delayed jobs.

Defaults to enqueueing the job for immediate processing.

Job#timeout(ms)

```js
const job = await queue.createJob({...}).timeout(10000).save();
```

Sets a job runtime timeout in milliseconds; if the job's handler function takes longer than the timeout to call `done`, the worker assumes the job has failed and reports it as such (causing the job to retry if applicable).

Defaults to no timeout.

Job#save([cb])

```js
const job = queue.createJob({...});
job.save((err, job) => console.log(`Saved job ${job.id}`));
job.save().then((job) => console.log(`Saved job ${job.id}`));
```

Saves a job, queueing it up for processing. After the callback fires (and the associated Promise resolves), `job.id` will be populated.

Job#reportProgress(n)

```js
queue.process(async (job) => {
  // do some work...
  job.reportProgress(10);
  // do more work...
  job.reportProgress(50);
  // finish up...
});
```

Reports job progress when called within a handler function. Causes a `progress` event to be emitted. Does not persist the progress to Redis, but will store it on `job.progress`, and if other Queues have `storeJobs` and `getEvents` enabled, then the `progress` will end up on all corresponding job instances.

Job#remove([cb])

```js
job.remove(function (err) {
  if (!err) {
    console.log('Job was removed');
  }
});

job.remove().then(() => console.log('Job was removed'));
```

Removes a job from the queue. Idempotent.

This may have unintended side-effects, e.g. if the job is currently being processed by another worker, so only use this method when you know it's safe.

Note that this method will call `Queue#removeJob` with the job id, so if you don't have the job in memory but know its id, it's much more efficient to use `Queue#removeJob` instead of getting the job first.

Returns the `Job` instance it was called on.

Defaults

Defaults for Queue

Defaults for Queue `settings` live in `lib/defaults.js`. Changing that file will change Bee-Queue's default behavior.

Under the hood

Each Queue uses the following Redis keys:

  • `bq:name:id`: Integer, incremented to determine the next Job ID.
  • `bq:name:jobs`: Hash from Job ID to a JSON string containing its data and options.
  • `bq:name:waiting`: List of IDs of jobs waiting to be processed.
  • `bq:name:active`: List of IDs of jobs currently being processed.
  • `bq:name:succeeded`: Set of IDs of jobs which succeeded.
  • `bq:name:failed`: Set of IDs of jobs which failed.
  • `bq:name:delayed`: Ordered Set of IDs corresponding to delayed jobs - this set maps delayed timestamp to IDs.
  • `bq:name:stalling`: Set of IDs of jobs which haven't 'checked in' during this interval.
  • `bq:name:stallBlock`: Set of IDs of jobs which haven't 'checked in' during this interval.
  • `bq:name:events`: Pub/Sub channel for workers to send out job results.
  • `bq:name:earlierDelayed`: When a new delayed job is added prior to all other jobs, the script creating the job will publish the job's timestamp over this Pub/Sub channel.
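All of these keys share a common namespace per queue, which makes them easy to derive. A trivial sketch (assuming the default `bq` prefix; `queueKey` is a hypothetical helper, not part of the API):

```javascript
// Sketch of the key naming scheme: every key lives under `bq:<queueName>:`.
const queueKey = (queueName, suffix) => `bq:${queueName}:${suffix}`;

console.log(queueKey('addition', 'waiting')); // bq:addition:waiting
console.log(queueKey('addition', 'jobs')); // bq:addition:jobs
```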

Bee-Queue is non-polling, so idle workers are listening to receive jobs as soon as they're enqueued to Redis. This is powered by brpoplpush, which is used to move jobs from the waiting list to the active list. Bee-Queue generally follows the "Reliable Queue" pattern described here.

The `isWorker` setting creates an extra Redis connection dedicated to `brpoplpush`. If either `getEvents` or `activateDelayedJobs` are enabled, another connection is dedicated to receiving Pub/Sub events. As such, these settings should be disabled if you don't need them.

The stalling set is a snapshot of the active list from the beginning of the latest stall interval. During each stalling interval, workers remove their job IDs from the stalling set, so at the end of an interval, any jobs whose IDs are left in the stalling set have missed their window (stalled) and need to be rerun. When

`checkStalledJobs` runs, it re-enqueues any jobs left in the stalling set (to the waiting list), then takes a snapshot of the active list and stores it in the stalling set.

Bee-Queue requires the user to start the repeated checks on their own because if we did it automatically, every queue instance in the system would be doing the check. Checking from all instances is less efficient and provides weaker guarantees than just checking from one or two. For example, a

`checkStalledJobs` interval of 5000ms running on 10 processes would average one check every 500ms, but would only guarantee a check every 5000ms. Two instances checking every 1000ms would also average one check every 500ms, but would be more well-distributed across time and would guarantee a check every 1000ms. Though the check is not expensive, and it doesn't hurt to do it extremely often, avoiding needless inefficiency is a main point of this library, so we leave it to the user to control exactly which processes are doing the check and how often.
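The arithmetic in the example above can be made explicit. This is an illustrative helper (not part of Bee-Queue) that assumes the checkers' timers are uncoordinated:

```javascript
// Average vs. guaranteed stall-check frequency when `n` processes each call
// checkStalledJobs every `intervalMs`: on average a check happens every
// intervalMs / n, but the only guarantee is one check per intervalMs.
function checkFrequency(n, intervalMs) {
  return {averageMs: intervalMs / n, guaranteedMs: intervalMs};
}

console.log(checkFrequency(10, 5000)); // { averageMs: 500, guaranteedMs: 5000 }
console.log(checkFrequency(2, 1000)); // { averageMs: 500, guaranteedMs: 1000 }
```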

Contributing

Pull requests are welcome; just make sure

Pull requests are welcome; just make sure `npm test` passes. For significant changes, open an issue for discussion first.

Some significant non-features include:

  • Worker tracking: Kue does this.
  • All-workers pause-resume: Bull does this.
  • Job priority: multiple queues get the job done in simple cases, but Kue has first-class support. Bull provides a wrapper around multiple queues.

Some of these could be worthwhile additions; please comment if you're interested in using or helping implement them!

Testing

You'll need a local Redis server to run the tests. Note that running the tests may delete some keys in the form of `bq:test-*`.

Alternatively, if you have Docker available, you can run tests or do forensic work in an ephemeral container with its own Redis server.
