go语言socket通信初试
2019獨角獸企業重金招聘Python工程師標準>>>
聽說go的在系統性能方面有很大的優勢,最近對go語言產生了極大的興趣,相對現有的項目用go改造,原有的項目用的ace框架編寫的通信的框架,在目前的移動的通信網中忙時有的時候處理不過來,于是先研究試圖測試一下socket。由于對go剛剛入門,有些不正確的地方還請高人指點。
由于我們系統通常是不同語言之間通信(之前系統是客戶端和服務端都用c++),這里客戶端采用java+mina編寫,服務端采用go編寫,最初設計,像借用go語言中的gob進行編解碼,但是經過測試后發現行不通,經過和網友以及一些高人的指點,gob其實針對go語言之間的編解碼的,跨語言還真不靈光。有同事建議我用protocolbuffer這個,我一看這個又是定義類似idl文件(之前做了幾年的corba技術,對這樣的東西有點抵觸了,因為有的時候項目合作方已經定下的方案或者已經完成的項目很難配合你用一種新的技術或者新協議重新修改),沒有辦法我只能采用硬編碼的方式實現了,下一步我會采用protocolbuffer技術實現跨語言之間的通信,如果您有剛好的方式,希望能交流。下面我把我的部分代碼貼一下,僅供參考,如果需要全部可以測試程序可以留下方式。
客戶端主要代碼:
主入口類:
public class Client {
?? ?/**
?? ? * @param args
?? ? */
?? ?public static void main(String[] args) {
?? ??? ?// TODO Auto-generated method stub
?? ??? ?MinaClient client = new MinaClient(); ?
?? ??? ?if (client.connect()) { ?
?? ??? ??? ?System.out.println("連接服務器成功!");
?? ??? ??? ?//client.send("連接服務器成功!"); ?
?? ??? ??? ?//Scanner scanner = new Scanner(System.in);
?? ??? ??? ?boolean flag =false;
?? ??? ??? ?int i = 0;
?? ??? ??? ?while (!flag) { ?
?? ??? ??? ??? ?i++;
?? ??? ???????? //client.send("hello world "+i);
?? ??? ??? ??? ?Ss7LspMsg msg = new Ss7LspMsg();
?? ??? ??? ??? ?msg.setSeq(231115);
?? ??? ??? ??? ?msg.setProtocoltype(1);
?? ??? ??? ??? ?msg.setTime(System.currentTimeMillis());
?? ??? ??? ??? ?msg.setLsp(123);
?? ??? ??? ??? ?msg.setLen(20);
?? ??? ??? ??? ?byte[] bytes = new byte[20];
?? ??? ??? ??? ?for(int j=0;j<20;j++){
?? ??? ??? ??? ??? ?bytes[i] = (byte) j;
?? ??? ??? ??? ?}
?? ??? ??? ??? ?msg.setBytes(bytes);
?? ??? ??? ??? ?//client.send("helloworld"+i);
?? ??? ??? ??? ?client.send(msg);
?? ??? ???????? if(i==1)
?? ??? ??????? ??? ?flag = true;
?? ??? ???????? /*try {
?? ??? ??? ??? ??? ?Thread.sleep(1000);
?? ??? ??? ??? ?} catch (InterruptedException e) {
?? ??? ??? ??? ??? ?// TODO Auto-generated catch block
?? ??? ??? ??? ??? ?e.printStackTrace();
?? ??? ??? ??? ?}*/
?? ??? ??? ?} ?
?? ??? ?} ?
?? ??? ?System.out.println("OVER!");
?? ??? ?client.close();
?? ?}
}
MinaClient類:
public class MinaClient {
?? ?
?? ?private SocketConnector connector; ?
?? ?private ConnectFuture future; ?
?? ?private IoSession session; ?
?? ?public boolean connect() { ?
?? ??? ?// 創建一個socket連接 ?
?? ??? ?connector = new NioSocketConnector(); ?
?? ??? ?// 設置鏈接超時時間 ?
?? ??? ?connector.setConnectTimeoutMillis(3000); ?
?? ??? ?// 獲取過濾器鏈 ?
?? ??? ?DefaultIoFilterChainBuilder filterChain = connector.getFilterChain(); ?
?? ??? ?// 添加編碼過濾器 處理亂碼、編碼問題 ?
?? ??? ?filterChain.addLast("codec", new ProtocolCodecFilter(new CharsetCodecFactory()));?
?? ?
?? ??? ?// 消息核心處理器 ?
?? ??? ?connector.setHandler(new ClientMessageHandlerAdapter()); ?
?? ?? ?
?? ??? ?// 連接服務器,知道端口、地址 ?
?? ??? ?future = connector.connect(new InetSocketAddress("127.0.0.1",22345)); ?
?? ???????? // 等待連接創建完成 ?
?? ??? ?future.awaitUninterruptibly(); ?
?? ??? ?// 獲取當前session ?
?? ??? ?session = future.getSession(); ?
?? ??? ?return true; ?
?? ?} ?
?? ?public void setAttribute(Object key, Object value) { ?
?? ??? ?session.setAttribute(key, value); ?
?? ?} ?
?? ?
?? ?public void send(String message) { ?
?? ??? ?session.write(message); ?
?? ?} ?
?? ?
?? ?public void send(Ss7LspMsg message) { ?
?? ??? ?session.write(message); ?
?? ?}
?? ?
?? ?public boolean close() { ?
?? ??? ?CloseFuture future = session.getCloseFuture(); ?
?? ??? ?future.awaitUninterruptibly(1000); ?
?? ??? ?connector.dispose(); ?
?? ??? ?return true; ?
?? ?} ?
?? ?
?? ?public SocketConnector getConnector() { ?
?? ??? ?return connector; ?
?? ?} ?
?? ?
?? ?public IoSession getSession() { ?
?? ??? ?return session; ?
?? ?}?
消息基礎類(消息頭)
public abstract class MsgHeader implements Serializable{
?? ?/**
?? ? *
?? ? */
?? ?private static final long serialVersionUID = 1L;
?? ?//消息長度,包括頭長度
?? ?//頭長:8
?? ?int length = 20;
?? ?//協議類型
?? ?int protocoltype;
?? ?//序列號
?? ?int? seq;
?? ?//時間戳
?? ?long time;
?? ?
?? ?public MsgHeader() {
?? ??? ?super();
?? ??? ?// TODO Auto-generated constructor stub
?? ?}
?? ?public MsgHeader(int length, int protocoltype, int seq) {
?? ??? ?super();
?? ??? ?this.length = length;
?? ??? ?this.protocoltype = protocoltype;
?? ??? ?this.seq = seq;
?? ??? ?this.time = System.currentTimeMillis();
?? ?}
?? ?public void encodeHeader(IoBuffer buf) {
??????? // The total Length will be set later.
?? ??? ?buf.putInt(seq);
?? ??? ?buf.putInt(protocoltype);
??????? buf.putInt(length);
??????? System.out.println("len is "+length);
??????? buf.putLong(time);
??? }
??? public void decodeHeader(IoBuffer buf) {
?? ??? ?seq = buf.getInt();
??????? protocoltype = buf.getInt();
?? ??? ?length = buf.getInt();
?? ??? ?time = buf.getLong();
??? }
??? public abstract boolean encodeBody(IoBuffer bt);
??? public abstract boolean decodeBody(byte[] body);
?? ?
?? ?
?? ?public int getLength() {
?? ??? ?return length;
?? ?}
?? ?public void setLength(int length) {
?? ??? ?this.length += length;
?? ?}
?? ?public int getProtocoltype() {
?? ??? ?return protocoltype;
?? ?}
?? ?public void setProtocoltype(int protocoltype) {
?? ??? ?this.protocoltype = protocoltype;
?? ?}
?? ?public int getSeq() {
?? ??? ?return seq;
?? ?}
?? ?public void setSeq(int seq) {
?? ??? ?this.seq = seq;
?? ?}
?? ?
?? ?public byte[] strToBytes(int len,String str){
?? ??? ?byte[] bytes = new byte[len];
?? ??? ?for(int i=0;i<len;i++){
?? ??? ??? ?bytes[i] = (byte) 0xff;
?? ??? ?}
?? ??? ?String tmpstr = null;
?? ??? ?int tmplen = 0;
?? ??? ?if(str.trim().length()>len){
?? ??? ??? ?tmpstr = str.substring(0, 15);
?? ??? ??? ?tmplen = 16;
?? ??? ?}else{
?? ??? ??? ?tmpstr = str;
?? ??? ??? ?tmplen = str.length();
?? ??? ?}
?? ??? ?byte[] tmpbytes = tmpstr.getBytes();
?? ??? ?for(int i=0;i<tmplen;i++){
?? ??? ??? ?bytes[i] = tmpbytes[i];
?? ??? ?}
?? ??? ?return bytes;
?? ?}
?? ?
?? ?public? String decOctetString(byte[] bt) {
??????? int b = 0;
??????? int e = 0;
??????? // find the begin non 0 position;
??????? for (int i = 0; i < bt.length; i++) {
??????????? if (bt[i] != 0) {
??????????????? b = i;
??????????????? break;
??????????? }
??????? }
??????? // find the end non 0 position;
??????? for (int i = bt.length - 1; i > 0; i--) {
??????????? if (bt[i] != 0) {
??????????????? e = i;
??????????????? break;
??????????? }
??????? }
??????? return new String(bt, b, e - b + 1);
?? ?}
?? ?public long getTime() {
?? ??? ?return time;
?? ?}
?? ?public void setTime(long time) {
?? ??? ?this.time = time;
?? ?}
?? ?
?
?? ?/**
??? * 字符串ip轉換為long
??? * @param 字符串ip
??? * @return
??? */
?? public static long getStringIpToLong(String ip) {
?????? String[] ips = ip.trim().split("[.]");
?????? long num =? 16777216L*Long.parseLong(ips[0]) + 65536L*Long.parseLong(ips[1]) + 256*Long.parseLong(ips[2]) + Long.parseLong(ips[3]);
?????? return num;
?? }
?? ?
?? /**
??? * 長整型ip轉換為string
??? * @param long型ip
??? * @return
??? */
?? public static String getLongIpToString(long ipLong) { ?
???? ?
?????? long mask[] = {0x000000FF,0x0000FF00,0x00FF0000,0xFF000000};
?????? long num = 0;
?????? StringBuffer ipInfo = new StringBuffer();
?????? for(int i=0;i<4;i++){
?????????? num = (ipLong & mask[i])>>(i*8);
?????????? if(i>0) ipInfo.insert(0,".");
?????????? ipInfo.insert(0,Long.toString(num,10));
?????? }
?????? return ipInfo.toString();
?? }
}
抽象消息類(請求類、反饋類)
public abstract class BaseReq extends MsgHeader {
?? ?/**
?? ? *
?? ? */
?? ?private static final long serialVersionUID = 1L;
?? ?
}
public abstract class BaseRsp extends MsgHeader {
?? ?/**
?? ? *
?? ? */
?? ?private static final long serialVersionUID = 1L;
?? ?protected int result;
?? ?protected int reason;
?? ?public int getResult() {
?? ??? ?return result;
?? ?}
?? ?public void setResult(int result) {
?? ??? ?this.result = result;
?? ?}
?? ?public int getReason() {
?? ??? ?return reason;
?? ?}
?? ?public void setReason(int reason) {
?? ??? ?this.reason = reason;
?? ?}
?? ?
}
測試消息類
public class Ss7LspMsg extends BaseReq {
?? ?private int lsp;
?? ?private int len;
?? ?private byte[] bytes;
?? ?
?? ?public int getLsp() {
?? ??? ?return lsp;
?? ?}
?? ?public void setLsp(int lsp) {
?? ??? ?this.lsp = lsp;
?? ?}
?? ?public int getLen() {
?? ??? ?return len;
?? ?}
?? ?public void setLen(int len) {
?? ??? ?this.len = len;
?? ?}
?? ?public byte[] getBytes() {
?? ??? ?return bytes;
?? ?}
?? ?public void setBytes(byte[] bytes) {
?? ??? ?this.bytes = bytes;
?? ?}
?? ?@Override
?? ?public boolean encodeBody(IoBuffer bt) {
?? ??? ?// TODO Auto-generated method stub
?? ??? ?this.setLength(len+8);
?? ??? ?encodeHeader(bt);
?? ??? ?bt.putInt(len);
?? ??? ?bt.putInt(lsp);
?? ??? ?bt.put(bytes);
?? ??? ?return true;
?? ?}
?? ?@Override
?? ?public boolean decodeBody(byte[] body) {
?? ??? ?// TODO Auto-generated method stub
?? ??? ?return false;
?? ?}
}
服務端代碼:
負責通信的go文件
package main
import ( ????"fmt" ????//"github.com/bbangert/toml" ????"bytes" ????"encoding/binary" ????"encoding/gob" ????"io" ????"net" ) //常量定義 const ( ????VERSION = "0.1.0" ????TCP = "tcp" ????UDP = "udp" ????RECV_BUF_LEN = 1024 ) type IpTransType struct { ????Type string //網絡類型tcp/udp ????Addr string //ip地址 默認 127.0.0.1 ????Port int32 } func InitServer(transType IpTransType) (err error) { ????if transType.Addr == "" { ????????err = fmt.Errorf("transType.Addr is nil,please check the configuration file") ????????return ????} ????if transType.Port < 1 || transType.Port > 65535 { ????????err = fmt.Errorf("transType.Port must be in (1 ~ 65535") ????????return ????} ????if !(transType.Type == TCP || transType.Type == UDP) { ????????err = fmt.Errorf("transType.Type only be 'tcp' or 'udp' ") ????????return ????} ????listener, err := net.Listen(transType.Type, "127.0.0.1:22345") ????defer listener.Close() ????for { ????????conn, err := listener.Accept() ????????if err != nil { ????????????continue ????????} ????????fmt.Println("conn is coming") ????????go Receiver(conn) ????} ????return } type LspMsg struct { ????seq int32 ????protocol int32 ????length int32 ????times int64 ????lens int32 ????lsp int32 ????bytes [20]byte ????//bytes := make([]byte,20) ????//bytes *[]byte } type LspMsgBig struct { ????Seq int32 ????Protocol int32 ????Length int32 ????Times int64 ????Lens int32 ????Lsp int32 ????Bytes [20]byte ????//bytes := make([]byte,20) ????//bytes *[]byte } func Decode(data []byte, to interface{}) error { ????buf := bytes.NewBuffer(data) ????dec := gob.NewDecoder(buf) ????return dec.Decode(to) } func BytesToInt32(bytes []byte) int32 { ????return int32(binary.BigEndian.Uint32(bytes)) } func BytesToInt8(bytes []byte) int8 { ????return int8(bytes[0]) } func BytesToInt16(bytes []byte) int16 { ????return int16(binary.BigEndian.Uint16(bytes)) } func BytesToInt64(bytes []byte) int64 { ????return int64(binary.BigEndian.Uint64(bytes)) } func Receiver(conn net.Conn) (err error) { ????buf := make([]byte, RECV_BUF_LEN) ????//buf bytes.Buffer ????defer conn.Close() ????for { ????????n, err1 := conn.Read(buf) ????????switch err1 { ????????case nil: ????????????//n, _ := conn.Write(buf[0:n]) ????????????var out LspMsg ????????????//Decode(b, &out) ????????????var outout LspMsgBig ????????????if err := Decode(buf, &outout); err != nil { ????????????????fmt.Println("decode fail: " + err.Error()) ????????????} ????????????fmt.Println("outout is ", outout) ????????????fmt.Println("Byte2Int32 is ", BytesToInt32(buf[0:4])) ????????????fmt.Println("length is ", buf[0:n]) ????????????fmt.Println("length is ", buf[0:4]) ????????????fmt.Println("length is ", BytesToInt8(buf[1:4])) ????????????out.seq = BytesToInt32(buf[0:4]) ????????????out.protocol = BytesToInt32(buf[4:8]) ????????????out.length = BytesToInt32(buf[8:12]) ????????????out.times = BytesToInt64(buf[12:20]) ????????????out.lens = BytesToInt32(buf[20:24]) ????????????out.lsp = BytesToInt32(buf[24:28]) ????????????bytes := out.bytes[0:20] ????????????copy(bytes, buf[28:n]) ????????????//out.bytes = &(buf[28:n]) ????????????fmt.Println(out.bytes) ????????????/* ????????????????for j := 0; j < 20; j++ { ????????????????????out.bytes[j] = buf[j+28] ????????????????} ????????????*/ ????????????fmt.Println("length is ", out) ????????case io.EOF: //當對方斷開連接時觸發該方法 ????????????fmt.Printf("Warning: End of data: %s \n", err1) ????????????err = err1 ????????????return ????????default: //當對方斷開連接時觸發該方法 ????????????fmt.Printf("Error: Reading data: %s \n", err1) ????????????err = err1 ????????????return ????????} ????} ????return } 程序主入口:package main
import ( ????"fmt" ????//"net" ????"bytes" ????"encoding/gob" ????//"C" ) type P struct { ????X, Y, Z int ????Name string } type Q struct { ????X, Y *int32 ????Name string } func main() { ????//C.puts(C.CString("Hello, world\n")) ????var network bytes.Buffer // Stand-in for a network connection ????enc := gob.NewEncoder(&network) // Will write to network. ????dec := gob.NewDecoder(&network) // Will read from network. ????err := enc.Encode(P{3, 4, 5, "Pythagoras"}) ????if err != nil { ????????fmt.Println("encode error:", err) ????} ????var q Q ????err = dec.Decode(&q) ????fmt.Println("ENC IS ", enc) ????fmt.Println("dec IS ", dec) ????//fmt.Println("network IS ", network.String()) ????if err != nil { ????????fmt.Println("decode error:", err) ????} ????fmt.Printf("%q: {%d,%d}\n", q.Name, *q.X, *q.Y) ????fmt.Println("Hello World!") ????var transType IpTransType ????transType.Addr = "127.0.0.1" ????transType.Port = 12345 ????transType.Type = TCP ????InitServer(transType) }
測試結果:
conn is coming 48?decode fail: EOF?outout is? {0 0 0 0 0 0 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]}?Byte2Int32 is? 231115?length is? [0 3 134 203 0 0 0 1 0 0 0 48 0 0 1 63 38 140 96 48 0 0 0 20 0 0 0 123 0 19 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]?length is? [0 3 134 203]?length is? 3?[0 19 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]?length is? {231115 1 48 1370741301296 20 123 [0 19 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]}?FUCK22 ?FUCK22? 0?Warning: End of data: EOF ?
到目前為止我還木有找到一種go夸語言通信編解碼的問題,所以能硬編解碼了。
轉載于:https://my.oschina.net/goldwave/blog/136709
總結
以上是生活随笔為你收集整理的go语言socket通信初试的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Android开发必看知识,不看后悔
- 下一篇: eclipse+ADT 进行androi