ByteBuf使用实例
之前我們有個netty5的拆包解決方案(參加netty5拆包問題解決實例),現(xiàn)在我們采用另一種思路,不需要新增LengthFieldBasedFrameDecoder,直接修改NettyMessageDecoder:
package com.wlf.netty.nettyapi.msgpack;
import com.wlf.netty.nettyapi.constant.Delimiter;
import com.wlf.netty.nettyapi.javabean.Header;
import com.wlf.netty.nettyapi.javabean.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class NettyMessageDecoder extends ByteToMessageDecoder {
    /**
     * 消息體字節(jié)大小:分割符字段4字節(jié)+長度字段4字節(jié)+請求類型字典1字節(jié)+預(yù)留字段1字節(jié)=10字節(jié)
     */
    private static final int HEAD_LENGTH = 10;
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        while (true) {
            // 標記字節(jié)流開始位置
            byteBuf.markReaderIndex();
            // 若讀取到分割標識,說明讀取當前字節(jié)流開始位置了
            if (byteBuf.readInt() == Delimiter.DELIMITER) {
                break;
            }
            // 重置讀索引為0
            byteBuf.resetReaderIndex();
            // 長度校驗,字節(jié)流長度至少10字節(jié),小于10字節(jié)則等待下一次字節(jié)流過來
            if (byteBuf.readableBytes() < HEAD_LENGTH) {
                byteBuf.resetReaderIndex();
                return;
            }
        }
        // 2、獲取data的字節(jié)流長度
        int dataLength = byteBuf.readInt();
        // 校驗數(shù)據(jù)包是否全部發(fā)送過來,總字節(jié)流長度(此處讀取的是除去delimiter和length之后的總長度)-
        // type和reserved兩個字節(jié)=data的字節(jié)流長度
        int totalLength = byteBuf.readableBytes();
        if ((totalLength - 2) < dataLength) {
            // 長度校驗,字節(jié)流長度少于數(shù)據(jù)包長度,說明數(shù)據(jù)包拆包了,等待下一次字節(jié)流過來
            byteBuf.resetReaderIndex();
            return;
        }
        // 3、請求類型
        byte type = byteBuf.readByte();
        // 4、預(yù)留字段
        byte reserved = byteBuf.readByte();
        // 5、數(shù)據(jù)包內(nèi)容
        byte[] data = null;
        if (dataLength > 0) {
            data = new byte[dataLength];
            byteBuf.readBytes(data);
        }
        NettyMessage nettyMessage = new NettyMessage();
        Header header = new Header();
        header.setDelimiter(Delimiter.DELIMITER);
        header.setLength(dataLength);
        header.setType(type);
        header.setReserved(reserved);
        nettyMessage.setHeader(header);
        nettyMessage.setData(data);
        list.add(nettyMessage);
        // 回收已讀字節(jié)
        byteBuf.discardReadBytes();
    }
}
我們的改動很小,只不過將原來的讀索引改為標記索引,然后在拆包時退出方法前重置讀索引,這樣下次數(shù)據(jù)包過來,我們的讀索引依然從0開始,delimiter的標記就可以讀出來,而不會陷入死循環(huán)了。
ByteBuf是ByteBuffer的進化版,ByteBuffer(參見ByteBuffer使用實例)才一個索引,讀寫模式需要通過flip來轉(zhuǎn)換,而ByteBuf有兩個索引,readerIndex讀索引和writerIndex寫索引,讀寫轉(zhuǎn)換無縫連接,青出于藍而勝于藍:
 +-------------------+------------------+------------------+
 | discardable bytes | readable bytes | writable bytes |
 |              | (CONTENT) |             |
 +-------------------+------------------+------------------+
 |              |              |             |
 0 <= readerIndex <= writerIndex <= capacity
