Skip to main content

LangChain Go Map-Reduce Chain 学习案例

本项目提供了完整的LangChain Go Map-Reduce Chain实现示例,包括基础用法、高级特性和实际应用场景。

项目结构

langchain_021/
├── main.go # 基础Map-Reduce实现
├── advanced_example.go # 高级示例(并行处理、分块处理等)
├── go.mod # Go模块依赖
├── run_demo.sh # 演示运行脚本
├── CONFIG.md # 详细配置指南
└── README.md # 项目说明文档

什么是Map-Reduce Chain?

Map-Reduce Chain是一种处理大量文档或文本数据的模式,它将处理过程分为两个阶段:

  1. Map阶段:将大型任务分解为较小的子任务,并行处理每个文档
  2. Reduce阶段:将Map阶段的结果合并成最终结果

📚 Map-Reduce Chain 详细知识点

1. 核心概念

1.1 Map-Reduce 模式起源

Map-Reduce模式最初来源于函数式编程概念,后被Google用于大数据处理。在LangChain中,这个模式被用于处理大量文本数据,特别适合以下场景:

  • 处理的文档数量超过单次LLM调用的token限制
  • 需要对大量文档进行相同的处理操作
  • 希望提高处理效率通过并行化

1.2 工作原理图解

输入文档 → Map阶段 → 中间结果 → Reduce阶段 → 最终结果

文档1 ──┐
文档2 ──┼─→ [Map处理] ──┐
文档3 ──┤ ├─→ [Reduce合并] ──→ 最终摘要
文档4 ──┤ │
文档5 ──┘ │
... ────────────────────┘

1.3 与传统方法的对比

特性传统单次处理Map-Reduce Chain
处理能力受token限制无限制
处理速度线性增长可并行化
内存使用高(全量加载)低(分批处理)
容错性全或无部分失败可恢复
可扩展性有限优秀

2. 技术实现原理

2.1 Map阶段详解

Map阶段的核心是数据分解和并行处理

// Map函数的通用模式
func mapFunction(document string) (summary string, error) {
// 1. 构建针对单个文档的提示
prompt := buildMapPrompt(document)

// 2. 调用LLM处理单个文档
result := llm.Generate(prompt)

// 3. 返回处理结果
return result, nil
}

// 并行处理所有文档
for _, doc := range documents {
go func(d string) {
result := mapFunction(d)
resultChan <- result
}(doc)
}

Map阶段的关键设计要点:

  • 无状态处理:每个Map操作独立,不依赖其他文档
  • 标准化输出:所有Map结果应具有相似的格式和长度
  • 错误隔离:单个文档失败不影响其他文档处理
  • 提示优化:针对单文档处理优化提示模板

2.2 Reduce阶段详解

Reduce阶段的核心是结果聚合和信息整合

// Reduce函数的通用模式
func reduceFunction(summaries []string) (finalResult string, error) {
// 1. 合并所有Map结果
combinedInput := strings.Join(summaries, "\n---\n")

// 2. 构建Reduce提示
prompt := buildReducePrompt(combinedInput)

// 3. 生成最终结果
result := llm.Generate(prompt)

return result, nil
}

Reduce阶段的关键设计要点:

  • 信息整合:将分散的信息组织成连贯的整体
  • 去重处理:识别和合并重复信息
  • 层次结构:为大量输入可采用分层Reduce策略
  • 质量控制:确保最终结果的完整性和准确性

3. 提示工程最佳实践

3.1 Map阶段提示设计

// 优秀的Map提示模板示例
mapTemplate := `作为专业的文档分析师,请为以下文档生成结构化摘要。

文档内容:
{text}

请按以下格式输出摘要(约100字):
主题:[文档主要讨论的话题]
要点:[3-5个关键要点,用分号分隔]
结论:[文档的核心结论或观点]

输出摘要:`

