当前位置:首页 » 《资源分享》 » 正文

手摸手Go 并发编程基石atomic_sydhappy的专栏

14 人参与  2021年03月15日 16:03  分类 : 《资源分享》  评论

点击全文阅读


only has compared to the others early, diligently diligently, can feel the successful taste。

“高并发 高性能 高可用”一直以来作为搬砖界用力搬砖的口号。由于CPU一次读取存储数据的长度有限,比如32bit的平台修改int64需要被拆分成两次写操作,更何况对于结构体的赋值,那么对于高并发场景下我们怎么才能保证数据的完整性和一致性呢?

所以今天我们来聊聊Go的atomic包,它提供了低级别原子内存原语,对于实现同步算法起到很大作用。可以说是Go并发编程的基石,比如MutexRWMutexWaitGroupOnce等实现都依赖于atomic。当然其提供的功能需要格外小心才能正确使用,atomic大致提供了5类原子操作,因为不会被CPU中断所以在多个goroutine之间访问是安全的。

  1. SwapT函数实现的交换操作,在原子上等价于

old = *addr
*addr = new
return old
  1. CompareAndSwapT函数实现的比较并交换操作,在原子上等价于

if *addr == old {
 *addr = new
 return true
}
return false
  1. AddT函数实现的加法操作,原子上等价于

*addr += delta
return *addr
  1. LoadTStoreT函数实现的加载和存储操作,原子上等价于return *addr*addr=val

  2. atomic.Value为除了int32int64uint32uint64uintptrunsafe.Pointer之外的类型提供原子读写能力。

atomic源码分析

然而我们在atomic包下只能看到类似如下的函数定义

// SwapInt32 atomically stores new into *addr and returns the previous *addr value.
func SwapInt32(addr *int32, new int32) (old int32)
// SwapInt64 atomically stores new into *addr and returns the previous *addr value.
func SwapInt64(addr *int64, new int64) (old int64)
// SwapUint32 atomically stores new into *addr and returns the previous *addr value.
func SwapUint32(addr *uint32, new uint32) (old uint32)
... ...

并没有发现函数实现部分,但是能够找到相应汇编代码

TEXT ·SwapInt32(SB),NOSPLIT,$0
	JMP	runtime∕internal∕atomic·Xchg(SB)
TEXT ·SwapUint32(SB),NOSPLIT,$0
	JMP	runtime∕internal∕atomic·Xchg(SB)
TEXT ·SwapInt64(SB),NOSPLIT,$0
	JMP	runtime∕internal∕atomic·Xchg64(SB)
	... ...

可见它们最终都是基于运行时中runtime/internal/atomic的实现。

提前剧透一下,原子操作其实最终依赖硬件指令的支持,但是因为原子操作可能会导致goroutine阻塞,所以它同时还需要运行时调度器的配合。

案例分析

atomic.CompareAndSwapPointer为例,它只有函数定义没有函数体

// CompareAndSwapPointer executes the compare-and-swap operation for a unsafe.Pointer value.
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

通过查找,发现其本身由运行时实现

//go:linkname sync_atomic_CompareAndSwapPointer sync/atomic.CompareAndSwapPointer
//go:nosplit
func sync_atomic_CompareAndSwapPointer(ptr *unsafe.Pointer, old, new unsafe.Pointer) bool {
 if writeBarrier.enabled {
  atomicwb(ptr, new)
 }
 return sync_atomic_CompareAndSwapUintptr((*uintptr)(noescape(unsafe.Pointer(ptr))), uintptr(old), uintptr(new))
}

可以看到其最终调用了sync_atomic_CompareAndSwapUintptr,且sync_atomic_CompareAndSwapUintptr也只有函数定义没有函数体,而且也找不到运行时的实现,说明它是由编译器完成。那么来通过一个栗子一窥究竟吧

package main

import (
 "fmt"
 "sync/atomic"
 "unsafe"
)

func main() {
 var p unsafe.Pointer
 newP := 23
 atomic.CompareAndSwapPointer(&p, nil, unsafe.Pointer(&newP))
 v := (*int)(p)
 fmt.Println(*v)
}

