144 lines
		
	
	
		
			6.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			144 lines
		
	
	
		
			6.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import torch 
 | |
| from torch import nn
 | |
| 
 | |
| class LSTMDecoder(nn.Module):
 | |
|     '''
 | |
|     Defines the LSTM decoder
 | |
| 
 | |
|     This class combines day-specific input layers, an LSTM, and an output classification layer
 | |
|     '''
 | |
|     def __init__(self,
 | |
|                  neural_dim,
 | |
|                  n_units,
 | |
|                  n_days,
 | |
|                  n_classes,
 | |
|                  rnn_dropout = 0.0,
 | |
|                  input_dropout = 0.0,
 | |
|                  n_layers = 5, 
 | |
|                  patch_size = 0,
 | |
|                  patch_stride = 0,
 | |
|                  ):
 | |
|         '''
 | |
|         neural_dim  (int)      - number of channels in a single timestep (e.g. 512)
 | |
|         n_units     (int)      - number of hidden units in each recurrent layer - equal to the size of the hidden state
 | |
|         n_days      (int)      - number of days in the dataset
 | |
|         n_classes   (int)      - number of classes 
 | |
|         rnn_dropout    (float) - percentage of units to droupout during training
 | |
|         input_dropout (float)  - percentage of input units to dropout during training
 | |
|         n_layers    (int)      - number of recurrent layers 
 | |
|         patch_size  (int)      - the number of timesteps to concat on initial input layer - a value of 0 will disable this "input concat" step 
 | |
|         patch_stride(int)      - the number of timesteps to stride over when concatenating initial input 
 | |
|         '''
 | |
|         super(LSTMDecoder, self).__init__()
 | |
|         
 | |
|         self.neural_dim = neural_dim
 | |
|         self.n_units = n_units
 | |
|         self.n_classes = n_classes
 | |
|         self.n_layers = n_layers 
 | |
|         self.n_days = n_days
 | |
| 
 | |
|         self.rnn_dropout = rnn_dropout
 | |
|         self.input_dropout = input_dropout
 | |
|         
 | |
|         self.patch_size = patch_size
 | |
|         self.patch_stride = patch_stride
 | |
| 
 | |
|         # Parameters for the day-specific input layers
 | |
|         self.day_layer_activation = nn.Softsign() # basically a shallower tanh 
 | |
| 
 | |
|         # Set weights for day layers to be identity matrices so the model can learn its own day-specific transformations
 | |
|         self.day_weights = nn.ParameterList(
 | |
|             [nn.Parameter(torch.eye(self.neural_dim)) for _ in range(self.n_days)]
 | |
|         )
 | |
|         self.day_biases = nn.ParameterList(
 | |
|             [nn.Parameter(torch.zeros(1, self.neural_dim)) for _ in range(self.n_days)]
 | |
|         )
 | |
| 
 | |
|         self.day_layer_dropout = nn.Dropout(input_dropout)
 | |
|         
 | |
|         self.input_size = self.neural_dim
 | |
| 
 | |
|         # If we are using "strided inputs", then the input size of the first recurrent layer will actually be in_size * patch_size
 | |
|         if self.patch_size > 0:
 | |
|             self.input_size *= self.patch_size
 | |
| 
 | |
|         self.lstm = nn.LSTM(
 | |
|             input_size = self.input_size,
 | |
|             hidden_size = self.n_units,
 | |
|             num_layers = self.n_layers,
 | |
|             dropout = self.rnn_dropout, 
 | |
|             batch_first = True, # The first dim of our input is the batch dim
 | |
|             bidirectional = False,
 | |
|         )
 | |
| 
 | |
|         # Set recurrent units to have orthogonal param init and input layers to have xavier init
 | |
|         for name, param in self.lstm.named_parameters():
 | |
|             if "weight_hh" in name:
 | |
|                 nn.init.orthogonal_(param)
 | |
|             elif "weight_ih" in name:
 | |