既然有兩個索引,那么標記mask、重置reset必然也是兩兩對應(yīng),上面的代碼中我們只需要用到讀標記和讀重置。
我們把客戶端handler也修改下,先把LengthFieldBasedFrameDecoder去掉:
// channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 4, 4, 2, 0));
再讓數(shù)據(jù)包更大一些:
    /**
     * 構(gòu)造PCM請求消息體
     *
     * @return
     */
    private byte[] buildPcmData() throws Exception {
        byte[] resultByte = longToBytes(System.currentTimeMillis());
        // 讀取一個本地文件
        String AUDIO_PATH = "D:\input\test_1.pcm";
        try (RandomAccessFile raf = new RandomAccessFile(AUDIO_PATH, "r")) {
            int len = -1;
            byte[] content = new byte[1024];
            while((len = raf.read(content)) != -1)
            {
               resultByte = addAll(resultByte, content);
            }
        }
        return resultByte;
    }
            
再debug下看看,第一次解析客戶端發(fā)送的數(shù)據(jù),讀取1024字節(jié),我們可以看到讀索引是8(delimiter+length=8),寫索引就是1024,我們的大包里有3939116個字節(jié),去掉10個字節(jié)的header,剩下小包是3939106::
第二次再讀1024,代碼已經(jīng)執(zhí)行reset重置讀索引了,所以讀索引由8改為0,寫索引累增到2048:
第三次再讀1024,寫索引繼續(xù)累增到3072:
最后一次發(fā)1024,寫索引已經(jīng)到達3939116,大包傳輸結(jié)束了:
從上面看出,我們對ByteBuf的capacity一直在翻倍,讀指針一直標記在大包的起始位置0,這樣做的目的是每次都能讀取小包的長度length(3939106),拿來跟整個ByteBuf的長度作比較,只要它取到的小包沒到達到length,我們就繼續(xù)接受新包,寫索引不停的累加,直到整個大包長度>=3939116(也就是小包>=3939106),這時我們開始移動讀索引,將字節(jié)流寫入對象,最后回收已讀取的字節(jié)(調(diào)用discardReaderBytes方法):
BEFORE discardReadBytes()
 +-------------------+------------------+------------------+
 | discardable bytes | readable bytes | writable bytes |
 +-------------------+------------------+------------------+
 |             |           |              |
 0 <= readerIndex <= writerIndex <= capacity
AFTER discardReadBytes()
 +------------------+--------------------------------------+
 | readable bytes | writable bytes (got more space) |
 +------------------+--------------------------------------+
 |            |                        |
 readerIndex (0) <= writerIndex (decreased) <= capacity
其他方法參見測試類:
package com.wlf.netty.nettyserver;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Assert;
import org.junit.Test;
public class ByteBufTest {
    @Test
    public void byteBufTest() {
        ByteBuf byteBuf = Unpooled.buffer(10);
        byteBuf.writeInt(0xabef0101);
        byteBuf.writeInt(1024);
        byteBuf.writeByte((byte) 1);
        byteBuf.writeByte((byte) 0);
        // 開始讀取
        printDelimiter(byteBuf);
        printLength(byteBuf);
        // 派生一個ByteBuf,取剩下2個字節(jié),但讀索引不動
        ByteBuf duplicatBuf = byteBuf.duplicate();
        printByteBuf(byteBuf);
        // 派生一個ByteBuf,取剩下2個字節(jié),讀索引動了
        ByteBuf sliceBuf = byteBuf.readSlice(2);
        printByteBuf(byteBuf);
        // 兩個派生的對象其實是一樣的
        Assert.assertEquals(duplicatBuf, sliceBuf);
    }
    private void printDelimiter(ByteBuf buf) {
        int newDelimiter = buf.readInt();
        System.out.printf("delimeter: %s
", Integer.toHexString(newDelimiter));
        printByteBuf(buf);
    }
    private void printLength(ByteBuf buf) {
        int length = buf.readInt();
        System.out.printf("length: %d
", length);
        printByteBuf(buf);
    }
    private void printByteBuf(ByteBuf buf) {
        System.out.printf("reader Index: %d, writer Index: %d, capacity: %d
", buf.readerIndex(), buf.writerIndex(), buf.capacity());
    }
}
輸出:
delimeter: abef0101 reader Index: 4, writer Index: 10, capacity: 10 length: 1024 reader Index: 8, writer Index: 10, capacity: 10 reader Index: 8, writer Index: 10, capacity: 10 reader Index: 10, writer Index: 10, capacity: 10
總結(jié)
以上是生活随笔為你收集整理的ByteBuf使用实例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 洛谷模板,树状数组二 差分
- 下一篇: excel 批量替换换行符
