implement move images command
- Also implement various importantr bugfixes for shutdown handling
This commit is contained in:
@ -1,5 +1,12 @@
|
||||
import enum
|
||||
from typing import List
|
||||
|
||||
from spacepackets.ecss import PusTc
|
||||
from tmtccmd.config import CmdTreeNode
|
||||
from pydantic import BaseModel
|
||||
from tmtccmd.tmtc import DefaultPusQueueHelper
|
||||
|
||||
from opssat_tmtc.common import EXPERIMENT_APID, UniqueId, make_action_cmd_header
|
||||
|
||||
|
||||
class ActionId(enum.IntEnum):
|
||||
@ -18,3 +25,72 @@ class CameraParameters(BaseModel):
|
||||
P: bool
|
||||
E: int
|
||||
W: int
|
||||
|
||||
|
||||
def create_camera_node() -> CmdTreeNode:
|
||||
cam_node = CmdTreeNode("cam", "OPS-SAT IMS1000 batch handler commands")
|
||||
cam_node.add_child(
|
||||
CmdTreeNode("default_single", "Default Single Image Camera Parameters")
|
||||
)
|
||||
cam_node.add_child(
|
||||
CmdTreeNode("balanced_single", "Balanced Single Image Camera Parameters")
|
||||
)
|
||||
cam_node.add_child(
|
||||
CmdTreeNode(
|
||||
"default_single_flatsat",
|
||||
"Default Single Image Camera Parameters for use on FlatSat",
|
||||
)
|
||||
)
|
||||
cam_node.add_child(
|
||||
CmdTreeNode(
|
||||
"balanced_single_flatsat",
|
||||
"Balanced Single Image Camera Parameters for use on FlatSat",
|
||||
)
|
||||
)
|
||||
cam_node.add_child(
|
||||
CmdTreeNode("custom_params", "Custom Camera Parameters as specified from file")
|
||||
)
|
||||
return cam_node
|
||||
|
||||
|
||||
def create_cam_cmd(q: DefaultPusQueueHelper, cmd_path: List[str]):
|
||||
|
||||
assert len(cmd_path) >= 1
|
||||
q.add_log_cmd(
|
||||
"Sending PUS take image action request for command " + cmd_path[0] + " params."
|
||||
)
|
||||
data = bytearray()
|
||||
if cmd_path[0] == "default_single":
|
||||
data.extend(
|
||||
make_action_cmd_header(UniqueId.CameraHandler, ActionId.DEFAULT_SINGLE)
|
||||
)
|
||||
elif cmd_path[0] == "balanced_single":
|
||||
data.extend(
|
||||
make_action_cmd_header(UniqueId.CameraHandler, ActionId.BALANCED_SINGLE)
|
||||
)
|
||||
elif cmd_path[0] == "default_single_flatsat":
|
||||
data.extend(
|
||||
make_action_cmd_header(
|
||||
UniqueId.CameraHandler, ActionId.DEFAULT_SINGLE_FLATSAT
|
||||
)
|
||||
)
|
||||
elif cmd_path[0] == "balanced_single_flatsat":
|
||||
data.extend(
|
||||
make_action_cmd_header(
|
||||
UniqueId.CameraHandler, ActionId.BALANCED_SNGLE_FLATSAT
|
||||
)
|
||||
)
|
||||
elif cmd_path[0] == "custom":
|
||||
data.extend(
|
||||
make_action_cmd_header(UniqueId.CameraHandler, ActionId.CUSTOM_PARAMS)
|
||||
)
|
||||
# TODO: Implement asking params from user.
|
||||
|
||||
# params = CameraParameters(8, 8, 8, 1, True, 200, 1000)
|
||||
# data.extend(params.model_dump_json().encode())
|
||||
raise NotImplementedError()
|
||||
else:
|
||||
raise ValueError("unknown camera action {}", cmd_path[0])
|
||||
return q.add_pus_tc(
|
||||
PusTc(service=8, subservice=128, apid=EXPERIMENT_APID, app_data=data)
|
||||
)
|
||||
|
53
pytmtc/opssat_tmtc/controller.py
Normal file
53
pytmtc/opssat_tmtc/controller.py
Normal file
@ -0,0 +1,53 @@
|
||||
import enum
|
||||
from typing import List
|
||||
from spacepackets.ecss import PusTc
|
||||
from tmtccmd.config import CmdTreeNode
|
||||
from tmtccmd.tmtc import DefaultPusQueueHelper
|
||||
|
||||
from opssat_tmtc.common import EXPERIMENT_APID, UniqueId, make_action_cmd_header
|
||||
|
||||
|
||||
class ActionId(enum.IntEnum):
|
||||
STOP_EXPERIMENT = 1
|
||||
DOWNLINK_LOG_FILE = 2
|
||||
DOWNLINK_IMAGES_BY_MOVING = 3
|
||||
EXECUTE_SHELL_CMD_BLOCKING = 4
|
||||
|
||||
|
||||
class OpCode:
|
||||
DOWNLINK_LOGS = "downlink_logs"
|
||||
DOWNLINK_IMAGES_BY_MOVING = "move_image_files"
|
||||
|
||||
|
||||
def create_controller_node():
|
||||
controller_node = CmdTreeNode("controller", "Main OBSW Controller")
|
||||
controller_node.add_child(
|
||||
CmdTreeNode(OpCode.DOWNLINK_LOGS, "Downlink Logs via toGround folder")
|
||||
)
|
||||
controller_node.add_child(
|
||||
CmdTreeNode(
|
||||
OpCode.DOWNLINK_IMAGES_BY_MOVING,
|
||||
"Downlink all image files via the toGroundLP folder",
|
||||
)
|
||||
)
|
||||
return controller_node
|
||||
|
||||
|
||||
def create_ctrl_cmd(q: DefaultPusQueueHelper, cmd_path: List[str]):
|
||||
assert len(cmd_path) >= 1
|
||||
data = bytearray()
|
||||
if cmd_path[0] == OpCode.DOWNLINK_LOGS:
|
||||
data.extend(
|
||||
make_action_cmd_header(UniqueId.Controller, ActionId.DOWNLINK_LOG_FILE)
|
||||
)
|
||||
elif cmd_path[0] == OpCode.DOWNLINK_IMAGES_BY_MOVING:
|
||||
data.extend(
|
||||
make_action_cmd_header(
|
||||
UniqueId.Controller, ActionId.DOWNLINK_IMAGES_BY_MOVING
|
||||
)
|
||||
)
|
||||
else:
|
||||
raise ValueError("unknown controller action {}", cmd_path[0])
|
||||
return q.add_pus_tc(
|
||||
PusTc(service=8, subservice=128, apid=EXPERIMENT_APID, app_data=data)
|
||||
)
|
@ -10,12 +10,8 @@ from tmtccmd.tmtc import DefaultPusQueueHelper
|
||||
from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd
|
||||
from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice
|
||||
|
||||
from opssat_tmtc.camera import CameraParameters
|
||||
from opssat_tmtc.common import (
|
||||
EXPERIMENT_APID,
|
||||
UniqueId,
|
||||
make_action_cmd_header,
|
||||
)
|
||||
from opssat_tmtc.camera import create_cam_cmd, create_camera_node
|
||||
from opssat_tmtc.controller import create_controller_node, create_ctrl_cmd
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@ -71,42 +67,8 @@ def create_cmd_definition_tree() -> CmdTreeNode:
|
||||
)
|
||||
root_node.add_child(scheduler_node)
|
||||
|
||||
action_node = CmdTreeNode("action", "Action Node")
|
||||
cam_node = CmdTreeNode("take_image", "Take Image with IMS Imager")
|
||||
cam_node.add_child(
|
||||
CmdTreeNode("default_single", "Default Single Image Camera Parameters")
|
||||
)
|
||||
cam_node.add_child(
|
||||
CmdTreeNode("balanced_single", "Balanced Single Image Camera Parameters")
|
||||
)
|
||||
cam_node.add_child(
|
||||
CmdTreeNode(
|
||||
"default_single_flatsat",
|
||||
"Default Single Image Camera Parameters for use on FlatSat",
|
||||
)
|
||||
)
|
||||
cam_node.add_child(
|
||||
CmdTreeNode(
|
||||
"balanced_single_flatsat",
|
||||
"Balanced Single Image Camera Parameters for use on FlatSat",
|
||||
)
|
||||
)
|
||||
cam_node.add_child(
|
||||
CmdTreeNode("custom_params", "Custom Camera Parameters as specified from file")
|
||||
)
|
||||
action_node.add_child(cam_node)
|
||||
|
||||
controller_node = CmdTreeNode("controller", "Main OBSW Controller")
|
||||
controller_node.add_child(
|
||||
CmdTreeNode("downlink_logs", "Downlink Logs via toGround folder")
|
||||
)
|
||||
controller_node.add_child(
|
||||
CmdTreeNode("downlink_last_img", "Downlink last image via toGroundLP folder")
|
||||
)
|
||||
action_node.add_child(controller_node)
|
||||
|
||||
root_node.add_child(action_node)
|
||||
|
||||
root_node.add_child(create_camera_node())
|
||||
root_node.add_child(create_controller_node())
|
||||
return root_node
|
||||
|
||||
|
||||
@ -139,45 +101,10 @@ def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str):
|
||||
)
|
||||
if cmd_path_list[0] == "acs":
|
||||
assert len(cmd_path_list) >= 2
|
||||
if cmd_path_list[0] == "action":
|
||||
assert len(cmd_path_list) >= 2
|
||||
if cmd_path_list[1] == "take_image":
|
||||
assert len(cmd_path_list) >= 3
|
||||
q.add_log_cmd(
|
||||
"Sending PUS take image action request with "
|
||||
+ cmd_path_list[2]
|
||||
+ " params."
|
||||
)
|
||||
data = bytearray()
|
||||
if cmd_path_list[2] == "default_single":
|
||||
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 1))
|
||||
if cmd_path_list[2] == "balanced_single":
|
||||
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 2))
|
||||
if cmd_path_list[2] == "default_single_flatsat":
|
||||
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 3))
|
||||
if cmd_path_list[2] == "balanced_single_flatsat":
|
||||
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 4))
|
||||
if cmd_path_list[2] == "custom":
|
||||
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 5))
|
||||
params = CameraParameters(8, 8, 8, 1, True, 200, 1000)
|
||||
data.extend(params.serialize_for_uplink())
|
||||
return q.add_pus_tc(
|
||||
PusTelecommand(
|
||||
service=8, subservice=128, apid=EXPERIMENT_APID, app_data=data
|
||||
)
|
||||
)
|
||||
if cmd_path_list[1] == "controller":
|
||||
assert len(cmd_path_list) >= 3
|
||||
data = bytearray()
|
||||
if cmd_path_list[2] == "downlink_logs":
|
||||
data.extend(make_action_cmd_header(UniqueId.Controller, 2))
|
||||
if cmd_path_list[2] == "downlink_last_img":
|
||||
data.extend(make_action_cmd_header(UniqueId.Controller, 3))
|
||||
return q.add_pus_tc(
|
||||
PusTelecommand(
|
||||
service=8, subservice=128, apid=EXPERIMENT_APID, app_data=data
|
||||
)
|
||||
)
|
||||
if cmd_path_list[0] == "cam":
|
||||
create_cam_cmd(q, cmd_path_list[1:])
|
||||
if cmd_path_list[0] == "controller":
|
||||
create_ctrl_cmd(q, cmd_path_list[1:])
|
||||
|
||||
|
||||
def handle_set_mode_cmd(
|
||||
|
Reference in New Issue
Block a user