#!/usr/bin/python3
"""Image acquisition script for an IDS peak camera (CAM1).

Loads settings from ``./acquisition_config.json``, opens the first openable
IDS peak device, and then continuously grabs frames and writes them as JPEG
files into the configured image directory.  Helpers for a laser range finder
(serial), a second OpenCV camera and S3 uploads are defined as well; most of
their call sites in ``main`` are currently commented out.
"""
import os
import sys
import json
import time
import datetime
from datetime import timedelta
import traceback
import logging
import logging.handlers
import glob

import cv2
import pandas as pd
import pika
import boto3
import serial

# The GenTL producer path must be set before the ids_peak modules are imported.
os.environ['GENICAM_GENTL64_PATH'] = '/opt/ids-peak_2.7.0.0-16268_amd64/lib/ids/cti/'
from ids_peak import ids_peak as peak
from ids_peak_ipl import ids_peak_ipl

###############################################
#                Logger Setting               #
###############################################
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# RotatingFileHandler opens the file immediately and fails if the directory
# does not exist, so make sure it is there first.
os.makedirs('./logs', exist_ok=True)
log_fileHandler = logging.handlers.RotatingFileHandler(
    filename="./logs/gseps.log",
    maxBytes=1024000,
    backupCount=3,
    mode='a')

log_fileHandler.setFormatter(formatter)
logger.addHandler(log_fileHandler)

###############################################
#                   Config                    #
###############################################
with open('./acquisition_config.json', 'r') as f:
    info = json.load(f)

# CAM2_DEVICE_PATH = info['cam2_device_path']
LASER_DEVICE_PATH = info['laser_device_path']
S3_BUCKET = info['S3BucketName']
GSEPS_STAGE_BUCKET = info['gseps_stage_bucket']
S3_UPLOAD_KEY = info['BucketKey']

###############################################
#               Camera Variable               #
###############################################
# Module-level handles shared by the camera helper functions below.
m_device = None
m_dataStream = None
m_node_map_remote_device = None

# Load Camera2
# cam2 = cv2.VideoCapture(CAM2_DEVICE_PATH)
# _ = cam2.read()
# logger.info(f'CAM2 Load.')


# class Publisher:
#     def __init__(self):
#         self.__url = info['amqp_url']
#         self.__port = info['amqp_port']
#         self.__vhost = info['amqp_vhost']
#         self.__cred = pika.PlainCredentials(info['amqp_id'], info['amqp_pw'])
#         self.__queue = info['amqp_queue']
#         self.__ReadyQ = info['amqp_ReadyQ']
#
#     def check_server_state(self):
#         conn = pika.BlockingConnection(pika.ConnectionParameters(self.__url,
#                                                                  self.__port,
#                                                                  self.__vhost,
#                                                                  self.__cred))
#         chan = conn.channel()
#         method, properties, body = chan.basic_get(queue=self.__ReadyQ,
#                                                   auto_ack=True)
#
#         if method:
#             chan.queue_purge(queue=self.__ReadyQ)
#             conn.close()
#             return True
#
#         conn.close()
#         return False
#
#     def pub(self, body: dict):
#         try:
#             conn = pika.BlockingConnection(pika.ConnectionParameters(self.__url,
#                                                                      self.__port,
#                                                                      self.__vhost,
#                                                                      self.__cred))
#             chan = conn.channel()
#             chan.basic_publish(exchange='',
#                                routing_key=self.__queue,
#                                body=json.dumps(body))
#             conn.close()
#             return
#         except Exception as e:
#             # add error alarm
#             logger.error(traceback.format_exc())


def open_camera():
    """Open the first openable IDS peak device.

    On success the module globals ``m_device`` and
    ``m_node_map_remote_device`` are set.

    Returns:
        bool: True if a device was opened; False if no device was found,
        none was openable, or an exception occurred.
    """
    global m_device, m_node_map_remote_device
    try:
        # Create instance of the device manager and refresh its device list
        device_manager = peak.DeviceManager.Instance()
        device_manager.Update()

        # Return if no device was found
        if device_manager.Devices().empty():
            return False

        # Open the first openable device in the device manager's device list
        device_count = device_manager.Devices().size()
        for i in range(device_count):
            if device_manager.Devices()[i].IsOpenable():
                m_device = device_manager.Devices()[i].OpenDevice(peak.DeviceAccessType_Control)

                # Get NodeMap of the RemoteDevice for all accesses to the GenICam NodeMap tree
                m_node_map_remote_device = m_device.RemoteDevice().NodeMaps()[0]
                return True
    except Exception as e:
        print(e)
        logger.error(traceback.format_exc())

    # Either an exception occurred or no device in the list was openable.
    return False


