并发安全Sync包的使用
有時候在Go代碼中可能會存在多個goroutine同時操作一個資源(臨界區(qū)),這種情況會發(fā)生競態(tài)問題(數(shù)據(jù)競態(tài))。Sync包主要實現(xiàn)了并發(fā)任務同步WaitGroup的幾種方法和并發(fā)安全的互斥鎖和讀寫鎖方法,還實現(xiàn)了比較特殊的兩個方法,一個是保持只執(zhí)行一次的Once方法和線程安全的Map。
sync.WaitGroup(同步等待)
sync.WaitGroup內(nèi)部維護著一個計數(shù)器Add(),計數(shù)器的值可以增加和減少。例如當我們啟動了N 個并發(fā)任務時,就將計數(shù)器值增加N。每個任務完成時通過調(diào)用Done()方法將計數(shù)器減1,底層為Add(-1)。通過調(diào)用Wait()來等待并發(fā)任務執(zhí)行完,當計數(shù)器值為0時,表示所有并發(fā)任務已經(jīng)完成。
var x int64 var wg sync.WaitGroupfunc add() {for i := 0; i < 5000; i++ {x = x + 1 //數(shù)據(jù)競爭}wg.Done() } func main() {wg.Add(2)go add()go add()wg.Wait()fmt.Println(x) }上面的代碼中我們開啟了兩個goroutine去累加變量x的值,這兩個goroutine在訪問和修改x變量的時候就會存在數(shù)據(jù)競爭,導致最后的結(jié)果與期待的不符。
sync.Mutex(互斥鎖)
互斥鎖是一種常用的控制共享資源訪問的方法,它能夠保證同時只有一個goroutine可以訪問共享資源。Go語言中使用sync包的Mutex類型來實現(xiàn)互斥鎖。 使用互斥鎖來修復上面代碼的問題:
var x int64 var wg sync.WaitGroup var lock sync.Mutexfunc add() {for i := 0; i < 5000; i++ {lock.Lock() // 加鎖x = x + 1lock.Unlock() // 解鎖}wg.Done() } func main() {wg.Add(2)go add()go add()wg.Wait()fmt.Println(x) }使用互斥鎖能夠保證同一時間有且只有一個goroutine進入臨界區(qū),其他的goroutine則在等待鎖;當互斥鎖釋放后,等待的goroutine才可以獲取鎖進入臨界區(qū),多個goroutine同時等待一個鎖時,喚醒的策略是隨機的。
sync.RWMutex(讀寫互斥鎖)
互斥鎖是完全互斥的,但是有很多實際的場景下是讀多寫少的,當我們并發(fā)的去讀取一個資源不涉及資源修改的時候是沒有必要加鎖的,這種場景下使用讀寫鎖是更好的一種選擇。讀寫鎖在Go語言中使用sync包中的RWMutex類型。
讀寫鎖分為兩種:讀鎖和寫鎖。當一個goroutine獲取讀鎖之后,其他的goroutine如果是獲取讀鎖會繼續(xù)獲得鎖,如果是獲取寫鎖就會等待;當一個goroutine獲取寫鎖之后,其他的goroutine無論是獲取讀鎖還是寫鎖都會等待。
讀寫鎖示例:
var (x int64wg sync.WaitGrouplock sync.Mutexrwlock sync.RWMutex )func write() {// lock.Lock() // 加互斥鎖rwlock.Lock() // 加寫鎖x = x + 1time.Sleep(10 * time.Millisecond) // 假設(shè)讀操作耗時10毫秒rwlock.Unlock() // 解寫鎖// lock.Unlock() // 解互斥鎖wg.Done() }func read() {// lock.Lock() // 加互斥鎖rwlock.RLock() // 加讀鎖time.Sleep(time.Millisecond) // 假設(shè)讀操作耗時1毫秒rwlock.RUnlock() // 解讀鎖// lock.Unlock() // 解互斥鎖wg.Done() }func main() {start := time.Now()for i := 0; i < 10; i++ {wg.Add(1)go write()}for i := 0; i < 1000; i++ {wg.Add(1)go read()}wg.Wait()end := time.Now()fmt.Println(end.Sub(start)) }需要注意的是讀寫鎖非常適合讀多寫少的場景,如果讀和寫的操作差別不大,讀寫鎖的優(yōu)勢就發(fā)揮不出來。
sync.Once(單例)
說在前面的話:這是一個進階知識點。
在編程的很多場景下我們需要確保某些操作在高并發(fā)的場景下只執(zhí)行一次,例如只加載一次配置文件、只關(guān)閉一次通道等。
Go語言中的sync包中提供了一個針對只執(zhí)行一次場景的解決方案–sync.Once。
sync.Once只有一個Do方法,其簽名如下:
func (o *Once) Do(f func()) {}注意:如果要執(zhí)行的函數(shù)f需要傳遞參數(shù)就需要搭配閉包來使用。
sync.Map(線程安全map)
Go語言中內(nèi)置的map不是并發(fā)安全的。請看下面的示例:
var m = make(map[string]int)func get(key string) int {return m[key] }func set(key string, value int) {m[key] = value }func main() {wg := sync.WaitGroup{}for i := 0; i < 20; i++ {wg.Add(1)go func(n int) {key := strconv.Itoa(n)set(key, n)fmt.Printf("k=:%v,v:=%v\n", key, get(key))wg.Done()}(i)}wg.Wait() }上面的代碼開啟少量幾個goroutine的時候可能沒什么問題,當并發(fā)多了之后執(zhí)行上面的代碼就會報fatal error: concurrent map writes錯誤。
像這種場景下就需要為map加鎖來保證并發(fā)的安全性了,Go語言的sync包中提供了一個開箱即用的并發(fā)安全版map–sync.Map。開箱即用表示不用像內(nèi)置的map一樣使用make函數(shù)初始化就能直接使用。同時sync.Map內(nèi)置了諸如Store、Load、LoadOrStore、Delete、Range等操作方法。
var m = sync.Map{}func main() {wg := sync.WaitGroup{}for i := 0; i < 20; i++ {wg.Add(1)go func(n int) {key := strconv.Itoa(n)m.Store(key, n)value, _ := m.Load(key)fmt.Printf("k=:%v,v:=%v\n", key, value)wg.Done()}(i)}wg.Wait() }sync/atomic(原子操作)
代碼中的加鎖操作因為涉及內(nèi)核態(tài)的上下文切換會比較耗時、代價比較高。針對基本數(shù)據(jù)類型我們還可以使用原子操作來保證并發(fā)安全,因為原子操作是Go語言提供的方法它在用戶態(tài)就可以完成,因此性能比加鎖操作更好。Go語言中原子操作由內(nèi)置的標準庫sync/atomic提供。
| func LoadInt32(addr *int32) (val int32) | 讀取操作 |
| func LoadInt64(addr *int64) (val int64) | 讀取操作 |
| func LoadUint32(addr *uint32) (val uint32) | 讀取操作 |
| func LoadUint64(addr *uint64) (val uint64) | 讀取操作 |
| func LoadUintptr(addr *uintptr) (val uintptr) | 讀取操作 |
| func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer) | 讀取操作 |
| func StoreInt32(addr *int32, val int32) | 寫入操作 |
| func StoreInt64(addr *int64, val int64) | 寫入操作 |
| func StoreUint32(addr *uint32, val uint32) | 寫入操作 |
| func StoreUint64(addr *uint64, val uint64) | 寫入操作 |
| func StoreUintptr(addr *uintptr, val uintptr) | 寫入操作 |
| func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) | 寫入操作 |
| func AddInt32(addr *int32, delta int32) (new int32) | 修改操作 |
| func AddInt64(addr *int64, delta int64) (new int64) | 修改操作 |
| func AddUint32(addr *uint32, delta uint32) (new uint32) | 修改操作 |
| func AddUint64(addr *uint64, delta uint64) (new uint64) | 修改操作 |
| func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) | 修改操作 |
| func SwapInt32(addr *int32, new int32) (old int32) | 交換操作 |
| func SwapInt64(addr *int64, new int64) (old int64) | 交換操作 |
| func SwapUint32(addr *uint32, new uint32) (old uint32) | 交換操作 |
| func SwapUint64(addr *uint64, new uint64) (old uint64) | 交換操作 |
| func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) | 交換操作 |
| func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) | 交換操作 |
| func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) | 比較并交換操作 |
| func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) | 比較并交換操作 |
| func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) | 比較并交換操作 |
| func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) | 比較并交換操作 |
| func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) | 比較并交換操作 |
| func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) | 比較并交換操作 |
示例:
package mainimport ("fmt""sync""sync/atomic""time" )var x int64 var l sync.Mutex var wg sync.WaitGroup// 普通版加函數(shù) func add() {// x = x + 1x++ // 等價于上面的操作wg.Done() }// 互斥鎖版加函數(shù) func mutexAdd() {l.Lock()x++l.Unlock()wg.Done() }// 原子操作版加函數(shù) func atomicAdd() {atomic.AddInt64(&x, 1)wg.Done() }func main() {start := time.Now()for i := 0; i < 10000; i++ {wg.Add(1)//go add() // 普通版add函數(shù) 不是并發(fā)安全的//go mutexAdd() // 加鎖版add函數(shù) 是并發(fā)安全的,但是加鎖性能開銷大go atomicAdd() // 原子操作版add函數(shù) 是并發(fā)安全,性能優(yōu)于加鎖版}wg.Wait()end := time.Now()fmt.Println(x)fmt.Println(end.Sub(start)) }總結(jié)
以上是生活随笔為你收集整理的并发安全Sync包的使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 定时器Timer和Ticker
- 下一篇: 反射(reflect)机制