13161216443

您所在位置: 首頁> 學習課程> web前端培訓 | 一文學會 Node.js 中的流

web前端培訓 | 一文學會 Node.js 中的流

發布百知教育 來源:學習課程 2019-12-03

Node.js 中的流(Stream)是出了名的難用甚至是難以理解。

用 Dominic Tarr 的話來說:“流是 Node 中最好的,也是最容易被誤解的想法?!奔词故?Redux 的創建者和 React.js 的核心團隊成員 Dan Abramov 也害怕 Node 流。


web前端培訓


本文將幫助你了解流以及如何使用。不要害怕,你完全可以把它搞清楚!



什么是流?

流是為 Node.js 應用提供動力的基本概念之一。它們是數據處理方法,用于將輸入的數據順序讀取或把數據寫入輸出。

流是一種以有效方式處理讀寫文件、網絡通信或任何類型的端到端信息交換的方式。

流的處理方式非常獨特,流不是像傳統方式那樣將文件一次全部讀取到存儲器中,而是逐段讀取數據塊并處理數據的內容,不將其全部保留在內存中。

這種方式使流在處理大量數據時非常強大,例如,文件的大小可能大于可用的內存空間,從而無法將整個文件讀入內存進行處理。那是流的用武之地!

既能用流來處理較小的數據塊,也可以讀取較大的文件。

以 YouTube 或 Netflix 之類的“流媒體”服務為例:這些服務不會讓你你立即下載視頻和音頻文件。取而代之的是,你的瀏覽器以連續的塊流形式接收視頻,從而使接收者幾乎可以立即開始觀看和收聽。

但是,流不僅涉及處理媒體和大數據。它們還在代碼中賦予了我們“可組合性”的力量??紤]可組合性的設計意味著能夠以某種方式組合多個組件以產生相同類型的結果。在 Node.js 中,可以通過流在其他較小的代碼段中傳遞數據,從而組成功能強大的代碼段。

為什么會用到流

與其他數據處理方法相比,流基本上具有兩個主要優點:

  1. 內存效率:你無需事先把大量數據加載到內存中即可進行處理

  2. 時間效率:得到數據后立即開始處所需的時間大大減少,不必等到整個有效數據全部發送完畢才開始處理

Node.js 中有 4 種類型的流:

  1. 可寫:可以向其中寫入數據的流。例如,fs.createWriteStream() 使我們可以使用流將數據寫入文件。

  2. 可讀:可從中讀取數據的流。例如:fs.createReadStream() 讓我們讀取文件的內容。

  3. 雙工:可讀和可寫的流。例如,net.Socket

  4. Transform:可在寫入和讀取時修改或轉換數據。例如在文件壓縮的情況下,你可以在文件中寫入壓縮數據,也可以從文件中讀取解壓縮的數據。

如果你已經使用過 Node.js,則可能遇到過流。例如在基于 Node.js 的 HTTP 服務器中,request 是可讀流,而 response 是可寫流。你可能用過 fs 模塊,該模塊可讓你用可讀和可寫文件流。每當使用 Express 時,你都在使用流與客戶端進行交互,而且由于 TCP 套接字、TLS棧和其他連接都基于 Node.js,所以在每個可以使用的數據庫連接驅動的程序中使用流。

一個實際的例子

如何創建可讀流

首先需要可讀性流,然后將其初始化。

1const Stream = require('stream')
2const readableStream = new Stream.Readable()

現在,流已初始化,可以向其發送數據了:

1readableStream.push('ping!')
2readableStream.push('pong!')

異步迭代器

強烈建議在使用流時配合異步迭代器(async iterator)。根據 Axel Rauschmayer【https://twitter.com/rauschma】 博士的說法,異步迭代是一種用于異步檢索數據容器內容的協議(這意味著當前“任務”可以在檢索項目之前被暫停)。另外必須提及的是,流異步迭代器實現使用內部的 readable 事件。

從可讀流中讀取時,可以使用異步迭代器:

 1import * as fs from 'fs';