def prepare_acquisition():
    """Open the first data stream of the opened device into ``m_dataStream``.

    Returns:
        bool: True on success; False if the device exposes no data stream or
        an exception occurred.
    """
    global m_dataStream
    try:
        data_streams = m_device.DataStreams()
        if data_streams.empty():
            # no data streams available
            return False

        m_dataStream = m_device.DataStreams()[0].OpenDataStream()
        return True
    except Exception:
        logger.error(traceback.format_exc())
        return False


def set_roi(x, y, width, height):
    """Validate a requested ROI against the device limits and configure the sensor.

    NOTE(review): the requested rectangle is only *validated*.  When it is
    valid, the device is programmed with offset (0, 0) and the maximum width
    and height, not with ``x``/``y``/``width``/``height``.  This matches the
    existing behaviour; confirm that full-frame capture is intended.

    Args:
        x: Requested horizontal offset.
        y: Requested vertical offset.
        width: Requested width.
        height: Requested height.

    Returns:
        bool: True if the request was within limits and the device was
        configured; False if it was out of range or an exception occurred.
    """
    try:
        node = m_node_map_remote_device.FindNode

        # Get the minimum ROI and set it. After that there are no size
        # restrictions anymore
        x_min = node("OffsetX").Minimum()
        y_min = node("OffsetY").Minimum()
        w_min = node("Width").Minimum()
        h_min = node("Height").Minimum()

        node("OffsetX").SetValue(x_min)
        node("OffsetY").SetValue(y_min)
        node("Width").SetValue(w_min)
        node("Height").SetValue(h_min)

        # Get the maximum ROI values (read after the minimum ROI was applied)
        x_max = node("OffsetX").Maximum()
        y_max = node("OffsetY").Maximum()
        w_max = node("Width").Maximum()
        h_max = node("Height").Maximum()

        if (x < x_min) or (y < y_min) or (x > x_max) or (y > y_max):
            return False
        if (width < w_min) or (height < h_min) or ((x + width) > w_max) or ((y + height) > h_max):
            return False

        # Now, set final AOI (full frame, see NOTE in the docstring)
        node("OffsetX").SetValue(0)
        node("OffsetY").SetValue(0)
        node("Width").SetValue(w_max)
        node("Height").SetValue(h_max)
        return True
    except Exception:
        logger.error(traceback.format_exc())
        return False


def alloc_and_announce_buffers():
    """Discard old buffers and allocate, announce and queue fresh ones.

    The number of buffers is the minimum reported by the data stream and
    their size is the device's current ``PayloadSize``.

    Returns:
        bool: True on success; False if no data stream is open or an
        exception occurred.
    """
    try:
        if not m_dataStream:
            return False

        # Flush queue and prepare all buffers for revoking
        m_dataStream.Flush(peak.DataStreamFlushMode_DiscardAll)

        # Clear all old buffers
        for buffer in m_dataStream.AnnouncedBuffers():
            m_dataStream.RevokeBuffer(buffer)

        payload_size = m_node_map_remote_device.FindNode("PayloadSize").Value()

        # Get number of minimum required buffers
        num_buffers_min_required = m_dataStream.NumBuffersAnnouncedMinRequired()

        # Alloc buffers
        for _ in range(num_buffers_min_required):
            buffer = m_dataStream.AllocAndAnnounceBuffer(payload_size)
            m_dataStream.QueueBuffer(buffer)

        return True
    except Exception:
        logger.error(traceback.format_exc())
        return False


def start_acquisition():
    """Start acquisition on the data stream and on the remote device.

    Returns:
        bool: True on success; False if an exception occurred.
    """
    try:
        m_dataStream.StartAcquisition(peak.AcquisitionStartMode_Default, peak.DataStream.INFINITE_NUMBER)
        m_node_map_remote_device.FindNode("TLParamsLocked").SetValue(1)
        m_node_map_remote_device.FindNode("AcquisitionStart").Execute()
        # m_node_map_remote_device.FindNode("DeviceRegistersStreamingStart").Execute()
        return True
    except Exception:
        logger.error(traceback.format_exc())
        return False