Map提示的设计原则:

  • 🎯 明确任务:清楚说明要执行的具体任务
  • 📏 限制长度:指定输出长度避免结果差异过大
  • 📋 格式标准:使用统一的输出格式便于后续处理
  • 🔍 聚焦重点:引导模型关注最重要的信息

3.2 Reduce阶段提示设计

// 优秀的Reduce提示模板示例
reduceTemplate := `作为资深信息整合专家,请将以下文档摘要整合成综合报告。

输入摘要:
{summaries}

整合要求:
1. 识别共同主题和模式
2. 提取最重要的见解
3. 组织成逻辑清晰的结构
4. 避免重复信息
5. 保持客观和准确

请输出结构化的综合报告(约300字):
【总体概览】
【主要主题】
【关键发现】
【总结】

综合报告:`

Reduce提示的设计原则:

  • 🔗 关联分析:寻找信息间的联系和模式
  • 🎯 突出重点:识别最重要和最相关的信息
  • 📊 结构化输出:使用清晰的结构组织信息
  • ⚖️ 平衡性:确保不同来源信息的平衡表示

4. 高级处理策略

4.1 分层Reduce策略

当处理大量文档时,可以采用分层Reduce:

文档1-10 → Map → 摘要1-10 → Reduce1 → 中间结果1
文档11-20 → Map → 摘要11-20 → Reduce2 → 中间结果2
文档21-30 → Map → 摘要21-30 → Reduce3 → 中间结果3

中间结果1,2,3 → 最终Reduce → 最终结果

4.2 动态分块策略

// 根据内容长度动态确定分块大小
func calculateOptimalChunkSize(documents []string) int {
avgLength := calculateAverageLength(documents)
if avgLength < 1000 {
return 5 // 短文档可以更多合并
} else if avgLength < 3000 {
return 3 // 中等文档适中合并
} else {
return 1 // 长文档单独处理
}
}

4.3 智能重试机制

func processWithRetry(doc string, maxRetries int) (string, error) {
for attempt := 0; attempt < maxRetries; attempt++ {
result, err := processDocument(doc)
if err == nil {
return result, nil
}

// 指数退避策略
time.Sleep(time.Duration(math.Pow(2, float64(attempt))) * time.Second)
}
return "", errors.New("超过最大重试次数")
}

5. 性能优化策略

5.1 并发控制

// 使用带缓冲通道控制并发数
const maxConcurrent = 5
semaphore := make(chan struct{}, maxConcurrent)

for _, doc := range documents {
semaphore <- struct{}{} // 获取令牌
go func(d string) {
defer func() { <-semaphore }() // 释放令牌
processDocument(d)
}(doc)
}

5.2 结果缓存

type CacheEntry struct {
Result string
Timestamp time.Time
}

var cache = sync.Map{}

func getCachedResult(docHash string) (string, bool) {
if entry, ok := cache.Load(docHash); ok {
cacheEntry := entry.(CacheEntry)
// 检查缓存是否过期(例如24小时)
if time.Since(cacheEntry.Timestamp) < 24*time.Hour {
return cacheEntry.Result, true
}
}
return "", false
}

5.3 流式处理

func streamProcess(documents <-chan string, results chan<- string) {
for doc := range documents {
result := processDocument(doc)
results <- result
}
close(results)
}

功能特性

基础功能 (main.go)

  • ✅ 手动实现Map-Reduce流程
  • ✅ 文档摘要生成
  • ✅ 错误处理和日志输出
  • ✅ 基本链API使用

高级功能 (advanced_example.go)

  • ✅ 并行处理Map阶段
  • ✅ 分块处理大量文档
  • ✅ 情感分析应用
  • ✅ 性能监控和对比
  • ✅ 多种处理策略

快速开始

1. 环境准备

# 设置OpenAI API密钥
export OPENAI_API_KEY="your-openai-api-key"

# 可选:设置其他配置
export OPENAI_MODEL="gpt-3.5-turbo" # 默认模型

