等待一组线程正常的退出

在线运行open in new window启动 AI 助手open in new window

func main() {
	var wg sync.WaitGroup

	for i := 0; i < 100; i++ {
		// 增加等待计数
		wg.Add(1)
		go func() {
			// 退出时减少等待计数
			defer wg.Done()
			time.Sleep(1 * time.Millisecond)
		}()
	}

	wg.Wait()
	fmt.Println("goroutines finished")
}

该程序使用了 sync.WaitGroup 来等待一组协程完成。程序通过循环创建了 100 个协程,在每个协程内部使用 time.Sleep() 来模拟一些处理时间。每个协程在开始处理前调用 wg.Add(1) 来增加等待计数器,然后在处理完成后通过 defer wg.Done() 来减少等待计数器。

最后,程序调用了 wg.Wait() 方法来等待所有协程完成。一旦所有协程完成,wg.Wait() 方法将返回,程序将继续执行下面的语句。最后,程序输出 "goroutines finished" 表示所有协程都已经完成。

使用 Mutex 锁定并发访问

在线运行open in new window启动 AI 助手open in new window

var (
	count int
	mu    sync.Mutex
	wg sync.WaitGroup
)

func main() {
	for i := 0; i < 100; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()

			mu.Lock()
			defer mu.Unlock()

			for j := 0; j < 1000; j++ {
				count += 1
			}
		}()
	}

	wg.Wait()

	// 不加锁 count 可能是 89016、99924 等不到 100000
	fmt.Println(count)
}

在程序中,首先定义了一个整数变量 count 和一个互斥锁变量 mu,以及一个 WaitGroup 变量 wg。在 for 循环中,使用 wg.Add(1) 增加等待计数器,并创建了 100 个协程,每个协程会对 count 变量进行一千次加一的操作。在协程内部,首先使用 mu.Lock() 方法锁定互斥锁,以确保在一个协程修改 count 变量时,其他协程不能同时修改它。然后在 for 循环中,对 count 变量执行加一操作。最后,使用 mu.Unlock() 方法释放互斥锁。

在所有协程执行完毕后,程序使用 wg.Wait() 方法等待所有协程完成,然后输出 count 变量的值。

由于 count 变量是在多个协程之间共享的,如果不加锁,可能会导致竞态条件,从而得到不正确的结果。使用互斥锁来保护共享变量是一种常见的解决竞态条件的方法。

需要注意的是,在这个程序中,对 count 变量的操作是原子性的,因此也可以使用 Go 标准库中的 atomic 包来保护 count 变量,而不需要使用互斥锁。

使用 Once 确保只运行一次

在线运行open in new window启动 AI 助手open in new window

func main() {
	num := 0
	once := sync.Once{}

	for i := 0; i < 1000; i++ {
		go once.Do(func() {
			num += 1
		})
	}

	time.Sleep(1 * time.Second)
	fmt.Println(num) // 1
}

在程序中,首先定义了一个整数变量 num 和一个 sync.Once 类型变量 once。在 for 循环中,使用 go 关键字创建了 1000 个协程,每个协程都会调用 once.Do() 方法。once.Do() 方法接受一个函数作为参数,并确保这个函数只被执行一次。在这个例子中,函数的作用是将 num 变量加一。

由于 once.Do() 方法只会执行一次,所以实际上只有一个协程能够执行 num += 1 这个语句,其他的协程会在调用 once.Do() 方法时被阻塞。因此,最终 num 变量的值将会是 1。

程序中的 time.Sleep() 方法用于等待所有协程都调用过 once.Do() 方法,以确保 num 变量的值被正确计算。如果没有等待的话,程序可能会在 num 变量被计算之前就退出,从而得到错误的结果。

需要注意的是,sync.Once 类型的值只能被执行一次,如果尝试多次调用 once.Do() 方法,则会被忽略。

sync.Map 是线程安全的字典

在线运行open in new window启动 AI 助手open in new window