# check alarm
def check_distance():
    """Read one distance value from the laser device, retrying until it succeeds.

    Sends ``b'O'`` to the serial device at ``LASER_DEVICE_PATH`` and parses
    the reply.  Any error is reported and the read is retried immediately;
    Ctrl-C terminates the process.

    Returns:
        int: The parsed distance value.
    """
    while True:
        try:
            # The context manager closes the port again; previously a new
            # port object was opened on every call and never closed.
            with serial.Serial(LASER_DEVICE_PATH, baudrate=9600, timeout=1) as ser:
                ser.write(b'O')
                result = ser.read(6)[1:]

            # Iterating bytes yields integers, so each byte contributes its
            # decimal value to the concatenated string.
            # NOTE(review): presumably the device sends raw digit values
            # rather than ASCII characters -- confirm against the protocol.
            distance_bytes = [str(n) for n in result]
            distance = int(''.join(distance_bytes))
            logger.info(f"laser_value : {distance}")
            return distance
        except KeyboardInterrupt:
            sys.exit()
        except Exception as e:
            print(e)
            logger.error(traceback.format_exc())
            continue


def image_capture_from_cam2(save_path):
    """Grab one frame from the second camera and write it to ``save_path``.

    NOTE: relies on a module-level ``cam2`` capture object, which only exists
    when the commented-out "Load Camera2" section is re-enabled.

    Args:
        save_path: Destination file path for the captured frame.

    Returns:
        bool: True if a frame was read and written; False otherwise.
    """
    global cam2
    ret, frame = cam2.read()
    if ret:
        cv2.imwrite(save_path, frame)
        return True
    return False


def upload_to_s3(company, image_list=None):
    """Upload captured data to S3.

    Args:
        company: ``'sdt'`` uploads the files in ``image_list`` to
            ``S3_BUCKET`` under a ``<today>/`` prefix using the credentials
            from the config.  ``'gseps'`` merges every file in
            ``./laser_value/`` into one CSV named after yesterday's date and
            uploads it to ``GSEPS_STAGE_BUCKET``.  Any other value is a no-op.
        image_list: Local file paths for the ``'sdt'`` upload.  ``None`` or
            empty entries are skipped.
    """
    today = datetime.date.today()

    if company == 'sdt':
        s3 = boto3.resource('s3',
                            aws_access_key_id=info['AccessKey'],
                            aws_secret_access_key=info['SecretKey'],
                            region_name=info['Boto3RegionName'])
        bucket = s3.Bucket(S3_BUCKET)

        for path in image_list or []:
            if not path:
                # e.g. a camera that failed to capture leaves a None entry
                continue
            image_name = os.path.basename(path)
            bucket.upload_file(path, f'{today}/{image_name}')

    elif company == 'gseps':
        yesterday = today - timedelta(days=1)
        s3 = boto3.resource('s3')

        # NOTE(review): the merged CSV is written into the same directory
        # that is globbed here, so it will be picked up again by a later
        # run unless the directory is cleaned elsewhere -- confirm.
        laser_csv = glob.glob('./laser_value/*')
        frames = [pd.read_csv(path) for path in laser_csv]
        if frames:
            total_laser_info = pd.concat(frames, ignore_index=True)
        else:
            total_laser_info = pd.DataFrame()

        laser_save_path = f'./laser_value/{yesterday}-mv_laser_height.csv'
        total_laser_info.to_csv(laser_save_path, index=False)
        s3.Bucket(GSEPS_STAGE_BUCKET).upload_file(
            laser_save_path,
            f'{S3_UPLOAD_KEY}/year={yesterday.year}/month={yesterday.month:02d}/day={yesterday.day:02d}/mv_laser_height.csv')


def check_hour_for_laser(current_time):
    """Return True when ``current_time`` is exactly on the hour (minute and second are 0)."""
    return current_time.minute == 0 and current_time.second == 0


