Compare commits

...

10 Commits

13 changed files with 137 additions and 82 deletions

View File

@ -10,6 +10,19 @@ list yields a list of all related PRs for each release.
# [unreleased]
# [v2.12.0] 2023-02-06
## Changed
- Updated the subsystem IDs to avoid value clashes with regular device handler mode IDs.
# [v2.11.0] 2023-02-06
## Fixed
- TMP1075 comands: Implement proper prompt for device select.
- TMP1075 commands: Add OFF, ON, NORMAL, and HK command
# [v2.10.0] 2023-02-03
tmtccmd v4.0.0rc0

View File

@ -3,10 +3,10 @@ from pathlib import Path
SW_NAME = "eive-tmtc"
VERSION_MAJOR = 2
VERSION_MINOR = 10
VERSION_MINOR = 12
VERSION_REVISION = 0
__version__ = "2.10.0"
__version__ = "2.12.0"
EIVE_TMTC_ROOT = Path(__file__).parent
PACKAGE_ROOT = EIVE_TMTC_ROOT.parent

View File

@ -38,8 +38,6 @@ class CustomServiceList(str, enum.Enum):
ACS = "acs"
COM_SS = "com"
BPX_BATTERY = "bpx"
TMP1075_1 = "tmp1075_1"
TMP1075_2 = "tmp1075_2"
HEATER = "heater"
IMTQ = "imtq"
PLOC_SUPV = "ploc_supv"
@ -53,7 +51,7 @@ class CustomServiceList(str, enum.Enum):
REACTION_WHEEL_4 = "rw_4"
RW_ASSEMBLY = "rw_ass"
RAD_SENSOR = "rad_sensor"
GPS_CTRL = "gnss-ctrl"
GPS_CTRL = "gps"
PLOC_MEMORY_DUMPER = "ploc_memory_dumper"
CORE = "core"
STAR_TRACKER = "star_tracker"
@ -71,6 +69,7 @@ class CustomServiceList(str, enum.Enum):
TIME = "time"
PROCEDURE = "proc"
RTD = "rtd"
TMP1075 = "tcs_tmp"
TVTTESTPROCEDURE = "tvtestproc"
CONTROLLERS = "controllers"
SCEX = "scex"

View File

