赤を白にするフィルターを作る

Apex LegendsのダメージをOCRで取得する試みをしてきたわけだが、部隊数や人数の変動により右上の表示エリアが白から赤に変わってしまって取得できないことが分かった。

減少時のステータスエリア

ダメージの数値自体は白のままでその周りが赤っぽくなり、目印にしていたシンボルマークが赤くなってしまって検知できないため、OCRの取得処理ができていない。

そもそも赤くなってしまうのが問題なので、白くすればよいという結論に至る。

具体的な方法としては、前回も使用したHSV(色相、彩度、明度)に変換して、赤の色相の場合には彩度を0にすれば白くなると思った。

またこの時に明度が低い場合は、黒っぽい色のはずなので対象から省くこととする。

OpenCVの画像認識処理の書かれたブログなどを見ても、いつもながら適切なものが見つからないので、自作していくことにする。

完成したものがこちら、コメントに解説を入れようと思ったがゴチャつくので抜き出しながら個別に解説する。

def red2white(img):
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    white_image = np.copy(hsv)

    coeff = 1.1
    cond = ( ((hsv[:, :, 0]>150) | (hsv[:, :, 0]<30)) & (hsv[:, :, 1]>150) )
    revised_val = np.array(hsv[:, :, 2],dtype="int16")*coeff
    revised_val = np.clip(revised_val,None,255)
    white_image[:, :, 1] = np.where(cond,0,hsv[:, :, 1])
    white_image[:, :, 2] = np.where(cond,revised_val,hsv[:, :, 2])
    return cv2.cvtColor(white_image, cv2.COLOR_HSV2BGR) # HSV->BGR変換

まずはnp.whereで使う条件を変数に入れている。

cond = ( ((hsv[:, :, 0]>150) | (hsv[:, :, 0]<30)) & (hsv[:, :, 1]>150) )

直接指定しても良いのだが長くなってしまい見にくいので、外に出した。

最初の部分は色相の30未満、150より上の値が今回の対象の赤を含んでいたためOR結合している。

最後のAND以降は、明度が低いと黒に近づくため対象外とするため150より大きい値を対象としている。

hsv[:, :, 0]という部分に関しては、0でHue,1ならSatiration,2がValueのnp.arrayに対しての操作を行っている。

次に置換する明度の配列を作成している。

    revised_val = np.array(hsv[:, :, 2],dtype="int16")*coeff

np.arrayの部分は元々の配列がデバッガで確認したところ、uint8だったため255より大きい値になると桁あふれを起こしてしまうため、int16に変換してからcoeffを掛けることで起きる桁あふれを防いでいる。

ただし、255より大きい値は意味がなく後で置換したときにuint8に戻ってしまう。

このため、次にnp.clipでオーバーしている数値を255にしている。

    revised_val = np.clip(revised_val,None,255)

最初のnp.whereで対象の部分の彩度を0にして、それ以外はそのまま使う。

    white_image[:, :, 1] = np.where(cond,0,hsv[:, :, 1])

その次のnp.whereで対象の部分の明度を係数を掛けた値と置き換える。

    white_image[:, :, 2] = np.where(cond,revised_val,hsv[:, :, 2])

あとはreturnの行で画像形式をBGRに戻して返す。

結果以下のように、ほぼほぼ白い時の画像に近づいた。

フィルター後のステータスエリア

輪郭部分とシンボルがぼやけてしまっているが、係数をあまり大きくしてもアシストの握手している部分が潰れてしまうのでとりあえず1.1で検知できるかを試していくことにする。

今回は適当な値を入れたり、Windowsのペイントアプリのスポイトで気になる色を取得後、パレットでRGBからHSVにすることで大まかな値を調べた。

ただし、ペイントのHSV表示では色相が0~359,彩度が0~100,明度が0~100となっており、OpenCVのHSVとは値が異なるので値を直す必要があるので注意が必要となる。

Apex Legendsのダメージ表示エリアを取得する

OpenCVでファイルの読み込みのスレッド化ができたので、以前作成したOCRによるダメージ取得を適用してみたが芳しくない。