2. 安装依赖

cd langchain_021
go mod tidy

3. 运行演示

# 使用演示脚本(推荐)
./run_demo.sh

# 或直接运行
go run main.go # 基础示例
go run advanced_example.go # 高级示例

🎓 详细使用教程

教程1:基础Map-Reduce实现

步骤1:创建LLM实例

package main

import (
"context"
"github.com/tmc/langchaingo/llms/openai"
"github.com/tmc/langchaingo/prompts"
)

func main() {
// 1. 创建OpenAI LLM实例
llm, err := openai.New(
openai.WithToken(os.Getenv("OPENAI_API_KEY")),
openai.WithModel("gpt-3.5-turbo"),
)
if err != nil {
log.Fatal("创建LLM失败:", err)
}
}

步骤2:准备文档数据

// 2. 准备要处理的文档
documents := []string{
"人工智能是计算机科学的一个分支...",
"机器学习是人工智能的一个子领域...",
"深度学习是机器学习的一个子集...",
// 更多文档...
}

步骤3:实现Map阶段

// 3. Map阶段:为每个文档生成摘要
func performMapStage(ctx context.Context, llm llms.Model, documents []string) ([]string, error) {
var summaries []string

// 创建Map提示模板
mapTemplate := `请为以下文档生成一个简洁的摘要(约50字):

文档内容:
{text}

摘要:`

mapPrompt := prompts.NewPromptTemplate(mapTemplate, []string{"text"})

// 处理每个文档
for i, doc := range documents {
fmt.Printf("处理文档 %d/%d...\n", i+1, len(documents))

// 格式化提示
prompt, err := mapPrompt.Format(map[string]any{"text": doc})
if err != nil {
return nil, fmt.Errorf("格式化提示失败: %w", err)
}

// 调用LLM
response, err := llms.GenerateFromSinglePrompt(ctx, llm, prompt)
if err != nil {
return nil, fmt.Errorf("生成摘要失败: %w", err)
}

summaries = append(summaries, strings.TrimSpace(response))
}

return summaries, nil
}

步骤4:实现Reduce阶段

// 4. Reduce阶段:合并所有摘要
func performReduceStage(ctx context.Context, llm llms.Model, summaries []string) (string, error) {
// 创建Reduce提示模板
reduceTemplate := `请将以下摘要合并成一个连贯的总体摘要:

摘要列表:
{summaries}

总体摘要:`

reducePrompt := prompts.NewPromptTemplate(reduceTemplate, []string{"summaries"})

// 合并所有摘要
allSummaries := strings.Join(summaries, "\n\n")

// 格式化提示
prompt, err := reducePrompt.Format(map[string]any{
"summaries": allSummaries,
})
if err != nil {
return "", fmt.Errorf("格式化Reduce提示失败: %w", err)
}

// 调用LLM生成最终摘要
finalSummary, err := llms.GenerateFromSinglePrompt(ctx, llm, prompt)
if err != nil {
return "", fmt.Errorf("生成最终摘要失败: %w", err)
}

return strings.TrimSpace(finalSummary), nil
}

步骤5:组合完整流程

func runBasicMapReduce() error {
ctx := context.Background()

// 1. 创建LLM
llm, err := createLLM()
if err != nil {
return err
}

// 2. 准备文档
documents := prepareDocuments()

// 3. 执行Map阶段
summaries, err := performMapStage(ctx, llm, documents)
if err != nil {
return err
}

// 4. 执行Reduce阶段
finalResult, err := performReduceStage(ctx, llm, summaries)
if err != nil {
return err
}

// 5. 输出结果
fmt.Printf("最终摘要:\n%s\n", finalResult)
return nil
}

教程2:并行处理Map-Reduce

步骤1:定义结果结构

// 定义Map处理结果结构
type MapResult struct {
Index int
Summary string
Error error
}

步骤2:实现并行Map处理

