LangChain Go Map-Reduce Chain 学习案例
本项目提供了完整的LangChain Go Map-Reduce Chain实现示例,包括基础用法、高级特性和实际应用场景。
项目结构
langchain_021/
├── main.go # 基础Map-Reduce实现
├── advanced_example.go # 高级示例(并行处理、分块处理等)
├── go.mod # Go模块依赖
├── run_demo.sh # 演示运行脚本
├── CONFIG.md # 详细配置指南
└── README.md # 项目说明文档
什么是Map-Reduce Chain?
Map-Reduce Chain是一种处理大量文档或文本数据的模式,它将处理过程分为两个阶段:
- Map阶段:将大型任务分解为较小的子任务,并行处理每个文档
- Reduce阶段:将Map阶段的结果合并成最终结果
📚 Map-Reduce Chain 详细知识点
1. 核心概念
1.1 Map-Reduce 模式起源
Map-Reduce模式最初来源于函数式编程概念,后被Google用于大数据处理。在LangChain中,这个模式被用于处理大量文本数据,特别适合以下场景:
- 处理的文档数量超过单次LLM调用的token限制
- 需要对大量文档进行相同的处理操作
- 希望提高处理效率通过并行化
1.2 工作原理图解
输入文档 → Map阶段 → 中间结果 → Reduce阶段 → 最终结果
文档1 ──┐
文档2 ──┼─→ [Map处理] ──┐
文档3 ──┤ ├─→ [Reduce合并] ──→ 最终摘要
文档4 ──┤ │
文档5 ──┘ │
... ────────────────────┘
1.3 与传统方法的对比
| 特性 | 传统单次处理 | Map-Reduce Chain |
|---|---|---|
| 处理能力 | 受token限制 | 无限制 |
| 处理速度 | 线性增长 | 可并行化 |
| 内存使用 | 高(全量加载) | 低(分批处理) |
| 容错性 | 全或无 | 部分失败可恢复 |
| 可扩展性 | 有限 | 优秀 |
2. 技术实现原理
2.1 Map阶段详解
Map阶段的核心是数据分解和并行处理:
// Map函数的通用模式
func mapFunction(document string) (summary string, error) {
// 1. 构建针对单个文档的提示
prompt := buildMapPrompt(document)
// 2. 调用LLM处理单个文档
result := llm.Generate(prompt)
// 3. 返回处理结果
return result, nil
}
// 并行处理所有文档
for _, doc := range documents {
go func(d string) {
result := mapFunction(d)
resultChan <- result
}(doc)
}
Map阶段的关键设计要点:
- ✅ 无状态处理:每个Map操作独立,不依赖其他文档
- ✅ 标准化输出:所有Map结果应具有相似的格式和长度
- ✅ 错误隔离:单个文档失败不影响其他文档处理
- ✅ 提示优化:针对单文档处理优化提示模板
2.2 Reduce阶段详解
Reduce阶段的核心是结果聚合和信息整合:
// Reduce函数的通用模式
func reduceFunction(summaries []string) (finalResult string, error) {
// 1. 合并所有Map结果
combinedInput := strings.Join(summaries, "\n---\n")
// 2. 构建Reduce提示
prompt := buildReducePrompt(combinedInput)
// 3. 生成最终结果
result := llm.Generate(prompt)
return result, nil
}
Reduce阶段的关键设计要点:
- ✅ 信息整合:将分散的信息组织成连贯的整体
- ✅ 去重处理:识别和合并重复信息
- ✅ 层次结构:为大量输入可采用分层Reduce策略
- ✅ 质量控制:确保最终结果的完整性和准确性
3. 提示工程最佳实践
3.1 Map阶段提示设计
// 优秀的Map提示模板示例
mapTemplate := `作为专业的文档分析师,请为以下文档生成结构化摘要。
文档内容:
{text}
请按以下格式输出摘要(约100字):
主题:[文档主要讨论的话题]
要点:[3-5个关键要点,用分号分隔]
结论:[文档的核心结论或观点]
输出摘要:`
Map提示的设计原则:
- 🎯 明确任务:清楚说明要执行的具体任务
- 📏 限制长度:指定输出长度避免结果差异过大
- 📋 格式标准:使用统一的输出格式便于后续处理
- 🔍 聚焦重点:引导模型关注最重要的信息
3.2 Reduce阶段提示设计
// 优秀的Reduce提示模板示例
reduceTemplate := `作为资深信息整合专家,请将以下文档摘要整合成综合报告。
输入摘要:
{summaries}
整合要求:
1. 识别共同主题和模式
2. 提取最重要的见解
3. 组织成逻辑清晰的结构
4. 避免重复信息
5. 保持客观和准确
请输出结构化的综合报告(约300字):
【总体概览】
【主要主题】
【关键发现】
【总结】
综合报告:`
Reduce提示的设计原则:
- 🔗 关联分析:寻找信息间的联系和模式
- 🎯 突出重点:识别最重要和最相关的信息
- 📊 结构化输出:使用清晰的结构组织信息
- ⚖️ 平衡性:确保不同来源信息的平衡表示
4. 高级处理策略
4.1 分层Reduce策略
当处理大量文档时,可以采用分层Reduce:
文档1-10 → Map → 摘要1-10 → Reduce1 → 中间结果1
文档11-20 → Map → 摘要11-20 → Reduce2 → 中间结果2
文档21-30 → Map → 摘要21-30 → Reduce3 → 中间结果3
↓
中间结果1,2,3 → 最终Reduce → 最终结果
4.2 动态分块策略
// 根据内容长度动态确定分块大小
func calculateOptimalChunkSize(documents []string) int {
avgLength := calculateAverageLength(documents)
if avgLength < 1000 {
return 5 // 短文档可以更多合并
} else if avgLength < 3000 {
return 3 // 中等文档适中合并
} else {
return 1 // 长文档单独处理
}
}
4.3 智能重试机制
func processWithRetry(doc string, maxRetries int) (string, error) {
for attempt := 0; attempt < maxRetries; attempt++ {
result, err := processDocument(doc)
if err == nil {
return result, nil
}
// 指数退避策略
time.Sleep(time.Duration(math.Pow(2, float64(attempt))) * time.Second)
}
return "", errors.New("超过最大重试次数")
}
5. 性能优化策略
5.1 并发控制
// 使用带缓冲通道控制并发数
const maxConcurrent = 5
semaphore := make(chan struct{}, maxConcurrent)
for _, doc := range documents {
semaphore <- struct{}{} // 获取令牌
go func(d string) {
defer func() { <-semaphore }() // 释放令牌
processDocument(d)
}(doc)
}
5.2 结果缓存
type CacheEntry struct {
Result string
Timestamp time.Time
}
var cache = sync.Map{}
func getCachedResult(docHash string) (string, bool) {
if entry, ok := cache.Load(docHash); ok {
cacheEntry := entry.(CacheEntry)
// 检查缓存是否过期(例如24小时)
if time.Since(cacheEntry.Timestamp) < 24*time.Hour {
return cacheEntry.Result, true
}
}
return "", false
}
5.3 流式处理
func streamProcess(documents <-chan string, results chan<- string) {
for doc := range documents {
result := processDocument(doc)
results <- result
}
close(results)
}
功能特性
基础功能 (main.go)
- ✅ 手动实现Map-Reduce流程
- ✅ 文档摘要生成
- ✅ 错误处理和日志输出
- ✅ 基本链API使用
高级功能 (advanced_example.go)
- ✅ 并行处理Map阶段
- ✅ 分块处理大量文档
- ✅ 情感分析应用
- ✅ 性能监控和对比
- ✅ 多种处理策略
快速开始
1. 环境准备
# 设置OpenAI API密钥
export OPENAI_API_KEY="your-openai-api-key"
# 可选:设置其他配置
export OPENAI_MODEL="gpt-3.5-turbo" # 默认模型
2. 安装依赖
cd langchain_021
go mod tidy
3. 运行演示
# 使用演示脚本(推荐)
./run_demo.sh
# 或直接运行
go run main.go # 基础示例
go run advanced_example.go # 高级示例
🎓 详细使用教程
教程1:基础Map-Reduce实现
步骤1:创建LLM实例
package main
import (
"context"
"github.com/tmc/langchaingo/llms/openai"
"github.com/tmc/langchaingo/prompts"
)
func main() {
// 1. 创建OpenAI LLM实例
llm, err := openai.New(
openai.WithToken(os.Getenv("OPENAI_API_KEY")),
openai.WithModel("gpt-3.5-turbo"),
)
if err != nil {
log.Fatal("创建LLM失败:", err)
}
}
步骤2:准备文档数据
// 2. 准备要处理的文档
documents := []string{
"人工智能是计算机科学的一个分支...",
"机器学习是人工智能的一个子领域...",
"深度学习是机器学习的一个子集...",
// 更多文档...
}
步骤3:实现Map阶段
// 3. Map阶段:为每个文档生成摘要
func performMapStage(ctx context.Context, llm llms.Model, documents []string) ([]string, error) {
var summaries []string
// 创建Map提示模板
mapTemplate := `请为以下文档生成一个简洁的摘要(约50字):
文档内容:
{text}
摘要:`
mapPrompt := prompts.NewPromptTemplate(mapTemplate, []string{"text"})
// 处理每个文档
for i, doc := range documents {
fmt.Printf("处理文档 %d/%d...\n", i+1, len(documents))
// 格式化提示
prompt, err := mapPrompt.Format(map[string]any{"text": doc})
if err != nil {
return nil, fmt.Errorf("格式化提示失败: %w", err)
}
// 调用LLM
response, err := llms.GenerateFromSinglePrompt(ctx, llm, prompt)
if err != nil {
return nil, fmt.Errorf("生成摘要失败: %w", err)
}
summaries = append(summaries, strings.TrimSpace(response))
}
return summaries, nil
}
步骤4:实现Reduce阶段
// 4. Reduce阶段:合并所有摘要
func performReduceStage(ctx context.Context, llm llms.Model, summaries []string) (string, error) {
// 创建Reduce提示模板
reduceTemplate := `请将以下摘要合并成一个连贯的总体摘要:
摘要列表:
{summaries}
总体摘要:`
reducePrompt := prompts.NewPromptTemplate(reduceTemplate, []string{"summaries"})
// 合并所有摘要
allSummaries := strings.Join(summaries, "\n\n")
// 格式化提示
prompt, err := reducePrompt.Format(map[string]any{
"summaries": allSummaries,
})
if err != nil {
return "", fmt.Errorf("格式化Reduce提示失败: %w", err)
}
// 调用LLM生成最终摘要
finalSummary, err := llms.GenerateFromSinglePrompt(ctx, llm, prompt)
if err != nil {
return "", fmt.Errorf("生成最终摘要失败: %w", err)
}
return strings.TrimSpace(finalSummary), nil
}
步骤5:组合完整流程
func runBasicMapReduce() error {
ctx := context.Background()
// 1. 创建LLM
llm, err := createLLM()
if err != nil {
return err
}
// 2. 准备文档
documents := prepareDocuments()
// 3. 执行Map阶段
summaries, err := performMapStage(ctx, llm, documents)
if err != nil {
return err
}
// 4. 执行Reduce阶段
finalResult, err := performReduceStage(ctx, llm, summaries)
if err != nil {
return err
}
// 5. 输出结果
fmt.Printf("最终摘要:\n%s\n", finalResult)
return nil
}
教程2:并行处理Map-Reduce
步骤1:定义结果结构
// 定义Map处理结果结构
type MapResult struct {
Index int
Summary string
Error error
}
步骤2:实现并行Map处理
func performParallelMapStage(ctx context.Context, llm llms.Model, documents []string) ([]string, error) {
// 创建结果通道
resultChan := make(chan MapResult, len(documents))
var wg sync.WaitGroup
// 控制并发数
const maxConcurrent = 5
semaphore := make(chan struct{}, maxConcurrent)
// 并行处理每个文档
for i, doc := range documents {
wg.Add(1)
go func(index int, document string) {
defer wg.Done()
// 获取并发令牌
semaphore <- struct{}{}
defer func() { <-semaphore }()
fmt.Printf("开始处理文档 %d\n", index+1)
// 处理单个文档
summary, err := processSingleDocument(ctx, llm, document)
resultChan <- MapResult{
Index: index,
Summary: summary,
Error: err,
}
fmt.Printf("完成处理文档 %d\n", index+1)
}(i, doc)
}
// 等待所有goroutine完成
wg.Wait()
close(resultChan)
// 收集和排序结果
summaries := make([]string, len(documents))
for result := range resultChan {
if result.Error != nil {
return nil, fmt.Errorf("处理文档 %d 失败: %w", result.Index+1, result.Error)
}
summaries[result.Index] = result.Summary
}
return summaries, nil
}
步骤3:添加错误处理和重试
func processSingleDocument(ctx context.Context, llm llms.Model, document string) (string, error) {
const maxRetries = 3
for attempt := 0; attempt < maxRetries; attempt++ {
summary, err := generateSummary(ctx, llm, document)
if err == nil {
return summary, nil
}
// 记录重试
log.Printf("尝试 %d 失败: %v", attempt+1, err)
// 指数退避
if attempt < maxRetries-1 {
backoff := time.Duration(math.Pow(2, float64(attempt))) * time.Second
time.Sleep(backoff)
}
}
return "", fmt.Errorf("处理文档失败,已达到最大重试次数")
}
教程3:分块处理Map-Reduce
步骤1:实现智能分块
func chunkDocuments(documents []string, chunkSize int) [][]string {
var chunks [][]string
for i := 0; i < len(documents); i += chunkSize {
end := i + chunkSize
if end > len(documents) {
end = len(documents)
}
chunks = append(chunks, documents[i:end])
}
return chunks
}
步骤2:实现分块处理逻辑
func performChunkedMapReduce(ctx context.Context, llm llms.Model, documents []string) (string, error) {
const chunkSize = 3
// 1. 将文档分块
chunks := chunkDocuments(documents, chunkSize)
fmt.Printf("将 %d 个文档分为 %d 块\n", len(documents), len(chunks))
// 2. 处理每个块
var chunkSummaries []string
for i, chunk := range chunks {
fmt.Printf("处理块 %d/%d...\n", i+1, len(chunks))
// 合并块内文档
chunkText := strings.Join(chunk, "\n\n--- 文档分隔符 ---\n\n")
// 为块生成摘要
summary, err := generateChunkSummary(ctx, llm, chunkText)
if err != nil {
return "", fmt.Errorf("处理块 %d 失败: %w", i+1, err)
}
chunkSummaries = append(chunkSummaries, summary)
}
// 3. Reduce阶段:合并块摘要
finalSummary, err := combineChunkSummaries(ctx, llm, chunkSummaries)
if err != nil {
return "", fmt.Errorf("合并块摘要失败: %w", err)
}
return finalSummary, nil
}
教程4:自定义应用场景
情感分析Map-Reduce
func performSentimentMapReduce(ctx context.Context, llm llms.Model, texts []string) error {
// Map阶段:分析每个文本的情感
sentimentTemplate := `请分析以下文本的情感倾向:
文本:{text}
请返回格式:情感|评分|原因
- 情感:积极/消极/中性
- 评分:1-10(数字)
- 原因:简要说明
分析结果:`
var sentiments []string
sentimentPrompt := prompts.NewPromptTemplate(sentimentTemplate, []string{"text"})
for i, text := range texts {
prompt, err := sentimentPrompt.Format(map[string]any{"text": text})
if err != nil {
return err
}
response, err := llms.GenerateFromSinglePrompt(ctx, llm, prompt)
if err != nil {
return err
}
sentiments = append(sentiments, response)
fmt.Printf("文本 %d 情感: %s\n", i+1, strings.TrimSpace(response))
}
// Reduce阶段:分析情感趋势
trendTemplate := `基于以下情感分析结果,请总结整体情感趋势:
情感分析结果:
{sentiments}
请提供:
1. 整体情感倾向
2. 平均情感评分
3. 情感分布统计
4. 主要发现
趋势分析:`
trendPrompt := prompts.NewPromptTemplate(trendTemplate, []string{"sentiments"})
prompt, err := trendPrompt.Format(map[string]any{
"sentiments": strings.Join(sentiments, "\n"),
})
if err != nil {
return err
}
trendAnalysis, err := llms.GenerateFromSinglePrompt(ctx, llm, prompt)
if err != nil {
return err
}
fmt.Printf("\n情感趋势分析:\n%s\n", trendAnalysis)
return nil
}
教程5:性能监控和优化
步骤1:添加性能监控
type PerformanceMetrics struct {
StartTime time.Time
MapDuration time.Duration
ReduceDuration time.Duration
TotalDuration time.Duration
DocumentCount int
ConcurrentCount int
}
func (p *PerformanceMetrics) Start() {
p.StartTime = time.Now()
}
func (p *PerformanceMetrics) MarkMapComplete() {
p.MapDuration = time.Since(p.StartTime)
}
func (p *PerformanceMetrics) MarkComplete() {
p.TotalDuration = time.Since(p.StartTime)
p.ReduceDuration = p.TotalDuration - p.MapDuration
}
func (p *PerformanceMetrics) Report() {
fmt.Printf("\n=== 性能报告 ===\n")
fmt.Printf("文档数量: %d\n", p.DocumentCount)
fmt.Printf("并发数: %d\n", p.ConcurrentCount)
fmt.Printf("Map阶段耗时: %v\n", p.MapDuration)
fmt.Printf("Reduce阶段耗时: %v\n", p.ReduceDuration)
fmt.Printf("总耗时: %v\n", p.TotalDuration)
fmt.Printf("平均每文档耗时: %v\n", p.TotalDuration/time.Duration(p.DocumentCount))
}
步骤2:集成性能监控
func runWithMetrics(documents []string) error {
metrics := &PerformanceMetrics{
DocumentCount: len(documents),
ConcurrentCount: 5,
}
metrics.Start()
// 执行Map阶段
summaries, err := performParallelMapStage(ctx, llm, documents)
if err != nil {
return err
}
metrics.MarkMapComplete()
// 执行Reduce阶段
result, err := performReduceStage(ctx, llm, summaries)
if err != nil {
return err
}
metrics.MarkComplete()
// 输出结果和性能报告
fmt.Printf("最终结果:\n%s\n", result)
metrics.Report()
return nil
}
使用示例
基础Map-Reduce实现
// Map阶段:为每个文档生成摘要
for i, doc := range documents {
prompt, err := mapPrompt.Format(map[string]any{"text": doc})
response, err := llms.GenerateFromSinglePrompt(ctx, llm, prompt)
summaries = append(summaries, response)
}
// Reduce阶段:合并所有摘要
allSummaries := strings.Join(summaries, "\n\n")
finalSummary, err := llms.GenerateFromSinglePrompt(ctx, llm, reducePrompt)
并行处理Map-Reduce
// 使用goroutine并行处理Map阶段
var wg sync.WaitGroup
resultChan := make(chan MapReduceResult, len(documents))
for i, doc := range documents {
wg.Add(1)
go func(index int, document string) {
defer wg.Done()
// 处理单个文档
result := processDocument(document)
resultChan <- MapReduceResult{Index: index, Summary: result}
}(i, doc)
}
应用场景
1. 文档摘要
- 大量报告的综合摘要
- 新闻文章的主题提取
- 学术论文的关键点总结
2. 情感分析
- 用户评论情感趋势分析
- 社交媒体情感监控
- 产品反馈情感统计
3. 内容分类
- 文档自动分类
- 主题标签生成
- 内容质量评估
4. 信息提取
- 关键信息抽取
- 实体识别汇总
- 结构化数据生成
性能对比
运行高级示例可以看到不同处理方式的性能对比:
顺序处理Map-Reduce: ~30秒 (6个文档)
并行处理Map-Reduce: ~8秒 (6个文档)
分块处理Map-Reduce: ~12秒 (6个文档)
配置选项
详细配置选项包括:
- 环境变量配置
- 性能优化设置
- 成本控制策略
- 故障排除指南
Map-Reduce的优势
- 可扩展性:可以处理任意数量的文档
- 并行处理:Map阶段可以并行执行,提高效率
- 模块化:Map和Reduce阶段可以独立优化和测试
- 内存效率:避免将所有文档同时加载到内存中
- 容错性:单个文档处理失败不影响其他文档
注意事项
- API配额:注意OpenAI API的调用频率限制
- 成本控制:大量文档处理可能产生较高费用
- 网络稳定性:确保网络连接稳定,考虑添加重试机制
- 内存管理:处理大量数据时注意内存使用
扩展建议
- 添加缓存机制:缓存已处理的结果
- 实现断点续传:支持中断后继续处理
- 添加更多LLM支持:支持其他模型提供商
- 优化提示模板:针对特定任务优化提示词
- 添加结果验证:验证处理结果的质量
🔧 故障排除指南
常见问题和解决方案
1. API相关问题
问题:401 Unauthorized
Error: 401 Unauthorized - Invalid API key
解决方案:
- 检查
OPENAI_API_KEY环境变量是否正确设置 - 确认API密钥有效且未过期
- 验证API密钥格式(应以
sk-开头)
问题:429 Rate Limit
Error: 429 Too Many Requests - Rate limit exceeded
解决方案:
// 添加速率限制控制
rateLimiter := time.NewTicker(time.Second / 2) // 每秒最多2个请求
defer rateLimiter.Stop()
for _, doc := range documents {
<-rateLimiter.C // 等待速率限制
processDocument(doc)
}
2. 内存和性能问题
问题:内存不足
Error: runtime: out of memory
解决方案:
// 实现流式处理
func streamProcessDocuments(documents []string, batchSize int) error {
for i := 0; i < len(documents); i += batchSize {
end := i + batchSize
if end > len(documents) {
end = len(documents)
}
batch := documents[i:end]
if err := processBatch(batch); err != nil {
return err
}
// 强制垃圾回收
runtime.GC()
}
return nil
}
问题:并发过高导致性能下降
// 动态调整并发数
func adaptiveConcurrency(baseCount int, successRate float64) int {
if successRate > 0.95 {
return baseCount + 1 // 成功率高,增加并发
} else if successRate < 0.8 {
return baseCount - 1 // 成功率低,减少并发
}
return baseCount
}
3. 网络和连接问题
问题:网络超时
Error: context deadline exceeded
解决方案:
// 设置合理的超时时间
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
// 添加重试机制
func retryableCall(ctx context.Context, fn func() error) error {
const maxRetries = 3
for i := 0; i < maxRetries; i++ {
if err := fn(); err != nil {
if i == maxRetries-1 {
return err
}
// 指数退避
backoff := time.Duration(math.Pow(2, float64(i))) * time.Second
select {
case <-time.After(backoff):
continue
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
return nil
}
4. 数据处理问题
问题:Map结果格式不一致
// 添加结果验证和标准化
func validateAndNormalizeResult(result string) (string, error) {
// 检查结果长度
if len(result) < 10 {
return "", errors.New("结果太短")
}
// 标准化格式
result = strings.TrimSpace(result)
// 检查是否包含必要信息
if !strings.Contains(result, "主题") {
return "", errors.New("结果缺少主题信息")
}
return result, nil
}
问题:Reduce阶段输入过长
// 实现分层Reduce
func hierarchicalReduce(summaries []string, maxBatchSize int) (string, error) {
if len(summaries) <= maxBatchSize {
return performSingleReduce(summaries)
}
// 分批处理
var intermediateResults []string
for i := 0; i < len(summaries); i += maxBatchSize {
end := i + maxBatchSize
if end > len(summaries) {
end = len(summaries)
}
batch := summaries[i:end]
result, err := performSingleReduce(batch)
if err != nil {
return "", err
}
intermediateResults = append(intermediateResults, result)
}
// 递归处理中间结果
return hierarchicalReduce(intermediateResults, maxBatchSize)
}
调试技巧
1. 启用详细日志
type Logger struct {
level int
}
func (l *Logger) Debug(msg string, args ...interface{}) {
if l.level >= 3 {
log.Printf("[DEBUG] "+msg, args...)
}
}
func (l *Logger) Info(msg string, args ...interface{}) {
if l.level >= 2 {
log.Printf("[INFO] "+msg, args...)
}
}
func (l *Logger) Error(msg string, args ...interface{}) {
if l.level >= 1 {
log.Printf("[ERROR] "+msg, args...)
}
}
2. 性能分析
import _ "net/http/pprof"
func startProfiling() {
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
}
// 使用方法:
// go tool pprof http://localhost:6060/debug/pprof/heap
3. 中间结果保存
func saveIntermediateResults(results []string, filename string) error {
data, err := json.Marshal(results)
if err != nil {
return err
}
return os.WriteFile(filename, data, 0644)
}
func loadIntermediateResults(filename string) ([]string, error) {
data, err := os.ReadFile(filename)
if err != nil {
return nil, err
}
var results []string
err = json.Unmarshal(data, &results)
return results, err
}
🏆 最佳实践总结
1. 设计原则
Map阶段设计
- ✅ 保持无状态:每个Map操作应该独立
- ✅ 标准化输出:确保所有Map结果格式一致
- ✅ 错误隔离:单个失败不应影响整体
- ✅ 幂等性:相同输入应产生相同输出
Reduce阶段设计
- ✅ 信息整合:有效合并来自Map阶段的信息
- ✅ 去重处理:识别并处理重复信息
- ✅ 结构化输出:提供清晰、有组织的最终结果
- ✅ 质量控制:确保输出的完整性和准确性
2. 性能优化
并发控制
// 最佳实践:动态并发控制
type ConcurrencyController struct {
semaphore chan struct{}
maxConcurrent int
currentLoad int
mutex sync.Mutex
}
func (c *ConcurrencyController) Acquire() {
c.semaphore <- struct{}{}
c.mutex.Lock()
c.currentLoad++
c.mutex.Unlock()
}
func (c *ConcurrencyController) Release() {
<-c.semaphore
c.mutex.Lock()
c.currentLoad--
c.mutex.Unlock()
}
资源管理
// 连接池管理
type LLMPool struct {
clients chan llms.Model
factory func() (llms.Model, error)
}
func NewLLMPool(size int, factory func() (llms.Model, error)) *LLMPool {
pool := &LLMPool{
clients: make(chan llms.Model, size),
factory: factory,
}
// 预创建连接
for i := 0; i < size; i++ {
client, err := factory()
if err == nil {
pool.clients <- client
}
}
return pool
}
func (p *LLMPool) Get() llms.Model {
select {
case client := <-p.clients:
return client
default:
// 如果池为空,创建新连接
client, _ := p.factory()
return client
}
}
func (p *LLMPool) Put(client llms.Model) {
select {
case p.clients <- client:
default:
// 池已满,丢弃连接
}
}
3. 错误处理策略
// 分级错误处理
type ErrorSeverity int
const (
ErrorSeverityLow ErrorSeverity = iota
ErrorSeverityMedium
ErrorSeverityHigh
ErrorSeverityCritical
)
func classifyError(err error) ErrorSeverity {
switch {
case strings.Contains(err.Error(), "rate limit"):
return ErrorSeverityMedium
case strings.Contains(err.Error(), "unauthorized"):
return ErrorSeverityCritical
case strings.Contains(err.Error(), "timeout"):
return ErrorSeverityLow
default:
return ErrorSeverityMedium
}
}
func handleError(err error, context string) error {
severity := classifyError(err)
switch severity {
case ErrorSeverityLow:
log.Printf("可重试错误 [%s]: %v", context, err)
return err // 允许重试
case ErrorSeverityCritical:
log.Fatalf("致命错误 [%s]: %v", context, err)
return err
default:
log.Printf("一般错误 [%s]: %v", context, err)
return err
}
}
4. 监控和观测
// 指标收集
type Metrics struct {
ProcessedDocuments int64
FailedDocuments int64
TotalProcessTime time.Duration
AverageProcessTime time.Duration
mutex sync.RWMutex
}
func (m *Metrics) RecordProcess(duration time.Duration, success bool) {
m.mutex.Lock()
defer m.mutex.Unlock()
if success {
m.ProcessedDocuments++
} else {
m.FailedDocuments++
}
m.TotalProcessTime += duration
total := m.ProcessedDocuments + m.FailedDocuments
if total > 0 {
m.AverageProcessTime = m.TotalProcessTime / time.Duration(total)
}
}
func (m *Metrics) Report() {
m.mutex.RLock()
defer m.mutex.RUnlock()
fmt.Printf("=== 处理指标 ===\n")
fmt.Printf("成功处理: %d\n", m.ProcessedDocuments)
fmt.Printf("处理失败: %d\n", m.FailedDocuments)
fmt.Printf("成功率: %.2f%%\n", float64(m.ProcessedDocuments)/float64(m.ProcessedDocuments+m.FailedDocuments)*100)
fmt.Printf("平均处理时间: %v\n", m.AverageProcessTime)
}
5. 提示工程技巧
提示模板版本控制
type PromptTemplate struct {
Version string
Template string
Description string
Variables []string
}
var PromptTemplates = map[string]PromptTemplate{
"map_summary_v1": {
Version: "1.0",
Template: "为以下文档生成摘要:\n{text}\n摘要:",
Description: "基础摘要模板",
Variables: []string{"text"},
},
"map_summary_v2": {
Version: "2.0",
Template: "作为专业分析师,请为以下文档生成结构化摘要(约100字):\n\n{text}\n\n格式:主题|要点|结论\n摘要:",
Description: "结构化摘要模板",
Variables: []string{"text"},
},
}
A/B测试框架
func runABTest(documentsA, documentsB []string, templateA, templateB string) {
var wg sync.WaitGroup
resultsA := make(chan []string, 1)
resultsB := make(chan []string, 1)
wg.Add(2)
go func() {
defer wg.Done()
results, _ := processWithTemplate(documentsA, templateA)
resultsA <- results
}()
go func() {
defer wg.Done()
results, _ := processWithTemplate(documentsB, templateB)
resultsB <- results
}()
wg.Wait()
close(resultsA)
close(resultsB)
// 比较结果质量
compareResults(<-resultsA, <-resultsB)
}
学习价值
通过本项目,你可以学习到:
- LangChain Go的基本使用方法
- Map-Reduce模式的实现原理
- 并行处理和性能优化技巧
- 实际NLP任务的解决方案
- Go语言并发编程最佳实践
贡献
欢迎提交issue和pull request来改进这个学习案例!