OpenCVのthreshold関数やinRange関数で2値化して、取得という流れで試してみたもののいくつかの理由で余分な数値を取得してしまう。

一番多いのは背景の一部を数値として取得してしまうことが多いような印象だった。

ダメージが表示される場所は左上固定なので、そこだけ切り抜きたいところではあるのだが、0ダメージの場合はそもそも表示されていない。

なのでダメージが表示されている時だけ処理するように赤枠の部分を目印にすることにする。

ダメージのエリアが表示されているかの目印にするシンボル

このシンボルを画像認識を使用して、場所の取得をすることにします。

ただし、ダメージの表示エリアの白い線の内側は背景を少し透過しており、その影響を受ける。

特に雲を見つめると全体がほぼほぼ白く見えてしまうので、影響を少なくするために2値化を行う。

2値化は、簡単に言うとある特定の閾値で0と255に分けて白黒の画像を作る作業になる。

OpenCVの解説でよく見かけるのはグレースケールに変換後にcv2.threshold関数を用いて2値化が多い。

特に第4引数にcv2.THRESH_OTSUを設定することで、自動で閾値を自動で決めるというやり方が多い。

ret,image = cv2.threshold(im, 0, 255, cv2.THRESH_OTSU)

今回は後述の問題が発生する関係で、HSVというHue(色相),Saturation(彩度),Value(明度)の画像形式に変換し、cv2.splitにより明度のみにすることで作成する。明度の閾値に応じて、閾値より低い値は0にすることでグレースケールのフィルタリングした画像を作成する。

後々、使いそうなので関数化する。

def valuebase_grayscale(img,threshold=None):
    # 画像をBGRからHSVに変換(本来は元の画像のタイプを判別した方が良い)
    hsv_img = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    # cv.splitでHue,Saturation,Valueする、ただしHue,Saturationは今回は不要なので捨てる
    [_, _, val] = cv2.split(hsv_img)
    if threshold != None:
        # Valueの閾値未満の値を0にする
        return  np.where(val < threshold, 0, val)
    else:
        return val

先ほどの画像に適用するとこのようになる。

threshold=180でフィルタリングした画像

この中からダメージのシンボルマークのみを切り出す。

切り出したダメージのシンボル

位置の特定には、cv2.matchTemplate関数を使う。

こちらも一般的な解説では、実行後にnumpyのnp.where関数を用いて閾値以上の座標のリストを取得後に、zip関数でxとy座標のペアに変換、for文で処理するようなものが多い。

今回の処理では全体必要なく、一番一致率の高い座標を取得できれば良いので、以下のような関数を作成して使用することにする。

def find_symbol(img,templ,threshold=0.8, method=cv2.TM_CCOEFF_NORMED):
    ret = None
    # imgには対象の画像、templには探したい画像を入れる
    res = cv2.matchTemplate(img, templ, method)
    loc = np.where(res>threshold)

    # 閾値での絞り込みの結果が空かを確認する
    if len(loc[0]):
        # 閾値,x座標,y座標の2次元リストに変換する
        points = list(zip(res[loc],*loc[::-1]))
        # 閾値の高い順に並び変える
        ret = sorted(points,reverse=True)
    return ret

あとは関数外で、リストの先頭の座標に元画像の高さと幅を足し合わせて、top,bottom,left,rightの座標を取得する。

これらを利用してダメージの値が表示されている矩形領域の座標を特定する。

この時に右端の座標は変わらないため、固定値とする。
※ ダメージのエリアの枠部分で取得しようとしたが、背景の誤検知が多いので使わないことにした。

これらを組み合わせて、ダメージのエリアを取得する関数を作成しておく。

def find_damge_area(img,symbol):
    matches =find_symbol(img,symbol,threshold=0.7)
    if matches == None or len(matches)<=0:
        return None
    [h, w] = symbol.shape[0: 2]
    top = matches[0][2]
    bottom = matches[0][2] + h
    left = matches[0][1] + w - 2
    [h, w] = img.shape[0: 2]

    # 右端は固定
    right = w-25 # -25としているが初めにキル,アシスト,ダメージの表示エリアをざっくりと切り抜いてある
    return [top,bottom,left,right]

