diff --git a/config/definitions.py b/config/definitions.py index a7e91ce..f9fa985 100644 --- a/config/definitions.py +++ b/config/definitions.py @@ -51,7 +51,7 @@ class CustomServiceList(str, enum.Enum): STR_IMG_HELPER = "str_img_helper" SYRLINKS = "syrlinks" ACS_CTRL = "acs_ctrl" - ACS_SS = "acs_subsysten" + ACS_SS = "acs_subsystem" ACS_BRD_ASS = "acs_brd_ass" SUS_BRD_ASS = "sus_brd_ass" TCS = "tcs" diff --git a/config/object_ids.py b/config/object_ids.py index 180dbd8..cb47a47 100644 --- a/config/object_ids.py +++ b/config/object_ids.py @@ -122,6 +122,7 @@ SUS_5_N_LOC_XFYMZB_PT_ZB = bytes([0x44, 0x12, 0x00, 0x37]) SUS_11_R_LOC_XBYMZB_PT_ZB = bytes([0x44, 0x12, 0x00, 0x43]) # System and Assembly Objects +ACS_SUBSYSTEM_ID = bytes([0x73, 0x01, 0x00, 0x01]) ACS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x01]) SUS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x02]) TCS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x03]) diff --git a/pus_tc/procedure_packer.py b/pus_tc/procedure_packer.py index 0981288..db270da 100644 --- a/pus_tc/procedure_packer.py +++ b/pus_tc/procedure_packer.py @@ -192,12 +192,8 @@ def handle_default_procedure( ) if service == CustomServiceList.PROCEDURE.value: return pack_proc_commands(q=queue_helper, op_code=op_code) - if service == CustomServiceList.SUS_ASS.value: - return pack_sus_cmds(q=queue_helper, op_code=op_code) if service == CustomServiceList.PL_PCDU.value: return pack_pl_pcdu_commands(q=queue_helper, op_code=op_code) - if service == CustomServiceList.ACS_ASS.value: - return pack_acs_command(q=queue_helper, op_code=op_code) if service == CustomServiceList.TCS_ASS.value: return pack_tcs_sys_commands(q=queue_helper, op_code=op_code) if service == CustomServiceList.RW_ASSEMBLY.value: diff --git a/pus_tc/system/controllers.py b/pus_tc/system/controllers.py index f02a907..33252d2 100644 --- a/pus_tc/system/controllers.py +++ b/pus_tc/system/controllers.py @@ -4,7 +4,7 @@ from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.tc.pus_200_fsfw_modes import Modes from tmtccmd.util import ObjectIdU32, ObjectIdBase -from .common import command_mode +from tmtc.common import pack_mode_cmd_with_info import config.object_ids as obj_ids from pus_tc.prompt_parameters import prompt_parameters_cli, prompt_parameters_gui @@ -36,7 +36,7 @@ def pack_cmd_ctrl_to_prompted_mode( print("Invalid Mode, defaulting to OFF") mode = 0 submode = int(parameters["Submode"]) - command_mode( + pack_mode_cmd_with_info( object_id=object_id.as_bytes, mode=mode, submode=submode, @@ -48,7 +48,7 @@ def pack_cmd_ctrl_to_prompted_mode( def pack_cmd_ctrl_to_off( q: DefaultPusQueueHelper, object_id: Union[ObjectIdBase, ObjectIdU32] ): - command_mode( + pack_mode_cmd_with_info( object_id=object_id.as_bytes, mode=Modes.OFF, submode=0, @@ -58,7 +58,7 @@ def pack_cmd_ctrl_to_off( def pack_cmd_ctrl_to_on(q: DefaultPusQueueHelper, object_id: ObjectIdU32): - command_mode( + pack_mode_cmd_with_info( object_id=object_id.as_bytes, mode=Modes.ON, submode=0, @@ -70,7 +70,7 @@ def pack_cmd_ctrl_to_on(q: DefaultPusQueueHelper, object_id: ObjectIdU32): def pack_cmd_ctrl_to_nml( q: DefaultPusQueueHelper, object_id: Union[ObjectIdBase, ObjectIdU32] ): - command_mode( + pack_mode_cmd_with_info( object_id=object_id.as_bytes, mode=Modes.NORMAL, submode=0, diff --git a/pus_tc/system/tcs.py b/pus_tc/system/tcs.py index 5335d15..05220be 100644 --- a/pus_tc/system/tcs.py +++ b/pus_tc/system/tcs.py @@ -7,11 +7,10 @@ from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.tc.pus_200_fsfw_modes import Modes from tmtccmd.tc.pus_3_fsfw_hk import ( make_sid, - generate_one_diag_command, generate_one_hk_command, ) -from .common import command_mode +from tmtc.common import pack_mode_cmd_with_info from config.object_ids import TCS_BOARD_ASS_ID, TCS_CONTROLLER @@ -62,7 +61,7 @@ def pack_tcs_sys_commands(q: DefaultPusQueueHelper, op_code: str): def pack_tcs_ass_cmds(q: DefaultPusQueueHelper, op_code: str): if op_code in OpCodes.TCS_BOARD_ASS_NORMAL: - command_mode( + pack_mode_cmd_with_info( object_id=TCS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=0, @@ -70,7 +69,7 @@ def pack_tcs_ass_cmds(q: DefaultPusQueueHelper, op_code: str): info=Info.TCS_BOARD_ASS_NORMAL, ) if op_code in OpCodes.TCS_BOARD_ASS_OFF: - command_mode( + pack_mode_cmd_with_info( object_id=TCS_BOARD_ASS_ID, mode=Modes.OFF, submode=0, diff --git a/tmtc/acs_board.py b/tmtc/acs_board.py index d62ac22..a07b152 100644 --- a/tmtc/acs_board.py +++ b/tmtc/acs_board.py @@ -6,11 +6,12 @@ from tmtccmd.config.tmtc import ( TmtcDefinitionWrapper, OpCodeEntry, ) -from tmtccmd.tc import DefaultPusQueueHelper +from tmtccmd.tc import DefaultPusQueueHelper, service_provider +from tmtccmd.tc.decorator import ServiceProviderParams from tmtccmd.tc.pus_200_fsfw_modes import Modes from config.object_ids import ACS_BOARD_ASS_ID -from pus_tc.system.common import command_mode +from tmtc.common import pack_mode_cmd_with_info class AcsOpCodes: @@ -29,9 +30,12 @@ class DualSideSubmodes(enum.IntEnum): DUAL_SIDE = 2 -def pack_acs_command(q: DefaultPusQueueHelper, op_code: str): +@service_provider(CustomServiceList.ACS_BRD_ASS) +def pack_acs_command(p: ServiceProviderParams): + op_code = p.op_code + q = p.queue_helper if op_code in AcsOpCodes.ACS_ASS_A_SIDE: - command_mode( + pack_mode_cmd_with_info( object_id=ACS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=DualSideSubmodes.A_SIDE, @@ -39,7 +43,7 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str): info="Switching to ACS board assembly A side", ) if op_code in AcsOpCodes.ACS_ASS_B_SIDE: - command_mode( + pack_mode_cmd_with_info( object_id=ACS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=DualSideSubmodes.B_SIDE, @@ -47,7 +51,7 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str): info="Switching to ACS board assembly B side", ) if op_code in AcsOpCodes.ACS_ASS_DUAL_MODE: - command_mode( + pack_mode_cmd_with_info( object_id=ACS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=DualSideSubmodes.DUAL_SIDE, @@ -55,7 +59,7 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str): info="Switching to ACS board assembly dual mode", ) if op_code in AcsOpCodes.ACS_ASS_A_ON: - command_mode( + pack_mode_cmd_with_info( object_id=ACS_BOARD_ASS_ID, mode=Modes.ON, submode=DualSideSubmodes.A_SIDE, @@ -63,7 +67,7 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str): info="Switching ACS board assembly A side on", ) if op_code in AcsOpCodes.ACS_ASS_B_ON: - command_mode( + pack_mode_cmd_with_info( object_id=ACS_BOARD_ASS_ID, mode=Modes.ON, submode=DualSideSubmodes.B_SIDE, @@ -71,7 +75,7 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str): info="Switching ACS board assembly B side on", ) if op_code in AcsOpCodes.ACS_ASS_DUAL_ON: - command_mode( + pack_mode_cmd_with_info( object_id=ACS_BOARD_ASS_ID, mode=Modes.ON, submode=DualSideSubmodes.B_SIDE, @@ -79,7 +83,7 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str): info="Switching ACS board assembly dual side on", ) if op_code in AcsOpCodes.ACS_ASS_OFF: - command_mode( + pack_mode_cmd_with_info( object_id=ACS_BOARD_ASS_ID, mode=Modes.OFF, submode=0, diff --git a/tmtc/acs_subsystem.py b/tmtc/acs_subsystem.py index 277d01c..8f1e201 100644 --- a/tmtc/acs_subsystem.py +++ b/tmtc/acs_subsystem.py @@ -1,5 +1,8 @@ import enum +from typing import Tuple, Dict +from .common import pack_mode_cmd_with_info +from config.object_ids import ACS_SUBSYSTEM_ID from config.definitions import CustomServiceList from tmtccmd.config.tmtc import ( tmtc_definitions_provider, @@ -13,24 +16,50 @@ from tmtccmd.tc.decorator import ServiceProviderParams class OpCodes(str, enum.Enum): OFF = "off" SAFE = "safe" + DETUMBLE = "detumble" IDLE = "idle" + TARGET_PT = "target" + + +class AcsModes(enum.IntEnum): + OFF = 0 + SAFE = 1 + DETUMBLE = 2 + IDLE = 3 + TARGET_PT = 4 class Info(str, enum.Enum): OFF = "Off Command" SAFE = "Safe Mode Command" + DETUMBLE = "Detumble Mode Command" IDLE = "Idle Mode Command" + TARGET_PT = "Target Pointing Mode Command" + + +HANDLER_LIST: Dict[str, Tuple[int, str]] = { + OpCodes.OFF: (AcsModes.OFF, Info.OFF), + OpCodes.IDLE: (AcsModes.IDLE, Info.IDLE), + OpCodes.SAFE: (AcsModes.SAFE, Info.SAFE), + OpCodes.DETUMBLE: (AcsModes.DETUMBLE, Info.DETUMBLE), +} @service_provider(CustomServiceList.ACS_SS.value) def build_acs_subsystem_cmd(p: ServiceProviderParams): op_code = p.op_code - if op_code == OpCodes.OFF: - pass - if op_code == OpCodes.SAFE: - pass - if op_code == OpCodes.IDLE: - pass + q = p.queue_helper + info_prefix = "ACS Subsystem" + mode_info_tup = HANDLER_LIST[op_code] + if mode_info_tup is None: + return + pack_mode_cmd_with_info( + object_id=ACS_SUBSYSTEM_ID, + info=f"{info_prefix}: {mode_info_tup[1]}", + submode=0, + mode=mode_info_tup[0], + q=q, + ) @tmtc_definitions_provider diff --git a/pus_tc/system/common.py b/tmtc/common.py similarity index 59% rename from pus_tc/system/common.py rename to tmtc/common.py index c1bd1a4..e078ce5 100644 --- a/pus_tc/system/common.py +++ b/tmtc/common.py @@ -3,18 +3,25 @@ from typing import Union from spacepackets.ecss.tc import PusTelecommand from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes, Subservices +from tmtccmd.util import ObjectIdU32 -def command_mode( - object_id: bytes, +def pack_mode_cmd_with_info( + object_id: Union[ObjectIdU32, bytes], mode: Union[int, Modes], submode: int, q: DefaultPusQueueHelper, info: str, ): + if isinstance(object_id, ObjectIdU32): + object_id_bytes = object_id.as_bytes + elif isinstance(object_id, bytes): + object_id_bytes = object_id + else: + raise ValueError("Invalid Object ID type") q.add_log_cmd(info) mode_data = pack_mode_data( - object_id=object_id, + object_id=object_id_bytes, mode=mode, submode=submode, ) diff --git a/tmtc/sus_board.py b/tmtc/sus_board.py index 2411695..6fb0687 100644 --- a/tmtc/sus_board.py +++ b/tmtc/sus_board.py @@ -1,26 +1,30 @@ from config.definitions import CustomServiceList from config.object_ids import SUS_BOARD_ASS_ID from tmtc.acs_board import DualSideSubmodes -from pus_tc.system.common import command_mode +from tmtc.common import pack_mode_cmd_with_info from tmtccmd.config.tmtc import ( tmtc_definitions_provider, TmtcDefinitionWrapper, OpCodeEntry, ) -from tmtccmd.tc import DefaultPusQueueHelper +from tmtccmd.tc import service_provider +from tmtccmd.tc.decorator import ServiceProviderParams from tmtccmd.tc.pus_200_fsfw_modes import Modes class SusOpCodes: - SUS_ASS_NOM_SIDE = ["0", "sus-nom"] - SUS_ASS_RED_SIDE = ["1", "sus-red"] - SUS_ASS_DUAL_MODE = ["2", "sus-d"] - SUS_ASS_OFF = ["3", "sus-off"] + SUS_ASS_NOM_SIDE = ["0", "nom"] + SUS_ASS_RED_SIDE = ["1", "red"] + SUS_ASS_DUAL_MODE = ["2", "dual"] + SUS_ASS_OFF = ["3", "off"] -def pack_sus_cmds(q: DefaultPusQueueHelper, op_code: str): +@service_provider(CustomServiceList.SUS_BRD_ASS) +def pack_sus_cmds(p: ServiceProviderParams): + op_code = p.op_code + q = p.queue_helper if op_code in SusOpCodes.SUS_ASS_NOM_SIDE: - command_mode( + pack_mode_cmd_with_info( object_id=SUS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=DualSideSubmodes.A_SIDE, @@ -28,7 +32,7 @@ def pack_sus_cmds(q: DefaultPusQueueHelper, op_code: str): info="Switching to SUS board to nominal side", ) if op_code in SusOpCodes.SUS_ASS_RED_SIDE: - command_mode( + pack_mode_cmd_with_info( object_id=SUS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=DualSideSubmodes.B_SIDE, @@ -36,7 +40,7 @@ def pack_sus_cmds(q: DefaultPusQueueHelper, op_code: str): info="Switching to SUS board to redundant side", ) if op_code in SusOpCodes.SUS_ASS_OFF: - command_mode( + pack_mode_cmd_with_info( object_id=SUS_BOARD_ASS_ID, mode=Modes.OFF, submode=0, @@ -44,7 +48,7 @@ def pack_sus_cmds(q: DefaultPusQueueHelper, op_code: str): info="Switching SUS board off", ) if op_code in SusOpCodes.SUS_ASS_DUAL_MODE: - command_mode( + pack_mode_cmd_with_info( object_id=SUS_BOARD_ASS_ID, mode=Modes.NORMAL, submode=DualSideSubmodes.DUAL_SIDE,