// 注释中使用了「线程」一词 只是为了方便理解 实际上可能有些不准确 package main import ( "fmt" "sync" "time" ) const target int = 10 var wg sync.WaitGroup var count_locker sync.Mutex var count int = 0 // 数据生产者 func recv(c chan int) { for i := 0; i < target; i++ { // 存数据 满则阻塞 c <- i time.Sleep(time.Microsecond * 300) } // 标记线程结束 wg.Done() } // 数据消费者 func proc(c chan int) { for { var next_id int count_locker.Lock() // 上锁 next_id = count // 领取下一个任务ID count += 1 count_locker.Unlock() // 解锁 // 如果已经处理完毕 if next_id >= target { break } // 取数据 空则阻塞 t := <-c time.Sleep(time.Second) fmt.Println("proc", t) } wg.Done() } func main() { // chan 需要使用 make 创建 这里也指定了 chan 的容量大小 // chan 用于传递数据 c := make(chan int, 3) // 线程计数器 wg.Add(1) // 在普通的函数调用前面加上 go 就变成异步了 go recv(c) for i := 0; i < 3; i++ { wg.Add(1) go proc(c) } // 等待所有线程完成 wg.Wait() fmt.Println("done") }
// AI Generated package main import ( "context" // 用于控制 goroutine 的生命周期,如取消、超时 "fmt" // 用于打印输出 "math/rand" // 生成随机数 "sync" // 提供并发控制工具,如 WaitGroup "time" // 时间相关的操作,如延时、超时 ) // Task 是一个自定义结构体,代表一个要被处理的任务 type Task struct { ID int // 任务的编号(标识) } // processTask 是一个函数,模拟处理任务的过程 // 参数:ctx 是上下文(用于取消任务),task 是当前要处理的任务 // 返回值:error 表示是否处理出错或被取消 func processTask(ctx context.Context, task Task) error { // select 是 Go 的关键字,用来监听多个 channel 的情况 select { // 模拟一个处理耗时,随机在 0-1000 毫秒之间 case <-time.After(time.Duration(rand.Intn(1000)) * time.Millisecond): fmt.Printf("Task %d done\n", task.ID) return nil // 如果 context 被取消或超时,提前终止任务 case <-ctx.Done(): // Done() 返回一个 channel,当 context 被取消或超时时会触发 fmt.Printf("Task %d cancelled or timed out\n", task.ID) return ctx.Err() // 返回 context 的错误原因 } } // worker 是一个“工人”函数,会作为 goroutine 被运行 // 它从任务通道中不断取出任务并处理,直到通道关闭或 context 取消 // 参数说明: // - ctx:上下文控制(用于取消) // - id:工人的编号(方便调试) // - tasks:接收任务的 channel(只读) // - wg:WaitGroup,用于等待所有 worker 完成 func worker(ctx context.Context, id int, tasks <-chan Task, wg *sync.WaitGroup) { defer wg.Done() // 当函数结束时,通知 WaitGroup 该 goroutine 已结束 for { select { case <-ctx.Done(): // 说明超时或手动取消发生了,worker 主动退出 // 补充说明:ctx 是全局的,只要它超时了,所有 worker 都会收到通知,及时退出,避免无谓的资源浪费 fmt.Printf("Worker %d exiting due to context\n", id) return case task, ok := <-tasks: // 从 channel 读取任务 // ok 为 false 表示通道已关闭 if !ok { fmt.Printf("Worker %d done (no more tasks)\n", id) return } // 正常处理任务 processTask(ctx, task) } } } // main 是程序入口函数,Go 的执行从这里开始 func main() { numWorkers := 5 // 启动 5 个 worker 并发处理任务 numTasks := 20 // 要处理的总任务数量是 20 个 // 创建一个缓冲通道(channel),类型为 Task,容量为 numTasks // 通道是 goroutine 之间通信的机制 tasks := make(chan Task, numTasks) // 创建一个 context,它会在 3 秒后自动取消 // 重要理解点: // - 从创建这一刻起,3 秒倒计时就开始了(不是从每个任务启动时开始); // - 所有 goroutine 共享这个 context,也就共享一个“超时时钟”; // - 超过时间后,context 会自动调用 cancel,所有 `<-ctx.Done()` 都会触发。 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() // 有人会疑惑:cancel 在最后调用,会不会太晚? // 实际上,3 秒超时是自动生效的; // defer cancel() 只是为了确保无论是否超时,都能释放 context 所占的资源。 // WaitGroup 用于等待所有 worker 执行完成 var wg sync.WaitGroup // 启动多个 worker(每个是一个独立 goroutine) for i := 0; i < numWorkers; i++ { wg.Add(1) // 注册一个 worker go worker(ctx, i, tasks, &wg) // 使用 go 关键字启动 goroutine } // 向任务通道中发送任务(由 main 线程发出) for i := 0; i < numTasks; i++ { tasks <- Task{ID: i} // 创建一个任务并发送到通道中 } close(tasks) // 所有任务发完后,关闭通道,通知 worker 不再有新任务 // 等待所有 worker 执行完毕(或因 context 被取消提前退出) wg.Wait() // 所有任务处理完或超时取消后,打印退出提示 fmt.Println("All tasks processed or cancelled.") }