これでダメージの表示エリアがわかった。

そもそも固定座標で切り抜けばよいのでは?と思うかもしれないが、実はダメージの桁数でダメージのシンボルの表示位置が変わる。

加えて、ダメージの数値のフォントは可変幅フォントであるため数字の1が含まれている場合は横幅が狭くなったりしてしまい。

OCRで存在しない数値を検知してしまうことがある。

OCRの精度と速度の向上のためにも、出来る限り表示領域を特定しておいた方が良い。

ただこれでも、別の問題が発生した。

残り部隊数や、キル、アシスト、ダメージのエリアは残り人数や部隊数が変動すると赤くなってしまう。

人数または部隊数が減った場合の画面

この状態も検知できるように画像等を調整することもできるとは思うが、白から赤へ、白から赤へは徐々に変化するため中間の状態も存在する。

これらをまとめて処理するため、次回は赤いものも白くするフィルターを作成する。

OpenCVの動画ファイル読み込みのスレッド化

前回、Apex Legendsの与ダメージをOCRで取得したが、静止画が対象だったので動画に適用してみたいが読み込みが単一スレッドなのが気になった。

ゲーム映像の読み込みの際に使用した以下のコードでも処理は可能だとは思う。

import cv2
 
path = 'D:\\sample.mp4'
delay = 1
window_name = 'frame'
show = True
 
cap = cv2.VideoCapture(path)
 
if not cap.isOpened():
    sys.exit()
 
while True:
    ret, frame = cap.read()
    if ret:
        if show:
            cv2.imshow(window_name, frame)
            if cv2.waitKey(delay) &amp;amp;amp; 0xFF == ord('q'):
                break
    else:
        break 

特にcv2.waitKeyの処理をする場合は、第1引数に指定した秒数だけウェイト処理が入ることになってしまう。

色々試した限りでは、cv2.imshowの処理後はcv2.waitKeyの処理を入れないと処理が安定しないし、将来的に取得したフレームに対して画像処理を行った場合にはさらに処理が増えることになる。

このため、出来れば動画の読み込み部分は別なスレッドで処理がしたいと思う。

別なスレッドで処理するため、読み込んだフレームデータはキューに入れ、画像処理側で取得後にキューから取り出すことことにする。

途中経過とかも書ければ良かったが、なんだかんだ出来上がってしまったので、完成したコード記載する。

import os,sys
import cv2

import queue
import threading
import time

