A Close Reading of the SelfBlendedImages Paper

1. Paper Walkthrough

2. Running the Code

2.1 Environment

Download requirements.txt

and run:

!pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

!pip install efficientnet-pytorch==0.7.1 -i https://pypi.tuna.tsinghua.edu.cn/simple

!pip install retinaface-pytorch==0.0.7 -i https://pypi.tuna.tsinghua.edu.cn/simple

!pip install dlib -i https://pypi.tuna.tsinghua.edu.cn/simple

!pip install imutils -i https://pypi.tuna.tsinghua.edu.cn/simple

!pip install numpy==1.21.4 -i https://pypi.tuna.tsinghua.edu.cn/simple

2.2 Datasets

The Celeb-DF-v2 dataset

Place it in the ./data/ folder, with the file tree laid out as follows:

.
└── data
    └── Celeb-DF-v2
        ├── Celeb-real
        │   └── videos
        │       └── *.mp4
        ├── Celeb-synthesis
        │   └── videos
        │       └── *.mp4
        ├── YouTube-real
        │   └── videos
        │       └── *.mp4
        └── List_of_testing_videos.txt

A gotcha here: the Celeb dataset as downloaded has no videos level in its paths, so you have to create it yourself. In any case, follow the tree above exactly.
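
For example, here is a minimal sketch of creating that missing videos/ level and moving the downloaded clips into it; the data_root path and the assumption that the .mp4 files sit directly under each subset folder are mine:

# Hypothetical helper: create the videos/ level the Celeb-DF-v2 download
# lacks, assuming the .mp4 files sit directly under each subset folder.
from pathlib import Path
import shutil

data_root = Path('./data/Celeb-DF-v2')
for subset in ['Celeb-real', 'Celeb-synthesis', 'YouTube-real']:
    videos_dir = data_root / subset / 'videos'
    videos_dir.mkdir(parents=True, exist_ok=True)
    for mp4 in (data_root / subset).glob('*.mp4'):
        shutil.move(str(mp4), str(videos_dir / mp4.name))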

The FaceForensics++ dataset

Place it in the ./data/ folder, with the file tree laid out as follows:

.
└── data
    └── FaceForensics++
        ├── original_sequences
        │   └── youtube
        │       └── raw
        │           └── videos
        │               └── *.mp4
        ├── train.json
        ├── val.json
        └── test.json

The original sequences of this dataset alone run to about 100 GB, which really hurts when your monthly VPN traffic is capped.

2.3 Pretrained Models

Download the two pretrained models, FF-raw and FF-c23, from the author's Google Drive,

and place them in the ./weights/ folder.

2.4 Testing

Testing on the Celeb-DF-v2 dataset (replace * in CUDA_VISIBLE_DEVICES with your GPU index):

CUDA_VISIBLE_DEVICES=* python3 src/inference/inference_dataset.py \
-w weights/FFraw.tar \
-d CDF

Testing a single video:

CUDA_VISIBLE_DEVICES=* python3 src/inference/inference_video.py \
-w weights/FFraw.tar \
-i /path/to/video.mp4

Testing a single image:

CUDA_VISIBLE_DEVICES=* python3 src/inference/inference_image.py \
-w weights/FFraw.tar \
-i /path/to/image.png

2.5 Training the Model

  1. Download the facial-landmark predictor file (shape_predictor_81_face_landmarks.dat, linked as "here" in the original README)

     and place it in the ./src/preprocess/ folder.

  2. Run both face-extraction scripts, crop_dlib_ff.py and crop_retina_ff.py. Both must be run, otherwise training will not work:

python3 src/preprocess/crop_dlib_ff.py -d Original
CUDA_VISIBLE_DEVICES=* python3 src/preprocess/crop_retina_ff.py -d Original

  3. The landmark utility code (Face-Xray) is optional; you can download it or skip it:
mkdir src/utils/library
git clone https://github.com/AlgoHunt/Face-Xray.git src/utils/library

  4. Run the training code:
CUDA_VISIBLE_DEVICES=* python3 src/train_sbi.py \
src/configs/sbi/base.json \
-n sbi

When training finishes, the 5 best weight files (ranked by validation AUC) will be saved in the ./output/ folder.


3. Code Walkthrough

dlib face detection

This can be used as a template.

Only the arguments need analysis here; the concrete code can be reused directly without deep understanding.

if __name__=='__main__':
    parser=argparse.ArgumentParser()
    # dataset selection
    parser.add_argument('-d',dest='dataset',choices=['DeepFakeDetection_original','DeepFakeDetection','FaceShifter','Face2Face','Deepfakes','FaceSwap','NeuralTextures','Original','Celeb-real','Celeb-synthesis','YouTube-real','DFDC','DFDCP'])

    # three compression levels
    parser.add_argument('-c',dest='comp',choices=['raw','c23','c40'],default='raw')

    # number of frames to sample
    parser.add_argument('-n',dest='num_frames',type=int,default=32)
    args=parser.parse_args()

    # dataset path; the {} is filled with the compression level
    if args.dataset=='Original':
        dataset_path='/home/jovyan/work/SelfBlendedImages/data/FaceForensics++/original_sequences/youtube/{}/'.format(args.comp)
    elif args.dataset=='DeepFakeDetection_original':
        dataset_path='data/FaceForensics++/original_sequences/actors/{}/'.format(args.comp)
    elif args.dataset in ['DeepFakeDetection','FaceShifter','Face2Face','Deepfakes','FaceSwap','NeuralTextures']:
        dataset_path='data/FaceForensics++/manipulated_sequences/{}/{}/'.format(args.dataset,args.comp)
    elif args.dataset in ['Celeb-real','Celeb-synthesis','YouTube-real']:
        dataset_path='data/Celeb-DF-v2/{}/'.format(args.dataset)
    elif args.dataset in ['DFDC']:
        dataset_path='data/{}/'.format(args.dataset)
    else:
        raise NotImplementedError

    face_detector = dlib.get_frontal_face_detector()
    # path to the landmark predictor, which must be downloaded separately
    predictor_path = '/home/jovyan/work/SelfBlendedImages/src/preprocess/shape_predictor_81_face_landmarks.dat'
    face_predictor = dlib.shape_predictor(predictor_path)

    # video directory
    movies_path=dataset_path+'videos/'

    # collect all files with the .mp4 suffix and sort them
    movies_path_list=sorted(glob(movies_path+'*.mp4'))
    print("{} : videos are exist in {}".format(len(movies_path_list),args.dataset))

    # used for the progress bar
    n_sample=len(movies_path_list)

    for i in tqdm(range(n_sample)):
        # map the file path from videos/ to frames/
        folder_path=movies_path_list[i].replace('videos/','frames/').replace('.mp4','/')
        # pass the arguments to the facecrop function
        facecrop(movies_path_list[i],save_path=dataset_path,num_frames=args.num_frames,face_predictor=face_predictor,face_detector=face_detector)
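
As a standalone illustration of the detect-then-predict pattern above, here is a minimal single-image sketch; face.png and the local predictor path are placeholder names, not from the repo:

# Minimal dlib sketch: detect faces, then predict 81 landmarks per face.
import dlib
import cv2
import numpy as np

detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor('shape_predictor_81_face_landmarks.dat')

frame = cv2.imread('face.png')                    # BGR image from disk
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)      # dlib expects RGB
faces = detector(rgb, 1)                          # 1 = upsample once for small faces
for face in faces:
    shape = predictor(rgb, face)                  # 81 landmark points
    landmarks = np.array([[p.x, p.y] for p in shape.parts()])
    print(landmarks.shape)                        # (81, 2)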

RetinaFace face detection

This can be used as a template.

Only the arguments need analysis here; the concrete code can be reused directly without deep understanding.

if __name__=='__main__':
    parser=argparse.ArgumentParser()

    parser.add_argument('-d',dest='dataset',choices=['DeepFakeDetection_original','DeepFakeDetection','FaceShifter','Face2Face','Deepfakes','FaceSwap','NeuralTextures','Original','Celeb-real','Celeb-synthesis','YouTube-real','DFDC','DFDCP'])

    parser.add_argument('-c',dest='comp',choices=['raw','c23','c40'],default='raw')
    parser.add_argument('-n',dest='num_frames',type=int,default=32)
    args=parser.parse_args()
    if args.dataset=='Original':
        dataset_path='/home/jovyan/work/SelfBlendedImages/data/FaceForensics++/original_sequences/youtube/{}/'.format(args.comp)
    elif args.dataset=='DeepFakeDetection_original':
        dataset_path='/home/jovyan/work/SelfBlendedImages/data/FaceForensics++/original_sequences/actors/{}/'.format(args.comp)
    elif args.dataset in ['DeepFakeDetection','FaceShifter','Face2Face','Deepfakes','FaceSwap','NeuralTextures']:
        dataset_path='data/FaceForensics++/manipulated_sequences/{}/{}/'.format(args.dataset,args.comp)
    elif args.dataset in ['Celeb-real','Celeb-synthesis','YouTube-real']:
        dataset_path='data/Celeb-DF-v2/{}/'.format(args.dataset)
    elif args.dataset in ['DFDC','DFDCVal']:
        dataset_path='data/{}/'.format(args.dataset)
    else:
        raise NotImplementedError

    # run on CUDA
    device=torch.device('cuda')
    # load the RetinaFace model (ResNet-50 backbone); other models could be tried
    model = get_model("resnet50_2020-07-20", max_size=2048,device=device)
    model.eval()


    movies_path=dataset_path+'videos/'

    movies_path_list=sorted(glob(movies_path+'*.mp4'))

    print("{} : videos are exist in {}".format(len(movies_path_list),args.dataset))


    n_sample=len(movies_path_list)

    for i in tqdm(range(n_sample)):
        folder_path=movies_path_list[i].replace('videos/','frames/').replace('.mp4','/')
        # skip videos that already have enough saved .npy landmark files
        if len(glob(folder_path.replace('/frames/','/retina/')+'*.npy'))<args.num_frames:
            facecrop(model,movies_path_list[i],save_path=dataset_path,num_frames=args.num_frames)
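
For reference, single-image inference with retinaface-pytorch looks roughly like this; a sketch based on the library's documented predict_jsons helper, with face.png as a placeholder:

# Minimal retinaface-pytorch sketch: detect faces in one image.
import cv2
import torch
from retinaface.pre_trained_models import get_model

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = get_model("resnet50_2020-07-20", max_size=2048, device=device)
model.eval()

image = cv2.cvtColor(cv2.imread('face.png'), cv2.COLOR_BGR2RGB)
annotations = model.predict_jsons(image)          # one dict per detected face
for ann in annotations:
    print(ann['bbox'], ann['score'])              # box corners and confidence
    # ann['landmarks'] holds five facial keypoints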

SBI (self-blended images)

# reusable random-seeding template
seed=10
random.seed(seed)
torch.manual_seed(seed)
np.random.seed(seed)
torch.cuda.manual_seed(seed)

# cuDNN backend settings, also a reusable template
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# images are resized to 256x256
image_dataset=SBI_Dataset(phase='test',image_size=256)

# standard data-loading template
batch_size=64
dataloader = torch.utils.data.DataLoader(image_dataset,
                    batch_size=batch_size,
                    # shuffle the data
                    shuffle=True,
                    collate_fn=image_dataset.collate_fn,
                    # num_workers=0: data is loaded synchronously in the main process, no extra workers
                    num_workers=0,
                    worker_init_fn=image_dataset.worker_init_fn
                    )
# fetch one batch
data_iter=iter(dataloader)
data=next(data_iter)

# process the image data
img=data['img']
# view() reshapes the tensor to (-1, 3, 256, 256): -1 lets PyTorch infer the
# batch dimension, 3 is the number of RGB channels, 256x256 is the image size
img=img.view((-1,3,256,256))

# pixel values are expected in [0, 1]; no normalization is applied
utils.save_image(img, 'loader.png', nrow=batch_size, normalize=False, range=(0, 1))
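
To see concretely why the view((-1, 3, 256, 256)) call is needed, here is a toy sketch; the assumption that collate_fn packs more than one image per sample (e.g. a real/blended pair) is mine:

# Toy sketch: flattening a paired batch into a plain image batch.
import torch

batch = torch.rand(64, 2, 3, 256, 256)       # hypothetical: 64 samples, 2 images each
flat = batch.view((-1, 3, 256, 256))         # shape becomes (128, 3, 256, 256)
print(flat.shape)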

train entry point

def main(args):
    cfg=load_json(args.config)

    seed=5
    random.seed(seed)
    torch.manual_seed(seed)
    np.random.seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

    device = torch.device('cuda')


    image_size=cfg['image_size']
    batch_size=cfg['batch_size']
    train_dataset=SBI_Dataset(phase='train',image_size=image_size)
    val_dataset=SBI_Dataset(phase='val',image_size=image_size)

    train_loader=torch.utils.data.DataLoader(train_dataset,
                        batch_size=batch_size//2,
                        shuffle=True,
                        collate_fn=train_dataset.collate_fn,
                        num_workers=4,
                        pin_memory=True,
                        drop_last=True,
                        worker_init_fn=train_dataset.worker_init_fn
                        )
    val_loader=torch.utils.data.DataLoader(val_dataset,
                        batch_size=batch_size,
                        shuffle=False,
                        collate_fn=val_dataset.collate_fn,
                        num_workers=4,
                        pin_memory=True,
                        worker_init_fn=val_dataset.worker_init_fn
                        )

    model=Detector()

    model=model.to('cuda')


    # bookkeeping lists
    iter_loss=[]
    train_losses=[]
    test_losses=[]
    train_accs=[]
    test_accs=[]
    val_accs=[]
    val_losses=[]
    n_epoch=cfg['epoch']
    # linear LR decay; decay starts after 3/4 of the epochs
    lr_scheduler=LinearDecayLR(model.optimizer, n_epoch, int(n_epoch/4*3))
    last_loss=99999


    now=datetime.now()

    # output path, built from the session name, the config name, and the date
    save_path='output/{}_'.format(args.session_name)+now.strftime(os.path.splitext(os.path.basename(args.config))[0])+'_'+now.strftime("%m_%d_%H_%M_%S")+'/'

    # create the output folders for weights and logs
    os.mkdir(save_path)
    os.mkdir(save_path+'weights/')
    os.mkdir(save_path+'logs/')
    logger = log(path=save_path+"logs/", file="losses.logs")

    criterion=nn.CrossEntropyLoss()

    # initialized once before the epoch loop; a good template
    last_auc=0
    last_val_auc=0
    weight_dict={}
    n_weight=5
    for epoch in range(n_epoch):
        np.random.seed(seed + epoch)
        train_loss=0.
        train_acc=0.
        model.train(mode=True)
        for step,data in enumerate(tqdm(train_loader)):
            img=data['img'].to(device, non_blocking=True).float()
            target=data['label'].to(device, non_blocking=True).long()
            output=model.training_step(img, target)
            loss=criterion(output,target)
            loss_value=loss.item()
            iter_loss.append(loss_value)
            train_loss+=loss_value
            acc=compute_accuray(F.log_softmax(output,dim=1),target)
            train_acc+=acc
        lr_scheduler.step()
        train_losses.append(train_loss/len(train_loader))
        train_accs.append(train_acc/len(train_loader))

        log_text="Epoch {}/{} | train loss: {:.4f}, train acc: {:.4f}, ".format(
            epoch+1,
            n_epoch,
            train_loss/len(train_loader),
            train_acc/len(train_loader),
        )

        model.train(mode=False)
        val_loss=0.
        val_acc=0.
        output_dict=[]
        target_dict=[]
        np.random.seed(seed)
        for step,data in enumerate(tqdm(val_loader)):
            img=data['img'].to(device, non_blocking=True).float()
            target=data['label'].to(device, non_blocking=True).long()

            with torch.no_grad():
                output=model(img)
                loss=criterion(output,target)

            loss_value=loss.item()
            iter_loss.append(loss_value)
            val_loss+=loss_value
            acc=compute_accuray(F.log_softmax(output,dim=1),target)
            val_acc+=acc
            output_dict+=output.softmax(1)[:,1].cpu().data.numpy().tolist()
            target_dict+=target.cpu().data.numpy().tolist()
        val_losses.append(val_loss/len(val_loader))
        val_accs.append(val_acc/len(val_loader))
        val_auc=roc_auc_score(target_dict,output_dict)
        log_text+="val loss: {:.4f}, val acc: {:.4f}, val auc: {:.4f}".format(
            val_loss/len(val_loader),
            val_acc/len(val_loader),
            val_auc
        )

        # keep only the n_weight checkpoints with the highest validation AUC
        if len(weight_dict)<n_weight:
            save_model_path=os.path.join(save_path+'weights/',"{}_{:.4f}_val.tar".format(epoch+1,val_auc))
            weight_dict[save_model_path]=val_auc
            torch.save({
                "model":model.state_dict(),
                "optimizer":model.optimizer.state_dict(),
                "epoch":epoch
            },save_model_path)
            last_val_auc=min([weight_dict[k] for k in weight_dict])

        elif val_auc>=last_val_auc:
            save_model_path=os.path.join(save_path+'weights/',"{}_{:.4f}_val.tar".format(epoch+1,val_auc))
            # evict the current worst checkpoint, then stop iterating
            for k in weight_dict:
                if weight_dict[k]==last_val_auc:
                    del weight_dict[k]
                    os.remove(k)
                    weight_dict[save_model_path]=val_auc
                    break
            torch.save({
                "model":model.state_dict(),
                "optimizer":model.optimizer.state_dict(),
                "epoch":epoch
            },save_model_path)
            last_val_auc=min([weight_dict[k] for k in weight_dict])

        logger.info(log_text)

if __name__=='__main__':

    parser=argparse.ArgumentParser()
    parser.add_argument(dest='config')
    parser.add_argument('-n',dest='session_name')
    args=parser.parse_args()
    main(args)
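
The checkpoint bookkeeping above keeps only the n_weight=5 weights with the highest validation AUC. Here is a distilled sketch of that keep-top-k pattern, with generic names rather than the repo's:

# Keep-top-k checkpoints: save only if the new score beats the current worst.
import os
import torch

def save_top_k(weight_dict, k, score, path, state):
    if len(weight_dict) < k:                      # still filling the pool
        torch.save(state, path)
        weight_dict[path] = score
    elif score >= min(weight_dict.values()):      # beats the current worst
        worst = min(weight_dict, key=weight_dict.get)
        os.remove(worst)                          # evict the worst checkpoint
        del weight_dict[worst]
        torch.save(state, path)
        weight_dict[path] = score
    return min(weight_dict.values())              # new threshold to beat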