先执行编译命令go build -gcflags="-N -l -m" -o atomic atomic.go得到二进制文件atomic`

然后执行go tool objdump -s "main.main" atomic查看下main.main`编译结果

TEXT main.main(SB) /Users/shangyindong/mywork/workspace/workspace_github/go-snippets/atomic/atomic.go
  .....
  atomic.go:12		0x10a6ff3		e82869fbff			CALL sync/atomic.CompareAndSwapPointer(SB)
  ......

go tool objdump -s "sync/atomic.CompareAndSwapPointer" atomic接着看CompareAndSwapPointer

TEXT sync/atomic.CompareAndSwapPointer(SB) /usr/local/go/src/runtime/atomic_pointer.go
......
  atomic_pointer.go:76	0x105d960		e85b8b0000		CALL sync/atomic.CompareAndSwapUintptr(SB)
......

可以看到CompareAndSwapPointer实际调用了CompareAndSwapUintptr接着看go tool objdump -s "sync/atomic.CompareAndSwapUintptr" atomic

TEXT sync/atomic.CompareAndSwapUintptr(SB) /usr/local/go/src/sync/atomic/asm.s
  asm.s:31		0x10664c0		e93bbaf9ff		JMP runtime/internal/atomic.Casuintptr(SB)
  ......

最终JMP跳转到了``,这个方法为内置汇编,看下asm_amd64.s吧

TEXT runtime∕internal∕atomic·Casuintptr(SB), NOSPLIT, $0-25
	JMP	runtime∕internal∕atomic·Cas64(SB)

进而调整到runtime/internal/atomic.Cas64

// bool	runtime∕internal∕atomic·Cas64(uint64 *val, uint64 old, uint64 new)
// Atomically:
//	if(*val == *old){
//		*val = new;
//		return 1;
//	} else {
//		return 0;
//	}
TEXT runtime∕internal∕atomic·Cas64(SB), NOSPLIT, $0-25
	MOVQ	ptr+0(FP), BX
	MOVQ	old+8(FP), AX
	MOVQ	new+16(FP), CX
	LOCK
	CMPXCHGQ	CX, 0(BX)
	SETEQ	ret+24(FP)
	RET

到这里我们能够很清晰看到,本质上原子操作最终还是依赖于CPU的Lock+CMPXCHGQ指令,Cas64(SB)总共包含7条指令

第一条指令:将ptr的值放入BX

第二条指令:将假设的旧值放入AX

第三条指令:将要比较的新值放入CX

第四条指令:LOCK并不是指令,而是作为指令前缀用来修饰CMPXCHGQ CX, 0(BX)

大致有五类指令可强制使用LOCK语义,但当LOCK前缀被置于其他指令之前或者指令没有对内存进行写操作(目标操作数可能在寄存器中)时,会抛出一个invalid-opcode异常

  • 位测试和修改指令(BTS,BTR,BTC)

  • 交换指令(XADD,CMPXCHG,CMPXCHG8B)

  • XCHG指令自动使用LOCK前缀

  • 单操作数算术和逻辑指令:INC,DEC,NOT,NEG

  • 双操作数算术和逻辑指令:ADD,ADC,SUB,SBB,AND,OR,XOR

对于Intel486和Pentium处理器,在进行加锁操作时,LOCK#信号总是在总线上发出,甚至锁定的内存区域已经缓存在处理器中。这种通过封锁总线来禁止其他CPU对内存修改进而达到原子性的效果,显然锁的力度过于粗糙。

所以在Pentium4,Intel Xeon,P6系列已经最近的处理器,如果加锁的内存区域已经缓存在处理器中,处理器可能并不对总线发出LOCK#信号,而是仅仅修改缓存中的数据,然后依赖缓存一致性协议(MESI 详见《手摸手Go 深入剖析sync.Pool》有讲解)来保证加锁操作的自动执行。缓存一致性协议会自动阻止两个或多个缓存了同一区域内存的处理器同时修改数据。