func performParallelMapStage(ctx context.Context, llm llms.Model, documents []string) ([]string, error) {
// 创建结果通道
resultChan := make(chan MapResult, len(documents))
var wg sync.WaitGroup

// 控制并发数
const maxConcurrent = 5
semaphore := make(chan struct{}, maxConcurrent)

// 并行处理每个文档
for i, doc := range documents {
wg.Add(1)
go func(index int, document string) {
defer wg.Done()

// 获取并发令牌
semaphore <- struct{}{}
defer func() { <-semaphore }()

fmt.Printf("开始处理文档 %d\n", index+1)

// 处理单个文档
summary, err := processSingleDocument(ctx, llm, document)
resultChan <- MapResult{
Index: index,
Summary: summary,
Error: err,
}

fmt.Printf("完成处理文档 %d\n", index+1)
}(i, doc)
}

// 等待所有goroutine完成
wg.Wait()
close(resultChan)

// 收集和排序结果
summaries := make([]string, len(documents))
for result := range resultChan {
if result.Error != nil {
return nil, fmt.Errorf("处理文档 %d 失败: %w", result.Index+1, result.Error)
}
summaries[result.Index] = result.Summary
}

return summaries, nil
}

步骤3:添加错误处理和重试

func processSingleDocument(ctx context.Context, llm llms.Model, document string) (string, error) {
const maxRetries = 3

for attempt := 0; attempt < maxRetries; attempt++ {
summary, err := generateSummary(ctx, llm, document)
if err == nil {
return summary, nil
}

// 记录重试
log.Printf("尝试 %d 失败: %v", attempt+1, err)

// 指数退避
if attempt < maxRetries-1 {
backoff := time.Duration(math.Pow(2, float64(attempt))) * time.Second
time.Sleep(backoff)
}
}

return "", fmt.Errorf("处理文档失败,已达到最大重试次数")
}

教程3:分块处理Map-Reduce

步骤1:实现智能分块

func chunkDocuments(documents []string, chunkSize int) [][]string {
var chunks [][]string

for i := 0; i < len(documents); i += chunkSize {
end := i + chunkSize
if end > len(documents) {
end = len(documents)
}
chunks = append(chunks, documents[i:end])
}

return chunks
}

步骤2:实现分块处理逻辑

func performChunkedMapReduce(ctx context.Context, llm llms.Model, documents []string) (string, error) {
const chunkSize = 3

// 1. 将文档分块
chunks := chunkDocuments(documents, chunkSize)
fmt.Printf("将 %d 个文档分为 %d 块\n", len(documents), len(chunks))

// 2. 处理每个块
var chunkSummaries []string
for i, chunk := range chunks {
fmt.Printf("处理块 %d/%d...\n", i+1, len(chunks))

// 合并块内文档
chunkText := strings.Join(chunk, "\n\n--- 文档分隔符 ---\n\n")

// 为块生成摘要
summary, err := generateChunkSummary(ctx, llm, chunkText)
if err != nil {
return "", fmt.Errorf("处理块 %d 失败: %w", i+1, err)
}

chunkSummaries = append(chunkSummaries, summary)
}

// 3. Reduce阶段:合并块摘要
finalSummary, err := combineChunkSummaries(ctx, llm, chunkSummaries)
if err != nil {
return "", fmt.Errorf("合并块摘要失败: %w", err)
}

return finalSummary, nil
}

教程4:自定义应用场景

情感分析Map-Reduce

