input helper to get user input
This commit is contained in:
parent
f154299069
commit
0a8a2fb9c6
@ -385,6 +385,7 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
|
||||
"65": ("Star Tracker: Set log level parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
"66": ("Star Tracker: Set log subscription parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
"67": ("Star Tracker: Set debug camera parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
"68": ("Star Tracker: Firmware update", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
}
|
||||
service_star_tracker_tuple = ("Star tracker", op_code_dict_srv_star_tracker)
|
||||
|
||||
|
@ -13,6 +13,7 @@ from tmtccmd.tc.packer import TcQueueT
|
||||
from spacepackets.ecss.tc import PusTelecommand
|
||||
from pus_tc.service_200_mode import pack_mode_data
|
||||
from tmtccmd.utility.logger import get_console_logger
|
||||
from utility.input_helper import InputHelper
|
||||
|
||||
LOGGER = get_console_logger()
|
||||
|
||||
@ -80,9 +81,10 @@ class StarTrackerActionIds:
|
||||
LOGLEVEL = 81
|
||||
LOG_SUBSCRIPTION = 82
|
||||
DEBUG_CAMERA = 83
|
||||
FIRMWARE_UPDATE = 84
|
||||
|
||||
|
||||
class ImagePathDefs:
|
||||
class FileDefs:
|
||||
uploadFile = "/mnt/sd0/startracker/gemma.bin"
|
||||
downloadFile = "test_image.bin"
|
||||
downloadPath = "/mnt/sd0/startracker"
|
||||
@ -97,13 +99,20 @@ class ImagePathDefs:
|
||||
egseFlightConfig = "/home/pi/arcsec/flight-config.json"
|
||||
q7sGroundConfig = "/mnt/sd0/startracker/ground-config.json"
|
||||
q7sFlightConfig = "/mnt/sd0/startracker/flight-config.json"
|
||||
firmware2_1 = "/home/pi/arcsec/firmware/sagitta2-1.bin"
|
||||
firmware22_1 = "/home/pi/arcsec/firmware/sagitta-22.1.bin"
|
||||
|
||||
|
||||
json_selection = {
|
||||
"1": ["Q7S flight config", ImagePathDefs.q7sFlightConfig],
|
||||
"2": ["Q7S ground config", ImagePathDefs.q7sGroundConfig],
|
||||
"3": ["EGSE flight config", ImagePathDefs.egseFlightConfig],
|
||||
"4": ["EGSE ground config", ImagePathDefs.egseGroundConfig]
|
||||
json_dict = {
|
||||
"1": ["Q7S flight config", FileDefs.q7sFlightConfig],
|
||||
"2": ["Q7S ground config", FileDefs.q7sGroundConfig],
|
||||
"3": ["EGSE flight config", FileDefs.egseFlightConfig],
|
||||
"4": ["EGSE ground config", FileDefs.egseGroundConfig]
|
||||
}
|
||||
|
||||
firmware_dict = {
|
||||
"1": ["Firmware Major = 2, Minor = 1", FileDefs.firmware2_1],
|
||||
"2": ["Firmware Major = 22, Minor = 1", FileDefs.firmware22_1],
|
||||
}
|
||||
|
||||
|
||||
@ -218,7 +227,7 @@ def pack_star_tracker_commands(
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", StarTrackerActionIds.UPLOAD_IMAGE)
|
||||
+ bytearray(ImagePathDefs.uploadFile, "utf-8")
|
||||
+ bytearray(FileDefs.uploadFile, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=40, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
@ -226,7 +235,7 @@ def pack_star_tracker_commands(
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Download image"))
|
||||
path = input("Specify storage location (default - /mnt/sd0/startracker): ")
|
||||
if not path:
|
||||
path = ImagePathDefs.downloadPath
|
||||
path = FileDefs.downloadPath
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", StarTrackerActionIds.DOWNLOAD_IMAGE)
|
||||
@ -239,7 +248,7 @@ def pack_star_tracker_commands(
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", StarTrackerActionIds.LIMITS)
|
||||
+ bytearray(ImagePathDefs.jsonFile, "utf-8")
|
||||
+ bytearray(FileDefs.jsonFile, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=42, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
@ -250,7 +259,7 @@ def pack_star_tracker_commands(
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", StarTrackerActionIds.TRACKING)
|
||||
+ bytearray(ImagePathDefs.jsonFile, "utf-8")
|
||||
+ bytearray(FileDefs.jsonFile, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=43, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
@ -259,7 +268,7 @@ def pack_star_tracker_commands(
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", StarTrackerActionIds.MOUNTING)
|
||||
+ bytearray(ImagePathDefs.jsonFile, "utf-8")
|
||||
+ bytearray(FileDefs.jsonFile, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=44, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
@ -268,7 +277,7 @@ def pack_star_tracker_commands(
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", StarTrackerActionIds.CAMERA)
|
||||
+ bytearray(ImagePathDefs.jsonFile, "utf-8")
|
||||
+ bytearray(FileDefs.jsonFile, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=45, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
@ -277,7 +286,7 @@ def pack_star_tracker_commands(
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", StarTrackerActionIds.BLOB)
|
||||
+ bytearray(ImagePathDefs.jsonFile, "utf-8")
|
||||
+ bytearray(FileDefs.jsonFile, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=46, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
@ -286,7 +295,7 @@ def pack_star_tracker_commands(
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", StarTrackerActionIds.CENTROIDING)
|
||||
+ bytearray(ImagePathDefs.jsonFile, "utf-8")
|
||||
+ bytearray(FileDefs.jsonFile, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=47, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
@ -295,7 +304,7 @@ def pack_star_tracker_commands(
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", StarTrackerActionIds.LISA)
|
||||
+ bytearray(ImagePathDefs.jsonFile, "utf-8")
|
||||
+ bytearray(FileDefs.jsonFile, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=48, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
@ -304,7 +313,7 @@ def pack_star_tracker_commands(
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", StarTrackerActionIds.MATCHING)
|
||||
+ bytearray(ImagePathDefs.jsonFile, "utf-8")
|
||||
+ bytearray(FileDefs.jsonFile, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=49, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
@ -313,7 +322,7 @@ def pack_star_tracker_commands(
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", StarTrackerActionIds.VALIDATION)
|
||||
+ bytearray(ImagePathDefs.jsonFile, "utf-8")
|
||||
+ bytearray(FileDefs.jsonFile, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=50, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
@ -374,7 +383,7 @@ def pack_star_tracker_commands(
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", StarTrackerActionIds.SET_JSON_FILE_NAME)
|
||||
+ bytearray(ImagePathDefs.jsonFile, "utf-8")
|
||||
+ bytearray(FileDefs.jsonFile, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=57, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
@ -428,7 +437,7 @@ def pack_star_tracker_commands(
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", StarTrackerActionIds.UPLOAD_CENTROID)
|
||||
+ bytearray(ImagePathDefs.uploadCentroidJson, "utf-8")
|
||||
+ bytearray(FileDefs.uploadCentroidJson, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=63, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
@ -475,7 +484,7 @@ def pack_star_tracker_commands(
|
||||
+ struct.pack("!I", StarTrackerActionIds.DOWNLOAD_FPGA_IMAGE)
|
||||
+ struct.pack("!I", position)
|
||||
+ struct.pack("!I", length)
|
||||
+ bytearray(ImagePathDefs.downloadFpgaImagePath, "utf-8")
|
||||
+ bytearray(FileDefs.downloadFpgaImagePath, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=66, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
@ -486,7 +495,7 @@ def pack_star_tracker_commands(
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", StarTrackerActionIds.CHANGE_FPGA_DOWNLOAD_FILE)
|
||||
+ bytearray(ImagePathDefs.downloadFpgaImageName, "utf-8")
|
||||
+ bytearray(FileDefs.downloadFpgaImageName, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=67, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
@ -495,7 +504,7 @@ def pack_star_tracker_commands(
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", StarTrackerActionIds.UPLOAD_FPGA_IMAGE)
|
||||
+ bytearray(ImagePathDefs.uploadFpgaImageName, "utf-8")
|
||||
+ bytearray(FileDefs.uploadFpgaImageName, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=68, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
@ -529,19 +538,19 @@ def pack_star_tracker_commands(
|
||||
if op_code == "50":
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Set image processor parameters"))
|
||||
command = object_id + struct.pack('!I', StarTrackerActionIds.IMAGE_PROCESSOR) + \
|
||||
bytearray(ImagePathDefs.jsonFile, 'utf-8')
|
||||
bytearray(FileDefs.jsonFile, 'utf-8')
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=70, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
if op_code == "51":
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: EGSE load ground config camera parameters"))
|
||||
command = object_id + struct.pack('!I', StarTrackerActionIds.CAMERA) + \
|
||||
bytearray(ImagePathDefs.egseGroundConfig, 'utf-8')
|
||||
bytearray(FileDefs.egseGroundConfig, 'utf-8')
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=71, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
if op_code == "52":
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: EGSE load flight config camera parameters"))
|
||||
command = object_id + struct.pack('!I', StarTrackerActionIds.CAMERA) + \
|
||||
bytearray(ImagePathDefs.egseFlightConfig, 'utf-8')
|
||||
bytearray(FileDefs.egseFlightConfig, 'utf-8')
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=72, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
if op_code == "53":
|
||||
@ -620,7 +629,13 @@ def pack_star_tracker_commands(
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Set debug camera parameters"))
|
||||
jsonfile = get_config_file()
|
||||
command = object_id + struct.pack('!I', StarTrackerActionIds.DEBUG_CAMERA) + bytearray(jsonfile, "utf-8")
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=85, app_data=command)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=86, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
if op_code == "68":
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Firmware update"))
|
||||
firmware = get_firmware()
|
||||
command = object_id + struct.pack('!I', StarTrackerActionIds.FIRMWARE_UPDATE) + bytearray(firmware, "utf-8")
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=87, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
|
||||
|
||||
@ -632,7 +647,7 @@ def pack_write_command(object_id: bytearray) -> bytearray:
|
||||
+ struct.pack("!I", StarTrackerActionIds.WRITE)
|
||||
+ struct.pack("!B", region)
|
||||
+ struct.pack("!I", address)
|
||||
+ bytearray(ImagePathDefs.flashFile, "utf-8")
|
||||
+ bytearray(FileDefs.flashFile, "utf-8")
|
||||
)
|
||||
return command
|
||||
|
||||
@ -647,7 +662,7 @@ def pack_read_command(object_id: bytearray) -> bytearray:
|
||||
+ struct.pack("!B", region)
|
||||
+ struct.pack("!I", address)
|
||||
+ struct.pack("!I", size)
|
||||
+ bytearray(ImagePathDefs.flashReadPath, "utf-8")
|
||||
+ bytearray(FileDefs.flashReadPath, "utf-8")
|
||||
)
|
||||
return command
|
||||
|
||||
@ -667,27 +682,16 @@ def pack_checksum_command(object_id: bytearray) -> bytearray:
|
||||
|
||||
|
||||
def get_config_file() -> str:
|
||||
LOGGER.info("Specify json file to use")
|
||||
key = get_json_file_key()
|
||||
while key not in json_selection:
|
||||
LOGGER.info("Invalid key specified, try again.")
|
||||
key = get_json_file_key()
|
||||
jsonfile = json_selection[key][1]
|
||||
LOGGER.info("Specify json file")
|
||||
input_helper = InputHelper(json_dict)
|
||||
key = input_helper.get_key()
|
||||
jsonfile = json_dict[key][1]
|
||||
return jsonfile
|
||||
|
||||
|
||||
def get_json_file_key():
|
||||
key_column_width = 10
|
||||
description_column_width = 50
|
||||
separator_width = key_column_width + description_column_width + 3
|
||||
separator_string = separator_width * "-"
|
||||
key_string = "Key".ljust(key_column_width)
|
||||
description_string = "Description".ljust(description_column_width)
|
||||
LOGGER.info(f"{key_string} | {description_string}")
|
||||
LOGGER.info(separator_string)
|
||||
for key in json_selection:
|
||||
key_string = key.ljust(key_column_width)
|
||||
description_string = json_selection[key][0].ljust(description_column_width)
|
||||
LOGGER.info(f"{key_string} | {description_string}")
|
||||
key = input("Specify key: ")
|
||||
return key
|
||||
def get_firmware() -> str:
|
||||
LOGGER.info("Specify firmware file")
|
||||
input_helper = InputHelper(firmware_dict)
|
||||
key = input_helper.get_key()
|
||||
firmware = firmware_dict[key][1]
|
||||
return firmware
|
||||
|
2
tmtccmd
2
tmtccmd
@ -1 +1 @@
|
||||
Subproject commit 0fa5c5a870cc5cd8ceb2199f775064ee07b332be
|
||||
Subproject commit 49cf288831216c0680aedab88e31d684ba5b8da8
|
49
utility/input_helper.py
Normal file
49
utility/input_helper.py
Normal file
@ -0,0 +1,49 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
@file input_helper.py
|
||||
@brief This class can be used to get user input. A dictionary must be provided which describes the input options.
|
||||
@author J. Meier
|
||||
@date 13.02.2021
|
||||
"""
|
||||
|
||||
from tmtccmd.utility.logger import get_console_logger
|
||||
|
||||
LOGGER = get_console_logger()
|
||||
|
||||
|
||||
class InputHelper:
|
||||
|
||||
def __init__(self, menu: dict):
|
||||
"""
|
||||
@brief Constructor
|
||||
@param menu The describing the input options
|
||||
"""
|
||||
self.menu = menu
|
||||
|
||||
def get_key(self) -> str:
|
||||
"""
|
||||
@brief Ask the user for input and returns the chosen key
|
||||
"""
|
||||
key = self.menu_handler()
|
||||
while key not in self.menu:
|
||||
LOGGER.info("Invalid key specified, try again.")
|
||||
key = self.menu_handler()
|
||||
return key
|
||||
|
||||
def menu_handler(self) -> str:
|
||||
key_column_width = 10
|
||||
description_column_width = 50
|
||||
separator_width = key_column_width + description_column_width + 3
|
||||
separator_string = separator_width * "-"
|
||||
key_string = "Key".ljust(key_column_width)
|
||||
description_string = "Description".ljust(description_column_width)
|
||||
LOGGER.info(f"{key_string} | {description_string}")
|
||||
LOGGER.info(separator_string)
|
||||
for key in self.menu:
|
||||
key_string = key.ljust(key_column_width)
|
||||
description_string = self.menu[key][0].ljust(description_column_width)
|
||||
LOGGER.info(f"{key_string} | {description_string}")
|
||||
key = input("Specify key: ")
|
||||
return key
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user