使用golang开发Tcp服务端和客户端的例子

作者:getyouyou
链接:https://www.jianshu.com/p/dbc62a879081
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

前言

新工作接手了公司的一个使用golang编写的agent程序,用于采集各个机器的性能指标和监控数据,之前使用http实现数据的上传,最近想把它改成tcp上传的方式,由于是新手上路,顺手写了一个小demo程序。

这个程序中包含:

  • 简单的TcpServer服务程序:侦听,数据收发与解析
  • 简单的客户端程序:数据收发与解析

服务端

  • 与正常的其他语言一样,go中也提供了丰富的网络相关的包,按照正常的套路,它是这样的:

  • 绑定端口,初始化套接字

  • 启动侦听,开启后台线程接收客户端请求

  • 接收请求,针对每个请求开启一个线程来处理通信

  • 资源回收

golang的套路也是如此,不同的地方在于它可以使用goroutine来替换上面的线程;

整体的代码很简单,可以参考文档和api手册,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package main

import (
"fmt"
"net"
"os"
"encoding/json"
"bufio"
"hash/crc32"
"io"
)
//数据包的类型
const (
HEART_BEAT_PACKET = 0x00
REPORT_PACKET = 0x01
)

var (
server = "127.0.0.1:8080"
)
//这里是包的结构体,其实是可以不需要的
type Packet struct {
PacketType byte
PacketContent []byte
}
//心跳包,这里用了json来序列化,也可以用github上的gogo/protobuf包
//具体见(https://github.com/gogo/protobuf)
type HeartPacket struct {
Version string`json:"version"`
Timestamp int64`json:"timestamp"`
}
//正式上传的数据包
type ReportPacket struct {
Content string`json:"content"`
Rand int`json:"rand"`
Timestamp int64`json:"timestamp"`
}
//与服务器相关的资源都放在这里面
type TcpServer struct {
listener *net.TCPListener
hawkServer *net.TCPAddr
}

func main() {
//类似于初始化套接字,绑定端口
hawkServer, err := net.ResolveTCPAddr("tcp", server)
checkErr(err)
//侦听
listen, err := net.ListenTCP("tcp", hawkServer)
checkErr(err)
//记得关闭
defer listen.Close()
tcpServer := &TcpServer{
listener:listen,
hawkServer:hawkServer,
}
fmt.Println("start server successful......")
//开始接收请求
for {
conn, err := tcpServer.listener.Accept()
fmt.Println("accept tcp client %s",conn.RemoteAddr().String())
checkErr(err)
// 每次建立一个连接就放到单独的协程内做处理
go Handle(conn)
}
}
//处理函数,这是一个状态机
//根据数据包来做解析
//数据包的格式为|0xFF|0xFF|len(高)|len(低)|Data|CRC高16位|0xFF|0xFE
//其中len为data的长度,实际长度为len(高)*256+len(低)
//CRC为32位CRC,取了最高16位共2Bytes
//0xFF|0xFF和0xFF|0xFE类似于前导码
func Handle(conn net.Conn) {
// close connection before exit
defer conn.Close()
// 状态机状态
state := 0x00
// 数据包长度
length := uint16(0)
// crc校验和
crc16 := uint16(0)
var recvBuffer []byte
// 游标
cursor := uint16(0)
bufferReader := bufio.NewReader(conn)
//状态机处理数据
for {
recvByte,err := bufferReader.ReadByte()
if err != nil {
//这里因为做了心跳,所以就没有加deadline时间,如果客户端断开连接
//这里ReadByte方法返回一个io.EOF的错误,具体可考虑文档
if err == io.EOF {
fmt.Printf("client %s is close!\n",conn.RemoteAddr().String())
}
//在这里直接退出goroutine,关闭由defer操作完成
return
}
//进入状态机,根据不同的状态来处理
switch state {
case 0x00:
if recvByte == 0xFF {
state = 0x01
//初始化状态机
recvBuffer = nil
length = 0
crc16 = 0
}else{
state = 0x00
}
break
case 0x01:
if recvByte == 0xFF {
state = 0x02
}else{
state = 0x00
}
break
case 0x02:
length += uint16(recvByte) * 256
state = 0x03
break
case 0x03:
length += uint16(recvByte)
// 一次申请缓存,初始化游标,准备读数据
recvBuffer = make([]byte,length)
cursor = 0
state = 0x04
break
case 0x04:
//不断地在这个状态下读数据,直到满足长度为止
recvBuffer[cursor] = recvByte
cursor++
if(cursor == length){
state = 0x05
}
break
case 0x05:
crc16 += uint16(recvByte) * 256
state = 0x06
break
case 0x06:
crc16 += uint16(recvByte)
state = 0x07
break
case 0x07:
if recvByte == 0xFF {
state = 0x08
}else{
state = 0x00
}
case 0x08:
if recvByte == 0xFE {
//执行数据包校验
if (crc32.ChecksumIEEE(recvBuffer) >> 16) & 0xFFFF == uint32(crc16) {
var packet Packet
//把拿到的数据反序列化出来
json.Unmarshal(recvBuffer,&packet)
//新开协程处理数据
go processRecvData(&packet,conn)
}else{
fmt.Println("丢弃数据!")
}
}
//状态机归位,接收下一个包
state = 0x00
}
}
}