func performSentimentMapReduce(ctx context.Context, llm llms.Model, texts []string) error {
// Map阶段:分析每个文本的情感
sentimentTemplate := `请分析以下文本的情感倾向:

文本:{text}

请返回格式:情感|评分|原因
- 情感:积极/消极/中性
- 评分:1-10(数字)
- 原因:简要说明

分析结果:`

var sentiments []string
sentimentPrompt := prompts.NewPromptTemplate(sentimentTemplate, []string{"text"})

for i, text := range texts {
prompt, err := sentimentPrompt.Format(map[string]any{"text": text})
if err != nil {
return err
}

response, err := llms.GenerateFromSinglePrompt(ctx, llm, prompt)
if err != nil {
return err
}

sentiments = append(sentiments, response)
fmt.Printf("文本 %d 情感: %s\n", i+1, strings.TrimSpace(response))
}

// Reduce阶段:分析情感趋势
trendTemplate := `基于以下情感分析结果,请总结整体情感趋势:

情感分析结果:
{sentiments}

请提供:
1. 整体情感倾向
2. 平均情感评分
3. 情感分布统计
4. 主要发现

趋势分析:`

trendPrompt := prompts.NewPromptTemplate(trendTemplate, []string{"sentiments"})
prompt, err := trendPrompt.Format(map[string]any{
"sentiments": strings.Join(sentiments, "\n"),
})
if err != nil {
return err
}

trendAnalysis, err := llms.GenerateFromSinglePrompt(ctx, llm, prompt)
if err != nil {
return err
}

fmt.Printf("\n情感趋势分析:\n%s\n", trendAnalysis)
return nil
}

教程5:性能监控和优化

步骤1:添加性能监控

type PerformanceMetrics struct {
StartTime time.Time
MapDuration time.Duration
ReduceDuration time.Duration
TotalDuration time.Duration
DocumentCount int
ConcurrentCount int
}

func (p *PerformanceMetrics) Start() {
p.StartTime = time.Now()
}

func (p *PerformanceMetrics) MarkMapComplete() {
p.MapDuration = time.Since(p.StartTime)
}

func (p *PerformanceMetrics) MarkComplete() {
p.TotalDuration = time.Since(p.StartTime)
p.ReduceDuration = p.TotalDuration - p.MapDuration
}

func (p *PerformanceMetrics) Report() {
fmt.Printf("\n=== 性能报告 ===\n")
fmt.Printf("文档数量: %d\n", p.DocumentCount)
fmt.Printf("并发数: %d\n", p.ConcurrentCount)
fmt.Printf("Map阶段耗时: %v\n", p.MapDuration)
fmt.Printf("Reduce阶段耗时: %v\n", p.ReduceDuration)
fmt.Printf("总耗时: %v\n", p.TotalDuration)
fmt.Printf("平均每文档耗时: %v\n", p.TotalDuration/time.Duration(p.DocumentCount))
}

步骤2:集成性能监控

func runWithMetrics(documents []string) error {
metrics := &PerformanceMetrics{
DocumentCount: len(documents),
ConcurrentCount: 5,
}

metrics.Start()

// 执行Map阶段
summaries, err := performParallelMapStage(ctx, llm, documents)
if err != nil {
return err
}
metrics.MarkMapComplete()

// 执行Reduce阶段
result, err := performReduceStage(ctx, llm, summaries)
if err != nil {
return err
}
metrics.MarkComplete()

// 输出结果和性能报告
fmt.Printf("最终结果:\n%s\n", result)
metrics.Report()

return nil
}

使用示例

基础Map-Reduce实现

// Map阶段:为每个文档生成摘要
for i, doc := range documents {
prompt, err := mapPrompt.Format(map[string]any{"text": doc})
response, err := llms.GenerateFromSinglePrompt(ctx, llm, prompt)
summaries = append(summaries, response)
}

// Reduce阶段:合并所有摘要
allSummaries := strings.Join(summaries, "\n\n")
finalSummary, err := llms.GenerateFromSinglePrompt(ctx, llm, reducePrompt)

并行处理Map-Reduce

// 使用goroutine并行处理Map阶段
var wg sync.WaitGroup
resultChan := make(chan MapReduceResult, len(documents))

for i, doc := range documents {
wg.Add(1)
go func(index int, document string) {
defer wg.Done()
// 处理单个文档
result := processDocument(document)
resultChan <- MapReduceResult{Index: index, Summary: result}
}(i, doc)
}