|                 nn.init.xavier_uniform_(param)
 | |
|             elif "bias" in name:
 | |
|                 # Initialize biases to zero first
 | |
|                 nn.init.zeros_(param)
 | |
|                 # Set forget gate bias to 1.0 to prevent vanishing gradients
 | |
|                 # LSTM bias structure: [input_gate, forget_gate, cell_gate, output_gate]
 | |
|                 # Each gate has hidden_size parameters
 | |
|                 hidden_size = param.size(0) // 4
 | |
|                 param.data[hidden_size:2*hidden_size].fill_(1.0)  # forget gate bias = 1.0
 | |
| 
 | |
|         # Prediciton head. Weight init to xavier
 | |
|         self.out = nn.Linear(self.n_units, self.n_classes)
 | |
|         nn.init.xavier_uniform_(self.out.weight)
 | |
| 
 | |
|         # Learnable initial hidden states
 | |
|         self.h0 = nn.Parameter(nn.init.xavier_uniform_(torch.zeros(1, 1, self.n_units)))
 | |
| 
 | |
|     def forward(self, x, day_idx, states = None, return_state = False):
 | |
|         '''
 | |
|         x        (tensor)  - batch of examples (trials) of shape: (batch_size, time_series_length, neural_dim)
 | |
|         day_idx  (tensor)  - tensor which is a list of day indexs corresponding to the day of each example in the batch x. 
 | |
|         '''
 | |
| 
 | |
|         # Apply day-specific layer to (hopefully) project neural data from the different days to the same latent space
 | |
|         day_weights = torch.stack([self.day_weights[i] for i in day_idx], dim=0)
 | |
|         day_biases = torch.cat([self.day_biases[i] for i in day_idx], dim=0).unsqueeze(1)
 | |
| 
 | |
|         x = torch.einsum("btd,bdk->btk", x, day_weights) + day_biases
 | |
|         x = self.day_layer_activation(x)
 | |
| 
 | |
|         # Apply dropout to the ouput of the day specific layer
 | |
|         if self.input_dropout > 0:
 | |
|             x = self.day_layer_dropout(x)
 | |
| 
 | |
|         # (Optionally) Perform input concat operation
 | |
|         if self.patch_size > 0: 
 | |
|   
 | |
|             x = x.unsqueeze(1)                      # [batches, 1, timesteps, feature_dim]
 | |
|             x = x.permute(0, 3, 1, 2)               # [batches, feature_dim, 1, timesteps]
 | |
|             
 | |
|             # Extract patches using unfold (sliding window)
 | |
|             x_unfold = x.unfold(3, self.patch_size, self.patch_stride)  # [batches, feature_dim, 1, num_patches, patch_size]
 | |
|             
 | |
|             # Remove dummy height dimension and rearrange dimensions
 | |
|             x_unfold = x_unfold.squeeze(2)           # [batches, feature_dum, num_patches, patch_size]
 | |
|             x_unfold = x_unfold.permute(0, 2, 3, 1)  # [batches, num_patches, patch_size, feature_dim]
 | |
| 
 | |
|             # Flatten last two dimensions (patch_size and features)
 | |
|             x = x_unfold.reshape(x.size(0), x_unfold.size(1), -1) 
 | |
|         
 | |
|         # Determine initial hidden states
 | |
|         if states is None:
 | |
|             h0 = self.h0.expand(self.n_layers, x.shape[0], self.n_units).contiguous()
 | |
|             c0 = torch.zeros_like(h0)  # Initialize cell state to zeros
 | |
|             states = (h0, c0)
 | |
| 
 | |
|         # Pass input through RNN 
 | |
|         output, hidden_states = self.lstm(x, states)
 | |
| 
 | |
|         # Compute logits
 | |
|         logits = self.out(output)
 | |
|         
 | |
|         if return_state:
 | |
|             return logits, hidden_states
 | |
|         
 | |
|         return logits | 
