Facebook自监督学习Visual Transformers(ViT)的训练经验(Moco v3) -- 训练代码解析

Luna
Written by Luna on

    我们来讲Moco v3的代码

    论文的主要内容,参考系列首篇:自监督学习Visual Transformers(ViT)的训练经验(Moco v3) – 论文解析

    官方代码链接:

https://github.com/facebookresearch/moco-v3

    但现在最佳的模型是微软的EsViT(Swin-B),然后才是Moco v3,下面是来自https://paperswithcode.com/的统计:

    这张图最后边的点是EsViT(Swin-B),图中文字没显示出来。

    这个模型也公开了源代码:

https://github.com/microsoft/esvit

    这个代码也会解析哦,心动就关注吧,又给自己挖了一个坑

    公开的Moco v3是用Pytorch实现的,包含自监督学习的Resnet和ViT。而原始的Moco v3其实是用tensorflow实现的,在TPU上做的实验。

    准备工作有安装Python,Pytorch,等相关软件,下载ImageNet等,这里就不展开说了。

    我们直接开始看预训练主函数main_moco.py,开始逐行解析(解析都在注释里):

import torchvision.models as torchvision_models
torchvision_model_names = sorted(name for name in torchvision_models.__dict__
    if name.islower() and not name.startswith("__")
    and callable(torchvision_models.__dict__[name]))
# 产生所有可以用来进行预训练的模型的名称的集合
model_names = ['vit_small', 'vit_base', 'vit_conv_small', 'vit_conv_base'] + torchvision_model_names

    torchvision.models还可以用来加载预训练模型:

# arch是需要加载的预训练模型名,比如:resnet18
model = torchvison.models.__dict__[arch](pretrained=True) 

    接下来看main函数:

import torch.backends.cudnn as cudnn
def main():
    # 存入参数
    args = parser.parse_args()
    # seed默认是None
    # 关于种子的正确设定方式
    # 可以参考 https://pytorch.apachecn.org/docs/1.0/notes_randomness.html        
    if args.seed is not None:
        random.seed(args.seed)
        torch.manual_seed(args.seed)
        cudnn.deterministic = True
        warnings.warn('You have chosen to seed training. '
                      'This will turn on the CUDNN deterministic setting, '
                      'which can slow down your training considerably! '
                      'You may see unexpected behavior when restarting '
                      'from checkpoints.')
        # cudnn.benchmark = False 这里应该还要加一行这个
        
    # 默认是None,因为这个模型预训练工作量比较大,作者都是用几百片GPU或者TPU训练的。
    if args.gpu is not None:
        warnings.warn('You have chosen a specific GPU. This will completely '
                      'disable data parallelism.')


    # dist_url默认值是'tcp://224.66.41.62:23456',应当是作者服务器第一个节点的地址
    # world_size默认值是-1
    # WORLD_SIZE由torch.distributed.launch.py产生 具体数值为 nproc_per_node*node(服务器数量或者节点数)
    if args.dist_url == "env://" and args.world_size == -1:
        args.world_size = int(os.environ["WORLD_SIZE"])


    # multiprocessing_distributed的默认值为False
    # 需要多进程运行程序时一定要使multiprocessing_distributed为True
    args.distributed = args.world_size > 1 or args.multiprocessing_distributed


    # 返回显卡数量
    ngpus_per_node = torch.cuda.device_count()
    if args.multiprocessing_distributed:
        # Since we have ngpus_per_node processes per node,
        # the total world_size needs to be adjusted accordingly
        # 计算总的GPU的数量
        args.world_size = ngpus_per_node * args.world_size
        # Use torch.multiprocessing.spawn to launch distributed processes: 
        # the main_worker process function
        # 开启多进程,每个进程调用main_worker函数,控制一个GPU。
        mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args))
    else:
        # Simply call main_worker function
        # args.gpu默认是None,如果不采用分布式,则通过这个参数输入用来计算的GPU的编号
        main_worker(args.gpu, ngpus_per_node, args)

    这里需要注意的是,mp.spawn调用的main_worker其实有三个参数,但是mp.spawn后面的args却只给了后两个参数。而第一个参数的gpu的值会自动产生,调用的哪个gpu,就是那个gpu的编号。

    接下来看main_worker函数,因为这个函数太长,会切成若干个代码框:

import torch.distributed as dist
import vits
def main_worker(gpu, ngpus_per_node, args):
    #gpu是运行这个函数使用的gpu编号,如果是None,则不调用gpu
    args.gpu = gpu
    # suppress printing if not first GPU on each node
    # 如果不是主节点上的GPU 0,则抑制打印
    # rank默认是-1,所以在第一个主机运行程序的时候要指定rank为0
    # 可以打印的进程是在第一个主机控制着GPU 0的进程。
    if args.multiprocessing_distributed and (args.gpu != 0 or args.rank != 0):
        def print_pass(*args):
            pass
        builtins.print = print_pass
   
    if args.gpu is not None:
        print("Use GPU: {} for training".format(args.gpu))


    # rank分两种,一种是全局rank, 可以理解为所有进程的编号。用于进程间通信。
    # 一种是local rank, 每个节点上的进程的编号。用于本地设备分配。
    # 以下rank都指全局rank
    if args.distributed:
        if args.dist_url == "env://" and args.rank == -1:
            args.rank = int(os.environ["RANK"])
        if args.multiprocessing_distributed:
            # For multiprocessing distributed training, 
            # rank needs to be the global rank among all the processes
            # 让每个进程都获得一个唯一的编号
            # rank在作为参数输入的时候写的是节点编号
            args.rank = args.rank * ngpus_per_node + gpu
        # dist_backend默认的是nccl,一个多gpu卡通信框架
        dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                                world_size=args.world_size, rank=args.rank)
        # 设置栅栏,同步进程
        torch.distributed.barrier()
    # create model
    # arch或a指模型的名称,默认是resnet50 
    print("=> creating model '{}'".format(args.arch))
    # 如果模型名称是以vit开头
    if args.arch.startswith('vit'):
        model = moco.builder.MoCo_ViT(
            # 这里留意一下偏函数的应用,args.stop_grad_conv1默认是False
            partial(vits.__dict__[args.arch], stop_grad_conv1=args.stop_grad_conv1),
            # 默认分别是256,4096,1
            args.moco_dim, args.moco_mlp_dim, args.moco_t)
    else:
        model = moco.builder.MoCo_ResNet(
            # 这里zero_init_residual会使残差分支中最后一个BatchNorm2d的初始值为0
            partial(torchvision_models.__dict__[args.arch], zero_init_residual=True), 
            args.moco_dim, args.moco_mlp_dim, args.moco_t)

    上面这段代码主要是设置一些关键的参数,以及加载模型,接下来看第二部分代码main_worker[2]:

    # infer learning rate before changing batch size
    args.lr = args.lr * args.batch_size / 256


    if not torch.cuda.is_available():
        print('using CPU, this will be slow')
    elif args.distributed:
        # apply SyncBN,要理解SyncBN可以仔细看一下参考[1]
        # 简单理解SyncBN就是多卡模式的Batch Normalization (BN)
        # 把是或者继承了torch.nn.modules.batchnorm._BatchNorm的BN全部替换成SyncBN
        # 所以在写model时,BN必须用torch.nn.modules.batchnorm._BatchNorm来实现
        # 否则就得自己写一个SyncBN
        model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
        # For multiprocessing distributed, DistributedDataParallel constructor
        # should always set the single device scope, otherwise,
        # DistributedDataParallel will use all available devices.
        if args.gpu is not None:
            torch.cuda.set_device(args.gpu)
            model.cuda(args.gpu)
            # When using a single GPU per process and per
            # DistributedDataParallel, we need to divide the batch size
            # ourselves based on the total number of GPUs we have
            # 把batch分一分
            args.batch_size = int(args.batch_size / args.world_size)
            # 用来导入数据的进程数量,必须大于0,默认是32,这里为什么这么计算,不是很懂。
            args.workers = int((args.workers + ngpus_per_node - 1) / ngpus_per_node)
            # 构造DDP模型
            model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
        else:
            model.cuda()
            # DistributedDataParallel will divide and allocate batch_size to all
            # available GPUs if device_ids are not set
            model = torch.nn.parallel.DistributedDataParallel(model)
    elif args.gpu is not None:
        torch.cuda.set_device(args.gpu)
        model = model.cuda(args.gpu)
        # comment out the following line for debugging
        raise NotImplementedError("Only DistributedDataParallel is supported.")
    else:
        # AllGather/rank implementation in this code only supports DistributedDataParallel.
        raise NotImplementedError("Only DistributedDataParallel is supported.")
    print(model) # print model after SyncBatchNorm
    # optimizer默认是lars,resnet50用的是lars
    # weight_decay默认是1e-6,momentum默认是0.9
    if args.optimizer == 'lars':
        optimizer = moco.optimizer.LARS(model.parameters(), args.lr,
                                        weight_decay=args.weight_decay,
                                        momentum=args.momentum)
    # vit用adamw
    # weight_decay需要被设定为0.1
    elif args.optimizer == 'adamw':
        optimizer = torch.optim.AdamW(model.parameters(), args.lr,
                                weight_decay=args.weight_decay)

    依旧是一些关键参数,以及把并行训练的一些配置,main_worker[3]:

    # 在训练最开始之前实例化一个GradScaler对象
    # 自动混合精度的详细内容请参考[2]
    scaler = torch.cuda.amp.GradScaler()
    # 如果是第一进程,使用预设名称建立实体,具体请参考[3]
    summary_writer = SummaryWriter() if args.rank == 0 else None


    # optionally resume from a checkpoint
    # resume默认为空
    # 如果需要训练已经训练了一段时间的模型,可以通过resume输入模型地址
    if args.resume:
        if os.path.isfile(args.resume):
            print("=> loading checkpoint '{}'".format(args.resume))
            if args.gpu is None:
                checkpoint = torch.load(args.resume)
            else:
                # Map model to be loaded to specified single gpu.
                loc = 'cuda:{}'.format(args.gpu)
                checkpoint = torch.load(args.resume, map_location=loc)
            args.start_epoch = checkpoint['epoch']
            model.load_state_dict(checkpoint['state_dict'])
            optimizer.load_state_dict(checkpoint['optimizer'])
            scaler.load_state_dict(checkpoint['scaler'])
            print("=> loaded checkpoint '{}' (epoch {})"
                  .format(args.resume, checkpoint['epoch']))
        else:
            print("=> no checkpoint found at '{}'".format(args.resume))
    # 在cuDNN中选择卷积算法,以加快训练速度,具体见参考[4]
    cudnn.benchmark = True


    # Data loading code
    # 产生训练数据目录
    traindir = os.path.join(args.data, 'train')

    一些关键的设置,比如自动混合精度,归一化,以及是否是接续之前的训练,训练模型。main_worker[4]:

    # transforms.Normalize公式input[channel] = (input[channel] - mean[channel]) / std[channel]
    # Normalize() 函数的作用是将数据转换为标准正太分布,使模型更容易收敛。
    # mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225] 
    # 是从 ImageNet 数据集的数百万张图片中随机抽样计算得到的。
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    # follow BYOL's augmentation recipe: https://arxiv.org/abs/2006.07733
    # 数据增强
    augmentation1 = [
        # 将给定图像随机裁剪为不同的大小和宽高比,然后缩放所裁剪得到的图像为制定的大小
        # 即先随机采集,然后对裁剪得到的图像缩放为同一大小,默认scale=(0.08, 1.0)
        transforms.RandomResizedCrop(224, scale=(args.crop_min, 1.)),
        # 以0.8的概率执行此动作,
        transforms.RandomApply([
            # 改变图像的属性:亮度(brightness)、对比度(contrast)、饱和度(saturation)和色调(hue)
            # 怎么变的呢,将数字带入[max(0, 1 - offset), 1 + offset]获得区间
            # 比如亮度位置上是0.4,则得到区间[0.6,1.4]
            # 那么新图的亮度会是这个区间的一个随机数
            transforms.ColorJitter(0.4, 0.4, 0.2, 0.1)  # not strengthened
        ], p=0.8),
        # 依概率p将图片转换为灰度图
        transforms.RandomGrayscale(p=0.2),
        # 产生在[0.1,2]区间的随机数,对图片进行高斯模糊
        # 调用的函数是ImageFilter.GaussianBlur
        transforms.RandomApply([moco.loader.GaussianBlur([.1, 2.])], p=1.0),
        # 依据概率p对PIL图片进行水平翻转 参数:p默认值为0.5
        transforms.RandomHorizontalFlip(),
        # 将PIL Image或者 ndarray 转换为tensor,并且归一化至[0-1] 注意事项:归一化至[0-1]是直接除以255,若自己的ndarray数据尺度有变化,则需要自行修改。
        transforms.ToTensor(),
        # 之前定义的归一化方式,归一化后,数值所在的区间大概是(-1,1)
        normalize
    ]


    augmentation2 = [
        transforms.RandomResizedCrop(224, scale=(args.crop_min, 1.)),
        transforms.RandomApply([
            transforms.ColorJitter(0.4, 0.4, 0.2, 0.1)  # not strengthened
        ], p=0.8),
        transforms.RandomGrayscale(p=0.2),
        # 高斯模糊的概率和augmentation1不一样
        transforms.RandomApply([moco.loader.GaussianBlur([.1, 2.])], p=0.1),
        # 以0.2的概率做Solarize
        # 默认阈值是128,大于128的像素值,转换成二进制,然后做01反转
        # 即0变1,1变0,得到新的像素值。
        transforms.RandomApply([moco.loader.Solarize()], p=0.2),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize
    ]

    这部分主要是关于数据增强的设定,至于为什么这么做数据增强,作者有把参考的文章列在代码注释里,感兴趣的可以去看一下。接下来是这个函数的最后一个部分main_worker[5]:

    # 在指定目录下做载入数据
    # ImageFolder(root,transform=None,target_transform=None,loader=default_loader)
    # transform:对图片进行预处理的操作(函数),原始图片作为输入,返回一个转换后的图片。
    # 对图片类别进行预处理的操作,输入为 target,输出对其的转换。如果不传该参数,即对 target 不做任何转换。
    # loader 默认操作是读取PIL image对象。
    # 这里是模型很关键的一步,将每一幅图做两种不同的transform,得到两张图
    train_dataset = datasets.ImageFolder(
        traindir,
        # 输入im返回是[im1,im2],分别按augmentation1和augmentation2做了变换
        moco.loader.TwoCropsTransform(transforms.Compose(augmentation1), 
                                      transforms.Compose(augmentation2)))


    if args.distributed:
        # 把数据划分成num_gpu份,不同的GPU拿自己那一份
        train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    else:
        train_sampler = None
    # sampler和shuffle是互斥的,有sampler,shuffle就可以是None了
    # num_workers要大于0,0代表用主进程导入数据,大于0表示用num_workers个进程导入数据
    # pin_memory: if True, the data loader will copy tensors into CUDA pinned memory before returning them
    # 主机中的内存,有两种存在方式,一是锁页,二是不锁页, 
    # 锁页内存存放的内容在任何情况下都不会与主机的虚拟内存进行交换(注:虚拟内存就是硬盘),
    # 而不锁页内存在主机内存不足时,数据会存放在虚拟内存中。
    # 显卡中的显存全部是锁页内存,当计算机的内存充足的时候,可以设置pin_memory=True。
    # 意味着生成的Tensor数据存放在锁页内存中,这样内存中的Tensor转移到GPU的显存会更快。 
    # 当系统卡住,或者交换内存使用过多的时候,设置pin_memory=False。
    # 因为pin_memory与电脑硬件性能有关,pin_memory默认为False。
    # drop_last: set to ``True`` to drop the last incomplete batch   
    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
        num_workers=args.workers, pin_memory=True, sampler=train_sampler, drop_last=True)


    for epoch in range(args.start_epoch, args.epochs):
        if args.distributed:
            # 加了这行,每次gpu才会拿到不同组合的batch
            # 换句话说,加上这行,每个epoch才会被shuffle
            train_sampler.set_epoch(epoch)


        # train for one epoch,这个接下来讲
        train(train_loader, model, optimizer, scaler, summary_writer, epoch, args)
        #主进程存储训练过程中的模型,防止因为一些事情中断训练后,需要从头再来。
        if not args.multiprocessing_distributed or (args.multiprocessing_distributed
                and args.rank == 0): # only the first GPU saves checkpoint
            save_checkpoint({
                'epoch': epoch + 1,
                'arch': args.arch,
                'state_dict': model.state_dict(),
                'optimizer' : optimizer.state_dict(),
                'scaler': scaler.state_dict(),
            # 这个代码里没有做is_best判断
            # 通常这个参数是用来保存训练过程中测试的效果最好的模型,防止过拟合。
            }, is_best=False, filename='checkpoint_%04d.pth.tar' % epoch)
    if args.rank == 0:
        summary_writer.close()