class Loader(object):
    def __init__(self,path = None, **opts):
        self.path = path
        # 確認用にshowフラグを用意、Trueの場合は動画を別ウィンドウで表示する
        self.show = opts.get("show",False)
        self.winsize ="960x540"
        # 画像処理を行う関数を設定する。
        self.callback = opts.get("callback")
        # キューのあふれを確認するための表示をするためのフラグ
        self.printqsize = opts.get("printqsize", False)

        if self.path:
            self.open(self.path)

        # 処理の確認のため、スレッドモードと非スレッドモードを切り替えれるようにする
        self.readthread = opts.get("readthread",True)

        # 非スレッドモード時には、メンバ関数のreadを使用する
        self.reader = self.read
        if self.readthread:
            # queueのサイズを指定する
            queuesize = opts.get("queuesize",100)
            self.frameQueue = queue.Queue(maxsize=queuesize)
            # スレッドモード時には、メンバ関数のreadQueueを読み込みの関数に使用する
            self.reader = self.readQueue
            self.retryLimit = opts.get("retrylimit",100)
            # 動画の読み込みスレッドを作成する。
            self.readThreadHandler = threading.Thread(target=self.readingThread)
            self.readThreadHandler.daemon = False

    def open(self,path):
        try:
            self.cap = cv2.VideoCapture(path,0)
        except:
            raise "cannot read %s" % (path)

        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.frame_count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))

        self.first_frame = 0
        self.last_frame = self.frame_count - 1
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.prev_frame = None

    def load(self,first_frame=None,last_frame=None):
        ret = True
        first_frame = first_frame if first_frame != None else self.first_frame
        last_frame = last_frame if last_frame != None else self.last_frame

        if self.readthread:
            self.readThreadHandler.start()

        try:
            while(ret):
                ret,frame = self.reader()
                if frame is not None and self.callback:
                    rs = self.callback(self,frame)
                self.prev_frame = frame
        except KeyboardInterrupt:
            ret = False
            self.readthread = False
            pass

    # 非スレッドモードの読み込み関数
    def read(self):
        self.frameno = int(self.cap.get(cv2.CAP_PROP_POS_FRAMES))
        if self.last_frame <= self.frameno:
            print("cannot read stram %d vs %d" % (self.last_frame,self.frameno))
            return False,None

        try:
            ret,frame = self.cap.read()
        except cv2.error as e:
            self.mstime = self.cap.get(cv2.CAP_PROP_POS_MSEC)
            self.frameno = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
            return False,None

        if not ret:
            print("ret is not")
            return False,None


        self.showFrame(frame)
        return True,frame

    # 動画の表示用関数
    def showFrame(self,frame):
        if self.show:
            if self.winsize:
                hw = tuple([int(i) for i in self.winsize.split("x")])
                cv2.imshow("frame", cv2.resize(frame,dsize=hw))
            else:
                cv2.imshow("frame", frame)
            cv2.waitKey(1)

    def readingThread(self):
        frameno = -1
        try:
            while self.readthread:
                if self.frameQueue.full():
                    print("read queue is full:%d" % (self.frameQueue.qsize()))
                    time.sleep(1)
                    continue
                frameno = int(self.cap.get(cv2.CAP_PROP_POS_FRAMES))
                ret, frame = self.cap.read()
                if not ret:
                    break
                self.showFrame(frame)
                self.frameQueue.put([frameno,frame])
                # 60フレームに1回、キューの状況を表示する
                if self.printqsize and frameno % 60 == 0:
                    print("queue size(%d):%d" % (frameno,self.frameQueue.qsize()))
        except KeyboardInterrupt:
            self.readthread = False

        print("reading thread fin.")

    def readQueue(self):
        readFlag = True
        ret = True
        retry = 0
        frame = None
        while readFlag:
            try:
                # キューからフレームデータを取り出す
                [frameno,frame] = self.frameQueue.get(False)
                self.frameno = frameno
                readFlag = False
            except queue.Empty:
                # queueが空で読み込みスレッドが終了していれば、終了する
                if not self.readThreadHandler.is_alive():
                    ret = False
                    break
                # すぐに読み込んでもからの可能性があるので少しだけ待つ
                time.sleep(1)
                retry = retry + 1
                # 指定されたリトライ回数分は読み込みを試みる
                if retry>self.retryLimit:
                    ret = False
                    break
        return ret,frame

使用方法は、以下の通り。
callbackの引数は第1引数にLoaderクラスのハンドル、第2引数にフレームが入る。

def callback(lodaer,frame):
    pass

def main():
    gl = Loader(r"douga_file.mp4", callback=callback, printqsize=True, queuesize=500)
    gl.load()

if __name__ == '__main__':
    main()

キューサイズは初期値の100だと小さすぎて、すぐにキューがあふれてしまう。

表示だけする分には500くらいあれば、あふれはしないが環境次第ではある。

Apex LegendsのダメージをOCRで取得する

しばらくやってないがApex Legendsを少しプレイしていた。

正直、FPSはそれほどうまくない・・・

Apex Legendsをやめる頃(かなり前)にダメージが表示されるようになった。

折角なので画像処理でテキストとして、取得できるかをやってみる。

前回GPD Pocket3でApex Legendsの動画をキャプチャしたので、これをOpenCVで読み込んで処理するのだが、今回は1フレームだけ抜き出して静止画の状態で処理することにする。

とりあえず、OCRで取得するのでTesseract OCRとpyocrをインストールする。
※ 処理の関係上、PillowとOpenCVも必要。

Tesseract OCRは以下のURLからx64版をダウンロードして、インストールした。
日本語の学習データも一応は入れたが、今回は数値なので別に入れなくてもよかったかも・・・

