时间:2021-04-29 10:55:48 | 栏目:NodeJS | 点击:次
我们知道,TCP是面向连接流传输的,其采用Nagle算法,在缓冲区对上层数据进行了处理。避免触发自动分片机制和网络上大量小数据包的同时也造成了粘包(小包合并)和半包(大包拆分)问题,导致数据没有消息保护边界,接收端接收到一次数据无法判断是否是一个完整数据包。那有什么方案可以解决这问题呢?
1、粘包问题解决方案及对比
很简单,既然消息没有边界,那我们在消息往下传之前给它加一个边界识别就好了。
第一种方案不够灵活;第二种有风险,如果数据内刚好有该特殊字符会出问题;第三种方案虽然要增加对消息头的解析,不过相对而言还是要安全一些。
2、分包与拆包
既然使用第三种方案,就必然涉及到封包和拆包的问题。
首先肯定需要定义数据包的结构,这类似Http包一样,有包头和包体。包头其实上是个大小固定的结构体,其中有个结构体成员变量表示包体的长度,其他的结构体成员可根据需要自己定义。根据包头长度固定以及包头中含有包体长度的变量就能正确的拆分出一个完整的数据包。包体则存放数据内容。
在发送端,需要进行封包。封包就是给一段数据加上包头,这样一来数据包就分为包头和包体两部分内容了。
在接受端,则需要进行拆包。主要流程如下:
1. 为每一个连接动态分配一个缓冲区,同时把此缓冲区和SOCKET关联.
2. 当接收到数据时首先把此段数据存放在缓冲区中.
3. 判断缓存区中的数据长度是否够一个包头的长度,如不够,则不进行拆包操作.
4. 根据包头数据解析出里面代表包体长度的变量.
5. 判断缓存区中除包头外的数据长度是否够一个包体的长度,如不够,则不进行拆包操作.
6. 取出整个数据包.这里的"取"的意思是不光从缓冲区中拷贝出数据包,而且要把此数据包从缓存区中删除掉.删除的办法就是把此包后面的数据移动到缓冲区的起始地址.
其中对于缓冲区的设计,主要由俩种:
1. 采用动态变化的缓冲区暂存,根据数据大小调整缓冲区大小。这个方案有个缺点,为了避免缓冲区不断增长,每次解析出一个完整包后需要将缓冲区残留的数据拷贝到缓冲区首部,这增加了系统负载。
3、网络字节序和本机字节序
定义了消息结构之后,发送端和接收端还需要统一字节序。我们知道,不同机器的本机字节序不同,绝大多数X86机器都是小端字节序,然后还是由少数机器是大端存储的。因此在数据流进行传输时,必须先统一字节序。一般约定在传输时采用网络字节序(大端),统一用unicode编码。
4、代码实现
了解以上知识之后,我们现在之后要做什么了。发送端按定义的协议规则封包,接受端把接收到的buffer放入缓冲区,当缓冲区内有完整包时开始拆包。封包拆包过程需要注意,读写超过一个字节的数据时需要按大端字节序读取。下面看node的代码实现(只提供核心实现片段):
1)发送端封包:
let head = new Buffer(4); let jsonStr = JSON.stringify(json); let body = new Buffer(jsonStr); //超过一字节的大端写入 head.writeInt32BE(body.byteLength, 0); let buffer = Buffer.concat([head, body]);
2)接收端收到buffer入缓冲区:
let dataReadStart = 0; //新数据的起始位置 let dataLength = buffer.length; // 要拷贝数据的长度 let availableLen = _bufferLength - _dataLen; // 缓冲区剩余可用空间 // buffer剩余空间不足够存储本次数据 if (availableLen < dataLength) { let newLength = Math.ceil((_dataLen + dataLength) / _bufferLength) * _bufferLength; let _tempBuffer = Buffer.alloc(newLength); // 将旧数据复制到新buffer并且修正相关参数 if (_writePointer < _readPointer) { // 数据存储在旧buffer的尾部+头部的顺序 let dataTailLen = _bufferLength - _readPointer; _buffer.copy(_tempBuffer, 0, _readPointer, _readPointer + dataTailLen); _buffer.copy(_tempBuffer, dataTailLen, 0, _writePointer); } else { // 数据是按照顺序进行的完整存储 _buffer.copy(_tempBuffer, 0, _readPointer, _writePointer); } _bufferLength = newLength; _buffer = _tempBuffer; _tempBuffer = null; _readPointer = 0; _writePointer = _dataLen; //存储新到来的buffer buffer.copy(_buffer, _writePointer, dataReadStart, dataReadStart + dataLength); _dataLen += dataLength; _writePointer += dataLength; } else if (_writePointer + dataLength > _bufferLength) { // 空间够用情况下,但是数据会冲破缓冲区尾部,部分存到缓冲区旧数据后,一部分存到缓冲区开始位置 // 缓冲区尾部剩余空间的长度 let bufferTailLength = _bufferLength - _writePointer; // 数据尾部位置 let dataEndPosition = dataReadStart + bufferTailLength; buffer.copy(_buffer, _writePointer, dataReadStart, dataEndPosition); // data剩余未拷贝进缓存的长度 let restDataLen = dataLength - bufferTailLength; buffer.copy(_buffer, 0, dataEndPosition, dataLength); _dataLen = _dataLen + dataLength; _writePointer = restDataLen } else { // 剩余空间足够存储数据,直接拷贝数据到缓冲区 buffer.copy(_buffer, _writePointer, dataReadStart, dataReadStart + dataLength); _dataLen = _dataLen + dataLength; _writePointer = _writePointer + dataLength }
3)取出缓冲区所有完整数据包(收到的buffer入缓冲区后)
let _dataHeadLen = 4; timer && clearInterval(timer); timer = setInterval(()=>{ // 缓冲区数据不够解析出包头 if (_dataLen < _dataHeadLen) { console.log('数据长度小于包头规定长度,等待数据......') clearInterval(timer); } // 解析包头长度 // 尾部最后剩余可读字节长度 let restDataLen = _bufferLength - _readPointer; let dataLen = 0; let headBuffer = Buffer.alloc(_dataHeadLen); // 数据包为分段存储,不能直接解析出包头,先拼接 if (restDataLen < _dataHeadLen) { // 取出第一部分头部字节 _buffer.copy(headBuffer, 0, _readPointer, _bufferLength) // 取出第二部分头部字节 let unReadHeadLen = _dataHeadLen - restDataLen; _buffer.copy(headBuffer, restDataLen, 0, unReadHeadLen) dataLen = headBuffer.readUInt32BE(0); } else { _buffer.copy(headBuffer, 0, _readPointer, _readPointer + _dataHeadLen); dataLen = headBuffer.readUInt32BE(0);; } // 数据长度不够读取,直接返回 if (_dataLen - _dataHeadLen < dataLen) { log.info("缓冲区已有body数据长度小于包头定义body的长度,等待数据......") clearInterval(timer); } else { // 数据够读,读取数据包 let package = Buffer.alloc(dataLen); // 数据是分段存储,需要分两次读取 if (_bufferLength - _readPointer < dataLen) { let firstPartLen = _bufferLength - _readPointer; // 读取第一部分,直接到字符尾部的数据 _buffer.copy(package, 0, _readPointer, firstPartLen + _readPointer); // 读取第二部分,存储在开头的数据 let secondPartLen = dataLen - firstPartLen; _buffer.copy(package, firstPartLen, 0, secondPartLen); _readPointer = secondPartLen; //更新可读起点 } else { // 直接读取数据 _buffer.copy(package, 0, _readPointer, _readPointer + dataLen); _readPointer += dataLen; //更新可读起点 } _dataLen -= readData.length; //更新数据长度 // 已经读取完所有数据 if (_readPointer === _writePointer) { clearInterval(timer) } //开始解包 callback(package); } }, 50);
4)拆包得到数据
let headBytes = 4; let head = new Buffer(headBytes); buffer.copy(head, 0, 0, headBytes); let dataLen = head.readUInt32BE(); const body = new Buffer(dataLen); buffer.copy(body, 0, headBytes, headBytes + dataLen) let content = null; try { const str = body.toString('utf-8'); if(str === ''){ content = null; }else{ content = JSON.parse(body); } } catch (e) { log.error('head指定body长度有问题') } //传递给业务层 callback(content);
5、总结
从上面我们已经了解到了封包解包的一个过程。TCP是可靠传输的,同一时间在网络上只会有一个数据包,并且丢包会重传,因此不用担心丢包或者数据包乱序问题。UDP有消息保护边界,不需要进行拆包解包,然后其是非可靠传输,也需要解决其他一些问题,譬如丢包和数据包排序问题。
上面进行数据包结构设计时只是简单地加了一个包体长度,事实上在业务场景可以自由增加需要的字段,譬如协议版本,协议类型等等。