应用场景

1. 文档摘要

  • 大量报告的综合摘要
  • 新闻文章的主题提取
  • 学术论文的关键点总结

2. 情感分析

  • 用户评论情感趋势分析
  • 社交媒体情感监控
  • 产品反馈情感统计

3. 内容分类

  • 文档自动分类
  • 主题标签生成
  • 内容质量评估

4. 信息提取

  • 关键信息抽取
  • 实体识别汇总
  • 结构化数据生成

性能对比

运行高级示例可以看到不同处理方式的性能对比:

顺序处理Map-Reduce: ~30秒 (6个文档)
并行处理Map-Reduce: ~8秒 (6个文档)
分块处理Map-Reduce: ~12秒 (6个文档)

配置选项

详细配置选项包括:

  • 环境变量配置
  • 性能优化设置
  • 成本控制策略
  • 故障排除指南

Map-Reduce的优势

  1. 可扩展性:可以处理任意数量的文档
  2. 并行处理:Map阶段可以并行执行,提高效率
  3. 模块化:Map和Reduce阶段可以独立优化和测试
  4. 内存效率:避免将所有文档同时加载到内存中
  5. 容错性:单个文档处理失败不影响其他文档

注意事项

  1. API配额:注意OpenAI API的调用频率限制
  2. 成本控制:大量文档处理可能产生较高费用
  3. 网络稳定性:确保网络连接稳定,考虑添加重试机制
  4. 内存管理:处理大量数据时注意内存使用

扩展建议

  1. 添加缓存机制:缓存已处理的结果
  2. 实现断点续传:支持中断后继续处理
  3. 添加更多LLM支持:支持其他模型提供商
  4. 优化提示模板:针对特定任务优化提示词
  5. 添加结果验证:验证处理结果的质量

🔧 故障排除指南

常见问题和解决方案

1. API相关问题

问题:401 Unauthorized

Error: 401 Unauthorized - Invalid API key

解决方案:

  • 检查 OPENAI_API_KEY 环境变量是否正确设置
  • 确认API密钥有效且未过期
  • 验证API密钥格式(应以 sk- 开头)

问题:429 Rate Limit

Error: 429 Too Many Requests - Rate limit exceeded

解决方案:

// 添加速率限制控制
rateLimiter := time.NewTicker(time.Second / 2) // 每秒最多2个请求
defer rateLimiter.Stop()

for _, doc := range documents {
<-rateLimiter.C // 等待速率限制
processDocument(doc)
}

2. 内存和性能问题

问题:内存不足

Error: runtime: out of memory

解决方案:

// 实现流式处理
func streamProcessDocuments(documents []string, batchSize int) error {
for i := 0; i < len(documents); i += batchSize {
end := i + batchSize
if end > len(documents) {
end = len(documents)
}

batch := documents[i:end]
if err := processBatch(batch); err != nil {
return err
}

// 强制垃圾回收
runtime.GC()
}
return nil
}

问题:并发过高导致性能下降

// 动态调整并发数
func adaptiveConcurrency(baseCount int, successRate float64) int {
if successRate > 0.95 {
return baseCount + 1 // 成功率高,增加并发
} else if successRate < 0.8 {
return baseCount - 1 // 成功率低,减少并发
}
return baseCount
}

3. 网络和连接问题

问题:网络超时

Error: context deadline exceeded

解决方案:

// 设置合理的超时时间
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()

