From 63c584e061a8e0d27189d66fb3ac1c48d011afe7 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 14 Apr 2023 19:21:51 +0200 Subject: [PATCH] Add new PDEC commands --- eive_tmtc/tmtc/com/pdec_handler.py | 27 ++++++++++++++-- eive_tmtc/tmtc/core.py | 49 +++++++++++++++++++----------- 2 files changed, 56 insertions(+), 20 deletions(-) diff --git a/eive_tmtc/tmtc/com/pdec_handler.py b/eive_tmtc/tmtc/com/pdec_handler.py index fbe10c8..38763b4 100644 --- a/eive_tmtc/tmtc/com/pdec_handler.py +++ b/eive_tmtc/tmtc/com/pdec_handler.py @@ -5,11 +5,13 @@ @author J. Meier @date 22.11.2021 """ +import enum + from spacepackets.ecss.tc import PusTelecommand from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.tc.pus_20_fsfw_param import create_load_param_cmd - +from tmtccmd.tc.pus_8_fsfw_funccmd import create_action_cmd from tmtccmd.pus.s20_fsfw_param_defs import create_scalar_u8_parameter from tmtccmd.config.tmtc import ( @@ -28,16 +30,23 @@ class CommandId: PRINT_PDEC_MON = bytearray([0x0, 0x0, 0x0, 0x1]) -class ParameterId: +class ParameterId(enum.IntEnum): POSITIVE_WINDOW = 0 NEGATIVE_WINDOW = 1 +class ActionId(enum.IntEnum): + RESET_NO_INIT = 2 + RESET_WITH_INIT = 3 + + class OpCode: PRINT_CLCW = "print_clcw" PRINT_MON_REG = "print_mon_reg" POSITIVE_WINDOW = "positive_window" NEGATIVE_WINDOW = "negative_window" + RESET_WITH_INIT = "reset_with_init" + RESET_NO_INIT = "reset_no_init" class Info: @@ -47,6 +56,8 @@ class Info: ) POSITIVE_WINDOW = "Change positive window parameter for AD frames" NEGATIVE_WINDOW = "Change negative window parameter for AD frames" + RESET_WITH_INIT = "Reset with full initialization" + RESET_NO_INIT = "Reset with mandatory initialization" def pack_pdec_handler_test( @@ -88,6 +99,16 @@ def pack_pdec_handler_test( ).pack() ) ) + if op_code == OpCode.RESET_NO_INIT: + q.add_log_cmd(f"{prefix}: {Info.RESET_NO_INIT}") + q.add_pus_tc( + create_action_cmd(object_id=object_id, action_id=ActionId.RESET_NO_INIT) + ) + if op_code == OpCode.RESET_WITH_INIT: + q.add_log_cmd(f"{prefix}: {Info.RESET_WITH_INIT}") + q.add_pus_tc( + create_action_cmd(object_id=object_id, action_id=ActionId.RESET_WITH_INIT) + ) @tmtc_definitions_provider @@ -97,4 +118,6 @@ def add_pdec_cmds(defs: TmtcDefinitionWrapper): oce.add(OpCode.PRINT_MON_REG, Info.PRINT_MON_REG) oce.add(OpCode.POSITIVE_WINDOW, Info.POSITIVE_WINDOW) oce.add(OpCode.NEGATIVE_WINDOW, Info.NEGATIVE_WINDOW) + oce.add(OpCode.RESET_WITH_INIT, Info.RESET_WITH_INIT) + oce.add(OpCode.RESET_NO_INIT, Info.RESET_NO_INIT) defs.add_service(CustomServiceList.PDEC_HANDLER.value, "PDEC Handler", oce) diff --git a/eive_tmtc/tmtc/core.py b/eive_tmtc/tmtc/core.py index 02b1845..9d96b5c 100644 --- a/eive_tmtc/tmtc/core.py +++ b/eive_tmtc/tmtc/core.py @@ -139,8 +139,13 @@ def add_core_controller_definitions(defs: TmtcDefinitionWrapper): oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0) oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1) oce.add(keys=OpCode.SYSTEMCTL_CMD_EXECUTOR, info=Info.SYSTEMCTL_CMD_EXECUTOR) - oce.add(keys=OpCode.EXECUTE_SHELL_CMD_BLOCKING, info=Info.EXECUTE_SHELL_CMD_BLOCKING) - oce.add(keys=OpCode.EXECUTE_SHELL_CMD_NON_BLOCKING, info=Info.EXECUTE_SHELL_CMD_NON_BLOCKING) + oce.add( + keys=OpCode.EXECUTE_SHELL_CMD_BLOCKING, info=Info.EXECUTE_SHELL_CMD_BLOCKING + ) + oce.add( + keys=OpCode.EXECUTE_SHELL_CMD_NON_BLOCKING, + info=Info.EXECUTE_SHELL_CMD_NON_BLOCKING, + ) oce.add( keys=OpCode.GET_HK, info="Request housekeeping set", @@ -217,29 +222,37 @@ def pack_core_commands(q: DefaultPusQueueHelper, op_code: str): print("systemctl command types: ") for entry in SystemctlCmd: print(f"{entry}: {entry.name}") - systemctl_cmd = SystemctlCmd(int(input("Specify systemctl command type by key: "))) + systemctl_cmd = SystemctlCmd( + int(input("Specify systemctl command type by key: ")) + ) unit_name = input("Specify unit name: ") cmd_data = bytearray([systemctl_cmd]) cmd_data.extend(unit_name.encode()) - q.add_pus_tc(create_action_cmd( - object_id=CORE_CONTROLLER_ID, - action_id=ActionId.SYSTEMCTL_CMD_EXECUTOR, - user_data=cmd_data - )) + q.add_pus_tc( + create_action_cmd( + object_id=CORE_CONTROLLER_ID, + action_id=ActionId.SYSTEMCTL_CMD_EXECUTOR, + user_data=cmd_data, + ) + ) elif op_code == OpCode.EXECUTE_SHELL_CMD_BLOCKING: custom_cmd = input("Please specify command to execute: ") - q.add_pus_tc(create_action_cmd( - object_id=CORE_CONTROLLER_ID, - action_id=ActionId.EXECUTE_SHELL_CMD_BLOCKING, - user_data=custom_cmd.encode() - )) + q.add_pus_tc( + create_action_cmd( + object_id=CORE_CONTROLLER_ID, + action_id=ActionId.EXECUTE_SHELL_CMD_BLOCKING, + user_data=custom_cmd.encode(), + ) + ) elif op_code == OpCode.EXECUTE_SHELL_CMD_NON_BLOCKING: custom_cmd = input("Please specify command to execute: ") - q.add_pus_tc(create_action_cmd( - object_id=CORE_CONTROLLER_ID, - action_id=ActionId.EXECUTE_SHELL_CMD_NON_BLOCKING, - user_data=custom_cmd.encode() - )) + q.add_pus_tc( + create_action_cmd( + object_id=CORE_CONTROLLER_ID, + action_id=ActionId.EXECUTE_SHELL_CMD_NON_BLOCKING, + user_data=custom_cmd.encode(), + ) + ) elif op_code in OpCode.XSC_REBOOT_0_0: add_xsc_reboot_cmd( q=q, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_0_NOM