Compare commits

...

15 Commits

6 changed files with 129 additions and 19 deletions

View File

@ -10,6 +10,16 @@ list yields a list of all related PRs for each release.
# [unreleased]
## Added
- Added core commands to execute `systemctl` commands and to execute arbitrary Linux commands.
# [v2.22.1] 2023-04-12
## Added
- Prompts to specify custom filename for OBSW update
# [v2.22.0] 2023-04-07
- Various smaller and helper commands added for tests

View File

@ -1,4 +1,4 @@
__version__ = "2.22.0"
__version__ = "2.22.1"
import logging
from pathlib import Path
@ -6,7 +6,7 @@ from pathlib import Path
SW_NAME = "eive-tmtc"
VERSION_MAJOR = 2
VERSION_MINOR = 22
VERSION_REVISION = 0
VERSION_REVISION = 1
EIVE_TMTC_ROOT = Path(__file__).parent
PACKAGE_ROOT = EIVE_TMTC_ROOT.parent

View File

@ -103,6 +103,7 @@ class OpCodes:
TAKE_IMAGE = "take_image"
UPLOAD_IMAGE = "upload_image"
SET_IMG_PROCESSOR_MODE = "set_img_proc_mode"
FW_UPDATE = "fw_update"
class Info:
@ -111,6 +112,7 @@ class Info:
UPLOAD_IMAGE = "Upload Image"
TAKE_IMAGE = "Take Image"
SET_IMG_PROCESSOR_MODE = "Set Image Processor Mode"
FW_UPDATE = "Firmware Update"
class SetId(enum.IntEnum):
@ -141,6 +143,7 @@ class FileDefs:
firmware2_1 = "/home/pi/arcsec/firmware/sagitta-2-1.bin"
firmware22_1 = "/home/pi/arcsec/firmware/sagitta-22-1.bin"
firmware_origin = "/home/pi/arcsec/firmware/sagitta-origin.bin"
FW_SLOT_Q7S = "/mnt/sd0/startracker/updates/sagitta-update.bin"
json_dict = {
@ -158,7 +161,8 @@ json_dict = {
),
}
firmware_dict = {
FW_DICT = {
"0": ("Firmware Update Q7S", FileDefs.FW_SLOT_Q7S),
"1": ("Firmware Major = 2, Minor = 1", FileDefs.firmware2_1),
"2": ("Firmware Major = 22, Minor = 1", FileDefs.firmware22_1),
"3": ("Firmware Origin", FileDefs.firmware_origin),
@ -630,13 +634,13 @@ def pack_star_tracker_commands(
+ bytearray(json_file, "utf-8")
)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "69":
q.add_log_cmd("Star tracker: Firmware update")
if op_code == OpCodes.FW_UPDATE:
q.add_log_cmd(Info.FW_UPDATE)
firmware = get_firmware()
data = (
obyt
+ struct.pack("!I", StarTrackerActionId.FIRMWARE_UPDATE)
+ bytearray(firmware, "utf-8")
+ firmware.encode()
)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
@ -681,9 +685,13 @@ def get_config_file() -> str:
def get_firmware() -> str:
_LOGGER.info("Specify firmware file")
input_helper = InputHelper(firmware_dict)
key = input_helper.get_key()
firmware = firmware_dict[key][1]
bin_select = int(input("Use hardcoded paths (0) or specify path manually (1) ?: "))
if bin_select == 0:
input_helper = InputHelper(FW_DICT)
key = input_helper.get_key()
firmware = FW_DICT[key][1]
else:
firmware = input("Specify absolute path of the firmware update file: ")
return firmware
@ -870,7 +878,7 @@ def add_str_cmds(defs: TmtcDefinitionWrapper):
oce.add("66", "Star Tracker: Set log level parameters")
oce.add("67", "Star Tracker: Set log subscription parameters")
oce.add("68", "Star Tracker: Set debug camera parameters")
oce.add("69", "Star Tracker: Firmware update")
oce.add(OpCodes.FW_UPDATE, Info.FW_UPDATE)
oce.add("70", "Star Tracker: Disable timestamp generation")
oce.add("71", "Star Tracker: Enable timestamp generation")
defs.add_service(CustomServiceList.STAR_TRACKER.value, "Star Tracker", oce)

View File

@ -2,6 +2,7 @@ import enum
class Mode(enum.IntEnum):
NONE = 0
RX_ONLY = 10
RX_AND_TX_DEF_DATARATE = 11
RX_AND_TX_LOW_DATARATE = 12

View File

@ -11,6 +11,10 @@ from tmtccmd.config import TmtcDefinitionWrapper
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
from tmtccmd.tc.pus_20_fsfw_param import (
create_scalar_u8_parameter,
create_load_param_cmd,
)
from tmtccmd.config.tmtc import OpCodeEntry, tmtc_definitions_provider
from eive_tmtc.config.object_ids import CORE_CONTROLLER_ID
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
@ -35,6 +39,13 @@ class ActionId(enum.IntEnum):
SWITCH_TO_BOTH_SD_CARDS = 18
XSC_REBOOT = 32
FULL_REBOOT = 34
EXECUTE_SHELL_CMD_BLOCKING = 40
EXECUTE_SHELL_CMD_NON_BLOCKING = 41
SYSTEMCTL_CMD_EXECUTOR = 42
class ParamId(enum.IntEnum):
PREF_SD = 0
class SetId(enum.IntEnum):
@ -45,6 +56,10 @@ class OpCode:
ANNOUNCE_VERSION = "announce_version"
ANNOUNCE_CURRENT_IMAGE = "announce_current_image"
ANNOUNCE_BOOT_COUNTS = "announce_boot_counts"
EXECUTE_SHELL_CMD_BLOCKING = "exec_cmd_blocking"
EXECUTE_SHELL_CMD_NON_BLOCKING = "exec_cmd_non_blocking"
SYSTEMCTL_CMD_EXECUTOR = "systemctl_cmd"
SET_PREF_SD = "set_pref_sd"
REBOOT_XSC = ["reboot_xsc"]
XSC_REBOOT_SELF = ["reboot_self"]
XSC_REBOOT_0_0 = ["reboot_00"]
@ -73,6 +88,10 @@ class Info:
ANNOUNCE_VERSION = "Announce version"
ANNOUNCE_CURRENT_IMAGE = "Announce current image"
ANNOUNCE_BOOT_COUNTS = "Announce boot counts"
SYSTEMCTL_CMD_EXECUTOR = "Perform systemctl command"
EXECUTE_SHELL_CMD_BLOCKING = "Execute shell command blocking"
EXECUTE_SHELL_CMD_NON_BLOCKING = "Execute shell command non-blocking"
SET_PREF_SD = "Set preferred SD card"
REBOOT_XSC = "XSC reboot with prompt"
REBOOT_FULL = "Full regular reboot"
OBSW_UPDATE_FROM_SD_0 = "Update OBSW from SD Card 0"
@ -95,6 +114,12 @@ class Copy(enum.IntEnum):
NONE = 2
class SystemctlCmd(enum.IntEnum):
START = 0
STOP = 1
RESTART = 2
@tmtc_definitions_provider
def add_core_controller_definitions(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
@ -109,9 +134,13 @@ def add_core_controller_definitions(defs: TmtcDefinitionWrapper):
oce.add(keys=OpCode.XSC_REBOOT_0_1, info="Reboot 0 1")
oce.add(keys=OpCode.XSC_REBOOT_1_0, info="Reboot 1 0")
oce.add(keys=OpCode.XSC_REBOOT_1_1, info="Reboot 1 1")
oce.add(keys=OpCode.SET_PREF_SD, info=Info.SET_PREF_SD)
oce.add(keys=OpCode.OBSW_UPDATE_FROM_TMP, info=Info.OBSW_UPDATE_FROM_TMP)
oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0)
oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1)
oce.add(keys=OpCode.SYSTEMCTL_CMD_EXECUTOR, info=Info.SYSTEMCTL_CMD_EXECUTOR)
oce.add(keys=OpCode.EXECUTE_SHELL_CMD_BLOCKING, info=Info.EXECUTE_SHELL_CMD_BLOCKING)
oce.add(keys=OpCode.EXECUTE_SHELL_CMD_NON_BLOCKING, info=Info.EXECUTE_SHELL_CMD_NON_BLOCKING)
oce.add(
keys=OpCode.GET_HK,
info="Request housekeeping set",
@ -184,6 +213,33 @@ def pack_core_commands(q: DefaultPusQueueHelper, op_code: str):
)
elif op_code in OpCode.XSC_REBOOT_SELF:
add_xsc_reboot_cmd(q=q, reboot_self=True)
elif op_code == OpCode.SYSTEMCTL_CMD_EXECUTOR:
print("systemctl command types: ")
for entry in SystemctlCmd:
print(f"{entry}: {entry.name}")
systemctl_cmd = SystemctlCmd(int(input("Specify systemctl command type by key: ")))
unit_name = input("Specify unit name: ")
cmd_data = bytearray([systemctl_cmd])
cmd_data.extend(unit_name.encode())
q.add_pus_tc(create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.SYSTEMCTL_CMD_EXECUTOR,
user_data=cmd_data
))
elif op_code == OpCode.EXECUTE_SHELL_CMD_BLOCKING:
custom_cmd = input("Please specify command to execute: ")
q.add_pus_tc(create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.EXECUTE_SHELL_CMD_BLOCKING,
user_data=custom_cmd.encode()
))
elif op_code == OpCode.EXECUTE_SHELL_CMD_NON_BLOCKING:
custom_cmd = input("Please specify command to execute: ")
q.add_pus_tc(create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.EXECUTE_SHELL_CMD_NON_BLOCKING,
user_data=custom_cmd.encode()
))
elif op_code in OpCode.XSC_REBOOT_0_0:
add_xsc_reboot_cmd(
q=q, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_0_NOM
@ -267,7 +323,7 @@ def pack_core_commands(q: DefaultPusQueueHelper, op_code: str):
)
elif op_code in OpCode.SWITCH_TO_BOTH_SD_CARDS:
while True:
active_sd_card = int(input("Please specify active SD cqrd [0/1]: "))
active_sd_card = int(input("Please specify active SD card [0/1]: "))
if active_sd_card not in [0, 1]:
_LOGGER.warning("Invalid SD card specified. Try again")
break
@ -283,6 +339,23 @@ def pack_core_commands(q: DefaultPusQueueHelper, op_code: str):
q.add_log_cmd("Requesting housekeeping set")
sid = make_sid(object_id=CORE_CONTROLLER_ID, set_id=SetId.HK)
q.add_pus_tc(generate_one_hk_command(sid))
elif op_code in OpCode.SET_PREF_SD:
q.add_log_cmd("Set preferred SD card")
pref_sd = int(
input("Specify which SD card to set as the preferred one (0/1): ")
)
if pref_sd not in [0, 1]:
raise ValueError("Only 0 or 1 allowed for preferred SD card")
q.add_pus_tc(
create_load_param_cmd(
create_scalar_u8_parameter(
object_id=CORE_CONTROLLER_ID,
domain_id=0,
unique_id=ParamId.PREF_SD,
parameter=pref_sd,
).pack()
)
)
else:
_LOGGER.warning(
f"Unknown operation code {op_code} for core controller commands"
@ -343,7 +416,11 @@ def determine_chip_and_copy() -> (int, int):
def pack_obsw_update_cmd(action_id: int) -> PusTelecommand:
chip, copy = determine_chip_and_copy()
user_data = bytes([chip, copy])
user_data = bytearray([chip, copy])
custom_file_name = input("Use custom filename [y/n] ?: ")
if custom_file_name.lower() in ["y", "yes", "1"]:
custom_file_name = input("Specify custom filename: ")
user_data.extend(custom_file_name.encode())
return create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=action_id, user_data=user_data
)

