Eventually Consistent

Dispatches from the grey area between correct and working

Streams

Touted as Node.js’s best feature, streams are also one of the hardest to comprehend. Some of the mystique surrounding streams comes from the lack of good documentation; there are plenty of online examples that show how to use pre-built stream objects (like an HTTP request), but fewer that show how to create custom stream objects that work within an application. The feature also suffers from confusing terminology, starting with the name: is “Streams” the name of the feature in Node, or are “streams” simply featured in Node? Why are some people “writing” to streams (or a stream?) and others “piping”? Here I will try to demystify streams by showing the basics of how to create stream objects. Specifically, we will set up a pipeline that reads from a file, transforms the data, then writes to a destination.

4 Flavors

Streams come in the following flavors: Readable, Writable, Duplex, and Transform. Each represents a stage of the pipeline: a pipeline starts with a readable, passes through any number of duplex or transform streams, and ends in a writable. It’s also common to skip the transform or duplex stage and just pipe a readable straight into a writable.
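
To get a feel for that lifecycle before we build anything custom, here is a minimal sketch using only built-in streams: process.stdin is a readable, stream.PassThrough is a built-in transform that forwards data unchanged, and process.stdout is a writable.

var stream = require('stream');

/* stdin (readable) -> PassThrough (transform) -> stdout (writable) */
process.stdin
    .pipe(new stream.PassThrough())
    .pipe(process.stdout);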

Consumers vs Implementers

The flavors I mentioned above are known as “interfaces”. An interface is a kind of blueprint for a class: it dictates which methods need to be written for an object to count as a member of that class. For example, if we want to make a readable stream object, we have to implement the methods the Readable interface calls for. Since we don’t have classes in JavaScript, these interfaces are more like interface-lite. They function the same way though: leave out a required method and you will get a “not implemented” error.

Any object that implements the interface is an implementer. Objects that do not have these methods but read from or write to objects that do are called consumers. An object can be both an implementer and a consumer if it also reads or writes to other streams. Our transform streams (which we’ll soon be writing) would qualify.
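
If that sounds abstract, here is a tiny, made-up example: the counter object supplies the _read method the Readable interface asks for, so it is an implementer; the code that merely listens to its events is a consumer.

var stream = require('stream');

/* implementer: supplies _read, the method the Readable interface requires */
var counter = new stream.Readable();
var n = 0;
counter._read = function(){
    n += 1;
    this.push(n < 4 ? n + "\n" : null); /* pushing null ends the stream */
};

/* consumer: only calls methods on (and listens to) someone else's stream */
counter.on("data", function(chunk){
    console.log("got: " + chunk);
});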

Reading

Usually you would read a file with fs like this:

fs.readFile("data.json",function(err,contents){
    /* check for errors and do something with the contents */
});

This setup works, but it’s not efficient. The entire contents of data.json need to be read into memory before the callback runs. If data.json is large, or there are thousands of these operations happening simultaneously, the process can grind to a halt. Using a readable stream can solve those problems. We’ll rewrite the code to use fs.createReadStream():

var readstream = fs.createReadStream("data.json", {encoding: "utf8"});
readstream.on("data", function(chunk){
    /* do something with chunk */
});

Notice that readstream emits “data” events. That’s because streams are actually event emitters! If we wanted to read from the stream directly, we would hook into these data events. But we’re building a pipeline, and for that we use pipe:

fs.createReadStream("data.json", {encoding:"utf8"}).pipe(destination); 

Transforming

Our pipeline already reads from the filesystem; for our intermediary step we will perform a transformation on the contents of data.json. For reference, data.json looks like this:

[
    { "name": "Seattle", "state": "WA"},
    { "name": "Bellevue", "state": "WA"},
    { "name": "San Francisco", "state": "CA"},
    { "name": "Sacramento", "state": "CA"},
    { "name": "Boston", "state": "MA"},
    { "name": "Atlanta", "state": "GA"},
    { "name": "Phoenix", "state": "AZ"}
]

Our transformation will filter this data down to a list of city names in Washington. To start, we need to include the appropriate module and create a new transform object:

var stream = require('stream');
var waFilter = new stream.Transform({decodeStrings: false});

About the decodeStrings: false bit: streams like to communicate using buffers, but for our purposes we need strings (there is also a way to stream plain objects, but it’s not covered here). This setting tells Node not to convert the strings we write back into buffers before they reach our code.
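
To see concretely what that option changes, here is a throwaway probe (not part of our pipeline; writables are covered properly below) that just logs what its _write method receives:

var stream = require('stream');

var probe = new stream.Writable({decodeStrings: false});
probe._write = function(chunk, enc, next){
    console.log(typeof chunk); /* "string" -- drop decodeStrings:false and
                                  this becomes "object" (a Buffer) */
    next();
};
probe.write("hello");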

To make waFilter a full-fledged transform it needs to be able to accept input and produce output. To do this we write a method called _write. Its three parameters are chunk, which is the data coming through the pipeline; enc, which is the encoding; and next, a callback that tells whoever is writing to us that we’re ready for more. Inside it we need to check whether our chunk is a string, and then parse it from JSON into an object. We would run into a problem if our JSON were larger than a single chunk, since a partial string would not parse properly; libraries like JSONStream exist to handle that case (there’s a short aside on it after the example).

waFilter._write = function(chunk, enc, next){
    /* are we working with a string? */
    if (typeof chunk != "string"){
        chunk = chunk.toString();
    }
    /* do we need to make an object? */
    if (typeof chunk != "object"){
        chunk = JSON.parse(chunk);
    }
    /* keep only Washington cities and push their names downstream */
    this.push(JSON.stringify(chunk.filter(function(v){
        return v.state == "WA";
    }).map(function(v){
        return v.name;
    })));
    this.push("\n");
    this.push(null); /* no more output */
    next();
};

After making sure we have a string and parsing the JSON, we use the transform’s push method to output the result of our filter. We also push a newline and then null, which signals to the consumer that we are done producing output.
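
As an aside, if data.json were too big to arrive in a single chunk, that one JSON.parse call would choke on a partial string. A streaming parser like JSONStream can take over the parsing; here is a rough sketch, assuming the JSONStream package is installed, where parse("*") emits each element of the top-level array as its own object:

var fs = require('fs');
var JSONStream = require('JSONStream');

fs.createReadStream(__dirname + "/data.json")
    .pipe(JSONStream.parse("*"))
    .on("data", function(city){
        /* each city arrives fully parsed, however the file was chunked */
        if (city.state == "WA") console.log(city.name);
    });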

Writing

Making a writable object works the same way as making a transform object, except that there is no push method to call. Since it’s so similar we won’t write our own writable for the pipeline; instead we’ll use process.stdout, which is a writable stream.
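
For the curious, though, a hand-rolled writable is only a few lines. This hypothetical logger could stand in for process.stdout in the pipeline below:

var stream = require('stream');

var logger = new stream.Writable({decodeStrings: false});

/* _write handles each chunk; calling next() signals we're ready for more */
logger._write = function(chunk, enc, next){
    console.log("received: " + chunk);
    next();
};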

Piping

The fun part. We can glue our application’s parts together in one pipeline by using the pipe method like so:

fs.createReadStream(__dirname + "/data.json", {encoding:"utf8"})
    .pipe(waFilter)
    .pipe(process.stdout);

The output of which is

["Seattle","Bellevue"]

Extending objects with streams

Another way to implement a stream interface is to write a constructor function that inherits from the appropriate stream class and defines the required methods on its prototype. For example, we could rewrite waFilter as a more customizable object that filters to any state, like this:

var stream = require('stream');
var util = require('util');
var fs = require('fs');

util.inherits(stateFilter, stream.Transform);

function stateFilter(state){
    stream.Transform.call(this, {decodeStrings: false});
    this.state = state;
}

stateFilter.prototype._write = function(chunk, enc, next){
    var _this = this;
    /* are we working with a string? */
    if (typeof chunk != "string"){
        chunk = chunk.toString();
    }
    /* do we need to make an object? */
    if (typeof chunk != "object"){
        chunk = JSON.parse(chunk);
    }
    /* keep only cities in the requested state */
    this.push(JSON.stringify(chunk.filter(function(v){
        return v.state == _this.state;
    }).map(function(v){
        return v.name;
    })));
    this.push("\n");
    this.push(null);
    next();
};

fs.createReadStream(__dirname + "/data.json", {encoding:"utf8"})
    .pipe(new stateFilter("CA"))
    .pipe(process.stdout);

/* ["San Francisco","Sacremento"] */

This may feel a bit weird but it makes sense: util.inherits points stateFilter’s prototype chain at Transform’s prototype, so instances pick up methods like push and pipe, and calling Transform with .call(this, …) inside the constructor runs Transform’s own setup with our new object as the context. Together those two steps give stateFilter everything a transform needs, alongside the _write method we define ourselves.
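
A quick sanity check (a hypothetical snippet, not part of the pipeline) shows the wiring worked:

var filter = new stateFilter("WA");

console.log(filter instanceof stream.Transform); /* true */
console.log(typeof filter.pipe);                 /* "function" -- inherited via the prototype chain */
console.log(typeof filter._write);               /* "function" -- the method we defined ourselves */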

Why use streams

As difficult as they are to understand, a thorough understanding of streams is necessary to really appreciate the value of Node. They handle memory and processing more efficiently, and they are what make the “bazillion concurrent connections” claims possible: they avoid tying up system resources and don’t load entire payloads into memory unnecessarily.

Thanks for reading! If you found this helpful, consider sharing it.