2
3async function logChunks(readable{
4  for await (const chunk of readable) {
5    console.log(chunk);
6  }
7}
8
9const readable = fs.createReadStream(
10  'tmp/test.txt', {encoding'utf8'});
11logChunks(readable);
12
13// Output:
14// 'This is a test!\n'

也可以用字符串收集可讀流的內容:

 1import {Readable} from 'stream';
2
3async function readableToString2(readable{
4  let result = '';
5  for await (const chunk of readable) {
6    result += chunk;
7  }
8  return result;
9}
10
11const readable = Readable.from('Good morning!', {encoding'utf8'});
12assert.equal(await readableToString2(readable), 'Good morning!');

注意,在這種情況下必須使用異步函數,因為我們想返回 Promise。

請切記不要將異步功能與 EventEmitter 混合使用,因為當前在事件處理程序中發出拒絕時,無法捕獲拒絕,從而導致難以跟蹤錯誤和內存泄漏。目前的最佳實踐是始終將異步函數的內容包裝在 try/catch 塊中并處理錯誤,但這很容易出錯。這個 pull request 【https://github.com/nodejs/node/pull/27867】旨在解決一旦其落在 Node 核心上產生的問題。

要了解有關異步迭代的 Node.js 流的更多信息,請查看這篇很棒的文章【https://2ality.com/2019/11/nodejs-streams-async-iteration.html】。

Readable.from():從可迭代對象創建可讀流

stream.Readable.from(iterable, [options])  這是一種實用方法,用于從迭代器中創建可讀流,該迭代器保存可迭代對象中包含的數據??傻鷮ο罂梢允峭娇傻鷮ο蠡虍惒娇傻鷮ο?。參數選項是可選的,除其他作用外,還可以用于指定文本編碼。

 1const { Readable } = require('stream');
2
3async function * generate({
4  yield 'hello';
5  yield 'streams';
6}
7
8const readable = Readable.from(generate());
9
10readable.on('data', (chunk) => {
11  console.log(chunk);
12});

兩種讀取模式

根據 Streams API,可讀流有效地以兩種模式之一運行:flowingpaused??勺x流可以處于對象模式,無論處于 flowing 模式還是 paused 模式。

  • 流模式下,將自動從底層系統讀取數據,并通過 EventEmitter 接口使用事件將其盡快提供給程序。

  •  paused 模式下,必須顯式調用 stream.read() 方法以從流中讀取數據塊。

在 flowing 模式中,要從流中讀取數據,可以監聽數據事件并附加回調。當有大量數據可用時,可讀流將發出一個數據事件,并執行你的回調??聪旅娴拇a片段:

 1var fs = require("fs");
2var data = '';
3
4var readerStream = fs.createReadStream('file.txt'); //Create a readable stream
5
6readerStream.setEncoding('UTF8'); // Set the encoding to be utf8. 
7
8// Handle stream events --> data, end, and error
9readerStream.on('data'function(chunk{
10   data += chunk;
11});
12
13readerStream.on('end',function({
14   console.log(data);
15});
16
17readerStream.on('error'function(err{
18   console.log(err.stack);
19});
20
21console.log("Program Ended");

函數調用 fs.createReadStream() 給你一個可讀流。最初流處于靜態狀態。一旦你偵聽數據事件并附加了回調,它就會開始流動。之后將讀取大塊數據并將其傳遞給你的回調。流實現者決定發送數據事件的頻率。例如,每當有幾 KB 的數據被讀取時,HTTP 請求就可能發出一個數據事件。當從文件中讀取數據時,你可能會決定讀取一行后就發出數據事件。

當沒有更多數據要讀?。ńY束)時,流將發出結束事件。在以上代碼段中,我們監聽此事件以在結束時得到通知。

另外,如果有錯誤,流將發出并通知錯誤。

在 paused 模式下,你只需在流實例上重復調用 read(),直到讀完所有數據塊為止,如以下示例所示:

 1var fs = require('fs');
2var readableStream = fs.createReadStream('file.txt');
3var data = '';
4var chunk;
5
6readableStream.on('readable'function({
7    while ((chunk=readableStream.read()) != null) {
8        data += chunk;
9    }
10});
11
12readableStream.on('end'function({
13    console.log(data)
14});

read() 函數從內部緩沖區讀取一些數據并將其返回。當沒有內容可讀取時返回 null。所以在while 循環中,我們檢查是否為 null 并終止循環。請注意,當可以從流中讀取大量數據時,將會發出可讀事件。

所有 Readable 流均以 paused 模式開始,但可以通過以下方式之一切換為 flowing 模式

  • 添加一個 'data' 事件處理。

  • 調用 stream.resume() 方法。

  • 調用 stream.pipe() 方法將數據發送到可寫對象。

Readable 可以使以下方法之一切換回 paused 模式:

  • 如果沒有管道目標,則通過調用 stream.pause() 方法。

  • 如果有管道目標,請刪除所有管道目標??梢酝ㄟ^調用 stream.unpipe() 方法來刪除多個管道目標。

一個需要記住的重要概念是,除非提供了一種用于消耗或忽略該數據的機制,否則 Readable 將不會生成數據。如果使用機制被禁用或取消,則 Readable 將會試圖停止生成數據。添加readable 事件處理會自動使流停止 flowing,并通過 read.read() 得到數據。如果刪除了readable 事件處理,那么如果存在 'data' 事件處理,則流將再次開始 flowing。

如何創建可寫流

要將數據寫入可寫流,你需要在流實例上調用 write()。如以下示例所示:

1var fs = require('fs');
2var readableStream = fs.createReadStream('file1.txt');
3var writableStream = fs.createWriteStream('file2.txt');
4
5readableStream.setEncoding('utf8');
6
7readableStream.on('data'function(chunk{
8    writableStream.write(chunk);
9});

上面的代碼很簡單。它只是簡單地從輸入流中讀取數據塊,并使用 write() 寫入目的地。該函數返回一個布爾值,指示操作是否成功。如果為 true,則寫入成功,你可以繼續寫入更多數據。如果返回 false,則表示出了點問題,你目前無法寫任何內容??蓪懥鲗⑼ㄟ^發出 drain事件來通知你什么時候可以開始寫入更多數據。

調用 writable.end() 方法表示沒有更多數據將被寫入 Writable。如果提供,則可選的回調函數將作為 finish 事件的偵聽器附加。

1// Write 'hello, ' and then end with 'world!'.
2const fs = require('fs');
3const file = fs.createWriteStream('example.txt');
4file.write('hello, ');
5file.end('world!');
6// Writing more now is not allowed!

你可以用可寫流從可讀流中讀取數據:

 1const Stream = require('stream')
2
3const readableStream = new Stream.Readable()
4const writableStream = new Stream.Writable()
5
6writableStream._write = (chunk, encoding, next) => {
7    console.log(chunk.toString())
8    next()
9}
10
11readableStream.pipe(writableStream)
12
13readableStream.push('ping!')
14readableStream.push('pong!')
15
16writableStream.end()

還可以用異步迭代器來寫入可寫流,建議使用

 1import * as util from 'util';
2import * as stream from 'stream';
3import * as fs from 'fs';
4import {once} from 'events';
5
6const finished = util.promisify(stream.finished); // (A)
7
8async function writeIterableToFile(iterable, filePath{
9  const writable = fs.createWriteStream(filePath, {encoding'utf8'});
10  for await (const chunk of iterable) {
11    if (!writable.write(chunk)) { // (B)
12      // Handle backpressure
13      await once(writable, 'drain');
14    }
15  }
16  writable.end(); // (C)
17  // Wait until done. Throws if there are errors.
18  await finished(writable);
19}
20
21await writeIterableToFile(
22  ['One'' line of text.\n'], 'tmp/log.txt');
23assert.equal(
24  fs.readFileSync('tmp/log.txt', {encoding'utf8'}),
25  'One line of text.\n');

stream.finished() 的默認版本是基于回調的,但是可以通過 util.promisify() 轉換為基于 Promise 的版本(A行)。

在此例中,使用以下兩種模式:

Writing to a writable stream while handling backpressure (line B):
在處理 backpressure 時寫入可寫流(B行):

1if (!writable.write(chunk)) {
2  await once(writable, 'drain');
3}

關閉可寫流,并等待寫入完成(C行):

1writable.end();
2await finished(writable);

pipeline()

管道是一種機制,可以將一個流的輸出作為另一流的輸入。它通常用于從一個流中獲取數據并將該流的輸出傳遞到另一個流。管道操作沒有限制。換句話說,管道可用于分多個步驟處理流數據。

在 Node 10.x 中引入了 stream.pipeline()。這是一種模塊方法,用于在流轉發錯誤和正確清理之間進行管道傳輸,并在管道完成后提供回調。

這是使用管道的例子:

 1const { pipeline } = require('stream');
2const fs = require('fs');
3const zlib = require('zlib');
4
5// 使用 pipeline API 可以輕松將一系列流
6// 通過管道傳輸在一起,并在管道完全完成后得到通知。
7// 一個有效地用 gzip壓縮巨大視頻文件的管道:
8
9pipeline(
10  fs.createReadStream('The.Matrix.1080p.mkv'),
11  zlib.createGzip(),
12  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
13  (err) => {
14    if (err) {
15      console.error('Pipeline failed', err);
16    } else {
17      console.log('Pipeline succeeded');
18    }
19  }
20);

由于pipe 不安全,應使用 pipeline 代替 pipe。

流模塊

Node.js 流模塊【https://nodejs.org/api/stream.html】 提供了構建所有流 API 的基礎。

Stream 模塊是 Node.js 中默認提供的原生模塊。Stream 是 EventEmitter 類的實例,該類在 Node 中異步處理事件。因此流本質上是基于事件的。

要訪問流模塊:

1const stream = require('stream');

stream 模塊對于創建新型流實例非常有用。通常不需要使用 stream 模塊來消耗流。

流驅動的 Node API

由于它們的優點,許多 Node.js 核心模塊提供了原生流處理功能,最值得注意的是:

  • net.Socket 是流所基于的主 API 節點,它是以下大多數 API 的基礎

  • process.stdin 返回連接到 stdin 的流

  • process.stdout 返回連接到 stdout 的流

  • process.stderr 返回連接到 stderr 的流

  • fs.createReadStream() 創建一個可讀的文件流

  • fs.createWriteStream() 創建可寫的文件流

  • net.connect() 啟動基于流的連接

  • http.request() 返回 http.ClientRequest 類的實例,它是可寫流

  • zlib.createGzip() 使用gzip(一種壓縮算法)將數據壓縮到流中

  • zlib.createGunzip() 解壓縮 gzip 流。

  • zlib.createDeflate() deflate(壓縮算法)將數據壓縮到流中

  • zlib.createInflate() 解壓縮一個deflate流

流備忘單:

web前端培訓



web前端培訓



web前端培訓


查看更多:Node.js 流速查表【https://devhints.io/nodejs-stream

以下是與可寫流相關的一些重要事件:

  • error –表示在寫或配置管道時發生了錯誤。

  • pipeline – 當把可讀流傳遞到可寫流中時,該事件由可寫流發出。

  • unpipe – 當你在可讀流上調用 unpipe 并停止將其輸送到目標流中時發出。

結論

這就是所有關于流的基礎知識。流、管道和鏈是 Node.js 的核心和最強大的功能。流確實可以幫你編寫簡潔而高效的代碼來執行 I/O。

另外,還有一個值得期待的 Node.js 戰略計劃【https://github.com/nodejs/TSC/blob/master/Strategic-Initiatives.md#current-initiatives】,名為 BOB【https://github.com/Fishrock123/bob】,旨在改善 Node.js 的內部數據流以及希望作為未來 Node.js 流數據接口的公共 API 的。


web前端培訓:http://www.akpsimsu.com/web2019


注釋:本文內容來自公眾號 前端先鋒



上一篇:寫代碼注意了,千萬不要在 MySQL 中使用 UTF-8

下一篇:應屆生去公司找個Java程序員的職位需要什么技能?

相關推薦

www.akpsimsu.com

有位老師想和您聊一聊

關閉

立即申請