View File

@ -15,6 +15,7 @@ from eive_tmtc.config.object_ids import (
SUS_6_R_LOC_XFYBZM_PT_XF,
RW1_ID,
RW2_ID,
RTD_0_PLOC_HSPD,
)
SUBSYSTEM_DICT = {
@ -40,6 +41,20 @@ ACS_OBJ_DICT = {
13: ("RW 2", RW2_ID),
}
TCS_OBJ_DICT = {
0: ("RTD 0", RTD_0_PLOC_HSPD),
}
def get_obj_if_from_dict(lut: dict) -> bytes:
for k, v in lut.items():
print(f"{k}: {v[0]}")
obj_key = int(input("Please specify target object by key: "))
name_and_obj_id = lut[obj_key]
if name_and_obj_id is None:
raise ValueError("invalid key")
return name_and_obj_id[1]
def prompt_object() -> bytes:
for k, v in SUBSYSTEM_DICT.items():
@ -49,10 +64,9 @@ def prompt_object() -> bytes:
if subsystem is None:
raise ValueError("invalid key")
if subsystem == "acs":
for k, v in ACS_OBJ_DICT.items():
print(f"{k}: {v[0]}")
obj_key = int(input("Please specify target object by key: "))
acs_obj = ACS_OBJ_DICT[obj_key]
if acs_obj is None:
raise ValueError("invalid key")
return acs_obj[1]
return get_obj_if_from_dict(ACS_OBJ_DICT)
elif subsystem == "tcs":
return get_obj_if_from_dict(TCS_OBJ_DICT)
else:
print(f"No object for subsystem {subsystem}")
return bytes()