第六章:并发编程基础

作者:Administrator 发布时间: 2026-03-13 阅读量:1 评论数:0

第六章:并发编程基础

6.1 Goroutine

创建Goroutine

package main
​
import (
    "fmt"
    "time"
)
​
func sayHello() {
    for i := 0; i < 5; i++ {
        fmt.Println("Hello")
        time.Sleep(100 * time.Millisecond)
    }
}
​
func sayWorld() {
    for i := 0; i < 5; i++ {
        fmt.Println("World")
        time.Sleep(100 * time.Millisecond)
    }
}
​
func main() {
    // 串行执行
    // sayHello()
    // sayWorld()
    
    // 并发执行
    go sayHello()
    go sayWorld()
    
    // 等待goroutine完成
    time.Sleep(1 * time.Second)
    fmt.Println("主函数结束")
}

匿名函数Goroutine

package main
​
import (
    "fmt"
    "time"
)
​
func main() {
    // 带参数的匿名函数
    for i := 0; i < 3; i++ {
        go func(n int) {
            fmt.Printf("Goroutine %d\n", n)
        }(i)  // 立即传参,避免闭包问题
    }
    
    // 闭包问题示例
    for i := 0; i < 3; i++ {
        go func() {
            // i是共享变量,可能输出相同值
            fmt.Printf("闭包: %d\n", i)
        }()
    }
    
    time.Sleep(1 * time.Second)
}

6.2 Channel

基本使用

package main
​
import "fmt"
​
func main() {
    // 创建channel
    ch := make(chan int)
    
    // 发送数据
    go func() {
        ch <- 100
    }()
    
    // 接收数据
    value := <-ch
    fmt.Println(value)
    
    // 缓冲channel
    bufferedCh := make(chan int, 3)
    bufferedCh <- 1
    bufferedCh <- 2
    bufferedCh <- 3
    
    fmt.Println(<-bufferedCh)
    fmt.Println(<-bufferedCh)
    fmt.Println(<-bufferedCh)
}

关闭Channel

package main
​
import "fmt"
​
func producer(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)  // 关闭channel
}
​
func main() {
    ch := make(chan int)
    
    go producer(ch)
    
    // 使用range遍历
    for value := range ch {
        fmt.Println(value)
    }
    
    // 检查channel是否关闭
    ch2 := make(chan int, 2)
    ch2 <- 1
    close(ch2)
    
    v, ok := <-ch2
    fmt.Println(v, ok)  // 1 true
    
    v, ok = <-ch2
    fmt.Println(v, ok)  // 0 false(已关闭)
}

Select多路复用

package main
​
import (
    "fmt"
    "time"
)
​
func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    
    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "来自ch1"
    }()
    
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "来自ch2"
    }()
    
    // 使用select等待多个channel
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println(msg1)
        case msg2 := <-ch2:
            fmt.Println(msg2)
        }
    }
    
    // 带超时的select
    ch3 := make(chan string)
    select {
    case msg := <-ch3:
        fmt.Println(msg)
    case <-time.After(1 * time.Second):
        fmt.Println("超时")
    }
    
    // 非阻塞select
    select {
    case msg := <-ch3:
        fmt.Println(msg)
    default:
        fmt.Println("没有数据")
    }
}

6.3 同步原语

WaitGroup

package main
​
import (
    "fmt"
    "sync"
    "time"
)
​
func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()  // 完成时调用
    
    fmt.Printf("Worker %d 开始\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d 完成\n", id)
}
​
func main() {
    var wg sync.WaitGroup
    
    for i := 1; i <= 3; i++ {
        wg.Add(1)  // 增加计数
        go worker(i, &wg)
    }
    
    wg.Wait()  // 等待所有完成
    fmt.Println("所有worker完成")
}

Mutex

package main
​
import (
    "fmt"
    "sync"
)
​
type Counter struct {
    mu    sync.Mutex
    count int
}
​
func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}
​
func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count
}
​
// RWMutex - 读写锁
type Cache struct {
    mu    sync.RWMutex
    data  map[string]string
}
​
func (c *Cache) Get(key string) string {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.data[key]
}
​
func (c *Cache) Set(key, value string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.data[key] = value
}
​
func main() {
    var wg sync.WaitGroup
    counter := Counter{}
    
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }
    
    wg.Wait()
    fmt.Println("计数:", counter.Value())
}

Once

package main
​
import (
    "fmt"
    "sync"
)
​
var once sync.Once
var instance *Singleton
​
type Singleton struct {
    data string
}
​
func GetInstance() *Singleton {
    once.Do(func() {
        instance = &Singleton{data: "单例对象"}
        fmt.Println("创建实例")
    })
    return instance
}
​
func main() {
    // 多次调用只执行一次
    for i := 0; i < 10; i++ {
        go func() {
            _ = GetInstance()
        }()
    }
    
    fmt.Scanln()
}

6.4 Context

package main
​
import (
    "context"
    "fmt"
    "time"
)
​
func worker(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("收到取消信号,退出")
            return
        default:
            fmt.Println("工作中...")
            time.Sleep(500 * time.Millisecond)
        }
    }
}
​
func main() {
    // 带超时的context
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    
    go worker(ctx)
    
    time.Sleep(3 * time.Second)
    
    // 手动取消
    ctx2, cancel2 := context.WithCancel(context.Background())
    go func() {
        time.Sleep(1 * time.Second)
        cancel2()
    }()
    
    select {
    case <-ctx2.Done():
        fmt.Println("已取消")
    }
}

6.5 项目实战:并发下载器

package main
​
import (
    "fmt"
    "io"
    "net/http"
    "os"
    "sync"
    "time"
)
​
type Downloader struct {
    urls      []string
    workers   int
    results   chan Result
}
​
type Result struct {
    URL      string
    Size     int64
    Duration time.Duration
    Error    error
}
​
func NewDownloader(urls []string, workers int) *Downloader {
    return &Downloader{
        urls:    urls,
        workers: workers,
        results: make(chan Result, len(urls)),
    }
}
​
func (d *Downloader) download(url string) Result {
    start := time.Now()
    
    resp, err := http.Get(url)
    if err != nil {
        return Result{URL: url, Error: err}
    }
    defer resp.Body.Close()
    
    size, _ := io.Copy(io.Discard, resp.Body)
    
    return Result{
        URL:      url,
        Size:     size,
        Duration: time.Since(start),
    }
}
​
func (d *Downloader) Run() []Result {
    var wg sync.WaitGroup
    urlCh := make(chan string, len(d.urls))
    
    // 启动worker
    for i := 0; i < d.workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for url := range urlCh {
                d.results <- d.download(url)
            }
        }()
    }
    
    // 发送任务
    for _, url := range d.urls {
        urlCh <- url
    }
    close(urlCh)
    
    // 等待完成
    wg.Wait()
    close(d.results)
    
    // 收集结果
    var results []Result
    for r := range d.results {
        results = append(results, r)
    }
    
    return results
}
​
func main() {
    urls := []string{
        "https://golang.org",
        "https://google.com",
        "https://github.com",
    }
    
    downloader := NewDownloader(urls, 3)
    results := downloader.Run()
    
    for _, r := range results {
        if r.Error != nil {
            fmt.Printf("失败: %s, 错误: %v\n", r.URL, r.Error)
        } else {
            fmt.Printf("成功: %s, 大小: %d, 耗时: %v\n",
                r.URL, r.Size, r.Duration)
        }
    }
}

评论