感兴趣的可以详细研究下intel开发手册卷3

第五条指令:调用CMPXCHGQ,将指令第二个操作数与累加器AX比较 ,如果相等,CX更新到BX,否则BX更新到AX`

第六条指令:AXCX相等时将1写进ret+16(FP)否则写入0

第七条指令:函数返回结束。

特殊的atomic.Value

atomic.Value是Go语言1.4版本的时候加入的,它相当于一个容器,可以原子的StoreLoad任意类型的值。是对int32int64uint32uint64uintptrunsafe.Pointer类型原子操作的补充。但它的实现不是通过汇编来完成,而是基于已有的atomic包。

看下它的基本结构

// Value的零值为nil
// 使用后禁止拷贝
type Value struct {
 v interface{}
}
// ifaceWords is interface{} internal representation.
type ifaceWords struct {
 typ  unsafe.Pointer
 data unsafe.Pointer
}

因为atomic.Value被设计为存储任意类型的值,所以它内部只有一个interface{}类型的字段。并且在atomic/value.go文件中还定义了一个ifaceWords,之前我们讲过Go的接口结构,是不是跟eface很像

type eface struct {
 _type *_type
 data  unsafe.Pointer
}

其实atomic.Value的实现原理就是将interface{}类型分解,得到类型和数据这两个unsafe.Pointer类型字段,在针对它们进行原子操作来达到interface{}类型原子操作的目的。

unsafe.Pointer

我们知道Go语言的编译器会使用静态类型检查来保证程序运行的类型安全。但它的标准库中又提供了unsafe.Pointer,可以让程序灵活的操作内存并且可以绕过Go语言的类型检查,从而可以跟任意的指针类型相互转换。

例如 字符串和byte切片之前的零拷贝转换

type StringHeader struct {
    Data uintptr
    Len  int
}

type SliceHeader struct {
    Data uintptr
    Len  int
    Cap  int
}

我们看到slice和string的底层数据结构基本一样,虽然Go语言的类型检查禁止了它们之间相互转换。但是拥有了unsafe.Pointer这个黑魔法,你就可以零拷贝实现[]byte 和string之间的转换,只需共享底层的Data和Len即可

func string2bytes(s string) []byte {
    return *(*[]byte)(unsafe.Pointer(&s))
}
func bytes2string(b []byte) string{
    return *(*string)(unsafe.Pointer(&b))
}

如果你搞清楚了unsafe.Pointer,那么接下来atomic.Value的神秘面纱也就很好揭开了。

atomic.Value提供了LoadStore两个操作,完成数据的读取和存储。

写操作Store

Store大致逻辑:

  1. 将待存储的数据和当前的值分别转换为*ifaceWords

  2. 进入一个无限for循环

    2.1 先检查现有值的typ,如果为nil表示这是第一次存储,则先调用runtime_procPin()通过修改当前g关联m的locks属性来禁止P被抢占

    2.2 尝试使用CompareAndSwapPointer将现有值的typ设置为unsafe.Pointer(^uintptr(0))方便Load 操作时判断当前状态,如果失败则解除抢占回到for循环开始位置继续执行

    2.3 如果设置成功,则可以完成第一次的数据存储

  3. 自旋等待中的gorountine如果发现uintptr(typ) == ^uintptr(0)表明第一次存储尚未完成则继续自旋等待

  4. 到这里说明第一次存储已经完成,则检查Value从始至终是否都是保存同一类型数据,不是则panic

  5. 非第一次存储,则更新数据

// Store 将Value的值设置为x
// 给定值的所有Store调用都必须使用相同的具体类型否则会像存储nil值一样会发生panic 
func (v *Value) Store(x interface{}) {
 if x == nil {
  panic("sync/atomic: store of nil value into Value")
 }
 vp := (*ifaceWords)(unsafe.Pointer(v))
 xp := (*ifaceWords)(unsafe.Pointer(&x))
 for {
  typ := LoadPointer(&vp.typ)
  if typ == nil {
      // 尝试开始第一次存储
      //禁止P被抢占 以便其他goroutine可以使用主动自旋等待来等待完成
      //GC也互补偶然看到伪造的类型
   runtime_procPin()
      // 加了个乐观锁
   if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
        //没抢到机会 则退出
    runtime_procUnpin()
    continue
   }
   // 完成第一次存储
   StorePointer(&vp.data, xp.data)
   StorePointer(&vp.typ, xp.typ)
   runtime_procUnpin()
   return
  }
  if uintptr(typ) == ^uintptr(0) {
   // 第一次存储正在进行 继续等待
   // 因为我们已经禁止了抢占所以我们可以继续自旋等待
   continue
  }
  // 第一次存储完成 检查类型并且覆盖数据
  if typ != xp.typ {
   panic("sync/atomic: store of inconsistently typed value into Value")
  }
  StorePointer(&vp.data, xp.data)
  return
 }
}

读操作Load

读取操作相对简单就不赘述。

// Load 返回最近一次Store存储的数据
// 如果没有调用Store存储数据则返回nil
func (v *Value) Load() (x interface{}) {
 vp := (*ifaceWords)(unsafe.Pointer(v))
 typ := LoadPointer(&vp.typ)
 if typ == nil || uintptr(typ) == ^uintptr(0) { 未调用Store或第一次存储尚未完成 直接返回nil
  // 第一次存储尚未完成
  return nil
 }
 data := LoadPointer(&vp.data)
 xp := (*ifaceWords)(unsafe.Pointer(&x))
 xp.typ = typ
 xp.data = data
 return
}

关于bug

atomic包中有一段这样的注释

BUG(rsc): On 386, the 64-bit functions use instructions unavailable before the Pentium MMX. On non-Linux ARM, the 64-bit functions use instructions unavailable before the ARMv6k core. On ARM, 386, and 32-bit MIPS, it is the caller's responsibility to arrange for 64-bit alignment of 64-bit words accessed atomically. The first word in a variable or in an allocated struct, array, or slice can be relied upon to be 64-bit aligned.

在《手摸手Go 你的内存对齐了吗?》中有聊到过内存对齐的内容。(不同硬件平台并不是都可以在任意地址上访问任意数据;而且如果数据没有内存对齐可能会导致CPU访问两次内存才能拿到数据,如果内存对齐一次就能完成数据读取。)

这里大概是说在ARM,386,和32位MIPS,调用者有责任安排原子访问的64位字按照8字节对齐,否则程序会panic。因为不同平台上的编译器有自己的对齐系数,32bit平台上一般是4字节对齐,而在64bit平台上一般是8字节对齐。所以32bit平台上8字节数字可能会因为内存对齐拆分成2个4字节分布。

举个栗子

package main

import (
 "fmt"
 "sync/atomic"
)

type M struct {
 x int64
 u uint32
 v int64
}

func main() {
 m := M{}
 result := atomic.AddInt64(&m.v, 1)
 fmt.Println(result)
}

GOARCH=amd64 go build pointer.go && ./pointer 执行正常

但是在386上GOARCH=386 go build pointer.go && ./pointer 程序发生panic

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x2f7c]

goroutine 1 [running]:
runtime/internal/atomic.Xadd64(0x1141612c, 0x1, 0x0, 0x53f8, 0x1141a230)
        /usr/local/go/src/runtime/internal/atomic/asm_386.s:105 +0xc
main.main()
        /Users/mywork/workspace/workspace_go/godemo/pointer/pointer.go:16 +0x40

总结

通过阅读源码,很显然atomic包中的原子操作均为底层硬件指令的协助完成,不需要加锁和解锁过程,所以对于单一变量更新保护,原子操作用起来更高效。文章篇幅关系我们这里只分析CompareAndSwapPointer,至于其他原子操作具体底层的硬件指令感兴趣的童鞋可以继续探索。

如果阅读过程中发现本文存疑或错误的地方,可以关注公众号留言。如果觉得还可以 帮忙点个在看????


点击全文阅读


本文链接:http://zhangshiyu.com/post/16299.html

指令  操作  原子  
<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1