Go的并行计算
基础概念
使用go(关键字)创建一个「类似于线程」的东西(Goroutines)
Goroutines结束的两种方式
主线程结束、程序退出 → 所有Goroutines也结束(这种方式应该一般不会用吧)
传递消息 → Goroutines自己主动结束
使用Channel进行通信
基础示例
// 注释中使用了「线程」一词 只是为了方便理解 实际上可能有些不准确
package main
import (
"fmt"
"sync"
"time"
)
const target int = 10
var wg sync.WaitGroup
var count_locker sync.Mutex
var count int = 0
// 数据生产者
func recv(c chan int) {
for i := 0; i < target; i++ {
// 存数据 满则阻塞
c <- i
time.Sleep(time.Microsecond * 300)
}
// 标记线程结束
wg.Done()
}
// 数据消费者
func proc(c chan int) {
for {
var next_id int
count_locker.Lock() // 上锁
next_id = count // 领取下一个任务ID
count += 1
count_locker.Unlock() // 解锁
// 如果已经处理完毕
if next_id >= target {
break
}
// 取数据 空则阻塞
t := <-c
time.Sleep(time.Second)
fmt.Println("proc", t)
}
wg.Done()
}
func main() {
// chan 需要使用 make 创建 这里也指定了 chan 的容量大小
// chan 用于传递数据
c := make(chan int, 3)
// 线程计数器
wg.Add(1)
// 在普通的函数调用前面加上 go 就变成异步了
go recv(c)
for i := 0; i < 3; i++ {
wg.Add(1)
go proc(c)
}
// 等待所有线程完成
wg.Wait()
fmt.Println("done")
}
进阶用法
// AI Generated
package main
import (
"context" // 用于控制 goroutine 的生命周期,如取消、超时
"fmt" // 用于打印输出
"math/rand" // 生成随机数
"sync" // 提供并发控制工具,如 WaitGroup
"time" // 时间相关的操作,如延时、超时
)
// Task 是一个自定义结构体,代表一个要被处理的任务
type Task struct {
ID int // 任务的编号(标识)
}
// processTask 是一个函数,模拟处理任务的过程
// 参数:ctx 是上下文(用于取消任务),task 是当前要处理的任务
// 返回值:error 表示是否处理出错或被取消
func processTask(ctx context.Context, task Task) error {
// select 是 Go 的关键字,用来监听多个 channel 的情况
select {
// 模拟一个处理耗时,随机在 0-1000 毫秒之间
case <-time.After(time.Duration(rand.Intn(1000)) * time.Millisecond):
fmt.Printf("Task %d done\n", task.ID)
return nil
// 如果 context 被取消或超时,提前终止任务
case <-ctx.Done():
// Done() 返回一个 channel,当 context 被取消或超时时会触发
fmt.Printf("Task %d cancelled or timed out\n", task.ID)
return ctx.Err() // 返回 context 的错误原因
}
}
// worker 是一个“工人”函数,会作为 goroutine 被运行
// 它从任务通道中不断取出任务并处理,直到通道关闭或 context 取消
// 参数说明:
// - ctx:上下文控制(用于取消)
// - id:工人的编号(方便调试)
// - tasks:接收任务的 channel(只读)
// - wg:WaitGroup,用于等待所有 worker 完成
func worker(ctx context.Context, id int, tasks <-chan Task, wg *sync.WaitGroup) {
defer wg.Done() // 当函数结束时,通知 WaitGroup 该 goroutine 已结束
for {
select {
case <-ctx.Done():
// 说明超时或手动取消发生了,worker 主动退出
// 补充说明:ctx 是全局的,只要它超时了,所有 worker 都会收到通知,及时退出,避免无谓的资源浪费
fmt.Printf("Worker %d exiting due to context\n", id)
return
case task, ok := <-tasks:
// 从 channel 读取任务
// ok 为 false 表示通道已关闭
if !ok {
fmt.Printf("Worker %d done (no more tasks)\n", id)
return
}
// 正常处理任务
processTask(ctx, task)
}
}
}
// main 是程序入口函数,Go 的执行从这里开始
func main() {
numWorkers := 5 // 启动 5 个 worker 并发处理任务
numTasks := 20 // 要处理的总任务数量是 20 个
// 创建一个缓冲通道(channel),类型为 Task,容量为 numTasks
// 通道是 goroutine 之间通信的机制
tasks := make(chan Task, numTasks)
// 创建一个 context,它会在 3 秒后自动取消
// 重要理解点:
// - 从创建这一刻起,3 秒倒计时就开始了(不是从每个任务启动时开始);
// - 所有 goroutine 共享这个 context,也就共享一个“超时时钟”;
// - 超过时间后,context 会自动调用 cancel,所有 `<-ctx.Done()` 都会触发。
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
// 有人会疑惑:cancel 在最后调用,会不会太晚?
// 实际上,3 秒超时是自动生效的;
// defer cancel() 只是为了确保无论是否超时,都能释放 context 所占的资源。
// WaitGroup 用于等待所有 worker 执行完成
var wg sync.WaitGroup
// 启动多个 worker(每个是一个独立 goroutine)
for i := 0; i < numWorkers; i++ {
wg.Add(1) // 注册一个 worker
go worker(ctx, i, tasks, &wg) // 使用 go 关键字启动 goroutine
}
// 向任务通道中发送任务(由 main 线程发出)
for i := 0; i < numTasks; i++ {
tasks <- Task{ID: i} // 创建一个任务并发送到通道中
}
close(tasks) // 所有任务发完后,关闭通道,通知 worker 不再有新任务
// 等待所有 worker 执行完毕(或因 context 被取消提前退出)
wg.Wait()
// 所有任务处理完或超时取消后,打印退出提示
fmt.Println("All tasks processed or cancelled.")
}