Zhonghui

每个不曾起舞的日子,都是对生命的辜负

User Tools

Site Tools


程序:go:并行

Go的并行计算


基础概念

  1. 使用go(关键字)创建一个「类似于线程」的东西(Goroutines)
    1. Goroutines结束的两种方式
      1. 主线程结束、程序退出 → 所有Goroutines也结束(这种方式应该一般不会用吧)
      2. 传递消息 → Goroutines自己主动结束
  2. 使用Channel进行通信

基础示例

// 注释中使用了「线程」一词 只是为了方便理解 实际上可能有些不准确
 
package main
 
import (
	"fmt"
	"sync"
	"time"
)
 
const target int = 10
 
var wg sync.WaitGroup
var count_locker sync.Mutex
var count int = 0
 
// 数据生产者
func recv(c chan int) {
	for i := 0; i < target; i++ {
		// 存数据 满则阻塞
		c <- i
		time.Sleep(time.Microsecond * 300)
	}
	// 标记线程结束
	wg.Done()
}
 
// 数据消费者
func proc(c chan int) {
	for {
		var next_id int
		count_locker.Lock() // 上锁
		next_id = count     // 领取下一个任务ID
		count += 1
		count_locker.Unlock() // 解锁
 
		// 如果已经处理完毕
		if next_id >= target {
			break
		}
 
		// 取数据 空则阻塞
		t := <-c
		time.Sleep(time.Second)
		fmt.Println("proc", t)
	}
	wg.Done()
}
 
func main() {
	// chan 需要使用 make 创建 这里也指定了 chan 的容量大小
	// chan 用于传递数据
	c := make(chan int, 3)
 
	// 线程计数器
	wg.Add(1)
	// 在普通的函数调用前面加上 go 就变成异步了
	go recv(c)
 
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go proc(c)
	}
 
	// 等待所有线程完成
	wg.Wait()
	fmt.Println("done")
}

进阶用法

// AI Generated
 
package main
 
import (
	"context"   // 用于控制 goroutine 的生命周期,如取消、超时
	"fmt"       // 用于打印输出
	"math/rand" // 生成随机数
	"sync"      // 提供并发控制工具,如 WaitGroup
	"time"      // 时间相关的操作,如延时、超时
)
 
// Task 是一个自定义结构体,代表一个要被处理的任务
type Task struct {
	ID int // 任务的编号(标识)
}
 
// processTask 是一个函数,模拟处理任务的过程
// 参数:ctx 是上下文(用于取消任务),task 是当前要处理的任务
// 返回值:error 表示是否处理出错或被取消
func processTask(ctx context.Context, task Task) error {
	// select 是 Go 的关键字,用来监听多个 channel 的情况
	select {
	// 模拟一个处理耗时,随机在 0-1000 毫秒之间
	case <-time.After(time.Duration(rand.Intn(1000)) * time.Millisecond):
		fmt.Printf("Task %d done\n", task.ID)
		return nil
 
	// 如果 context 被取消或超时,提前终止任务
	case <-ctx.Done():
		// Done() 返回一个 channel,当 context 被取消或超时时会触发
		fmt.Printf("Task %d cancelled or timed out\n", task.ID)
		return ctx.Err() // 返回 context 的错误原因
	}
}
 
// worker 是一个“工人”函数,会作为 goroutine 被运行
// 它从任务通道中不断取出任务并处理,直到通道关闭或 context 取消
// 参数说明:
// - ctx:上下文控制(用于取消)
// - id:工人的编号(方便调试)
// - tasks:接收任务的 channel(只读)
// - wg:WaitGroup,用于等待所有 worker 完成
func worker(ctx context.Context, id int, tasks <-chan Task, wg *sync.WaitGroup) {
	defer wg.Done() // 当函数结束时,通知 WaitGroup 该 goroutine 已结束
 
	for {
		select {
		case <-ctx.Done():
			// 说明超时或手动取消发生了,worker 主动退出
			// 补充说明:ctx 是全局的,只要它超时了,所有 worker 都会收到通知,及时退出,避免无谓的资源浪费
			fmt.Printf("Worker %d exiting due to context\n", id)
			return
 
		case task, ok := <-tasks:
			// 从 channel 读取任务
			// ok 为 false 表示通道已关闭
			if !ok {
				fmt.Printf("Worker %d done (no more tasks)\n", id)
				return
			}
			// 正常处理任务
			processTask(ctx, task)
		}
	}
}
 
// main 是程序入口函数,Go 的执行从这里开始
func main() {
	numWorkers := 5 // 启动 5 个 worker 并发处理任务
	numTasks := 20  // 要处理的总任务数量是 20 个
 
	// 创建一个缓冲通道(channel),类型为 Task,容量为 numTasks
	// 通道是 goroutine 之间通信的机制
	tasks := make(chan Task, numTasks)
 
	// 创建一个 context,它会在 3 秒后自动取消
	// 重要理解点:
	// - 从创建这一刻起,3 秒倒计时就开始了(不是从每个任务启动时开始);
	// - 所有 goroutine 共享这个 context,也就共享一个“超时时钟”;
	// - 超过时间后,context 会自动调用 cancel,所有 `<-ctx.Done()` 都会触发。
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	// 有人会疑惑:cancel 在最后调用,会不会太晚?
	// 实际上,3 秒超时是自动生效的;
	//    defer cancel() 只是为了确保无论是否超时,都能释放 context 所占的资源。
 
	// WaitGroup 用于等待所有 worker 执行完成
	var wg sync.WaitGroup
 
	// 启动多个 worker(每个是一个独立 goroutine)
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)                     // 注册一个 worker
		go worker(ctx, i, tasks, &wg) // 使用 go 关键字启动 goroutine
	}
 
	// 向任务通道中发送任务(由 main 线程发出)
	for i := 0; i < numTasks; i++ {
		tasks <- Task{ID: i} // 创建一个任务并发送到通道中
	}
	close(tasks) // 所有任务发完后,关闭通道,通知 worker 不再有新任务
 
	// 等待所有 worker 执行完毕(或因 context 被取消提前退出)
	wg.Wait()
 
	// 所有任务处理完或超时取消后,打印退出提示
	fmt.Println("All tasks processed or cancelled.")
}
/var/www/DokuWikiStick/dokuwiki/data/pages/程序/go/并行.txt · Last modified: 2025/07/08 05:36 by zhonghui