Merge pull request 'Update SA DEPL commands' (#127) from update_sa_depl_cmds into main

Reviewed-on: #127
This commit is contained in:
Robin Müller 2023-01-20 14:21:59 +01:00
commit a2cd9245a2
3 changed files with 101 additions and 22 deletions

View File

@ -17,7 +17,7 @@ from tmtccmd.pus.s11_tc_sched import (
from tmtccmd.tc.pus_3_fsfw_hk import * from tmtccmd.tc.pus_3_fsfw_hk import *
import eive_tmtc.config.object_ids as oids import eive_tmtc.config.object_ids as oids
from eive_tmtc.tmtc.tcs import OpCode as TcsOpCodes, pack_tcs_sys_commands from eive_tmtc.tmtc.tcs import OpCodeAssy as TcsOpCodes, pack_tcs_sys_commands
from eive_tmtc.pus_tc.devs.bpx_batt import BpxSetId from eive_tmtc.pus_tc.devs.bpx_batt import BpxSetId
from eive_tmtc.tmtc.core import SetId as CoreSetIds from eive_tmtc.tmtc.core import SetId as CoreSetIds
from eive_tmtc.tmtc.power.common_power import SetId as GsSetIds from eive_tmtc.tmtc.power.common_power import SetId as GsSetIds

View File

@ -41,6 +41,7 @@ class SetId(enum.IntEnum):
CTRL_VAL_DATA = 8 CTRL_VAL_DATA = 8
ACTUATOR_CMD_DATA = 9 ACTUATOR_CMD_DATA = 9
class Submode(enum.IntEnum): class Submode(enum.IntEnum):
SAFE = 2 SAFE = 2
DETUMBLE = 3 DETUMBLE = 3

View File

@ -9,12 +9,13 @@ import struct
from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import SOLAR_ARRAY_DEPLOYMENT_ID from eive_tmtc.config.object_ids import SOLAR_ARRAY_DEPLOYMENT_ID
from spacepackets.ecss import PusTelecommand
from tmtccmd.config.tmtc import ( from tmtccmd.config.tmtc import (
tmtc_definitions_provider, tmtc_definitions_provider,
TmtcDefinitionWrapper, TmtcDefinitionWrapper,
OpCodeEntry, OpCodeEntry,
) )
from tmtccmd.tc import service_provider from tmtccmd.tc import service_provider, DefaultPusQueueHelper
from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd
from tmtccmd.tc.decorator import ServiceProviderParams from tmtccmd.tc.decorator import ServiceProviderParams
from tmtccmd import get_console_logger from tmtccmd import get_console_logger
@ -24,10 +25,14 @@ LOGGER = get_console_logger()
class OpCode: class OpCode:
MANUAL_DEPLOYMENT = "man_depl" MANUAL_DEPLOYMENT = "man_depl"
BURN_SA_0_ONLY = "burn_sa_0"
BURN_SA_1_ONLY = "burn_sa_1"
class Info: class Info:
MANUAL_DEPLOYMENT = "Manual Solar Array Deployment" MANUAL_DEPLOYMENT = "Manual Solar Array Deployment"
BURN_SA_0_ONLY = "Only burn SA0"
BURN_SA_1_ONLY = "Only burn SA1"
class ActionId: class ActionId:
@ -38,6 +43,8 @@ class ActionId:
def add_sa_depl_cmds(defs: TmtcDefinitionWrapper): def add_sa_depl_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry() oce = OpCodeEntry()
oce.add(keys=OpCode.MANUAL_DEPLOYMENT, info=Info.MANUAL_DEPLOYMENT) oce.add(keys=OpCode.MANUAL_DEPLOYMENT, info=Info.MANUAL_DEPLOYMENT)
oce.add(keys=OpCode.BURN_SA_0_ONLY, info=Info.BURN_SA_0_ONLY)
oce.add(keys=OpCode.BURN_SA_1_ONLY, info=Info.BURN_SA_1_ONLY)
defs.add_service( defs.add_service(
name=CustomServiceList.SA_DEPLYOMENT, name=CustomServiceList.SA_DEPLYOMENT,
info="Solar Array Deployment", info="Solar Array Deployment",
@ -48,31 +55,102 @@ def add_sa_depl_cmds(defs: TmtcDefinitionWrapper):
@service_provider(CustomServiceList.SA_DEPLYOMENT) @service_provider(CustomServiceList.SA_DEPLYOMENT)
def pack_solar_array_deployment_test_into(p: ServiceProviderParams): def pack_solar_array_deployment_test_into(p: ServiceProviderParams):
q = p.queue_helper q = p.queue_helper
user_data = bytearray() op_code = p.op_code
switch_interval_ms = 0
if op_code == OpCode.MANUAL_DEPLOYMENT:
while True: while True:
burn_time = int(input("Please specify burn time in seconds [0-120 secs]: ")) burn_time_secs = prompt_burn_time()
if burn_time < 0 or burn_time > 120: if burn_time_secs < 0:
LOGGER.warning(f"Invalid burn time {burn_time}")
continue continue
user_data.extend(struct.pack("!I", burn_time)) # Default configuration: Burn each side for half of the burn time.
switch_interval_ms = int(round(burn_time_secs * 0.5 * 1000))
break break
while True: while True:
dry_run = input("Dry run? [y/n]: ") dry_run = prompt_dry_run()
if dry_run in ["yes", "y", "1"]: if dry_run < 0:
dry_run = 1
elif dry_run in ["no", "n", "0"]:
dry_run = 0
else:
LOGGER.warning("Invalid input for dry run parameter")
continue continue
user_data.append(dry_run) dry_run = bool(dry_run)
break break
if dry_run == 1: if dry_run:
dry_run_str = " as dry run" dry_run_str = " as dry run"
else: else:
dry_run_str = "" dry_run_str = ""
q.add_log_cmd(f"Testing S/A Deployment with burn time {burn_time}{dry_run_str}") q.add_log_cmd(
command = create_action_cmd( f"Testing S/A Deployment with burn time {burn_time_secs}{dry_run_str}"
)
q.add_pus_tc(
pack_manual_array_depl_cmd(burn_time_secs, switch_interval_ms, dry_run)
)
elif op_code in OpCode.BURN_SA_0_ONLY:
burn_one_channel_only(q, 0)
elif op_code in OpCode.BURN_SA_1_ONLY:
burn_one_channel_only(q, 1)
def prompt_burn_time() -> int:
burn_time = int(input("Please specify burn time in seconds [0-120 secs]: "))
if burn_time < 0 or burn_time > 120:
LOGGER.warning(f"Invalid burn time {burn_time}")
return -1
return burn_time
def prompt_dry_run() -> int:
dry_run = input("Dry run? [y/n]: ")
if dry_run in ["yes", "y", "1"]:
return 1
elif dry_run in ["no", "n", "0"]:
return 0
else:
LOGGER.warning("Invalid input for dry run parameter")
return -1
def burn_one_channel_only(q: DefaultPusQueueHelper, channel: int):
while True:
burn_time_secs = prompt_burn_time()
if burn_time_secs < 0:
continue
break
while True:
dry_run = prompt_dry_run()
if dry_run < 0:
continue
dry_run = bool(dry_run)
break
if dry_run:
dry_run_str = " as dry run"
else:
dry_run_str = ""
q.add_log_cmd(
f"Testing S/A Deployment Channel {channel} only with "
f"burn time {burn_time_secs}{dry_run_str}"
)
q.add_pus_tc(pack_one_channel_only_cmd(burn_time_secs, channel, dry_run))
def pack_one_channel_only_cmd(
burn_time_seconds: int, channel: int, dry_run: bool
) -> PusTelecommand:
user_data = bytearray()
user_data.extend(struct.pack("!I", burn_time_seconds))
# Burn channel for the entire time
user_data.extend(struct.pack("!I", round(int((burn_time_seconds + 1) * 1000))))
user_data.append(channel)
user_data.append(dry_run)
return create_action_cmd(
SOLAR_ARRAY_DEPLOYMENT_ID, ActionId.MANUAL_DEPLOYMENT, user_data
)
def pack_manual_array_depl_cmd(
burn_time_seconds: int, channel_switch_interval_ms: int, dry_run: bool
) -> PusTelecommand:
user_data = bytearray()
user_data.extend(struct.pack("!I", burn_time_seconds))
user_data.extend(struct.pack("!I", channel_switch_interval_ms))
user_data.append(0)
user_data.append(dry_run)
return create_action_cmd(
SOLAR_ARRAY_DEPLOYMENT_ID, ActionId.MANUAL_DEPLOYMENT, user_data SOLAR_ARRAY_DEPLOYMENT_ID, ActionId.MANUAL_DEPLOYMENT, user_data
) )
q.add_pus_tc(command)