#!/usr/bin/python3
"""Laser-triggered image acquisition for two IDS peak cameras.

The main loop polls a laser distance sensor over a serial port.  When the
measured distance falls in ``[230, 240)`` one frame is grabbed from each of
the two cameras, written as JPEG, uploaded to S3 and announced over AMQP.
Laser readings in ``[50, 500)`` are accumulated and flushed to an hourly CSV;
at midnight the previous day's hourly CSVs are merged and uploaded.

Settings are read from ``./acquisition_config.json`` at import time.
"""
import datetime
import glob
import json
import logging.handlers
import os
import sys
import time
import traceback
from datetime import timedelta

import boto3
import cv2
import pandas as pd
import pika
import serial

# The GenTL producer path must be set before the ids_peak modules are imported.
os.environ['GENICAM_GENTL64_PATH'] = '/opt/ids-peak_2.9.0.0-48_amd64/lib/ids/cti/'
from ids_peak import ids_peak as peak
from ids_peak_ipl import ids_peak_ipl

###############################################
#               Logger Setting                #
###############################################
# RotatingFileHandler does not create missing parent directories.
os.makedirs('./logs', exist_ok=True)

logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log_fileHandler = logging.handlers.RotatingFileHandler(
    filename="./logs/gseps.log", maxBytes=1024000, backupCount=3, mode='a')
log_fileHandler.setFormatter(formatter)
logger.addHandler(log_fileHandler)

###############################################
#                   Config                    #
###############################################
with open('./acquisition_config.json', 'r') as f:
    info = json.load(f)

LASER_SAVE_PATH = info['laser_save_path']
LASER_DEVICE_PATH = info['laser_device_path']
S3_BUCKET = info['S3BucketName']
GSEPS_STAGE_BUCKET = info['gseps_stage_bucket']
S3_UPLOAD_KEY = info['BucketKey']

# `s3` uses the default credential chain; `sdt_s3` uses the keys from the config.
s3 = boto3.resource('s3')
sdt_s3 = boto3.resource('s3',
                        aws_access_key_id=info['AccessKey'],
                        aws_secret_access_key=info['SecretKey'],
                        region_name=info['Boto3RegionName'])

###############################################
#               Camera Variable               #
###############################################
# Populated by open_camera() / prepare_acquisition().
m_device = None
m_dataStream = None
m_node_map_remote_device = None

m_device_2 = None
m_dataStream_2 = None
m_node_map_remote_device_2 = None

# Seconds to wait before retrying a failed laser read, so a missing serial
# device does not turn the retry loop into a busy loop that floods the log.
_LASER_RETRY_DELAY = 0.5


class Publisher:
    """Thin AMQP client that opens one short-lived connection per call."""

    def __init__(self):
        self.__url = info['amqp_url']
        self.__port = info['amqp_port']
        self.__vhost = info['amqp_vhost']
        self.__cred = pika.PlainCredentials(info['amqp_id'], info['amqp_pw'])
        self.__queue = info['amqp_queue']
        self.__ReadyQ = info['amqp_ReadyQ']

    def __connect(self):
        """Open a new blocking connection using the configured parameters."""
        return pika.BlockingConnection(
            pika.ConnectionParameters(self.__url, self.__port, self.__vhost, self.__cred))

    @staticmethod
    def __close(conn):
        """Close *conn* if it is still open."""
        if conn.is_open:
            conn.close()

    def check_server_state(self):
        """Return True if the ready queue held a message, purging the queue.

        One message is fetched (auto-acked) from the ready queue.  If one was
        present the rest of the queue is purged and True is returned,
        otherwise False.  Connection errors propagate to the caller.
        """
        conn = self.__connect()
        try:
            chan = conn.channel()
            method, _properties, _body = chan.basic_get(queue=self.__ReadyQ, auto_ack=True)
            if method:
                chan.queue_purge(queue=self.__ReadyQ)
                return True
            return False
        finally:
            # Always release the connection, including when a call above raises.
            self.__close(conn)

    def pub(self, body: dict):
        """Publish *body* as JSON to the configured queue.

        Failures are logged and swallowed (best effort); nothing is returned.
        """
        try:
            conn = self.__connect()
            try:
                chan = conn.channel()
                chan.basic_publish(exchange='',
                                   routing_key=self.__queue,
                                   body=json.dumps(body))
            finally:
                self.__close(conn)
        except Exception:
            # add error alarm
            logger.error(traceback.format_exc())


