Add ACS Subsystem Commands #117
24
.run/Subsystem IDLE.run.xml
Normal file
24
.run/Subsystem IDLE.run.xml
Normal file
@ -0,0 +1,24 @@
|
||||
<component name="ProjectRunConfigurationManager">
|
||||
<configuration default="false" name="Subsystem IDLE" type="PythonConfigurationType" factoryName="Python" folderName="ACS">
|
||||
<module name="tmtc" />
|
||||
<option name="INTERPRETER_OPTIONS" value="" />
|
||||
<option name="PARENT_ENVS" value="true" />
|
||||
<envs>
|
||||
<env name="PYTHONUNBUFFERED" value="1" />
|
||||
</envs>
|
||||
<option name="SDK_HOME" value="" />
|
||||
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
|
||||
<option name="IS_MODULE_SDK" value="true" />
|
||||
<option name="ADD_CONTENT_ROOTS" value="true" />
|
||||
<option name="ADD_SOURCE_ROOTS" value="true" />
|
||||
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
|
||||
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtcc.py" />
|
||||
<option name="PARAMETERS" value="-s acs_subsystem -o idle -l" />
|
||||
<option name="SHOW_COMMAND_LINE" value="false" />
|
||||
<option name="EMULATE_TERMINAL" value="true" />
|
||||
<option name="MODULE_MODE" value="false" />
|
||||
<option name="REDIRECT_INPUT" value="false" />
|
||||
<option name="INPUT_FILE" value="" />
|
||||
<method v="2" />
|
||||
</configuration>
|
||||
</component>
|
24
.run/Subsystem SAFE.run.xml
Normal file
24
.run/Subsystem SAFE.run.xml
Normal file
@ -0,0 +1,24 @@
|
||||
<component name="ProjectRunConfigurationManager">
|
||||
<configuration default="false" name="Subsystem SAFE" type="PythonConfigurationType" factoryName="Python" folderName="ACS">
|
||||
<module name="tmtc" />
|
||||
<option name="INTERPRETER_OPTIONS" value="" />
|
||||
<option name="PARENT_ENVS" value="true" />
|
||||
<envs>
|
||||
<env name="PYTHONUNBUFFERED" value="1" />
|
||||
</envs>
|
||||
<option name="SDK_HOME" value="" />
|
||||
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
|
||||
<option name="IS_MODULE_SDK" value="true" />
|
||||
<option name="ADD_CONTENT_ROOTS" value="true" />
|
||||
<option name="ADD_SOURCE_ROOTS" value="true" />
|
||||
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
|
||||
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtcc.py" />
|
||||
<option name="PARAMETERS" value="-s acs_subsystem -o safe -l" />
|
||||
<option name="SHOW_COMMAND_LINE" value="false" />
|
||||
<option name="EMULATE_TERMINAL" value="true" />
|
||||
<option name="MODULE_MODE" value="false" />
|
||||
<option name="REDIRECT_INPUT" value="false" />
|
||||
<option name="INPUT_FILE" value="" />
|
||||
<method v="2" />
|
||||
</configuration>
|
||||
</component>
|
@ -58,8 +58,9 @@ class CustomServiceList(str, enum.Enum):
|
||||
STR_IMG_HELPER = "str_img_helper"
|
||||
SYRLINKS = "syrlinks"
|
||||
ACS_CTRL = "acs_ctrl"
|
||||
ACS_ASS = "acs_ass"
|
||||
SUS_ASS = "sus_ass"
|
||||
ACS_SS = "acs_subsystem"
|
||||
ACS_BRD_ASS = "acs_brd_ass"
|
||||
SUS_BRD_ASS = "sus_brd_ass"
|
||||
TCS = "tcs"
|
||||
TCS_ASS = "tcs_ass"
|
||||
TIME = "time"
|
||||
|
@ -181,15 +181,15 @@ Event ID (dec); Event ID (hex); Name; Severity; Description; File Path
|
||||
12709;0x31a5;I_MPA_OUT_OF_BOUNDS;MEDIUM;P1: 0 -> too low, 1 -> too high P2: Float value;mission/devices/PayloadPcduHandler.h
|
||||
12710;0x31a6;U_HPA_OUT_OF_BOUNDS;MEDIUM;P1: 0 -> too low, 1 -> too high P2: Float value;mission/devices/PayloadPcduHandler.h
|
||||
12711;0x31a7;I_HPA_OUT_OF_BOUNDS;MEDIUM;P1: 0 -> too low, 1 -> too high P2: Float value;mission/devices/PayloadPcduHandler.h
|
||||
12800;0x3200;TRANSITION_OTHER_SIDE_FAILED;HIGH;;mission/system/AcsBoardAssembly.h
|
||||
12801;0x3201;NOT_ENOUGH_DEVICES_DUAL_MODE;HIGH;;mission/system/AcsBoardAssembly.h
|
||||
12802;0x3202;POWER_STATE_MACHINE_TIMEOUT;MEDIUM;;mission/system/AcsBoardAssembly.h
|
||||
12803;0x3203;SIDE_SWITCH_TRANSITION_NOT_ALLOWED;LOW;Not implemented, would increase already high complexity. Operator should instead command the assembly off first and then command the assembly on into the desired mode/submode combination;mission/system/AcsBoardAssembly.h
|
||||
12900;0x3264;TRANSITION_OTHER_SIDE_FAILED;HIGH;;mission/system/SusAssembly.h
|
||||
12901;0x3265;NOT_ENOUGH_DEVICES_DUAL_MODE;HIGH;;mission/system/SusAssembly.h
|
||||
12902;0x3266;POWER_STATE_MACHINE_TIMEOUT;MEDIUM;;mission/system/SusAssembly.h
|
||||
12903;0x3267;SIDE_SWITCH_TRANSITION_NOT_ALLOWED;LOW;Not implemented, would increase already high complexity. Operator should instead command the assembly off first and then command the assembly on into the desired mode/submode combination;mission/system/SusAssembly.h
|
||||
13000;0x32c8;CHILDREN_LOST_MODE;MEDIUM;;mission/system/TcsBoardAssembly.h
|
||||
12800;0x3200;TRANSITION_OTHER_SIDE_FAILED;HIGH;;mission/system/objects/AcsBoardAssembly.h
|
||||
12801;0x3201;NOT_ENOUGH_DEVICES_DUAL_MODE;HIGH;;mission/system/objects/AcsBoardAssembly.h
|
||||
12802;0x3202;POWER_STATE_MACHINE_TIMEOUT;MEDIUM;;mission/system/objects/AcsBoardAssembly.h
|
||||
12803;0x3203;SIDE_SWITCH_TRANSITION_NOT_ALLOWED;LOW;Not implemented, would increase already high complexity. Operator should instead command the assembly off first and then command the assembly on into the desired mode/submode combination;mission/system/objects/AcsBoardAssembly.h
|
||||
12900;0x3264;TRANSITION_OTHER_SIDE_FAILED;HIGH;;mission/system/objects/SusAssembly.h
|
||||
12901;0x3265;NOT_ENOUGH_DEVICES_DUAL_MODE;HIGH;;mission/system/objects/SusAssembly.h
|
||||
12902;0x3266;POWER_STATE_MACHINE_TIMEOUT;MEDIUM;;mission/system/objects/SusAssembly.h
|
||||
12903;0x3267;SIDE_SWITCH_TRANSITION_NOT_ALLOWED;LOW;Not implemented, would increase already high complexity. Operator should instead command the assembly off first and then command the assembly on into the desired mode/submode combination;mission/system/objects/SusAssembly.h
|
||||
13000;0x32c8;CHILDREN_LOST_MODE;MEDIUM;;mission/system/objects/TcsBoardAssembly.h
|
||||
13100;0x332c;GPS_FIX_CHANGE;INFO;Fix has changed. P1: Old fix. P2: New fix 0: Not seen, 1: No Fix, 2: 2D-Fix, 3: 3D-Fix;mission/devices/devicedefinitions/GPSDefinitions.h
|
||||
13200;0x3390;P60_BOOT_COUNT;INFO;P60 boot count is broadcasted once at SW startup. P1: Boot count;mission/devices/P60DockHandler.h
|
||||
13201;0x3391;BATT_MODE;INFO;Battery mode is broadcasted at startup. P1: Mode;mission/devices/P60DockHandler.h
|
||||
|
|
@ -122,6 +122,7 @@ SUS_5_N_LOC_XFYMZB_PT_ZB = bytes([0x44, 0x12, 0x00, 0x37])
|
||||
SUS_11_R_LOC_XBYMZB_PT_ZB = bytes([0x44, 0x12, 0x00, 0x43])
|
||||
|
||||
# System and Assembly Objects
|
||||
ACS_SUBSYSTEM_ID = bytes([0x73, 0x01, 0x00, 0x01])
|
||||
ACS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x01])
|
||||
SUS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x02])
|
||||
TCS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x03])
|
||||
|
@ -136,5 +136,7 @@
|
||||
0x73000100;TM_FUNNEL
|
||||
0x73000101;PUS_TM_FUNNEL
|
||||
0x73000102;CFDP_TM_FUNNEL
|
||||
0x73010000;EIVE_SYSTEM
|
||||
0x73010001;ACS_SUBSYSTEM
|
||||
0x73500000;CCSDS_IP_CORE_BRIDGE
|
||||
0xFFFFFFFF;NO_OBJECT
|
||||
|
|
@ -163,65 +163,8 @@ def add_time_cmds(defs: TmtcDefinitionWrapper):
|
||||
|
||||
@tmtc_definitions_provider
|
||||
def add_system_cmds(defs: TmtcDefinitionWrapper):
|
||||
from pus_tc.system.acs import AcsOpCodes, SusOpCodes
|
||||
import pus_tc.system.controllers as controllers
|
||||
|
||||
oce = OpCodeEntry()
|
||||
oce.add(
|
||||
keys=AcsOpCodes.ACS_ASS_A_SIDE,
|
||||
info="Switch to ACS board A side",
|
||||
)
|
||||
oce.add(
|
||||
keys=AcsOpCodes.ACS_ASS_B_SIDE,
|
||||
info="Switch to ACS board B side",
|
||||
)
|
||||
oce.add(
|
||||
keys=AcsOpCodes.ACS_ASS_DUAL_MODE,
|
||||
info="Switch to ACS board dual mode",
|
||||
)
|
||||
oce.add(
|
||||
keys=AcsOpCodes.ACS_ASS_A_ON,
|
||||
info="Switch ACS board A side on",
|
||||
)
|
||||
oce.add(
|
||||
keys=AcsOpCodes.ACS_ASS_B_ON,
|
||||
info="Switch ACS board B side on",
|
||||
)
|
||||
oce.add(
|
||||
keys=AcsOpCodes.ACS_ASS_DUAL_ON,
|
||||
info="Switch ACS board dual mode on",
|
||||
)
|
||||
oce.add(
|
||||
keys=AcsOpCodes.ACS_ASS_OFF,
|
||||
info="Switch off ACS board",
|
||||
)
|
||||
defs.add_service(
|
||||
name=CustomServiceList.ACS_ASS.value, info="ACS Assemblies", op_code_entry=oce
|
||||
)
|
||||
|
||||
oce = OpCodeEntry()
|
||||
oce.add(
|
||||
keys=SusOpCodes.SUS_ASS_NOM_SIDE,
|
||||
info="Switch SUS board to nominal side",
|
||||
)
|
||||
oce.add(
|
||||
keys=SusOpCodes.SUS_ASS_RED_SIDE,
|
||||
info="Switch SUS board to redundant side",
|
||||
)
|
||||
oce.add(
|
||||
keys=SusOpCodes.SUS_ASS_OFF,
|
||||
info="Switch off SUS board",
|
||||
)
|
||||
oce.add(
|
||||
keys=SusOpCodes.SUS_ASS_DUAL_MODE,
|
||||
info="Switch SUS board to dual mode",
|
||||
)
|
||||
defs.add_service(
|
||||
name=CustomServiceList.SUS_ASS.value,
|
||||
info="SUS Assembly",
|
||||
op_code_entry=oce,
|
||||
)
|
||||
|
||||
oce = OpCodeEntry()
|
||||
oce.add(
|
||||
keys=controllers.OpCodes.THERMAL_CONTROLLER,
|
||||
|
@ -38,7 +38,8 @@ from pus_tc.system.core import pack_core_commands
|
||||
from pus_tc.devs.star_tracker import pack_star_tracker_commands
|
||||
from pus_tc.devs.syrlinks_hk_handler import pack_syrlinks_command
|
||||
from pus_tc.devs.gps import pack_gps_command
|
||||
from pus_tc.system.acs import pack_acs_command, pack_sus_cmds
|
||||
from tmtc.acs_board import pack_acs_command
|
||||
from tmtc.sus_board import pack_sus_cmds
|
||||
from pus_tc.devs.plpcdu import pack_pl_pcdu_commands
|
||||
from pus_tc.devs.str_img_helper import pack_str_img_helper_command
|
||||
from pus_tc.system.tcs import pack_tcs_sys_commands
|
||||
@ -185,12 +186,8 @@ def handle_default_procedure(
|
||||
)
|
||||
if service == CustomServiceList.PROCEDURE.value:
|
||||
return pack_proc_commands(q=queue_helper, op_code=op_code)
|
||||
if service == CustomServiceList.SUS_ASS.value:
|
||||
return pack_sus_cmds(q=queue_helper, op_code=op_code)
|
||||
if service == CustomServiceList.PL_PCDU.value:
|
||||
return pack_pl_pcdu_commands(q=queue_helper, op_code=op_code)
|
||||
if service == CustomServiceList.ACS_ASS.value:
|
||||
return pack_acs_command(q=queue_helper, op_code=op_code)
|
||||
if service == CustomServiceList.TCS_ASS.value:
|
||||
return pack_tcs_sys_commands(q=queue_helper, op_code=op_code)
|
||||
if service == CustomServiceList.RW_ASSEMBLY.value:
|
||||
|
@ -4,7 +4,7 @@ from tmtccmd.tc import DefaultPusQueueHelper
|
||||
from tmtccmd.tc.pus_200_fsfw_modes import Modes
|
||||
from tmtccmd.util import ObjectIdU32, ObjectIdBase
|
||||
|
||||
from .common import command_mode
|
||||
from tmtc.common import pack_mode_cmd_with_info
|
||||
import config.object_ids as obj_ids
|
||||
|
||||
from pus_tc.prompt_parameters import prompt_parameters_cli, prompt_parameters_gui
|
||||
@ -36,7 +36,7 @@ def pack_cmd_ctrl_to_prompted_mode(
|
||||
print("Invalid Mode, defaulting to OFF")
|
||||
mode = 0
|
||||
submode = int(parameters["Submode"])
|
||||
command_mode(
|
||||
pack_mode_cmd_with_info(
|
||||
object_id=object_id.as_bytes,
|
||||
mode=mode,
|
||||
submode=submode,
|
||||
@ -48,7 +48,7 @@ def pack_cmd_ctrl_to_prompted_mode(
|
||||
def pack_cmd_ctrl_to_off(
|
||||
q: DefaultPusQueueHelper, object_id: Union[ObjectIdBase, ObjectIdU32]
|
||||
):
|
||||
command_mode(
|
||||
pack_mode_cmd_with_info(
|
||||
object_id=object_id.as_bytes,
|
||||
mode=Modes.OFF,
|
||||
submode=0,
|
||||
@ -58,7 +58,7 @@ def pack_cmd_ctrl_to_off(
|
||||
|
||||
|
||||
def pack_cmd_ctrl_to_on(q: DefaultPusQueueHelper, object_id: ObjectIdU32):
|
||||
command_mode(
|
||||
pack_mode_cmd_with_info(
|
||||
object_id=object_id.as_bytes,
|
||||
mode=Modes.ON,
|
||||
submode=0,
|
||||
@ -70,7 +70,7 @@ def pack_cmd_ctrl_to_on(q: DefaultPusQueueHelper, object_id: ObjectIdU32):
|
||||
def pack_cmd_ctrl_to_nml(
|
||||
q: DefaultPusQueueHelper, object_id: Union[ObjectIdBase, ObjectIdU32]
|
||||
):
|
||||
command_mode(
|
||||
pack_mode_cmd_with_info(
|
||||
object_id=object_id.as_bytes,
|
||||
mode=Modes.NORMAL,
|
||||
submode=0,
|
||||
|
@ -34,7 +34,8 @@ from pus_tc.devs.sus import SetIds
|
||||
from pus_tc.devs.star_tracker import SetIds as StrSetIds
|
||||
from tmtc.acs.reaction_wheels import RwSetIds, rw_speed_up_cmd_consec
|
||||
from pus_tc.system.controllers import pack_cmd_ctrl_to_off, pack_cmd_ctrl_to_nml
|
||||
from pus_tc.system.acs import pack_acs_command, pack_sus_cmds
|
||||
from tmtc.acs_board import pack_acs_command
|
||||
from tmtc.sus_board import pack_sus_cmds
|
||||
from tmtc.acs.imtq import pack_imtq_test_into, pack_dipole_command
|
||||
from pus_tc.devs.star_tracker import pack_star_tracker_commands
|
||||
from tmtc.acs.reaction_wheels import pack_rw_ass_cmds, pack_set_speed_command
|
||||
|
@ -7,11 +7,10 @@ from tmtccmd.tc import DefaultPusQueueHelper
|
||||
from tmtccmd.tc.pus_200_fsfw_modes import Modes
|
||||
from tmtccmd.tc.pus_3_fsfw_hk import (
|
||||
make_sid,
|
||||
generate_one_diag_command,
|
||||
generate_one_hk_command,
|
||||
)
|
||||
|
||||
from .common import command_mode
|
||||
from tmtc.common import pack_mode_cmd_with_info
|
||||
from config.object_ids import TCS_BOARD_ASS_ID, TCS_CONTROLLER
|
||||
|
||||
|
||||
@ -62,7 +61,7 @@ def pack_tcs_sys_commands(q: DefaultPusQueueHelper, op_code: str):
|
||||
|
||||
def pack_tcs_ass_cmds(q: DefaultPusQueueHelper, op_code: str):
|
||||
if op_code in OpCodes.TCS_BOARD_ASS_NORMAL:
|
||||
command_mode(
|
||||
pack_mode_cmd_with_info(
|
||||
object_id=TCS_BOARD_ASS_ID,
|
||||
mode=Modes.NORMAL,
|
||||
submode=0,
|
||||
@ -70,7 +69,7 @@ def pack_tcs_ass_cmds(q: DefaultPusQueueHelper, op_code: str):
|
||||
info=Info.TCS_BOARD_ASS_NORMAL,
|
||||
)
|
||||
if op_code in OpCodes.TCS_BOARD_ASS_OFF:
|
||||
command_mode(
|
||||
pack_mode_cmd_with_info(
|
||||
object_id=TCS_BOARD_ASS_ID,
|
||||
mode=Modes.OFF,
|
||||
submode=0,
|
||||
|
@ -3,6 +3,9 @@ import os.path
|
||||
from datetime import datetime
|
||||
from config.object_ids import get_object_ids
|
||||
from pus_tm.defs import PrintWrapper
|
||||
from pus_tm.verification_handler import generic_retval_printout
|
||||
from tmtc.acs_subsystem import AcsModes
|
||||
from tmtccmd.tc.pus_200_fsfw_modes import Modes
|
||||
|
||||
from tmtccmd.tm import Service5Tm
|
||||
from tmtccmd.logging import get_console_logger
|
||||
@ -50,6 +53,11 @@ def handle_event_packet(raw_tm: bytes, printer: FsfwTmTcPrinter):
|
||||
)
|
||||
LOGGER.info(generic_event_string)
|
||||
specific_handler = True
|
||||
if info.name == "MODE_TRANSITION_FAILED":
|
||||
reason = generic_retval_printout(tm.param_1)
|
||||
for string in reason:
|
||||
pw.dlog(f"Reason from event parameter 1: {string}")
|
||||
pw.dlog(f"Mode, sequence or table: {tm.param_2:#08x}")
|
||||
if info.name == "SUPV_UPDATE_PROGRESS" or info.name == "WRITE_MEMORY_FAILED":
|
||||
additional_event_info = f"Additional info: {info.info}"
|
||||
context = (
|
||||
@ -59,7 +67,28 @@ def handle_event_packet(raw_tm: bytes, printer: FsfwTmTcPrinter):
|
||||
pw.dlog(additional_event_info)
|
||||
pw.dlog(context)
|
||||
if info.name == "MODE_INFO":
|
||||
pw.dlog(f"Mode: {tm.param_1}")
|
||||
mode_name = "Unknown"
|
||||
if obj_name == "ACS_SUBSYSTEM":
|
||||
if tm.param_1 == Modes.OFF:
|
||||
mode_name = "Off"
|
||||
elif tm.param_1 == AcsModes.IDLE:
|
||||
mode_name = "Idle"
|
||||
elif tm.param_1 == AcsModes.DETUMBLE:
|
||||
mode_name = "Detumble"
|
||||
elif tm.param_1 == AcsModes.SAFE:
|
||||
mode_name = "Safe"
|
||||
elif tm.param_1 == AcsModes.TARGET_PT:
|
||||
mode_name = "Target Pointing"
|
||||
else:
|
||||
if tm.param_1 == Modes.OFF:
|
||||
mode_name = "Off"
|
||||
elif tm.param_1 == Modes.ON:
|
||||
mode_name = "On"
|
||||
elif tm.param_1 == Modes.NORMAL:
|
||||
mode_name = "Normal"
|
||||
elif tm.param_1 == Modes.RAW:
|
||||
mode_name = "Raw"
|
||||
pw.dlog(f"Mode Number {tm.param_1}, Mode Name {mode_name}")
|
||||
pw.dlog(f"Submode: {tm.param_2}")
|
||||
else:
|
||||
specific_handler = False
|
||||
|
@ -1,6 +1,7 @@
|
||||
"""Core EIVE TM handler module
|
||||
"""
|
||||
from config.object_ids import get_object_ids
|
||||
from config.retvals import get_retval_dict
|
||||
from spacepackets.ecss import PusTelemetry
|
||||
from spacepackets.ecss.pus_17_test import Service17Tm
|
||||
from spacepackets.util import PrintFormats
|
||||
@ -9,10 +10,12 @@ from tmtccmd.logging.pus import RawTmtcTimedLogWrapper
|
||||
from tmtccmd.pus import VerificationWrapper
|
||||
from tmtccmd.tm import Service20FsfwTm, Service200FsfwTm
|
||||
from tmtccmd.tm.pus_17_test import Service17TmExtended
|
||||
from tmtccmd.tm.pus_200_fsfw_modes import Subservices as ModeSubservices
|
||||
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
||||
from .defs import PrintWrapper
|
||||
|
||||
from .event_handler import handle_event_packet
|
||||
from .verification_handler import handle_service_1_fsfw_packet
|
||||
from .verification_handler import handle_service_1_fsfw_packet, generic_retval_printout
|
||||
from .hk_handling import handle_hk_packet
|
||||
from .action_reply_handler import handle_action_reply
|
||||
|
||||
@ -36,6 +39,7 @@ def pus_factory_hook(
|
||||
return
|
||||
service = tm_packet.service
|
||||
obj_id_dict = get_object_ids()
|
||||
pw = PrintWrapper(printer)
|
||||
dedicated_handler = True
|
||||
if service == 1:
|
||||
handle_service_1_fsfw_packet(wrapper=verif_wrapper, raw_tm=packet)
|
||||
@ -55,7 +59,17 @@ def pus_factory_hook(
|
||||
dedicated_handler = False
|
||||
elif service == 200:
|
||||
tm_packet = Service200FsfwTm.unpack(raw_telemetry=packet)
|
||||
dedicated_handler = False
|
||||
if tm_packet.subservice == ModeSubservices.TM_CANT_REACH_MODE:
|
||||
obj_id = tm_packet.object_id
|
||||
obj_id_obj = obj_id_dict.get(obj_id)
|
||||
retval = tm_packet.return_value
|
||||
string_list = generic_retval_printout(retval)
|
||||
pw.dlog(f"Received Mode Reply from {obj_id_obj}: Can't reach mode.")
|
||||
for string in string_list:
|
||||
pw.dlog(f"Reason: {string}")
|
||||
dedicated_handler = True
|
||||
else:
|
||||
dedicated_handler = False
|
||||
else:
|
||||
LOGGER.info(f"The service {service} is not implemented in Telemetry Factory")
|
||||
tm_packet.print_source_data(PrintFormats.HEX)
|
||||
|
@ -1,3 +1,5 @@
|
||||
from typing import List, Optional
|
||||
|
||||
from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm
|
||||
from tmtccmd.logging import get_console_logger
|
||||
from tmtccmd.pus import VerificationWrapper
|
||||
@ -25,28 +27,37 @@ def handle_service_1_fsfw_packet(wrapper: VerificationWrapper, raw_tm: bytes):
|
||||
else:
|
||||
wrapper.log_to_console(tm_packet, res)
|
||||
wrapper.log_to_file(tm_packet, res)
|
||||
retval_dict = get_retval_dict()
|
||||
if tm_packet.has_failure_notice:
|
||||
retval_info = retval_dict.get(tm_packet.error_code.val)
|
||||
if retval_info is None:
|
||||
raw_err = tm_packet.error_code.val
|
||||
LOGGER.info(
|
||||
f"No returnvalue information found for error code with subsystem ID"
|
||||
f" {(raw_err >> 8) & 0xff} and unique ID {raw_err & 0xff}"
|
||||
)
|
||||
else:
|
||||
retval_string = (
|
||||
f"Error Code information for code {tm_packet.error_code.val:#06x} | "
|
||||
f"Name: {retval_info.name} | Info: {retval_info.info}"
|
||||
)
|
||||
error_param_1_str = (
|
||||
f"Error Parameter 1: hex {fsfw_wrapper.error_param_1:#010x} "
|
||||
f"dec {fsfw_wrapper.error_param_1} "
|
||||
)
|
||||
error_param_2_str = (
|
||||
f"Error Parameter 2: hex {fsfw_wrapper.error_param_2:#010x} "
|
||||
f"dec {fsfw_wrapper.error_param_2}"
|
||||
)
|
||||
wrapper.dlog(retval_string)
|
||||
wrapper.dlog(error_param_1_str)
|
||||
wrapper.dlog(error_param_2_str)
|
||||
str_list = generic_retval_printout(
|
||||
tm_packet.error_code.val,
|
||||
fsfw_wrapper.error_param_1,
|
||||
fsfw_wrapper.error_param_2,
|
||||
)
|
||||
for string in str_list:
|
||||
wrapper.dlog(string)
|
||||
|
||||
|
||||
def generic_retval_printout(
|
||||
retval: int, p1: Optional[int] = None, p2: Optional[int] = None
|
||||
) -> List[str]:
|
||||
retval_dict = get_retval_dict()
|
||||
retval_info = retval_dict.get(retval)
|
||||
if retval_info is None:
|
||||
raw_err = retval
|
||||
return [
|
||||
f"No returnvalue information found for error code with "
|
||||
f"subsystem ID {(raw_err >> 8) & 0xff} and unique ID {raw_err & 0xff}"
|
||||
]
|
||||
else:
|
||||
retval_string = (
|
||||
f"Error Code information for code {retval:#06x} | "
|
||||
f"Name: {retval_info.name} | Info: {retval_info.info}"
|
||||
)
|
||||
string_list = [retval_string]
|
||||
if p1:
|
||||
error_param_1_str = f"Error Parameter 1: hex {p1:#010x} " f"dec {p1} "
|
||||
string_list.append(error_param_1_str)
|
||||
if p2:
|
||||
error_param_2_str = f"Error Parameter 2: hex {p2:#010x} " f"dec {p2}"
|
||||
string_list.append(error_param_2_str)
|
||||
return string_list
|
||||
|
@ -0,0 +1 @@
|
||||
from .acs_subsystem import add_acs_subsystem_cmds
|
@ -1,27 +1,27 @@
|
||||
import enum
|
||||
|
||||
from tmtccmd.tc import DefaultPusQueueHelper
|
||||
from config.definitions import CustomServiceList
|
||||
from tmtccmd.config.tmtc import (
|
||||
tmtc_definitions_provider,
|
||||
TmtcDefinitionWrapper,
|
||||
OpCodeEntry,
|
||||
)
|
||||
from tmtccmd.tc import DefaultPusQueueHelper, service_provider
|
||||
from tmtccmd.tc.decorator import ServiceProviderParams
|
||||
from tmtccmd.tc.pus_200_fsfw_modes import Modes
|
||||
from config.object_ids import ACS_BOARD_ASS_ID, SUS_BOARD_ASS_ID
|
||||
from config.object_ids import ACS_BOARD_ASS_ID
|
||||
|
||||
from .common import command_mode
|
||||
from tmtc.common import pack_mode_cmd_with_info
|
||||
|
||||
|
||||
class AcsOpCodes:
|
||||
ACS_ASS_A_SIDE = ["0", "a"]
|
||||
ACS_ASS_B_SIDE = ["1", "b"]
|
||||
ACS_ASS_DUAL_MODE = ["2", "dual"]
|
||||
ACS_ASS_DUAL_MODE = ["2", "d"]
|
||||
ACS_ASS_OFF = ["3", "off"]
|
||||
ACS_ASS_A_ON = ["4", "a_on"]
|
||||
ACS_ASS_B_ON = ["5", "b_on"]
|
||||
ACS_ASS_DUAL_ON = ["6", "dual_on"]
|
||||
|
||||
|
||||
class SusOpCodes:
|
||||
SUS_ASS_NOM_SIDE = ["0", "nom"]
|
||||
SUS_ASS_RED_SIDE = ["1", "red"]
|
||||
SUS_ASS_DUAL_MODE = ["2", "dual"]
|
||||
SUS_ASS_OFF = ["3", "off"]
|
||||
ACS_ASS_A_ON = ["4", "ao"]
|
||||
ACS_ASS_B_ON = ["5", "bo"]
|
||||
ACS_ASS_DUAL_ON = ["6", "do"]
|
||||
|
||||
|
||||
class DualSideSubmodes(enum.IntEnum):
|
||||
@ -30,9 +30,12 @@ class DualSideSubmodes(enum.IntEnum):
|
||||
DUAL_SIDE = 2
|
||||
|
||||
|
||||
def pack_acs_command(q: DefaultPusQueueHelper, op_code: str):
|
||||
@service_provider(CustomServiceList.ACS_BRD_ASS)
|
||||
def pack_acs_command(p: ServiceProviderParams):
|
||||
op_code = p.op_code
|
||||
q = p.queue_helper
|
||||
if op_code in AcsOpCodes.ACS_ASS_A_SIDE:
|
||||
command_mode(
|
||||
pack_mode_cmd_with_info(
|
||||
object_id=ACS_BOARD_ASS_ID,
|
||||
mode=Modes.NORMAL,
|
||||
submode=DualSideSubmodes.A_SIDE,
|
||||
@ -40,7 +43,7 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str):
|
||||
info="Switching to ACS board assembly A side",
|
||||
)
|
||||
if op_code in AcsOpCodes.ACS_ASS_B_SIDE:
|
||||
command_mode(
|
||||
pack_mode_cmd_with_info(
|
||||
object_id=ACS_BOARD_ASS_ID,
|
||||
mode=Modes.NORMAL,
|
||||
submode=DualSideSubmodes.B_SIDE,
|
||||
@ -48,7 +51,7 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str):
|
||||
info="Switching to ACS board assembly B side",
|
||||
)
|
||||
if op_code in AcsOpCodes.ACS_ASS_DUAL_MODE:
|
||||
command_mode(
|
||||
pack_mode_cmd_with_info(
|
||||
object_id=ACS_BOARD_ASS_ID,
|
||||
mode=Modes.NORMAL,
|
||||
submode=DualSideSubmodes.DUAL_SIDE,
|
||||
@ -56,7 +59,7 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str):
|
||||
info="Switching to ACS board assembly dual mode",
|
||||
)
|
||||
if op_code in AcsOpCodes.ACS_ASS_A_ON:
|
||||
command_mode(
|
||||
pack_mode_cmd_with_info(
|
||||
object_id=ACS_BOARD_ASS_ID,
|
||||
mode=Modes.ON,
|
||||
submode=DualSideSubmodes.A_SIDE,
|
||||
@ -64,7 +67,7 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str):
|
||||
info="Switching ACS board assembly A side on",
|
||||
)
|
||||
if op_code in AcsOpCodes.ACS_ASS_B_ON:
|
||||
command_mode(
|
||||
pack_mode_cmd_with_info(
|
||||
object_id=ACS_BOARD_ASS_ID,
|
||||
mode=Modes.ON,
|
||||
submode=DualSideSubmodes.B_SIDE,
|
||||
@ -72,7 +75,7 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str):
|
||||
info="Switching ACS board assembly B side on",
|
||||
)
|
||||
if op_code in AcsOpCodes.ACS_ASS_DUAL_ON:
|
||||
command_mode(
|
||||
pack_mode_cmd_with_info(
|
||||
object_id=ACS_BOARD_ASS_ID,
|
||||
mode=Modes.ON,
|
||||
submode=DualSideSubmodes.B_SIDE,
|
||||
@ -80,7 +83,7 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str):
|
||||
info="Switching ACS board assembly dual side on",
|
||||
)
|
||||
if op_code in AcsOpCodes.ACS_ASS_OFF:
|
||||
command_mode(
|
||||
pack_mode_cmd_with_info(
|
||||
object_id=ACS_BOARD_ASS_ID,
|
||||
mode=Modes.OFF,
|
||||
submode=0,
|
||||
@ -89,36 +92,39 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str):
|
||||
)
|
||||
|
||||
|
||||
def pack_sus_cmds(q: DefaultPusQueueHelper, op_code: str):
|
||||
if op_code in SusOpCodes.SUS_ASS_NOM_SIDE:
|
||||
command_mode(
|
||||
object_id=SUS_BOARD_ASS_ID,
|
||||
mode=Modes.NORMAL,
|
||||
submode=DualSideSubmodes.A_SIDE,
|
||||
q=q,
|
||||
info="Switching to SUS board to nominal side",
|
||||
)
|
||||
if op_code in SusOpCodes.SUS_ASS_RED_SIDE:
|
||||
command_mode(
|
||||
object_id=SUS_BOARD_ASS_ID,
|
||||
mode=Modes.NORMAL,
|
||||
submode=DualSideSubmodes.B_SIDE,
|
||||
q=q,
|
||||
info="Switching to SUS board to redundant side",
|
||||
)
|
||||
if op_code in SusOpCodes.SUS_ASS_OFF:
|
||||
command_mode(
|
||||
object_id=SUS_BOARD_ASS_ID,
|
||||
mode=Modes.OFF,
|
||||
submode=0,
|
||||
q=q,
|
||||
info="Switching SUS board off",
|
||||
)
|
||||
if op_code in SusOpCodes.SUS_ASS_DUAL_MODE:
|
||||
command_mode(
|
||||
object_id=SUS_BOARD_ASS_ID,
|
||||
mode=Modes.NORMAL,
|
||||
submode=DualSideSubmodes.DUAL_SIDE,
|
||||
q=q,
|
||||
info="Switching to SUS board to dual side",
|
||||
)
|
||||
@tmtc_definitions_provider
|
||||
def add_acs_board_cmds(defs: TmtcDefinitionWrapper):
|
||||
oce = OpCodeEntry()
|
||||
oce.add(
|
||||
keys=AcsOpCodes.ACS_ASS_A_SIDE,
|
||||
info="Switch to ACS board A side",
|
||||
)
|
||||
oce.add(
|
||||
keys=AcsOpCodes.ACS_ASS_B_SIDE,
|
||||
info="Switch to ACS board B side",
|
||||
)
|
||||
oce.add(
|
||||
keys=AcsOpCodes.ACS_ASS_DUAL_MODE,
|
||||
info="Switch to ACS board dual mode",
|
||||
)
|
||||
oce.add(
|
||||
keys=AcsOpCodes.ACS_ASS_A_ON,
|
||||
info="Switch ACS board A side on",
|
||||
)
|
||||
oce.add(
|
||||
keys=AcsOpCodes.ACS_ASS_B_ON,
|
||||
info="Switch ACS board B side on",
|
||||
)
|
||||
oce.add(
|
||||
keys=AcsOpCodes.ACS_ASS_DUAL_ON,
|
||||
info="Switch ACS board dual mode on",
|
||||
)
|
||||
oce.add(
|
||||
keys=AcsOpCodes.ACS_ASS_OFF,
|
||||
info="Switch off ACS board",
|
||||
)
|
||||
defs.add_service(
|
||||
name=CustomServiceList.ACS_BRD_ASS.value,
|
||||
info="ACS Board Assemblie",
|
||||
op_code_entry=oce,
|
||||
)
|
@ -2,7 +2,7 @@ import enum
|
||||
import socket
|
||||
import struct
|
||||
from socket import AF_INET
|
||||
from typing import Tuple, Optional
|
||||
from typing import Tuple
|
||||
|
||||
from config.definitions import CustomServiceList
|
||||
from config.object_ids import ACS_CONTROLLER
|
||||
|
85
tmtc/acs_subsystem.py
Normal file
85
tmtc/acs_subsystem.py
Normal file
@ -0,0 +1,85 @@
|
||||
import enum
|
||||
from typing import Tuple, Dict
|
||||
|
||||
from spacepackets.ecss import PusTelecommand
|
||||
from .common import pack_mode_cmd_with_info
|
||||
from config.object_ids import ACS_SUBSYSTEM_ID
|
||||
from config.definitions import CustomServiceList
|
||||
from tmtccmd.config.tmtc import (
|
||||
tmtc_definitions_provider,
|
||||
TmtcDefinitionWrapper,
|
||||
OpCodeEntry,
|
||||
)
|
||||
from tmtccmd.tc.pus_200_fsfw_modes import Subservices as ModeSubservices
|
||||
from tmtccmd.tc import service_provider
|
||||
from tmtccmd.tc.decorator import ServiceProviderParams
|
||||
|
||||
|
||||
class OpCodes(str, enum.Enum):
|
||||
OFF = "off"
|
||||
SAFE = "safe"
|
||||
DETUMBLE = "detumble"
|
||||
IDLE = "idle"
|
||||
TARGET_PT = "target"
|
||||
REPORT_ALL_MODES = "all_modes"
|
||||
|
||||
|
||||
class AcsModes(enum.IntEnum):
|
||||
OFF = 0
|
||||
SAFE = 1 << 24
|
||||
DETUMBLE = 2 << 24
|
||||
IDLE = 3 << 24
|
||||
TARGET_PT = 4 << 24
|
||||
|
||||
|
||||
class Info(str, enum.Enum):
|
||||
OFF = "Off Command"
|
||||
SAFE = "Safe Mode Command"
|
||||
DETUMBLE = "Detumble Mode Command"
|
||||
IDLE = "Idle Mode Command"
|
||||
TARGET_PT = "Target Pointing Mode Command"
|
||||
REPORT_ALL_MODES = "Report All Modes Recursively"
|
||||
|
||||
|
||||
HANDLER_LIST: Dict[str, Tuple[int, str]] = {
|
||||
OpCodes.OFF: (AcsModes.OFF, Info.OFF),
|
||||
OpCodes.IDLE: (AcsModes.IDLE, Info.IDLE),
|
||||
OpCodes.SAFE: (AcsModes.SAFE, Info.SAFE),
|
||||
OpCodes.DETUMBLE: (AcsModes.DETUMBLE, Info.DETUMBLE),
|
||||
}
|
||||
|
||||
|
||||
@service_provider(CustomServiceList.ACS_SS.value)
|
||||
def build_acs_subsystem_cmd(p: ServiceProviderParams):
|
||||
op_code = p.op_code
|
||||
q = p.queue_helper
|
||||
info_prefix = "ACS Subsystem"
|
||||
if op_code in OpCodes.REPORT_ALL_MODES:
|
||||
q.add_log_cmd(f"{info_prefix}: {Info.REPORT_ALL_MODES}")
|
||||
q.add_pus_tc(
|
||||
PusTelecommand(
|
||||
service=200,
|
||||
subservice=ModeSubservices.TC_MODE_ANNOUNCE_RECURSIVE,
|
||||
app_data=ACS_SUBSYSTEM_ID,
|
||||
)
|
||||
)
|
||||
mode_info_tup = HANDLER_LIST.get(op_code)
|
||||
if mode_info_tup is None:
|
||||
return
|
||||
pack_mode_cmd_with_info(
|
||||
object_id=ACS_SUBSYSTEM_ID,
|
||||
info=f"{info_prefix}: {mode_info_tup[1]}",
|
||||
submode=0,
|
||||
mode=mode_info_tup[0],
|
||||
q=q,
|
||||
)
|
||||
|
||||
|
||||
@tmtc_definitions_provider
|
||||
def add_acs_subsystem_cmds(defs: TmtcDefinitionWrapper):
|
||||
oce = OpCodeEntry()
|
||||
oce.add(OpCodes.OFF, Info.OFF)
|
||||
oce.add(OpCodes.SAFE, Info.SAFE)
|
||||
oce.add(OpCodes.IDLE, Info.IDLE)
|
||||
oce.add(OpCodes.REPORT_ALL_MODES, Info.REPORT_ALL_MODES)
|
||||
defs.add_service(CustomServiceList.ACS_SS, "ACS Subsystem", oce)
|
@ -3,18 +3,25 @@ from typing import Union
|
||||
from spacepackets.ecss.tc import PusTelecommand
|
||||
from tmtccmd.tc import DefaultPusQueueHelper
|
||||
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes, Subservices
|
||||
from tmtccmd.util import ObjectIdU32
|
||||
|
||||
|
||||
def command_mode(
|
||||
object_id: bytes,
|
||||
def pack_mode_cmd_with_info(
|
||||
object_id: Union[ObjectIdU32, bytes],
|
||||
mode: Union[int, Modes],
|
||||
submode: int,
|
||||
q: DefaultPusQueueHelper,
|
||||
info: str,
|
||||
):
|
||||
if isinstance(object_id, ObjectIdU32):
|
||||
object_id_bytes = object_id.as_bytes
|
||||
elif isinstance(object_id, bytes):
|
||||
object_id_bytes = object_id
|
||||
else:
|
||||
raise ValueError("Invalid Object ID type")
|
||||
q.add_log_cmd(info)
|
||||
mode_data = pack_mode_data(
|
||||
object_id=object_id,
|
||||
object_id=object_id_bytes,
|
||||
mode=mode,
|
||||
submode=submode,
|
||||
)
|
83
tmtc/sus_board.py
Normal file
83
tmtc/sus_board.py
Normal file
@ -0,0 +1,83 @@
|
||||
from config.definitions import CustomServiceList
|
||||
from config.object_ids import SUS_BOARD_ASS_ID
|
||||
from tmtc.acs_board import DualSideSubmodes
|
||||
from tmtc.common import pack_mode_cmd_with_info
|
||||
from tmtccmd.config.tmtc import (
|
||||
tmtc_definitions_provider,
|
||||
TmtcDefinitionWrapper,
|
||||
OpCodeEntry,
|
||||
)
|
||||
from tmtccmd.tc import service_provider
|
||||
from tmtccmd.tc.decorator import ServiceProviderParams
|
||||
from tmtccmd.tc.pus_200_fsfw_modes import Modes
|
||||
|
||||
|
||||
class SusOpCodes:
|
||||
SUS_ASS_NOM_SIDE = ["0", "nom"]
|
||||
SUS_ASS_RED_SIDE = ["1", "red"]
|
||||
SUS_ASS_DUAL_MODE = ["2", "dual"]
|
||||
SUS_ASS_OFF = ["3", "off"]
|
||||
|
||||
|
||||
@service_provider(CustomServiceList.SUS_BRD_ASS)
|
||||
def pack_sus_cmds(p: ServiceProviderParams):
|
||||
op_code = p.op_code
|
||||
q = p.queue_helper
|
||||
if op_code in SusOpCodes.SUS_ASS_NOM_SIDE:
|
||||
pack_mode_cmd_with_info(
|
||||
object_id=SUS_BOARD_ASS_ID,
|
||||
mode=Modes.NORMAL,
|
||||
submode=DualSideSubmodes.A_SIDE,
|
||||
q=q,
|
||||
info="Switching to SUS board to nominal side",
|
||||
)
|
||||
if op_code in SusOpCodes.SUS_ASS_RED_SIDE:
|
||||
pack_mode_cmd_with_info(
|
||||
object_id=SUS_BOARD_ASS_ID,
|
||||
mode=Modes.NORMAL,
|
||||
submode=DualSideSubmodes.B_SIDE,
|
||||
q=q,
|
||||
info="Switching to SUS board to redundant side",
|
||||
)
|
||||
if op_code in SusOpCodes.SUS_ASS_OFF:
|
||||
pack_mode_cmd_with_info(
|
||||
object_id=SUS_BOARD_ASS_ID,
|
||||
mode=Modes.OFF,
|
||||
submode=0,
|
||||
q=q,
|
||||
info="Switching SUS board off",
|
||||
)
|
||||
if op_code in SusOpCodes.SUS_ASS_DUAL_MODE:
|
||||
pack_mode_cmd_with_info(
|
||||
object_id=SUS_BOARD_ASS_ID,
|
||||
mode=Modes.NORMAL,
|
||||
submode=DualSideSubmodes.DUAL_SIDE,
|
||||
q=q,
|
||||
info="Switching to SUS board to dual side",
|
||||
)
|
||||
|
||||
|
||||
@tmtc_definitions_provider
|
||||
def add_sus_board_cmds(defs: TmtcDefinitionWrapper):
|
||||
oce = OpCodeEntry()
|
||||
oce.add(
|
||||
keys=SusOpCodes.SUS_ASS_NOM_SIDE,
|
||||
info="Switch SUS board to nominal side",
|
||||
)
|
||||
oce.add(
|
||||
keys=SusOpCodes.SUS_ASS_RED_SIDE,
|
||||
info="Switch SUS board to redundant side",
|
||||
)
|
||||
oce.add(
|
||||
keys=SusOpCodes.SUS_ASS_OFF,
|
||||
info="Switch off SUS board",
|
||||
)
|
||||
oce.add(
|
||||
keys=SusOpCodes.SUS_ASS_DUAL_MODE,
|
||||
info="Switch SUS board to dual mode",
|
||||
)
|
||||
defs.add_service(
|
||||
name=CustomServiceList.SUS_BRD_ASS.value,
|
||||
info="SUS Board Assembly",
|
||||
op_code_entry=oce,
|
||||
)
|
Loading…
Reference in New Issue
Block a user