tianmoucv.camera.sdk_utils 源代码

#coding=utf-8
# Import-time banner: printed once whenever this module is imported, to flag
# that the reader is a development build dated 2024-01-06.
print('>>this data reader is a dev version in 2024-01-06')
from ctypes import *
import ctypes

from threading import local

import numpy as np
import time
import os
from queue import Queue

# Module-level FIFO queues. Each is an independent queue.Queue created with
# the default maxsize (0, i.e. unbounded). Nothing in this part of the module
# reads or writes them.
# NOTE(review): presumably shared with producer/consumer code elsewhere
# ("aop"/"cop" look like the two sensor pathways) -- confirm against callers.
global_aop_queue, global_cop_queue = Queue(), Queue()
result_queue, inference_queue = Queue(), Queue()

# Frame geometry used by TMstreamingDataReader when it sizes and reshapes
# buffers: rod planes are reshaped to (ROD_H, ROD_W), and the cone frame to
# (CONE_H, CONE_W * 2, 3).
ROD_W, ROD_H = 160, 160
CONE_W, CONE_H = 320, 320

############################################################
#
# Basic data type
#
############################################################
class tianmoucData(Structure):
    """C-layout frame descriptor exchanged with the Tianmouc SDK.

    TMstreamingDataReader passes a pointer to one instance of this structure
    to the SDK's ``tmGetFrame`` and then reads the fields back:

    - ``isRod``: 1 for a rod frame, 0 for a cone frame.
    - ``timestamp``: unsigned 64-bit frame timestamp
      (unit not stated in this file -- TODO confirm).
    - ``sdl_p`` / ``sdr_p`` / ``td_p``: byte pointers; the reader reads each
      as ROD_W * ROD_H bytes when ``isRod`` is 1.
    - ``rgb_p``: float pointer; the reader reads it as
      CONE_H * CONE_W * 2 * 3 floats when ``isRod`` is not 1.

    The field order and types define the binary layout, so they must stay in
    sync with the struct declared by the SDK.
    """

    _fields_ = [
        ("isRod", ctypes.c_int),  # 1: rod, 0: cone
        ("timestamp", ctypes.c_ulonglong),
        ("sdl_p", ctypes.POINTER(ctypes.c_ubyte)),
        ("sdr_p", ctypes.POINTER(ctypes.c_ubyte)),
        ("td_p", ctypes.POINTER(ctypes.c_ubyte)),
        ("rgb_p", ctypes.POINTER(ctypes.c_float)),
    ]
                