https://github.com/UB-Mannheim/tesseract/wiki

Visual StudioのPython環境からpyocrを検索してインストールした。


あとはOpenCVで読み込んだフレームを表示しながら確認して、ダメージが表示されてるフレームを画像として抜き出した。

赤枠部分がダメージ

最終的には動画から赤枠部分を抜き出して処理するが、今回はペイントツールでダメージ部分だけを切り出して、ファイルに保存したものを使用する。

切り出したダメージ部分のイメージ

背景が単色ではなく、ドットのような模様が入っているため、最初はうまく取得できずにフィルターを掛けたり、白黒を反転したりしたが、画像サイズを2~3倍にしたりして試したがどれも不要だった。
ビルダーをDigitBuilderにすることと、image_to_stringにlang=”jpn”を指定しないことで簡単に取得できた。

最終的なコードは以下のようになった。

import sys,os
import pyocr
import cv2
from PIL import Image

def init_ocr(path=r'C:\Program Files\Tesseract-OCR\tesseract.exe'):
    # pyocrの初期化TESSERACT_CMDにtesseract ocrの実行パスを設定する
    pyocr.tesseract.TESSERACT_CMD = path
    tools = pyocr.get_available_tools()
    if len(tools)>0:
        return tools[0]
    return None

def main():
    tool = init_ocr()
    # Pillowで読み込んでも良いが後のことを考えてcv2で読み込む
    # OCR処理する際にカラーである必要がないのでグレースケールで読み込む
    img = cv2.imread(r"D:\Resources\images\OCR\damage.png",cv2.IMREAD_GRAYSCALE)

    # OpenCVのnumpy型をPillowのイメージ型に変換
    pillow_img = Image.fromarray(img)

    builder = pyocr.builders.DigitBuilder(tesseract_layout=6)
    # 画像から数字を取得する
    text = tool.image_to_string(pillow_img, builder=builder)
    print('text:' + text)

if __name__ == '__main__':
    main()

実行結果は次の通り。

text:450

ダメージの表示部分は0の場合は表示されないので、次は表示があるかの判定を追加したい。

今回は静止画だったので、次回は動画を処理して精度の確認も行いたいと考えている。

対象のダメージの部分は背景のゲーム画面の色が透過しているので、その影響の確認の意味もある。

OCRの最終目標としては、PC版のApex Legendsで座標情報の取得して、全体のマップに移動した経路の情報を表示したい。

OpenCVでの動画の読み込み速度を確認する

HDMI分配器を購入して、ゲーム映像をGPD Pocket3でキャプチャする準備ができたので、さっそく試してみる。

まずはOBS Studioをインストール。
配信する予定は特にないので、録画用の設定で初期設定を行う。

次にデバイスを登録。
OBSを起動→ソース(左下)の+マークをクリック→映像キャプチャデバイス→新規で名前を付けてOKをクリック→デバイスから[HDMI Capture]を選択する
※この時にKVMモジュールにHDMIの映像信号が入力されていないと、デバイスが見つからない。

続いて、保存場所を設定する。
OBSの画面の左下から設定を選択→出力→録画ファイルのパスを変更する。

ついでに動画フォーマットとエンコーダを変更。
OBSの画面の左下から設定を選択→出力→動画フォーマットをmp4に変更する。
同様にエンコーダをソフトウェア(x264)に変更する。

ファイル名にスペースが入ると面倒なので[スペースなしのファイルを生成]にチェックを入れておく。

OBSの設定
変更後のOBSの設定画面

あとはOBSの画面左下の録画開始を押せば録画ができるはず・・・

とりあえずApex Legendsを1マッチを録画してみた。

20分54秒で約1.87GBほどのサイズになった。

続いてこれをOpenCVで読み込んで処理をして時間を計測してみる。

読み込みは単純なループで画面表示ありなし、加えてVisual Studioを使用するので、DebugとReleaseで測定することにする。

import sys
import time
import cv2

path = 'D:\\sample.mp4'
delay = 1
window_name = 'frame'
show = True

cap = cv2.VideoCapture(path)

if not cap.isOpened():
    sys.exit()

