gseps-edge-app/main.py.bk

450 lines
15 KiB
Plaintext
Raw Permalink Normal View History

2024-05-29 09:58:46 +00:00
#!/usr/bin/python3
import os
import sys
import json
import time
import datetime
from datetime import timedelta
import traceback
import logging.handlers
import cv2
import glob
import pandas as pd
import pika
import boto3
import serial
os.environ['GENICAM_GENTL64_PATH'] = '/opt/ids-peak_2.7.0.0-16268_amd64/lib/ids/cti/'
from ids_peak import ids_peak as peak
from ids_peak_ipl import ids_peak_ipl
###############################################
# Logger Setting #
###############################################
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log_fileHandler = logging.handlers.RotatingFileHandler(
filename=f"./logs/gseps.log",
maxBytes=1024000,
backupCount=3,
mode='a')
log_fileHandler.setFormatter(formatter)
logger.addHandler(log_fileHandler)
###############################################
# Config #
###############################################
with open('./acquisition_config.json', 'r') as f:
info = json.load(f)
# CAM2_DEVICE_PATH = info['cam2_device_path']
LASER_DEVICE_PATH = info['laser_device_path']
S3_BUCKET = info['S3BucketName']
GSEPS_STAGE_BUCKET = info['gseps_stage_bucket']
S3_UPLOAD_KEY = info['BucketKey']
###############################################
# Camera Variable #
###############################################
m_device = None
m_dataStream = None
m_node_map_remote_device = None
# Load Camera2
# cam2 = cv2.VideoCapture(CAM2_DEVICE_PATH)
# _ = cam2.read()
# logger.info(f'CAM2 Load.')
# class Publisher:
# def __init__(self):
# self.__url = info['amqp_url']
# self.__port = info['amqp_port']
# self.__vhost = info['amqp_vhost']
# self.__cred = pika.PlainCredentials(info['amqp_id'], info['amqp_pw'])
# self.__queue = info['amqp_queue']
# self.__ReadyQ = info['amqp_ReadyQ']
#
# def check_server_state(self):
# conn = pika.BlockingConnection(pika.ConnectionParameters(self.__url,
# self.__port,
# self.__vhost,
# self.__cred))
# chan = conn.channel()
# method, properties, body = chan.basic_get(queue=self.__ReadyQ,
# auto_ack=True)
#
# if method:
# chan.queue_purge(queue=self.__ReadyQ)
# conn.close()
# return True
#
# conn.close()
# return False
#
# def pub(self, body: dict):
# try:
# conn = pika.BlockingConnection(pika.ConnectionParameters(self.__url,
# self.__port,
# self.__vhost,
# self.__cred))
# chan = conn.channel()
# chan.basic_publish(exchange='',
# routing_key=self.__queue,
# body=json.dumps(body))
# conn.close()
# return
# except Exception as e:
# # add error alarm
# logger.error(traceback.format_exc())
def open_camera():
global m_device, m_node_map_remote_device
try:
# Create instance of the device manager
device_manager = peak.DeviceManager.Instance()
# Update the device manager
device_manager.Update()
# Return if no device was found
if device_manager.Devices().empty():
return False
# open the first openable device in the device manager's device list
device_count = device_manager.Devices().size()
for i in range(device_count):
if device_manager.Devices()[i].IsOpenable():
m_device = device_manager.Devices()[i].OpenDevice(peak.DeviceAccessType_Control)
# Get NodeMap of the RemoteDevice for all accesses to the GenICam NodeMap tree
m_node_map_remote_device = m_device.RemoteDevice().NodeMaps()[0]
return True
except Exception as e:
print(e)
# logger.error(traceback.format_exc())
return False
def prepare_acquisition():
global m_dataStream
try:
data_streams = m_device.DataStreams()
if data_streams.empty():
# no data streams available
return False
m_dataStream = m_device.DataStreams()[0].OpenDataStream()
return True
except Exception as e:
logger.error(traceback.format_exc())
return False
def set_roi(x, y, width, height):
try:
# Get the minimum ROI and set it. After that there are no size restrictions anymore
x_min = m_node_map_remote_device.FindNode("OffsetX").Minimum()
y_min = m_node_map_remote_device.FindNode("OffsetY").Minimum()
w_min = m_node_map_remote_device.FindNode("Width").Minimum()
h_min = m_node_map_remote_device.FindNode("Height").Minimum()
m_node_map_remote_device.FindNode("OffsetX").SetValue(x_min)
m_node_map_remote_device.FindNode("OffsetY").SetValue(y_min)
m_node_map_remote_device.FindNode("Width").SetValue(w_min)
m_node_map_remote_device.FindNode("Height").SetValue(h_min)
# Get the maximum ROI values
x_max = m_node_map_remote_device.FindNode("OffsetX").Maximum()
y_max = m_node_map_remote_device.FindNode("OffsetY").Maximum()
w_max = m_node_map_remote_device.FindNode("Width").Maximum()
h_max = m_node_map_remote_device.FindNode("Height").Maximum()
if (x < x_min) or (y < y_min) or (x > x_max) or (y > y_max):
return False
elif (width < w_min) or (height < h_min) or ((x + width) > w_max) or ((y + height) > h_max):
return False
else:
# Now, set final AOI
m_node_map_remote_device.FindNode("OffsetX").SetValue(0)
m_node_map_remote_device.FindNode("OffsetY").SetValue(0)
m_node_map_remote_device.FindNode("Width").SetValue(w_max)
m_node_map_remote_device.FindNode("Height").SetValue(h_max)
return True
except Exception as e:
logger.error(traceback.format_exc())
return False
def alloc_and_announce_buffers():
try:
if m_dataStream:
# Flush queue and prepare all buffers for revoking
m_dataStream.Flush(peak.DataStreamFlushMode_DiscardAll)
# Clear all old buffers
for buffer in m_dataStream.AnnouncedBuffers():
m_dataStream.RevokeBuffer(buffer)
payload_size = m_node_map_remote_device.FindNode("PayloadSize").Value()
# Get number of minimum required buffers
num_buffers_min_required = m_dataStream.NumBuffersAnnouncedMinRequired()
# Alloc buffers
for count in range(num_buffers_min_required):
buffer = m_dataStream.AllocAndAnnounceBuffer(payload_size)
m_dataStream.QueueBuffer(buffer)
return True
except Exception as e:
logger.error(traceback.format_exc())
return False
def start_acquisition():
try:
m_dataStream.StartAcquisition(peak.AcquisitionStartMode_Default, peak.DataStream.INFINITE_NUMBER)
m_node_map_remote_device.FindNode("TLParamsLocked").SetValue(1)
m_node_map_remote_device.FindNode("AcquisitionStart").Execute()
# m_node_map_remote_device.FindNode("DeviceRegistersStreamingStart").Execute()
return True
except Exception as e:
logger.error(traceback.format_exc())
return False
# check alarm
def check_distance():
while True:
try:
ser = serial.Serial(LASER_DEVICE_PATH, baudrate=9600, timeout=1)
ser.write(b'O')
result = ser.read(6)[1:]
distance_bytes = [str(n) for n in result]
distance = int(''.join(distance_bytes))
logger.info(f"laser_value : {distance}")
return distance
except KeyboardInterrupt:
exit()
except Exception as e:
print(e)
# logger.error(traceback.format_exc())
# raise
print(e)
# logger.error(f"Error : {e}")
continue
def image_capture_from_cam2(save_path):
global cam2
ret, frame = cam2.read()
if ret:
cv2.imwrite(save_path, frame)
return True
return False
def upload_to_s3(company, image_list=None):
today = datetime.date.today()
if company == 'sdt':
s3 = boto3.resource('s3',
aws_access_key_id=info['AccessKey'],
aws_secret_access_key=info['SecretKey'],
region_name=info['Boto3RegionName'])
for path in image_list:
image_name = path.split('/')[-1]
s3.Bucket(S3_BUCKET).upload_file(path, f'{today}/{image_name}')
elif company == 'gseps':
yesterday = today - timedelta(days=1)
s3 = boto3.resource('s3')
laser_csv = glob.glob('./laser_value/*')
total_laser_info = pd.DataFrame()
for path in laser_csv:
csv = pd.read_csv(path)
total_laser_info = pd.concat([total_laser_info, csv], ignore_index=True)
laser_save_path = f'./laser_value/{yesterday}-mv_laser_height.csv'
total_laser_info.to_csv(laser_save_path, index=False)
s3.Bucket(GSEPS_STAGE_BUCKET).upload_file(laser_save_path, f'{S3_UPLOAD_KEY}/year={yesterday.year}/month={yesterday.month:02d}/day={yesterday.day:02d}/mv_laser_height.csv')
def check_hour_for_laser(current_time):
if current_time.minute == 0 and current_time.second == 0:
return True
return False
def main():
# logger.info(f'Cam Initialize.')
# initialize library
peak.Library.Initialize()
# add logger error
if not open_camera():
# error
print(1)
sys.exit(-1)
logger.info(f'CAM1 Load.')
if not prepare_acquisition():
# error
print(2)
sys.exit(-2)
if not set_roi(16, 16, 256, 128):
# error
print(3)
sys.exit(-3)
if not alloc_and_announce_buffers():
# error
print(4)
sys.exit(-4)
if not start_acquisition():
# error
print(5)
sys.exit(-5)
image_info = {}
# publisher = Publisher()
laser_history = pd.DataFrame()
laser_count = 0
while True:
try:
now_datetime = datetime.datetime.now()
now_unix = int(now_datetime.timestamp())
now = now_datetime.strftime("%Y%m%d-%H%M%S%f")
# laser_value = check_distance()
event_flag = 0
if True:
laser_count = 0
event_flag = 1
# logger.info(f"Capture Start at {laser_value}")
# Get buffer from device's DataStream. Wait 5000 ms. The buffer is automatically locked until it is queued again.
################# CHANGE ####################
buffer = m_dataStream.WaitForFinishedBuffer(1000)
image = ids_peak_ipl.Image.CreateFromSizeAndBuffer(
buffer.PixelFormat(),
buffer.BasePtr(),
buffer.Size(),
buffer.Width(),
buffer.Height()
)
image_processed = image.ConvertTo(ids_peak_ipl.PixelFormatName_BGRa8, ids_peak_ipl.ConversionMode_Fast)
# Queue buffer again
m_dataStream.QueueBuffer(buffer)
###############################################
img_numpy = image_processed.get_numpy()
# save image
cam1_image_name = f'{now}_cam1.jpg'
cam2_image_name = f'{now}_cam2.jpg'
image_save_path = info['image_save_path']
if not os.path.exists(image_save_path):
os.makedirs(image_save_path)
cam1_image_path = os.path.join(image_save_path, cam1_image_name)
cam2_image_path = os.path.join(image_save_path, cam2_image_name)
cv2.imwrite(cam1_image_path, img_numpy)
print(cam1_image_path)
# Capture cam2
# if not image_capture_from_cam2(cam2_image_path):
# logger.error(f"Cannot Load CAM2!")
# cam2_image_path = None
# Upload image to MinIO(inference server)
# image_list = [cam1_image_path, cam2_image_path]
# upload_to_s3('sdt', image_list)
# publish
# image_info = {'to': {"cam1_image_name": cam1_image_name,
# "cam2_image_name": cam2_image_name,
# "laser": laser_value,
# "bucket": "gseps-dataset"}}
# if publisher.check_server_state():
# publisher.pub(image_info)
############## CHANGE #######################
# Logger Contents Change // MinIO => S3 #
#############################################
# logger.info(f'Successfully Uploaded To S3!. cam1: {cam1_image_name}, cam2: {cam2_image_name}')
# print(f'Successfully Uploaded To Minio!. cam1: {cam1_image_name}, cam2: {cam2_image_name}')
# current_laser_info = pd.DataFrame([{"timestamp": now_unix,
# "laser_value": laser_value,
# "event": event_flag}])
# laser_history = pd.concat([laser_history, current_laser_info], ignore_index=True)
#
# current_time = datetime.datetime.now()
# if check_hour_for_laser(current_time):
# laser_save_hour = current_time.strftime('%m%d-%H')
# hour = current_time.hour
# if not os.path.exists(f'./laser_value/{laser_save_hour}.csv'):
# laser_history.to_csv(f'./laser_value/{laser_save_hour}.csv', index=False)
# laser_history = pd.DataFrame()
# if hour == 0:
# upload_to_s3('gseps')
# laser_count += 1
# time.sleep(3)
except Exception as e:
print(traceback.format_exc())
# logger.error(traceback.format_exc())
exit()
peak.Library.Close()
sys.exit(0)
if __name__ == '__main__':
main()