def open_camera():
    """Open the first two devices found and store their remote node maps.

    Device index 1 becomes camera 1 and device index 0 becomes camera 2.

    Returns:
        True when both devices were opened, False otherwise (fewer than two
        devices, a device that is not openable, or any SDK error).
    """
    global m_device, m_node_map_remote_device, m_device_2, m_node_map_remote_device_2
    try:
        # Create instance of the device manager and refresh its device list
        device_manager = peak.DeviceManager.Instance()
        device_manager.Update()

        devices = device_manager.Devices()
        if devices.empty():
            return False

        # Both indices 0 and 1 are used below, so two devices are required.
        if devices.size() < 2:
            logger.error(f'Two cameras are required, found {devices.size()}.')
            return False

        if not (devices[0].IsOpenable() and devices[1].IsOpenable()):
            logger.error('At least one camera is not openable.')
            return False

        m_device = devices[1].OpenDevice(peak.DeviceAccessType_Control)
        # Get NodeMap of the RemoteDevice for all accesses to the GenICam NodeMap tree
        m_node_map_remote_device = m_device.RemoteDevice().NodeMaps()[0]

        m_device_2 = devices[0].OpenDevice(peak.DeviceAccessType_Control)
        m_node_map_remote_device_2 = m_device_2.RemoteDevice().NodeMaps()[0]
        return True
    except Exception as e:
        print(e)
        logger.error(traceback.format_exc())

    return False


def prepare_acquisition():
    """Open the first data stream of each camera.

    Returns:
        True on success, False if either camera exposes no data stream or an
        SDK error occurs.
    """
    global m_dataStream, m_dataStream_2
    try:
        data_streams = m_device.DataStreams()
        data_streams_2 = m_device_2.DataStreams()
        if data_streams.empty() or data_streams_2.empty():
            # no data streams available
            return False

        m_dataStream = data_streams[0].OpenDataStream()
        m_dataStream_2 = data_streams_2[0].OpenDataStream()
        return True
    except Exception as e:
        print(e)
        logger.error(traceback.format_exc())

    return False


def set_roi(width, height):
    """Set a centred region of interest of *width* x *height* on camera 1.

    The offsets are computed against a hard-coded 4512 x 4512 area.

    NOTE(review): only camera 1's node map is touched here; camera 2 keeps
    whatever ROI it already has - confirm this is intended.

    Returns:
        True if the ROI was applied, False if it is out of range or an SDK
        error occurs.
    """
    try:
        offset_x = int((4512 - width) / 2)
        offset_y = int((4512 - height) / 2)

        node_offset_x = m_node_map_remote_device.FindNode("OffsetX")
        node_offset_y = m_node_map_remote_device.FindNode("OffsetY")
        node_width = m_node_map_remote_device.FindNode("Width")
        node_height = m_node_map_remote_device.FindNode("Height")

        # Get the minimum ROI and set it. After that there are no size restrictions anymore
        x_min = node_offset_x.Minimum()
        y_min = node_offset_y.Minimum()
        w_min = node_width.Minimum()
        h_min = node_height.Minimum()

        node_offset_x.SetValue(x_min)
        node_offset_y.SetValue(y_min)
        node_width.SetValue(w_min)
        node_height.SetValue(h_min)

        # Get the maximum ROI values
        x_max = node_offset_x.Maximum()
        y_max = node_offset_y.Maximum()
        w_max = node_width.Maximum()
        h_max = node_height.Maximum()

        if (offset_x < x_min) or (offset_y < y_min) or (offset_x > x_max) or (offset_y > y_max):
            return False
        if (width < w_min) or (height < h_min) or ((offset_x + width) > w_max) or ((offset_y + height) > h_max):
            return False

        # Now, set final AOI
        node_offset_x.SetValue(offset_x)
        node_offset_y.SetValue(offset_y)
        node_width.SetValue(width)
        node_height.SetValue(height)
        return True
    except Exception:
        logger.error(traceback.format_exc())

    return False


