# Files
# b2txt25/data_analyse/show_pkl_contents.py
# 2025-10-12 09:11:32 +08:00
#
# 165 lines
# 6.1 KiB
# Python
#!/usr/bin/env python3
"""
Display the contents and structure of the phoneme dataset pkl files
"""
import pickle
import numpy as np
from pathlib import Path
def _print_file_inventory(data_dir):
    """Print the pkl-file inventory under *data_dir* and return the dataset files."""
    dataset_files = list(data_dir.glob("phoneme_dataset_*.pkl"))
    ctc_files = list(data_dir.glob("ctc_results_*.pkl"))
    print("=== PKL Files in phoneme_segmented_data/ ===")
    print(f"Found {len(dataset_files)} phoneme dataset files:")
    for f in sorted(dataset_files):
        size_mb = f.stat().st_size / (1024 * 1024)
        print(f" {f.name} ({size_mb:.1f} MB)")
    print(f"\nFound {len(ctc_files)} CTC results files:")
    for f in sorted(ctc_files):
        size_mb = f.stat().st_size / (1024 * 1024)
        print(f" {f.name} ({size_mb:.1f} MB)")
    return dataset_files


def _print_phoneme_counts(dataset):
    """Print per-phoneme segment counts and the grand total."""
    print("\n=== Phoneme Types and Counts ===")
    total_segments = 0
    for phoneme, segments in sorted(dataset.items()):
        count = len(segments)
        total_segments += count
        print(f"'{phoneme}': {count:5d} segments")
    print(f"\nTotal segments across all phonemes: {total_segments:,}")


def _print_sample_segment(dataset):
    """Print the structure of the first available sample segment.

    Only the first phoneme from the candidate list that exists in the
    dataset is shown (the loop breaks after one sample).
    """
    print("\n=== Sample Segment Structure ===")
    sample_phonemes = [' | ', 'AA', 'IH', 'T', 'S']  # Common phonemes
    for phoneme in sample_phonemes:
        if phoneme in dataset and dataset[phoneme]:
            segment = dataset[phoneme][0]  # First segment
            print(f"\nSample segment for '{phoneme}':")
            print(f" Type: {type(segment)}")
            if isinstance(segment, dict):
                for key, value in segment.items():
                    if key == 'confidence':
                        print(f" {key}: {value:.6f}")
                    elif isinstance(value, np.integer):
                        # Unbox numpy ints so the output is plain Python
                        print(f" {key}: {int(value)}")
                    else:
                        print(f" {key}: {value}")
            break


def _check_time_alignment(dataset):
    """Scan up to 10 segments per phoneme for end_time < start_time errors."""
    print("\n=== Time Alignment Check ===")
    error_count = 0
    total_checked = 0
    for phoneme, segments in dataset.items():
        for segment in segments[:10]:  # Check first 10 of each phoneme
            if isinstance(segment, dict) and 'start_time' in segment and 'end_time' in segment:
                start_time = int(segment['start_time'])
                end_time = int(segment['end_time'])
                total_checked += 1
                if end_time < start_time:
                    error_count += 1
                    if error_count <= 5:  # Show first 5 errors
                        print(f" ❌ Error: '{phoneme}' segment has start={start_time}, end={end_time}")
    if total_checked > 0:
        error_rate = (error_count / total_checked) * 100
        print(f"\nChecked {total_checked} segments:")
        print(f" ✅ Valid segments: {total_checked - error_count}")
        print(f" ❌ Invalid segments: {error_count}")
        print(f" Error rate: {error_rate:.1f}%")


