pro-node 之 stream & buffer

#stream

##stream是什么,stream模型
其他语言的stream貌似没分什么readstream,writestream,先理解下模型吧.

假设水龙头通过水管洒水车供水,那么这个水龙头就是提供者,洒水车就是消费者,这个小水管就是连接两者的stream,类似,一堆数据,就是数据提供者,而我们的程序需要从某些地方获取数据,就是数据消费者consumer,这个读取数据的通道就是stream…

所谓读写,是站在程序的角度来说:

  • 我程序需要数据,要读取,那个stream就是ReadStream
    process.stdin就是要从其他地方读取数据
  • 我程序要输出数据,stream就是WriteStream
    process.stdout/process.stderr输出数据,默认输出到console窗口

##readable stream

1
2
3
var Stream = require('stream');
var rs = new Stream();
rs.readable = true;

  • data事件,emit("data",data),使用on("data",handler)来接受数据
  • end事件,表示数据发送完了,emit('end');
  • close事件,stream关闭,emit('close');
  • error事件,表示发生错误,emit('error',new Error('xxxx')

使用pause()/resume()来暂停/恢复

##Writable stream

1
2
var ws = new Stream();
ws.writable = true;

###write() 写入数据

1
2
boolean successed = ws.write(new Buffer("ABCD"),callback)
boolean successed = ws.write("ABCD","utf8",callback)

返回succeed表示是否成功写入

###end(可选数据) 完成发送

1
2
ws.end();
//表示已完成发送数据,同时可选的带上本次数据,参数同write

end() -> finish事件 -> close事件

###drain事件
如果write返回false的话,表示这个数据消费者处理不了那么多的数据,所以没write成功,但当可以处理的时候,触发drain事件,程序可以监听这个事件,以继续write数据

##pipe
pipe管道的意思,链接两个stream,
src.pipe(dest) 返回这个dest
其中src必须是readable stream,dest必须是writable stream

出现a.pipe(b).pipe(c)这种没有打破src必须为readable stream,原因是还有可读可写stream,学名Duplex Stream

a.js :

1
2
3
setInterval(function() {
console.log(Math.floor(Math.random() * 100));
}, 1000);

b.js :

1
2
3
4
5
6
var sum = 0;
process.stdin.on('data', function(rec) {
rec = parseInt(rec);
console.log("rec : %d,sum : %d,sum+=rec : %d",rec,sum,sum+=rec);
})
process.stdin.resume();

执行

1
> node a | node b

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
rec : 68,sum : 0,sum+=rec : 68
rec : 95,sum : 68,sum+=rec : 163
rec : 95,sum : 163,sum+=rec : 258
rec : 88,sum : 258,sum+=rec : 346
rec : 76,sum : 346,sum+=rec : 422
rec : 8,sum : 422,sum+=rec : 430
rec : 24,sum : 430,sum+=rec : 454
rec : 83,sum : 454,sum+=rec : 537
rec : 18,sum : 537,sum+=rec : 555
rec : 97,sum : 555,sum+=rec : 652
rec : 45,sum : 652,sum+=rec : 697
rec : 90,sum : 697,sum+=rec : 787
rec : 64,sum : 787,sum+=rec : 851
rec : 75,sum : 851,sum+=rec : 926
rec : 11,sum : 926,sum+=rec : 937
rec : 73,sum : 937,sum+=rec : 1010
rec : 80,sum : 1010,sum+=rec : 1090
rec : 55,sum : 1090,sum+=rec : 1145

程序a产生的随机数,使用console.log输出到程序a的process.stdout 但pipe到了程序b的process.stdin,就是类似效果

##FileStream
假设要读取一个文件,并打印在控制台上,传统做法

1
2
3
fs.readFile(__dirname + "/foo.txt", function(error, data) {
console.log(data);
});

这样做可以实现,但是当文件很大时,就会吃掉内存,而且要等待它一点一点读取完,像Server如果服务静态文件的话,这么做就太浪费内存,而且耗时…更好的做法是使用流的pipe函数:

1
2
var rs = fs.createReadStream("foo.txt");
rs.pipe(process.stdout);

server端就是 rs.pipe(response); 直接发送文件,Nginx的SendFile貌似就是这样的原理
这样程序本身不用缓存

###fs.createReadStream

1
var rs = fs.createReadStream(path,option);

返回的rs就是ReadStream类型的,可以往任意WriteStream里pipe…

####事件&方法
data/end/close事件 根普通readable stream一致
open事件是文件被打开的时候出发,回调参数为fd :file descriptor

####option参数
|字段|说明|
|-|-|
|fd |已经存在的filedescriptor,那么path应该传null
|encoding |编码
|autoClose |默认true,读取完是否自动关闭,
|flags |默认”r”,根fs.open那里一致
|mode |默认0666,跟fs.open那里一致
|start |从文件哪个位置开始读取
|end |读到文件哪个位置算结束

###fs.createWriteStream

1
var ws = fs.createWriteStream(path,option);

有了ReadStream和WriteStream,我们复制文件就方便多了…
一般小文件也就fs.readFile 和 fs.writeFile 组合
fs-extra库的copySync方法使用fs.read方法,规定了64*1024大小的Buffer,异步的话使用pipe简单到家了…

1
2
3
4
var fs = require("fs");
var readStream = fs.createReadStream(__dirname + "/foo.txt");
var writeStream = fs.createWriteStream(__dirname + "/bar.txt");
readStream.pipe(writeStream);

####字段&事件
open事件,文件打开时
bytesWritten表示已经写入的字节数

####option参数
|字段 |说明|
|- |-|
|fd |file descriptor,设置这个就设置path = null吧|
|flags |默认w,即不存在创建;存在清空|
|mode |默认0666|
|encoding |编码|
|start |从哪里开始写|

##zlib中的stream
可以创建stream的方法

1
2
3
4
> zlib.create
zlib.createDeflate zlib.createDeflateRaw zlib.createGunzip
zlib.createGzip zlib.createInflate zlib.createInflateRaw
zlib.createUnzip

将read_strem先pipe至 压缩或解压缩 的stream中,再pipe至文件WriteStream中,压缩完成

1
2
3
4
5
6
var fs = require("fs");
var zlib = require("zlib");
var cs = zlib.createGzip();//compress stream
var rs = fs.createReadStream("input.txt");//read stream
var ws = fs.createWriteStream("input.txt.gz");//write stream
rs.pipe(cs).pipe(ws);

#Buffer相关
一字节包含8位,2位16进制数,看到的Buffer都是两位数分开,一个字节

字节顺序
os.endianness()获取

1
2
> os.endianness()
'LE'

有大字节顺序(big endianness = BE)讲的就是低位存储在高地址上
小字节(littlle endianness = LE),是低位存储在低地址上

例如32为的int值1,占用4字节
00 00 00 01,01所占位置不同,区分,下图0x00000000 0xFFFFFFFF即内存地址

BE的情况是,01出现的地址是内存地址最大的


LE的情况是,01出现的地址是最小的

之所以说这个,是因为Buffer里面的write方法有BE LE后缀的就是这个区别

##ArrayBuffer
提供字节数组,使用new ArrayBuffer(size)初始化,初始化之后不能改变,可以像数组一样赋值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
> ab = new ArrayBuffer(5)
{ '0': 0,
'1': 0,
'2': 0,
'3': 0,
'4': 0,
slice: [Function: slice],
byteLength: 5 }
> ab[0] = 1000
1000
> ab
{ '0': 232,
'1': 0,
'2': 0,
'3': 0,
'4': 0,
slice: [Function: slice],
byteLength: 5 }

可以看到:

  • 超过一个字节的容量FF = 255时,会被截断,1000 = 3E8 截成E8 = 232
  • byteLength表示字节个数
  • slice方法生成新的ArrayBuffer

##Buffer

###构造函数

  • new Buffer(1024) 数值表示大小,得到的Buffer是1024字节,也就是1KB,值未初始化,可以用fill(0)实例方法填充0
  • new Buffer([1,2,3]) ,用数组初始化
  • new Buffer(str,encoding = “utf8”)

###方法
小写buffer表示Buffer实例,大写Buffer表示类

  • buffertoString(encoding,start,end)
    encoding编码
    start/end表示截取该buffer,来toString
    三个参数全都可选

  • Buffer.isEncoding(“utf8”)判断参数是否是node认识的encoding

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    > Buffer.isEncoding("utf8")
    true
    > Buffer.isEncoding("utf-8")
    true
    > Buffer.isEncoding("UTF8")
    true
    > Buffer.isEncoding("UTF-8")
    true
    > Buffer.isEncoding("Unicode")
    false
    > Buffer.isEncoding("gb2312")
    false
    > Buffer.isEncoding("gbk")
    false
    > Buffer.isEncoding("hex")
    true
    > Buffer.isEncoding("binary")
    true
  • Buffer.isBuffer(buf)
    正如其名

  • Buffer.byteLength() = buffer.length

    1
    2
    3
    4
    > new Buffer("哈哈")
    <Buffer e5 93 88 e5 93 88>
    > _.length
    6

中文utf8宽字节,每个字3个字节

  • buffer.fill(0)填充0
  • buffer.slice

    1
    2
    var buf1 = new Buffer(4);
    var buf2 = buf1.slice(1, -1);//去头去尾,-1表示最末一个
  • buffer.concat([buf1,buf2])
    注意参数,Buffer.concat(list,[length]),
    length是拼接后的长度,如果拼接后大于这个长度则报错
    用法

    1
    2
    3
    var buf1 = new Buffer([1, 2]);
    var buf2 = new Buffer([3, 4]);
    var buf = Buffer.concat([buf1, buf2]);

length示例,给的
length=1,因为拼接后有6字节,报错
length=10,后面再分配4个字节,留用,也是未初始化状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
> b
<Buffer d6 d0 ce c4>
> Buffer.concat([b,new Buffer("\r\n")],1)
RangeError: targetStart out of bounds
at Buffer.copy (buffer.js:526:11)
at Function.Buffer.concat (buffer.js:499:9)
at repl:1:9
at REPLServer.self.eval (repl.js:110:21)
at Interface.<anonymous> (repl.js:239:12)
at Interface.emit (events.js:95:17)
at Interface._onLine (readline.js:202:10)
at Interface._line (readline.js:531:8)
at Interface._ttyWrite (readline.js:760:14)
at ReadStream.onkeypress (readline.js:99:10)
> Buffer.concat([b,new Buffer("\r\n")],10)
<Buffer d6 d0 ce c4 0d 0a 00 00 48 85>

  • buffer.copy(dest, … offset …)
    呢吗,参数真难记,直接用slice吧
  • buffer.write()
    参数1 : 要写入的string
    参数2 : buffer的offset,表示从哪里开始写,默认0
    参数3 : 要写的字节数,默认全部
    参数4 : encoding

写入数值/读取数值

LE BE就是那个字节顺序,我的win7是LE,低位在小内存地址处