def main():
    """Initialise CAM1 and save frames as JPEG files until an error occurs.

    Each setup step that fails prints its step number (1-5) and exits with
    the negated number as status.  An exception inside the capture loop is
    reported and ends the loop; the process then exits with status 0.  The
    IDS peak library is closed on every exit path.
    """
    # logger.info(f'Cam Initialize.')

    # initialize library
    peak.Library.Initialize()

    try:
        # add logger error
        if not open_camera():
            # error
            print(1)
            sys.exit(-1)

        logger.info('CAM1 Load.')

        if not prepare_acquisition():
            # error
            print(2)
            sys.exit(-2)

        if not set_roi(16, 16, 256, 128):
            # error
            print(3)
            sys.exit(-3)

        if not alloc_and_announce_buffers():
            # error
            print(4)
            sys.exit(-4)

        if not start_acquisition():
            # error
            print(5)
            sys.exit(-5)

        # State used by the (currently disabled) laser / publish pipeline
        # in the commented-out sections below.
        image_info = {}
        # publisher = Publisher()
        laser_history = pd.DataFrame()
        laser_count = 0

        while True:
            try:
                now_datetime = datetime.datetime.now()
                now_unix = int(now_datetime.timestamp())
                now = now_datetime.strftime("%Y%m%d-%H%M%S%f")
                # laser_value = check_distance()

                # The capture condition is currently disabled, so every
                # iteration is treated as a capture event.
                laser_count = 0
                event_flag = 1
                # logger.info(f"Capture Start at {laser_value}")

                # Get buffer from device's DataStream. Wait 1000 ms. The buffer
                # is automatically locked until it is queued again.
                ################# CHANGE ####################
                buffer = m_dataStream.WaitForFinishedBuffer(1000)

                image = ids_peak_ipl.Image.CreateFromSizeAndBuffer(
                    buffer.PixelFormat(),
                    buffer.BasePtr(),
                    buffer.Size(),
                    buffer.Width(),
                    buffer.Height()
                )
                image_processed = image.ConvertTo(ids_peak_ipl.PixelFormatName_BGRa8,
                                                  ids_peak_ipl.ConversionMode_Fast)
                # Queue buffer again
                m_dataStream.QueueBuffer(buffer)
                ###############################################

                img_numpy = image_processed.get_numpy()

                # save image
                cam1_image_name = f'{now}_cam1.jpg'
                cam2_image_name = f'{now}_cam2.jpg'
                image_save_path = info['image_save_path']
                os.makedirs(image_save_path, exist_ok=True)
                cam1_image_path = os.path.join(image_save_path, cam1_image_name)
                cam2_image_path = os.path.join(image_save_path, cam2_image_name)

                # cv2.imwrite reports failure through its return value only.
                if not cv2.imwrite(cam1_image_path, img_numpy):
                    logger.error(f'Failed to write image: {cam1_image_path}')
                print(cam1_image_path)

                # Capture cam2
                # if not image_capture_from_cam2(cam2_image_path):
                #     logger.error(f"Cannot Load CAM2!")
                #     cam2_image_path = None

                # Upload image to MinIO(inference server)
                # image_list = [cam1_image_path, cam2_image_path]
                # upload_to_s3('sdt', image_list)

                # publish
                # image_info = {'to': {"cam1_image_name": cam1_image_name,
                #                      "cam2_image_name": cam2_image_name,
                #                      "laser": laser_value,
                #                      "bucket": "gseps-dataset"}}
                # if publisher.check_server_state():
                #     publisher.pub(image_info)

                ############## CHANGE #######################
                # Logger Contents Change // MinIO => S3     #
                #############################################
                # logger.info(f'Successfully Uploaded To S3!. cam1: {cam1_image_name}, cam2: {cam2_image_name}')
                # print(f'Successfully Uploaded To Minio!. cam1: {cam1_image_name}, cam2: {cam2_image_name}')

                # current_laser_info = pd.DataFrame([{"timestamp": now_unix,
                #                                     "laser_value": laser_value,
                #                                     "event": event_flag}])
                # laser_history = pd.concat([laser_history, current_laser_info], ignore_index=True)
                #
                # current_time = datetime.datetime.now()
                # if check_hour_for_laser(current_time):
                #     laser_save_hour = current_time.strftime('%m%d-%H')
                #     hour = current_time.hour
                #     if not os.path.exists(f'./laser_value/{laser_save_hour}.csv'):
                #         laser_history.to_csv(f'./laser_value/{laser_save_hour}.csv', index=False)
                #         laser_history = pd.DataFrame()
                #     if hour == 0:
                #         upload_to_s3('gseps')
                # laser_count += 1
                # time.sleep(3)

            except Exception:
                print(traceback.format_exc())
                logger.error(traceback.format_exc())
                # Leave the loop so the library is closed below; the exit
                # status stays 0, as it was with the previous exit() call.
                break
    finally:
        # Runs on every exit path (setup failure, loop error, Ctrl-C).
        peak.Library.Close()

    sys.exit(0)


if __name__ == '__main__':
    main()