From 9e096193dd86cf71474caf0521a05bf690a8a594 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 10 May 2024 17:21:59 +0200 Subject: [PATCH] clean up python commander a bit --- satrs-example/pytmtc/pytmtc/hk.py | 15 ++++++++ satrs-example/pytmtc/pytmtc/mgms.py | 25 +++++++++++++ satrs-example/pytmtc/pytmtc/mode.py | 31 ++++++++++++++++ satrs-example/pytmtc/pytmtc/pus_tc.py | 51 ++------------------------- 4 files changed, 74 insertions(+), 48 deletions(-) create mode 100644 satrs-example/pytmtc/pytmtc/hk.py create mode 100644 satrs-example/pytmtc/pytmtc/mgms.py create mode 100644 satrs-example/pytmtc/pytmtc/mode.py diff --git a/satrs-example/pytmtc/pytmtc/hk.py b/satrs-example/pytmtc/pytmtc/hk.py new file mode 100644 index 0000000..a2af2b9 --- /dev/null +++ b/satrs-example/pytmtc/pytmtc/hk.py @@ -0,0 +1,15 @@ +import struct +from spacepackets.ecss.pus_3_hk import Subservice +from spacepackets.ecss import PusService, PusTc + + +def create_request_one_shot_hk_cmd(apid: int, unique_id: int, set_id: int) -> PusTc: + app_data = bytearray() + app_data.extend(struct.pack("!I", unique_id)) + app_data.extend(struct.pack("!I", set_id)) + return PusTc( + service=PusService.S3_HOUSEKEEPING, + subservice=Subservice.TC_GENERATE_ONE_PARAMETER_REPORT, + apid=apid, + app_data=app_data, + ) diff --git a/satrs-example/pytmtc/pytmtc/mgms.py b/satrs-example/pytmtc/pytmtc/mgms.py new file mode 100644 index 0000000..ff39072 --- /dev/null +++ b/satrs-example/pytmtc/pytmtc/mgms.py @@ -0,0 +1,25 @@ +import enum +from typing import List +from tmtccmd.tmtc import DefaultPusQueueHelper + +from pytmtc.common import AcsId, Apid +from pytmtc.hk import create_request_one_shot_hk_cmd +from pytmtc.mode import handle_set_mode_cmd + + +class SetId(enum.IntEnum): + SENSOR_SET = 0 + + +def create_mgm_cmds(q: DefaultPusQueueHelper, cmd_path: List[str]): + assert len(cmd_path) >= 3 + if cmd_path[2] == "hk": + if cmd_path[3] == "one_shot_hk": + q.add_log_cmd("Sending HK one shot request") + q.add_pus_tc( + create_request_one_shot_hk_cmd(Apid.ACS, AcsId.MGM_0, SetId.SENSOR_SET) + ) + + if cmd_path[2] == "mode": + if cmd_path[3] == "set_mode": + handle_set_mode_cmd(q, "MGM 0", cmd_path[4], Apid.ACS, AcsId.MGM_0) diff --git a/satrs-example/pytmtc/pytmtc/mode.py b/satrs-example/pytmtc/pytmtc/mode.py new file mode 100644 index 0000000..918fdb1 --- /dev/null +++ b/satrs-example/pytmtc/pytmtc/mode.py @@ -0,0 +1,31 @@ +import struct +from spacepackets.ecss import PusTc +from tmtccmd.pus.s200_fsfw_mode import Mode, Subservice +from tmtccmd.tmtc import DefaultPusQueueHelper + + +def create_set_mode_cmd(apid: int, unique_id: int, mode: int, submode: int) -> PusTc: + app_data = bytearray() + app_data.extend(struct.pack("!I", unique_id)) + app_data.extend(struct.pack("!I", mode)) + app_data.extend(struct.pack("!H", submode)) + return PusTc( + service=200, + subservice=Subservice.TC_MODE_COMMAND, + apid=apid, + app_data=app_data, + ) + + +def handle_set_mode_cmd( + q: DefaultPusQueueHelper, target_str: str, mode_str: str, apid: int, unique_id: int +): + if mode_str == "off": + q.add_log_cmd(f"Sending Mode OFF to {target_str}") + q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.OFF, 0)) + elif mode_str == "on": + q.add_log_cmd(f"Sending Mode ON to {target_str}") + q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.ON, 0)) + elif mode_str == "normal": + q.add_log_cmd(f"Sending Mode NORMAL to {target_str}") + q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.NORMAL, 0)) diff --git a/satrs-example/pytmtc/pytmtc/pus_tc.py b/satrs-example/pytmtc/pytmtc/pus_tc.py index 486f4f1..6e1570c 100644 --- a/satrs-example/pytmtc/pytmtc/pus_tc.py +++ b/satrs-example/pytmtc/pytmtc/pus_tc.py @@ -1,5 +1,4 @@ import datetime -import struct import logging from spacepackets.ccsds import CdsShortTimestamp @@ -8,7 +7,6 @@ from spacepackets.seqcount import FileSeqCountProvider from tmtccmd import ProcedureWrapper, TcHandlerBase from tmtccmd.config import CmdTreeNode from tmtccmd.pus import VerificationWrapper -from tmtccmd.pus.tc.s200_fsfw_mode import Mode from tmtccmd.tmtc import ( DefaultPusQueueHelper, FeedWrapper, @@ -18,9 +16,9 @@ from tmtccmd.tmtc import ( TcQueueEntryType, ) from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd -from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice -from pytmtc.common import AcsId, Apid +from pytmtc.common import Apid +from pytmtc.mgms import create_mgm_cmds _LOGGER = logging.getLogger(__name__) @@ -115,21 +113,6 @@ def create_cmd_definition_tree() -> CmdTreeNode: return root_node -def create_set_mode_cmd( - apid: int, unique_id: int, mode: int, submode: int -) -> PusTelecommand: - app_data = bytearray() - app_data.extend(struct.pack("!I", unique_id)) - app_data.extend(struct.pack("!I", mode)) - app_data.extend(struct.pack("!H", submode)) - return PusTelecommand( - service=200, - subservice=ModeSubservice.TC_MODE_COMMAND, - apid=apid, - app_data=app_data, - ) - - def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str): # It should always be at least the root path "/", so we split of the empty portion left of it. cmd_path_list = cmd_path.split("/")[1:] @@ -165,32 +148,4 @@ def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str): if cmd_path_list[0] == "acs": assert len(cmd_path_list) >= 2 if cmd_path_list[1] == "mgms": - assert len(cmd_path_list) >= 3 - if cmd_path_list[2] == "hk": - if cmd_path_list[3] == "one_shot_hk": - q.add_log_cmd("Sending HK one shot request") - # TODO: Fix - # q.add_pus_tc( - # create_request_one_hk_command( - # make_addressable_id(Apid.ACS, AcsId.MGM_SET) - # ) - # ) - if cmd_path_list[2] == "mode": - if cmd_path_list[3] == "set_mode": - handle_set_mode_cmd( - q, "MGM 0", cmd_path_list[4], Apid.ACS, AcsId.MGM_0 - ) - - -def handle_set_mode_cmd( - q: DefaultPusQueueHelper, target_str: str, mode_str: str, apid: int, unique_id: int -): - if mode_str == "off": - q.add_log_cmd(f"Sending Mode OFF to {target_str}") - q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.OFF, 0)) - elif mode_str == "on": - q.add_log_cmd(f"Sending Mode ON to {target_str}") - q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.ON, 0)) - elif mode_str == "normal": - q.add_log_cmd(f"Sending Mode NORMAL to {target_str}") - q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.NORMAL, 0)) + create_mgm_cmds(q, cmd_path_list)