ops-sat-rs/pytmtc/opssat_tmtc/controller.py
Robin Mueller ecea83fc4b implement move images command
- Also implement various importantr bugfixes for shutdown handling
2024-04-30 19:45:00 +02:00

54 lines
1.6 KiB
Python

import enum
from typing import List
from spacepackets.ecss import PusTc
from tmtccmd.config import CmdTreeNode
from tmtccmd.tmtc import DefaultPusQueueHelper
from opssat_tmtc.common import EXPERIMENT_APID, UniqueId, make_action_cmd_header
class ActionId(enum.IntEnum):
STOP_EXPERIMENT = 1
DOWNLINK_LOG_FILE = 2
DOWNLINK_IMAGES_BY_MOVING = 3
EXECUTE_SHELL_CMD_BLOCKING = 4
class OpCode:
DOWNLINK_LOGS = "downlink_logs"
DOWNLINK_IMAGES_BY_MOVING = "move_image_files"
def create_controller_node():
controller_node = CmdTreeNode("controller", "Main OBSW Controller")
controller_node.add_child(
CmdTreeNode(OpCode.DOWNLINK_LOGS, "Downlink Logs via toGround folder")
)
controller_node.add_child(
CmdTreeNode(
OpCode.DOWNLINK_IMAGES_BY_MOVING,
"Downlink all image files via the toGroundLP folder",
)
)
return controller_node
def create_ctrl_cmd(q: DefaultPusQueueHelper, cmd_path: List[str]):
assert len(cmd_path) >= 1
data = bytearray()
if cmd_path[0] == OpCode.DOWNLINK_LOGS:
data.extend(
make_action_cmd_header(UniqueId.Controller, ActionId.DOWNLINK_LOG_FILE)
)
elif cmd_path[0] == OpCode.DOWNLINK_IMAGES_BY_MOVING:
data.extend(
make_action_cmd_header(
UniqueId.Controller, ActionId.DOWNLINK_IMAGES_BY_MOVING
)
)
else:
raise ValueError("unknown controller action {}", cmd_path[0])
return q.add_pus_tc(
PusTc(service=8, subservice=128, apid=EXPERIMENT_APID, app_data=data)
)