OpenCVの動画ファイル読み込みのスレッド化

前回、Apex Legendsの与ダメージをOCRで取得したが、静止画が対象だったので動画に適用してみたいが読み込みが単一スレッドなのが気になった。

ゲーム映像の読み込みの際に使用した以下のコードでも処理は可能だとは思う。

import cv2
 
path = 'D:\\sample.mp4'
delay = 1
window_name = 'frame'
show = True
 
cap = cv2.VideoCapture(path)
 
if not cap.isOpened():
    sys.exit()
 
while True:
    ret, frame = cap.read()
    if ret:
        if show:
            cv2.imshow(window_name, frame)
            if cv2.waitKey(delay) & 0xFF == ord('q'):
                break
    else:
        break 

特にcv2.waitKeyの処理をする場合は、第1引数に指定した秒数だけウェイト処理が入ることになってしまう。

色々試した限りでは、cv2.imshowの処理後はcv2.waitKeyの処理を入れないと処理が安定しないし、将来的に取得したフレームに対して画像処理を行った場合にはさらに処理が増えることになる。

このため、出来れば動画の読み込み部分は別なスレッドで処理がしたいと思う。

別なスレッドで処理するため、読み込んだフレームデータはキューに入れ、画像処理側で取得後にキューから取り出すことことにする。

途中経過とかも書ければ良かったが、なんだかんだ出来上がってしまったので、完成したコード記載する。

import os,sys
import cv2

import queue
import threading
import time

class Loader(object):
    def __init__(self,path = None, **opts):
        self.path = path
        # 確認用にshowフラグを用意、Trueの場合は動画を別ウィンドウで表示する
        self.show = opts.get("show",False)
        self.winsize ="960x540"
        # 画像処理を行う関数を設定する。
        self.callback = opts.get("callback")
        # キューのあふれを確認するための表示をするためのフラグ
        self.printqsize = opts.get("printqsize", False)

        if self.path:
            self.open(self.path)

        # 処理の確認のため、スレッドモードと非スレッドモードを切り替えれるようにする
        self.readthread = opts.get("readthread",True)

        # 非スレッドモード時には、メンバ関数のreadを使用する
        self.reader = self.read
        if self.readthread:
            # queueのサイズを指定する
            queuesize = opts.get("queuesize",100)
            self.frameQueue = queue.Queue(maxsize=queuesize)
            # スレッドモード時には、メンバ関数のreadQueueを読み込みの関数に使用する
            self.reader = self.readQueue
            self.retryLimit = opts.get("retrylimit",100)
            # 動画の読み込みスレッドを作成する。
            self.readThreadHandler = threading.Thread(target=self.readingThread)
            self.readThreadHandler.daemon = False

    def open(self,path):
        try:
            self.cap = cv2.VideoCapture(path,0)
        except:
            raise "cannot read %s" % (path)

        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.frame_count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))

        self.first_frame = 0
        self.last_frame = self.frame_count - 1
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.prev_frame = None

    def load(self,first_frame=None,last_frame=None):
        ret = True
        first_frame = first_frame if first_frame != None else self.first_frame
        last_frame = last_frame if last_frame != None else self.last_frame

        if self.readthread:
            self.readThreadHandler.start()

        try:
            while(ret):
                ret,frame = self.reader()
                if frame is not None and self.callback:
                    rs = self.callback(self,frame)
                self.prev_frame = frame
        except KeyboardInterrupt:
            ret = False
            self.readthread = False
            pass

    # 非スレッドモードの読み込み関数
    def read(self):
        self.frameno = int(self.cap.get(cv2.CAP_PROP_POS_FRAMES))
        if self.last_frame <= self.frameno:
            print("cannot read stram %d vs %d" % (self.last_frame,self.frameno))
            return False,None

        try:
            ret,frame = self.cap.read()
        except cv2.error as e:
            self.mstime = self.cap.get(cv2.CAP_PROP_POS_MSEC)
            self.frameno = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
            return False,None

        if not ret:
            print("ret is not")
            return False,None


        self.showFrame(frame)
        return True,frame

    # 動画の表示用関数
    def showFrame(self,frame):
        if self.show:
            if self.winsize:
                hw = tuple([int(i) for i in self.winsize.split("x")])
                cv2.imshow("frame", cv2.resize(frame,dsize=hw))
            else:
                cv2.imshow("frame", frame)
            cv2.waitKey(1)

    def readingThread(self):
        frameno = -1
        try:
            while self.readthread:
                if self.frameQueue.full():
                    print("read queue is full:%d" % (self.frameQueue.qsize()))
                    time.sleep(1)
                    continue
                frameno = int(self.cap.get(cv2.CAP_PROP_POS_FRAMES))
                ret, frame = self.cap.read()
                if not ret:
                    break
                self.showFrame(frame)
                self.frameQueue.put([frameno,frame])
                # 60フレームに1回、キューの状況を表示する
                if self.printqsize and frameno % 60 == 0:
                    print("queue size(%d):%d" % (frameno,self.frameQueue.qsize()))
        except KeyboardInterrupt:
            self.readthread = False

        print("reading thread fin.")

    def readQueue(self):
        readFlag = True
        ret = True
        retry = 0
        frame = None
        while readFlag:
            try:
                # キューからフレームデータを取り出す
                [frameno,frame] = self.frameQueue.get(False)
                self.frameno = frameno
                readFlag = False
            except queue.Empty:
                # queueが空で読み込みスレッドが終了していれば、終了する
                if not self.readThreadHandler.is_alive():
                    ret = False
                    break
                # すぐに読み込んでもからの可能性があるので少しだけ待つ
                time.sleep(1)
                retry = retry + 1
                # 指定されたリトライ回数分は読み込みを試みる
                if retry>self.retryLimit:
                    ret = False
                    break
        return ret,frame

使用方法は、以下の通り。
callbackの引数は第1引数にLoaderクラスのハンドル、第2引数にフレームが入る。

def callback(lodaer,frame):
    pass

def main():
    gl = Loader(r"douga_file.mp4", callback=callback, printqsize=True, queuesize=500)
    gl.load()

if __name__ == '__main__':
    main()

キューサイズは初期値の100だと小さすぎて、すぐにキューがあふれてしまう。

表示だけする分には500くらいあれば、あふれはしないが環境次第ではある。

コメントする

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

CAPTCHA