all import changes

This commit is contained in:
2023-02-01 16:25:17 +01:00
parent 6b657b5623
commit c8ea75b2ad
22 changed files with 203 additions and 212 deletions

View File

@ -1 +0,0 @@

View File

@ -1,107 +0,0 @@
from spacepackets.ecss import PusTelecommand
from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import BPX_HANDLER_ID
from tmtccmd.config.tmtc import (
tmtc_definitions_provider,
TmtcDefinitionWrapper,
OpCodeEntry,
)
from tmtccmd.tc import service_provider
from tmtccmd.tc.decorator import ServiceProviderParams
from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode
from tmtccmd.tc.pus_200_fsfw_mode import Subservice as ModeSubservices
class BpxSetId:
GET_HK_SET = 0
GET_CFG_SET = 5
class BpxActionId:
REBOOT = 2
RESET_COUNTERS = 3
SET_CFG = 4
GET_CFG = 5
class BpxOpCode:
HK = ["0", "hk"]
OFF = ["off"]
ON = ["on"]
RST_BOOT_CNT = ["1", "rst_boot_cnt"]
REQUEST_CFG = ["2", "cfg"]
REQUEST_CFG_HK = ["3", "cfg_hk"]
REBOOT = ["4", "reboot"]
@tmtc_definitions_provider
def add_bpx_cmd_definitions(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(keys=BpxOpCode.ON, info="On command")
oce.add(keys=BpxOpCode.OFF, info="Off command")
oce.add(keys=BpxOpCode.HK, info="Request BPX HK")
oce.add(keys=BpxOpCode.RST_BOOT_CNT, info="Reset Boot Count")
oce.add(keys=BpxOpCode.REQUEST_CFG, info="Request Configuration Struct (Step 1)")
oce.add(
keys=BpxOpCode.REQUEST_CFG_HK, info="Request Configuration Struct HK (Step 2)"
)
oce.add(keys=BpxOpCode.REBOOT, info="Reboot Command")
defs.add_service(
name=CustomServiceList.BPX_BATTERY.value,
info="BPX Battery Handler",
op_code_entry=oce,
)
@service_provider(CustomServiceList.BPX_BATTERY.value)
def pack_bpx_commands(p: ServiceProviderParams):
op_code = p.op_code
q = p.queue_helper
if op_code in BpxOpCode.HK:
q.add_log_cmd("Requesting BPX battery HK set")
sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetId.GET_HK_SET)
q.add_pus_tc(generate_one_hk_command(sid=sid))
if op_code in BpxOpCode.OFF:
q.add_log_cmd("Off mode")
mode_cmd = pack_mode_data(BPX_HANDLER_ID, Mode.OFF, 0)
q.add_pus_tc(
PusTelecommand(
service=200,
subservice=ModeSubservices.TC_MODE_COMMAND,
app_data=mode_cmd,
)
)
if op_code in BpxOpCode.ON:
q.add_log_cmd("On mode")
mode_cmd = pack_mode_data(BPX_HANDLER_ID, Mode.ON, 0)
q.add_pus_tc(
PusTelecommand(
service=200,
subservice=ModeSubservices.TC_MODE_COMMAND,
app_data=mode_cmd,
)
)
if op_code in BpxOpCode.RST_BOOT_CNT:
q.add_log_cmd("Resetting reboot counters")
q.add_pus_tc(
create_action_cmd(
object_id=BPX_HANDLER_ID, action_id=BpxActionId.RESET_COUNTERS
)
)
if op_code in BpxOpCode.REQUEST_CFG:
q.add_log_cmd("Requesting configuration struct")
q.add_pus_tc(
create_action_cmd(object_id=BPX_HANDLER_ID, action_id=BpxActionId.GET_CFG)
)
if op_code in BpxOpCode.REQUEST_CFG_HK:
q.add_log_cmd("Requesting configuration struct HK")
sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetId.GET_CFG_SET)
q.add_pus_tc(generate_one_hk_command(sid=sid))
if op_code in BpxOpCode.REBOOT:
q.add_log_cmd("Rebooting BPX battery")
q.add_pus_tc(
create_action_cmd(object_id=BPX_HANDLER_ID, action_id=BpxActionId.REBOOT)
)

View File

@ -1,47 +0,0 @@
import logging
from eive_tmtc.config.definitions import CustomServiceList
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
from tmtccmd.config.tmtc import tmtc_definitions_provider
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
_LOGGER = logging.getLogger(__name__)
class OpCode:
REQ_OS_HK = ["0", "hk-os"]
RESET_GNSS = ["5", "reset"]
class Info:
REQ_OS_HK = "Request One-Shot HK"
RESET_GNSS = "Reset GNSS using reset pin"
class SetId:
HK = 0
@tmtc_definitions_provider
def add_gps_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(keys=OpCode.RESET_GNSS, info=Info.RESET_GNSS)
oce.add(keys=OpCode.REQ_OS_HK, info=Info.REQ_OS_HK)
defs.add_service(
name=CustomServiceList.GPS_CTRL.value,
info="GPS/GNSS Controller",
op_code_entry=oce,
)
def pack_gps_command(object_id: bytes, q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCode.RESET_GNSS:
# TODO: This needs to be re-implemented
_LOGGER.warning("Reset pin handling needs to be re-implemented")
if op_code in OpCode.REQ_OS_HK:
q.add_log_cmd(f"GMSS: {Info.REQ_OS_HK}")
q.add_pus_tc(
generate_one_hk_command(sid=make_sid(object_id=object_id, set_id=SetId.HK))
)

View File

@ -1,30 +0,0 @@
# -*- coding: utf-8 -*-
"""
@file pdec_handler.py
@brief Test commanding of PDEC Handler
@author J. Meier
@date 22.11.2021
"""
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import DefaultPusQueueHelper
class CommandId:
# prints the clcw to the console. Useful for debugging
PRINT_CLCW = bytearray([0x0, 0x0, 0x0, 0x0])
# Print PDEC monitor register
PRINT_PDEC_MON = bytearray([0x0, 0x0, 0x0, 0x1])
def pack_pdec_handler_test(
object_id: bytearray, q: DefaultPusQueueHelper, op_code: str
):
q.add_log_cmd(f"Testing PDEC handler with object id: {object_id.hex()}")
if op_code == "0":
q.add_log_cmd("PDEC Handler: Print CLCW")
command = object_id + CommandId.PRINT_CLCW
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "1":
q.add_log_cmd("PDEC Handler: Print PDEC monitor register")
command = object_id + CommandId.PRINT_PDEC_MON
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))