# 処理前に時間を取得
start_time = time.process_time()

while True:
    ret, frame = cap.read()
    if ret:
        if show:
            cv2.imshow(window_name, frame)
            if cv2.waitKey(delay) &amp;amp; 0xFF == ord('q'):
                break
    else:
        break

# 処理後に時間を取得
end_time = time.process_time()

# 開始時間と終了時間の差分(秒)
elapsed_time = end_time - start_time
print("elapsed time:", elapsed_time)

if show:
    cv2.destroyWindow(window_name)

実行結果は以下の通りになった。
当たり前ではあるが、思いのほか画面を表示すると遅くなるようだ。

DebugRelease
表示あり1528.0秒1510.203125秒
表示なし1009.8125秒968.671875秒

元の動画が20分54秒=1254秒なので、画面表示をした場合には処理に元の動画の再生時間よりも時間が掛かってしまっていることになる。
ただし、waitKeyはdelayで指定した秒数だけwaitが入るようなのでこの影響である可能性が高い。
今後の処理も考えると、読み込みと画像処理のスレッドを分けてキューイングした方が良さそうだ。

GPD Pocket3を購入してみた

最近、全く個人の開発ができていない原因は、デスクトップを起動したり、重いノートPCを起動するのが面倒で億劫なのが原因だと心の中で言い訳していた。

Raspberry PiやJetson Nanoとかを弄りたいけど、正直言って表示用のモニターを準備して、キーボードを準備してとか考えるとそれも面倒。

そんな中、前々から気になっていたGPD社のノートPCがモジュールの交換でインターフェイスを変更できることを知った。

HDMI入力に加えて、USB type Cを接続することでキーボードを接続した機器側で使用できるらしく、これならRaspberry Piのモニター兼キーボードとして使えそうだと思った。

いいなと思いつつも、高い買い物で本当に欲しいかを2週間ほど考えてみて、気持ちが変らなかったので購入した。

必要そうなツールも一通りインストールし終わり、環境も整ったのでとりあえず、気になっていたHDMI入力機能と、キーボードの機能を試してみた。

Raspberry Pi4があった気がするが見つからないのでとりあえず、Raspberry Pi Zero Wで試してみる。

Raspberry Pi Zero WをGPD Pocket3の背面にあるHDMIポートに接続、デスクトップにHDMI INというそのまんまの名前のショートカットがあったので起動した。

HDMI入力
KVMモジュールにRaspberr Pi Zero Wを接続

 

 

 

 

 

 

 

 

 

あっさり表示できたが、どうやって表示しているのか考えてみた。

普通に考えて、UVC(USB Video Class)だろうと思い、デバイスマネージャーを開いて確認すると、これまたわかりやすい名前のデバイス名があった。

UVC検出
カメラのカテゴリーにあるHDMI Captureというデバイス

 

カメラとして認識しており、やはりUVCのようだ。

次にキーボードを試してみたが、どうやら接続中はGPD Pocket3のキーボードとマウスは無効化されてしまうようだ。

しかも、Raspberry Pi Zero Wではキーボードもマウスもうまく認識してくれなかった。

その後、PS4に接続してみたが、通常のキーボードとして認識した。

試しにGPD Pocket3の電源を切った状態で接続してみたが、PS4は認識してくれない。

どうやら電源が入っていないとダメらしい。

Hello World

最近、全くプライベートでコーディングしてないことに気が付いた。

以前までは気になることがあると、気が向くままに流行の言語やフレームワークを試したりしていたが、PCの買い替え等でソースコードがどこにあるかすら覚えていない。

仕事でもプライベートでも細かいことを調べては、日本語の情報がなく困ることが多くあるが、いざ何かを作ろうとすると忘れてしまって、調べなおすを繰り返してきた。

新しいノートPCも買ったことだし、今後のプライベートなコーディングについては自分用のメモ書きとしてブログに残すことにしようと思う。

10数年プログラマーをやってはいるが、きわめて凡庸なプログラマーであるため、書く内容とレベルについては保証できない。

 

とりあえず、いつも通りHello Worldから始めてみる。