How do streams work in Node.js?
Let me show you how we can use streams to improve performance on your project.
What happens when I want to read a very big file in Node.js? Should I read the full file at once, loading it all to memory? And if I do it, will it cause performance issues on my program?
Normally, if you have little or no experience with these operations, you wouldn't ask yourself these kinds of questions. More likely, they will arise when you run into a clear performance problem in your software.
Let's try it out. We'll do a very simple example of reading a file in the most well-known way. To achieve that, we will start an HTTP server. When we send a request to http://localhost:3000/, the server will respond with the contents of the file:
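const http = require('http');
const fs = require('fs');

const server = http.createServer(function (req, res) {
  // readFile loads the whole file into memory before calling back
  fs.readFile('data3.txt', (err, data) => {
    if (err) {
      res.statusCode = 500;
      return res.end('Could not read the file');
    }
    res.end(data);
  });
});

server.listen(3000, () => {
  console.log('server is running');
});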
The previous operation reads the whole file, puts it in memory and, after that, sends it in the response.
Ok. So, what could be the problem with this solution?
With small files, none, actually. But what if the file's size is 1 GB, or even higher?
Do we still want to bring the whole file into memory? Well, I don't think so. Sometimes, when the file is too big or the operations we use are not performant enough, that's simply not doable.
As a university teacher warned me once, every time you do this kind of thing, you kill a puppy. And no one wants to kill a puppy, right?
Node to the rescue!
In Node.js, we have a mechanism that allows us to read a file without bringing it all into memory at once. It's called streams.
Streams are a way to handle reading/writing files, network communications, or any kind of end-to-end information exchange in an efficient way.
There are four types of streams:
- Readable: a stream you can receive data from, but not send data to. When data is pushed into it with the message push(), it saves the data in a buffer until a consumer requests it. You can pause the flow of the stream by sending the message pause(), and resume it with resume(). Or use the isPaused() message to check the status of the flow.
- Writable: a stream you can send data to, but not receive data from.
- Duplex: a stream you can both send data to and receive data from, basically a combination of a Readable and a Writable stream.
- Transform: a Transform stream is similar to a Duplex, but the output is a transformation of its input.
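As a minimal sketch of the Readable behaviour described above, the snippet below feeds a stream by hand with push() and then pauses and resumes it. The chunk contents and variable names are made up for illustration; isPaused() is the method Node exposes for checking the state of the flow.

```javascript
const { Readable } = require('stream');

// A Readable whose read() is a no-op: we feed it by hand with push().
const readable = new Readable({ read() {} });

readable.push('first chunk');
readable.push('second chunk');
readable.push(null); // null signals that there is no more data

const received = [];

// Attaching a 'data' listener switches the stream into flowing mode.
readable.on('data', (chunk) => {
  received.push(chunk.toString());
});

readable.pause();
console.log(readable.isPaused()); // true
readable.resume();
console.log(readable.isPaused()); // false

// The buffered chunks are delivered asynchronously, then 'end' fires.
const finished = new Promise((resolve) => readable.on('end', resolve));
finished.then(() => console.log(received));
```

The chunks pushed before any consumer was attached sit in the internal buffer, which is exactly what makes pausing safe.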
Let’s go back to the last example and try again with streams:
const http = require('http');
const fs = require('fs');

const server = http.createServer(function (req, res) {
  // Read the file chunk by chunk and forward each chunk to the response
  const stream = fs.createReadStream('data3.txt');
  stream.pipe(res);
});

server.listen(3000, () => {
  console.log('server is running');
});
The memory consumption is:
Now that we know it definitely works better performance-wise, let's try to answer a few questions we've come across along the way:
- What are the benefits of using streams? What changes by using them?
- What is the meaning of the pipe message?
Well, for the first question there are several benefits in using streams.
For starters, you don’t need to load large amounts of data in memory before you are able to process it, which gives us memory efficiency. Also, you can start processing data as soon as you have it, instead of waiting until the whole data payload is available, therefore gaining time efficiency.
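To make the "process data as soon as you have it" point concrete, here is a rough sketch that reads a file chunk by chunk and only keeps running totals in memory. The temporary file, its size, and the countChunks helper are assumptions made up for this illustration.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical sample file, created here only so the sketch is self-contained.
const filePath = path.join(os.tmpdir(), 'streams-chunk-demo.txt');
fs.writeFileSync(filePath, 'x'.repeat(200 * 1024)); // 200 KiB of data

async function countChunks(file) {
  let chunks = 0;
  let bytes = 0;
  // Readable streams are async iterables: each iteration yields one chunk,
  // so at no point do we hold the whole file in memory.
  const source = fs.createReadStream(file, { highWaterMark: 64 * 1024 });
  for await (const chunk of source) {
    chunks += 1;
    bytes += chunk.length;
  }
  return { chunks, bytes };
}

const result = countChunks(filePath);
result.then(({ chunks, bytes }) => {
  console.log(`read ${bytes} bytes in ${chunks} chunks`);
});
```

The highWaterMark option caps how much data the stream buffers at a time, so memory use stays roughly constant regardless of the file size.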
As for the second question, you send this message to streams you can read data from, and you pass as a parameter another stream, which must be writable. The pipe message reads the data from the source stream (Readable) and sends it to the destination stream (Writable). So, you can only use it with Readable and Writable streams, right?
Not quite. If you remember, there are other types of streams, Duplex and Transform, which are both readable and writable. Because of that, you can use all types of streams with the pipe message. pipe also returns the destination stream, which is what makes chaining possible. For example, you could do something like:
readableStream.pipe(transformStream)
.pipe(writableStream)
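A self-contained version of that chain might look like the sketch below. The three streams are stand-ins invented for the example: an in-memory source, a Transform that upper-cases text, and a Writable that collects what it receives.

```javascript
const { Readable, Transform, Writable } = require('stream');

// Source: an in-memory readable stream built from an array.
const source = Readable.from(['pikachu', 'bulbasaur']);

// Transform: readable and writable at once, so it can sit in the middle.
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// Destination: collects every chunk it is asked to write.
const output = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    output.push(chunk.toString());
    callback();
  },
});

// pipe() returns its destination, which is why the calls can be chained.
source.pipe(upperCase).pipe(sink);

const piped = new Promise((resolve) => sink.on('finish', resolve));
piped.then(() => console.log(output));
```

Once the sink emits 'finish', the output array holds the upper-cased names in the order they were read.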
Excellent. After these examples, we finally know how to read a big file using Node without killing puppies.
Is that it?
Could we use streams to do something else? Or is that all they were made for?
Of course not! As we saw earlier, when we defined what a stream is, there are a lot of things we can do with them. For example, we can manage HTTP/HTTPS requests.
When we request information from an API and it responds with a huge amount of it, we shouldn't bring it all into memory and process it in one go. Let's see an example where we request Pokémon information from an API, and we later want to do something with it. More precisely, we want to get all the Pokémon names and write them into a file!
Do we want to get all the Pokémon information into memory and, after that, proceed to write it? Or do we want to get some Pokémon, process them and, once we've done the first bunch, proceed with the next (we could say that we're processing chunks of Pokémon information)?
As you might have predicted, we'll go for the second option:
const JSONStream = require('JSONStream');
const http = require('http');
const fs = require('fs');
const request = require('request');
const { Transform } = require('stream');

const server = http.createServer(function (req, res) {
  // A stream can only be consumed once, so create a fresh one per request
  const transformStream = new Transform({
    transform: function transformer(chunk, encoding, callback) {
      // Append a newline so each name ends up on its own line
      callback(null, chunk + '\n');
    }
  });
  const writableStream = fs.createWriteStream('data2.txt');
  request({ url: 'https://pokeapi.co/api/v2/pokemon?offset=0&limit=10000' })
    .pipe(JSONStream.parse('results.*.name'))
    .pipe(transformStream)
    .pipe(writableStream);
  res.end('ok');
});

server.listen(3000, () => {
  console.log('server is running');
});
Let's take a moment to dissect what we've done here.
JSONStream?
Keep calm. JSONStream is a dependency that receives chunks of data and parses them as JSON. It then writes the parsed values to the stream, so the flow continues.
By passing "results.*.name", we are saying that we want the results from the response and, from each result, the name.
After that, we have a Transform stream. We use it to append a '\n' after each Pokémon name, so we get one name per line in the file. Finally, the last stream writes the file.
Why can we do request.pipe(stream)?
Because the request and the response are streams! A lot of I/O operations in Node are managed with streams for performance reasons.
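A last observation about this example. You can create a pipeline, which receives a list of streams in order, followed by a callback that is called when the pipeline finishes or fails. Unlike chained pipe calls, it forwards an error from any of the streams to that callback. For example, you can create a pipeline like this: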
const JSONStream = require('JSONStream');
const http = require('http');
const fs = require('fs');
const stream = require('stream');
const request = require('request');

const server = http.createServer(function (req, res) {
  // A stream can only be consumed once, so create a fresh one per request
  const transformStream = new stream.Transform({
    transform: function transformer(chunk, encoding, callback) {
      // Append a newline so each name ends up on its own line
      callback(null, chunk + '\n');
    }
  });
  const writableStream = fs.createWriteStream('data2.txt');
  stream.pipeline(
    request({ url: 'https://pokeapi.co/api/v2/pokemon?offset=0&limit=10000' }),
    JSONStream.parse('results.*.name'),
    transformStream,
    writableStream,
    (err) => {
      if (err) {
        console.log('Pipeline failed', err);
      } else {
        console.log('Pipeline succeeded.');
      }
    }
  );
  res.end('ok');
});

server.listen(3002, () => {
  console.log('server is running');
});
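If you prefer async/await to callbacks, a promise-based pipeline is exported from 'stream/promises' (available in Node 15 and later). The sketch below is an assumption-laden variant, not the code used above: it swaps the HTTP request for an in-memory list of names and writes to a temporary file, and the writeNames helper is made up for the example.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');
const { Readable, Transform } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical output path, used only for this sketch.
const outFile = path.join(os.tmpdir(), 'pipeline-demo.txt');

async function writeNames(names, file) {
  const addNewline = new Transform({
    transform(chunk, encoding, callback) {
      callback(null, chunk + '\n'); // one name per line
    },
  });
  try {
    // Resolves when the last stream finishes; rejects if any stream fails.
    await pipeline(Readable.from(names), addNewline, fs.createWriteStream(file));
    return true;
  } catch (err) {
    console.error('Pipeline failed', err);
    return false;
  }
}

const pipelineDone = writeNames(['bulbasaur', 'ivysaur'], outFile);
pipelineDone.then((ok) => console.log(ok ? 'Pipeline succeeded.' : 'Pipeline failed.'));
```

A single try/catch then covers errors from every stream in the chain.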
Let's do a last example about streams and requests. We have a server at http://localhost:3000. When you request information, the server responds with a list of numbers between 1 and 10000000. We need to read them and write them to a file, for example, because we want to log them.
With a “classic” version, we have code like this:
const http = require('http');
const request = require('request');
const fs = require('fs');

const server = http.createServer((req, res) => {
  // The whole response body is buffered in memory before it is written
  request({ url: 'http://localhost:3000' }, (error, response, body) => {
    fs.writeFile('data3.txt', body, () => res.end('ok'));
  });
});

server.listen(3001, () => {
  console.log('server is running');
});
And we have a memory consumption of:
Disclaimer: The process with ID 24552 is the server process.
Now, let’s do this with streams.
const server = http.createServer((req, res) => {
  const stream = fs.createWriteStream('data3.txt');
  // Each chunk of the response is written to the file as soon as it arrives
  request({ url: 'http://localhost:3000' })
    .pipe(stream);
  res.end('ok');
});
You have this memory consumption:
You can see the differences between using the classic mechanism and using streams.
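To check the memory consumption on your own machine, one rough option is to log process.memoryUsage() at interesting points. The logMemory helper and the 50 MiB payload below are assumptions chosen for illustration; the exact numbers depend on your system and Node version.

```javascript
// Hypothetical helper: logs the resident set size (RSS) of this process in MB.
function logMemory(label) {
  const rssMb = process.memoryUsage().rss / (1024 * 1024);
  console.log(`${label}: ${rssMb.toFixed(1)} MB`);
  return rssMb;
}

const atStart = logMemory('at start');

// Simulate the "classic" approach by holding a large payload in memory.
const payload = Buffer.alloc(50 * 1024 * 1024, 'x'); // arbitrary size
const afterBuffering = logMemory('after buffering the payload');
```

Calling the same helper inside the request handlers of the two servers gives a quick, if coarse, comparison between the buffered and the streamed version.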
Alright, what happens when we don’t want to use the built-in streams? What if we want to implement our own writable stream? For example, we want to write all the data in the console. Can we do it with a writable filesystem stream? The answer is no. But we can implement our own writable streams.
const writableStream = new stream.Writable({
  write: function (chunk, encoding, next) {
    console.log(chunk.toString());
    next();
  }
});
This is the simplest way to construct it. We pass the constructor an object with a function in the write attribute. This function receives three parameters: a chunk of data, the encoding that we are using, and a callback function that you must call after you “write” the chunk.
And our code will be something like:
const JSONStream = require('JSONStream');
const http = require('http');
const request = require('request');
const stream = require('stream');

const server = http.createServer(function (req, res) {
  // Custom writable stream that prints every chunk to the console
  const writableStream = new stream.Writable({
    write: function (chunk, encoding, next) {
      console.log(chunk.toString());
      next();
    }
  });
  request({ url: 'https://pokeapi.co/api/v2/pokemon?offset=0&limit=10000' })
    .pipe(JSONStream.parse('results.*.name'))
    .pipe(writableStream);
  res.end('ok');
});

server.listen(3000, () => {
  console.log('server is running');
});
We can also implement our own custom Transform streams, readable streams and duplex streams.
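For instance, a custom readable stream can be sketched by passing a read function to the Readable constructor, in the same way we passed write to the writable one. The createCounter and collect helpers below are hypothetical names invented for this example; the stream emits a short list of numbers, one per line.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: emits the numbers from 1 to `limit`, one per line.
function createCounter(limit) {
  let current = 1;
  return new Readable({
    // read() is called whenever the consumer is ready for more data.
    read() {
      if (current > limit) {
        this.push(null); // no more data
      } else {
        this.push(`${current}\n`);
        current += 1;
      }
    },
  });
}

// Gather everything the stream produces into a single string.
async function collect(readable) {
  let text = '';
  for await (const chunk of readable) text += chunk;
  return text;
}

const counted = collect(createCounter(3));
counted.then((text) => process.stdout.write(text));
```

Because read() is only invoked on demand, the numbers are generated as they are consumed rather than all at once.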
By default, the native streams only accept a String or a Buffer. What if we have to pass a lot of JSON? Well, you have two approaches:
- You stringify the objects before passing the chunk and parse them when you receive it.
- You use object mode.
So, what is this “object mode”?
When we try to pass a JavaScript object as a chunk to any of the streams in the previous examples, they throw an exception saying that the chunk must be a string or a Buffer. But we want to pass an object! What can we do? Well, the streams must be in “object mode”. How do we achieve that? Going back to the Pokémon pipeline example, you can do something like:
const JSONStream = require('JSONStream');
const http = require('http');
const fs = require('fs');
const request = require('request');
const stream = require('stream');

const server = http.createServer(function (req, res) {
  const transformStream = new stream.Transform({
    objectMode: true, // chunks are JavaScript objects, not strings or Buffers
    transform: function transformer(pokemon, encoding, callback) {
      callback(null, pokemon.name + '\n');
    }
  });
  const writableStream = fs.createWriteStream('data2.txt');
  stream.pipeline(
    request({ url: 'https://pokeapi.co/api/v2/pokemon?offset=0&limit=10000' }),
    JSONStream.parse('results.*'), // emit each whole result object
    transformStream,
    writableStream,
    (err) => {
      if (err) {
        console.error('Pipeline failed', err);
        res.statusCode = 500;
        res.end(err.message); // res.end() needs a string or Buffer
      } else {
        console.log('Pipeline succeeded');
        res.end('ok');
      }
    }
  );
});

server.listen(3000, () => {
  console.log('server is running');
});
In this case, we receive each whole Pokémon object in the transformStream, so we can work with all the data it contains. Notice that the transformStream is put in object mode when we initialize it. Because of that, the “chunk” doesn't have to be a Buffer or a String; it can be a JavaScript object.
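Here is a minimal object-mode sketch that runs without the API or any dependency. The sample objects and the toLine and run names are invented for the example, and it uses writableObjectMode so that only the input side of the Transform takes objects while the output side still produces text.

```javascript
const { Readable, Transform } = require('stream');

// Made-up sample objects standing in for the API results.
const pokemons = [
  { id: 1, name: 'bulbasaur' },
  { id: 4, name: 'charmander' },
];

const toLine = new Transform({
  writableObjectMode: true, // accept plain objects on the way in
  transform(pokemon, encoding, callback) {
    // The whole object is available here, not just one field.
    callback(null, `${pokemon.id}: ${pokemon.name}\n`);
  },
});

async function run() {
  let text = '';
  // Readable.from() on an array of objects creates an object-mode stream.
  for await (const chunk of Readable.from(pokemons).pipe(toLine)) {
    text += chunk;
  }
  return text;
}

const lines = run();
lines.then((text) => process.stdout.write(text));
```

Setting objectMode: true instead would put both sides of the Transform in object mode, which is handy when the next stream in the chain also expects objects.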
A last question. What happens if we read faster than we can write to disk? The chunks accumulate, waiting to be processed. And what happens if a lot of chunks accumulate? Well, let's introduce the concept of backpressure. Without a backpressure mechanism, memory usage keeps growing and the rest of the process slows down. How does backpressure solve these problems? We use .pipe() between a source stream (readable) and a destination stream (writable). Each time the readable produces a chunk, it is passed to the writable with .write(). If .write() returns false, meaning the writable's internal buffer is full, the backpressure mechanism pauses the readable until the writable has processed its buffered chunks. The writable then emits the 'drain' event, and the readable resumes reading chunks.
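When you are not using .pipe(), you have to respect backpressure by hand. The sketch below shows one way to do it, under made-up conditions: a deliberately slow writable with a tiny buffer, and a hypothetical writeAll helper that waits for 'drain' whenever write() returns false.

```javascript
const { Writable } = require('stream');
const { once } = require('events');

// A deliberately slow destination with a tiny internal buffer.
const written = [];
const slowWritable = new Writable({
  highWaterMark: 4, // bytes; small on purpose so the buffer fills quickly
  write(chunk, encoding, callback) {
    written.push(chunk.toString());
    setTimeout(callback, 5); // pretend the write takes a while
  },
});

async function writeAll(chunks, writable) {
  let waits = 0;
  for (const chunk of chunks) {
    // write() returns false once the internal buffer is full.
    if (!writable.write(chunk)) {
      waits += 1;
      await once(writable, 'drain'); // wait until the buffer has emptied
    }
  }
  const finished = once(writable, 'finish');
  writable.end();
  await finished;
  return waits;
}

const backpressureDone = writeAll(['aaaa', 'bbbb', 'cccc'], slowWritable);
backpressureDone.then((waits) => console.log(`waited for drain ${waits} time(s)`));
```

This is essentially what .pipe() and pipeline do for you: stop feeding the writable when it says it is full, and carry on after 'drain'.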
When you implement your own writable/readable streams, you will have to consider these problems!
In conclusion, we use streams because they are a lot more performant than the classic mechanism. They greatly reduce the memory usage of your application. Also, you don't have to read all the data before processing it; you can start processing data as soon as you get it.