i-PRO の監視カメラ i-PRO mini (WV-S7130W) を入手したので、RTSP(H.264/H.265) で映像取得して遊んでみます。
製品紹介ページ:

ここでは TyTorch および VGG16 というネットワークモデルを使用し、画像分類と呼ばれる AI 処理を行ってみます。
VGG16 は、2014年の ILSVR で2位になった畳み込みニューラルネットワークです。オックスフォード大学の VGG(Visual Geometry Group) チームが作成した16層から構成されるネットワークモデルであるため VGG16 と呼ばれています。
Python を事前にインストール済みであることを前提に記載します。
私の評価環境は以下の通りです。
| 言語 : | Python, | 3.10.4 |
| OS : | Windows 11 home, | 21H2 |
| Windows 10 Pro, | 21H1 | |
(1)
下記URLを開きます。
https://pytorch.org/get-started/locally/
(2)
下図のような画面を表示するので、使用される環境を選択します。
私は Windows 環境で多くの人が動作する例を作成したいので、Computer Platform として CPU を選択してみました。
Package はなんとなく Pip を選択してみます。
commandは pip3 install torch torchvision torchaudio となりました。

ちなみに "CUDA 11.3" を選択すると pip3 install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu113 となりました。
(3)
表示されたコマンドをコマンドプロンプトなどのターミナルから入力することで Pytorch をインストールします。
これで Pytorch のインストールを完了です。
作業フォルダを準備して、下記プログラムを保存します。
[プログラムソース "preparation.py"]
import os
import urllib.request
# フォルダ「data」が存在しない場合はフォルダ data を作成します。
data_dir = "./data/"
if not os.path.exists(data_dir):
os.mkdir(data_dir)
# ImageNetのclass_indexをダウンロードします。
# Kerasで用意されているものです。下記オープンソースで使用している JSON ファイルです。
# https://github.com/fchollet/deep-learning-models/blob/master/imagenet_utils.py
url = "https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json"
save_path = os.path.join(data_dir, "imagenet_class_index.json")
if not os.path.exists(save_path):
urllib.request.urlretrieve(url, save_path)
そしてこのプログラムを実行すると、作業フォルダ中に data フォルダを作成してその中に "imagenet_class_index.json"
というファイルを作成して保存してくれます。
このファイルは画像分類で使用する 1000種類 のラベル情報です。
matplotlib を使用するので、下記コマンドによりインストールします。
pip install matplotlib
学習済みの VGG モデルを使用し、静止画(JPEGファイル)の画像分類を行ってみます。
ここでは入力画像として、 https://pixabay.com から取得した4つの画像(てんとう虫、ゴールデンレトリバー(犬)、車、デイジー(花))を使用させていただき実験してみます。
いずれも 商用利用無料、帰属表示必要なし、の画像です。
各画像の取得元は、下記ソースコード中に記載の URL を参照ください。
下記プログラムを実行する際は、ご自身で各画像をダウンロードして事前に data フォルダに保存してください。