//在这里处理收到的包,就和一般的逻辑一样了,根据类型进行不同的处理,因人而异
//我这里处理了心跳和一个上报数据包
//服务器往客户端的数据包很简单地以\n换行结束了,偷了一个懒:),正常情况下也可根据自己的协议来封装好
//然后在客户端写一个状态来处理
func processRecvData(packet *Packet,conn net.Conn) {
switch packet.PacketType {
case HEART_BEAT_PACKET:
var beatPacket HeartPacket
json.Unmarshal(packet.PacketContent,&beatPacket)
fmt.Printf("recieve heat beat from [%s] ,data is [%v]\n",conn.RemoteAddr().String(),beatPacket)
conn.Write([]byte("heartBeat\n"))
return
case REPORT_PACKET:
var reportPacket ReportPacket
json.Unmarshal(packet.PacketContent,&reportPacket)
fmt.Printf("recieve report data from [%s] ,data is [%v]\n",conn.RemoteAddr().String(),reportPacket)
conn.Write([]byte("Report data has recive\n"))
return
}
}
//处理错误,根据实际情况选择这样处理,还是在函数调之后不同的地方不同处理
func checkErr(err error) {
if err != nil {
fmt.Println(err)
os.Exit(-1)
}
}

特别需要注意
Handle方法在一个死循环中使用了一个无阻塞的buff来读取套接字中的数据,因此当客户端主动关闭连接时,如果不对这个io.EOF进行处理,会导致这个goroutine空转,疯狂吃cpu,在这里io.EOF的处理非常重要。

客户端

客户端与一般的TCP通信程序一样,它需要完成的工作有:

  • 向服务器发送心跳包

  • 向服务器发送数据包

  • 接收服务器的数据包

需要注意的就是客户端与服务端的数据协议保持一致,请在开始发送数据之前启动数据接收

上面的3个工作我分别用了goroutine来做,整体的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package main

import (
"os"
"fmt"
"net"
"time"
"math/rand"
"encoding/json"
"bufio"
"hash/crc32"
"sync"
)
//数据包类型
const (
HEART_BEAT_PACKET = 0x00
REPORT_PACKET = 0x01
)
//默认的服务器地址
var (
server = "127.0.0.1:9876"
)
//数据包
type Packet struct {
PacketType byte
PacketContent []byte
}
//心跳包
type HeartPacket struct {
Version string`json:"version"`
Timestamp int64`json:"timestamp"`
}
//数据包
type ReportPacket struct {
Content string`json:"content"`
Rand int`json:"rand"`
Timestamp int64`json:"timestamp"`
}

//客户端对象
type TcpClient struct {
connection *net.TCPConn
hawkServer *net.TCPAddr
stopChan chan struct{}
}

