Source code for cosense3d.utils.train_utils

import copy
import random
import warnings

import numpy as np
import torch
from torch.nn.utils.clip_grad import clip_grad_norm_


def get_gpu_architecture():
    if torch.cuda.is_available():
        device = torch.device("cuda")
        gpu_props = torch.cuda.get_device_properties(device)
        return gpu_props.major * 10 + gpu_props.minor
    else:
        return 0
def seed_everything(seed):
    torch.manual_seed(seed)
    random.seed(seed)
    np.random.seed(seed)
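
A minimal usage sketch (the seed value is arbitrary): calling seed_everything once at the start of a run seeds Python, NumPy, and PyTorch so subsequent random draws are reproducible.

import random

import numpy as np
import torch

from cosense3d.utils.train_utils import seed_everything

seed_everything(1234)

# With the same seed, these draws are identical across runs.
print(random.random(), np.random.rand(), torch.rand(1))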
def build_optimizer(model, cfg):
    # construct an optimizer
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.AdamW(params, lr=cfg['lr'],
                                  weight_decay=cfg['weight_decay'],
                                  betas=tuple(cfg['betas']))
    return optimizer
def build_lr_scheduler(optimizer, cfg, steps_per_epoch):
    cfg_ = copy.copy(cfg)
    policy = cfg_.pop('policy', 'MultiStepLR')
    if policy == 'MultiStepLR':
        # construct a learning rate scheduler
        lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(
            optimizer, milestones=cfg['milestones'], gamma=cfg['gamma'])
    elif policy == 'CosineAnnealingWarm':
        from timm.scheduler.cosine_lr import CosineLRScheduler
        num_steps = cfg['epochs'] * steps_per_epoch
        warmup_lr = cfg['warmup_lr']
        warmup_steps = cfg['warmup_epochs'] * steps_per_epoch
        lr_min = cfg['lr_min']
        lr_scheduler = CosineLRScheduler(
            optimizer,
            t_initial=num_steps,
            lr_min=lr_min,
            warmup_lr_init=warmup_lr,
            warmup_t=warmup_steps,
            cycle_limit=1,
            t_in_epochs=False,
        )
    else:
        raise NotImplementedError
    return lr_scheduler
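
A hedged sketch of wiring build_optimizer and build_lr_scheduler together. The config dictionaries below only mirror the keys the functions read; the concrete values, the stand-in model, and steps_per_epoch are made up for illustration and are not the project's real configuration.

import torch.nn as nn

from cosense3d.utils.train_utils import build_optimizer, build_lr_scheduler

model = nn.Linear(16, 2)  # stand-in for a real model

# Illustrative values; keys match what build_optimizer / build_lr_scheduler access.
optimizer_cfg = {'lr': 2e-4, 'weight_decay': 1e-2, 'betas': [0.9, 0.999]}
scheduler_cfg = {'policy': 'MultiStepLR', 'milestones': [20, 25], 'gamma': 0.1}

optimizer = build_optimizer(model, optimizer_cfg)
steps_per_epoch = 100  # normally len(train_dataloader)
lr_scheduler = build_lr_scheduler(optimizer, scheduler_cfg, steps_per_epoch)

# MultiStepLR is stepped once per epoch; the timm CosineLRScheduler branch is
# stepped per iteration via step_update(global_step) since t_in_epochs=False.
lr_scheduler.step()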
def is_tensor_to_cuda(data, device=0):
    if isinstance(data, dict):
        for k, v in data.items():
            data[k] = is_tensor_to_cuda(v, device)
        return data
    elif isinstance(data, torch.Tensor):
        return data.to(device)
    elif isinstance(data, (list, tuple)):
        data_t = []
        for i in range(len(data)):
            data_t.append(is_tensor_to_cuda(data[i], device))
        return data_t
    else:
        return data
def load_tensors_to_gpu(batch_dict, device=0):
    """
    Load all tensors in batch_dict to gpu
    """
    for k, v in batch_dict.items():
        batch_dict[k] = is_tensor_to_cuda(v, device=device)
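
A small sketch of what load_tensors_to_gpu does to a nested batch: tensors found inside dicts and lists are moved to the given device, while non-tensor entries pass through untouched. The batch contents below are invented for illustration, and the example assumes a CUDA device is available.

import torch

from cosense3d.utils.train_utils import load_tensors_to_gpu

batch_dict = {
    'points': [torch.rand(100, 4), torch.rand(80, 4)],  # list of tensors
    'labels': {'boxes': torch.rand(5, 7)},               # nested dict of tensors
    'frame_id': '000123',                                 # non-tensor metadata is left as-is
}

if torch.cuda.is_available():
    load_tensors_to_gpu(batch_dict, device=0)
    print(batch_dict['labels']['boxes'].device)  # cuda:0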
def load_model_dict(model, pretrained_dict):
    try:
        model.load_state_dict(pretrained_dict)
    except Exception:
        UnmatchedParams = ""
        # 1. filter out unnecessary keys
        model_dict = model.state_dict()
        matched_dict = {}
        pretrained_keys = list()
        for k, v in pretrained_dict.items():
            if 'module' in k:
                k = k.replace('module.', '')
            if k in model_dict and v.shape == model_dict[k].shape:
                matched_dict[k] = v
            elif k not in model_dict:
                UnmatchedParams += f"{k} : Pretrained parameters not in model dict\n"
            else:
                UnmatchedParams += f"{k} : Unmatched shape ({v.shape} -> {model_dict[k].shape})\n"
            pretrained_keys.append(k)
        for k in set(model_dict.keys()) - set(pretrained_keys):
            UnmatchedParams += f"{k} : Model parameters not in pretrained dict\n"
        if len(UnmatchedParams) > 0:
            warnings.warn("Model state dict does not match pretrained state dict. "
                          "Unmatched parameters are:\n" + UnmatchedParams)
        # 2. overwrite entries in the existing state dict
        model_dict.update(matched_dict)
        # 3. load the new state dict
        model.load_state_dict(model_dict)
    return model
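
A hedged example of loading a checkpoint with load_model_dict. The checkpoint path and its key layout ('model' vs. a raw state dict) are assumptions for illustration; parameters that do not match are reported through a warning instead of raising.

import torch
import torch.nn as nn

from cosense3d.utils.train_utils import load_model_dict

model = nn.Sequential(nn.Linear(16, 32), nn.ReLU(), nn.Linear(32, 2))

# 'checkpoint.pth' and its internal structure are hypothetical; adapt to the real file.
ckpt = torch.load('checkpoint.pth', map_location='cpu')
state_dict = ckpt.get('model', ckpt) if isinstance(ckpt, dict) else ckpt

model = load_model_dict(model, state_dict)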
def clip_grads(params, max_norm=35, norm_type=2):
    params = list(
        filter(lambda p: p.requires_grad and p.grad is not None, params))
    if len(params) > 0:
        total_norm = clip_grad_norm_(params, max_norm=max_norm, norm_type=norm_type)
        return total_norm
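
A sketch of how clip_grads would slot into a training step, between the backward pass and the optimizer update. The model, data, loss, and max_norm value are placeholders, not the project's actual training loop.

import torch
import torch.nn as nn

from cosense3d.utils.train_utils import build_optimizer, clip_grads

model = nn.Linear(16, 2)
optimizer = build_optimizer(model, {'lr': 1e-3, 'weight_decay': 1e-2, 'betas': [0.9, 0.999]})

x, y = torch.rand(8, 16), torch.rand(8, 2)
loss = nn.functional.mse_loss(model(x), y)

optimizer.zero_grad()
loss.backward()
# Clip the total gradient norm before the update; returns the pre-clip norm
# (or None if no parameter had a gradient).
grad_norm = clip_grads(model.parameters(), max_norm=35, norm_type=2)
optimizer.step()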