| 言語 : | Python, | 3.10.4 |
| OS : | Windows 11 home, | 21H2 |
| Windows 10 Pro, | 21H1 | |
部分ごとに説明して最後に全体ソースコードを示します。
1.
パッケージのインポートを最初に行います。
import numpy as np import json from PIL import Image import matplotlib.pyplot as plt import torch import torchvision from torchvision import models, transforms
2.
下記コードにより VGG16 の学習済みモデルをロードします。
初めて実行する際は、学習済みパラメータをインターネットからダウンロードするため、実行に時間がかかります。
# Create an instance of the VGG16 model
self.net = models.vgg16( pretrained = True )
self.net.eval() # Set to evaluation mode.
# Display network-model.
print(self.net)
self.net = models.vgg16( pretrained = True )
により取得した学習済みモデルは下記に保存されます。
'~' はログインしているユーザーのホームディレクトリを意味します。
Where are my downloaded models saved?
The locations are used in the order of
- Calling hub.set_dir(<PATH_TO_HUB_DIR>)
- $TORCH_HOME/hub, if environment variable TORCH_HOME is set.
- $XDG_CACHE_HOME/torch/hub, if environment variable XDG_CACHE_HOME is set.
- ~/.cache/torch/hub
私の場合は下記に vgg16-397923af.pth を保存していました。約540MBのファイルサイズでした。
"~.cache\torch\hub\checkpoints\vgg16-397923af.pth"
3.
入力画像の前処理クラスを作成します。
class BaseTransform():
'''
Pre-process the input image. Image resizing, color standardization, etc.
入力画像の前処理を行う。画像のリサイズ、色の標準化など。
'''
def __init__(self, resize, mean, std):
self.base_transform = transforms.Compose([
transforms.Resize((resize, resize)), # Resize both long and short sides to the size of resize.
#transforms.Resize(resize), # Resize the short edge length to the size of resize while preserving the aspect
#transforms.CenterCrop(resize), # Crop the center of the image with resize × resize.
transforms.ToTensor(), # Convert to Torch-Tensor.
transforms.Normalize(mean, std) # color standardization
])
def __call__(self, img):
'''
Perform pre-process the input image.
'''
return self.base_transform(img)
こんな感じで BaseTransform クラスのインスタンスを生成するときに画像サイズ、規格化の情報を与えています。
# Create an instance of preprocessing.
resize = 224
mean = (0.485, 0.456, 0.406)
std = (0.229, 0.224, 0.225)
self.transform = BaseTransform(resize, mean, std)
4.
出力結果からラベルを判定するクラスを作成します。
最もスコアの高いラベル(predicted_label_name)とそのスコア(score)を返します。
出力を softmax
で処理することで全体(1000種別)のスコアを足すと 1.0 になるようにしています。
class ILSVRCPredictor():
'''
Get the label name with the highest score from the calculation result.
演算結果から最もスコアの高いラベル名を取得する。
'''
def __init__(self, class_index):
'''
Constructor
Args:
class_index [i] class index.
'''
self.class_index = class_index
def predict_max(self, out):
'''
Get the label name with the highest score from the calculation result.
最もスコアの高いラベル名を取得する。
'''
data = out.detach().numpy()
probabilities = torch.nn.functional.softmax(out, dim=1)[0]
maxid = np.argmax(data)
score = probabilities[maxid].item()
predicted_label_name = self.class_index[str(maxid)][1]
return predicted_label_name, score
5.
画像分類を行う本体のクラスを作成します。
class ImagenetClassificationVgg():
'''
Image classification.
画像分類を行う。
'''
def __init__(self, class_index_file):
'''
Constructor
Args:
class_index_file: [i] class index file path.
'''
# PyTorch version.
print("PyTorch Version: ", torch.__version__)
print("Torchvision Version: ", torchvision.__version__)
# Load a trained VGG-16 model.
# The first time you run it, it will take a long time to run because it will download the trained parameters.
# 学習済みの VGG-16 モデルをロードする。
# 初めて実行する際は、学習済みパラメータをダウンロードするため、実行に時間がかかります。
# Create an instance of the VGG16 model
self.net = models.vgg16( pretrained = True )
self.net.eval() # Set to evaluation mode.
# Display network-model.
print(self.net)
# Create an instance of preprocessing.
resize = 224
mean = (0.485, 0.456, 0.406)
std = (0.229, 0.224, 0.225)
self.transform = BaseTransform(resize, mean, std)
# Load ILSVRC label information and create an ILSVRCPredictor instance.
self.ILSVRC_class_index = json.load( open(class_index_file, 'r') )
self.predictor = ILSVRCPredictor(self.ILSVRC_class_index)
def imagenet_classification_vgg(self, img, debug=False):
'''
Perform image classification.
Args:
img: [i] An image for image classification. PIL.Image format.
debug: [i] if set to True, display debug images.
Returns:
results: Results of image classification.
'''
if debug==True:
# View original image.
plt.imshow(img)
plt.show()
# Preprocessing.
img_transformed = self.transform(img) # torch.Size([3, 224, 224])
if debug==True:
# Display the image after preprocessing.
img_transformed_2 = img_transformed.numpy().transpose((1, 2, 0))
img_transformed_2 = np.clip(img_transformed_2, 0, 1)
plt.imshow(img_transformed_2)
plt.show()
# Added batch size dimension.
inputs = img_transformed.unsqueeze_(0) # torch.Size([1, 3, 224, 224])
# ネットワークモデルへ画像を入力し、出力をラベルに変換
out = self.net(inputs) # torch.Size([1, 1000])
result = self.predictor.predict_max(out)
return result
6.
main 部分です。
ImagenetClassificatinVgg クラスのインスタンスを作成して分類を行う画像を入力するだけです。
if __name__ == "__main__":
'''
main
'''
imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')
# Open image file.
# https://pixabay.com/ja/photos/%e3%81%a6%e3%82%93%e3%81%a8%e3%81%86%e8%99%ab-%e7%94%b2%e8%99%ab-%e3%83%86%e3%83%b3%e3%83%88%e3%82%a6%e3%83%a0%e3%82%b7-1480102/
# https://pixabay.com/ja/service/license/
# 商用利用無料、帰属表示必要なし、1280x855
img = Image.open('./data/ladybug-g7744c038e_1280.jpg') # [height][width][color]
result = imagenetClassifigationVgg.do_classification(img)
print("Result: ", result)
# https://pixabay.com/ja/photos/goldenretriever-%E7%8A%AC-3724972/
# https://pixabay.com/ja/service/license/
# 商用利用無料、帰属表示必要なし、640x426
img = Image.open('./data/goldenretriever-3724972_640.jpg') # [height][width][color]
result = imagenetClassifigationVgg.do_classification(img)
print("Result: ", result)
# https://pixabay.com/ja/photos/%e8%bb%8a-%e7%94%b2%e8%99%ab-%e3%83%95%e3%82%a9%e3%83%ab%e3%82%af%e3%82%b9%e3%83%af%e3%83%bc%e3%82%b2%e3%83%b3-1283947/
# https://pixabay.com/ja/service/license/
# 商用利用無料、帰属表示必要なし、1920x1280
img = Image.open('./data/car-g955f2640f_1920.jpg') # [height][width][color]
result = imagenetClassifigationVgg.do_classification(img)
print("Result: ", result)
# https://pixabay.com/ja/photos/%e3%83%9e%e3%83%bc%e3%82%ac%e3%83%ac%e3%83%83%e3%83%88-%e3%83%87%e3%82%a4%e3%82%b8%e3%83%bc-%e8%8a%b1-729510/
# https://pixabay.com/ja/service/license/
# 商用利用無料、帰属表示必要なし、1920x1249
img = Image.open('./data/marguerite-gfad1f1cea_1920.jpg') # [height][width][color]
result = imagenetClassifigationVgg.do_classification(img)
print("Result: ", result)
以下に全ソースコードを記載します。
[全ソースコード "classsification_vgg.py"]
import numpy as np
import json
from PIL import Image
import matplotlib.pyplot as plt
import torch
import torchvision
from torchvision import models, transforms
class BaseTransform():
'''
Pre-process the input image. Image resizing, color standardization, etc.
入力画像の前処理を行う。画像のリサイズ、色の標準化など。
'''
def __init__(self, resize, mean, std):
self.base_transform = transforms.Compose([
transforms.Resize((resize, resize)), # Resize both long and short sides to the size of resize.
transforms.ToTensor(), # Convert to Torch-Tensor.
transforms.Normalize(mean, std) # color standardization
])
def __call__(self, img):
'''
Perform pre-process the input image.
'''
return self.base_transform(img)
class ILSVRCPredictor():
'''
Get the label name with the highest score from the calculation result.
演算結果から最もスコアの高いラベル名を取得する。
'''
def __init__(self, class_index):
'''
Constructor
Args:
class_index [i] class index.
'''
self.class_index = class_index
def predict_max(self, out):
'''
Get the label name with the highest score from the calculation result.
最もスコアの高いラベル名を取得する。
'''
data = out.detach().numpy()
probabilities = torch.nn.functional.softmax(out, dim=1)[0]
maxid = np.argmax(data)
score = probabilities[maxid].item()
predicted_label_name = self.class_index[str(maxid)][1]
return predicted_label_name, score
class ImagenetClassificationVgg():
'''
Image classification.
画像分類を行う。
'''
def __init__(self, class_index_file):
'''
Constructor
Args:
class_index_file: [i] class index file path.
'''
# PyTorch version.
print("PyTorch Version: ", torch.__version__)
print("Torchvision Version: ", torchvision.__version__)
# Load a trained VGG-16 model.
# The first time you run it, it will take a long time to run because it will download the trained parameters.
# 学習済みの VGG-16 モデルをロードする。
# 初めて実行する際は、学習済みパラメータをダウンロードするため、実行に時間がかかります。
# Create an instance of the VGG16 model
self.net = models.vgg16( pretrained = True )
self.net.eval() # Set to evaluation mode.
# Display network-model.
print(self.net)
# Create an instance of preprocessing.
resize = 224
mean = (0.485, 0.456, 0.406)
std = (0.229, 0.224, 0.225)
self.transform = BaseTransform(resize, mean, std)
# Load ILSVRC label information and create an ILSVRCPredictor instance.
self.ILSVRC_class_index = json.load( open(class_index_file, 'r') )
self.predictor = ILSVRCPredictor(self.ILSVRC_class_index)
def do_classification(self, img, debug=False):
'''
Perform image classification.
Args:
img: [i] An image for image classification. PIL.Image format.
debug: [i] if set to True, display debug images.
Returns:
results: Results of image classification.
'''
if debug==True:
# View original image.
plt.imshow(img)
plt.show()
# Preprocessing.
img_transformed = self.transform(img) # torch.Size([3, 224, 224])
if debug==True:
# Display the image after preprocessing.
img_transformed_2 = img_transformed.numpy().transpose((1, 2, 0))
img_transformed_2 = np.clip(img_transformed_2, 0, 1)
plt.imshow(img_transformed_2)
plt.show()
# Added batch size dimension.
inputs = img_transformed.unsqueeze_(0) # torch.Size([1, 3, 224, 224])
# ネットワークモデルへ画像を入力し、出力をラベルに変換
out = self.net(inputs) # torch.Size([1, 1000])
result = self.predictor.predict_max(out)
return result
if __name__ == "__main__":
'''
main
'''
imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')
# Open image file.
# https://pixabay.com/ja/photos/%e3%81%a6%e3%82%93%e3%81%a8%e3%81%86%e8%99%ab-%e7%94%b2%e8%99%ab-%e3%83%86%e3%83%b3%e3%83%88%e3%82%a6%e3%83%a0%e3%82%b7-1480102/
# https://pixabay.com/ja/service/license/
# 商用利用無料、帰属表示必要なし、1280x855
img = Image.open('./data/ladybug-g7744c038e_1280.jpg') # [height][width][color]
result = imagenetClassifigationVgg.do_classification(img)
print("Result: ", result)
# https://pixabay.com/ja/photos/goldenretriever-%E7%8A%AC-3724972/
# https://pixabay.com/ja/service/license/
# 商用利用無料、帰属表示必要なし、640x426
img = Image.open('./data/goldenretriever-3724972_640.jpg') # [height][width][color]
result = imagenetClassifigationVgg.do_classification(img)
print("Result: ", result)
# https://pixabay.com/ja/photos/%e8%bb%8a-%e7%94%b2%e8%99%ab-%e3%83%95%e3%82%a9%e3%83%ab%e3%82%af%e3%82%b9%e3%83%af%e3%83%bc%e3%82%b2%e3%83%b3-1283947/
# https://pixabay.com/ja/service/license/
# 商用利用無料、帰属表示必要なし、1920x1280
img = Image.open('./data/car-g955f2640f_1920.jpg') # [height][width][color]
result = imagenetClassifigationVgg.do_classification(img)
print("Result: ", result)
# https://pixabay.com/ja/photos/%e3%83%9e%e3%83%bc%e3%82%ac%e3%83%ac%e3%83%83%e3%83%88-%e3%83%87%e3%82%a4%e3%82%b8%e3%83%bc-%e8%8a%b1-729510/
# https://pixabay.com/ja/service/license/
# 商用利用無料、帰属表示必要なし、1920x1249
img = Image.open('./data/marguerite-gfad1f1cea_1920.jpg') # [height][width][color]
result = imagenetClassifigationVgg.do_classification(img)
print("Result: ", result)
実行結果です。コンソールへ出力された内容です。
PyTorch Version: 1.11.0+cpu
Torchvision Version: 0.12.0+cpu
VGG(
(features): Sequential(
(0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(1): ReLU(inplace=True)
(2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(3): ReLU(inplace=True)
(4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
(5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(6): ReLU(inplace=True)
(7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(8): ReLU(inplace=True)
(9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
(10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(11): ReLU(inplace=True)
(12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(13): ReLU(inplace=True)
(14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(15): ReLU(inplace=True)
(16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
(17): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(18): ReLU(inplace=True)
(19): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(20): ReLU(inplace=True)
(21): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(22): ReLU(inplace=True)
(23): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
(24): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(25): ReLU(inplace=True)
(26): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(27): ReLU(inplace=True)
(28): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(29): ReLU(inplace=True)
(30): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
)
(avgpool): AdaptiveAvgPool2d(output_size=(7, 7))
(classifier): Sequential(
(0): Linear(in_features=25088, out_features=4096, bias=True)
(1): ReLU(inplace=True)
(2): Dropout(p=0.5, inplace=False)
(3): Linear(in_features=4096, out_features=4096, bias=True)
(4): ReLU(inplace=True)
(5): Dropout(p=0.5, inplace=False)
(6): Linear(in_features=4096, out_features=1000, bias=True)
)
)
Result: ('ladybug', 0.9478541612625122)
Result: ('golden_retriever', 0.9413034915924072)
Result: ('sports_car', 0.3690492510795593)
Result: ('daisy', 0.9962427616119385)
正しく画像分類できていそうです。
ImagenetClassificationVgg クラスのインスタンスを作成したらあとは画像を渡すだけ、という感じで実行できます。
学習済みのモデルを使用して推論するだけならさほど難しく無いと思います。興味あればチャレンジしてみて下さい。
さて、i-PRO カメラと接続して取得した映像に対して「画像分類」をリアルタイムに実施してみたいと思います。
「RTSP で画像を取得する」中で OpenCV による顔検知を作成しましたので、このプログラムを元に OpenCV の処理部分を上記で作成した物体検知へ変更してみたいと思います。
| 言語 : | Python, | 3.10.4 |
| OS : | Windows 11 home, | 21H2 |
| Windows 10 Pro, | 21H1 | |
上記で作成した "imagenet_classsification_vgg.py" をそのままライブラリとして活用することができます。
具体的には、同じフォルダ中にある python のソースコードからであれば下記のように from imagenet_classsification_vgg import ImagenetClassificationVgg と記載することでそのまま使用できます。
こんな方法を活用して必要最小限の情報記載で進めたいと思います。
["classification_main.py"]
from PIL import Image
from classification_vgg import ImagenetClassificationVgg
if __name__ == "__main__":
'''
main
'''
imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')
# Open image file.
# https://pixabay.com/ja/photos/%e3%81%a6%e3%82%93%e3%81%a8%e3%81%86%e8%99%ab-%e7%94%b2%e8%99%ab-%e3%83%86%e3%83%b3%e3%83%88%e3%82%a6%e3%83%a0%e3%82%b7-1480102/
# https://pixabay.com/ja/service/license/
# 商用利用無料、帰属表示必要なし、1280x855
img = Image.open('./data/ladybug-g7744c038e_1280.jpg') # [height][width][color]
result = imagenetClassifigationVgg.imagenet_classification_vgg(img)
print("Result: ", result)
"connect_with_rtsp_3_1.py" を元に、RTSP接続して受信したカメラ映像をリアルタイムに画像分類するプログラムを作成してみます。
VGG の処理負荷はとても高そうなのでちょっと心配ですが、必要に応じてカメラ側の設定でフレームレートや解像度を下げて使用する、という方針で進めます。
[プログラムソース "classification_with_camera_1.py"]
'''
[Abstract]
Image classification.
画像分類を行います。
[Details]
This program connects to an i-PRO camera and classifies live images.
このプログラムは、i-PRO カメラと接続してライブ映像に対して画像分類を行います。
[Library install]
torch, torchvision : see https://pytorch.org/get-started/locally/
cv2 : pip install opencv-python
matplotlib : pip install matplotlib
numpy : pip install numpy
PIL : pip install pillow
json : Built-in module in Python, you don’t need to install it with pip.
'''
import cv2
from PIL import Image
from classification_vgg import ImagenetClassificationVgg # Local module. See 'classification_vgg.py'.
user_id = "user-id" # Change to match your camera setting
user_pw = "password" # Change to match your camera setting
host = "192.168.0.10" # Change to match your camera setting
winname = "VIDEO" # Window title
# Exception 定義
BackendError = type('BackendError', (Exception,), {})
def IsWindowVisible(winname):
'''
Check if the target window exists.
Args:
winname : Window title.
Returns:
True : Exist.
False : Not exist.
Raise:
BackendError :
'''
try:
ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE)
if ret == -1:
raise BackendError('Use Qt as backend to check whether window is visible or not.')
return bool(ret)
except cv2.error:
return False
def CV2Pil(image):
'''
Convert from OpenCV to PIL.Image
Params:
image: OpenCV image.
Returns:
PIL.Image format image.
'''
new_image = image.copy()
if new_image.ndim == 2: # モノクロ
pass
elif new_image.shape[2] == 3: # カラー
new_image = cv2.cvtColor(new_image, cv2.COLOR_BGR2RGB)
elif new_image.shape[2] == 4: # 透過
new_image = cv2.cvtColor(new_image, cv2.COLOR_BGRA2RGBA)
new_image = Image.fromarray(new_image)
return new_image
'''
[Abstract]
main 関数
'''
if __name__ == '__main__':
# Create an instance of class ImagenetClassificationVgg.
imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')
# Create an instance of class cv2.VideoCapture
cap = cv2.VideoCapture(f"rtsp://{user_id}:{user_pw}@{host}/MediaInput/stream_1")
#
windowInitialized = False
while True:
try:
ret, frame = cap.read()
if ret == True:
# Image classification
pilImage = CV2Pil(frame)
result, score = imagenetClassifigationVgg.do_classification(pilImage)
if score > 0.15:
print(result, score)
else:
print('None')
# Resize to a display size that fits on your PC screen.
width = 640
height = 480
h, w = frame.shape[:2]
aspect = w / h
if width / height >= aspect:
nh = height
nw = round(nh * aspect)
else:
nw = width
nh = round(nw / aspect)
frame2 = cv2.resize(frame, (nw, nh))
# Display image.
cv2.imshow(winname, frame2)
if windowInitialized==False:
# Specify the display position only at the beginning.
cv2.moveWindow(winname, 100, 100)
windowInitialized = True
# Press the "q" key to finish.
k = cv2.waitKey(1) & 0xff # necessary to display the video by imshow ()
if k == ord("q"):
break
# Exit if there is no specified window.
if not IsWindowVisible(winname):
break
except KeyboardInterrupt:
# Press ctrl -c on the console to exit the program.
print("KeyboardInterrupt")
break
print("Finish main()")
cap.release()
cv2.destroyAllWindows()
結果:
予想通り VGG の処理がとても重たく、PC性能にもよると思いますが、今回実施しているCPU処理ではフレームレートを 1,3,5fps 程度に設定する必要がありそうです。
加えてフレームレートを下げても一定の遅延を発生しました。恐らく10フレーム程度のバッファリングが行われており、例えば 1fps に設定すると10秒程度の遅延を常に生じます。
用途にもよりますが、ちょっと残念。映像表示だけでも10fps以上の通常表示を維持しつつ、画像分類の処理をできるだけ実施するというような改善を考えてみたいところです。
そこで、画像分類の処理を別タスクに分離することで、映像受信と映像デコード処理を止めずにできるだけ "画像分類" をやってみる、という感じにプログラムを修正してみます。
multiprocessing, queue というライブラリを使用して実現してみます。
こちらが新規に作成したプログラムです。"connect_with_rtsp_3_2.py" を元に作成しています。
[プログラムソース "classification_with_camera_2.py"]
'''
[Abstract]
Try connecting to an i-PRO camera with RTSP.
RTSP で i-PRO カメラと接続してみる
[Details]
image classification.
[Author]
kinoshita hidetoshi (木下英俊)
[Library install]
pip install opencv-python
'''
import cv2
import multiprocessing as mp
from queue import Empty
from classification_vgg import ImagenetClassificationVgg
from PIL import Image
user_id = "user-id" # Change to match your camera setting
user_pw = "password" # Change to match your camera setting
host = "192.168.0.10" # Change to match your camera setting
winname = "VIDEO" # Window title
# Exception 定義
BackendError = type('BackendError', (Exception,), {})
def IsWindowVisible(winname):
'''
Check if the target window exists.
Args:
winname : Window title.
Returns:
True : Exist.
False : Not exist.
Raise:
BackendError :
'''
try:
ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE)
if ret == -1:
raise BackendError('Use Qt as backend to check whether window is visible or not.')
return bool(ret)
except cv2.error:
return False
def CV2Pil(image):
'''
Convert from OpenCV to PIL.Image
Params:
image: OpenCV image.
Returns:
PIL.Image format image.
'''
new_image = image.copy()
if new_image.ndim == 2: # モノクロ
pass
elif new_image.shape[2] == 3: # カラー
new_image = cv2.cvtColor(new_image, cv2.COLOR_BGR2RGB)
elif new_image.shape[2] == 4: # 透過
new_image = cv2.cvtColor(new_image, cv2.COLOR_BGRA2RGBA)
new_image = Image.fromarray(new_image)
return new_image
def ImageClassificationProcess(q):
'''
Image classification process.
Args:
q1 : [i] 顔検知する画像を保存する Queue
q2 : [o] 顔検知した結果を保存する Queue
Returns:
None
'''
imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')
while True:
try:
image = q.get(True, 10)
# 終了処理: q1.get から取得したものが int で -1 なら終了
if type(image) == int:
if image == -1:
break
# Image classification
pilImage = CV2Pil(image)
result, score = imagenetClassifigationVgg.do_classification(pilImage)
if score > 0.15:
print(result, score)
else:
print('None')
except Empty: # timeout of q1.get()
print("Timeout happen.(3)")
print("Finish ImageClassificationProcess()")
'''
[Abstract]
main 関数
'''
if __name__ == '__main__':
cap = cv2.VideoCapture(f"rtsp://{user_id}:{user_pw}@{host}/MediaInput/stream_1")
#
windowInitialized = False
# Create and start image classification process.
q = mp.Queue()
p = mp.Process(target=ImageClassificationProcess, args=(q,))
p.start()
while True:
try:
ret, frame = cap.read()
if ret == True:
#
if (q.qsize() <= 1):
q.put(frame)
# Resize to a display size that fits on your PC screen.
width = 640
height = 480
h, w = frame.shape[:2]
aspect = w / h
if width / height >= aspect:
nh = height
nw = round(nh * aspect)
else:
nw = width
nh = round(nw / aspect)
frame2 = cv2.resize(frame, (nw, nh))
# Display image.
cv2.imshow(winname, frame2)
if windowInitialized==False:
# Specify the display position only at the beginning.
cv2.moveWindow(winname, 100, 100)
windowInitialized = True
# Press the "q" key to finish.
k = cv2.waitKey(1) & 0xff # necessary to display the video by imshow ()
if k == ord("q"):
break
# Exit if there is no specified window.
if not IsWindowVisible(winname):
break
except KeyboardInterrupt:
# Press ctrl -c on the console to exit the program.
print("KeyboardInterrupt")
break
# Terminate process p
q.put(-1)
# Waiting for process p to finish
p.join()
print("Finish main()")
cap.release()
cv2.destroyAllWindows()
[動画] i-PRO カメラと接続して、リアルタイムに画像分類
CPU 版の PyTorch での動作ですが、十分に高速な処理をしてくれているように私は感じました。
GPU 版を使うともっと素敵なパフォーマンスで動作することと思いますが、この画像分類についてはこれでもいろいろと活用できるのではないでしょうか。
前述の画像分類を GUI(tkinter)版で作成してみます。
記事「 RTSP で画像を取得する : 7-3. メニュー・ボタンを追加して GUI アプリらしくしてみる」で作成した GUI プログラムをベースに改造してみます。
ポイント
| 言語 : | Python, | 3.10.4 |
| OS : | Windows 11 home, | 21H2 |
| Windows 10 Pro, | 21H1 | |
[プログラムソース "classification_gui.py"]
'''
[Abstract]
Image classification.
画像分類を行います。
[Details]
Create an application in the GUI using tkinter.
tkinter を使って GUI アプリケーションを作成します。
[Author]
kinoshita hidetoshi (木下英俊)
[Library install]
pip install opencv-python
'''
import cv2
import time
import tkinter as tk
from tkinter import messagebox
from PIL import Image, ImageTk, ImageOps
import multiprocessing as mp
from queue import Empty
from classification_vgg import ImagenetClassificationVgg # Local module. See 'classification_vgg.py'.
user_id = "user-id" # Change to match your camera setting
user_pw = "password" # Change to match your camera setting
host = "192.168.0.10" # Change to match your camera setting
winname = "VIDEO" # Window title
url = f"rtsp://{user_id}:{user_pw}@{host}/MediaInput/stream_1"
class Application(tk.Frame):
def __init__(self, master = None):
super().__init__(master)
self.pack()
# Window settings.
self.master.title("Display i-PRO camera with tkinter") # Window title
self.master.geometry("800x600+100+100") # Window size, position
# Event registration for window termination.
self.master.protocol("WM_DELETE_WINDOW", self.on_closing_window)
# Create menu.
menubar = tk.Menu(self.master)
self.master.configure(menu=menubar)
filemenu = tk.Menu(menubar)
menubar.add_cascade(label='File', menu=filemenu)
filemenu.add_command(label='Quit', command = self.on_closing_window)
# Create button_frame
self.button_frame = tk.Frame(self.master, padx=10, pady=10, relief=tk.RAISED, bd=2)
self.button_frame.pack(side = tk.BOTTOM, fill=tk.X)
# Label
self.label_frame1 = tk.Frame(self.button_frame, width=10)
self.label_frame1.pack(side=tk.LEFT)
self.label_frame2 = tk.Frame(self.button_frame, width=40)
self.label_frame2.pack(side=tk.LEFT)
self.class_text = tk.StringVar()
self.score_text = tk.StringVar()
self.class_text.set('')
self.score_text.set('')
self.label1 = tk.Label(self.label_frame1, text='Class: ').pack(side=tk.TOP)
self.label2 = tk.Label(self.label_frame2, textvariable=self.class_text, relief=tk.RIDGE, width=20).pack(side=tk.TOP)
self.label3 = tk.Label(self.label_frame1, text='Score: ').pack(side=tk.TOP)
self.label4 = tk.Label(self.label_frame2, textvariable=self.score_text, relief=tk.RIDGE, width=20).pack(side=tk.TOP)
# Create quit_button
self.quit_button = tk.Button(self.button_frame, text='Quit', width=10, command = self.on_closing_window)
self.quit_button.pack(side=tk.RIGHT)
# Create canvas.
self.canvas = tk.Canvas(self.master)
# Add mouse click event to canvas.
self.canvas.bind('<Button-1>', self.canvas_click)
# Place canvas.
self.canvas.pack(expand = True, fill = tk.BOTH)
# Create queue and value for image receive process.
self.imageQueue = mp.Queue()
self.request = mp.Value('i', 0) # -1 : Exit ReceiveImageProcess.
# 0 : Normal.
# 1 : Connect camera.
# 2 : Release camera.
# Create queue for classification process.
self.imageQueue2 = mp.Queue()
self.resultQueue = mp.Queue()
# Create processes.
self.imageReceiveProcess = mp.Process(target=ReceiveImageProcess, args=(self.imageQueue, self.imageQueue2, self.request))
self.classificationProcess = mp.Process(target=ImageClassificationProcess, args=(self.imageQueue2, self.resultQueue))
self.imageReceiveProcess.start()
self.classificationProcess.start()
# Raise a video display event (disp_image) after 500m
self.disp_id = self.after(500, self.disp_image)
def on_closing_window(self):
''' Window closing event. '''
if messagebox.askokcancel("QUIT", "Do you want to quit?"):
# Request terminate process.
self.request.value = -1
self.imageQueue2.put(-1)
# Waiting for process p to finish
time.sleep(1)
# Flash queue.
# The program cannot complete processes unless the queue is emptied.
for i in range(self.imageQueue.qsize()):
image = self.imageQueue.get()
for i in range(self.imageQueue2.qsize()):
image = self.imageQueue2.get()
for i in range(self.resultQueue.qsize()):
result = self.resultQueue.get()
# Wait for process to be terminated.
self.imageReceiveProcess.join()
self.classificationProcess.join()
self.master.destroy()
print("Finish Application.")
def canvas_click(self, event):
''' Event handling with mouse clicks on canvas '''
if self.disp_id is None:
# Connect camera.
self.request.value = 1
# Display image.
self.disp_image()
else:
# Release camera.
self.request.value = 2
# Cancel scheduling
self.after_cancel(self.disp_id)
self.disp_id = None
def disp_image(self):
''' Display image on Canvas '''
# If there is data in the imageQueue, the program receives the data and displays the video.
num = self.imageQueue.qsize()
if num > 0:
if (num > 5):
num -= 1
for i in range(num):
cv_image = self.imageQueue.get()
# (2) Convert image from ndarray to PIL.Image.
pil_image = Image.fromarray(cv_image)
# Get canvas size.
canvas_width = self.canvas.winfo_width()
canvas_height = self.canvas.winfo_height()
# Resize the image to the size of the canvas without changing the aspect ratio.
# アスペクトを維持したまま画像を Canvas と同じサイズにリサイズ
pil_image = ImageOps.pad(pil_image, (canvas_width, canvas_height))
# (3) Convert image from PIL.Image to PhotoImage
# PIL.Image から PhotoImage へ変換する
self.photo_image = ImageTk.PhotoImage(image=pil_image)
# Display image on the canvas.
self.canvas.create_image(
canvas_width / 2, # Image display position (center of the canvas)
canvas_height / 2,
image=self.photo_image # image data
)
else:
pass
# Update GUI Label.
result_num = self.resultQueue.qsize()
if result_num > 0:
for i in range(result_num):
label, score = self.resultQueue.get()
self.class_text.set(label)
score = '{:.4f}'.format(score)
self.score_text.set(score)
# Raise a video display event (disp_image) after 1ms.
self.disp_id = self.after(1, self.disp_image)
def ReceiveImageProcess(imageQueue, imageQueue2, request):
'''
Receive Image Process.
Args:
imageQueue [o] Image data for display.
imageQueue2 [o] Image data for image classification.
request [i] Shared memory for receiving requests from the main process.
-1: Terminate process.
0: Nothing.
1: Connect camera.
2: Release camera connection.
Returns:
None
Raises
None
'''
# Connect camera.
cap = cv2.VideoCapture(url)
while True:
if cap != None:
# Get frame.
ret, frame = cap.read()
if ret == True:
# (1) Convert image from BGR to RGB.
cv_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# for display.
if imageQueue.qsize() < 10:
imageQueue.put(cv_image)
# for image classification.
if imageQueue2.qsize() <= 1:
imageQueue2.put(cv_image)
else:
print("cap.read() return False.")
# The timeout period seems to be 30 seconds.
# And there seems to be no API to change the timeout value.
time.sleep(1)
# Reconnect
cap.release()
cap = cv2.VideoCapture(url)
else:
time.sleep(0.1)
# Check process termination request.
if request.value == -1:
# Terminate process.
cap.release()
request.value = 0
break
# Check connect request.
if request.value == 1:
cap = cv2.VideoCapture(url)
request.value = 0
# Check release request.
if request.value == 2:
cap.release()
cap = None
request.value = 0
print("Terminate ReceiveImageProcess().")
def ImageClassificationProcess(imageQueue, resultQueue):
'''
Image classification process.
Args:
imageQueue : [i] Image for image classification.
resultQueue : [o] Save classification result labels and scores.
Returns:
None
'''
imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')
while True:
try:
image = imageQueue.get(True, 10)
# If type(image) is 'int' and image is -1, then this process is terminated.
if type(image) == int:
if image == -1:
break
# Image classification
pilImage = Image.fromarray(image) # convert from OpenCV image to PIL.Image
result, score = imagenetClassifigationVgg.do_classification(pilImage)
if score > 0.15:
print(result, score)
resultQueue.put((result, score))
else:
print('None')
resultQueue.put(('None', 0.0))
except Empty: # timeout of imageQueue.get()
print("Timeout happen.(3)")
print("Finish ImageClassificationProcess()")
if __name__ == "__main__":
root = tk.Tk()
app = Application(master = root)
app.mainloop()
[動画] i-PRO カメラと接続してリアルタイムに画像分類、GUI版
そこそこ良い感じに作れたのでは、と思っています。
プログラムが約300ステップまで大きくなってきましたので、機能拡張はこの辺で一旦おしまいにしたいと思います。
本ページの情報は、特記無い限り下記 MIT ライセンスで提供されます。
|
MIT License Copyright (c) 2022 Kinoshita Hidetoshi Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
| 2022-06-14 | - | 3.1. を追加 |
| 2022-05-09 | - | 4章を追加 |
| 2022-05-04 | - | 新規作成 |