From f6fab2d44aff98174835c8446fd69c1ff589521b Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 3 Nov 2022 22:16:10 +0100 Subject: [PATCH] add payload subsystem commands --- config/definitions.py | 1 + config/object_ids.py | 1 + tmtc/__init__.py | 1 + tmtc/payload/pl_subsystem.py | 65 ++++++++++++++++++++++++++++++++++++ 4 files changed, 68 insertions(+) diff --git a/config/definitions.py b/config/definitions.py index 578aecb..bb5b701 100644 --- a/config/definitions.py +++ b/config/definitions.py @@ -59,6 +59,7 @@ class CustomServiceList(str, enum.Enum): SYRLINKS = "syrlinks" ACS_CTRL = "acs_ctrl" ACS_SS = "acs_subsystem" + PL_SS = "pl_subsystem" ACS_BRD_ASS = "acs_brd_ass" SUS_BRD_ASS = "sus_brd_ass" TCS = "tcs" diff --git a/config/object_ids.py b/config/object_ids.py index cb47a47..75a898e 100644 --- a/config/object_ids.py +++ b/config/object_ids.py @@ -123,6 +123,7 @@ SUS_11_R_LOC_XBYMZB_PT_ZB = bytes([0x44, 0x12, 0x00, 0x43]) # System and Assembly Objects ACS_SUBSYSTEM_ID = bytes([0x73, 0x01, 0x00, 0x01]) +PL_SUBSYSTEM_ID = bytes([0x73, 0x01, 0x00, 0x02]) ACS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x01]) SUS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x02]) TCS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x03]) diff --git a/tmtc/__init__.py b/tmtc/__init__.py index e32ce78..22d3893 100644 --- a/tmtc/__init__.py +++ b/tmtc/__init__.py @@ -1 +1,2 @@ +from .payload.pl_subsystem import add_payload_subsystem_cmds from .test import add_test_defs diff --git a/tmtc/payload/pl_subsystem.py b/tmtc/payload/pl_subsystem.py index e69de29..85950d8 100644 --- a/tmtc/payload/pl_subsystem.py +++ b/tmtc/payload/pl_subsystem.py @@ -0,0 +1,65 @@ +import enum +from typing import Dict, Tuple + +from config.definitions import CustomServiceList +from config.object_ids import PL_SUBSYSTEM_ID +from spacepackets.ecss import PusTelecommand +from tmtc.common import pack_mode_cmd_with_info +from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry +from tmtccmd.config.tmtc import tmtc_definitions_provider +from tmtccmd.tc import service_provider +from tmtccmd.tc.decorator import ServiceProviderParams +from tmtccmd.tc.pus_200_fsfw_modes import Subservices as ModeSubservices + + +class OpCodes(str, enum.Enum): + OFF = "off" + REPORT_ALL_MODES = "report_modes" + + +class PayloadModes(enum.IntEnum): + OFF = 0 + + +class Info(str, enum.Enum): + OFF = "Off Command" + REPORT_ALL_MODES = "Report all modes" + + +HANDLER_LIST: Dict[str, Tuple[int, str]] = { + OpCodes.OFF: (PayloadModes.OFF, Info.OFF), +} + + +@service_provider(CustomServiceList.PL_SS) +def build_acs_subsystem_cmd(p: ServiceProviderParams): + op_code = p.op_code + q = p.queue_helper + info_prefix = "ACS Subsystem" + if op_code in OpCodes.REPORT_ALL_MODES: + q.add_log_cmd(f"{info_prefix}: {Info.REPORT_ALL_MODES}") + q.add_pus_tc( + PusTelecommand( + service=200, + subservice=ModeSubservices.TC_MODE_ANNOUNCE_RECURSIVE, + app_data=PL_SUBSYSTEM_ID, + ) + ) + mode_info_tup = HANDLER_LIST.get(op_code) + if mode_info_tup is None: + return + pack_mode_cmd_with_info( + object_id=PL_SUBSYSTEM_ID, + info=f"{info_prefix}: {mode_info_tup[1]}", + submode=0, + mode=mode_info_tup[0], + q=q, + ) + + +@tmtc_definitions_provider +def add_payload_subsystem_cmds(defs: TmtcDefinitionWrapper): + oce = OpCodeEntry() + oce.add(OpCodes.OFF, Info.OFF) + oce.add(OpCodes.REPORT_ALL_MODES, Info.REPORT_ALL_MODES) + defs.add_service(CustomServiceList.PL_SS, "Payload Subsystem", oce)