# mdz/pytorch/AiATrack/1_scripts/2_save_infer.py

import os
import sys
prj_path = os.path.join(os.path.dirname(__file__), './')
if prj_path not in sys.path:
    sys.path.append(prj_path)
import torch
import torch.nn.functional as F
from lib.models.aiatrack.aiatrack import AIATRACK as AIATRACK_M
from lib.models.aiatrack.backbone import *
from lib.models.aiatrack.transformer import *
from lib.models.aiatrack.transformer import _get_activation_fn
from lib.models.aiatrack.head import *
from lib.models.aiatrack.position_encoding import PositionEmbeddingSine
from tracking.test import *
from lib.test.tracker.aiatrack import *
from lib.test.tracker.aiatrack import AIATRACK as AIATRACK_T
from lib.utils.box_ops import box_xyxy_to_cxcywh, box_xyxy_to_xywh
#----------------------------
# 1. Modifications related to AIATRACK.backbone
#----------------------------
#----------------------------
# 1.1 Replace the forward function of self.body (an IntermediateLayerGetter)
#     inside BackboneBase: the return type changes from Dict to Tensor
#----------------------------
def layer3ft(self, x):
    for name, module in self.items():
        x = module(x)
    return x
IntermediateLayerGetter.forward = layer3ft
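# With this patch the getter just runs its child modules in sequence and
# returns the final layer3 feature map as a plain Tensor (for the 320x320
# baseline, a (B, 1024, 20, 20) map, per num_channels below) instead of an
# OrderedDict of named outputs.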
#----------------------------
# 1.2 Replace the forward function of BackboneBase:
#     the input changes from NestedTensor to Tensor, and the return value
#     changes from Dict to Tensor (via the layer3ft function above)
#----------------------------
def BbBase_forward(self, imgft):  # revision for icraft
    xs = self.body(imgft)
    return xs
BackboneBase.forward = BbBase_forward
#----------------------------
# 1.3 Replace the __init__ function of the Backbone class:
#     norm_layer in the ResNet-50 backbone changes from FrozenBatchNorm2d
#     to nn.BatchNorm2d
#----------------------------
def Bb_init(self, name: str,
            train_backbone: bool,
            return_interm_layers: bool,
            dilation: bool,
            freeze_bn: bool):
    norm_layer = nn.BatchNorm2d
    # Here is different from the original DETR because we use feature from block3
    backbone = getattr(resnet_module, name)(
        replace_stride_with_dilation=[False, dilation, False],
        pretrained=is_main_process(), norm_layer=norm_layer, last_layer='layer3')
    num_channels = 256 if name in ('resnet18', 'resnet34') else 1024
    super(Backbone, self).__init__(backbone, train_backbone, num_channels, return_interm_layers)
Backbone.__init__ = Bb_init
#----------------------------
# 1.4 Replace the __init__ and forward functions of the Joiner class:
#     remove the embedding computation from the original backbone and change
#     the output from Dict to Tensor
#----------------------------
def Joiner_init(self, backbone, position_embedding, learned_embedding):
    super(Joiner, self).__init__(backbone)
def Joiner_forward(self, imgft):
    xsts = self[0](imgft)
    return xsts
Joiner.__init__ = Joiner_init
Joiner.forward = Joiner_forward
#----------------------------
# 1.5 Replace the adjust function of the AIATRACK class:
#     remove the mask, pos_embed and inr_embed handling; inputs and outputs
#     are plain Tensors
#----------------------------
def aiatrack_adjust(self, src_feat):
    # Reduce channel
    feat = self.bottleneck(src_feat)  # (B, C, H, W)
    # Adjust shapes
    feat_vec = feat.flatten(2).permute(2, 0, 1)  # HWxBxC
    return feat_vec
AIATRACK_M.adjust = aiatrack_adjust
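# Assumed shape flow for the 320x320 baseline: (B, 1024, 20, 20)
# --bottleneck--> (B, 256, 20, 20) --flatten/permute--> (400, B, 256),
# i.e. the HWxBxC layout the transformer encoder expects.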
#----------------------------
# 2. Modifications related to AIATRACK.transformer
#----------------------------
#----------------------------
# 2.1 Replace the forward functions of TransformerEncoder and
#     TransformerEncoderLayer: remove src_key_padding_mask from the inputs
#----------------------------
def encoderlayer_forward(self, src,
                         pos: Optional[Tensor] = None,
                         inr: Optional[Tensor] = None):
    q = k = self.with_pos_embed(src, pos)  # Add pos to src
    if self.divide_norm:
        # Encoder divide by norm
        q = q / torch.norm(q, dim=-1, keepdim=True) * self.scale_factor
        k = k / torch.norm(k, dim=-1, keepdim=True)
    # src2 = self.self_attn(q, k, value=src)[0]
    src2 = self.self_attn(query=q, key=k, value=src, pos_emb=inr)[0]
    # Add and norm
    src = src + self.dropout1(src2)
    src = self.norm1(src)
    # FFN
    src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
    # Add and norm
    src = src + self.dropout2(src2)
    src = self.norm2(src)
    return src
def encoder_forward(self, src,
                    pos: Optional[Tensor] = None,
                    inr: Optional[Tensor] = None):
    output = src  # (HW, B, C)
    for stack, layer in enumerate(self.layers):
        output = layer(output, pos=pos, inr=inr)
    if self.norm is not None:
        output = self.norm(output)
    return output
TransformerEncoderLayer.forward = encoderlayer_forward
TransformerEncoder.forward = encoder_forward
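# Note: the encoder is now called as encoder(src, pos, inr) with fixed-size
# (HW, B, C) tensors only, so the src_key_padding_mask argument and its
# data-dependent masking branch are gone, keeping the graph trace-friendly.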
#----------------------------
# 2.2 Replace the forward functions of TransformerDecoder and
#     TransformerDecoderLayer: remove refer_msk_list from the inputs, and move
#     the torch.cat over refer_mem_list, refer_emb_list and refer_pos_list out
#     of the network as preprocessing, splitting each into two tensors that
#     are passed to TransformerDecoder and TransformerDecoderLayer
#----------------------------
def decoderlayer_forward(self, tgt,
                         refer_mem_list0, refer_mem_list1,
                         refer_emb_list0, refer_emb_list1,
                         refer_pos_list0, refer_pos_list1):
    # Mutual attention
    mem_ensemble = refer_mem_list0
    emb_ensemble = refer_emb_list0
    refer_pos = refer_pos_list0
    refer_queries = tgt
    refer_keys = mem_ensemble
    refer_values = mem_ensemble + emb_ensemble
    if self.divide_norm:
        refer_queries = refer_queries / torch.norm(refer_queries, dim=-1, keepdim=True) * self.scale_factor
        refer_keys = refer_keys / torch.norm(refer_keys, dim=-1, keepdim=True)
    long_tgt_refer, long_attn_refer = self.long_term_attn(query=refer_queries,
                                                          key=refer_keys,
                                                          value=refer_values,
                                                          pos_emb=refer_pos)
    mem_ensemble = refer_mem_list1
    emb_ensemble = refer_emb_list1
    refer_pos = refer_pos_list1
    refer_queries = tgt
    refer_keys = mem_ensemble
    refer_values = mem_ensemble + emb_ensemble
    if self.divide_norm:
        refer_queries = refer_queries / torch.norm(refer_queries, dim=-1, keepdim=True) * self.scale_factor
        refer_keys = refer_keys / torch.norm(refer_keys, dim=-1, keepdim=True)
    short_tgt_refer, short_attn_refer = self.short_term_attn(query=refer_queries,
                                                             key=refer_keys,
                                                             value=refer_values,
                                                             pos_emb=refer_pos)
    tgt = tgt + self.dropout1_1(long_tgt_refer) + self.dropout1_2(short_tgt_refer)
    tgt = self.norm1(tgt)
    # FFN
    tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt))))
    # Add and norm
    tgt = tgt + self.dropout2(tgt2)
    tgt = self.norm2(tgt)
    return tgt
def decoder_forward(self, tgt,
                    refer_mem_list0, refer_mem_list1,
                    refer_emb_list0, refer_emb_list1,
                    refer_pos_list0, refer_pos_list1):
    output = tgt
    for stack, layer in enumerate(self.layers):
        output = layer(output,
                       refer_mem_list0, refer_mem_list1,
                       refer_emb_list0, refer_emb_list1,
                       refer_pos_list0, refer_pos_list1)
    if self.norm is not None:
        output = self.norm(output)
    return output.unsqueeze(0)
TransformerDecoderLayer.forward = decoderlayer_forward
TransformerDecoder.forward = decoder_forward
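# Note: element 0 of each split pair is the long-term (initial) reference and
# element 1 is the pre-concatenated short-term ensemble; the torch.cat itself
# now happens outside the traced graph, in aiatrack_track below.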
#----------------------------
# 2.3 Replace AiAModule and the __init__ functions of the
#     TransformerEncoderLayer and TransformerDecoderLayer classes that use it:
#     move the torch.repeat_interleave over pos_emb in CorrAttention.forward
#     out of the network as preprocessing, and drop the unused input
#     parameters (removed in 2.1 and 2.2) to simplify the if/else branches
#----------------------------
from attention_for_save import AiAModule as AiAModuleTRACE
def encoderlayer_init(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,
                      activation='relu', normalize_before=False, divide_norm=False,
                      use_AiA=True, match_dim=64, feat_size=400):
    super(TransformerEncoderLayer, self).__init__()
    self.self_attn = AiAModuleTRACE(d_model, nhead, dropout=dropout,
                                    use_AiA=use_AiA, match_dim=match_dim, feat_size=feat_size)
    # Implementation of feedforward model
    self.linear1 = nn.Linear(d_model, dim_feedforward)
    self.dropout = nn.Dropout(dropout)
    self.linear2 = nn.Linear(dim_feedforward, d_model)
    self.norm1 = nn.LayerNorm(d_model)
    self.norm2 = nn.LayerNorm(d_model)
    self.dropout1 = nn.Dropout(dropout)
    self.dropout2 = nn.Dropout(dropout)
    self.activation = _get_activation_fn(activation)
    self.normalize_before = normalize_before  # First normalization, then add
    self.divide_norm = divide_norm
    self.scale_factor = float(d_model // nhead) ** 0.5
def decoderlayer_init(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,
                      activation='relu', normalize_before=False, divide_norm=False,
                      use_AiA=True, match_dim=64, feat_size=400):
    super(TransformerDecoderLayer, self).__init__()
    self.long_term_attn = AiAModuleTRACE(d_model, nhead, dropout=dropout,
                                         use_AiA=use_AiA, match_dim=match_dim, feat_size=feat_size)
    self.short_term_attn = AiAModuleTRACE(d_model, nhead, dropout=dropout,
                                          use_AiA=use_AiA, match_dim=match_dim, feat_size=feat_size)
    # Implementation of feedforward model
    self.linear1 = nn.Linear(d_model, dim_feedforward)
    self.dropout = nn.Dropout(dropout)
    self.linear2 = nn.Linear(dim_feedforward, d_model)
    self.norm1 = nn.LayerNorm(d_model)
    self.norm2 = nn.LayerNorm(d_model)
    self.dropout1_1 = nn.Dropout(dropout)
    self.dropout1_2 = nn.Dropout(dropout)
    self.dropout2 = nn.Dropout(dropout)
    self.activation = _get_activation_fn(activation)
    self.normalize_before = normalize_before
    self.divide_norm = divide_norm
    self.scale_factor = float(d_model // nhead) ** 0.5
TransformerEncoderLayer.__init__ = encoderlayer_init
TransformerDecoderLayer.__init__ = decoderlayer_init
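# Assumption: AiAModuleTRACE from attention_for_save.py keeps the original
# AiAModule's query/key/value/pos_emb call interface, with the
# repeat_interleave over pos_emb removed from CorrAttention (that expansion is
# precomputed in aiatrack_initialize/aiatrack_track below).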
#----------------------------
# 3. Modifications related to AIATRACK.box_head
#----------------------------
#----------------------------
# 3.1 Replace the __init__, forward and soft_argmax functions of the
#     Corner_Predictor class
#----------------------------
# 3.1.1 Add a constant tensor of shape (feat_sz*feat_sz, 1) filled with 1.0 in __init__
def CP_init(self, inplanes=64, channel=256, feat_sz=20, stride=16, freeze_bn=False):
    super(Corner_Predictor, self).__init__()
    self.feat_sz = feat_sz
    self.stride = stride
    self.img_sz = self.feat_sz * self.stride
    # Top-left corner
    self.conv1_tl = conv(inplanes, channel, freeze_bn=freeze_bn)
    self.conv2_tl = conv(channel, channel // 2, freeze_bn=freeze_bn)
    self.conv3_tl = conv(channel // 2, channel // 4, freeze_bn=freeze_bn)
    self.conv4_tl = conv(channel // 4, channel // 8, freeze_bn=freeze_bn)
    self.conv5_tl = nn.Conv2d(channel // 8, 1, kernel_size=(1, 1))
    # Bottom-right corner
    self.conv1_br = conv(inplanes, channel, freeze_bn=freeze_bn)
    self.conv2_br = conv(channel, channel // 2, freeze_bn=freeze_bn)
    self.conv3_br = conv(channel // 2, channel // 4, freeze_bn=freeze_bn)
    self.conv4_br = conv(channel // 4, channel // 8, freeze_bn=freeze_bn)
    self.conv5_br = nn.Conv2d(channel // 8, 1, kernel_size=(1, 1))
    # About coordinates and indexes
    with torch.no_grad():
        self.indice = torch.arange(0, self.feat_sz).view(-1, 1) * self.stride
        # Generate mesh-grid
        self.coord_x = self.indice.repeat((self.feat_sz, 1)).view((self.feat_sz * self.feat_sz,)).float()  # .cuda()
        self.coord_y = self.indice.repeat((1, self.feat_sz)).view((self.feat_sz * self.feat_sz,)).float()  # .cuda()
        # revision for icraft: add a constant tensor for the torch.matmul reduction
        # Whether to save the model on a CUDA device is up to you; just make sure
        # these three constants match the device of the loaded model and its inputs
        self.exp_e = torch.tensor([[1.0]] * feat_sz * feat_sz).float()  # .cuda()
# 3.1.2 Replace torch.sum in soft_argmax with a torch.matmul against the
#       constant tensor added in 3.1.1
def CP_soft_argmax(self, score_map):
    """
    Get soft-argmax coordinate for a given heatmap.
    """
    prob_vec = nn.functional.softmax(
        score_map.view((-1, self.feat_sz * self.feat_sz)), dim=1)  # (batch, feat_sz * feat_sz)
    exp_x = self.coord_x * prob_vec
    exp_y = self.coord_y * prob_vec
    exp_x = torch.matmul(exp_x, self.exp_e)
    exp_y = torch.matmul(exp_y, self.exp_e)
    return exp_x, exp_y
# 3.1.3 Replace torch.stack in forward with torch.cat plus reshape
def CP_forward(self, x):
    """
    Forward pass with input x.
    """
    score_map_tl, score_map_br = self.get_score_map(x)
    coorx_tl, coory_tl = self.soft_argmax(score_map_tl)
    coorx_br, coory_br = self.soft_argmax(score_map_br)
    return torch.cat((coorx_tl, coory_tl, coorx_br, coory_br), dim=0).reshape(1, -1) / self.img_sz
Corner_Predictor.__init__ = CP_init
Corner_Predictor.soft_argmax = CP_soft_argmax
Corner_Predictor.forward = CP_forward
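# Minimal sanity-check sketch (never called; assumes the default constructor
# works on CPU and that conv is exported by lib.models.aiatrack.head): a
# matmul against the all-ones column exp_e is just a reduction, so the
# patched soft_argmax should match the original torch.sum formulation.
def _check_soft_argmax_equivalence(feat_sz=20):
    head = Corner_Predictor(feat_sz=feat_sz)
    score_map = torch.randn(1, 1, feat_sz, feat_sz)
    exp_x, exp_y = head.soft_argmax(score_map)  # matmul-based version above
    prob_vec = F.softmax(score_map.view(-1, feat_sz * feat_sz), dim=1)
    ref_x = torch.sum(head.coord_x * prob_vec, dim=1)  # original formulation
    ref_y = torch.sum(head.coord_y * prob_vec, dim=1)
    assert torch.allclose(exp_x.view(-1), ref_x, atol=1e-5)
    assert torch.allclose(exp_y.view(-1), ref_y, atol=1e-5)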
#----------------------------
# 3.2 Replace the forward_box_head function of the AIATRACK class:
#     move the box_xyxy_to_xywh and box_xyxy_to_cxcywh conversions out of the
#     network as postprocessing, so the head outputs a single tensor
#----------------------------
def aiatrack_forward_box_head(self, hs):
    """
    Args:
        hs: Output embeddings (1, HW, B, C).
    """
    # Adjust shape
    opt = hs.permute(2, 0, 3, 1).contiguous()
    bs, Nq, C, HW = opt.size()
    opt_feat = opt.view(-1, C, self.feat_sz_s, self.feat_sz_s)
    # Run the corner head
    bbox_coor = self.box_head(opt_feat)
    return bbox_coor
AIATRACK_M.forward_box_head = aiatrack_forward_box_head
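# The corner head now returns one (1, 4) tensor of normalized
# (x1, y1, x2, y2) coordinates; the xywh/cxcywh conversions are applied as
# postprocessing in aiatrack_track below.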
#----------------------------
# 4. The AIATRACK model
#----------------------------
#----------------------------
# 4.1 Replace the forward function of the AIATRACK class:
#     chain the full model pipeline together
#----------------------------
def aiatrack_forward(self, imgft, pos_emb, inr_emb,
                     refer_mem0, refer_mem1,
                     refer_emb0, refer_emb1,
                     refer_pos0, refer_pos1):
    # Forward the backbone
    feat = self.adjust(self.backbone(imgft))
    # Forward the transformer encoder and decoder
    search_mem = self.transformer.encoder(feat, pos_emb, inr_emb)
    output_embed = self.transformer.decoder(search_mem,
                                            refer_mem0, refer_mem1,
                                            refer_emb0, refer_emb1,
                                            refer_pos0, refer_pos1)
    # Forward the corner head and get the IoU feature
    bbox_coor, iou_feat = self.forward_heads(output_embed)
    return search_mem, bbox_coor, iou_feat
#----------------------------
# 4.2 Replace the forward_heads function of the AIATRACK class:
#     box_xyxy_to_cxcywh and box_xyxy_to_xywh are computed outside the network;
#     of iou_head, only the exportable get_iou_feat part is kept
#----------------------------
def aiatrack_forward_heads(self, hs):
    """
    Args:
        hs: Output embeddings (1, HW, B, C).
    """
    opt = hs.permute(2, 0, 3, 1).contiguous()
    bs, Nq, C, HW = opt.size()
    opt_feat = opt.view(-1, C, self.feat_sz_s, self.feat_sz_s)
    # Forward box_head
    bbox_coor = self.box_head(opt_feat)
    # Get the IoU feature
    iou_feat = self.iou_head.get_iou_feat(opt_feat)
    return bbox_coor, iou_feat
AIATRACK_M.forward = aiatrack_forward
AIATRACK_M.forward_heads = aiatrack_forward_heads
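# For reference, the export in 1_save.py presumably traces this nine-input
# signature along these lines (a sketch, not the actual export code):
#   traced = torch.jit.trace(model, (imgft, pos_emb, inr_emb,
#                                    refer_mem0, refer_mem1,
#                                    refer_emb0, refer_emb1,
#                                    refer_pos0, refer_pos1))
#   traced.save(model_pt)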
#----------------------------
# 5. AIATRACK inference
#----------------------------
#----------------------------
# 5.1 Replace the forward function of the PositionEmbeddingSine class:
#     remove tensor_list.tensors from the inputs; only tensor_list.mask is
#     passed in
#----------------------------
def posemb_forward(self, mask: Tensor):
    assert mask is not None
    not_mask = ~mask  # (b,h,w)
    # 1 1 1 1... 2 2 2 2... 3 3 3 3...
    y_embed = not_mask.cumsum(1, dtype=torch.float32)  # Cumulative sum along axis 1 (h axis) --> (b,h,w)
    # 1 2 3 4... 1 2 3 4... 1 2 3 4...
    x_embed = not_mask.cumsum(2, dtype=torch.float32)  # Cumulative sum along axis 2 (w axis) --> (b,h,w)
    if self.normalize:
        eps = 1e-6
        y_embed = y_embed / (y_embed[:, -1:, :] + eps) * self.scale  # 2pi * (y / sigma(y))
        x_embed = x_embed / (x_embed[:, :, -1:] + eps) * self.scale  # 2pi * (x / sigma(x))
    # num_pos_feats = d/2
    dim_t = torch.arange(self.num_pos_feats, dtype=torch.float32, device=mask.device)  # (0,1,2,...,d/2)
    dim_t = self.temperature ** (2 * (dim_t // 2) / self.num_pos_feats)
    pos_x = x_embed[:, :, :, None] / dim_t  # (b,h,w,d/2)
    pos_y = y_embed[:, :, :, None] / dim_t  # (b,h,w,d/2)
    pos_x = torch.stack((pos_x[:, :, :, 0::2].sin(), pos_x[:, :, :, 1::2].cos()), dim=4).flatten(3)  # (b,h,w,d/2)
    pos_y = torch.stack((pos_y[:, :, :, 0::2].sin(), pos_y[:, :, :, 1::2].cos()), dim=4).flatten(3)  # (b,h,w,d/2)
    pos = torch.cat((pos_y, pos_x), dim=3).permute(0, 3, 1, 2)  # (b,h,w,d) -> (b,d,h,w)
    return pos  # (b,d,h,w)
PositionEmbeddingSine.forward = posemb_forward
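# Example usage (see initialize/track below): a (1, 20, 20) boolean mask
# yields pos of shape (1, C, 20, 20), which is then flattened and permuted
# into the (400, 1, C) HWxBxC layout consumed by the encoder.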
#----------------------------
# 5.2 Replace the __init__, initialize and track functions of the AIATRACK
#     tracker class
#----------------------------
# 5.2.1 Add self.position_embedding and self.inner_embedding in __init__
def aiatrack_init(self, params, dataset_name):
    super(AIATRACK, self).__init__(params)
    # network = build_aiatrack(params.cfg)
    # network.load_state_dict(torch.load(self.params.checkpoint, map_location='cpu')['net'], strict=True)
    network = torch.jit.load(model_pt)  # Load the traced model
    self.cfg = params.cfg
    self.net = network  # .cuda()  # uncomment in a GPU environment
    self.net.eval()
    self.preprocessor = Preprocessor()
    self.state = None
    self.position_embedding, self.inner_embedding = build_position_encoding(params.cfg)
    # embed_bank can also be loaded from the embed_bank file saved by 1_save.py
    # self.embed_bank = torch.tensor(np.fromfile(EMBED_BANK, dtype=np.float32).reshape(1, 2, 256)).cuda()
    # For debug
    self.debug = False
    self.frame_id = 0
    # Set the hyper-parameters
    DATASET_NAME = dataset_name.upper()
    if hasattr(self.cfg.TEST.HYPER, DATASET_NAME):
        self.cache_siz = self.cfg.TEST.HYPER[DATASET_NAME][0]
        self.refer_cap = 1 + self.cfg.TEST.HYPER[DATASET_NAME][1]
        self.threshold = self.cfg.TEST.HYPER[DATASET_NAME][2]
    else:
        self.cache_siz = self.cfg.TEST.HYPER.DEFAULT[0]
        self.refer_cap = 1 + self.cfg.TEST.HYPER.DEFAULT[1]
        self.threshold = self.cfg.TEST.HYPER.DEFAULT[2]
    if self.debug:
        self.save_dir = 'debug'
        if not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)
    # For saving boxes from all queries
    self.save_all_boxes = params.save_all_boxes
#----------------------------
# 5.2.2 In initialize, add the mask/pos/inr computations that used to live in
#       AIATRACK.backbone, plus the torch.repeat_interleave that used to live
#       in AiAModule.CorrAttention; the now-unused refer_msk_cache and
#       refer_msk_list are commented out
#----------------------------
def aiatrack_initialize(self, image, info: dict, seq_name: str = None):
    # Forward the long-term reference once
    refer_crop, resize_factor, refer_att_mask = sample_target(image, info['init_bbox'], self.params.search_factor,
                                                              output_sz=self.params.search_size)
    refer_box = transform_image_to_crop(torch.Tensor(info['init_bbox']), torch.Tensor(info['init_bbox']),
                                        resize_factor,
                                        torch.Tensor([self.params.search_size, self.params.search_size]),
                                        normalize=True)
    self.feat_size = self.params.search_size // 16
    refer_img = self.preprocessor.process(refer_crop, refer_att_mask)
    with torch.no_grad():
        mask = F.interpolate(refer_img.mask[None].float(), size=[20, 20]).to(torch.bool)[0]
        pos = self.position_embedding(mask).flatten(2).permute(2, 0, 1)  # HWxBxC
        inr = self.inner_embedding(mask).flatten(2).permute(2, 0, 1)  # HWxBxC
        inr_emb = torch.repeat_interleave(inr, 4, dim=1).transpose(0, -1).reshape(64, -1, 400).transpose(0, -1)
        # Initialized to zeros
        ref_mem0 = torch.zeros(400, 1, 256)
        ref_mem1 = torch.zeros(1600, 1, 256)
        ref_emb0 = torch.zeros(400, 1, 256)
        ref_emb1 = torch.zeros(1600, 1, 256)
        ref_pos0 = torch.zeros(400, 4, 64)
        ref_pos1 = torch.zeros(400, 16, 64)
        # feat = self.net.adjust(self.net.backbone(refer_img.tensors))
        # refer_mem = self.net.transformer.encoder(feat, pos, inr_emb)
        refer_mem, bbox, iou_feat = self.net(refer_img.tensors, pos, inr_emb,
                                             ref_mem0, ref_mem1, ref_emb0, ref_emb1, ref_pos0, ref_pos1)
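    # Only the encoder memory (refer_mem) of the initial frame is used below;
    # bbox and iou_feat, produced from the zero dummy decoder inputs, are
    # discarded.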
    target_region = torch.zeros((self.feat_size, self.feat_size))
    x, y, w, h = (refer_box * self.feat_size).round().int()
    target_region[max(y, 0):min(y + h, self.feat_size), max(x, 0):min(x + w, self.feat_size)] = 1
    target_region = target_region.view(self.feat_size * self.feat_size, -1)
    background_region = 1 - target_region
    refer_region = torch.cat([target_region, background_region], dim=1).unsqueeze(0)  # .cuda()  # uncomment in a GPU environment
    embed_bank = torch.cat([self.net.foreground_embed.weight, self.net.background_embed.weight],
                           dim=0).unsqueeze(0)
    # embed_bank can also be loaded from the embed_bank file saved by 1_save.py
    # embed_bank = self.embed_bank
    self.refer_mem_cache = [refer_mem]
    self.refer_emb_cache = [torch.bmm(refer_region, embed_bank).transpose(0, 1)]
    self.refer_pos_cache = [inr]
    self.refer_mem_list = list()
    for _ in range(self.refer_cap):
        self.refer_mem_list.append(self.refer_mem_cache[0])
    self.refer_emb_list = list()
    for _ in range(self.refer_cap):
        self.refer_emb_list.append(self.refer_emb_cache[0])
    self.refer_pos_list = list()
    for _ in range(self.refer_cap):
        self.refer_pos_list.append(self.refer_pos_cache[0])
    # Save states
    self.state = info['init_bbox']
    if self.save_all_boxes:
        # Save all predicted boxes
        all_boxes_save = info['init_bbox'] * self.cfg.MODEL.NUM_OBJECT_QUERIES
        return {'all_boxes': all_boxes_save}
#----------------------------
# 5.2.3 In track, add the mask/pos/pos_emb computations that used to live in
#       AIATRACK.backbone, the torch.repeat_interleave that used to live in
#       AiAModule.CorrAttention, and the torch.cat that used to live in
#       TransformerDecoderLayer.forward; add the box_xyxy_to_xywh and
#       box_xyxy_to_cxcywh conversions that used to live in
#       AIATRACK.forward_box_head; drop the pred_iou obtained from
#       AIATRACK.forward_iou_head and instead run "Update state" every frame;
#       the now-unused refer_msk_cache and refer_msk_list are commented out
#----------------------------
def aiatrack_track(self, image, info: dict = None, seq_name: str = None):
    H, W, _ = image.shape
    self.frame_id += 1
    # Get the t-th search region
    search_crop, resize_factor, search_att_mask = sample_target(image, self.state, self.params.search_factor,
                                                                output_sz=self.params.search_size)  # (x1, y1, w, h)
    search_img = self.preprocessor.process(search_crop, search_att_mask)
    with torch.no_grad():
        mask = F.interpolate(search_img.mask[None].float(), size=[20, 20]).to(torch.bool)[0]
        pos = self.position_embedding(mask).flatten(2).permute(2, 0, 1)  # HWxBxC
        pos_emb = self.inner_embedding(mask).flatten(2).permute(2, 0, 1)  # HWxBxC
        inr_emb = torch.repeat_interleave(pos_emb, 4, dim=1).transpose(0, -1).reshape(64, -1, 400).transpose(0, -1)
        refer_pos0 = torch.repeat_interleave(self.refer_pos_list[0], 4, dim=1).transpose(0, -1).reshape(64, -1, 400).transpose(0, -1)
        refer_pos1 = torch.repeat_interleave(torch.cat(self.refer_pos_list[1:], dim=0), 4, dim=1).transpose(0, -1).reshape(64, -1, 400).transpose(0, -1)
        search_mem, bbox_coor, iou_feat = self.net(search_img.tensors, pos, inr_emb,
                                                   self.refer_mem_list[0], torch.cat(self.refer_mem_list[1:], dim=0),
                                                   self.refer_emb_list[0], torch.cat(self.refer_emb_list[1:], dim=0),
                                                   refer_pos0, refer_pos1)
    # Get the final result
    outputs_coord = box_xyxy_to_xywh(bbox_coor)
    out_dict = box_xyxy_to_cxcywh(bbox_coor)
    out_dict = out_dict.view(1, -1, 4)
    pred_boxes = out_dict.view(-1, 4)
    # Baseline: take the mean of all predicted boxes as the final result
    pred_box = (pred_boxes.mean(
        dim=0) * self.params.search_size / resize_factor).tolist()  # (cx, cy, w, h) [0,1]
    # Get the final box result
    self.state = clip_box(self.map_box_back(pred_box, resize_factor), H, W, margin=10)
    # Update state (performed every frame, since the pred_iou gate was removed)
    if True:
        if len(self.refer_mem_cache) == self.cache_siz:
            _ = self.refer_mem_cache.pop(1)
            _ = self.refer_emb_cache.pop(1)
            _ = self.refer_pos_cache.pop(1)
        target_region = torch.zeros((self.feat_size, self.feat_size))
        x, y, w, h = (outputs_coord[0] * self.feat_size).round().int()
        target_region[max(y, 0):min(y + h, self.feat_size), max(x, 0):min(x + w, self.feat_size)] = 1
        target_region = target_region.view(self.feat_size * self.feat_size, -1)
        background_region = 1 - target_region
        refer_region = torch.cat([target_region, background_region], dim=1).unsqueeze(0)  # .cuda()  # uncomment in a GPU environment
        embed_bank = torch.cat([self.net.foreground_embed.weight, self.net.background_embed.weight],
                               dim=0).unsqueeze(0)
        new_emb = torch.bmm(refer_region, embed_bank).transpose(0, 1)
        # embed_bank can also be loaded from the embed_bank file saved by 1_save.py
        # new_emb = torch.bmm(refer_region, self.embed_bank).transpose(0, 1)
        self.refer_mem_cache.append(search_mem)
        self.refer_emb_cache.append(new_emb)
        self.refer_pos_cache.append(pos_emb)
        self.refer_mem_list = [self.refer_mem_cache[0]]
        self.refer_emb_list = [self.refer_emb_cache[0]]
        self.refer_pos_list = [self.refer_pos_cache[0]]
        max_idx = len(self.refer_mem_cache) - 1
        ensemble = self.refer_cap - 1
        for part in range(ensemble):
            self.refer_mem_list.append(self.refer_mem_cache[max_idx * (part + 1) // ensemble])
            self.refer_emb_list.append(self.refer_emb_cache[max_idx * (part + 1) // ensemble])
            self.refer_pos_list.append(self.refer_pos_cache[max_idx * (part + 1) // ensemble])
    # For debug
    if self.debug:
        x1, y1, w, h = self.state
        image_BGR = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        cv2.rectangle(image_BGR, (int(x1), int(y1)), (int(x1 + w), int(y1 + h)), color=(0, 255, 0), thickness=3)
        save_seq_dir = os.path.join(self.save_dir, seq_name)
        if not os.path.exists(save_seq_dir):
            os.makedirs(save_seq_dir)
        save_path = os.path.join(save_seq_dir, '%04d.jpg' % self.frame_id)
        cv2.imwrite(save_path, image_BGR)
    if self.save_all_boxes:
        # Save all predictions
        all_boxes = self.map_box_back_batch(pred_boxes * self.params.search_size / resize_factor, resize_factor)
        all_boxes_save = all_boxes.view(-1).tolist()  # (4N,)
        return {'target_bbox': self.state,
                'all_boxes': all_boxes_save}
    else:
        return {'target_bbox': self.state}
AIATRACK_T.__init__ = aiatrack_init
AIATRACK_T.initialize = aiatrack_initialize
AIATRACK_T.track = aiatrack_track
#----------------------------
# Inference configuration
#----------------------------
# Name of the network parameter file
PARAM = "baseline"
# Test dataset
dataset_name = "lasot"
seq = None  # May be set to test a single sequence
# Model embedding file saved by 1_save.py (optional)
# EMBED_BANK = "../2_compile/fmodel/embed_bank_1_2_256.ftmp"
# Traced model to load
model_pt = "../2_compile/fmodel/AiATrack_ensemble4_320x320_traced.pt"
if __name__ == "__main__":
    run_tracker("aiatrack", PARAM, dataset_name=dataset_name, sequence=seq)