func main() {
//拿到服务器地址信息
hawkServer,err := net.ResolveTCPAddr("tcp", server)
if err != nil {
fmt.Printf("hawk server [%s] resolve error: [%s]",server,err.Error())
os.Exit(1)
}
//连接服务器
connection,err := net.DialTCP("tcp",nil,hawkServer)
if err != nil {
fmt.Printf("connect to hawk server error: [%s]",err.Error())
os.Exit(1)
}
client := &TcpClient{
connection:connection,
hawkServer:hawkServer,
stopChan:make(chan struct{}),
}
//启动接收
go client.receivePackets()

//发送心跳的goroutine
go func() {
heartBeatTick := time.Tick(2 * time.Second)
for{
select {
case <-heartBeatTick:
client.sendHeartPacket()
case <-client.stopChan:
return
}
}
}()

//测试用的,开300个goroutine每秒发送一个包
for i:=0;i<300;i++ {
go func() {
sendTimer := time.After(1 * time.Second)
for{
select {
case <-sendTimer:
client.sendReportPacket()
sendTimer = time.After(1 * time.Second)
case <-client.stopChan:
return
}
}
}()
}
//等待退出
<-client.stopChan
}

// 接收数据包
func (client *TcpClient)receivePackets() {
reader := bufio.NewReader(client.connection)
for {
//承接上面说的服务器端的偷懒,我这里读也只是以\n为界限来读区分包
msg, err := reader.ReadString('\n')
if err != nil {
//在这里也请处理如果服务器关闭时的异常
close(client.stopChan)
break
}
fmt.Print(msg)
}
}
//发送数据包
//仔细看代码其实这里做了两次json的序列化,有一次其实是不需要的
func (client *TcpClient)sendReportPacket() {
reportPacket := ReportPacket{
Content:getRandString(),
Timestamp:time.Now().Unix(),
Rand:rand.Int(),
}
packetBytes,err := json.Marshal(reportPacket)
if err!=nil{
fmt.Println(err.Error())
}
//这一次其实可以不需要,在封包的地方把类型和数据传进去即可
packet := Packet{
PacketType:REPORT_PACKET,
PacketContent:packetBytes,
}
sendBytes,err := json.Marshal(packet)
if err!=nil{
fmt.Println(err.Error())
}
//发送
client.connection.Write(EnPackSendData(sendBytes))
fmt.Println("Send metric data success!")
}

//使用的协议与服务器端保持一致
func EnPackSendData(sendBytes []byte) []byte {
packetLength := len(sendBytes) + 8
result := make([]byte,packetLength)
result[0] = 0xFF
result[1] = 0xFF
result[2] = byte(uint16(len(sendBytes)) >> 8)
result[3] = byte(uint16(len(sendBytes)) & 0xFF)
copy(result[4:],sendBytes)
sendCrc := crc32.ChecksumIEEE(sendBytes)
result[packetLength-4] = byte(sendCrc >> 24)
result[packetLength-3] = byte(sendCrc >> 16 & 0xFF)
result[packetLength-2] = 0xFF
result[packetLength-1] = 0xFE
fmt.Println(result)
return result
}
//发送心跳包,与发送数据包一样
func (client *TcpClient)sendHeartPacket() {
heartPacket := HeartPacket{
Version:"1.0",
Timestamp:time.Now().Unix(),
}
packetBytes,err := json.Marshal(heartPacket)
if err!=nil{
fmt.Println(err.Error())
}
packet := Packet{
PacketType:HEART_BEAT_PACKET,
PacketContent:packetBytes,
}
sendBytes,err := json.Marshal(packet)
if err!=nil{
fmt.Println(err.Error())
}
client.connection.Write(EnPackSendData(sendBytes))
fmt.Println("Send heartbeat data success!")
}
//拿一串随机字符
func getRandString()string {
length := rand.Intn(50)
strBytes := make([]byte,length)
for i:=0;i<length;i++ {
strBytes[i] = byte(rand.Intn(26) + 97)
}
return string(strBytes)
}

后记

测试过程中,一共开了7个client,共计2100个goroutine,本机启动服务器端,机器配置为i-5/8G的情况下,整体的资源使用情况如下:

需要改进的地方,也是后两篇的主题:

  • 引入内存池
  • 服务无缝重启

本文作者:智慧锦囊

本文链接:https://dongchao935.github.io/%E4%BD%BF%E7%94%A8golang%E5%BC%80%E5%8F%91Tcp%E6%9C%8D%E5%8A%A1%E7%AB%AF%E5%92%8C%E5%AE%A2%E6%88%B7%E7%AB%AF%E7%9A%84%E4%BE%8B%E5%AD%90/

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!

ESC 关闭 | 导航 | Enter 打开
输入关键词开始搜索