def _print_distribution(dataset):
    """Print how many sessions and trials are represented (sampled)."""
    print("\n=== Session and Trial Distribution ===")
    sessions = set()
    trials = set()
    for segments in dataset.values():  # key unused; iterate values only
        for segment in segments[:100]:  # Sample first 100 of each phoneme
            if isinstance(segment, dict):
                if 'session' in segment:
                    sessions.add(segment['session'])
                if 'trial_num' in segment:
                    trials.add(f"{segment.get('session', 'unknown')}_trial_{segment['trial_num']}")
    print(f"Sessions represented: {len(sessions)}")
    # BUGFIX: sort the whole set BEFORE slicing — the original sliced the
    # unordered set first, showing 5 arbitrary sessions in sorted order.
    ordered = sorted(sessions)
    if len(sessions) <= 10:
        for session in ordered:
            print(f" {session}")
    else:
        for session in ordered[:5]:
            print(f" {session}")
        print(f" ... and {len(sessions) - 5} more")
    print(f"Unique trials sampled: {len(trials)}")


def show_phoneme_dataset_structure():
    """Show the structure of phoneme dataset pkl files.

    Reads ``phoneme_segmented_data/phoneme_dataset_*.pkl`` from the current
    working directory, loads the most recently modified file, and prints:
    the file inventory, per-phoneme counts, one sample segment, a
    start/end-time sanity check, and session/trial coverage.

    Returns None; all output goes to stdout.
    """
    data_dir = Path("phoneme_segmented_data")
    if not data_dir.exists():
        print("No phoneme_segmented_data directory found")
        return
    dataset_files = _print_file_inventory(data_dir)
    if not dataset_files:
        print("No dataset files to examine")
        return
    # Load the latest dataset file (by modification time)
    latest_dataset = max(dataset_files, key=lambda x: x.stat().st_mtime)
    print(f"\n=== Examining Latest Dataset: {latest_dataset.name} ===")
    with open(latest_dataset, 'rb') as f:
        # NOTE: pickle.load is only safe on trusted, locally produced files.
        dataset = pickle.load(f)
    print(f"Dataset type: {type(dataset)}")
    print(f"Dataset size: {len(dataset)} phoneme types")
    _print_phoneme_counts(dataset)
    _print_sample_segment(dataset)
    _check_time_alignment(dataset)
    _print_distribution(dataset)
def show_ctc_results_structure():
    """Show the structure of CTC results pkl files.

    Loads the most recently modified
    ``phoneme_segmented_data/ctc_results_*.pkl`` file and prints its type,
    length, and a formatted view of the first record. Returns None; all
    output goes to stdout.
    """
    data_dir = Path("phoneme_segmented_data")
    # Path.glob on a missing directory simply yields nothing, so no
    # existence check is needed here.
    ctc_files = list(data_dir.glob("ctc_results_*.pkl"))
    if not ctc_files:
        print("\n=== No CTC Results Files Found ===")
        return
    # Load the latest CTC results file (by modification time)
    latest_ctc = max(ctc_files, key=lambda x: x.stat().st_mtime)
    print(f"\n=== Examining Latest CTC Results: {latest_ctc.name} ===")
    with open(latest_ctc, 'rb') as f:
        # NOTE: pickle.load is only safe on trusted, locally produced files.
        ctc_data = pickle.load(f)
    print(f"CTC data type: {type(ctc_data)}")
    print(f"CTC data length: {len(ctc_data)}")
    if not ctc_data:
        return  # guard clause: nothing to sample
    sample = ctc_data[0]
    print("\nSample CTC result:")
    print(f" Type: {type(sample)}")
    if not isinstance(sample, dict):
        return
    for key, value in sample.items():
        if key == 'ctc_score':
            print(f" {key}: {value:.3f}")
        elif key == 'predicted_phonemes' and isinstance(value, list):
            phonemes_str = ' '.join(value[:10])  # First 10 phonemes
            if len(value) > 10:
                phonemes_str += f" ... ({len(value)} total)"
            print(f" {key}: [{phonemes_str}]")
        elif key == 'alignment_info' and isinstance(value, list):
            print(f" {key}: {len(value)} alignment segments")
        else:
            # Hoist str(value): the original converted twice per line.
            text = str(value)
            print(f" {key}: {text[:100]}{'...' if len(text) > 100 else ''}")
if __name__ == "__main__":
    # Run the two reports in order: dataset structure first, then CTC results.
    show_phoneme_dataset_structure()
    show_ctc_results_structure()