View File

@ -1,450 +0,0 @@
import enum
import logging
import struct
import time
from typing import Optional
from eive_tmtc.config.definitions import CustomServiceList
from tmtccmd.config import TmtcDefinitionWrapper
from tmtccmd.config.tmtc import OpCodeEntry, tmtc_definitions_provider
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import (
make_sid,
generate_one_diag_command,
enable_periodic_hk_command_with_interval,
disable_periodic_hk_command,
)
from tmtccmd.tc.pus_11_tc_sched import (
create_enable_tc_sched_cmd,
create_time_tagged_cmd,
)
from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode, Subservice
from tmtccmd.tc.pus_20_fsfw_param import (
pack_scalar_double_param_app_data,
create_load_param_cmd,
pack_boolean_parameter_app_data,
)
from spacepackets.ecss.tc import PusTelecommand
from eive_tmtc.config.object_ids import PL_PCDU_ID
_LOGGER = logging.getLogger(__name__)
class OpCode:
SWITCH_HPA_ON_PROC = ["0", "proc_hpa"]
SWITCH_ON = ["2", "on"]
SWITCH_OFF = ["3", "off"]
NORMAL_SSR = ["4", "nml_ssr"]
NORMAL_DRO = ["5", "nml_dro"]
NORMAL_X8 = ["6", "nml_x8"]
NORMAL_TX = ["7", "nml_tx"]
NORMAL_MPA = ["8", "nml_mpa"]
NORMAL_HPA = ["9", "nml_hpa"]
ENABLE_HK = ["enable_hk"]
DISABLE_HK = ["disable_hk"]
REQ_OS_HK = ["hk_os"]
INJECT_SSR_TO_DRO_FAILURE = ["10", "inject_ssr_dro_fault"]
INJECT_DRO_TO_X8_FAILURE = ["11", "inject_dro_x8_fault"]
INJECT_X8_TO_TX_FAILURE = ["12", "inject_x8_tx_fault"]
INJECT_TX_TO_MPA_FAILURE = ["13", "inject_tx_mpa_fault"]
INJECT_MPA_TO_HPA_FAILURE = ["14", "inject_mpa_hpa_fault"]
INJECT_ALL_ON_FAILURE = ["15", "inject_all_on_fault"]
class Info:
NORMAL = "ADC modules normal"
SWITCH_ON = "Switching on"
SWITCH_OFF = "Switching off"
NORMAL_SSR = f"{NORMAL}, SSR on"
NORMAL_DRO = f"{NORMAL},DRO on"
NORMAL_X8 = f"{NORMAL}, X8 on"
NORMAL_TX = f"{NORMAL}, TX on"
NORMAL_MPA = f"{NORMAL}, MPA on"
NORMAL_HPA = f"{NORMAL}, HPA on"
REQ_OS_HK = "Request One Shot HK"
SWITCH_HPA_ON_PROC = "Switch HPA on procedure"
ENABLE_HK = "Enable HK"
DISABLE_HK = "Disable HK"
class SetId(enum.IntEnum):
ADC = 0
class NormalSubmodesMask(enum.IntEnum):
SOLID_STATE_RELAYS_ADC_ON = 0
DRO_ON = 1
X8_ON = 2
TX_ON = 3
MPA_ON = 4
HPA_ON = 5
class ParamIds(enum.IntEnum):
NEG_V_LOWER_BOUND = 0
NEG_V_UPPER_BOUND = 1
DRO_U_LOWER_BOUND = 2
DRO_U_UPPER_BOUND = 3
DRO_I_UPPER_BOUND = 4
X8_U_LOWER_BOUND = 5
X8_U_UPPER_BOUND = 6
X8_I_UPPER_BOUND = 7
TX_U_LOWER_BOUND = 8
TX_U_UPPER_BOUND = 9
TX_I_UPPER_BOUND = 10
MPA_U_LOWER_BOUND = 11
MPA_U_UPPER_BOUND = 12
MPA_I_UPPER_BOUND = 13
HPA_U_LOWER_BOUND = 14
HPA_U_UPPER_BOUND = 15
HPA_I_UPPER_BOUND = 16
SSR_TO_DRO_WAIT_TIME = 17
DRO_TO_X8_WAIT_TIME = 18
X8_TO_TX_WAIT_TIME = 19
TX_TO_MPA_WAIT_TIME = 20
MPA_TO_HPA_WAIT_TIME = 21
INJECT_SSR_TO_DRO_FAILURE = 30
INJECT_DRO_TO_X8_FAILURE = 31
INJECT_X8_TO_TX_FAILURE = 32
INJECT_TX_TO_MPA_FAILURE = 33
INJECT_MPA_TO_HPA_FAILURE = 34
INJECT_ALL_ON_FAILURE = 35
@tmtc_definitions_provider
def add_pl_pcdu_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(keys=OpCode.SWITCH_HPA_ON_PROC, info=Info.SWITCH_HPA_ON_PROC)
oce.add(keys=OpCode.SWITCH_ON, info=Info.SWITCH_ON)
oce.add(keys=OpCode.SWITCH_OFF, info=Info.SWITCH_OFF)
oce.add(keys=OpCode.NORMAL_SSR, info=Info.NORMAL_SSR)
oce.add(keys=OpCode.NORMAL_DRO, info=Info.NORMAL_DRO)
oce.add(keys=OpCode.NORMAL_X8, info=Info.NORMAL_X8)
oce.add(keys=OpCode.NORMAL_TX, info=Info.NORMAL_TX)
oce.add(keys=OpCode.NORMAL_MPA, info=Info.NORMAL_MPA)
oce.add(keys=OpCode.NORMAL_HPA, info=Info.NORMAL_HPA)
oce.add(keys=OpCode.REQ_OS_HK, info=Info.REQ_OS_HK)
oce.add(keys=OpCode.ENABLE_HK, info=Info.ENABLE_HK)
oce.add(
keys=OpCode.INJECT_SSR_TO_DRO_FAILURE,
info="Inject failure SSR to DRO transition",
)
oce.add(
keys=OpCode.INJECT_DRO_TO_X8_FAILURE,
info="Inject failure in DRO to X8 transition",
)
oce.add(
keys=OpCode.INJECT_X8_TO_TX_FAILURE,
info="Inject failure in X8 to TX transition",
)
oce.add(
keys=OpCode.INJECT_TX_TO_MPA_FAILURE,
info="Inject failure in TX to MPA transition",
)
oce.add(
keys=OpCode.INJECT_MPA_TO_HPA_FAILURE,
info="Inject failure in MPA to HPA transition",
)
oce.add(keys=OpCode.INJECT_ALL_ON_FAILURE, info="Inject failure in all on mode")
defs.add_service(CustomServiceList.PL_PCDU.value, "PL PCDU", oce)
def pack_pl_pcdu_commands(q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCode.SWITCH_ON:
pack_pl_pcdu_mode_cmd(q=q, info=Info.SWITCH_ON, mode=Mode.ON, submode=0)
if op_code in OpCode.SWITCH_OFF:
pack_pl_pcdu_mode_cmd(q=q, info=Info.SWITCH_OFF, mode=Mode.OFF, submode=0)
if op_code in OpCode.ENABLE_HK:
interval = float(
input("Please enter HK collection interval in floating point seconds: ")
)
cmds = enable_periodic_hk_command_with_interval(
diag=True, sid=make_sid(PL_PCDU_ID, SetId.ADC), interval_seconds=interval
)
q.add_log_cmd(f"Enable PL PCDU HK with interval of {interval} seconds")
for cmd in cmds:
q.add_pus_tc(cmd)
if op_code in OpCode.DISABLE_HK:
cmd = disable_periodic_hk_command(
diag=True, sid=make_sid(PL_PCDU_ID, SetId.ADC)
)
q.add_log_cmd("Disabling PL PCDU HK")
q.add_pus_tc(cmd)
if op_code in OpCode.NORMAL_SSR:
pack_pl_pcdu_mode_cmd(
q=q,
info=Info.NORMAL_SSR,
mode=Mode.NORMAL,
submode=submode_mask_to_submode(
NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON
),
)
if op_code in OpCode.NORMAL_DRO:
pack_pl_pcdu_mode_cmd(
q=q,
info=Info.NORMAL_DRO,
mode=Mode.NORMAL,
submode=submode_mask_to_submode(NormalSubmodesMask.DRO_ON),
)
if op_code in OpCode.NORMAL_X8:
pack_pl_pcdu_mode_cmd(
q=q,
info=Info.NORMAL_X8,
mode=Mode.NORMAL,
submode=submode_mask_to_submode(NormalSubmodesMask.X8_ON),
)
if op_code in OpCode.NORMAL_TX:
pack_pl_pcdu_mode_cmd(
q=q,
info=Info.NORMAL_TX,
mode=Mode.NORMAL,
submode=submode_mask_to_submode(NormalSubmodesMask.TX_ON),
)
if op_code in OpCode.NORMAL_MPA:
pack_pl_pcdu_mode_cmd(
q=q,
info=Info.NORMAL_MPA,
mode=Mode.NORMAL,
submode=submode_mask_to_submode(NormalSubmodesMask.MPA_ON),
)
if op_code in OpCode.NORMAL_HPA:
pack_pl_pcdu_mode_cmd(
q=q,
info=Info.NORMAL_HPA,
mode=Mode.NORMAL,
submode=submode_mask_to_submode(NormalSubmodesMask.HPA_ON),
)
if op_code in OpCode.REQ_OS_HK:
q.add_log_cmd(f"PL PCDU: {Info.REQ_OS_HK}")
q.add_pus_tc(
generate_one_diag_command(
sid=make_sid(object_id=PL_PCDU_ID, set_id=SetId.ADC)
)
)
if op_code in OpCode.SWITCH_HPA_ON_PROC:
hpa_on_procedure(q)
if op_code in OpCode.INJECT_ALL_ON_FAILURE:
pack_failure_injection_cmd(
q=q,
param_id=ParamIds.INJECT_ALL_ON_FAILURE,
print_str="All On",
)
def hpa_on_procedure(q: DefaultPusQueueHelper):
delay_dro_to_x8 = request_wait_time()
if delay_dro_to_x8 is None:
delay_dro_to_x8 = 900
q.add_log_cmd(
f"Starting procedure to switch on PL PCDU HPA with DRO to X8 "
f"delay of {delay_dro_to_x8} seconds"
)
pl_pcdu_on = PusTelecommand(
service=200,
subservice=Subservice.TC_MODE_COMMAND,
app_data=pack_mode_data(object_id=PL_PCDU_ID, mode=Mode.ON, submode=0),
)
ssr_on = PusTelecommand(
service=200,
subservice=Subservice.TC_MODE_COMMAND,
app_data=pack_mode_data(
object_id=PL_PCDU_ID,
mode=Mode.NORMAL,
submode=submode_mask_to_submode(
NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON
),
),
)
dro_on = PusTelecommand(
service=200,
subservice=Subservice.TC_MODE_COMMAND,
app_data=pack_mode_data(
object_id=PL_PCDU_ID,
mode=Mode.NORMAL,
submode=submode_mask_to_submode(NormalSubmodesMask.DRO_ON),
),
)
x8_on = PusTelecommand(
service=200,
subservice=Subservice.TC_MODE_COMMAND,
app_data=pack_mode_data(
object_id=PL_PCDU_ID,
mode=Mode.NORMAL,
submode=submode_mask_to_submode(NormalSubmodesMask.X8_ON),
),
)
tx_on = PusTelecommand(
service=200,
subservice=Subservice.TC_MODE_COMMAND,
app_data=pack_mode_data(
object_id=PL_PCDU_ID,
mode=Mode.NORMAL,
submode=submode_mask_to_submode(NormalSubmodesMask.TX_ON),
),
)
mpa_on = PusTelecommand(
service=200,
subservice=Subservice.TC_MODE_COMMAND,
app_data=pack_mode_data(
object_id=PL_PCDU_ID,
mode=Mode.NORMAL,
submode=submode_mask_to_submode(NormalSubmodesMask.MPA_ON),
),
)
hpa_on = PusTelecommand(
service=200,
subservice=Subservice.TC_MODE_COMMAND,
app_data=pack_mode_data(
object_id=PL_PCDU_ID,
mode=Mode.NORMAL,
submode=submode_mask_to_submode(NormalSubmodesMask.HPA_ON),
),
)
current_time = time.time()
enb_sched = create_enable_tc_sched_cmd()
sched_time = int(round(current_time + 10))
q.add_pus_tc(enb_sched)
tagged_on_cmd = create_time_tagged_cmd(
release_time=struct.pack("!I", sched_time),
tc_to_insert=pl_pcdu_on,
)
q.add_pus_tc(tagged_on_cmd)
sched_time += 5
tagged_ssr_cmd = create_time_tagged_cmd(
release_time=struct.pack("!I", sched_time),
tc_to_insert=ssr_on,
)
q.add_pus_tc(tagged_ssr_cmd)
sched_time += 5
tagged_dro_cmd = create_time_tagged_cmd(
release_time=struct.pack("!I", sched_time), tc_to_insert=dro_on
)
q.add_pus_tc(tagged_dro_cmd)
sched_time += delay_dro_to_x8
sched_time = int(round(sched_time))
tagged_x8_cmd = create_time_tagged_cmd(
release_time=struct.pack("!I", sched_time), tc_to_insert=x8_on
)
q.add_pus_tc(tagged_x8_cmd)
sched_time += 5
tagged_tx_cmd = create_time_tagged_cmd(
release_time=struct.pack("!I", sched_time), tc_to_insert=tx_on
)
q.add_pus_tc(tagged_tx_cmd)
sched_time += 5
tagged_mpa_cmd = create_time_tagged_cmd(
release_time=struct.pack("!I", sched_time), tc_to_insert=mpa_on
)
q.add_pus_tc(tagged_mpa_cmd)
sched_time += 5
tagged_hpa_cmd = create_time_tagged_cmd(
release_time=struct.pack("!I", sched_time), tc_to_insert=hpa_on
)
q.add_pus_tc(tagged_hpa_cmd)
def request_wait_time() -> Optional[float]:
while True:
wait_time = input("Please enter DRO to X8 wait time in seconds, x to cancel: ")
if wait_time.lower() == "x":
return None
try:
wait_time = float(wait_time)
except ValueError:
_LOGGER.warning("Invalid input")
continue
if wait_time <= 0:
_LOGGER.warning("Invalid input")
else:
return wait_time
def submode_mask_to_submode(on_tgt: NormalSubmodesMask) -> int:
if on_tgt == NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON:
return 1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON
if on_tgt == NormalSubmodesMask.DRO_ON:
return 1 << NormalSubmodesMask.DRO_ON | (
1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON
)
if on_tgt == NormalSubmodesMask.X8_ON:
return (
1 << NormalSubmodesMask.DRO_ON
| (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
| (1 << NormalSubmodesMask.X8_ON)
)
if on_tgt == NormalSubmodesMask.TX_ON:
return (
1 << NormalSubmodesMask.DRO_ON
| (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
| (1 << NormalSubmodesMask.X8_ON)
| (1 << NormalSubmodesMask.TX_ON)
)
if on_tgt == NormalSubmodesMask.MPA_ON:
return (
1 << NormalSubmodesMask.DRO_ON
| (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
| (1 << NormalSubmodesMask.X8_ON)
| (1 << NormalSubmodesMask.TX_ON)
| (1 << NormalSubmodesMask.MPA_ON)
)
if on_tgt == NormalSubmodesMask.HPA_ON:
return (
1 << NormalSubmodesMask.DRO_ON
| (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
| (1 << NormalSubmodesMask.X8_ON)
| (1 << NormalSubmodesMask.TX_ON)
| (1 << NormalSubmodesMask.MPA_ON)
| (1 << NormalSubmodesMask.HPA_ON)
)
def pack_wait_time_cmd(q: DefaultPusQueueHelper, param_id: int, print_str: str):
wait_time = request_wait_time()
q.add_log_cmd(f"Updating {print_str} wait time to {wait_time}")
if wait_time is None:
return
param_data = pack_scalar_double_param_app_data(
object_id=PL_PCDU_ID,
domain_id=0,
unique_id=param_id,
parameter=wait_time,
)
q.add_pus_tc(create_load_param_cmd(app_data=param_data))
def pack_failure_injection_cmd(q: DefaultPusQueueHelper, param_id: int, print_str: str):
q.add_log_cmd(f"Inserting {print_str} error")
param_data = pack_boolean_parameter_app_data(
object_id=PL_PCDU_ID, domain_id=0, unique_id=param_id, parameter=True
)
q.add_pus_tc(create_load_param_cmd(app_data=param_data))
def pack_pl_pcdu_mode_cmd(
q: DefaultPusQueueHelper, info: str, mode: Mode, submode: int
):
q.add_log_cmd(info)
mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=mode, submode=submode)
q.add_pus_tc(
PusTelecommand(
service=200, subservice=Subservice.TC_MODE_COMMAND, app_data=mode_data
)
)

View File

@ -1,97 +0,0 @@
# -*- coding: utf-8 -*-
"""
@file rad_sensor.py
@brief Tests for the radiation sensor handler
@author J. Meier
@date 01.07.2021
"""
import struct
from eive_tmtc.config.definitions import CustomServiceList
from spacepackets.ecss.tc import PusTelecommand
from eive_tmtc.pus_tc.service_200_mode import pack_mode_data, Mode
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
from tmtccmd.config.tmtc import tmtc_definitions_provider
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
from tmtccmd.util import ObjectIdU32
class SetId:
HK = 3
class OpCode:
ON = ["0", "on"]
NORMAL = ["1", "normal"]
OFF = ["2", "off"]
REQ_HK_ONCE = ["3", "hk-os"]
DEBUG_ON = ["10", "dbg-on"]
DEBUG_OFF = ["11", "dbg-off"]
class Info:
ON = "Switch Rad Sensor on"
NORMAL = "Switch Rad Sensor normal"
OFF = "Switch Rad sensor off"
REQ_OS_HK = "Request one-shot HK"
DEBUG_ON = "Switch debug output on"
DEBUG_OFF = "Switch debug output off"
class CommandId:
START_CONVERSIONS = 2
READ_CONVERSIONS = 3
ENABLE_DEBUG_OUTPUT = 4
DISABLE_DEBUG_OUTPUT = 5
@tmtc_definitions_provider
def add_rad_sens_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(info=Info.ON, keys=OpCode.ON)
oce.add(info=Info.OFF, keys=OpCode.OFF)
oce.add(info=Info.NORMAL, keys=OpCode.NORMAL)
oce.add(info=Info.REQ_OS_HK, keys=OpCode.REQ_HK_ONCE)
oce.add(info=Info.DEBUG_ON, keys=OpCode.DEBUG_ON)
oce.add(info=Info.DEBUG_OFF, keys=OpCode.DEBUG_OFF)
defs.add_service(
name=CustomServiceList.RAD_SENSOR.value,
info="Radiation Sensor",
op_code_entry=oce,
)
def pack_rad_sensor_test_into(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
q.add_log_cmd(f"Commanding Radiation sensor handler {object_id}")
if op_code in OpCode.ON:
rad_sensor_mode_cmd(object_id, Mode.ON, Info.ON, q)
if op_code in OpCode.NORMAL:
rad_sensor_mode_cmd(object_id, Mode.NORMAL, Info.NORMAL, q)
if op_code in OpCode.OFF:
rad_sensor_mode_cmd(object_id, Mode.OFF, Info.OFF, q)
if op_code in OpCode.REQ_HK_ONCE:
q.add_log_cmd(f"Rad sensor: {Info.REQ_OS_HK}")
q.add_pus_tc(
generate_one_hk_command(sid=make_sid(object_id.as_bytes, set_id=SetId.HK))
)
if op_code in OpCode.DEBUG_ON:
q.add_log_cmd(f"Rad sensor: {Info.DEBUG_ON}")
command = object_id.as_bytes + struct.pack("!I", CommandId.ENABLE_DEBUG_OUTPUT)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code in OpCode.DEBUG_OFF:
q.add_log_cmd(f"Rad sensor: {Info.DEBUG_OFF}")
command = object_id.as_bytes + struct.pack("!I", CommandId.DISABLE_DEBUG_OUTPUT)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
def rad_sensor_mode_cmd(
object_id: ObjectIdU32, mode: Mode, info: str, q: DefaultPusQueueHelper
):
q.add_log_cmd(f"Rad sensor: {info}")
mode_data = pack_mode_data(object_id.as_bytes, mode, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))

View File

@ -1,116 +0,0 @@
from typing import Optional
import struct
from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.pus_tc.devs.pdec_handler import CommandId
from spacepackets.ecss import PusTelecommand
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
from tmtccmd.config.tmtc import tmtc_definitions_provider
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32
from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_data, Subservice
import eive_tmtc.config.object_ids as oids
from eive_tmtc.config.object_ids import get_object_ids
RTD_IDS = [
oids.RTD_0_PLOC_HSPD,
oids.RTD_1_PLOC_MISSIONBRD,
oids.RTD_2_4K_CAM,
oids.RTD_3_DAC_HSPD,
oids.RTD_4_STR,
oids.RTD_5_RW1_MX_MY,
oids.RTD_6_DRO,
oids.RTD_7_SCEX,
oids.RTD_8_X8,
oids.RTD_9_HPA,
oids.RTD_10_PL_TX,
oids.RTD_11_MPA,
oids.RTD_12_ACU,
oids.RTD_13_PLPCDU_HSPD,
oids.RTD_14_TCS_BRD,
oids.RTD_15_IMTQ,
]
class CommandId:
WRITE_CONFIG = 6
class OpCode:
ON = ["0", "on"]
OFF = ["1", "off"]
NORMAL = ["2", "normal"]
WRITE_CONFIG = ["3", "Write config"]
class Info:
ON = "Switch handler on"
OFF = "Switch handler off"
NORMAL = "Switch handler normal"
WRITE_CONFIG = "Write config"
@tmtc_definitions_provider
def specify_rtd_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(keys=OpCode.ON, info=Info.ON)
oce.add(keys=OpCode.NORMAL, info=Info.NORMAL)
oce.add(keys=OpCode.OFF, info=Info.OFF)
oce.add(keys=OpCode.WRITE_CONFIG, info=Info.WRITE_CONFIG)
defs.add_service(
name=CustomServiceList.RTD.value, info="RTD commands", op_code_entry=oce
)
def pack_rtd_commands(
op_code: str, object_id: Optional[ObjectIdU32], q: DefaultPusQueueHelper
):
if object_id is not None and object_id not in RTD_IDS:
print("Specified object ID not a valid RTD ID")
object_id = None
if object_id is None:
tgt_rtd_idx = prompt_rtd_idx()
object_id_dict = get_object_ids()
object_id = object_id_dict.get(RTD_IDS[tgt_rtd_idx])
if op_code in OpCode.ON:
app_data = pack_mode_data(object_id=object_id.as_bytes, mode=Mode.ON, submode=0)
q.add_pus_tc(
PusTelecommand(
service=200, subservice=Subservice.TC_MODE_COMMAND, app_data=app_data
)
)
if op_code in OpCode.NORMAL:
app_data = pack_mode_data(
object_id=object_id.as_bytes, mode=Mode.NORMAL, submode=0
)
q.add_pus_tc(
PusTelecommand(
service=200, subservice=Subservice.TC_MODE_COMMAND, app_data=app_data
)
)
if op_code in OpCode.OFF:
app_data = pack_mode_data(
object_id=object_id.as_bytes, mode=Mode.OFF, submode=0
)
q.add_pus_tc(
PusTelecommand(
service=200, subservice=Subservice.TC_MODE_COMMAND, app_data=app_data
)
)
if op_code in OpCode.WRITE_CONFIG:
command = object_id.as_bytes + struct.pack("!I", CommandId.WRITE_CONFIG)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
def prompt_rtd_idx():
while True:
rtd_idx = input("Please specify RTD index [0-15]: ")
if not rtd_idx.isdigit():
print("Invalid input")
continue
rtd_idx = int(rtd_idx)
if rtd_idx < 0 or rtd_idx > 15:
print("Invalid device index")
continue
return rtd_idx

View File

@ -1,208 +0,0 @@
import enum
import json
from spacepackets.ecss import PusTelecommand
from eive_tmtc.config.definitions import CustomServiceList
from tmtccmd.config.tmtc import tmtc_definitions_provider
from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_data, Subservice
from tmtccmd.tc import service_provider
from tmtccmd.tc.decorator import ServiceProviderParams
from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd
from tmtccmd.config import OpCodeEntry, TmtcDefinitionWrapper
from eive_tmtc.config.object_ids import SCEX_HANDLER_ID
USE_SCEX_CONF_FILE = True
class OpCode:
PING = ["0", "ping"]
ION_CMD = ["1", "ion"]
TEMP_CMD = ["2", "temp"]
EXP_STATUS_CMD = ["3", "expstatus"]
ONE_CELLS_CMD = ["4", "onecell"]
ALL_CELLS_CMD = ["5", "allcells"]
FRAM = ["6", "fram"]
SWITCH_ON = ["7", "on"]
SWITCH_OFF = ["8", "off"]
class ActionId(enum.IntEnum):
PING = 7
ION_CMD = 4
TEMP_CMD = 3
EXP_STATUS_CMD = 2
ONE_CELLS_CMD = 6
ALL_CELLS_CMD = 5
FRAM = 1
class Info:
PING = "Send Ping command"
ION_CMD = "Read Ion"
TEMP_CMD = "Read Temperature"
EXP_STATUS_CMD = "Read Experiment Status"
ONE_CELLS_CMD = "One Cell"
ALL_CELLS_CMD = "All Cells"
FRAM = "Read FRAM"
SWITCH_ON = "Switch Scex on"
SWITCH_OFF = "Switch Scex off"
@tmtc_definitions_provider
def add_scex_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(keys=OpCode.PING, info=Info.PING)
oce.add(keys=OpCode.ION_CMD, info=Info.ION_CMD)
oce.add(keys=OpCode.TEMP_CMD, info=Info.TEMP_CMD)
oce.add(keys=OpCode.EXP_STATUS_CMD, info=Info.EXP_STATUS_CMD)
oce.add(keys=OpCode.ONE_CELLS_CMD, info=Info.ONE_CELLS_CMD)
oce.add(keys=OpCode.ALL_CELLS_CMD, info=Info.ALL_CELLS_CMD)
oce.add(keys=OpCode.FRAM, info=Info.FRAM)
oce.add(keys=OpCode.SWITCH_ON, info=Info.SWITCH_ON)
oce.add(keys=OpCode.SWITCH_OFF, info=Info.SWITCH_OFF)
defs.add_service(
name=CustomServiceList.SCEX.value, info="SCEX Device", op_code_entry=oce
)
@service_provider(CustomServiceList.SCEX.value)
def pack_scex_cmds(p: ServiceProviderParams):
op_code = p.op_code
q = p.queue_helper
if op_code in OpCode.SWITCH_ON:
q.add_log_cmd(Info.SWITCH_ON)
q.add_pus_tc(
PusTelecommand(
service=200,
subservice=Subservice.TC_MODE_COMMAND,
app_data=pack_mode_data(SCEX_HANDLER_ID, Mode.ON, 0),
)
)
if op_code in OpCode.SWITCH_OFF:
q.add_log_cmd(Info.SWITCH_OFF)
q.add_pus_tc(
PusTelecommand(
service=200,
subservice=Subservice.TC_MODE_COMMAND,
app_data=pack_mode_data(SCEX_HANDLER_ID, Mode.OFF, 0),
)
)
if op_code in OpCode.PING:
q.add_log_cmd(Info.PING)
app_data = bytes([0])
q.add_pus_tc(create_action_cmd(SCEX_HANDLER_ID, ActionId.PING, app_data))
if op_code in OpCode.ION_CMD:
q.add_log_cmd(Info.ION_CMD)
app_data = bytes([0])
q.add_pus_tc(create_action_cmd(SCEX_HANDLER_ID, ActionId.ION_CMD, app_data))
if op_code in OpCode.TEMP_CMD:
q.add_log_cmd(Info.TEMP_CMD)
app_data = bytes([0])
q.add_pus_tc(create_action_cmd(SCEX_HANDLER_ID, ActionId.TEMP_CMD, app_data))
if op_code in OpCode.EXP_STATUS_CMD:
q.add_log_cmd(Info.EXP_STATUS_CMD)
app_data = bytes([0])
q.add_pus_tc(
create_action_cmd(SCEX_HANDLER_ID, ActionId.EXP_STATUS_CMD, app_data)
)
# one cell
if op_code in OpCode.ONE_CELLS_CMD:
q.add_log_cmd(Info.ONE_CELLS_CMD)
app_data = bytearray([0])
# cell number
cn = 0
while True:
cell_select = input("Which solar cell should be measured? (1-10): ")
if not cell_select.isdigit():
print("Invalid cell number. Try again.")
continue
cell_select = int(cell_select)
if cell_select < 1 or cell_select > 10:
print(
f"Invalid cell number {cell_select}, "
f"Please enter a valid number: "
)
continue
cn = cell_select - 1
break
if USE_SCEX_CONF_FILE:
with open("template/scex_conf.json") as json_file:
json_data = json.load(json_file)
first_dac = json_data["first_dac"]
last_dac = json_data["last_dac"]
res_switch1 = json_data["res_switch1"]
res_switch2 = json_data["res_switch2"]
dac_weight1 = json_data["dac_weight1"]
dac_weight2 = json_data["dac_weight2"]
dac_weight3 = json_data["dac_weight3"]
# in app_data
# app_data.extend(struct.pack("!H", first_dac))
app_data.append(cell_select)
append_16_bit_val(packet=app_data, val=first_dac[cn])
append_16_bit_val(packet=app_data, val=last_dac[cn])
append_16_bit_val(packet=app_data, val=res_switch1[cn])
append_16_bit_val(packet=app_data, val=res_switch2[cn])
app_data.append(dac_weight1[cn])
app_data.append(dac_weight2[cn])
app_data.append(dac_weight3[cn])
q.add_pus_tc(
create_action_cmd(SCEX_HANDLER_ID, ActionId.ONE_CELLS_CMD, app_data)
)
if op_code in OpCode.ALL_CELLS_CMD:
q.add_log_cmd(Info.ALL_CELLS_CMD)
app_data = bytearray([0])
# cell number
cn = 0
if USE_SCEX_CONF_FILE:
with open("template/scex_conf.json") as json_file:
json_data = json.load(json_file)
first_dac = json_data["first_dac"]
last_dac = json_data["last_dac"]
res_switch1 = json_data["res_switch1"]
res_switch2 = json_data["res_switch2"]
dac_weight1 = json_data["dac_weight1"]
dac_weight2 = json_data["dac_weight2"]
dac_weight3 = json_data["dac_weight3"]
# in app_data
# app_data.extend(struct.pack("!H", first_dac))
append_16_bit_val(packet=app_data, val=first_dac[cn])
append_16_bit_val(packet=app_data, val=last_dac[cn])
append_16_bit_val(packet=app_data, val=res_switch1[cn])
append_16_bit_val(packet=app_data, val=res_switch2[cn])
app_data.append(dac_weight1[cn])
app_data.append(dac_weight2[cn])
app_data.append(dac_weight3[cn])
q.add_pus_tc(
create_action_cmd(SCEX_HANDLER_ID, ActionId.ALL_CELLS_CMD, app_data)
)
if op_code in OpCode.FRAM:
q.add_log_cmd(Info.FRAM)
app_data = bytes([0])
q.add_pus_tc(create_action_cmd(SCEX_HANDLER_ID, ActionId.FRAM, app_data))
def append_16_bit_val(packet: bytearray, val: int):
packet.append((val >> 8) & 0xFF)
packet.append(val & 0xFF)

View File

@ -1,40 +0,0 @@
# -*- coding: utf-8 -*-
"""
@file str_img_helper.py
@brief Commanding of the star tracker image helper object which is responsible for uploading
and downloading images to/from the star tracker.
@details Images to uplaod must be previously transferred to the OBC with the CFDP protocol.
Also downloaded images will be stored on the filesystem of the OBC and can be transferred via CFDP.
@author J. Meier
@date 29.11.2021
"""
import struct
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32
class Commands:
UPLOAD_IMAGE = 0
DOWNLOAD_IMAGE = 1
class ImagePathDefs:
uploadFile = "/mnt/sd0/startracker/gemma.bin"
def pack_str_img_helper_command(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
q.add_log_cmd(
f"Testing star tracker image helper object id: {object_id.as_hex_string}"
)
if op_code == "0":
q.add_log_cmd("Star tracker image helper: Upload image")
command = (
object_id.as_bytes
+ struct.pack("!I", Commands.UPLOAD_IMAGE)
+ bytearray(ImagePathDefs.uploadFile, "utf-8")
)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))

View File

@ -1,2 +0,0 @@
class SetId:
HK = 3

View File

@ -1,65 +0,0 @@
# -*- coding: utf-8 -*-
"""
@file tmp1075.py
@brief TMP1075 tests
@author J. Meier
@date 06.01.2021
"""
import enum
from spacepackets.ecss.tc import PusTelecommand
from eive_tmtc.pus_tc.service_200_mode import pack_mode_data
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_mode import Mode
from tmtccmd.pus.s8_fsfw_funccmd import make_action_id
from tmtccmd.util import ObjectIdU32
class Tmp1075TestProcedure:
"""
@brief Use this class to define the tests to perform for the Tmp1075.
@details Setting all to True will run all tests.
Setting all to False will only run the tests set to True.
"""
all = False
start_adc_conversion = False
get_temp = False
set_mode_normal = (
True # Setting mode to normal starts continuous temperature reading
)
set_mode_on = False # If mode is MODE_ON, temperature will only be read on command
class Tmp1075ActionId(enum.IntEnum):
GET_TEMP = 1
START_ADC_CONV = 2
def pack_tmp1075_test_into(
object_id: ObjectIdU32, op_code: str, q: DefaultPusQueueHelper
):
q.add_log_cmd(
f"Testing Tmp1075 Temperature Sensor Handler with object id: {object_id.as_hex_string}"
)
obyt = object_id.as_bytes
if Tmp1075TestProcedure.all or Tmp1075TestProcedure.start_adc_conversion:
q.add_log_cmd("TMP1075: Starting new temperature conversion")
command = obyt + make_action_id(Tmp1075ActionId.GET_TEMP)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if Tmp1075TestProcedure.all or Tmp1075TestProcedure.get_temp:
q.add_log_cmd("TMP1075: Read temperature")
command = obyt + make_action_id(Tmp1075ActionId.START_ADC_CONV)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if Tmp1075TestProcedure.set_mode_normal:
q.add_log_cmd("TMP1075: Set Mode Normal")
mode_data = pack_mode_data(obyt, Mode.NORMAL, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
if Tmp1075TestProcedure.set_mode_on:
q.add_log_cmd("TMP1075: Set Mode On")
mode_data = pack_mode_data(obyt, Mode.ON, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
return q

View File

@ -4,8 +4,8 @@ import logging
from typing import cast
from eive_tmtc.tmtc.power.power import pack_power_commands
from eive_tmtc.pus_tc.devs.rtd import pack_rtd_commands
from eive_tmtc.pus_tc.devs.scex import pack_scex_cmds
from eive_tmtc.tmtc.tcs.rtd import pack_rtd_commands
from eive_tmtc.tmtc.payload.scex import pack_scex_cmds
from eive_tmtc.pus_tc.system.controllers import (
pack_cmd_ctrl_to_prompted_mode,
get_object_from_op_code,
@ -13,7 +13,6 @@ from eive_tmtc.pus_tc.system.controllers import (
from eive_tmtc.tmtc.tcs.subsystem import pack_tcs_sys_commands
from tmtccmd import DefaultProcedureInfo, TcHandlerBase
from tmtccmd.config import CoreServiceList
from tmtccmd.logging import get_console_logger
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.decorator import (
route_to_registered_service_handlers,
@ -65,11 +64,11 @@ from eive_tmtc.config.object_ids import (
get_object_ids,
)
from eive_tmtc.pus_tc.devs.tmp1075 import pack_tmp1075_test_into
from eive_tmtc.pus_tc.devs.gps import pack_gps_command
from eive_tmtc.pus_tc.devs.rad_sensor import pack_rad_sensor_test_into
from eive_tmtc.pus_tc.devs.plpcdu import pack_pl_pcdu_commands
from eive_tmtc.pus_tc.devs.str_img_helper import pack_str_img_helper_command
from eive_tmtc.tmtc.tcs.tmp1075 import pack_tmp1075_test_into
from eive_tmtc.tmtc.acs.gps import pack_gps_command
from eive_tmtc.tmtc.payload.rad_sensor import pack_rad_sensor_test_into
from eive_tmtc.tmtc.power.plpcdu import pack_pl_pcdu_commands
from eive_tmtc.tmtc.acs.str_img_helper import pack_str_img_helper_command
from eive_tmtc.pus_tc.system.proc import pack_proc_commands
import eive_tmtc.config.object_ids as oids

View File

@ -39,14 +39,16 @@ from eive_tmtc.tmtc.acs.imtq import pack_imtq_test_into, pack_dipole_command
from eive_tmtc.tmtc.acs.star_tracker import pack_star_tracker_commands
from eive_tmtc.tmtc.acs.reaction_wheels import pack_rw_ass_cmds, pack_set_speed_command
from eive_tmtc.pus_tc.devs.sus import SetId
from eive_tmtc.pus_tc.devs.bpx_batt import BpxSetId
from eive_tmtc.pus_tc.devs.rad_sensor import SetId as RadSetIds
from eive_tmtc.pus_tc.devs.mgms import MgmLis3SetId as MgmLis3SetIds_0_2
from eive_tmtc.pus_tc.devs.mgms import MgmRm3100SetId as MgmRm3100SetIds_1_3
from eive_tmtc.pus_tc.devs.gyros import AdisGyroSetId as AdisGyroSetIds_0_2
from eive_tmtc.pus_tc.devs.gyros import L3gGyroSetId as L3gGyroSetIds_1_3
from eive_tmtc.pus_tc.devs.gps import SetId as GpsSetIds
from eive_tmtc.tmtc.acs.sus import SetId
from eive_tmtc.tmtc.power.bpx_batt import BpxSetId
from eive_tmtc.tmtc.payload.rad_sensor import SetId as RadSetIds
from eive_tmtc.tmtc.acs.mgms import MgmLis3SetId as MgmLis3SetIds_0_2
from eive_tmtc.tmtc.acs.mgms import MgmRm3100SetId as MgmRm3100SetIds_1_3
from eive_tmtc.tmtc.acs.gyros import (
AdisGyroSetId as AdisGyroSetIds_0_2,
L3gGyroSetId as L3gGyroSetIds_1_3,
)
from eive_tmtc.tmtc.acs.gps import SetId as GpsSetIds
class OpCode: