clean up python commander a bit
This commit is contained in:
parent
43bd77eef0
commit
9e096193dd
15
satrs-example/pytmtc/pytmtc/hk.py
Normal file
15
satrs-example/pytmtc/pytmtc/hk.py
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
import struct
|
||||||
|
from spacepackets.ecss.pus_3_hk import Subservice
|
||||||
|
from spacepackets.ecss import PusService, PusTc
|
||||||
|
|
||||||
|
|
||||||
|
def create_request_one_shot_hk_cmd(apid: int, unique_id: int, set_id: int) -> PusTc:
|
||||||
|
app_data = bytearray()
|
||||||
|
app_data.extend(struct.pack("!I", unique_id))
|
||||||
|
app_data.extend(struct.pack("!I", set_id))
|
||||||
|
return PusTc(
|
||||||
|
service=PusService.S3_HOUSEKEEPING,
|
||||||
|
subservice=Subservice.TC_GENERATE_ONE_PARAMETER_REPORT,
|
||||||
|
apid=apid,
|
||||||
|
app_data=app_data,
|
||||||
|
)
|
25
satrs-example/pytmtc/pytmtc/mgms.py
Normal file
25
satrs-example/pytmtc/pytmtc/mgms.py
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
import enum
|
||||||
|
from typing import List
|
||||||
|
from tmtccmd.tmtc import DefaultPusQueueHelper
|
||||||
|
|
||||||
|
from pytmtc.common import AcsId, Apid
|
||||||
|
from pytmtc.hk import create_request_one_shot_hk_cmd
|
||||||
|
from pytmtc.mode import handle_set_mode_cmd
|
||||||
|
|
||||||
|
|
||||||
|
class SetId(enum.IntEnum):
|
||||||
|
SENSOR_SET = 0
|
||||||
|
|
||||||
|
|
||||||
|
def create_mgm_cmds(q: DefaultPusQueueHelper, cmd_path: List[str]):
|
||||||
|
assert len(cmd_path) >= 3
|
||||||
|
if cmd_path[2] == "hk":
|
||||||
|
if cmd_path[3] == "one_shot_hk":
|
||||||
|
q.add_log_cmd("Sending HK one shot request")
|
||||||
|
q.add_pus_tc(
|
||||||
|
create_request_one_shot_hk_cmd(Apid.ACS, AcsId.MGM_0, SetId.SENSOR_SET)
|
||||||
|
)
|
||||||
|
|
||||||
|
if cmd_path[2] == "mode":
|
||||||
|
if cmd_path[3] == "set_mode":
|
||||||
|
handle_set_mode_cmd(q, "MGM 0", cmd_path[4], Apid.ACS, AcsId.MGM_0)
|
31
satrs-example/pytmtc/pytmtc/mode.py
Normal file
31
satrs-example/pytmtc/pytmtc/mode.py
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
import struct
|
||||||
|
from spacepackets.ecss import PusTc
|
||||||
|
from tmtccmd.pus.s200_fsfw_mode import Mode, Subservice
|
||||||
|
from tmtccmd.tmtc import DefaultPusQueueHelper
|
||||||
|
|
||||||
|
|
||||||
|
def create_set_mode_cmd(apid: int, unique_id: int, mode: int, submode: int) -> PusTc:
|
||||||
|
app_data = bytearray()
|
||||||
|
app_data.extend(struct.pack("!I", unique_id))
|
||||||
|
app_data.extend(struct.pack("!I", mode))
|
||||||
|
app_data.extend(struct.pack("!H", submode))
|
||||||
|
return PusTc(
|
||||||
|
service=200,
|
||||||
|
subservice=Subservice.TC_MODE_COMMAND,
|
||||||
|
apid=apid,
|
||||||
|
app_data=app_data,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def handle_set_mode_cmd(
|
||||||
|
q: DefaultPusQueueHelper, target_str: str, mode_str: str, apid: int, unique_id: int
|
||||||
|
):
|
||||||
|
if mode_str == "off":
|
||||||
|
q.add_log_cmd(f"Sending Mode OFF to {target_str}")
|
||||||
|
q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.OFF, 0))
|
||||||
|
elif mode_str == "on":
|
||||||
|
q.add_log_cmd(f"Sending Mode ON to {target_str}")
|
||||||
|
q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.ON, 0))
|
||||||
|
elif mode_str == "normal":
|
||||||
|
q.add_log_cmd(f"Sending Mode NORMAL to {target_str}")
|
||||||
|
q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.NORMAL, 0))
|
@ -1,5 +1,4 @@
|
|||||||
import datetime
|
import datetime
|
||||||
import struct
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from spacepackets.ccsds import CdsShortTimestamp
|
from spacepackets.ccsds import CdsShortTimestamp
|
||||||
@ -8,7 +7,6 @@ from spacepackets.seqcount import FileSeqCountProvider
|
|||||||
from tmtccmd import ProcedureWrapper, TcHandlerBase
|
from tmtccmd import ProcedureWrapper, TcHandlerBase
|
||||||
from tmtccmd.config import CmdTreeNode
|
from tmtccmd.config import CmdTreeNode
|
||||||
from tmtccmd.pus import VerificationWrapper
|
from tmtccmd.pus import VerificationWrapper
|
||||||
from tmtccmd.pus.tc.s200_fsfw_mode import Mode
|
|
||||||
from tmtccmd.tmtc import (
|
from tmtccmd.tmtc import (
|
||||||
DefaultPusQueueHelper,
|
DefaultPusQueueHelper,
|
||||||
FeedWrapper,
|
FeedWrapper,
|
||||||
@ -18,9 +16,9 @@ from tmtccmd.tmtc import (
|
|||||||
TcQueueEntryType,
|
TcQueueEntryType,
|
||||||
)
|
)
|
||||||
from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd
|
from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd
|
||||||
from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice
|
|
||||||
|
|
||||||
from pytmtc.common import AcsId, Apid
|
from pytmtc.common import Apid
|
||||||
|
from pytmtc.mgms import create_mgm_cmds
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -115,21 +113,6 @@ def create_cmd_definition_tree() -> CmdTreeNode:
|
|||||||
return root_node
|
return root_node
|
||||||
|
|
||||||
|
|
||||||
def create_set_mode_cmd(
|
|
||||||
apid: int, unique_id: int, mode: int, submode: int
|
|
||||||
) -> PusTelecommand:
|
|
||||||
app_data = bytearray()
|
|
||||||
app_data.extend(struct.pack("!I", unique_id))
|
|
||||||
app_data.extend(struct.pack("!I", mode))
|
|
||||||
app_data.extend(struct.pack("!H", submode))
|
|
||||||
return PusTelecommand(
|
|
||||||
service=200,
|
|
||||||
subservice=ModeSubservice.TC_MODE_COMMAND,
|
|
||||||
apid=apid,
|
|
||||||
app_data=app_data,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str):
|
def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str):
|
||||||
# It should always be at least the root path "/", so we split of the empty portion left of it.
|
# It should always be at least the root path "/", so we split of the empty portion left of it.
|
||||||
cmd_path_list = cmd_path.split("/")[1:]
|
cmd_path_list = cmd_path.split("/")[1:]
|
||||||
@ -165,32 +148,4 @@ def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str):
|
|||||||
if cmd_path_list[0] == "acs":
|
if cmd_path_list[0] == "acs":
|
||||||
assert len(cmd_path_list) >= 2
|
assert len(cmd_path_list) >= 2
|
||||||
if cmd_path_list[1] == "mgms":
|
if cmd_path_list[1] == "mgms":
|
||||||
assert len(cmd_path_list) >= 3
|
create_mgm_cmds(q, cmd_path_list)
|
||||||
if cmd_path_list[2] == "hk":
|
|
||||||
if cmd_path_list[3] == "one_shot_hk":
|
|
||||||
q.add_log_cmd("Sending HK one shot request")
|
|
||||||
# TODO: Fix
|
|
||||||
# q.add_pus_tc(
|
|
||||||
# create_request_one_hk_command(
|
|
||||||
# make_addressable_id(Apid.ACS, AcsId.MGM_SET)
|
|
||||||
# )
|
|
||||||
# )
|
|
||||||
if cmd_path_list[2] == "mode":
|
|
||||||
if cmd_path_list[3] == "set_mode":
|
|
||||||
handle_set_mode_cmd(
|
|
||||||
q, "MGM 0", cmd_path_list[4], Apid.ACS, AcsId.MGM_0
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def handle_set_mode_cmd(
|
|
||||||
q: DefaultPusQueueHelper, target_str: str, mode_str: str, apid: int, unique_id: int
|
|
||||||
):
|
|
||||||
if mode_str == "off":
|
|
||||||
q.add_log_cmd(f"Sending Mode OFF to {target_str}")
|
|
||||||
q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.OFF, 0))
|
|
||||||
elif mode_str == "on":
|
|
||||||
q.add_log_cmd(f"Sending Mode ON to {target_str}")
|
|
||||||
q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.ON, 0))
|
|
||||||
elif mode_str == "normal":
|
|
||||||
q.add_log_cmd(f"Sending Mode NORMAL to {target_str}")
|
|
||||||
q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.NORMAL, 0))
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user