156 lines
4.7 KiB
Python
156 lines
4.7 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
@file solar_array_deployment.py
|
|
@brief The test function in this file simply returns a command which triggers the solar array
|
|
deployment.
|
|
@author J. Meier
|
|
@date 15.02.2021
|
|
"""
|
|
import logging
|
|
import struct
|
|
|
|
from eive_tmtc.config.definitions import CustomServiceList
|
|
from eive_tmtc.config.object_ids import SOLAR_ARRAY_DEPLOYMENT_ID
|
|
from spacepackets.ecss import PusTelecommand
|
|
from tmtccmd.config.tmtc import (
|
|
tmtc_definitions_provider,
|
|
TmtcDefinitionWrapper,
|
|
OpCodeEntry,
|
|
)
|
|
from tmtccmd.tc import service_provider, DefaultPusQueueHelper
|
|
from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd
|
|
from tmtccmd.tc.decorator import ServiceProviderParams
|
|
|
|
|
|
class OpCode:
|
|
MANUAL_DEPLOYMENT = "man_depl"
|
|
BURN_SA_0_ONLY = "burn_sa_0"
|
|
BURN_SA_1_ONLY = "burn_sa_1"
|
|
|
|
|
|
class Info:
|
|
MANUAL_DEPLOYMENT = "Manual Solar Array Deployment"
|
|
BURN_SA_0_ONLY = "Only burn SA0"
|
|
BURN_SA_1_ONLY = "Only burn SA1"
|
|
|
|
|
|
class ActionId:
|
|
MANUAL_DEPLOYMENT = 5
|
|
|
|
|
|
@tmtc_definitions_provider
|
|
def add_sa_depl_cmds(defs: TmtcDefinitionWrapper):
|
|
oce = OpCodeEntry()
|
|
oce.add(keys=OpCode.MANUAL_DEPLOYMENT, info=Info.MANUAL_DEPLOYMENT)
|
|
oce.add(keys=OpCode.BURN_SA_0_ONLY, info=Info.BURN_SA_0_ONLY)
|
|
oce.add(keys=OpCode.BURN_SA_1_ONLY, info=Info.BURN_SA_1_ONLY)
|
|
defs.add_service(
|
|
name=CustomServiceList.SA_DEPLYOMENT,
|
|
info="Solar Array Deployment",
|
|
op_code_entry=oce,
|
|
)
|
|
|
|
|
|
@service_provider(CustomServiceList.SA_DEPLYOMENT)
|
|
def pack_solar_array_deployment_test_into(p: ServiceProviderParams):
|
|
q = p.queue_helper
|
|
op_code = p.op_code
|
|
switch_interval_ms = 0
|
|
if op_code == OpCode.MANUAL_DEPLOYMENT:
|
|
while True:
|
|
burn_time_secs = prompt_burn_time()
|
|
if burn_time_secs < 0:
|
|
continue
|
|
# Default configuration: Burn each side for half of the burn time.
|
|
switch_interval_ms = int(round(burn_time_secs * 0.5 * 1000))
|
|
break
|
|
while True:
|
|
dry_run = prompt_dry_run()
|
|
if dry_run < 0:
|
|
continue
|
|
dry_run = bool(dry_run)
|
|
break
|
|
if dry_run:
|
|
dry_run_str = " as dry run"
|
|
else:
|
|
dry_run_str = ""
|
|
q.add_log_cmd(
|
|
f"Testing S/A Deployment with burn time {burn_time_secs}{dry_run_str}"
|
|
)
|
|
q.add_pus_tc(
|
|
pack_manual_array_depl_cmd(burn_time_secs, switch_interval_ms, dry_run)
|
|
)
|
|
elif op_code in OpCode.BURN_SA_0_ONLY:
|
|
burn_one_channel_only(q, 0)
|
|
elif op_code in OpCode.BURN_SA_1_ONLY:
|
|
burn_one_channel_only(q, 1)
|
|
|
|
|
|
def prompt_burn_time() -> int:
|
|
burn_time = int(input("Please specify burn time in seconds [0-120 secs]: "))
|
|
if burn_time < 0 or burn_time > 120:
|
|
logging.getLogger(__name__).warning(f"Invalid burn time {burn_time}")
|
|
return -1
|
|
return burn_time
|
|
|
|
|
|
def prompt_dry_run() -> int:
|
|
dry_run = input("Dry run? [y/n]: ")
|
|
if dry_run in ["yes", "y", "1"]:
|
|
return 1
|
|
elif dry_run in ["no", "n", "0"]:
|
|
return 0
|
|
else:
|
|
logging.getLogger(__name__).warning("Invalid input for dry run parameter")
|
|
return -1
|
|
|
|
|
|
def burn_one_channel_only(q: DefaultPusQueueHelper, channel: int):
|
|
while True:
|
|
burn_time_secs = prompt_burn_time()
|
|
if burn_time_secs < 0:
|
|
continue
|
|
break
|
|
while True:
|
|
dry_run = prompt_dry_run()
|
|
if dry_run < 0:
|
|
continue
|
|
dry_run = bool(dry_run)
|
|
break
|
|
if dry_run:
|
|
dry_run_str = " as dry run"
|
|
else:
|
|
dry_run_str = ""
|
|
q.add_log_cmd(
|
|
f"Testing S/A Deployment Channel {channel} only with "
|
|
f"burn time {burn_time_secs}{dry_run_str}"
|
|
)
|
|
q.add_pus_tc(pack_one_channel_only_cmd(burn_time_secs, channel, dry_run))
|
|
|
|
|
|
def pack_one_channel_only_cmd(
|
|
burn_time_seconds: int, channel: int, dry_run: bool
|
|
) -> PusTelecommand:
|
|
user_data = bytearray()
|
|
user_data.extend(struct.pack("!I", burn_time_seconds))
|
|
# Burn channel for the entire time
|
|
user_data.extend(struct.pack("!I", round(int((burn_time_seconds + 1) * 1000))))
|
|
user_data.append(channel)
|
|
user_data.append(dry_run)
|
|
return create_action_cmd(
|
|
SOLAR_ARRAY_DEPLOYMENT_ID, ActionId.MANUAL_DEPLOYMENT, user_data
|
|
)
|
|
|
|
|
|
def pack_manual_array_depl_cmd(
|
|
burn_time_seconds: int, channel_switch_interval_ms: int, dry_run: bool
|
|
) -> PusTelecommand:
|
|
user_data = bytearray()
|
|
user_data.extend(struct.pack("!I", burn_time_seconds))
|
|
user_data.extend(struct.pack("!I", channel_switch_interval_ms))
|
|
user_data.append(0)
|
|
user_data.append(dry_run)
|
|
return create_action_cmd(
|
|
SOLAR_ARRAY_DEPLOYMENT_ID, ActionId.MANUAL_DEPLOYMENT, user_data
|
|
)
|