def _realloc_buffers(data_stream, node_map):
    """Discard all buffers of *data_stream* and queue the minimum required set."""
    # Flush queue and prepare all buffers for revoking
    data_stream.Flush(peak.DataStreamFlushMode_DiscardAll)

    # Clear all old buffers
    for buffer in data_stream.AnnouncedBuffers():
        data_stream.RevokeBuffer(buffer)

    payload_size = node_map.FindNode("PayloadSize").Value()

    # Allocate and queue the minimum number of buffers the stream requires
    for _ in range(data_stream.NumBuffersAnnouncedMinRequired()):
        buffer = data_stream.AllocAndAnnounceBuffer(payload_size)
        data_stream.QueueBuffer(buffer)


def alloc_and_announce_buffers():
    """(Re)allocate acquisition buffers for both cameras.

    Returns:
        True on success, False if a data stream is missing or an SDK error
        occurs.
    """
    try:
        if not (m_dataStream and m_dataStream_2):
            return False

        _realloc_buffers(m_dataStream, m_node_map_remote_device)      # cam1
        _realloc_buffers(m_dataStream_2, m_node_map_remote_device_2)  # cam2
        return True
    except Exception:
        logger.error(traceback.format_exc())

    return False


def start_acquisition():
    """Start continuous acquisition on camera 1, then camera 2.

    Returns:
        True on success, False if an SDK error occurs.
    """
    try:
        cameras = ((m_dataStream, m_node_map_remote_device),
                   (m_dataStream_2, m_node_map_remote_device_2))
        for data_stream, node_map in cameras:
            data_stream.StartAcquisition(peak.AcquisitionStartMode_Default,
                                         peak.DataStream.INFINITE_NUMBER)
            node_map.FindNode("TLParamsLocked").SetValue(1)
            node_map.FindNode("AcquisitionStart").Execute()
        return True
    except Exception:
        logger.error(traceback.format_exc())

    return False


# check alarm
def check_distance():
    """Read one distance value from the laser sensor, retrying until it works.

    A fixed 8-byte request is written to the serial port and 6 bytes are read
    back.  Byte 2 of the reply is the payload length and the payload, read as
    a big-endian integer and divided by 10, is the distance that gets logged
    in millimetres.

    NOTE(review): the request looks like a Modbus RTU "read input registers"
    frame; the reply's CRC is not checked - confirm against the sensor manual.

    Returns:
        The distance as an int.
    """
    req = b'\x01\x04\x00\x00\x00\x01\x31\xca'
    while True:
        try:
            # The context manager closes the port on every exit path.  If the
            # port cannot be opened at all the error is raised before a handle
            # exists, so there is nothing left to close.
            with serial.Serial(LASER_DEVICE_PATH, baudrate=19200, timeout=1) as ser:
                ser.write(req)
                result = ser.read(6)

            # A timeout yields a short (possibly empty) reply.
            if len(result) < 3:
                raise ValueError(f'short reply from laser: {result!r}')
            data_length = result[2]
            if len(result) < 3 + data_length:
                raise ValueError(f'truncated reply from laser: {result!r}')

            hex_distance = result[3:3 + data_length]
            distance = int.from_bytes(hex_distance, byteorder='big') // 10
            logger.info(f"laser_value : {distance}mm")
            return distance
        except KeyboardInterrupt:
            sys.exit()
        except Exception as e:
            logger.error(f"Error : {e}")
            time.sleep(_LASER_RETRY_DELAY)


