b2txt25/TTA-E/genetic_search.py
#!/usr/bin/env python3
"""
Genetic algorithm parameter optimization, implemented with the DEAP library.
Parameter search for TTA-E neural decoding.
"""
import os
import sys
import numpy as np
import pickle
import argparse
import time
import json
from typing import Dict, List, Tuple
import random

# DEAP library
from deap import algorithms, base, creator, tools

# GPU acceleration support
try:
    import cupy as cp
    GPU_AVAILABLE = True
    print("GPU acceleration available with CuPy")
except ImportError:
    import numpy as cp
    GPU_AVAILABLE = False
    print("Using CPU computation with NumPy")

# Set the random seeds
random.seed(42)
np.random.seed(42)


def to_cpu(x):
    """Convert a CuPy array to a NumPy array."""
    if GPU_AVAILABLE and hasattr(x, 'get'):
        return x.get()
    return x


def load_base_predictions(cache_file='base_predictions_cache.pkl'):
    """Load precomputed base prediction results."""
    if os.path.exists(cache_file):
        print(f"Loading base predictions from {cache_file}")
        with open(cache_file, 'rb') as f:
            return pickle.load(f)
    else:
        print(f"Cache file {cache_file} not found. Generating mock data...")
        return generate_mock_predictions()


def generate_mock_predictions():
    """Generate mock prediction data for testing."""
    print("Generating mock base predictions for testing...")
    n_trials = 20    # simulate 20 trials
    seq_len = 40     # sequence length
    vocab_size = 31  # vocabulary size
    n_tta = 5        # number of TTA samples
    base_predictions = []
    np.random.seed(42)  # fixed seed for reproducible results
    for trial in range(n_trials):
        # Generate the ground-truth character sequence
        true_chars = np.random.randint(0, vocab_size, seq_len)
        # Simulate GRU and LSTM probability predictions, with some realism mixed in
        gru_probs = np.random.rand(n_tta, seq_len, vocab_size)
        lstm_probs = np.random.rand(n_tta, seq_len, vocab_size)
        # Bias the probabilities toward the true answer to raise accuracy a bit
        for i in range(seq_len):
            for tta in range(n_tta):
                # Give the true answer a higher probability
                gru_probs[tta, i, true_chars[i]] += 2.0
                lstm_probs[tta, i, true_chars[i]] += 2.0
        # Normalize into probability distributions
        gru_probs = gru_probs / np.sum(gru_probs, axis=2, keepdims=True)
        lstm_probs = lstm_probs / np.sum(lstm_probs, axis=2, keepdims=True)
        base_predictions.append({
            'gru_probs': gru_probs,
            'lstm_probs': lstm_probs,
            'true_chars': true_chars
        })
    return base_predictions


def calculate_per(predicted, true):
    """Compute the phoneme error rate (PER)."""
    if len(predicted) == 0 and len(true) == 0:
        return 0.0
    if len(predicted) == 0 or len(true) == 0:
        return 1.0
    # Simple character-level edit distance
    n, m = len(predicted), len(true)
    dp = np.zeros((n + 1, m + 1))
    for i in range(n + 1):
        dp[i][0] = i
    for j in range(m + 1):
        dp[0][j] = j
    for i in range(1, n + 1):
        for j in range(1, m + 1):
            if predicted[i-1] == true[j-1]:
                dp[i][j] = dp[i-1][j-1]
            else:
                dp[i][j] = 1 + min(dp[i-1][j], dp[i][j-1], dp[i-1][j-1])
    return float(dp[n][m]) / max(n, m)  # cast from numpy float so saved results stay JSON-serializable
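
# Illustrative check of the metric above: calculate_per([1, 2, 3], [1, 3, 3]) evaluates to 1/3,
# since one substitution is needed and the distance is normalized by max(len(predicted), len(true)) = 3.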


def evaluate_individual(individual, base_predictions_data):
    """Evaluate an individual's fitness; returns the PER (lower is better)."""
    try:
        # Unpack the individual's parameters
        tta_weights = individual[:5]  # the first 5 genes are the TTA weights
        gru_weight = individual[5]    # the 6th gene is the GRU weight
        # Convert to a GPU array if available
        if GPU_AVAILABLE:
            tta_weights = cp.array(tta_weights)
        else:
            tta_weights = np.array(tta_weights)
        total_per = 0.0
        total_chars = 0
        for trial_data in base_predictions_data:
            gru_probs = trial_data['gru_probs']    # shape: (5, seq_len, vocab_size)
            lstm_probs = trial_data['lstm_probs']  # shape: (5, seq_len, vocab_size)
            true_chars = trial_data['true_chars']
            # Convert to GPU arrays
            if GPU_AVAILABLE:
                gru_probs = cp.asarray(gru_probs)
                lstm_probs = cp.asarray(lstm_probs)
            # Compute the TTA weighted average
            tta_sum = cp.sum(tta_weights)
            if tta_sum > 0:
                tta_weights_norm = tta_weights / tta_sum
            else:
                tta_weights_norm = cp.ones_like(tta_weights) / len(tta_weights)
            # Weight each TTA sample
            gru_weighted = cp.sum(gru_probs * tta_weights_norm[:, None, None], axis=0)
            lstm_weighted = cp.sum(lstm_probs * tta_weights_norm[:, None, None], axis=0)
            # Model ensembling
            ensemble_probs = gru_weighted * gru_weight + lstm_weighted * (1 - gru_weight)
            # Decode the prediction
            if GPU_AVAILABLE:
                predicted_chars = cp.argmax(ensemble_probs, axis=1)
                predicted_chars = to_cpu(predicted_chars)
            else:
                predicted_chars = np.argmax(ensemble_probs, axis=1)
            # Compute the PER for this trial
            per = calculate_per(predicted_chars, true_chars)
            total_per += per * len(true_chars)
            total_chars += len(true_chars)
        avg_per = total_per / total_chars if total_chars > 0 else 1.0
        return (avg_per,)  # DEAP expects a tuple
    except Exception as e:
        print(f"Error in evaluate_individual: {e}")
        return (1.0,)  # return the worst possible score
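
# Fitness summary for an individual [a_0, ..., a_4, w] (minimized): with normalized TTA weights
# a_k' = a_k / sum_j(a_j), each trial's ensemble distribution is
#     p_ens = w * sum_k(a_k' * p_gru_k) + (1 - w) * sum_k(a_k' * p_lstm_k),
# and the reported fitness is the length-weighted mean PER of argmax(p_ens) over all trials.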


class GeneticOptimizer:
    """Genetic algorithm optimizer built on DEAP."""

    def __init__(self,
                 bounds,
                 population_size=50,
                 generations=100,
                 crossover_prob=0.8,
                 mutation_prob=0.2,
                 tournament_size=3,
                 elite_size=2):
        self.bounds = bounds
        self.population_size = population_size
        self.generations = generations
        self.crossover_prob = crossover_prob
        self.mutation_prob = mutation_prob
        self.tournament_size = tournament_size
        self.elite_size = elite_size
        # Parameter bounds
        self.param_names = list(bounds.keys())
        self.n_params = len(self.param_names)
        self.lower_bounds = [bounds[name][0] for name in self.param_names]
        self.upper_bounds = [bounds[name][1] for name in self.param_names]
        self.setup_deap()

    def setup_deap(self):
        """Set up the DEAP framework."""
        # Remove earlier class definitions if they exist
        if hasattr(creator, "FitnessMin"):
            del creator.FitnessMin
        if hasattr(creator, "Individual"):
            del creator.Individual
        # Create the fitness class (minimize PER)
        creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
        creator.create("Individual", list, fitness=creator.FitnessMin)
        # Create the toolbox
        self.toolbox = base.Toolbox()

        # Register attribute generators, making sure each value stays within its bounds
        def create_bounded_uniform(low, high):
            def bounded_uniform():
                return random.uniform(low, high)
            return bounded_uniform

        for name, (low, high) in zip(self.param_names, zip(self.lower_bounds, self.upper_bounds)):
            self.toolbox.register(f"attr_{name}", create_bounded_uniform(low, high))
        # Register the individual and population generators
        self.toolbox.register("individual", tools.initCycle, creator.Individual,
                              [getattr(self.toolbox, f"attr_{name}") for name in self.param_names], n=1)
        self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual)
        # Register the genetic operators
        self.toolbox.register("mate", tools.cxBlend, alpha=0.3)  # blend crossover with a reduced alpha
        self.toolbox.register("mutate", self.gaussian_mutation)  # Gaussian mutation
        self.toolbox.register("select", tools.selTournament, tournsize=self.tournament_size)  # tournament selection
        self.toolbox.register("evaluate", self.evaluate_wrapper)
self.toolbox.register("clone", tools.clone)

    def evaluate_wrapper(self, individual):
        """Wrapper around the evaluation function."""
        return evaluate_individual(individual, self.base_predictions)

    def gaussian_mutation(self, individual):
        """Gaussian mutation operator."""
        for i in range(len(individual)):
            if random.random() < self.mutation_prob:
                # Gaussian mutation with a standard deviation of 10% of the parameter range
                sigma = (self.upper_bounds[i] - self.lower_bounds[i]) * 0.1
                individual[i] += random.gauss(0, sigma)
                # Boundary handling: hard constraint
                individual[i] = max(self.lower_bounds[i], min(self.upper_bounds[i], individual[i]))
        return (individual,)

    def constraint_repair(self, individual):
        """Constraint repair: clamp every parameter back into its valid range."""
        for i in range(len(individual)):
            individual[i] = max(self.lower_bounds[i], min(self.upper_bounds[i], individual[i]))
        return individual

    def optimize(self, base_predictions):
        """Run the genetic algorithm optimization."""
        self.base_predictions = base_predictions
        print("Starting Genetic Algorithm Optimization")
        print(f"Population size: {self.population_size}")
        print(f"Generations: {self.generations}")
        print(f"Crossover probability: {self.crossover_prob}")
        print(f"Mutation probability: {self.mutation_prob}")
        print(f"Tournament size: {self.tournament_size}")
        start_time = time.time()
        # Create the initial population
        population = self.toolbox.population(n=self.population_size)
        # Make sure the initial population respects the bounds
        for individual in population:
            self.constraint_repair(individual)
        # Evaluate the initial population
        fitnesses = list(map(self.toolbox.evaluate, population))
        for ind, fit in zip(population, fitnesses):
            ind.fitness.values = fit
        # Statistics
        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("avg", np.mean)
        stats.register("min", np.min)
        stats.register("max", np.max)
        stats.register("std", np.std)
        # Hall of fame (keeps the best individuals seen so far)
        hall_of_fame = tools.HallOfFame(maxsize=10)
        # History of the best parameters per generation
        history = []
        print(f"Initial population evaluated. Best fitness: {min(fitnesses)[0]:.6f}")
        # Evolution loop
        for generation in range(self.generations):
            # Select parents for the next generation
            offspring = self.toolbox.select(population, len(population))
            offspring = list(map(self.toolbox.clone, offspring))
            # Crossover and mutation
            for child1, child2 in zip(offspring[::2], offspring[1::2]):
                if random.random() < self.crossover_prob:
                    self.toolbox.mate(child1, child2)
                    # Constraint repair
                    self.constraint_repair(child1)
                    self.constraint_repair(child2)
                    del child1.fitness.values
                    del child2.fitness.values
            for mutant in offspring:
                if random.random() < self.mutation_prob:
                    self.toolbox.mutate(mutant)
                    # Constraint repair
                    self.constraint_repair(mutant)
                    del mutant.fitness.values
            # Evaluate individuals whose fitness was invalidated
            invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
            fitnesses = map(self.toolbox.evaluate, invalid_ind)
            for ind, fit in zip(invalid_ind, fitnesses):
                ind.fitness.values = fit
            # Elitism: keep the best individuals from parents and offspring
            combined = population + offspring
            combined.sort(key=lambda x: x.fitness.values[0])
            population = combined[:self.population_size]
            # Update the statistics and the hall of fame
            hall_of_fame.update(population)
            record = stats.compile(population)
            # Record the history
            best_individual = hall_of_fame[0]
            best_params = {self.param_names[i]: best_individual[i] for i in range(self.n_params)}
            history.append((best_params.copy(), best_individual.fitness.values[0]))
            if generation % 10 == 0 or generation == self.generations - 1:
                print(f"Generation {generation+1:3d}: "
                      f"Min={record['min']:.6f}, "
                      f"Avg={record['avg']:.6f}, "
                      f"Std={record['std']:.6f}")
        elapsed_time = time.time() - start_time
        # Return the best result
        best_individual = hall_of_fame[0]
        best_params = {self.param_names[i]: best_individual[i] for i in range(self.n_params)}
        best_score = best_individual.fitness.values[0]
        return {
            'best_params': best_params,
            'best_score': best_score,
            'history': history,
            'total_evaluations': self.population_size * (1 + self.generations),
            'elapsed_time': elapsed_time,
            'hall_of_fame': [
                {
                    'params': {self.param_names[i]: ind[i] for i in range(self.n_params)},
                    'score': ind.fitness.values[0]
                }
                for ind in hall_of_fame
            ]
        }


def save_results(result, output_file='genetic_optimization_results.json'):
    """Save the optimization results."""
    print(f"Saving results to {output_file}")
    with open(output_file, 'w') as f:
        json.dump(result, f, indent=2)
    print("Results saved successfully")


def main():
    parser = argparse.ArgumentParser(description='Genetic Algorithm for TTA-E Parameter Optimization')
    parser.add_argument('--cache_file', type=str, default='base_predictions_cache.pkl',
                        help='Base predictions cache file')
    parser.add_argument('--output_file', type=str, default='genetic_optimization_results.json',
                        help='Output file for results')
    # Genetic algorithm parameters
    parser.add_argument('--population_size', type=int, default=50,
                        help='Population size')
    parser.add_argument('--generations', type=int, default=100,
                        help='Number of generations')
    parser.add_argument('--crossover_prob', type=float, default=0.8,
                        help='Crossover probability')
    parser.add_argument('--mutation_prob', type=float, default=0.2,
                        help='Mutation probability')
    parser.add_argument('--tournament_size', type=int, default=3,
                        help='Tournament selection size')
    parser.add_argument('--elite_size', type=int, default=2,
                        help='Elite size for preservation')
    args = parser.parse_args()

    print(f"{'='*60}")
    print("TTA-E Genetic Algorithm Parameter Optimization")
    print(f"{'='*60}")
    # Load the base prediction data
    base_predictions = load_base_predictions(args.cache_file)
    # Define the parameter search space
    bounds = {
        'tta_weight_0': (0.0, 2.0),  # weight of the original sample
        'tta_weight_1': (0.0, 2.0),  # weight of the noise augmentation
        'tta_weight_2': (0.0, 2.0),  # weight of the scaling augmentation
        'tta_weight_3': (0.0, 2.0),  # weight of the shift augmentation
        'tta_weight_4': (0.0, 2.0),  # weight of the smoothing augmentation
        'gru_weight': (0.0, 1.0)     # GRU model weight
    }
    print("Parameter search space:")
    for param, (low, high) in bounds.items():
        print(f"  {param}: [{low}, {high}]")
    print()
    # Create the genetic algorithm optimizer
    optimizer = GeneticOptimizer(
        bounds=bounds,
        population_size=args.population_size,
        generations=args.generations,
        crossover_prob=args.crossover_prob,
        mutation_prob=args.mutation_prob,
        tournament_size=args.tournament_size,
        elite_size=args.elite_size
    )
    # Run the optimization
    result = optimizer.optimize(base_predictions)
    # Display the results
    print(f"\n{'='*60}")
    print("OPTIMIZATION RESULTS")
    print(f"{'='*60}")
    print(f"Best Score (PER): {result['best_score']:.6f}")
    print("Best Parameters:")
    for param, value in result['best_params'].items():
        print(f"  {param}: {value:.4f}")
    print(f"Total Evaluations: {result['total_evaluations']:,}")
    print(f"Elapsed Time: {result['elapsed_time']:.2f} seconds")
    print()
    print("Top 5 Solutions from Hall of Fame:")
    for i, solution in enumerate(result['hall_of_fame'][:5], 1):
        print(f"{i}. Score: {solution['score']:.6f}")
        params_str = ", ".join([f"{k}={v:.3f}" for k, v in solution['params'].items()])
        print(f"   Parameters: {params_str}")
    # Save the results
    save_results(result, args.output_file)
    print("\nOptimization completed successfully!")


if __name__ == '__main__':
    main()
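
# Example invocation (a sketch; assumes a precomputed base_predictions_cache.pkl in the working
# directory -- without it the script falls back to the mock data generated above):
#   python genetic_search.py --population_size 50 --generations 100 \
#       --crossover_prob 0.8 --mutation_prob 0.2 --output_file genetic_optimization_results.json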