159 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			159 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | #!/usr/bin/env python3 | ||
|  | 
 | ||
|  | import pickle | ||
|  | import numpy as np | ||
|  | from pathlib import Path | ||
|  | 
 | ||
def _scan_trials(data):
    """Tally trials, segments and inverted segments in a loaded dataset dict.

    Only entries whose key contains ``'trial_'`` are treated as trials, and
    only dict-shaped segments can be inspected for ordering.

    Returns:
        A tuple ``(trial_count, total_segments, checked_segments,
        temporal_errors)``. ``total_segments`` counts every entry of each
        ``'phoneme_segments'`` container; ``checked_segments`` counts only
        the dict entries whose times were actually compared.

    Raises:
        TypeError: if a ``'phoneme_segments'`` value is not a sized iterable
            or the stored times cannot be compared.
    """
    trial_count = 0
    total_segments = 0
    checked_segments = 0
    temporal_errors = 0

    for key, value in data.items():
        if 'trial_' not in str(key):
            continue
        trial_count += 1

        if not (isinstance(value, dict) and 'phoneme_segments' in value):
            continue
        segments = value['phoneme_segments']
        total_segments += len(segments)

        for seg in segments:
            if not isinstance(seg, dict):
                continue
            checked_segments += 1
            # A missing timestamp falls back to 0 before the comparison.
            if seg.get('end_time', 0) < seg.get('start_time', 0):
                temporal_errors += 1

    return trial_count, total_segments, checked_segments, temporal_errors


def examine_latest_dataset(data_dir="phoneme_segmented_data",
                           pattern="ctc_results_*.pkl"):
    """Examine the latest processed dataset for temporal ordering errors.

    The newest file matching ``pattern`` inside ``data_dir`` is unpickled and
    every dict-shaped phoneme segment of every ``trial_*`` entry is checked
    for ``end_time < start_time``.

    Args:
        data_dir: Directory holding the result pickles (str or Path).
        pattern: Glob pattern of candidate files. The newest file is chosen
            by file name, which assumes the timestamp embedded in the name
            sorts lexicographically (as in
            ``ctc_results_20251009_000024.pkl``).

    Returns:
        True only if at least one segment was checked and none of them ends
        before it starts; False for a missing file, a load/analysis error,
        an unexpected payload type, an empty check, or any ordering error.
    """
    data_dir = Path(data_dir)

    # Find latest file (by name) instead of relying on a hard-coded one.
    candidates = []
    if data_dir.is_dir():
        candidates = sorted(data_dir.glob(pattern), key=lambda p: p.name)
    if not candidates:
        print(f"❌ File not found: {data_dir / pattern}")
        return False
    latest_file = candidates[-1]

    print(f"📁 Loading dataset: {latest_file}")
    print(f"📏 File size: {latest_file.stat().st_size / (1024*1024):.1f} MB")

    # NOTE(security): pickle.load can execute arbitrary code; only run this
    # on dataset files produced by a trusted pipeline.
    try:
        with open(latest_file, 'rb') as f:
            data = pickle.load(f)
    except Exception as e:  # unpickling may raise almost any exception type
        print(f"❌ Error loading dataset: {e}")
        return False

    print("✅ Successfully loaded dataset")

    if not isinstance(data, dict):
        print(f"❌ Unexpected data format: {type(data)}")
        return False

    print(f"📊 Dataset keys: {list(data.keys())}")

    # Analysis failures are reported separately from load failures.
    try:
        trial_count, total_segments, checked_segments, temporal_errors = (
            _scan_trials(data)
        )
    except TypeError as e:
        print(f"❌ Error analysing dataset: {e}")
        return False

    print(f"🔢 Trials processed: {trial_count}")
    print(f"🔤 Total phoneme segments: {total_segments}")
    print(f"⏰ Temporal ordering errors: {temporal_errors}")

    if checked_segments == 0:
        # Nothing was inspected, so reporting success would be vacuous.
        print("❌ FAILED: No phoneme segments could be checked!")
        return False

    if temporal_errors == 0:
        print("✅ SUCCESS: No temporal ordering bugs found!")
        return True

    print(f"❌ FAILED: Found {temporal_errors} temporal ordering bugs!")
    return False
|  | 
 | ||
def _segments_from_alignment(sequence, path):
    """Collapse paired (phoneme, time index) steps into contiguous segments.

    A new segment starts whenever the phoneme differs from the previous
    step; the segment that was open ends at the previous step's time index.

    Args:
        sequence: Phoneme ids, one per alignment step.
        path: Time indices, paired element-wise with ``sequence``.

    Returns:
        A list of dicts with keys ``'phoneme'``, ``'start_time'`` and
        ``'end_time'``, in order of appearance.
    """
    segments = []
    current_phoneme = None
    start_time = None
    last_time = None

    for phoneme, time_idx in zip(sequence, path):
        if phoneme != current_phoneme:
            # End previous segment at the last time index it covered.
            if current_phoneme is not None:
                segments.append({
                    'phoneme': current_phoneme,
                    'start_time': start_time,
                    'end_time': last_time,
                })

            # Start new segment
            current_phoneme = phoneme
            start_time = time_idx
        last_time = time_idx

    # Close final segment using the last index actually consumed, so a
    # longer ``path`` cannot leak an unrelated end time into the result.
    if current_phoneme is not None:
        segments.append({
            'phoneme': current_phoneme,
            'start_time': start_time,
            'end_time': last_time,
        })

    return segments


def test_alignment_logic():
    """Test the corrected alignment logic on small hand-written cases.

    Each case is turned into segments and every segment is checked for
    ``end_time >= start_time``; a per-segment and per-case report is printed.

    Returns:
        True if every segment of every case is correctly ordered, else False.
    """
    print("\n=== Testing Alignment Logic ===")

    # Simple manual cases simulating what the fixed code should do, e.g.
    # sequence [1, 2, 1] at times [0, 1, 2].
    test_cases = [
        {
            "sequence": [1, 2, 1],
            "path": [0, 1, 2],
            "description": "Simple case",
        },
        {
            "sequence": [1, 1, 2],
            "path": [0, 1, 3],
            "description": "Repeated phoneme",
        },
    ]

    all_valid = True

    for case in test_cases:
        print(f"\nTesting: {case['description']}")
        segments = _segments_from_alignment(case['sequence'], case['path'])

        # Check temporal ordering
        case_valid = True
        for seg in segments:
            start = seg['start_time']
            end = seg['end_time']
            ordered = end >= start
            if not ordered:
                case_valid = False
                all_valid = False
            status = "✅" if ordered else "❌ BUG!"

            print(f"  Phoneme {seg['phoneme']}: {start}-{end} {status}")

        print(f"  Result: {'✅ PASS' if case_valid else '❌ FAIL'}")

    return all_valid
|  | 
 | ||
if __name__ == "__main__":
    # Script entry point: run both checks, print a summary, and signal the
    # overall outcome through the process exit status.
    print("=== CTC Alignment Verification ===\n")

    dataset_ok = examine_latest_dataset()
    logic_ok = test_alignment_logic()

    print("\n=== FINAL RESULTS ===")
    print(f"Dataset check: {'✅ PASS' if dataset_ok else '❌ FAIL'}")
    print(f"Logic test:   {'✅ PASS' if logic_ok else '❌ FAIL'}")

    if dataset_ok and logic_ok:
        print("\n🎉 VERIFICATION SUCCESSFUL: Bug fix appears to be working!")
    else:
        print("\n❌ VERIFICATION FAILED: Issues detected")
        # Exit non-zero so shells and CI jobs can detect the failure.
        raise SystemExit(1)