From cb479cc2f48ed15668a5b223529dc1cf1ee5b6ad Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 22 Mar 2022 19:29:55 +0100 Subject: [PATCH 1/4] added TCS board ASS CMDS --- config/definitions.py | 1 + config/object_ids.py | 1 + pus_tc/cmd_definitions.py | 21 ++++++++++++++ pus_tc/devs/ploc_mpsoc.py | 60 ++++++++++++++++++++++++++------------- pus_tc/system/acs.py | 44 ++++++++++------------------ pus_tc/system/common.py | 18 ++++++++++++ pus_tc/system/tcs.py | 34 ++++++++++++++++++++++ pus_tc/tc_packer_hook.py | 7 ++++- pus_tm/service_8_hook.py | 7 +++-- 9 files changed, 142 insertions(+), 51 deletions(-) create mode 100644 pus_tc/system/common.py create mode 100644 pus_tc/system/tcs.py diff --git a/config/definitions.py b/config/definitions.py index bc6c639..476e8e4 100644 --- a/config/definitions.py +++ b/config/definitions.py @@ -44,3 +44,4 @@ class CustomServiceList(enum.Enum): SYRLINKS = "syrlinks" ACS_ASS = "acs-ass" SUS_ASS = "sus-ass" + TCS_ASS = "tcs-ass" diff --git a/config/object_ids.py b/config/object_ids.py index 04a906e..08ecc6c 100644 --- a/config/object_ids.py +++ b/config/object_ids.py @@ -69,6 +69,7 @@ PL_PCDU_ID = bytes([0x44, 0x30, 0x00, 0x00]) # System and Assembly Objects ACS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x01]) SUS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x02]) +TCS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x03]) def get_object_ids() -> ObjectIdDictT: diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index 44395d7..30c7702 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -600,6 +600,7 @@ def add_ploc_supv_cmds(cmd_dict: ServiceOpCodeDictT): def add_system_cmds(cmd_dict: ServiceOpCodeDictT): from pus_tc.system.acs import AcsOpCodes, SusOpCodes + import pus_tc.system.tcs as tcs default_opts = generate_op_code_options( enter_listener_mode=False, custom_timeout=8.0 @@ -685,3 +686,23 @@ def add_system_cmds(cmd_dict: ServiceOpCodeDictT): info="SUS Assembly", op_code_entry=op_code_dict, ) + + op_code_dict = dict() + add_op_code_entry( + op_code_dict=op_code_dict, + keys=tcs.OpCodes.TCS_BOARD_ASS_NORMAL, + info=tcs.Info.TCS_BOARD_ASS_NORMAL, + options=default_opts, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=tcs.OpCodes.TCS_BOARD_ASS_OFF, + info=tcs.Info.TCS_BOARD_ASS_OFF, + options=default_opts, + ) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name=CustomServiceList.TCS_ASS.value, + info="TCS Board Assembly", + op_code_entry=op_code_dict, + ) diff --git a/pus_tc/devs/ploc_mpsoc.py b/pus_tc/devs/ploc_mpsoc.py index 0d82cba..2621b38 100644 --- a/pus_tc/devs/ploc_mpsoc.py +++ b/pus_tc/devs/ploc_mpsoc.py @@ -47,7 +47,7 @@ class PlocReplyIds: def pack_ploc_mpsoc_commands( - object_id: bytearray, tc_queue: TcQueueT, op_code: str + object_id: bytearray, tc_queue: TcQueueT, op_code: str ) -> TcQueueT: tc_queue.appendleft( ( @@ -58,7 +58,9 @@ def pack_ploc_mpsoc_commands( if op_code == "0": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: TC mem write test")) - memory_address = int(input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16) + memory_address = int( + input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16 + ) memory_data = int(input("PLOC MPSoC: Tc Mem Write: Type memory data: 0x"), 16) # TODO: implement variable length mem write command mem_len = 1 # 1 32-bit word @@ -89,7 +91,7 @@ def pack_ploc_mpsoc_commands( tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "5": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay stop")) - command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_STOP) + command = object_id + struct.pack("!I", CommandIds.TC_REPLAY_STOP) command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "6": @@ -99,7 +101,7 @@ def pack_ploc_mpsoc_commands( tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "7": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Downlink pwr off")) - command = object_id + struct.pack('!I', CommandIds.TC_DOWNLINK_PWR_OFF) + command = object_id + struct.pack("!I", CommandIds.TC_DOWNLINK_PWR_OFF) command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "8": @@ -112,7 +114,7 @@ def pack_ploc_mpsoc_commands( def generate_write_mem_command( - object_id: bytearray, memory_address: int, memory_data: int, mem_len: int + object_id: bytearray, memory_address: int, memory_data: int, mem_len: int ) -> bytearray: """This function generates the command to write to a memory address within the PLOC @param object_id The object id of the PlocHandler @@ -120,11 +122,11 @@ def generate_write_mem_command( @param memory_data The data to write to the memory address specified by the bytearray memory_address. """ command = ( - object_id - + struct.pack('!I', CommandIds.TC_MEM_WRITE) - + struct.pack('!I', memory_address) - + struct.pack('!H', mem_len) - + struct.pack("!I", memory_data) + object_id + + struct.pack("!I", CommandIds.TC_MEM_WRITE) + + struct.pack("!I", memory_address) + + struct.pack("!H", mem_len) + + struct.pack("!I", memory_data) ) return command @@ -133,43 +135,63 @@ def prepare_mem_read_command(object_id: bytearray) -> bytearray: memory_address = int(input("PLOC MPSoC Tc Mem Read: Type memory address: 0x"), 16) num_words = int(input("PLOC MPSoC specify number of words (32-bit) to read: ")) command = ( - object_id + struct.pack('!I', CommandIds.TC_MEM_READ) + struct.pack("!I", memory_address) + struct.pack( - '!H', num_words) + object_id + + struct.pack("!I", CommandIds.TC_MEM_READ) + + struct.pack("!I", memory_address) + + struct.pack("!H", num_words) ) return command def prepare_flash_write_cmd(object_id: bytearray) -> bytearray: file = get_flash_write_file() - command = object_id + struct.pack('!I', CommandIds.FLASH_WRITE) + bytearray(file, 'utf-8') + command = ( + object_id + struct.pack("!I", CommandIds.FLASH_WRITE) + bytearray(file, "utf-8") + ) return command def prepare_flash_delete_cmd(object_id: bytearray) -> bytearray: file = get_mpsoc_file() - command = object_id + struct.pack('!I', CommandIds.TC_FLASH_DELETE) + bytearray(file, 'utf-8') + command = ( + object_id + + struct.pack("!I", CommandIds.TC_FLASH_DELETE) + + bytearray(file, "utf-8") + ) return command def prepare_replay_start_cmd(object_id: bytearray) -> bytearray: replay = int(input("Specify replay mode (0 - repeated, 1 - once): ")) - command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_START) + struct.pack('!B', replay) + command = ( + object_id + + struct.pack("!I", CommandIds.TC_REPLAY_START) + + struct.pack("!B", replay) + ) return command def prepare_downlink_pwr_on_cmd(object_id: bytearray) -> bytearray: mode = int(input("Specify JESD mode (0 - 5): ")) lane_rate = int(input("Specify lane rate (0 - 9): ")) - command = object_id + struct.pack('!I', CommandIds.TC_DOWNLINK_PWR_ON) + struct.pack('!B', mode) \ - + struct.pack('!B', lane_rate) + command = ( + object_id + + struct.pack("!I", CommandIds.TC_DOWNLINK_PWR_ON) + + struct.pack("!B", mode) + + struct.pack("!B", lane_rate) + ) return command def prepare_replay_write_sequence_cmd(object_id: bytearray) -> bytearray: use_decoding = int(input("Use decoding (set to 1): ")) file = get_mpsoc_file() - command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_WRITE_SEQUENCE) + struct.pack('!B', use_decoding) + \ - bytearray(file, 'utf-8') + command = ( + object_id + + struct.pack("!I", CommandIds.TC_REPLAY_WRITE_SEQUENCE) + + struct.pack("!B", use_decoding) + + bytearray(file, "utf-8") + ) return command diff --git a/pus_tc/system/acs.py b/pus_tc/system/acs.py index e65b61f..109a8e7 100644 --- a/pus_tc/system/acs.py +++ b/pus_tc/system/acs.py @@ -1,9 +1,10 @@ import enum -from tmtccmd.tc.definitions import TcQueueT, QueueCommands -from spacepackets.ecss.tc import PusTelecommand -from tmtccmd.tc.service_200_mode import pack_mode_data, Modes, Subservices +from tmtccmd.tc.definitions import TcQueueT +from tmtccmd.tc.service_200_mode import Modes from config.object_ids import ACS_BOARD_ASS_ID, SUS_BOARD_ASS_ID +from .common import command_assembly + class AcsOpCodes: ACS_ASS_A_SIDE = ["0", "acs-a"] @@ -30,7 +31,7 @@ class DualSideSubmodes(enum.IntEnum): def pack_acs_command(tc_queue: TcQueueT, op_code: str): if op_code in AcsOpCodes.ACS_ASS_A_SIDE: - command_acs_board( + command_assembly( object_id=ACS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=DualSideSubmodes.A_SIDE, @@ -38,7 +39,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str): info="Switching to ACS board assembly A side", ) if op_code in AcsOpCodes.ACS_ASS_B_SIDE: - command_acs_board( + command_assembly( object_id=ACS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=DualSideSubmodes.B_SIDE, @@ -46,7 +47,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str): info="Switching to ACS board assembly B side", ) if op_code in AcsOpCodes.ACS_ASS_DUAL_MODE: - command_acs_board( + command_assembly( object_id=ACS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=DualSideSubmodes.DUAL_SIDE, @@ -54,7 +55,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str): info="Switching to ACS board assembly dual mode", ) if op_code in AcsOpCodes.ACS_ASS_A_ON: - command_acs_board( + command_assembly( object_id=ACS_BOARD_ASS_ID, mode=Modes.ON, submode=DualSideSubmodes.A_SIDE, @@ -62,7 +63,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str): info="Switching ACS board assembly A side on", ) if op_code in AcsOpCodes.ACS_ASS_B_ON: - command_acs_board( + command_assembly( object_id=ACS_BOARD_ASS_ID, mode=Modes.ON, submode=DualSideSubmodes.B_SIDE, @@ -70,7 +71,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str): info="Switching ACS board assembly B side on", ) if op_code in AcsOpCodes.ACS_ASS_DUAL_ON: - command_acs_board( + command_assembly( object_id=ACS_BOARD_ASS_ID, mode=Modes.ON, submode=DualSideSubmodes.B_SIDE, @@ -78,7 +79,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str): info="Switching ACS board assembly dual side on", ) if op_code in AcsOpCodes.ACS_ASS_OFF: - command_acs_board( + command_assembly( object_id=ACS_BOARD_ASS_ID, mode=Modes.OFF, submode=0, @@ -89,7 +90,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str): def pack_sus_cmds(tc_queue: TcQueueT, op_code: str): if op_code in SusOpCodes.SUS_ASS_NOM_SIDE: - command_acs_board( + command_assembly( object_id=SUS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=DualSideSubmodes.A_SIDE, @@ -97,7 +98,7 @@ def pack_sus_cmds(tc_queue: TcQueueT, op_code: str): info="Switching to SUS board to nominal side", ) if op_code in SusOpCodes.SUS_ASS_RED_SIDE: - command_acs_board( + command_assembly( object_id=SUS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=DualSideSubmodes.B_SIDE, @@ -105,7 +106,7 @@ def pack_sus_cmds(tc_queue: TcQueueT, op_code: str): info="Switching to SUS board to redundant side", ) if op_code in SusOpCodes.SUS_ASS_OFF: - command_acs_board( + command_assembly( object_id=SUS_BOARD_ASS_ID, mode=Modes.OFF, submode=0, @@ -113,25 +114,10 @@ def pack_sus_cmds(tc_queue: TcQueueT, op_code: str): info="Switching SUS board off", ) if op_code in SusOpCodes.SUS_ASS_DUAL_MODE: - command_acs_board( + command_assembly( object_id=SUS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=DualSideSubmodes.DUAL_SIDE, tc_queue=tc_queue, info="Switching to SUS board to dual side", ) - - -def command_acs_board( - object_id: bytes, mode: Modes, submode: int, tc_queue: TcQueueT, info: str -): - tc_queue.appendleft((QueueCommands.PRINT, info)) - mode_data = pack_mode_data( - object_id=object_id, - mode=mode, - submode=submode, - ) - cmd = PusTelecommand( - service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data - ) - tc_queue.appendleft(cmd.pack_command_tuple()) diff --git a/pus_tc/system/common.py b/pus_tc/system/common.py new file mode 100644 index 0000000..a37412a --- /dev/null +++ b/pus_tc/system/common.py @@ -0,0 +1,18 @@ +from tmtccmd.tc.definitions import TcQueueT, QueueCommands +from spacepackets.ecss.tc import PusTelecommand +from tmtccmd.tc.service_200_mode import pack_mode_data, Modes, Subservices + + +def command_assembly( + object_id: bytes, mode: Modes, submode: int, tc_queue: TcQueueT, info: str +): + tc_queue.appendleft((QueueCommands.PRINT, info)) + mode_data = pack_mode_data( + object_id=object_id, + mode=mode, + submode=submode, + ) + cmd = PusTelecommand( + service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data + ) + tc_queue.appendleft(cmd.pack_command_tuple()) diff --git a/pus_tc/system/tcs.py b/pus_tc/system/tcs.py new file mode 100644 index 0000000..75924f6 --- /dev/null +++ b/pus_tc/system/tcs.py @@ -0,0 +1,34 @@ +from tmtccmd.tc.definitions import TcQueueT, QueueCommands +from tmtccmd.tc.service_200_mode import Modes + +from .common import command_assembly +from config.object_ids import TCS_BOARD_ASS_ID + + +class OpCodes: + TCS_BOARD_ASS_NORMAL = ["0", "tcs-normal"] + TCS_BOARD_ASS_OFF = ["1", "tcs-off"] + + +class Info: + TCS_BOARD_ASS_NORMAL = "Switching TCS board assembly on" + TCS_BOARD_ASS_OFF = "Switching TCS board assembly off" + + +def pack_tcs_sys_commands(tc_queue: TcQueueT, op_code: str): + if op_code in OpCodes.TCS_BOARD_ASS_NORMAL: + command_assembly( + object_id=TCS_BOARD_ASS_ID, + mode=Modes.NORMAL, + submode=0, + tc_queue=tc_queue, + info=Info.TCS_BOARD_ASS_NORMAL, + ) + if op_code in OpCodes.TCS_BOARD_ASS_OFF: + command_assembly( + object_id=TCS_BOARD_ASS_ID, + mode=Modes.NORMAL, + submode=0, + tc_queue=tc_queue, + info=Info.TCS_BOARD_ASS_OFF, + ) diff --git a/pus_tc/tc_packer_hook.py b/pus_tc/tc_packer_hook.py index 3db17f9..bda998f 100644 --- a/pus_tc/tc_packer_hook.py +++ b/pus_tc/tc_packer_hook.py @@ -34,6 +34,7 @@ from pus_tc.devs.gps import pack_gps_command from pus_tc.system.acs import pack_acs_command, pack_sus_cmds from pus_tc.devs.plpcdu import pack_pl_pcdu_commands from pus_tc.devs.str_img_helper import pack_str_img_helper_command +from pus_tc.system.tcs import pack_tcs_sys_commands from config.definitions import CustomServiceList from config.object_ids import ( P60_DOCK_HANDLER, @@ -120,7 +121,9 @@ def pack_service_queue_user( ) if service == CustomServiceList.PLOC_MPSOC.value: object_id = PLOC_MPSOC_ID - return pack_ploc_mpsoc_commands(object_id=object_id, tc_queue=service_queue, op_code=op_code) + return pack_ploc_mpsoc_commands( + object_id=object_id, tc_queue=service_queue, op_code=op_code + ) if service == CustomServiceList.REACTION_WHEEL_1.value: object_id = RW1_ID return pack_single_rw_test_into( @@ -205,6 +208,8 @@ def pack_service_queue_user( return pack_pl_pcdu_commands(tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.ACS_ASS.value: return pack_acs_command(tc_queue=service_queue, op_code=op_code) + if service == CustomServiceList.TCS_ASS.value: + return pack_tcs_sys_commands(tc_queue=service_queue, op_code=op_code) LOGGER.warning("Invalid Service !") diff --git a/pus_tm/service_8_hook.py b/pus_tm/service_8_hook.py index a1ef8cf..60d1d97 100644 --- a/pus_tm/service_8_hook.py +++ b/pus_tm/service_8_hook.py @@ -69,8 +69,11 @@ def handle_ploc_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpa "PLOC Mem Len", "PLOC Read Memory Data", ] - reply.content_list = ["0x" + custom_data[:4].hex(), struct.unpack('!H', custom_data[4:6])[0], - "0x" + custom_data[6:10].hex()] + reply.content_list = [ + "0x" + custom_data[:4].hex(), + struct.unpack("!H", custom_data[4:6])[0], + "0x" + custom_data[6:10].hex(), + ] return reply From a55940339850e862b05a06a94fa8381bf7da0743 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 22 Mar 2022 19:33:38 +0100 Subject: [PATCH 2/4] update object list --- config/objects.csv | 1 + 1 file changed, 1 insertion(+) diff --git a/config/objects.csv b/config/objects.csv index 63116d0..9f8ce56 100644 --- a/config/objects.csv +++ b/config/objects.csv @@ -109,6 +109,7 @@ 0x54694269;TEST_TASK 0x73000001;ACS_BOARD_ASS 0x73000002;SUS_BOARD_ASS +0x73000003;TCS_BOARD_ASS 0x73000100;TM_FUNNEL 0x73500000;CCSDS_IP_CORE_BRIDGE 0xFFFFFFFF;NO_OBJECT From 0abdea987a677a629173b6ec9cd2fa8dc265cb24 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 22 Mar 2022 20:35:24 +0100 Subject: [PATCH 3/4] small fix --- pus_tc/system/tcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pus_tc/system/tcs.py b/pus_tc/system/tcs.py index 75924f6..fda1a76 100644 --- a/pus_tc/system/tcs.py +++ b/pus_tc/system/tcs.py @@ -27,7 +27,7 @@ def pack_tcs_sys_commands(tc_queue: TcQueueT, op_code: str): if op_code in OpCodes.TCS_BOARD_ASS_OFF: command_assembly( object_id=TCS_BOARD_ASS_ID, - mode=Modes.NORMAL, + mode=Modes.OFF, submode=0, tc_queue=tc_queue, info=Info.TCS_BOARD_ASS_OFF, From 43a534db9cf8ee14e6c1296dac8b3e2c3c94b240 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 22 Mar 2022 20:43:24 +0100 Subject: [PATCH 4/4] added new event --- config/events.csv | 1 + 1 file changed, 1 insertion(+) diff --git a/config/events.csv b/config/events.csv index 17a6b2e..53d6d6a 100644 --- a/config/events.csv +++ b/config/events.csv @@ -161,6 +161,7 @@ 12301;0x300d;NOT_ENOUGH_DEVICES_DUAL_MODE;HIGH;;mission/system/SusAssembly.h 12302;0x300e;POWER_STATE_MACHINE_TIMEOUT;MEDIUM;;mission/system/SusAssembly.h 12303;0x300f;SIDE_SWITCH_TRANSITION_NOT_ALLOWED;LOW;Not implemented, would increase already high complexity. Operator should instead command the assembly off first and then command the assembly on into the desired mode/submode combination;mission/system/SusAssembly.h +12400;0x3070;CHILDREN_LOST_MODE;MEDIUM;;mission/system/TcsBoardAssembly.h 13600;0x3520;ALLOC_FAILURE;MEDIUM;;bsp_q7s/core/CoreController.h 13601;0x3521;REBOOT_SW;MEDIUM; Software reboot occured. Can also be a systemd reboot. P1: Current Chip, P2: Current Copy;bsp_q7s/core/CoreController.h 13602;0x3522;REBOOT_MECHANISM_TRIGGERED;MEDIUM;The reboot mechanism was triggered. P1: First 16 bits: Last Chip, Last 16 bits: Last Copy, P2: Each byte is the respective reboot count for the slots;bsp_q7s/core/CoreController.h