From 7e36b387fe83505bcd0cedbeabd1ba01fd912af9 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 23 May 2022 17:50:08 +0200 Subject: [PATCH] add rtd cmds --- config/definitions.py | 1 + config/object_ids.py | 18 ++++++ pus_tc/cmd_definitions.py | 2 + pus_tc/devs/rtd.py | 113 ++++++++++++++++++++++++++++++++++++++ pus_tc/tc_packer_hook.py | 3 + 5 files changed, 137 insertions(+) create mode 100644 pus_tc/devs/rtd.py diff --git a/config/definitions.py b/config/definitions.py index 708701f..5aa4174 100644 --- a/config/definitions.py +++ b/config/definitions.py @@ -48,5 +48,6 @@ class CustomServiceList(enum.Enum): TCS_ASS = "tcs-ass" TIME = "time" PROCEDURE = "proc" + RTD = "rtd" TVTTESTPROCEDURE = "tvtestproc" CONTROLLERS = "controllers" diff --git a/config/object_ids.py b/config/object_ids.py index b2b3aaf..045d671 100644 --- a/config/object_ids.py +++ b/config/object_ids.py @@ -77,6 +77,24 @@ HEATER_5_STR = bytes([0x60, 0x00, 0x00, 0x05]) HEATER_6_DRO = bytes([0x60, 0x00, 0x00, 0x06]) HEATER_7_HPA = bytes([0x60, 0x00, 0x00, 0x07]) +# RTDs +RTD_0_PLOC_HSPD = bytes([0x44, 0x42, 0x00, 0x16]) +RTD_1_PLOC_MISSIONBRD= bytes([0x44, 0x42, 0x00, 0x17]) +RTD_2_4K_CAM = bytes([0x44, 0x42, 0x00, 0x18]) +RTD_3_DAC_HSPD = bytes([0x44, 0x42, 0x00, 0x19]) +RTD_4_STR = bytes([0x44, 0x42, 0x00, 0x20]) +RTD_5_RW1_MX_MY = bytes([0x44, 0x42, 0x00, 0x21]) +RTD_6_DRO = bytes([0x44, 0x42, 0x00, 0x22]) +RTD_7_SCEX = bytes([0x44, 0x42, 0x00, 0x23]) +RTD_8_X8 = bytes([0x44, 0x42, 0x00, 0x24]) +RTD_9_HPA = bytes([0x44, 0x42, 0x00, 0x25]) +RTD_10_PL_TX = bytes([0x44, 0x42, 0x00, 0x26]) +RTD_11_MPA = bytes([0x44, 0x42, 0x00, 0x27]) +RTD_12_ACU = bytes([0x44, 0x42, 0x00, 0x28]) +RTD_13_PLPCDU_HSPD = bytes([0x44, 0x42, 0x00, 0x29]) +RTD_14_TCS_BRD = bytes([0x44, 0x42, 0x00, 0x30]) +RTD_15_IMTQ = bytes([0x44, 0x42, 0x00, 0x31]) + # System and Assembly Objects ACS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x01]) SUS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x02]) diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index 4d5920a..655afc3 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -8,6 +8,7 @@ from tmtccmd.config import ( ) from config.definitions import CustomServiceList from pus_tc.devs.heater import add_heater_cmds +from pus_tc.devs.rtd import specify_rtd_cmds from pus_tc.devs.reaction_wheels import add_rw_cmds from pus_tc.devs.bpx_batt import BpxOpCodes @@ -20,6 +21,7 @@ def get_eive_service_op_code_dict() -> ServiceOpCodeDictT: add_core_controller_definitions(cmd_dict=service_op_code_dict) add_pl_pcdu_cmds(cmd_dict=service_op_code_dict) add_pcdu_cmds(cmd_dict=service_op_code_dict) + specify_rtd_cmds(cmd_dict=service_op_code_dict) add_imtq_cmds(cmd_dict=service_op_code_dict) add_rad_sens_cmds(cmd_dict=service_op_code_dict) add_rw_cmds(cmd_dict=service_op_code_dict) diff --git a/pus_tc/devs/rtd.py b/pus_tc/devs/rtd.py new file mode 100644 index 0000000..f15dc32 --- /dev/null +++ b/pus_tc/devs/rtd.py @@ -0,0 +1,113 @@ +from typing import Optional + +from config.definitions import CustomServiceList +from spacepackets.ecss import PusTelecommand +from tmtccmd.config import ServiceOpCodeDictT, add_op_code_entry, add_service_op_code_entry +from tmtccmd.tc.definitions import TcQueueT +from tmtccmd.utility import ObjectId +from tmtccmd.tc.pus_200_fsfw_modes import Modes, pack_mode_data, Subservices +import config.object_ids as oids +from config.object_ids import get_object_ids + +RTD_IDS = [ + oids.RTD_0_PLOC_HSPD, + oids.RTD_1_PLOC_MISSIONBRD, + oids.RTD_2_4K_CAM, + oids.RTD_3_DAC_HSPD, + oids.RTD_4_STR, + oids.RTD_5_RW1_MX_MY, + oids.RTD_6_DRO, + oids.RTD_7_SCEX, + oids.RTD_8_X8, + oids.RTD_9_HPA, + oids.RTD_10_PL_TX, + oids.RTD_11_MPA, + oids.RTD_12_ACU, + oids.RTD_13_PLPCDU_HSPD, + oids.RTD_14_TCS_BRD, + oids.RTD_15_IMTQ +] + + +class OpCodes: + ON = ["0", "on"] + OFF = ["1", "off"] + NORMAL = ["2", "normal"] + + +class Info: + ON = "Switch handler on" + OFF = "Switch handler off" + NORMAL = "Switch handler normal" + + +def specify_rtd_cmds(cmd_dict: ServiceOpCodeDictT): + op_code_dict = dict() + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.ON, + info=Info.ON + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.NORMAL, + info=Info.NORMAL + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.OFF, + info=Info.OFF + ) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + op_code_entry=op_code_dict, + name=CustomServiceList.RTD.value, + info="RTD commands" + ) + + +def pack_rtd_commands(op_code: str, object_id: Optional[ObjectId], tc_queue: TcQueueT): + if object_id is not None and object_id not in RTD_IDS: + print("Specified object ID not a valid RTD ID") + object_id = None + if object_id is None: + tgt_rtd_idx = prompt_rtd_idx() + object_id_dict = get_object_ids() + object_id = object_id_dict.get(RTD_IDS[tgt_rtd_idx]) + if op_code in OpCodes.ON: + app_data = pack_mode_data(object_id=object_id.as_bytes, mode=Modes.ON, submode=0) + cmd = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=app_data + ) + tc_queue.appendleft(cmd.pack_command_tuple()) + if op_code in OpCodes.NORMAL: + app_data = pack_mode_data(object_id=object_id.as_bytes, mode=Modes.NORMAL, submode=0) + cmd = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=app_data + ) + tc_queue.appendleft(cmd.pack_command_tuple()) + if op_code in OpCodes.OFF: + app_data = pack_mode_data(object_id=object_id.as_bytes, mode=Modes.OFF, submode=0) + cmd = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=app_data + ) + tc_queue.appendleft(cmd.pack_command_tuple()) + + +def prompt_rtd_idx(): + while True: + rtd_idx = input("Please specify RTD index [0-15]: ") + if not rtd_idx.isdigit(): + print("Invalid input") + continue + rtd_idx = int(rtd_idx) + if rtd_idx < 0 or rtd_idx > 15: + print("Invalid device index") + continue + return rtd_idx diff --git a/pus_tc/tc_packer_hook.py b/pus_tc/tc_packer_hook.py index c2e3f60..97bb051 100644 --- a/pus_tc/tc_packer_hook.py +++ b/pus_tc/tc_packer_hook.py @@ -5,6 +5,7 @@ import os from collections import deque from typing import Union +from pus_tc.devs.rtd import pack_rtd_commands from spacepackets.ecss import PusTelecommand from tmtccmd.com_if.com_interface_base import CommunicationInterface from tmtccmd.config.definitions import CoreServiceList, QueueCommands @@ -111,6 +112,8 @@ def pack_service_queue_user( return pack_p60dock_cmds( object_id=object_id, tc_queue=service_queue, op_code=op_code ) + if service == CustomServiceList.RTD.value: + return pack_rtd_commands(object_id=None, tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.PDU1.value: object_id = PDU_1_HANDLER_ID return pack_pdu1_commands(