@ -5,9 +5,10 @@
"""
import logging
import os.path
from typing import Dict
from eive_tmtc import EIVE_TMTC_ROOT
from tmtccmd.util.obj_id import ObjectIdDictT
from tmtccmd.util.obj_id import ObjectIdDictT, ObjectIdU32
from tmtccmd.fsfw import parse_fsfw_objects_csv
@ -30,8 +31,10 @@ SCEX_HANDLER_ID = bytes([0x44, 0x33, 0x00, 0x32])
# Thermal Object IDs
THERMAL_CONTROLLER_ID = bytes([0x43, 0x40, 0x00, 0x01])
HEATER_CONTROLLER_ID = bytes([0x44, 0x41, 0x00, 0xA4])
TMP_1075_1_HANDLER_ID = bytes([0x44, 0x42, 0x00, 0x04])
TMP_1075_2_HANDLER_ID = bytes([0x44, 0x42, 0x00, 0x05])
TMP1075_HANDLER_TCS_BRD_0_ID = bytes([0x44, 0x42, 0x00, 0x04])
TMP1075_HANDLER_TCS_BRD_1_ID = bytes([0x44, 0x42, 0x00, 0x05])
TMP1075_HANDLER_PLPCDU_0_ID = bytes([0x44, 0x42, 0x00, 0x06])
TMP1075_HANDLER_IF_BRD_ID = bytes([0x44, 0x42, 0x00, 0x08])
# Communication Object IDs
SYRLINKS_HANDLER_ID = bytes([0x44, 0x53, 0x00, 0xA3])
@ -139,7 +142,10 @@ ACS_CONTROLLER = bytes([0x43, 0x00, 0x00, 0x02])
CORE_CONTROLLER_ID = bytes([0x43, 0x00, 0x00, 0x03])
def get_object_ids() -> ObjectIdDictT:
ObjectIdDict = Dict[bytes, ObjectIdU32]
def get_object_ids() -> ObjectIdDict:
global __OBJECT_ID_DICT
if not os.path.exists(DEFAULT_OBJECTS_CSV_PATH):
logging.getLogger(__name__).warning(

View File

@ -23,14 +23,6 @@ def get_eive_service_op_code_dict() -> TmtcDefinitionWrapper:
return def_wrapper
@tmtc_definitions_provider
def add_tmp_sens_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add("0", "TMP1075 Tests")
defs.add_service(CustomServiceList.TMP1075_1.value, "TMP1075 1", oce)
defs.add_service(CustomServiceList.TMP1075_2.value, "TMP1075 2", oce)
@tmtc_definitions_provider
def add_pdec_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()

View File

@ -45,8 +45,10 @@ from eive_tmtc.config.object_ids import (
PDU_1_HANDLER_ID,
PDU_2_HANDLER_ID,
ACU_HANDLER_ID,
TMP_1075_1_HANDLER_ID,
TMP_1075_2_HANDLER_ID,
TMP1075_HANDLER_TCS_BRD_0_ID,
TMP1075_HANDLER_TCS_BRD_1_ID,
TMP1075_HANDLER_PLPCDU_0_ID,
TMP1075_HANDLER_IF_BRD_ID,
HEATER_CONTROLLER_ID,
IMTQ_HANDLER_ID,
RW1_ID,
@ -74,6 +76,8 @@ from eive_tmtc.pus_tc.system.proc import pack_proc_commands
import eive_tmtc.config.object_ids as oids
from tmtccmd.util import ObjectIdU32
from eive_tmtc.utility.input_helper import InputHelper
def handle_default_procedure(
tc_base: TcHandlerBase,
@ -103,13 +107,16 @@ def handle_default_procedure(
return pack_acu_commands(object_id=object_id, q=queue_helper, op_code=op_code)
if service == CustomServiceList.TCS.value:
return pack_tcs_sys_commands(q=queue_helper, op_code=op_code)
if service == CustomServiceList.TMP1075_1.value:
object_id = cast(ObjectIdU32, obj_id_man.get(TMP_1075_1_HANDLER_ID))
return pack_tmp1075_test_into(
object_id=object_id, q=queue_helper, op_code=op_code
)
if service == CustomServiceList.TMP1075_2.value:
object_id = cast(ObjectIdU32, obj_id_man.get(TMP_1075_2_HANDLER_ID))
if service == CustomServiceList.TMP1075.value:
menu_dict = {
"0": ("TMP1075 TCS Board 0", TMP1075_HANDLER_TCS_BRD_0_ID),
"1": ("TMP1075 TCS Board 1", TMP1075_HANDLER_TCS_BRD_1_ID),
"2": ("TMP1075 PL PCDU 0", TMP1075_HANDLER_PLPCDU_0_ID),
"4": ("TMP1075 IF Board", TMP1075_HANDLER_IF_BRD_ID),
}
input_helper = InputHelper(menu_dict)
tmp_select = input_helper.get_key()
object_id = obj_id_man.get(menu_dict[tmp_select][1])
return pack_tmp1075_test_into(
object_id=object_id, q=queue_helper, op_code=op_code
)

View File

@ -514,8 +514,6 @@ def handle_raw_mgm_data(pw: PrintWrapper, hk_data: bytes):
for entry in zip(print_str_list, formatted_list):
pw.dlog(f"{entry[0].ljust(28)}: {entry[1]}")
current_idx += 1
if PERFORM_MGM_CALIBRATION:
perform_mgm_calibration(pw, mgm_0_lis3_floats_ut)
assert current_idx == 61
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=6)
@ -528,31 +526,48 @@ def handle_mgm_data_processed(pw: PrintWrapper, hk_data: bytes):
pw.dlog("Recieved HK set too small")
return
current_idx = 0
for i in range(5):
fmt_str = "!fff"
inc_len = struct.calcsize(fmt_str)
mgm_vec = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
mgm_vec = [f"{val:8.3f}" for val in mgm_vec]
pw.dlog(f"MGM {i}: {mgm_vec}")
current_idx += inc_len
fmt_str = "!fff"
inc_len = struct.calcsize(fmt_str)
mgm_0 = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
mgm_0_str = [f"{val:8.3f}" for val in mgm_0]
pw.dlog(f"MGM 0 Vec: {mgm_0_str}")
current_idx += inc_len
mgm_1 = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
mgm_1_str = [f"{val:8.3f}" for val in mgm_1]
pw.dlog(f"MGM 1 Vec: {mgm_1_str}")
current_idx += inc_len
mgm_2 = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
mgm_2_str = [f"{val:8.3f}" for val in mgm_2]
pw.dlog(f"MGM 2 Vec: {mgm_2_str}")
current_idx += inc_len
mgm_3 = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
mgm_3_str = [f"{val:8.3f}" for val in mgm_3]
pw.dlog(f"MGM 3 Vec: {mgm_3_str}")
current_idx += inc_len
mgm_4 = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
mgm_4_str = [f"{val:8.3f}" for val in mgm_4]
pw.dlog(f"MGM 4 Vec: {mgm_4_str}")
current_idx += inc_len
fmt_str = "!ddd"
inc_len = struct.calcsize(fmt_str)
mgm_vec_tot = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
mgm_vec_tot = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
mgm_vec_tot = [f"{val:8.3f}" for val in mgm_vec_tot]
current_idx += inc_len
pw.dlog(f"MGM Total Vec: {mgm_vec_tot}")
mgm_vec_tot_deriv = struct.unpack(
fmt_str, hk_data[current_idx : current_idx + inc_len]
fmt_str, hk_data[current_idx: current_idx + inc_len]
)
mgm_vec_tot_deriv = [f"{val:8.3f}" for val in mgm_vec_tot_deriv]
pw.dlog(f"MGM Total Vec Deriv: {mgm_vec_tot_deriv}")
current_idx += inc_len
mag_igrf_model = struct.unpack(
fmt_str, hk_data[current_idx : current_idx + inc_len]
fmt_str, hk_data[current_idx: current_idx + inc_len]
)
mag_igrf_model = [f"{val:8.3f}" for val in mag_igrf_model]
pw.dlog(f"MAG IGRF Model: {mag_igrf_model}")
current_idx += inc_len
if PERFORM_MGM_CALIBRATION:
perform_mgm_calibration(pw, mgm_3)
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=8)

View File

@ -13,8 +13,8 @@ _LOGGER = logging.getLogger(__name__)
class OpCode:
REQ_OS_HK = ["0", "hk-os"]
RESET_GNSS = ["5", "reset"]
REQ_OS_HK = ["hk"]
RESET_GNSS = ["reset"]
class Info:

View File

@ -29,13 +29,13 @@ class OpCode(str, enum.Enum):
class AcsMode(enum.IntEnum):
OFF = 0
SAFE = 2
DETUMBLE = 3
IDLE = 4
PTG_TARGET_NADIR = 5
PTG_TARGET = 6
PTG_TARGET_GS = 7
PTG_TARGET_INERTIAL = 8
SAFE = 10
DETUMBLE = 11
IDLE = 12
PTG_TARGET_NADIR = 13
PTG_TARGET = 14
PTG_TARGET_GS = 15
PTG_TARGET_INERTIAL = 16
class Info(str, enum.Enum):

View File

@ -22,11 +22,11 @@ class ParameterId(enum.IntEnum):
class Submode(enum.IntEnum):
RX_ONLY = 0
RX_AND_TX_DEF_DATARATE = 1
RX_AND_TX_LOW_DATARATE = 2
RX_AND_TX_HIGH_DATARATE = 3
RX_AND_TX_CARRIER_WAVE = 4
RX_ONLY = 10
RX_AND_TX_DEF_DATARATE = 11
RX_AND_TX_LOW_DATARATE = 12
RX_AND_TX_HIGH_DATARATE = 13
RX_AND_TX_CARRIER_WAVE = 14
class OpCode:

View File

@ -12,6 +12,15 @@ from tmtccmd.tc.decorator import ServiceProviderParams
from tmtccmd.tc.pus_200_fsfw_mode import Subservice as ModeSubservice
class ModeId:
OFF = 0
SUPV_ONLY = 10
MPSOC_STREAM = 11
CAM_STREAM = 12
EARTH_OBSV = 13
SCEX = 14
class OpCode(str, enum.Enum):
OFF = "off"
REPORT_ALL_MODES = "report_modes"

View File

@ -7,27 +7,31 @@
"""
import enum
from eive_tmtc.config.definitions import CustomServiceList
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config.tmtc import (
tmtc_definitions_provider,
TmtcDefinitionWrapper,
OpCodeEntry,
)
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_data
from tmtccmd.pus.s8_fsfw_funccmd import make_action_id
from tmtccmd.tc.pus_3_fsfw_hk import create_request_one_hk_command, make_sid
from tmtccmd.util import ObjectIdU32
class Tmp1075TestProcedure:
"""
@brief Use this class to define the tests to perform for the Tmp1075.
@details Setting all to True will run all tests.
Setting all to False will only run the tests set to True.
"""
class OpCode:
OFF = "off"
ON = "on"
NML = "nml"
HK = "hk"
all = False
start_adc_conversion = False
get_temp = False
set_mode_normal = (
True # Setting mode to normal starts continuous temperature reading
)
set_mode_on = False # If mode is MODE_ON, temperature will only be read on command
class Info:
OFF = "Off"
ON = "On"
NML = "Normal"
HK = "HK"
class Tmp1075ActionId(enum.IntEnum):
@ -35,30 +39,40 @@ class Tmp1075ActionId(enum.IntEnum):
START_ADC_CONV = 2
class SetId:
TMEPERATURE = 1
def pack_tmp1075_test_into(
object_id: ObjectIdU32, op_code: str, q: DefaultPusQueueHelper
):
q.add_log_cmd(
f"Testing Tmp1075 Temperature Sensor Handler with object id: {object_id.as_hex_string}"
f"Testing Tmp1075 Temperature Sensor Handler with object id: {object_id}"
)
obyt = object_id.as_bytes
if Tmp1075TestProcedure.all or Tmp1075TestProcedure.start_adc_conversion:
q.add_log_cmd("TMP1075: Starting new temperature conversion")
command = obyt + make_action_id(Tmp1075ActionId.GET_TEMP)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if Tmp1075TestProcedure.all or Tmp1075TestProcedure.get_temp:
q.add_log_cmd("TMP1075: Read temperature")
command = obyt + make_action_id(Tmp1075ActionId.START_ADC_CONV)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if Tmp1075TestProcedure.set_mode_normal:
if op_code == OpCode.OFF:
q.add_log_cmd("TMP1075: Set Normal Off")
mode_data = pack_mode_data(obyt, Mode.OFF, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
if op_code == OpCode.NML:
q.add_log_cmd("TMP1075: Set Mode Normal")
mode_data = pack_mode_data(obyt, Mode.NORMAL, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
if Tmp1075TestProcedure.set_mode_on:
if op_code == OpCode.ON:
q.add_log_cmd("TMP1075: Set Mode On")
mode_data = pack_mode_data(obyt, Mode.ON, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
if op_code == OpCode.HK:
q.add_log_cmd("TMP1075: Request One-Shot HK")
q.add_pus_tc(create_request_one_hk_command(make_sid(obyt, SetId.TMEPERATURE)))
return q
@tmtc_definitions_provider
def add_tmp_sens_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(OpCode.OFF, Info.OFF)
oce.add(OpCode.ON, Info.ON)
oce.add(OpCode.NML, Info.NML)
oce.add(OpCode.HK, Info.HK)
defs.add_service(CustomServiceList.TMP1075.value, "TMP1075 Temperature Sensor", oce)

View File

@ -6,13 +6,13 @@
@date 13.02.2021
"""
import logging
from typing import Tuple, Dict
_LOGGER = logging.getLogger(__name__)
class InputHelper:
def __init__(self, menu: dict):
def __init__(self, menu: Dict[str, Tuple[str, ...]]):
"""
@brief Constructor
@param menu The menu describing the input options