def upload_to_s3(company, image_list=None):
    """Upload captured images or the merged daily laser CSV to S3.

    Args:
        company: ``'sdt'`` uploads every path in *image_list* to
            ``S3_BUCKET`` under a ``YYYY-MM-DD/`` folder derived from the file
            name and deletes the local file.  ``'gseps'`` merges yesterday's
            hourly laser CSVs into one file and uploads it to both the GSEPS
            stage bucket and the ``gseps-daily`` bucket.  Any other value is
            a no-op.
        image_list: Local image paths; only used for ``'sdt'``.
    """
    if company == 'sdt':
        for path in image_list or []:
            image_name = path.split('/')[-1]
            # File names start with the capture date as YYYYMMDD.
            date = image_name.split('-')[0]
            folder = f'{date[:4]}-{date[4:6]}-{date[6:]}'
            sdt_s3.Bucket(S3_BUCKET).upload_file(path, f'{folder}/{image_name}')
            os.remove(path)

    elif company == 'gseps':
        yesterday = datetime.date.today() - timedelta(days=1)
        file_name = f'{yesterday.year:04d}{yesterday.month:02d}{yesterday.day:02d}'
        laser_csv = sorted(glob.glob(f'{LASER_SAVE_PATH}/{file_name}*.csv'))

        total_laser_info = pd.DataFrame()
        for path in laser_csv:
            try:
                csv = pd.read_csv(path)
                total_laser_info = pd.concat([total_laser_info, csv], ignore_index=True)
            except pd.errors.EmptyDataError as e:
                logger.error(f"Error: {path} pandas empty data error, {e}")

        laser_save_path = f'{LASER_SAVE_PATH}/{yesterday}-mv_laser_height.csv'
        total_laser_info.to_csv(laser_save_path, index=False)

        # The day is zero-padded like the month.  The previous spec ": 02d"
        # carried a space sign flag and produced keys such as "day= 5".
        stage_key = (f'{S3_UPLOAD_KEY}/year={yesterday.year}'
                     f'/month={yesterday.month:02d}'
                     f'/day={yesterday.day:02d}/mv_laser_height.csv')
        s3.Bucket(GSEPS_STAGE_BUCKET).upload_file(laser_save_path, stage_key)
        sdt_s3.Bucket('gseps-daily').upload_file(laser_save_path,
                                                 f'{yesterday}/mv_laser_height.csv')


def check_hour_for_laser(current_time):
    """Return True when *current_time* is exactly on the hour (mm:ss == 00:00)."""
    return current_time.minute == 0 and current_time.second == 0


def _grab_frame(data_stream):
    """Wait up to 1000 ms for a frame on *data_stream* and return it as BGRa8.

    The acquisition buffer is handed back to the stream even when the
    conversion fails, so the stream does not run out of buffers.
    """
    # The buffer stays locked until it is queued again.
    buffer = data_stream.WaitForFinishedBuffer(1000)
    try:
        image = ids_peak_ipl.Image.CreateFromSizeAndBuffer(
            buffer.PixelFormat(),
            buffer.BasePtr(),
            buffer.Size(),
            buffer.Width(),
            buffer.Height()
        )
        return image.ConvertTo(ids_peak_ipl.PixelFormatName_BGRa8,
                               ids_peak_ipl.ConversionMode_Fast)
    finally:
        # Queue buffer again
        data_stream.QueueBuffer(buffer)


def _capture_and_publish(publisher, now, laser_value):
    """Grab one frame per camera, save and upload both, then publish the event."""
    # The converted images are kept in locals while their numpy views are used.
    image_processed = _grab_frame(m_dataStream)      # cam1
    img_numpy = image_processed.get_numpy()
    image_processed_2 = _grab_frame(m_dataStream_2)  # cam2
    img_numpy_2 = image_processed_2.get_numpy()

    # save image
    cam1_image_name = f'{now}_cam1.jpg'
    cam2_image_name = f'{now}_cam2.jpg'
    image_save_path = info['image_save_path']
    os.makedirs(image_save_path, exist_ok=True)
    cam1_image_path = os.path.join(image_save_path, cam1_image_name)
    cam2_image_path = os.path.join(image_save_path, cam2_image_name)
    cv2.imwrite(cam1_image_path, img_numpy)
    cv2.imwrite(cam2_image_path, img_numpy_2)

    # Upload images (the local copies are removed by upload_to_s3)
    upload_to_s3('sdt', [cam1_image_path, cam2_image_path])

    # publish
    image_info = {'to': {"cam1_image_name": cam1_image_name,
                         "cam2_image_name": cam2_image_name,
                         "laser": laser_value,
                         "bucket": "gseps-dataset"}}
    if publisher.check_server_state():
        publisher.pub(image_info)

    logger.info(f'Successfully Uploaded To S3!. cam1: {cam1_image_name}, cam2: {cam2_image_name}')


