What is a readable stream
A readable stream is a stream that produces data for a program to consume. Common ways of producing data include reading a disk file, reading the content of a network request, and so on. Let's look at the stream usage example introduced earlier:
const fs = require('fs');
const rs = fs.createReadStream(filePath);
rs is a readable stream whose way of producing data is reading a disk file. The familiar console input process.stdin is also a readable stream:
process.stdin.pipe(process.stdout);
This single line echoes console input back to the console; process.stdin produces its data by reading the user's input from the console.
Look back at our definition of readable streams: readable streams are streams that produce data for program consumption.
Custom readable stream
In addition to fs.createReadStream, we also often use the src method provided by gulp or vinyl-fs:
gulp.src(['*.js', 'dist/**/*.scss'])
If we want to produce data in a specific way and hand it over to the program for consumption, how do we start?
It takes just two simple steps:
- Inherit from the Readable class of the stream module
- Override the _read method, calling this.push to put the produced data into the queue to be read
The Readable class has already done most of the work a readable stream needs. We only have to inherit from it and write our way of producing data into the _read method to get a custom readable stream.
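As an aside, recent Node.js versions also accept the implementation directly as a read option on the Readable constructor, so subclassing is not strictly required. A minimal sketch of the same idea:

const { Readable } = require('stream');

// Equivalent to subclassing: the read option becomes the stream's _read
const randomStream = new Readable({
  read() {
    // produce one random number per _read call
    setTimeout(() => {
      this.push(`${parseInt(Math.random() * 10000, 10)}\n`);
    }, 100);
  }
});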
Suppose we want to implement a stream that produces a random number every 100 milliseconds (admittedly not very useful):
const Readable = require('stream').Readable;

class RandomNumberStream extends Readable {
  constructor(max) {
    super();
  }

  _read() {
    const ctx = this;
    setTimeout(() => {
      const randomNumber = parseInt(Math.random() * 10000);
      // Only a string or Buffer can be pushed; append a newline for readable output
      ctx.push(`${randomNumber}\n`);
    }, 100);
  }
}

module.exports = RandomNumberStream;
The inheritance part of the code is simple; the interesting part is the implementation of the _read method. Several points are worth noting:
- The Readable class has a default implementation of _read, but it does nothing; what we do is override it
- The _read method takes a size parameter specifying how much data should be read and returned to the read() method, but it is only advisory; many implementations ignore it, and we ignore it here too (it will be covered in detail later)
- this.push puts data into the buffer; the buffer concept will be discussed later, and for now you can picture the data as squeezed into the water pipe, waiting to be consumed
- The pushed content can only be a string or a Buffer, not a number
- The push method takes a second parameter, encoding, used to specify the encoding when the first parameter is a string (both points are illustrated in the sketch below)
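A small sketch of those last two points (these exact push calls are mine, not from the original article):

// Inside a _read() implementation:
// this.push(12345);                // would throw a TypeError: numbers are not allowed
this.push('12345\n', 'utf8');       // a string, with its encoding given explicitly
this.push(Buffer.from('12345\n'));  // a Buffer needs no encoding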
Run it to see the effect:
const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream();
rns.pipe(process.stdout);
Numbers now appear on the console continuously: we have implemented a readable stream that produces random numbers. A few small problems remain to be solved.
How to stop
We push a number to the buffer every 100 milliseconds, but just like reading a local file we eventually reach the end. How do we stop, and mark that the data has been fully read?
By pushing a null to the buffer. Let's modify the code so that consumers can define how many random numbers they need:
const Readable = require('stream').Readable;

class RandomNumberStream extends Readable {
  constructor(max) {
    super();
    this.max = max;
  }

  _read() {
    const ctx = this;
    setTimeout(() => {
      if (ctx.max) {
        const randomNumber = parseInt(Math.random() * 10000);
        // Only a string or Buffer can be pushed; append a newline for readable output
        ctx.push(`${randomNumber}\n`);
        ctx.max -= 1;
      } else {
        ctx.push(null);
      }
    }, 100);
  }
}

module.exports = RandomNumberStream;
We use a max flag so that consumers can specify how many numbers they need, set at instantiation time:
const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream(5);
rns.pipe(process.stdout);
You can see that the console prints only 5 numbers.
Why is it setTimeout instead of setInterval?
Careful readers may have noticed that we produce a random number every 100 milliseconds, yet we did not call setInterval but setTimeout. Why does a one-off delay, not a repetition, still give the correct result?
This requires understanding the two modes a stream works in:
- Flowing mode: data is read from the underlying system and provided to the application as quickly as possible
- Paused mode: the read() method must be called explicitly to read data chunks
A stream is in paused mode by default, meaning the program must call the read() method explicitly. Yet in our example we got the data without ever calling it, because the pipe() method switched the stream into flowing mode, so our _read() method is called automatically, again and again, until the data has been fully read. That is why each _read() call only needs to produce data once.
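To observe this repeated calling for yourself, you could add a counter to the _read() method (a debugging sketch; the CountingStream name and the counter are mine, not part of the original class):

const Readable = require('stream').Readable;

class CountingStream extends Readable {
  constructor() {
    super();
    this.calls = 0; // counts how many times _read is invoked
  }
  _read() {
    this.calls += 1;
    console.error(`_read called ${this.calls} times`); // log to stderr so stdout stays clean
    setTimeout(() => this.push(`${parseInt(Math.random() * 10000, 10)}\n`), 100);
  }
}

// pipe() switches to flowing mode, so _read fires again after every push
new CountingStream().pipe(process.stdout);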
Switching between flowing mode and paused mode
A stream can be switched from the default paused mode to flowing mode in the following ways:
- Add a data event listener to start monitoring the data
- Call the resume() method to start the data flow
- Call the pipe() method to send the data to a writable stream
And two ways to switch from flowing mode back to paused mode:
- If there are no pipe destinations, call the pause() method
- If there are pipe destinations, remove all data event listeners and call the unpipe() method to detach them (see the sketch below)
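Here is a small sketch of switching back and forth, assuming the RandomNumberStream built above: the data listener puts the stream into flowing mode, pause() suspends it, and resume() restarts it a second later:

const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream(5);

rns.on('data', chunk => {
  console.log(chunk.toString());
  rns.pause();                          // back to paused mode: no more data events for now
  setTimeout(() => rns.resume(), 1000); // flowing mode again after 1 second
});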
data event
After calling pipe(), data flows from the readable stream into the writable stream, but this looks like a black box to us: how exactly does the data flow? In the switching between flowing mode and paused mode we saw two important concepts:
- The data event, corresponding to flowing mode
- The read() method, corresponding to paused mode
These two mechanisms are what let us drive the data flow. Let's look at the flowing-mode data event first. Once we listen for a readable stream's data event, the stream enters flowing mode, so we can rewrite the stream-consuming code above:
const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream(5);

rns.on('data', chunk => {
  console.log(chunk);
});
This prints output to the console similar to the following (Buffer objects, since we never set an encoding):

<Buffer 35 33 30 38 0a>
<Buffer 31 30 35 38 0a>
<Buffer 36 31 0a>
...
The data event fires whenever the readable stream produces data that can be consumed. Once a data event listener is bound, data is delivered as fast as possible. The listener receives in its first parameter the Buffer passed along by the readable stream, which is the chunk we printed; to display it as a number, call the Buffer's toString() method.
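For instance (a small variation on the code above, not in the original), converting each chunk before printing:

rns.on('data', chunk => {
  console.log(chunk.toString()); // prints e.g. 1058 instead of <Buffer 31 30 35 38 0a>
});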
When the data has been fully processed, an end event fires. Because stream processing is not a synchronous call, we need to listen for this event if we want to do something after completion. Let's add a few lines at the end of the code:
rns.on('end', () => {
  console.log('done');
});
This way, 'done' is displayed after all the data has been received.
Of course, if an error occurs while processing the data, an error event is emitted, which we can also listen for to do exception handling:
rns.on('error', (err) => {
  console.log(err);
});
read(size)
In paused mode, the program needs to call the read() method explicitly to get data. The read() method fetches and returns some data from the internal buffer, or null when no more data is available.
When calling read() with a size parameter, it returns the specified number of bytes; when that many bytes are not available, it returns null. If size is not specified, all the data in the internal buffer is returned.
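A hedged sketch of that size behavior, reusing the RandomNumberStream above (and the readable event introduced just below):

const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream(5);

rns.on('readable', () => {
  console.log(rns.read(3)); // exactly 3 bytes, e.g. <Buffer 31 30 35>, or null if fewer than 3 are buffered
  console.log(rns.read());  // no size: whatever else is in the internal buffer
});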
Now there is a tension. In flowing mode the stream produces data and fires the data event to notify the program, which is convenient; in paused mode the program has to read, so it may read before the data has been produced. If we simply polled, efficiency would be low.
Node.js provides the readable event for this, which fires when the readable stream has data ready. We listen for this event first, and read the data once we receive the notification:
const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream(5);

rns.on('readable', () => {
  let chunk;
  while ((chunk = rns.read()) !== null) {
    console.log(chunk);
  }
});
This way we can also read the data. Note that read() does not necessarily return data every time it is called: as mentioned above, if the available data has not reached size, it returns null, which is why the program includes that check.
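As an aside (not covered in the original text), Node.js 10+ also exposes readable streams as async iterators, which hides the readable/read() bookkeeping. A sketch of the same consumption:

const RandomNumberStream = require('./RandomNumberStream');

async function main() {
  const rns = new RandomNumberStream(5);
  // for await...of pulls chunks one by one and ends when push(null) is reached
  for await (const chunk of rns) {
    console.log(chunk.toString());
  }
  console.log('done');
}

main().catch(console.error);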
Will data be lost?
When first using flowing mode, I often worried about one thing: in the code above the readable stream starts producing data as soon as it is created, so could some data be produced before we bind the readable listener, firing readable while nothing is bound, and thus losing the beginning of the data in extreme cases?
In fact it will not. According to the Node.js event loop, creating the stream and attaching the event listener happen within the same tick, while producing the data involves an asynchronous operation and therefore lands in a later tick. No matter how late we attach the listener, it is still earlier than the data production, so no data is lost.
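Concretely, here is a small sketch of that timing argument (the comments are mine):

const RandomNumberStream = require('./RandomNumberStream');

const rns = new RandomNumberStream(5);

// Same synchronous tick as the stream's creation: the listener is attached
// before any timer callback can possibly run
rns.on('readable', () => {
  let chunk;
  while ((chunk = rns.read()) !== null) {
    console.log(chunk.toString());
  }
});
// _read() is only invoked once the stream is asked for data (for example by
// attaching this listener), and its setTimeout callback fires at least one
// tick later, by which point the listener has long been registered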
By now you probably still have questions about when exactly the data and readable events fire, how much data read() fetches each time, and when it returns null, because so far the stream is still a black box to us. After introducing writable streams, we will walk through these internals in detail, together with the source code, in the part on the back-pressure mechanism. Stay tuned for the next chapter.