diff --git a/CHANGELOG.md b/CHANGELOG.md index d3eaecc..62392ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ list yields a list of all related PRs for each release. # [unreleased] +## Added + +- Added core commands to execute `systemctl` commands and to execute arbitrary Linux commands. + # [v2.22.1] 2023-04-12 ## Added diff --git a/eive_tmtc/tmtc/core.py b/eive_tmtc/tmtc/core.py index b4856ea..7d5d8f7 100644 --- a/eive_tmtc/tmtc/core.py +++ b/eive_tmtc/tmtc/core.py @@ -39,6 +39,8 @@ class ActionId(enum.IntEnum): SWITCH_TO_BOTH_SD_CARDS = 18 XSC_REBOOT = 32 FULL_REBOOT = 34 + EXECUTE_SHELL_CMD_BLOCKING = 40 + SYSTEMCTL_CMD_EXECUTOR = 42 class ParamId(enum.IntEnum): @@ -53,6 +55,8 @@ class OpCode: ANNOUNCE_VERSION = "announce_version" ANNOUNCE_CURRENT_IMAGE = "announce_current_image" ANNOUNCE_BOOT_COUNTS = "announce_boot_counts" + EXECUTE_SHELL_CMD_BLOCKING = "exec_shell_cmd_blocking" + SYSTEMCTL_CMD_EXECUTOR = "systemctl_cmd" SET_PREF_SD = "set_pref_sd" REBOOT_XSC = ["reboot_xsc"] XSC_REBOOT_SELF = ["reboot_self"] @@ -82,6 +86,8 @@ class Info: ANNOUNCE_VERSION = "Announce version" ANNOUNCE_CURRENT_IMAGE = "Announce current image" ANNOUNCE_BOOT_COUNTS = "Announce boot counts" + SYSTEMCTL_CMD_EXECUTOR = "Perform systemctl command" + EXECUTE_SHELL_CMD_BLOCKING = "Execute shell command (blocking)" SET_PREF_SD = "Set preferred SD card" REBOOT_XSC = "XSC reboot with prompt" REBOOT_FULL = "Full regular reboot" @@ -105,6 +111,12 @@ class Copy(enum.IntEnum): NONE = 2 +class SystemctlCmd(enum.IntEnum): + START = 0 + STOP = 1 + RESTART = 2 + + @tmtc_definitions_provider def add_core_controller_definitions(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() @@ -123,6 +135,8 @@ def add_core_controller_definitions(defs: TmtcDefinitionWrapper): oce.add(keys=OpCode.OBSW_UPDATE_FROM_TMP, info=Info.OBSW_UPDATE_FROM_TMP) oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0) oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1) + oce.add(keys=OpCode.SYSTEMCTL_CMD_EXECUTOR, info=Info.SYSTEMCTL_CMD_EXECUTOR) + oce.add(keys=OpCode.EXECUTE_SHELL_CMD_BLOCKING, info=Info.EXECUTE_SHELL_CMD_BLOCKING) oce.add( keys=OpCode.GET_HK, info="Request housekeeping set", @@ -195,6 +209,26 @@ def pack_core_commands(q: DefaultPusQueueHelper, op_code: str): ) elif op_code in OpCode.XSC_REBOOT_SELF: add_xsc_reboot_cmd(q=q, reboot_self=True) + elif op_code == OpCode.SYSTEMCTL_CMD_EXECUTOR: + print("systemctl command types: ") + for idx, v in enumerate(SystemctlCmd): + print(f"{idx}: {v}") + systemctl_cmd = SystemctlCmd(int(input("Specify systemctl command type: "))) + unit_name = input("Specify unit name: ") + cmd_data = bytearray([systemctl_cmd]) + cmd_data.extend(unit_name.encode()) + create_action_cmd( + object_id=CORE_CONTROLLER_ID, + action_id=ActionId.SYSTEMCTL_CMD_EXECUTOR, + user_data=cmd_data + ) + elif op_code in OpCode.EXECUTE_SHELL_CMD_BLOCKING: + custom_cmd = input("Please specify command to execute: ") + create_action_cmd( + object_id=CORE_CONTROLLER_ID, + action_id=ActionId.EXECUTE_SHELL_CMD_BLOCKING, + user_data=custom_cmd.encode() + ) elif op_code in OpCode.XSC_REBOOT_0_0: add_xsc_reboot_cmd( q=q, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_0_NOM