Streams
Touted as Node.js’s best feature, streams are also one of the hardest to comprehend. Some of the mystique that surrounds streams comes from the lack of good documentation; there are plenty of online examples that show how to use pre-built stream objects (like an HTTP request) but far fewer on how to create custom stream objects that work within an application. The feature also suffers from confusing terminology, starting with the name: is “Streams” the name of the feature in Node, or are “streams” simply featured in Node? Why are some people “writing” to streams (or a stream?) while others are “piping”? Here I will try to demystify streams by showing the basics of how to create stream objects. Specifically, we will set up a pipeline that reads from a file, transforms the data, then writes to a destination.
4 Flavors
Streams come in the following flavors: Readable, Writable, Duplex, and Transform. Each represents a point in the “pipeline” over its lifecycle: the pipeline is created with a readable, passes through duplex or transform streams, then ends in a writable. It’s also common to skip the transform or duplex step and just use a readable and a writable stream.
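For orientation, each flavor corresponds to a constructor on the core stream module (a quick sketch, just to show where they live):

var stream = require('stream');

stream.Readable;   // a source of data (e.g. fs.createReadStream)
stream.Writable;   // a destination for data (e.g. process.stdout)
stream.Duplex;     // readable and writable at once, like a TCP socket
stream.Transform;  // a duplex stream that modifies data as it flows through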
Consumers vs Implementers
The flavors I mentioned above are known as “interfaces”. An interface is a kind of blueprint for a class: it dictates what methods need to be written for an object to count as a member of that class. For example, if we want to make a readable stream object, we have to implement a specific set of methods (such as _read). Since we don’t have classes in JavaScript, these interfaces are more like interface-lite. They function the same way though; leave out a method and you will get an “unimplemented” error.
Any object that implements the interface is an implementer. Objects that do not have these methods but read from or write to objects that do are called consumers. An object can be both an implementer and a consumer if it also reads from or writes to other streams. Our transform stream (which we’ll soon be writing) qualifies.
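To make the distinction concrete, here is a minimal sketch (the counter object and variable names are hypothetical): the implementer supplies _read, while the consumer only listens for data events.

var stream = require('stream');

/* Implementer: an object that supplies the interface method (_read). */
var counter = new stream.Readable();
var n = 0;
counter._read = function () {
  /* push three numbers as strings, then null to signal the end */
  this.push(n < 3 ? String(n++) : null);
};

/* Consumer: code that only reads from the stream. */
counter.on("data", function (chunk) {
  console.log("got", chunk.toString());
});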
Reading
Usually you would read a file with fs like this:
var fs = require("fs");

fs.readFile("data.json", function (err, contents) {
  /* check for errors and do something with the contents */
});
This setup works, but it’s not efficient. The entire contents of data.json need to be read into memory before the callback runs. If data.json is large, or there are thousands of these operations happening simultaneously, the process can grind to a halt. Using a readable stream can solve those problems. We’ll rewrite the code to use fs.createReadStream():
var readstream = fs.createReadStream("data.json", {encoding: "utf8"});

readstream.on("data", function (chunk) {
  /* do something with chunk */
});
Notice that readstream emits “data” events. This is because streams are actually event emitters! If we wanted to read from the stream directly we would hook into these data events. But we’re creating a pipeline, and for that we use pipe:
fs.createReadStream("data.json", {encoding:"utf8"}).pipe(destination);
Transforming
Our pipeline already reads from the filesystem; for our intermediary step we will perform a transformation on the data from data.json. For reference, data.json looks like this:
[
  { "name": "Seattle", "state": "WA" },
  { "name": "Bellevue", "state": "WA" },
  { "name": "San Francisco", "state": "CA" },
  { "name": "Sacramento", "state": "CA" },
  { "name": "Boston", "state": "MA" },
  { "name": "Atlanta", "state": "GA" },
  { "name": "Phoenix", "state": "AZ" }
]
Our transformation will be filtering this data down to a list of city names that are in Washington. To start with, we need to include the appropriate modules and create a new transform object:
var stream = require('stream');
var waFilter = new stream.Transform({decodeStrings: false});
About the decodeStrings: false bit: streams like to communicate using buffers, but for our purposes we need strings (there is also a way to stream objects, but it’s not covered here). Using this setting tells Node not to convert our strings back into buffers before they reach our code.
To make waFilter a full-fledged transform it needs to be able to take in input and return output. To do this we write a method called _write. Its three parameters are chunk, which is the data coming through the pipeline; enc, which is the encoding type; and next, which is a callback that tells the upstream writer it can send more. We need to check whether our input chunk is a string, and then we need to parse the JSON into an object. We would run into a problem if our array were larger than a single chunk, since partial JSON would not parse properly; however, libraries like JSONStream exist to fix this.
waFilter._write = function (chunk, enc, next) {
  /* are we working with a string? if not, convert the buffer */
  if (typeof chunk !== "string") {
    chunk = chunk.toString();
  }
  /* parse the JSON text into an array of city objects */
  chunk = JSON.parse(chunk);
  /* keep only Washington cities, reduced to their names */
  this.push(JSON.stringify(chunk.filter(function (v) {
    return v.state === "WA";
  }).map(function (v) {
    return v.name;
  })));
  this.push("\n");
  this.push(null); /* signal that no more output is coming */
  next();
};
After making sure the chunk is a string and parsing the JSON, we use the transform’s push method to output the text of our filter. We also push a newline and then null to indicate to the consumer that we are done with output.
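As an aside, the Transform class also defines a dedicated _transform hook that is normally implemented instead of _write. A minimal sketch of the same filter written that way (waFilter2 is a hypothetical name, assuming the same data.json fits in one chunk):

var stream = require('stream');

var waFilter2 = new stream.Transform({decodeStrings: false});

waFilter2._transform = function (chunk, enc, done) {
  /* parse the incoming JSON text and push the filtered city names */
  var cities = JSON.parse(chunk.toString());
  this.push(JSON.stringify(cities.filter(function (v) {
    return v.state === "WA";
  }).map(function (v) {
    return v.name;
  })) + "\n");
  done(); /* signal that this chunk has been fully handled */
};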
Writing
Making a writable object works the same way as making a transform object, with the exception that there is no push method to call. Since it’s so similar we won’t write our own writable but will instead use process.stdout, which is a writable stream.
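If you did want to build your own writable end, the shape would look something like this (a minimal sketch; logWriter is a hypothetical name):

var stream = require('stream');

/* a writable that just logs whatever reaches it */
var logWriter = new stream.Writable({decodeStrings: false});

logWriter._write = function (chunk, enc, next) {
  console.log("received:", chunk.toString());
  next(); /* tell the upstream producer we are ready for more */
};

Such an object could stand in for process.stdout at the end of the pipeline below.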
Piping
The fun part. We can glue our application’s parts together in one pipeline by using the pipe method like so:
fs.createReadStream(__dirname + "/data.json", {encoding:"utf8"})
.pipe(waFilter)
.pipe(process.stdout)
The output of which is
["Seattle","Bellevue"]
Extending objects with streams
Another way to implement the stream interface is to write a constructor function that inherits from the base stream class and has the correct prototype methods. For example, we could rewrite waFilter into a more customizable object that filters to any state like this:
var stream = require('stream');
var util = require('util');
var fs = require('fs');

util.inherits(stateFilter, stream.Transform);

function stateFilter(state) {
  stream.Transform.call(this, {decodeStrings: false});
  this.state = state; /* the state we will filter for */
}

stateFilter.prototype._write = function (chunk, enc, next) {
  var _this = this;
  /* are we working with a string? if not, convert the buffer */
  if (typeof chunk !== "string") {
    chunk = chunk.toString();
  }
  /* parse the JSON text into an array of city objects */
  chunk = JSON.parse(chunk);
  /* keep only cities in the configured state, reduced to their names */
  this.push(JSON.stringify(chunk.filter(function (v) {
    return v.state === _this.state;
  }).map(function (v) {
    return v.name;
  })));
  this.push("\n");
  this.push(null); /* signal that no more output is coming */
  next();
};

fs.createReadStream(__dirname + "/data.json", {encoding: "utf8"})
  .pipe(new stateFilter("CA"))
  .pipe(process.stdout);

/* ["San Francisco","Sacramento"] */
This may feel a bit weird but it makes sense: util.inherits copies the prototype of Transform onto stateFilter, then inside the constructor we invoke Transform with call, which sets its context to the new stateFilter instance. That wires up the stream machinery (including push) so our _write method can do its work.
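In more recent versions of Node the same inheritance can be expressed with an ES2015 class, which some find easier to read. A minimal sketch of an equivalent filter (StateFilter is a hypothetical name, using the conventional _transform hook):

var stream = require('stream');

class StateFilter extends stream.Transform {
  constructor(state) {
    super({decodeStrings: false});
    this.state = state; /* the state we will filter for */
  }
  _transform(chunk, enc, done) {
    /* parse the JSON text, filter to the configured state, push the names */
    var cities = JSON.parse(chunk.toString());
    this.push(JSON.stringify(
      cities.filter(v => v.state === this.state).map(v => v.name)
    ) + "\n");
    done();
  }
}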
Why use streams
As difficult as they are to grasp, a thorough understanding of streams is necessary to really appreciate the value of Node. They handle memory and processing more efficiently, and they make the “bazillion concurrent connections” claims possible by not tying up system resources and not unnecessarily loading entire buffers into memory.