def _flush_laser_history(laser_history, current_time):
    """Write the previous hour's readings to CSV once and return the new history.

    Called while ``current_time.minute == 0``.  The CSV's existence is the
    "already flushed" marker, so repeated calls within that minute are no-ops
    and return *laser_history* unchanged.  At hour 0 the daily merge/upload is
    triggered as well.

    NOTE(review): if the loop stalls for the whole of minute 0 the flush for
    that hour is skipped and its rows land in the next hourly file.
    """
    laser_save_hour = (current_time - timedelta(hours=1)).strftime('%Y%m%d-%H')
    hourly_csv = f'{LASER_SAVE_PATH}/{laser_save_hour}.csv'
    if os.path.exists(hourly_csv):
        return laser_history

    date = laser_save_hour.split('-')[0]
    folder = f'{date[:4]}-{date[4:6]}-{date[6:]}'
    laser_history.to_csv(hourly_csv, index=False)
    sdt_s3.Bucket('gseps-daily').upload_file(hourly_csv,
                                             f'{folder}/laser/{laser_save_hour}.csv')
    if current_time.hour == 0:
        upload_to_s3('gseps')
    return pd.DataFrame()


def _acquisition_loop():
    """Poll the laser forever; exit the process on the first unexpected error."""
    publisher = Publisher()
    laser_history = pd.DataFrame()

    while True:
        try:
            now_datetime = datetime.datetime.now()
            now_unix = int(now_datetime.timestamp())
            # Millisecond-resolution timestamp used in image file names.
            now = now_datetime.strftime("%Y%m%d-%H%M%S%f")[:-3]

            laser_value = check_distance()
            event_flag = 0

            if 230 <= laser_value < 240:
                event_flag = 1
                logger.info(f"Capture Start at {laser_value}")
                _capture_and_publish(publisher, now, laser_value)

            if 50 <= laser_value < 500:
                current_laser_info = pd.DataFrame([{"timestamp": now_unix,
                                                    "laser_value": laser_value,
                                                    "event": event_flag}])
                laser_history = pd.concat([laser_history, current_laser_info],
                                          ignore_index=True)

            current_time = datetime.datetime.now()
            if current_time.minute == 0:
                laser_history = _flush_laser_history(laser_history, current_time)

            time.sleep(0.3)
        except Exception:
            logger.error(traceback.format_exc())
            # NOTE(review): this exits with status 0 even though an error
            # occurred; kept as-is in case a supervisor relies on it.
            sys.exit()


def main():
    """Initialise both cameras and run the acquisition loop.

    Exits with status -1 .. -5 when the corresponding setup step fails.  The
    peak library is closed on every exit path.
    """
    logger.info('Cam Initialize.')

    # initialize library
    peak.Library.Initialize()
    try:
        if not open_camera():
            print(1)
            sys.exit(-1)

        logger.info('CAM Load.')

        if not prepare_acquisition():
            print(2)
            sys.exit(-2)

        if not set_roi(1208, 1024):
            print(3)
            sys.exit(-3)

        if not alloc_and_announce_buffers():
            print(4)
            sys.exit(-4)

        if not start_acquisition():
            print(5)
            sys.exit(-5)

        _acquisition_loop()
    finally:
        peak.Library.Close()

    sys.exit(0)


if __name__ == '__main__':
    main()