这部分代码主要内容:一、准备数据,二、训练,三、主进程保存节点模型。

    接下来看train这个函数:

def train(train_loader, model, optimizer, scaler, summary_writer, epoch, args):
    # AverageMeter是作者自己定义的函数,用来计算循环中,一些过程的平均消耗
    batch_time = AverageMeter('Time', ':6.3f')
    data_time = AverageMeter('Data', ':6.3f')
    learning_rates = AverageMeter('LR', ':.4e')
    losses = AverageMeter('Loss', ':.4e')
    progress = ProgressMeter(
        len(train_loader),
        [batch_time, data_time, learning_rates, losses],
        prefix="Epoch: [{}]".format(epoch))


    # switch to train mode
    model.train()


    end = time.time()
    iters_per_epoch = len(train_loader)
    moco_m = args.moco_m
    for i, (images, _) in enumerate(train_loader):
        # measure data loading time
        # 计算在循环中,载入数据平均消耗的时间。
        data_time.update(time.time() - end)
        # adjust learning rate and momentum coefficient per iteration
        # adjust_learning_rate是作者自己定义的函数
        # learning rate先由小变大,再由大变小。具体可以参考代码
        lr = adjust_learning_rate(optimizer, epoch + i / iters_per_epoch, args)
        # 更新在循环中,使用的lr的平均值
        learning_rates.update(lr)
        if args.moco_m_cos:
            #同样是作者自己定义的函数,moco_m则是由本来就很接近1(0.99),变得越来越接近1。
            moco_m = adjust_moco_momentum(epoch + i / iters_per_epoch, args)


        if args.gpu is not None:
            # 如果pin_memory=True的话,将数据放入GPU的时候,
            # 也应该把non_blocking打开,这样就只把数据放入GPU而不取出,访问时间会大大减少。
            images[0] = images[0].cuda(args.gpu, non_blocking=True)
            images[1] = images[1].cuda(args.gpu, non_blocking=True)


        # compute output
        with torch.cuda.amp.autocast(True):
            # moco_m: moco momentum of updating momentum encoder
            # model下一篇说
            loss = model(images[0], images[1], moco_m)
        # loss平均值更新
        losses.update(loss.item(), images[0].size(0))
        if args.rank == 0:
            summary_writer.add_scalar("loss", loss.item(), epoch * iters_per_epoch + i)


        # compute gradient and do SGD step
        # optimizer.zero_grad()意思是把梯度置零,
        # 也就是把loss关于weight的导数变成0.
        optimizer.zero_grad()
        # Scales loss. 为了梯度放大. scaler原理可以参考[2]
        scaler.scale(loss).backward()
        # scaler.step() 首先把梯度的值unscale回来.
        # 如果梯度的值不是 infs 或者 NaNs, 那么调用optimizer.step()来更新权重,
        # 否则,忽略step调用,从而保证权重不更新(不被破坏)
        scaler.step(optimizer)
        # 准备着,看是否要增大scaler  scaler.update()


        # measure elapsed time
        batch_time.update(time.time() - end)
        end = time.time()
        # print_ferq默认是10
        if i % args.print_freq == 0:
            progress.display(i)

    这里就是Moco v3训练的主要代码,有些太过细枝末节又容易懂的就没有放进来。

    代码主要特点和主要内容有: - 多卡,每张卡有多个gpu,每个进程管理一个gpu - 用了自动混合精度机制

  • 不同的数据增强的配置

    当然还有很多细节的设计,比如lr和moco_m参数的调整,数据存入gpu的方式等等,都是非常值得深入学习的,这篇只是浏览,具体原理可以参考我给的一些参考链接,自己搜索更有用的资料,或者去研读pytorch源代码。

    下一篇讲Moco v3的模型代码。喜欢或者觉得有用就关注吧,可以的话,右下角点个“在看”吧。    

参考:

[1] 996黄金一代,[原创][深度][PyTorch] DDP系列第三篇:实战与技巧,知乎,2020

[2] Gemfield,PyTorch的自动混合精度(AMP),知乎,2020

[3] dexter,TensorBoardX 介紹 (在 PyTorch 中使用 Tensorboard),知乎,2018

[4] xiaopl,torch.backends.cudnn.benchmark ?!,知乎,2019

Comments

comments powered by Disqus