Here is an implementation of the contrastive loss using the Gluon API:
class ContrastiveLoss(Loss):
    def __init__(self, margin=2.0, weight=None, batch_axis=0, **kwargs):
        super(ContrastiveLoss, self).__init__(weight, batch_axis, **kwargs)
        self.margin = margin

    def hybrid_forward(self, F, output1, output2, label):
        # per-sample Euclidean distance over the embedding dimension
        euclidean_distance = F.sqrt(F.sum(F.square(output1 - output2), axis=1, keepdims=True))
        loss_contrastive = F.mean((1 - label) * F.square(euclidean_distance) +
                                  label * F.square(F.clip(self.margin - euclidean_distance, 0.0, 10.0)))
        return loss_contrastive
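This is the standard contrastive loss of Hadsell et al.: similar pairs (label 0) are pulled together by the squared distance term, while dissimilar pairs (label 1) are pushed apart until their distance exceeds the margin. As a quick sanity check, here is a sketch on random embeddings (the shapes mirror what the network below produces):

import mxnet as mx

loss_fn = ContrastiveLoss(margin=2.0)
a = mx.nd.random.normal(shape=(5, 5))  # batch of 5 embeddings, 5 dims each
b = mx.nd.random.normal(shape=(5, 5))
y = mx.nd.ones((5, 1))                 # 1 = dissimilar pair, 0 = similar pair
print(loss_fn(a, b, y))                # scalar loss NDArray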
I implemented it based on the PyTorch example of how to use a siamese network, taken from here.
There are quite a few differences between PyTorch and MXNet, so in case you want to try this yourself, here is a complete runnable example. You will need to download the AT&T faces dataset and convert the images to JPEG, because MXNet does not support loading images in the .pgm format.
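For the conversion, here is a minimal sketch with PIL (the ./att_faces path and directory layout are assumptions; adapt them to wherever you extracted the archive):

import glob
import os
from PIL import Image

# assumed layout: ./att_faces/s1/1.pgm, ... -- adjust to your extraction path
for pgm_path in glob.glob("./att_faces/*/*.pgm"):
    jpeg_path = os.path.splitext(pgm_path)[0] + ".jpeg"
    Image.open(pgm_path).save(jpeg_path)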
import matplotlib.pyplot as plt
import numpy as np
import random
from PIL import Image
import PIL.ImageOps
import mxnet as mx
from mxnet import autograd
from mxnet.base import numeric_types
from mxnet.gluon import nn, HybridBlock, Trainer
from mxnet.gluon.data import DataLoader
from mxnet.gluon.data.vision.datasets import ImageFolderDataset
from mxnet.gluon.loss import Loss
def imshow(img, text=None, should_save=False):
    npimg = img.asnumpy()  # NDArray -> NumPy for matplotlib
    plt.axis("off")
    if text:
        plt.text(75, 8, text, style='italic', fontweight='bold',
                 bbox={'facecolor': 'white', 'alpha': 0.8, 'pad': 10})
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()
def show_plot(iteration, loss):
    plt.plot(iteration, loss)
    plt.show()

class Config:
    training_dir = "./faces/training/"
    testing_dir = "./faces/testing/"
    train_batch_size = 5
    train_number_epochs = 100
class SiameseNetworkDataset(ImageFolderDataset):
    def __init__(self, root, transform=None):
        super().__init__(root, flag=0, transform=transform)
        self.root = root
        self.transform = transform

    def __getitem__(self, index):
        items_with_index = list(enumerate(self.items))
        img0_index, img0_tuple = random.choice(items_with_index)
        # we need to make sure approx 50% of images are in the same class
        should_get_same_class = random.randint(0, 1)
        if should_get_same_class:
            # keep looping till an image of the same class is found
            while True:
                img1_index, img1_tuple = random.choice(items_with_index)
                if img0_tuple[1] == img1_tuple[1]:
                    break
        else:
            img1_index, img1_tuple = random.choice(items_with_index)
        img0 = super().__getitem__(img0_index)
        img1 = super().__getitem__(img1_index)
        # HWC -> CHW for the conv layers; label is 1 for a dissimilar pair, 0 for a similar one
        return (img0[0].transpose((2, 0, 1)), img1[0].transpose((2, 0, 1)),
                mx.nd.array([int(img1_tuple[1] != img0_tuple[1])]))

    def __len__(self):
        return super().__len__()
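To spot-check the pairing logic, here is a sketch (it assumes the Config paths above and the aug_transform defined further down):

dataset = SiameseNetworkDataset(root=Config.training_dir, transform=aug_transform)
img0, img1, label = dataset[0]        # the index is ignored; pairs are sampled randomly
print(img0.shape, img1.shape, label)  # expect two (1, 100, 100) images and a 0/1 label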
class ReflectionPad2D(HybridBlock):
    """Pads the input tensor using the reflection of the input boundary.

    Parameters
    ----------
    padding: int
        An integer padding size

    Shape:
        - Input: :math:`(N, C, H_{in}, W_{in})`
        - Output: :math:`(N, C, H_{out}, W_{out})` where
          :math:`H_{out} = H_{in} + 2 * padding` and
          :math:`W_{out} = W_{in} + 2 * padding`
    """
    def __init__(self, padding=0, **kwargs):
        super(ReflectionPad2D, self).__init__(**kwargs)
        if isinstance(padding, numeric_types):
            padding = (0, 0, 0, 0, padding, padding, padding, padding)
        assert len(padding) == 8
        self._padding = padding

    def hybrid_forward(self, F, x, *args, **kwargs):
        return F.pad(x, mode='reflect', pad_width=self._padding)
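The block is parameter-free, so it can be exercised directly; for padding=1 a 100x100 input should come back as 102x102:

pad = ReflectionPad2D(padding=1)
x = mx.nd.random.normal(shape=(1, 1, 100, 100))
print(pad(x).shape)  # (1, 1, 102, 102)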
class SiameseNetwork(HybridBlock):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        self.cnn1 = nn.HybridSequential()
        with self.cnn1.name_scope():
            self.cnn1.add(ReflectionPad2D(padding=1))
            self.cnn1.add(nn.Conv2D(in_channels=1, channels=4, kernel_size=3))
            self.cnn1.add(nn.Activation('relu'))
            self.cnn1.add(nn.BatchNorm())
            self.cnn1.add(ReflectionPad2D(padding=1))
            self.cnn1.add(nn.Conv2D(in_channels=4, channels=8, kernel_size=3))
            self.cnn1.add(nn.Activation('relu'))
            self.cnn1.add(nn.BatchNorm())
            self.cnn1.add(ReflectionPad2D(padding=1))
            self.cnn1.add(nn.Conv2D(in_channels=8, channels=8, kernel_size=3))
            self.cnn1.add(nn.Activation('relu'))
            self.cnn1.add(nn.BatchNorm())
        self.fc1 = nn.HybridSequential()
        with self.fc1.name_scope():
            # these layers belong to fc1, not cnn1
            self.fc1.add(nn.Dense(500))
            self.fc1.add(nn.Activation('relu'))
            self.fc1.add(nn.Dense(500))
            self.fc1.add(nn.Activation('relu'))
            self.fc1.add(nn.Dense(5))

    def hybrid_forward(self, F, input1, input2):
        output1 = self._forward_once(input1)
        output2 = self._forward_once(input2)
        return output1, output2

    def _forward_once(self, x):
        output = self.cnn1(x)
        # nn.Dense flattens its input by default, so no explicit reshape is needed
        output = self.fc1(output)
        return output
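A quick shape check of the twin forward pass, as a sketch with random inputs (the Dense(5) head means each branch returns a 5-dimensional embedding):

net = SiameseNetwork()
net.initialize(init=mx.init.Xavier())
x = mx.nd.random.normal(shape=(2, 1, 100, 100))
out1, out2 = net(x, x)
print(out1.shape, out2.shape)  # (2, 5) (2, 5)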
class ContrastiveLoss(Loss):
    def __init__(self, margin=2.0, weight=None, batch_axis=0, **kwargs):
        super(ContrastiveLoss, self).__init__(weight, batch_axis, **kwargs)
        self.margin = margin

    def hybrid_forward(self, F, output1, output2, label):
        # per-sample Euclidean distance over the embedding dimension
        euclidean_distance = F.sqrt(F.sum(F.square(output1 - output2), axis=1, keepdims=True))
        loss_contrastive = F.mean((1 - label) * F.square(euclidean_distance) +
                                  label * F.square(F.clip(self.margin - euclidean_distance, 0.0, 10.0)))
        return loss_contrastive
def aug_transform(data, label):
    # build the default augmenter list for 1x100x100 grayscale input
    augs = mx.image.CreateAugmenter(data_shape=(1, 100, 100))
    for aug in augs:
        data = aug(data)
    return data, label
def run_training():
    siamese_dataset = SiameseNetworkDataset(root=Config.training_dir, transform=aug_transform)
    train_dataloader = DataLoader(siamese_dataset, shuffle=True, num_workers=1,
                                  batch_size=Config.train_batch_size)
    counter = []
    loss_history = []
    iteration_number = 0
    net = SiameseNetwork()
    net.initialize(init=mx.init.Xavier())
    trainer = Trainer(net.collect_params(), 'adam', {'learning_rate': 0.0005})
    loss = ContrastiveLoss(margin=2.0)
    for epoch in range(Config.train_number_epochs):
        for i, data in enumerate(train_dataloader):
            img0, img1, label = data
            with autograd.record():
                output1, output2 = net(img0, img1)
                loss_contrastive = loss(output1, output2, label)
            loss_contrastive.backward()
            trainer.step(Config.train_batch_size)
            if i % 10 == 0:
                print("Epoch number {}\n Current loss {}\n".format(epoch, loss_contrastive.asscalar()))
                iteration_number += 10
                counter.append(iteration_number)
                loss_history.append(loss_contrastive.asscalar())
    # show_plot(counter, loss_history)
    return net
def run_predict(net):
    folder_dataset_test = SiameseNetworkDataset(root=Config.testing_dir, transform=aug_transform)
    test_dataloader = DataLoader(folder_dataset_test, shuffle=True, num_workers=1,
                                 batch_size=Config.train_batch_size)
    dataiter = iter(test_dataloader)
    x0, _, _ = next(dataiter)
    _, x1, label2 = next(dataiter)
    output1, output2 = net(x0, x1)
    # per-sample Euclidean distance between the two embeddings
    euclidean_distance = mx.nd.sqrt(mx.nd.sum(mx.nd.square(output1 - output2), axis=1))
    print('x0 vs x1 dissimilarity is {}'.format(euclidean_distance[0]))
if __name__ == '__main__':
    net = run_training()
    run_predict(net)