func main() {
	wg := sync.WaitGroup{}
	sm := sync.Map{}

	for i := 0; i < 10; i++ {
		wg.Add(1)

		go func(n int) {
			defer wg.Done()
			sm.Store(n, n)
		}(i)
	}

	wg.Wait()

	sm.Range(func(key, value interface{}) bool {
		fmt.Printf("%d: %d\n", key.(int), value.(int))
		return true
	})
}

在程序中,首先定义了一个 sync.WaitGroup 变量 wg 和一个 sync.Map 变量 sm。在 for 循环中,使用 wg.Add(1) 增加等待计数器,并创建了 10 个协程。每个协程会调用 sync.Map 类型的 Store() 方法,将一个整数键值对存储到映射中。由于 sync.Map 是并发安全的,因此可以在多个协程中同时对其进行读写操作。

在所有协程执行完毕后,程序使用 wg.Wait() 方法等待所有协程完成。然后,程序使用 sync.Map 类型的 Range() 方法遍历映射中所有的键值对,并输出它们的值。Range() 方法接受一个函数作为参数,这个函数会在遍历每个键值对时被调用。在这个例子中,函数的作用是将键和值分别强制转换为整数,并输出它们的值。

需要注意的是,与普通的映射类型不同,sync.Map 类型的值不能直接使用索引操作符访问,而是需要使用它提供的方法来实现对映射的读写操作。同时,sync.Map 类型的值是并发安全的,可以在多个协程中同时对其进行读写操作,而不需要显式地使用互斥锁等机制来保护其并发访问。

sync.Atomic 提供的整数原子操作

在线运行open in new window启动 AI 助手open in new window

func main() {
	var num int32
	wg := &sync.WaitGroup{}

	for i := 0; i < 100; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()
			atomic.AddInt32(&num, 1)
		}()
	}

	wg.Wait()

	fmt.Println(atomic.LoadInt32(&num)) // 100
}

在程序中,首先定义了一个 int32 类型的变量 num,以及一个 sync.WaitGroup 变量 wg。在 for 循环中,使用 wg.Add(1) 增加等待计数器,并创建了 100 个协程。每个协程会调用 atomic 包中的 AddInt32() 方法,将 num 变量的值加一。由于 AddInt32() 方法是原子的,因此可以在多个协程中同时调用,而不会发生竞态条件。

在所有协程执行完毕后,程序使用 wg.Wait() 方法等待所有协程完成。然后,程序使用 atomic 包中的 LoadInt32() 方法获取 num 变量的最终值,并输出它的值。由于 AddInt32() 方法是原子的,所以最终的 num 变量的值将会是 100。

需要注意的是,atomic 包中的操作是原子的,因此可以在多个协程中同时使用它来对共享变量进行读写操作,而不需要使用互斥锁等机制来保护其并发访问。

Go 1.19 后新的 atomic.Xxx 类型

在线运行open in new window启动 AI 助手open in new window

func main() {
	var num atomic.Int32
	wg := &sync.WaitGroup{}

	for i := 0; i < 100; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()
			num.Add(1)
		}()
	}

	wg.Wait()

	fmt.Println(num.Load()) // 100
}

与前面的程序不同的是,这里使用了 atomic.Int32 类型的变量 num,而不是 int32 类型的变量。atomic.Int32 类型提供了一些方法,可以方便地对其进行原子操作,而不需要显式地调用 atomic 包中的方法。

在程序中,首先定义了一个 atomic.Int32 类型的变量 num,以及一个 sync.WaitGroup 变量 wg。在 for 循环中,使用 wg.Add(1) 增加等待计数器,并创建了 100 个协程。每个协程会调用 num 变量的 Add() 方法,将 num 变量的值加一。由于 Add() 方法是原子的,因此可以在多个协程中同时调用,而不会发生竞态条件。

在所有协程执行完毕后,程序使用 wg.Wait() 方法等待所有协程完成。然后,程序使用 atomic 包中的 Load() 方法获取 num 变量的最终值,并输出它的值。由于 Add() 方法是原子的,所以最终的 num 变量的值将会是 100。

