#!/usr/bin/env python3
"""
TPU memory monitoring tool for use during training.

Works around the fact that tf.config.experimental.get_memory_info()
does not work on TPU devices.
"""

import tensorflow as tf
import psutil

class TPUMemoryMonitor:
    """Tracks TPU availability and estimated tensor allocations."""

    def __init__(self):
        self.tpu_devices = tf.config.list_logical_devices('TPU')
        self.baseline_memory = None
        self.peak_allocations = {}

    def get_tpu_status(self) -> str:
        """Report TPU status; a pragmatic version that does not rely on get_memory_info()."""
        try:
            if not self.tpu_devices:
                return "TPU: No devices"

            num_cores = len(self.tpu_devices)

            # Probe TPU responsiveness with a trivial computation.
            try:
                with tf.device('/TPU:0'):
                    test_tensor = tf.constant([1.0, 2.0, 3.0])
                    result = tf.reduce_sum(test_tensor)
                    _ = result.numpy()  # Force execution.
                activity = "active"
            except Exception:
                activity = "inactive"

            # Report host memory as a rough reference point.
            try:
                memory = psutil.virtual_memory()
                host_mem = f"Host:{memory.percent:.1f}%"
            except Exception:
                host_mem = "Host:unknown"

            return f"TPU: {num_cores}cores {activity} {host_mem}"

        except Exception as e:
            return f"TPU: error({str(e)[:20]})"

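    # Illustrative status strings assembled by the method above (examples of
    # the format, not captured from a real run):
    #   "TPU: 8cores active Host:34.2%"
    #   "TPU: No devices"
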
    def estimate_tensor_memory(self, tensor_shape, dtype=tf.float32):
        """Estimate a tensor's memory footprint in MB."""
        if dtype == tf.float32:
            bytes_per_element = 4
        elif dtype in (tf.float16, tf.bfloat16):
            bytes_per_element = 2
        elif dtype == tf.int32:
            bytes_per_element = 4
        elif dtype == tf.int64:
            bytes_per_element = 8
        else:
            bytes_per_element = 4  # Default assumption.

        total_elements = 1
        for dim in tensor_shape:
            total_elements *= dim

        total_bytes = total_elements * bytes_per_element
        return total_bytes / (1024 * 1024)  # Return MB.

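    # Worked example of the arithmetic above: a float32 tensor of shape
    # [32, 1000, 512] has 32 * 1000 * 512 = 16,384,000 elements, times
    # 4 bytes = 65,536,000 bytes, so the method returns
    # 65_536_000 / (1024 * 1024) = 62.5 MB.
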
    def track_allocation(self, name: str, tensor_shape, dtype=tf.float32):
        """Record an estimated allocation under the given name."""
        mb = self.estimate_tensor_memory(tensor_shape, dtype)
        self.peak_allocations[name] = self.peak_allocations.get(name, 0) + mb
        return mb

    def get_allocation_summary(self) -> str:
        """Summarize tracked allocations."""
        if not self.peak_allocations:
            return "No allocations tracked"

        total_mb = sum(self.peak_allocations.values())
        top_name, top_mb = max(self.peak_allocations.items(), key=lambda x: x[1])

        return f"Tracked:{total_mb:.1f}MB Top:({top_name}:{top_mb:.1f}MB)"

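    # Usage sketch (hypothetical tensor name and shape, shown for
    # illustration): pair each real allocation with a tracking call,
    # then read back the summary.
    #
    #   monitor = TPUMemoryMonitor()
    #   x = tf.zeros([32, 1000, 512])
    #   monitor.track_allocation("input", [32, 1000, 512])
    #   print(monitor.get_allocation_summary())
    #   # -> "Tracked:62.5MB Top:(input:62.5MB)"
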
    def test_memory_allocation_across_cores(self):
        """Test memory allocation on all 8 TPU cores."""
        print("🧪 Testing memory allocation on all TPU cores")
        print("=" * 40)

        allocations_per_core = []

        for i, device in enumerate(self.tpu_devices):
            print(f"Core {i+1}: {device.name}")

            try:
                with tf.device(device.name):
                    # Create test tensors of increasing size.
                    test_sizes = [
                        ([1000, 1000], "1K×1K"),
                        ([3000, 3000], "3K×3K"),
                        ([5000, 5000], "5K×5K"),
                        ([7000, 7000], "7K×7K"),
                    ]

                    core_total = 0
                    successful_allocs = []

                    # Note: tensors are created sequentially and each one is
                    # released when the loop variable is rebound, so core_total
                    # is a cumulative estimate, not peak simultaneous usage.
                    for shape, desc in test_sizes:
                        try:
                            tensor = tf.ones(shape, dtype=tf.float32)
                            mb = self.estimate_tensor_memory(shape)
                            core_total += mb
                            successful_allocs.append(f"{desc}({mb:.1f}MB)")

                            # Actually use the tensor so it is not optimized away.
                            _ = tf.reduce_mean(tensor)

                        except Exception as e:
                            print(f"   {desc} failed: {str(e)[:30]}")
                            break

                    allocations_per_core.append(core_total)
                    print(f"   Allocated: {' + '.join(successful_allocs)}")
                    print(f"   Core total: {core_total:.1f}MB")

            except Exception as e:
                print(f"   Core {i+1} failed: {e}")
                allocations_per_core.append(0)

        # Aggregate the results.
        total_all_cores = sum(allocations_per_core)
        avg_per_core = total_all_cores / len(self.tpu_devices) if self.tpu_devices else 0

        print("\n📊 Summary:")
        print(f"   Total allocated: {total_all_cores:.1f}MB ({total_all_cores/1024:.2f}GB)")
        print(f"   Average per core: {avg_per_core:.1f}MB ({avg_per_core/1024:.2f}GB)")

        # Guess the memory configuration.
        if avg_per_core > 8000:  # > 8GB
            print("   Guess: ≥16GB per core (high-end configuration)")
        elif avg_per_core > 4000:  # > 4GB
            print("   Guess: 8-16GB per core (standard configuration)")
        elif avg_per_core > 1000:  # > 1GB
            print("   Guess: 2-8GB per core (restricted or shared)")
        else:
            print("   Guess: <2GB per core (severely restricted)")

        return allocations_per_core

def test_training_memory_pattern():
    """Simulate the memory pattern of a training run."""
    print("\n🏋️ Simulated training memory pattern test")
    print("=" * 30)

    monitor = TPUMemoryMonitor()

    # Approximate the memory use of a typical brain-to-text model.
    with tf.device('/TPU:0'):
        print("Creating simulated model components...")

        # 1. Input data (batch_size=32, seq_len=1000, features=512).
        batch_size, seq_len, features = 32, 1000, 512
        input_data = tf.random.normal([batch_size, seq_len, features])
        input_mb = monitor.track_allocation("input_data", [batch_size, seq_len, features])
        print(f"   Input data: {input_mb:.1f}MB")

        # 2. GRU weights (assume 3 layers of 256 units each).
        n_layers, n_units = 3, 256
        for layer in range(n_layers):
            # A GRU concatenates 3 weight blocks (update gate, reset gate,
            # candidate state), hence the factor of 3. Only the input kernel
            # is modeled; recurrent kernels and biases are omitted, so this
            # undercounts the true parameter memory.
            weight_shape = [features if layer == 0 else n_units, n_units * 3]
            weights = tf.random.normal(weight_shape)
            weight_mb = monitor.track_allocation(f"gru_layer_{layer}", weight_shape)
            print(f"   GRU layer {layer+1} weights: {weight_mb:.1f}MB")

        # 3. Output projection (n_units -> n_classes=41).
        n_classes = 41
        output_weights = tf.random.normal([n_units, n_classes])
        output_mb = monitor.track_allocation("output_projection", [n_units, n_classes])
        print(f"   Output projection: {output_mb:.1f}MB")

        # 4. Intermediate activations (forward pass).
        hidden_states = tf.random.normal([batch_size, seq_len, n_units])
        hidden_mb = monitor.track_allocation("hidden_states", [batch_size, seq_len, n_units])
        print(f"   Hidden states: {hidden_mb:.1f}MB")

        # 5. Gradients (backprop roughly doubles parameter memory).
        total_params_mb = sum(v for k, v in monitor.peak_allocations.items()
                              if 'layer' in k or 'projection' in k)
        gradient_mb = total_params_mb  # Gradient memory ≈ parameter memory.
        print(f"   Gradient memory: {gradient_mb:.1f}MB (estimated)")

        print(f"\nTotal model memory estimate: {monitor.get_allocation_summary()}")

        # Run a computation so the tensors are actually materialized.
        result = tf.reduce_mean(input_data) + tf.reduce_mean(hidden_states)
        print(f"Verification result: {result.numpy():.4f}")

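# A minimal sketch of how the monitor could be hooked into a real training
# loop (num_steps, batch, and train_step are hypothetical placeholders):
#
#   monitor = TPUMemoryMonitor()
#   for step in range(num_steps):
#       loss = train_step(batch)  # your training step
#       if step % 100 == 0:
#           print(f"step {step}: {monitor.get_tpu_status()}")
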
if __name__ == "__main__":
    print("🚀 TPU memory monitor starting")

    monitor = TPUMemoryMonitor()

    # Basic status check.
    print(f"Current TPU status: {monitor.get_tpu_status()}")

    # Test every core.
    print("\n" + "=" * 50)
    core_allocations = monitor.test_memory_allocation_across_cores()

    # Training memory pattern test.
    print("\n" + "=" * 50)
    test_training_memory_pattern()

    print("\n🎯 Key findings:")
    nonzero = [x for x in core_allocations if x > 0]
    if nonzero:
        max_core = max(nonzero)
        min_core = min(nonzero)
        print(f"   Largest single-core allocation: {max_core:.1f}MB")
        print(f"   Smallest single-core allocation: {min_core:.1f}MB")

        if max_core > 9000:  # An earlier test reached 9.4GB.
            print("   ✅ Memory is plentiful; large-model training is feasible")
        elif max_core > 5000:
            print("   ⚠️ Memory is moderate; consider shrinking the model")
        else:
            print("   ❌ Memory is tight; reduce model parameters substantially")

    print("\n💡 On the training stall you are seeing:")
    print("   - SetPriority errors usually indicate an XLA compilation issue, not a memory issue")
    print("   - Your 9.4GB allocation test shows TPU memory itself is working")
    print("   - Check whether the model contains ops that stall XLA compilation")
    print("   - Consider simpler ops or disabling some XLA optimizations")