diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a00ec9..f351cc8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,18 @@ list yields a list of all related PRs for each release. # [unreleased] +# [v1.12.0] + +- Add Rad Sensor HK parsing + PR: https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/81 +- Add procedures, parser functions and general application functionalities + for the thermal-vacuum test. This includes daemon functionality to poll + all Telemetry even when there is no operator present + PR: https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/76 + https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/74 + https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/79 + https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/73 + # [v1.11.0] - Add `setup.cfg` and `setup.py` file, allowing package installation diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index 2db6622..32f6c2c 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -1,4 +1,5 @@ from pus_tc.devs.gps import GpsOpCodes +from pus_tc.devs.rad_sensor import add_rad_sens_cmds from tmtccmd.config import ( add_op_code_entry, add_service_op_code_entry, @@ -775,20 +776,6 @@ def add_imtq_cmds(cmd_dict: ServiceOpCodeDictT): cmd_dict[CustomServiceList.IMTQ.value] = service_imtq_tuple -def add_rad_sens_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict_srv_rad_sensor = { - "0": ("Radiation Sensor: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}), - "1": ("Radiation Sensor: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}), - "2": ("Radiation Sensor: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}), - "3": ("Radiation Sensor: Start conversions", {OpCodeDictKeys.TIMEOUT: 2.0}), - "4": ("Radiation Sensor: Read conversions", {OpCodeDictKeys.TIMEOUT: 2.0}), - "5": ("Radiation Sensor: Enable debug output", {OpCodeDictKeys.TIMEOUT: 2.0}), - "6": ("Radiation Sensor: Disable debug putput", {OpCodeDictKeys.TIMEOUT: 2.0}), - } - service_rad_sensor_tuple = ("Radiation Sensor", op_code_dict_srv_rad_sensor) - cmd_dict[CustomServiceList.RAD_SENSOR.value] = service_rad_sensor_tuple - - def add_ploc_mpsoc_cmds(cmd_dict: ServiceOpCodeDictT): op_code_dict_srv_ploc_mpsoc = { "0": ("Ploc MPSoC: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}), diff --git a/pus_tc/devs/rad_sensor.py b/pus_tc/devs/rad_sensor.py index 8fe63b4..7d66597 100644 --- a/pus_tc/devs/rad_sensor.py +++ b/pus_tc/devs/rad_sensor.py @@ -7,11 +7,37 @@ """ import struct -from tmtccmd.config.definitions import QueueCommands +from config.definitions import CustomServiceList +from tmtccmd.config import add_op_code_entry, add_service_op_code_entry +from tmtccmd.config.definitions import QueueCommands, ServiceOpCodeDictT, OpCodeDictKeys from tmtccmd.tc.packer import TcQueueT from spacepackets.ecss.tc import PusTelecommand from pus_tc.service_200_mode import pack_mode_data, Modes +from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid +from tmtccmd.utility import ObjectId + + +class SetIds: + HK = 3 + + +class OpCodes: + ON = ["0", "on"] + NORMAL = ["1", "normal"] + OFF = ["2", "off"] + REQ_HK_ONCE = ["3", "hk-os"] + DEBUG_ON = ["10", "dbg-on"] + DEBUG_OFF = ["11", "dbg-off"] + + +class Info: + ON = "Switch Rad Sensor on" + NORMAL = "Switch Rad Sensor normal" + OFF = "Switch Rad sensor off" + REQ_OS_HK = "Request one-shot HK" + DEBUG_ON = "Switch debug output on" + DEBUG_OFF = "Switch debug output off" class CommandIds: @@ -21,52 +47,64 @@ class CommandIds: DISABLE_DEBUG_OUTPUT = 5 -def pack_rad_sensor_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str): - tc_queue.appendleft( - ( - QueueCommands.PRINT, - "Testing radiation sensor handler with object id: 0x" + object_id.hex(), - ) +def add_rad_sens_cmds(cmd_dict: ServiceOpCodeDictT): + + op_code_dict = dict() + add_op_code_entry(op_code_dict=op_code_dict, info=Info.ON, keys=OpCodes.ON) + add_op_code_entry(op_code_dict=op_code_dict, info=Info.OFF, keys=OpCodes.OFF) + add_op_code_entry(op_code_dict=op_code_dict, info=Info.NORMAL, keys=OpCodes.NORMAL) + add_op_code_entry( + op_code_dict=op_code_dict, info=Info.REQ_OS_HK, keys=OpCodes.REQ_HK_ONCE + ) + add_op_code_entry( + op_code_dict=op_code_dict, info=Info.DEBUG_ON, keys=OpCodes.DEBUG_ON + ) + add_op_code_entry( + op_code_dict=op_code_dict, info=Info.DEBUG_OFF, keys=OpCodes.DEBUG_OFF + ) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name=CustomServiceList.RAD_SENSOR.value, + info="Radiation Sensor", + op_code_entry=op_code_dict, ) - if op_code == "0": - tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Switch to mode on")) - mode_data = pack_mode_data(object_id, Modes.ON, 0) - command = PusTelecommand(service=200, subservice=1, ssc=41, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "1": - tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Switch to mode normal")) - mode_data = pack_mode_data(object_id, Modes.NORMAL, 0) - command = PusTelecommand(service=200, subservice=1, ssc=42, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) +def pack_rad_sensor_test_into(object_id: ObjectId, tc_queue: TcQueueT, op_code: str): + tc_queue.appendleft( + (QueueCommands.PRINT, f"Commanding Radiation sensor handler {object_id}") + ) - if op_code == "2": - tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Switch to mode off")) - mode_data = pack_mode_data(object_id, Modes.OFF, 0) - command = PusTelecommand(service=200, subservice=1, ssc=42, app_data=mode_data) + if op_code in OpCodes.ON: + rad_sensor_mode_cmd(object_id, Modes.ON, Info.ON, tc_queue) + if op_code in OpCodes.NORMAL: + rad_sensor_mode_cmd(object_id, Modes.NORMAL, Info.NORMAL, tc_queue) + if op_code in OpCodes.OFF: + rad_sensor_mode_cmd(object_id, Modes.OFF, Info.OFF, tc_queue) + if op_code in OpCodes.REQ_HK_ONCE: + tc_queue.appendleft((QueueCommands.PRINT, f"Rad sensor: {Info.REQ_OS_HK}")) + cmd = generate_one_hk_command( + sid=make_sid(object_id.as_bytes, set_id=SetIds.HK), ssc=0 + ) + tc_queue.appendleft(cmd.pack_command_tuple()) + if op_code in OpCodes.DEBUG_ON: + tc_queue.appendleft((QueueCommands.PRINT, f"Rad sensor: {Info.DEBUG_ON}")) + command = object_id.as_bytes + struct.pack("!I", CommandIds.ENABLE_DEBUG_OUTPUT) + command = PusTelecommand(service=8, subservice=128, ssc=45, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) - - if op_code == "3": - tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Start conversions")) - command = object_id + struct.pack("!I", CommandIds.START_CONVERSIONS) - command = PusTelecommand(service=8, subservice=128, ssc=43, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) - - if op_code == "4": - tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Read conversions")) - command = object_id + struct.pack("!I", CommandIds.READ_CONVERSIONS) - command = PusTelecommand(service=8, subservice=128, ssc=44, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) - - if op_code == "5": - tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Enable debug output")) - command = object_id + struct.pack("!I", CommandIds.ENABLE_DEBUG_OUTPUT) + if op_code in OpCodes.DEBUG_OFF: + tc_queue.appendleft((QueueCommands.PRINT, f"Rad sensor: {Info.DEBUG_OFF}")) + command = object_id.as_bytes + struct.pack( + "!I", CommandIds.DISABLE_DEBUG_OUTPUT + ) command = PusTelecommand(service=8, subservice=128, ssc=45, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "6": - tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Disable debug output")) - command = object_id + struct.pack("!I", CommandIds.DISABLE_DEBUG_OUTPUT) - command = PusTelecommand(service=8, subservice=128, ssc=45, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) + +def rad_sensor_mode_cmd( + object_id: ObjectId, mode: Modes, info: str, tc_queue: TcQueueT +): + tc_queue.appendleft((QueueCommands.PRINT, f"Rad sensor: {info}")) + mode_data = pack_mode_data(object_id.as_bytes, mode, 0) + command = PusTelecommand(service=200, subservice=1, ssc=41, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) diff --git a/pus_tc/prompt_parameters.py b/pus_tc/prompt_parameters.py index aa7dacb..feca256 100644 --- a/pus_tc/prompt_parameters.py +++ b/pus_tc/prompt_parameters.py @@ -1,5 +1,11 @@ from PyQt5.QtWidgets import ( - QDialog, QDialogButtonBox, QVBoxLayout, QLabel, QGroupBox, QGridLayout, QLineEdit + QDialog, + QDialogButtonBox, + QVBoxLayout, + QLabel, + QGroupBox, + QGridLayout, + QLineEdit, ) from PyQt5 import QtCore @@ -16,12 +22,13 @@ class Parameter: self.widget = widget self.value = self.defaultValue self.widget.setPlaceholderText(self.defaultValue) - + def reset(self): self.value = self.defaultValue self.widget.setPlaceholderText(self.defaultValue) self.widget.setText("") + class ParameterDialog(QDialog): def __init__(self): super().__init__() @@ -42,7 +49,7 @@ class ParameterDialog(QDialog): self.rootLayout.addWidget(self.group) self.rootLayout.addWidget(self.buttonBox) self.setLayout(self.rootLayout) - + self.groupLayout = QGridLayout() self.group.setLayout(self.groupLayout) @@ -59,7 +66,6 @@ class ParameterDialog(QDialog): self.parameters[name] = parameter - def _reset(self): for value in self.parameters.values(): value.reset() @@ -80,14 +86,13 @@ class ParameterDialog(QDialog): super().reject() - - - """Prompt the user to specify additional Parameters :param parameterList: array of dictionaries with name and defaultValue attributes :return: dict with all names as key and user supplied input as value string """ + + def prompt_parameters(parameterList): gui = get_global(CoreGlobalIds.GUI) mode = get_global(CoreGlobalIds.MODE) @@ -95,9 +100,10 @@ def prompt_parameters(parameterList): # gui only works in cont mode right now if gui and mode == CoreModeList.CONTINUOUS_MODE: return _gui_prompt(parameterList) - else: + else: return _cli_prompt(parameterList) + def _gui_prompt(parameterList): dialog = ParameterDialog() for parameter in parameterList: @@ -105,11 +111,14 @@ def _gui_prompt(parameterList): dialog.exec_() return dialog.getParameters() + def _cli_prompt(parameterList): result = {} for parameter in parameterList: - userInput = input("Specify {} [{}]: ".format(parameter["name"], parameter["defaultValue"])) + userInput = input( + "Specify {} [{}]: ".format(parameter["name"], parameter["defaultValue"]) + ) if userInput == "": userInput = parameter["defaultValue"] result[parameter["name"]] = userInput - return result \ No newline at end of file + return result diff --git a/pus_tc/system/controllers.py b/pus_tc/system/controllers.py index 1f44f32..b7949d2 100644 --- a/pus_tc/system/controllers.py +++ b/pus_tc/system/controllers.py @@ -18,8 +18,12 @@ class Info: def pack_controller_commands(tc_queue: TcQueueT, op_code: str): - parameters = prompt_parameters([{"name": "Mode", "defaultValue": "2"}, { - "name": "Submode", "defaultValue": "0"}]) + parameters = prompt_parameters( + [ + {"name": "Mode", "defaultValue": "2"}, + {"name": "Submode", "defaultValue": "0"}, + ] + ) mode = int(parameters["Mode"]) if mode < 0 or mode > 2: print("Invalid Mode, defaulting to OFF") diff --git a/pus_tc/tc_packer_hook.py b/pus_tc/tc_packer_hook.py index c2e3f60..9a671d9 100644 --- a/pus_tc/tc_packer_hook.py +++ b/pus_tc/tc_packer_hook.py @@ -69,6 +69,7 @@ from config.object_ids import ( SYRLINKS_HANDLER_ID, SOLAR_ARRAY_DEPLOYMENT_ID, RW_ASSEMBLY, + get_object_ids, ) @@ -98,6 +99,7 @@ def pre_tc_send_cb( def pack_service_queue_user( service: Union[str, int], op_code: str, service_queue: TcQueueT ): + obj_id_man = get_object_ids() if service == CoreServiceList.SERVICE_5.value: return pack_generic_service5_test_into(tc_queue=service_queue) if service == CoreServiceList.SERVICE_17.value: @@ -170,7 +172,7 @@ def pack_service_queue_user( object_id=RW4_ID, rw_idx=4, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.RAD_SENSOR.value: - object_id = RAD_SENSOR_ID + object_id = obj_id_man.get(RAD_SENSOR_ID) return pack_rad_sensor_test_into( object_id=object_id, tc_queue=service_queue, op_code=op_code ) diff --git a/pus_tm/devs/rad_sensor.py b/pus_tm/devs/rad_sensor.py new file mode 100644 index 0000000..de0e65d --- /dev/null +++ b/pus_tm/devs/rad_sensor.py @@ -0,0 +1,24 @@ +import struct + +from pus_tm.defs import PrintWrapper +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter +from pus_tc.devs.rad_sensor import SetIds + + +def handle_rad_sensor_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): + if set_id == SetIds.HK: + pw = PrintWrapper(printer) + current_idx = 0 + pw.dlog("Received Radiation Sensor HK data") + fmt_str = "!fHHHHHH" + inc_len = struct.calcsize(fmt_str) + (temp, ain0, ain1, ain4, ain5, ain6, ain7) = struct.unpack( + fmt_str, hk_data[current_idx : current_idx + inc_len] + ) + ain_dict = {0: ain0, 1: ain1, 4: ain4, 5: ain5, 6: ain6, 7: ain7} + pw.dlog(f"Temperature: {temp} C") + pw.dlog(f"AIN Channel | Raw Value (hex) | Raw Value (dec)") + for idx, val in ain_dict.items(): + pw.dlog(f"{idx} | {val:#06x} | {str(val).ljust(5)}") + current_idx += inc_len + printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=7) diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py index a7178b0..02f3866 100644 --- a/pus_tm/hk_handling.py +++ b/pus_tm/hk_handling.py @@ -1,6 +1,7 @@ """HK Handling for EIVE OBSW""" import struct +from pus_tm.devs.rad_sensor import handle_rad_sensor_data from pus_tm.system.tcs import handle_thermal_controller_hk_data, TM_TCP_SERVER from tmtccmd.config.definitions import HkReplyUnpacked from tmtccmd.tm.pus_3_fsfw_hk import ( @@ -92,6 +93,8 @@ def handle_regular_hk_print( return handle_pdu_data( printer=printer, pdu_idx=2, set_id=set_id, hk_data=hk_data ) + if objb == obj_ids.RAD_SENSOR_ID: + return handle_rad_sensor_data(printer=printer, hk_data=hk_data, set_id=set_id) if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: return handle_rw_hk_data( printer=printer, object_id=object_id, set_id=set_id, hk_data=hk_data diff --git a/tmtcgui.py b/tmtcgui.py index 6366d6f..b36f61c 100755 --- a/tmtcgui.py +++ b/tmtcgui.py @@ -2,10 +2,11 @@ """TMTC commander for EIVE""" from tmtcc import tmtcc_post_args, tmtcc_pre_args from tmtccmd.config.args import ( - create_default_args_parser, - add_gui_tmtccmd_args, - parse_gui_input_arguments, - ) + create_default_args_parser, + add_gui_tmtccmd_args, + parse_gui_input_arguments, +) + def main(): hook_obj = tmtcc_pre_args()