需要注意的是,atomic 包中的操作是原子的,因此可以在多个协程中同时使用它来对共享变量进行读写操作,而不需要使用互斥锁等机制来保护其并发访问。此外,使用 atomic.Int32 类型的变量可以更加方便地进行原子操作,而不需要显式地调用 atomic 包中的方法。

指针类型的原子存取和交换

在线运行open in new window启动 AI 助手open in new window

func main() {
	var (
		pt     atomic.Pointer[int]
		ta, tb = 1, 2
	)

	// store and load
	pt.Store(&ta)
	fmt.Println(pt.Load() == &ta) // true

	// swap
	pa := pt.Swap(&tb)
	fmt.Println(pa == &ta)        // true
	fmt.Println(pt.Load() == &tb) // true

	// CAS
	b := pt.CompareAndSwap(&ta, &tb)
	fmt.Println(b)                // false
	fmt.Println(pt.Load() == &tb) // true

	b = pt.CompareAndSwap(&tb, &ta)
	fmt.Println(b)                // true
	fmt.Println(pt.Load() == &tb) // false
}

在程序中,首先定义了一个 atomic.Pointer[int] 类型的变量 pt,以及两个整数变量 ta 和 tb,分别初始化为 1 和 2。

第一个操作是 store and load,使用 pt.Store() 方法将 ta 的地址存储到 pt 变量中,并使用 pt.Load() 方法来读取 pt 变量中的值。由于 pt.Load() 方法返回的是指针类型,因此需要使用 == 运算符来判断它是否等于 &ta。

第二个操作是 swap,使用 pt.Swap() 方法将 tb 的地址存储到 pt 变量中,并使用 pt.Swap() 方法将 ta 的地址替换为 tb 的地址。Swap() 方法会返回被替换的值的地址,因此需要使用一个变量 pa 来接收 ta 的地址,并使用 == 运算符来判断它是否等于 &ta。接着使用 pt.Load() 方法来读取 pt 变量中的值,并使用 == 运算符来判断它是否等于 &tb。

第三个操作是 CAS,使用 pt.CompareAndSwap() 方法尝试将 ta 的地址替换为 tb 的地址,如果替换成功,则返回 true,否则返回 false。由于 ta 的地址已经被替换为 tb 的地址,因此第一次调用 CompareAndSwap() 方法会返回 false。接着使用 pt.Load() 方法来读取 pt 变量中的值,并使用 == 运算符来判断它是否等于 &tb。然后再次调用 CompareAndSwap() 方法,将 tb 的地址替换为 ta 的地址。由于 tb 的地址已经是 pt 变量中的值,因此第二次调用 CompareAndSwap() 方法会返回 true。最后使用 pt.Load() 方法来读取 pt 变量中的值,并使用 == 运算符来判断它是否等于 &tb。

任何/自定义类型的原子存取

在线运行open in new window启动 AI 助手open in new window

type person struct {
	name string
	age  int
}

func main() {
	var (
		p1 = person{"zhangsan", 20}
		v  atomic.Value
	)

	v.Store(p1)
	p2 := v.Load().(person)

	fmt.Println(p2)       // {zhangsan 20}
	fmt.Println(p1 == p2) // true
}

在程序中,首先定义了一个 person 结构体类型的变量 p1,它的 name 和 age 字段分别初始化为 "zhangsan" 和 20。接着定义了一个 atomic.Value 类型的变量 v,它可以用来存储任意类型的值。

使用 v.Store() 方法将 p1 的值存储到 v 变量中。由于 v 变量是原子类型的,因此可以在多个协程中同时对它进行读写操作,而不需要使用互斥锁等机制来保护其并发访问。

使用 v.Load() 方法从 v 变量中读取存储的值,并将它强制转换为 person 结构体类型。由于 Load() 方法返回的是一个 interface{} 类型的值,因此需要使用类型断言将它转换为 person 类型。最后将读取的 person 结构体类型的值存储到 p2 变量中。

最后,程序分别输出了 p2 变量的值和 p1 == p2 的结果。由于 p1 和 p2 变量的值相同,因此 p1 == p2 的结果为 true。

Last Updated:
Contributors: Bob Wang