############################################################
#
# A simple streaming reader
# Author: Y. Lin
# 2024 01 06
#
############################################################
class TMstreamingDataReader():
    '''
    Streaming data reader for a Tianmouc camera, backed by the precompiled SDK.

    The constructor loads the SDK shared library, opens the camera and
    prepares the frame descriptor that is handed to the SDK on every read.

    @parameter cfg_path: configuration file in csv format
    @parameter lib_path: precompiled SDK dynamic library
                         (.so on ubuntu, .dll on windows, .dylib on macOS)
    @parameter device_id: camera id; several cameras can be used at once

    Main methods:
    - __init__:    load the SDK library, open the camera, prepare the data pack.
    - setExposure: set the camera exposure time.
    - __enter__:   start the data transfer.
    - __exit__:    report any exception and stop the camera.
    - __call__:    read one frame and return it.

    Use the object in a ``with`` statement so that __enter__ and __exit__ are
    called automatically and the camera is released.
    '''

    def __init__(self, cfg_path=None, lib_path=None, device_id=0):
        # Default config and library live in the "lib" folder next to this file.
        file_path = os.path.dirname(os.path.realpath(__file__))
        if cfg_path is None:
            cfg_path = os.path.join(file_path, "lib", "Golden_HCG_seq.csv")
        if lib_path is None:
            lib_path = os.path.join(file_path, "lib", "libtianmouc.so")
        # The SDK receives the path as a byte string.
        self.cfg_path = str(cfg_path).encode("utf-8")

        # load sdk lib
        tm_sdk = cdll.LoadLibrary(lib_path)
        # Without an explicit restype ctypes would truncate the handle to a C int.
        tm_sdk.tmOpenCamera.restype = ctypes.c_uint64

        # init camera
        cameraHandle = tm_sdk.tmOpenCamera(device_id)
        print('[TMstreamingDataReader]type(cameraHandle)',type(cameraHandle),tm_sdk.tmOpenCamera.restype)

        # prepare data pack
        # Allocate real arrays of the intended size. `pointer(c_ubyte(N))`
        # would allocate a single scalar holding the (wrapped) value N, which
        # is overrun if the SDK writes a frame through the pointer.
        rod_pixels = ROD_W * ROD_H
        cone_values = CONE_W * CONE_H * 2 * 3
        td_buf = (c_ubyte * rod_pixels)()
        sdl_buf = (c_ubyte * rod_pixels)()
        sdr_buf = (c_ubyte * rod_pixels)()
        cone_buf = (c_float * cone_values)()
        # Hold references so the memory outlives the pointers stored below.
        self._frame_buffers = (td_buf, sdl_buf, sdr_buf, cone_buf)

        # Keyword binding keeps every buffer attached to the field of the same
        # name regardless of the declaration order inside tianmoucData.
        self.dataPack = tianmoucData(
            isRod=1,
            timestamp=1,
            sdl_p=cast(sdl_buf, POINTER(c_ubyte)),
            sdr_p=cast(sdr_buf, POINTER(c_ubyte)),
            td_p=cast(td_buf, POINTER(c_ubyte)),
            rgb_p=cast(cone_buf, POINTER(c_float)),
        )
        self.dataPack_p = pointer(self.dataPack)

        # store data buffer and sdk
        self.tm_sdk = tm_sdk
        self.cameraHandle = c_uint64(cameraHandle)

        if cameraHandle > 0:
            print('[TMstreamingDataReader] Tianmouc camera:',hex(cameraHandle),'is raedy')
            self.tm_sdk.IICconfig_download(self.cameraHandle, self.cfg_path)
            print('[TMstreamingDataReader] Tianmouc camera:',hex(cameraHandle),' configure downloaded')
        else:
            # NOTE(review): the failure is only reported; the object is still
            # returned and later calls will use the invalid handle.
            print('[TMstreamingDataReader] fail to get correct camera handle:',hex(cameraHandle))

    def setExposure(self, aoptime, coptime, aopG=1, copG=1):
        '''
        Clamp the requested exposure values and send them to the camera.

        @parameter aoptime: exposure value, clamped to [4, 1240]
        @parameter coptime: exposure value, clamped to [4, 15000]
        @parameter aopG: forwarded to the SDK unchanged
        @parameter copG: forwarded to the SDK unchanged

        The clamped times are truncated to plain ``int``: ctypes cannot
        convert a float or a numpy scalar argument by itself and would raise
        ArgumentError. Units are not stated in this file -- TODO confirm.
        '''
        aoptime = int(min(max(aoptime, 4), 1240))
        coptime = int(min(max(coptime, 4), 15000))
        print(aoptime,coptime,aopG,copG)
        # Trailing constants as labelled by the original author: 8 -> bit, 1 -> par.
        self.tm_sdk.tmExposureSet(self.cameraHandle, aoptime, coptime, aopG, copG, 8, 1, 1)

    # use python-with to start reader
    def __enter__(self):
        '''Start the camera data transfer and return the reader itself.'''
        # start camera
        print('[TMstreamingDataReader] download',self.cfg_path)
        self.tm_sdk.tmStartTransfer(self.cameraHandle)
        print('[TMstreamingDataReader] Tianmouc data transfer start:',self.cameraHandle)
        return self

    # deal with exception
    def __exit__(self, exc_type, exc_val, exc_tb):
        '''
        Report the exception state, release the camera and suppress the exception.

        The message is printed on every exit; all three values are None when
        the ``with`` body finished normally.
        '''
        print('[TMstreamingDataReader] catch exception:',exc_type, exc_val, exc_tb)
        # stop camera
        self.tm_sdk.tmCameraUninit(self.cameraHandle)
        # NOTE(review): returning True swallows every exception raised inside
        # the ``with`` body (KeyboardInterrupt included). Kept because callers
        # may rely on it to stop streaming cleanly -- confirm before changing.
        return True

    # functional object
    def __call__(self):
        '''
        Fetch one frame from the SDK and return it as numpy arrays.

        @return: ``(isRod, timestamp, (TD, SDL, SDR))`` when ``isRod`` is 1,
                 each plane being an int8 array of shape (ROD_H, ROD_W);
                 otherwise ``(isRod, timestamp, RGB)`` with RGB a float32
                 array of shape (CONE_H, CONE_W * 2, 3).

        The arrays are built with ``np.frombuffer``, so they are views on the
        memory the data pack points to, not copies.
        NOTE(review): a later call may overwrite that memory -- copy the
        arrays if a frame has to be kept; confirm against the SDK.
        '''
        ret = self.tm_sdk.tmGetFrame(self.cameraHandle, self.dataPack_p)
        if not ret:
            # Only a warning: whatever is currently in the data pack is still decoded.
            print('[TMstreamingDataReader] loss data or loss connection, this frame is not correct')

        pack = self.dataPack
        isRod = pack.isRod
        timestamp = pack.timestamp

        if isRod == 1:
            rod_pixels = ROD_W * ROD_H
            planes = []
            for plane_p in (pack.td_p, pack.sdl_p, pack.sdr_p):
                raw = cast(plane_p, POINTER(c_ubyte*ROD_W*ROD_H))[0]
                # The bytes are deliberately reinterpreted as signed (np.byte).
                planes.append(
                    np.frombuffer(raw, dtype=np.byte, count=rod_pixels).reshape(ROD_H, ROD_W)
                )
            TD, SDL, SDR = planes
            return isRod, timestamp, (TD, SDL, SDR)

        RGB_buffer = cast(pack.rgb_p, POINTER(c_float*CONE_H*CONE_W*2*3))[0]
        RGB = np.frombuffer(RGB_buffer, dtype=np.float32).reshape(CONE_H, CONE_W*2, 3)
        return isRod, timestamp, RGB