// 添加重试机制
func retryableCall(ctx context.Context, fn func() error) error {
const maxRetries = 3
for i := 0; i < maxRetries; i++ {
if err := fn(); err != nil {
if i == maxRetries-1 {
return err
}
// 指数退避
backoff := time.Duration(math.Pow(2, float64(i))) * time.Second
select {
case <-time.After(backoff):
continue
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
return nil
}

4. 数据处理问题

问题:Map结果格式不一致

// 添加结果验证和标准化
func validateAndNormalizeResult(result string) (string, error) {
// 检查结果长度
if len(result) < 10 {
return "", errors.New("结果太短")
}

// 标准化格式
result = strings.TrimSpace(result)

// 检查是否包含必要信息
if !strings.Contains(result, "主题") {
return "", errors.New("结果缺少主题信息")
}

return result, nil
}

问题:Reduce阶段输入过长

// 实现分层Reduce
func hierarchicalReduce(summaries []string, maxBatchSize int) (string, error) {
if len(summaries) <= maxBatchSize {
return performSingleReduce(summaries)
}

// 分批处理
var intermediateResults []string
for i := 0; i < len(summaries); i += maxBatchSize {
end := i + maxBatchSize
if end > len(summaries) {
end = len(summaries)
}

batch := summaries[i:end]
result, err := performSingleReduce(batch)
if err != nil {
return "", err
}
intermediateResults = append(intermediateResults, result)
}

// 递归处理中间结果
return hierarchicalReduce(intermediateResults, maxBatchSize)
}

调试技巧

1. 启用详细日志

type Logger struct {
level int
}

func (l *Logger) Debug(msg string, args ...interface{}) {
if l.level >= 3 {
log.Printf("[DEBUG] "+msg, args...)
}
}

func (l *Logger) Info(msg string, args ...interface{}) {
if l.level >= 2 {
log.Printf("[INFO] "+msg, args...)
}
}

func (l *Logger) Error(msg string, args ...interface{}) {
if l.level >= 1 {
log.Printf("[ERROR] "+msg, args...)
}
}

2. 性能分析

import _ "net/http/pprof"

func startProfiling() {
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
}

// 使用方法:
// go tool pprof http://localhost:6060/debug/pprof/heap

3. 中间结果保存

func saveIntermediateResults(results []string, filename string) error {
data, err := json.Marshal(results)
if err != nil {
return err
}
return os.WriteFile(filename, data, 0644)
}

func loadIntermediateResults(filename string) ([]string, error) {
data, err := os.ReadFile(filename)
if err != nil {
return nil, err
}

var results []string
err = json.Unmarshal(data, &results)
return results, err
}

🏆 最佳实践总结

1. 设计原则

Map阶段设计

  • 保持无状态:每个Map操作应该独立
  • 标准化输出:确保所有Map结果格式一致
  • 错误隔离:单个失败不应影响整体
  • 幂等性:相同输入应产生相同输出

Reduce阶段设计

  • 信息整合:有效合并来自Map阶段的信息
  • 去重处理:识别并处理重复信息
  • 结构化输出:提供清晰、有组织的最终结果
  • 质量控制:确保输出的完整性和准确性

2. 性能优化

并发控制

// 最佳实践:动态并发控制
type ConcurrencyController struct {
semaphore chan struct{}
maxConcurrent int
currentLoad int
mutex sync.Mutex
}

func (c *ConcurrencyController) Acquire() {
c.semaphore <- struct{}{}
c.mutex.Lock()
c.currentLoad++
c.mutex.Unlock()
}

func (c *ConcurrencyController) Release() {
<-c.semaphore
c.mutex.Lock()
c.currentLoad--
c.mutex.Unlock()
}

资源管理

// 连接池管理
type LLMPool struct {
clients chan llms.Model
factory func() (llms.Model, error)
}

func NewLLMPool(size int, factory func() (llms.Model, error)) *LLMPool {
pool := &LLMPool{
clients: make(chan llms.Model, size),
factory: factory,
}

// 预创建连接
for i := 0; i < size; i++ {
client, err := factory()
if err == nil {
pool.clients <- client
}
}

return pool
}

func (p *LLMPool) Get() llms.Model {
select {
case client := <-p.clients:
return client
default:
// 如果池为空,创建新连接
client, _ := p.factory()
return client
}
}

func (p *LLMPool) Put(client llms.Model) {
select {
case p.clients <- client:
default:
// 池已满,丢弃连接
}
}

3. 错误处理策略

// 分级错误处理
type ErrorSeverity int

const (
ErrorSeverityLow ErrorSeverity = iota
ErrorSeverityMedium
ErrorSeverityHigh
ErrorSeverityCritical
)

func classifyError(err error) ErrorSeverity {
switch {
case strings.Contains(err.Error(), "rate limit"):
return ErrorSeverityMedium
case strings.Contains(err.Error(), "unauthorized"):
return ErrorSeverityCritical
case strings.Contains(err.Error(), "timeout"):
return ErrorSeverityLow
default:
return ErrorSeverityMedium
}
}

func handleError(err error, context string) error {
severity := classifyError(err)

switch severity {
case ErrorSeverityLow:
log.Printf("可重试错误 [%s]: %v", context, err)
return err // 允许重试
case ErrorSeverityCritical:
log.Fatalf("致命错误 [%s]: %v", context, err)
return err
default:
log.Printf("一般错误 [%s]: %v", context, err)
return err
}
}

4. 监控和观测

// 指标收集
type Metrics struct {
ProcessedDocuments int64
FailedDocuments int64
TotalProcessTime time.Duration
AverageProcessTime time.Duration
mutex sync.RWMutex
}

func (m *Metrics) RecordProcess(duration time.Duration, success bool) {
m.mutex.Lock()
defer m.mutex.Unlock()

if success {
m.ProcessedDocuments++
} else {
m.FailedDocuments++
}

m.TotalProcessTime += duration
total := m.ProcessedDocuments + m.FailedDocuments
if total > 0 {
m.AverageProcessTime = m.TotalProcessTime / time.Duration(total)
}
}

func (m *Metrics) Report() {
m.mutex.RLock()
defer m.mutex.RUnlock()

fmt.Printf("=== 处理指标 ===\n")
fmt.Printf("成功处理: %d\n", m.ProcessedDocuments)
fmt.Printf("处理失败: %d\n", m.FailedDocuments)
fmt.Printf("成功率: %.2f%%\n", float64(m.ProcessedDocuments)/float64(m.ProcessedDocuments+m.FailedDocuments)*100)
fmt.Printf("平均处理时间: %v\n", m.AverageProcessTime)
}

5. 提示工程技巧

提示模板版本控制

type PromptTemplate struct {
Version string
Template string
Description string
Variables []string
}

var PromptTemplates = map[string]PromptTemplate{
"map_summary_v1": {
Version: "1.0",
Template: "为以下文档生成摘要:\n{text}\n摘要:",
Description: "基础摘要模板",
Variables: []string{"text"},
},
"map_summary_v2": {
Version: "2.0",
Template: "作为专业分析师,请为以下文档生成结构化摘要(约100字):\n\n{text}\n\n格式:主题|要点|结论\n摘要:",
Description: "结构化摘要模板",
Variables: []string{"text"},
},
}

A/B测试框架

func runABTest(documentsA, documentsB []string, templateA, templateB string) {
var wg sync.WaitGroup
resultsA := make(chan []string, 1)
resultsB := make(chan []string, 1)

wg.Add(2)
go func() {
defer wg.Done()
results, _ := processWithTemplate(documentsA, templateA)
resultsA <- results
}()

go func() {
defer wg.Done()
results, _ := processWithTemplate(documentsB, templateB)
resultsB <- results
}()

wg.Wait()
close(resultsA)
close(resultsB)

// 比较结果质量
compareResults(<-resultsA, <-resultsB)
}

学习价值

通过本项目,你可以学习到:

  • LangChain Go的基本使用方法
  • Map-Reduce模式的实现原理
  • 并行处理和性能优化技巧
  • 实际NLP任务的解决方案
  • Go语言并发编程最佳实践

贡献

欢迎提交issue和pull request来改进这个学习案例!