diff --git a/.run/Subsystem IDLE.run.xml b/.run/Subsystem IDLE.run.xml
new file mode 100644
index 0000000..e1b9ef4
--- /dev/null
+++ b/.run/Subsystem IDLE.run.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/Subsystem SAFE.run.xml b/.run/Subsystem SAFE.run.xml
new file mode 100644
index 0000000..246e253
--- /dev/null
+++ b/.run/Subsystem SAFE.run.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/config/definitions.py b/config/definitions.py
index f0ca38d..578aecb 100644
--- a/config/definitions.py
+++ b/config/definitions.py
@@ -58,8 +58,9 @@ class CustomServiceList(str, enum.Enum):
STR_IMG_HELPER = "str_img_helper"
SYRLINKS = "syrlinks"
ACS_CTRL = "acs_ctrl"
- ACS_ASS = "acs_ass"
- SUS_ASS = "sus_ass"
+ ACS_SS = "acs_subsystem"
+ ACS_BRD_ASS = "acs_brd_ass"
+ SUS_BRD_ASS = "sus_brd_ass"
TCS = "tcs"
TCS_ASS = "tcs_ass"
TIME = "time"
diff --git a/config/events.csv b/config/events.csv
index b3f2fc7..05354cd 100644
--- a/config/events.csv
+++ b/config/events.csv
@@ -181,15 +181,15 @@ Event ID (dec); Event ID (hex); Name; Severity; Description; File Path
12709;0x31a5;I_MPA_OUT_OF_BOUNDS;MEDIUM;P1: 0 -> too low, 1 -> too high P2: Float value;mission/devices/PayloadPcduHandler.h
12710;0x31a6;U_HPA_OUT_OF_BOUNDS;MEDIUM;P1: 0 -> too low, 1 -> too high P2: Float value;mission/devices/PayloadPcduHandler.h
12711;0x31a7;I_HPA_OUT_OF_BOUNDS;MEDIUM;P1: 0 -> too low, 1 -> too high P2: Float value;mission/devices/PayloadPcduHandler.h
-12800;0x3200;TRANSITION_OTHER_SIDE_FAILED;HIGH;;mission/system/AcsBoardAssembly.h
-12801;0x3201;NOT_ENOUGH_DEVICES_DUAL_MODE;HIGH;;mission/system/AcsBoardAssembly.h
-12802;0x3202;POWER_STATE_MACHINE_TIMEOUT;MEDIUM;;mission/system/AcsBoardAssembly.h
-12803;0x3203;SIDE_SWITCH_TRANSITION_NOT_ALLOWED;LOW;Not implemented, would increase already high complexity. Operator should instead command the assembly off first and then command the assembly on into the desired mode/submode combination;mission/system/AcsBoardAssembly.h
-12900;0x3264;TRANSITION_OTHER_SIDE_FAILED;HIGH;;mission/system/SusAssembly.h
-12901;0x3265;NOT_ENOUGH_DEVICES_DUAL_MODE;HIGH;;mission/system/SusAssembly.h
-12902;0x3266;POWER_STATE_MACHINE_TIMEOUT;MEDIUM;;mission/system/SusAssembly.h
-12903;0x3267;SIDE_SWITCH_TRANSITION_NOT_ALLOWED;LOW;Not implemented, would increase already high complexity. Operator should instead command the assembly off first and then command the assembly on into the desired mode/submode combination;mission/system/SusAssembly.h
-13000;0x32c8;CHILDREN_LOST_MODE;MEDIUM;;mission/system/TcsBoardAssembly.h
+12800;0x3200;TRANSITION_OTHER_SIDE_FAILED;HIGH;;mission/system/objects/AcsBoardAssembly.h
+12801;0x3201;NOT_ENOUGH_DEVICES_DUAL_MODE;HIGH;;mission/system/objects/AcsBoardAssembly.h
+12802;0x3202;POWER_STATE_MACHINE_TIMEOUT;MEDIUM;;mission/system/objects/AcsBoardAssembly.h
+12803;0x3203;SIDE_SWITCH_TRANSITION_NOT_ALLOWED;LOW;Not implemented, would increase already high complexity. Operator should instead command the assembly off first and then command the assembly on into the desired mode/submode combination;mission/system/objects/AcsBoardAssembly.h
+12900;0x3264;TRANSITION_OTHER_SIDE_FAILED;HIGH;;mission/system/objects/SusAssembly.h
+12901;0x3265;NOT_ENOUGH_DEVICES_DUAL_MODE;HIGH;;mission/system/objects/SusAssembly.h
+12902;0x3266;POWER_STATE_MACHINE_TIMEOUT;MEDIUM;;mission/system/objects/SusAssembly.h
+12903;0x3267;SIDE_SWITCH_TRANSITION_NOT_ALLOWED;LOW;Not implemented, would increase already high complexity. Operator should instead command the assembly off first and then command the assembly on into the desired mode/submode combination;mission/system/objects/SusAssembly.h
+13000;0x32c8;CHILDREN_LOST_MODE;MEDIUM;;mission/system/objects/TcsBoardAssembly.h
13100;0x332c;GPS_FIX_CHANGE;INFO;Fix has changed. P1: Old fix. P2: New fix 0: Not seen, 1: No Fix, 2: 2D-Fix, 3: 3D-Fix;mission/devices/devicedefinitions/GPSDefinitions.h
13200;0x3390;P60_BOOT_COUNT;INFO;P60 boot count is broadcasted once at SW startup. P1: Boot count;mission/devices/P60DockHandler.h
13201;0x3391;BATT_MODE;INFO;Battery mode is broadcasted at startup. P1: Mode;mission/devices/P60DockHandler.h
diff --git a/config/object_ids.py b/config/object_ids.py
index 180dbd8..cb47a47 100644
--- a/config/object_ids.py
+++ b/config/object_ids.py
@@ -122,6 +122,7 @@ SUS_5_N_LOC_XFYMZB_PT_ZB = bytes([0x44, 0x12, 0x00, 0x37])
SUS_11_R_LOC_XBYMZB_PT_ZB = bytes([0x44, 0x12, 0x00, 0x43])
# System and Assembly Objects
+ACS_SUBSYSTEM_ID = bytes([0x73, 0x01, 0x00, 0x01])
ACS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x01])
SUS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x02])
TCS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x03])
diff --git a/config/objects.csv b/config/objects.csv
index 1e44c67..20cd092 100644
--- a/config/objects.csv
+++ b/config/objects.csv
@@ -136,5 +136,7 @@
0x73000100;TM_FUNNEL
0x73000101;PUS_TM_FUNNEL
0x73000102;CFDP_TM_FUNNEL
+0x73010000;EIVE_SYSTEM
+0x73010001;ACS_SUBSYSTEM
0x73500000;CCSDS_IP_CORE_BRIDGE
0xFFFFFFFF;NO_OBJECT
diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py
index e18ba84..685c41a 100644
--- a/pus_tc/cmd_definitions.py
+++ b/pus_tc/cmd_definitions.py
@@ -163,65 +163,8 @@ def add_time_cmds(defs: TmtcDefinitionWrapper):
@tmtc_definitions_provider
def add_system_cmds(defs: TmtcDefinitionWrapper):
- from pus_tc.system.acs import AcsOpCodes, SusOpCodes
import pus_tc.system.controllers as controllers
- oce = OpCodeEntry()
- oce.add(
- keys=AcsOpCodes.ACS_ASS_A_SIDE,
- info="Switch to ACS board A side",
- )
- oce.add(
- keys=AcsOpCodes.ACS_ASS_B_SIDE,
- info="Switch to ACS board B side",
- )
- oce.add(
- keys=AcsOpCodes.ACS_ASS_DUAL_MODE,
- info="Switch to ACS board dual mode",
- )
- oce.add(
- keys=AcsOpCodes.ACS_ASS_A_ON,
- info="Switch ACS board A side on",
- )
- oce.add(
- keys=AcsOpCodes.ACS_ASS_B_ON,
- info="Switch ACS board B side on",
- )
- oce.add(
- keys=AcsOpCodes.ACS_ASS_DUAL_ON,
- info="Switch ACS board dual mode on",
- )
- oce.add(
- keys=AcsOpCodes.ACS_ASS_OFF,
- info="Switch off ACS board",
- )
- defs.add_service(
- name=CustomServiceList.ACS_ASS.value, info="ACS Assemblies", op_code_entry=oce
- )
-
- oce = OpCodeEntry()
- oce.add(
- keys=SusOpCodes.SUS_ASS_NOM_SIDE,
- info="Switch SUS board to nominal side",
- )
- oce.add(
- keys=SusOpCodes.SUS_ASS_RED_SIDE,
- info="Switch SUS board to redundant side",
- )
- oce.add(
- keys=SusOpCodes.SUS_ASS_OFF,
- info="Switch off SUS board",
- )
- oce.add(
- keys=SusOpCodes.SUS_ASS_DUAL_MODE,
- info="Switch SUS board to dual mode",
- )
- defs.add_service(
- name=CustomServiceList.SUS_ASS.value,
- info="SUS Assembly",
- op_code_entry=oce,
- )
-
oce = OpCodeEntry()
oce.add(
keys=controllers.OpCodes.THERMAL_CONTROLLER,
diff --git a/pus_tc/procedure_packer.py b/pus_tc/procedure_packer.py
index a788af3..7715c4e 100644
--- a/pus_tc/procedure_packer.py
+++ b/pus_tc/procedure_packer.py
@@ -38,7 +38,8 @@ from pus_tc.system.core import pack_core_commands
from pus_tc.devs.star_tracker import pack_star_tracker_commands
from pus_tc.devs.syrlinks_hk_handler import pack_syrlinks_command
from pus_tc.devs.gps import pack_gps_command
-from pus_tc.system.acs import pack_acs_command, pack_sus_cmds
+from tmtc.acs_board import pack_acs_command
+from tmtc.sus_board import pack_sus_cmds
from pus_tc.devs.plpcdu import pack_pl_pcdu_commands
from pus_tc.devs.str_img_helper import pack_str_img_helper_command
from pus_tc.system.tcs import pack_tcs_sys_commands
@@ -185,12 +186,8 @@ def handle_default_procedure(
)
if service == CustomServiceList.PROCEDURE.value:
return pack_proc_commands(q=queue_helper, op_code=op_code)
- if service == CustomServiceList.SUS_ASS.value:
- return pack_sus_cmds(q=queue_helper, op_code=op_code)
if service == CustomServiceList.PL_PCDU.value:
return pack_pl_pcdu_commands(q=queue_helper, op_code=op_code)
- if service == CustomServiceList.ACS_ASS.value:
- return pack_acs_command(q=queue_helper, op_code=op_code)
if service == CustomServiceList.TCS_ASS.value:
return pack_tcs_sys_commands(q=queue_helper, op_code=op_code)
if service == CustomServiceList.RW_ASSEMBLY.value:
diff --git a/pus_tc/system/controllers.py b/pus_tc/system/controllers.py
index f02a907..33252d2 100644
--- a/pus_tc/system/controllers.py
+++ b/pus_tc/system/controllers.py
@@ -4,7 +4,7 @@ from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import Modes
from tmtccmd.util import ObjectIdU32, ObjectIdBase
-from .common import command_mode
+from tmtc.common import pack_mode_cmd_with_info
import config.object_ids as obj_ids
from pus_tc.prompt_parameters import prompt_parameters_cli, prompt_parameters_gui
@@ -36,7 +36,7 @@ def pack_cmd_ctrl_to_prompted_mode(
print("Invalid Mode, defaulting to OFF")
mode = 0
submode = int(parameters["Submode"])
- command_mode(
+ pack_mode_cmd_with_info(
object_id=object_id.as_bytes,
mode=mode,
submode=submode,
@@ -48,7 +48,7 @@ def pack_cmd_ctrl_to_prompted_mode(
def pack_cmd_ctrl_to_off(
q: DefaultPusQueueHelper, object_id: Union[ObjectIdBase, ObjectIdU32]
):
- command_mode(
+ pack_mode_cmd_with_info(
object_id=object_id.as_bytes,
mode=Modes.OFF,
submode=0,
@@ -58,7 +58,7 @@ def pack_cmd_ctrl_to_off(
def pack_cmd_ctrl_to_on(q: DefaultPusQueueHelper, object_id: ObjectIdU32):
- command_mode(
+ pack_mode_cmd_with_info(
object_id=object_id.as_bytes,
mode=Modes.ON,
submode=0,
@@ -70,7 +70,7 @@ def pack_cmd_ctrl_to_on(q: DefaultPusQueueHelper, object_id: ObjectIdU32):
def pack_cmd_ctrl_to_nml(
q: DefaultPusQueueHelper, object_id: Union[ObjectIdBase, ObjectIdU32]
):
- command_mode(
+ pack_mode_cmd_with_info(
object_id=object_id.as_bytes,
mode=Modes.NORMAL,
submode=0,
diff --git a/pus_tc/system/proc.py b/pus_tc/system/proc.py
index 081419f..65db9da 100644
--- a/pus_tc/system/proc.py
+++ b/pus_tc/system/proc.py
@@ -34,7 +34,8 @@ from pus_tc.devs.sus import SetIds
from pus_tc.devs.star_tracker import SetIds as StrSetIds
from tmtc.acs.reaction_wheels import RwSetIds, rw_speed_up_cmd_consec
from pus_tc.system.controllers import pack_cmd_ctrl_to_off, pack_cmd_ctrl_to_nml
-from pus_tc.system.acs import pack_acs_command, pack_sus_cmds
+from tmtc.acs_board import pack_acs_command
+from tmtc.sus_board import pack_sus_cmds
from tmtc.acs.imtq import pack_imtq_test_into, pack_dipole_command
from pus_tc.devs.star_tracker import pack_star_tracker_commands
from tmtc.acs.reaction_wheels import pack_rw_ass_cmds, pack_set_speed_command
diff --git a/pus_tc/system/tcs.py b/pus_tc/system/tcs.py
index 5335d15..05220be 100644
--- a/pus_tc/system/tcs.py
+++ b/pus_tc/system/tcs.py
@@ -7,11 +7,10 @@ from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import Modes
from tmtccmd.tc.pus_3_fsfw_hk import (
make_sid,
- generate_one_diag_command,
generate_one_hk_command,
)
-from .common import command_mode
+from tmtc.common import pack_mode_cmd_with_info
from config.object_ids import TCS_BOARD_ASS_ID, TCS_CONTROLLER
@@ -62,7 +61,7 @@ def pack_tcs_sys_commands(q: DefaultPusQueueHelper, op_code: str):
def pack_tcs_ass_cmds(q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodes.TCS_BOARD_ASS_NORMAL:
- command_mode(
+ pack_mode_cmd_with_info(
object_id=TCS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=0,
@@ -70,7 +69,7 @@ def pack_tcs_ass_cmds(q: DefaultPusQueueHelper, op_code: str):
info=Info.TCS_BOARD_ASS_NORMAL,
)
if op_code in OpCodes.TCS_BOARD_ASS_OFF:
- command_mode(
+ pack_mode_cmd_with_info(
object_id=TCS_BOARD_ASS_ID,
mode=Modes.OFF,
submode=0,
diff --git a/pus_tm/event_handler.py b/pus_tm/event_handler.py
index 68df29a..1cd06bb 100644
--- a/pus_tm/event_handler.py
+++ b/pus_tm/event_handler.py
@@ -3,6 +3,9 @@ import os.path
from datetime import datetime
from config.object_ids import get_object_ids
from pus_tm.defs import PrintWrapper
+from pus_tm.verification_handler import generic_retval_printout
+from tmtc.acs_subsystem import AcsModes
+from tmtccmd.tc.pus_200_fsfw_modes import Modes
from tmtccmd.tm import Service5Tm
from tmtccmd.logging import get_console_logger
@@ -50,6 +53,11 @@ def handle_event_packet(raw_tm: bytes, printer: FsfwTmTcPrinter):
)
LOGGER.info(generic_event_string)
specific_handler = True
+ if info.name == "MODE_TRANSITION_FAILED":
+ reason = generic_retval_printout(tm.param_1)
+ for string in reason:
+ pw.dlog(f"Reason from event parameter 1: {string}")
+ pw.dlog(f"Mode, sequence or table: {tm.param_2:#08x}")
if info.name == "SUPV_UPDATE_PROGRESS" or info.name == "WRITE_MEMORY_FAILED":
additional_event_info = f"Additional info: {info.info}"
context = (
@@ -59,7 +67,28 @@ def handle_event_packet(raw_tm: bytes, printer: FsfwTmTcPrinter):
pw.dlog(additional_event_info)
pw.dlog(context)
if info.name == "MODE_INFO":
- pw.dlog(f"Mode: {tm.param_1}")
+ mode_name = "Unknown"
+ if obj_name == "ACS_SUBSYSTEM":
+ if tm.param_1 == Modes.OFF:
+ mode_name = "Off"
+ elif tm.param_1 == AcsModes.IDLE:
+ mode_name = "Idle"
+ elif tm.param_1 == AcsModes.DETUMBLE:
+ mode_name = "Detumble"
+ elif tm.param_1 == AcsModes.SAFE:
+ mode_name = "Safe"
+ elif tm.param_1 == AcsModes.TARGET_PT:
+ mode_name = "Target Pointing"
+ else:
+ if tm.param_1 == Modes.OFF:
+ mode_name = "Off"
+ elif tm.param_1 == Modes.ON:
+ mode_name = "On"
+ elif tm.param_1 == Modes.NORMAL:
+ mode_name = "Normal"
+ elif tm.param_1 == Modes.RAW:
+ mode_name = "Raw"
+ pw.dlog(f"Mode Number {tm.param_1}, Mode Name {mode_name}")
pw.dlog(f"Submode: {tm.param_2}")
else:
specific_handler = False
diff --git a/pus_tm/factory_hook.py b/pus_tm/factory_hook.py
index a62b9d4..8bf842d 100644
--- a/pus_tm/factory_hook.py
+++ b/pus_tm/factory_hook.py
@@ -1,6 +1,7 @@
"""Core EIVE TM handler module
"""
from config.object_ids import get_object_ids
+from config.retvals import get_retval_dict
from spacepackets.ecss import PusTelemetry
from spacepackets.ecss.pus_17_test import Service17Tm
from spacepackets.util import PrintFormats
@@ -9,10 +10,12 @@ from tmtccmd.logging.pus import RawTmtcTimedLogWrapper
from tmtccmd.pus import VerificationWrapper
from tmtccmd.tm import Service20FsfwTm, Service200FsfwTm
from tmtccmd.tm.pus_17_test import Service17TmExtended
+from tmtccmd.tm.pus_200_fsfw_modes import Subservices as ModeSubservices
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
+from .defs import PrintWrapper
from .event_handler import handle_event_packet
-from .verification_handler import handle_service_1_fsfw_packet
+from .verification_handler import handle_service_1_fsfw_packet, generic_retval_printout
from .hk_handling import handle_hk_packet
from .action_reply_handler import handle_action_reply
@@ -36,6 +39,7 @@ def pus_factory_hook(
return
service = tm_packet.service
obj_id_dict = get_object_ids()
+ pw = PrintWrapper(printer)
dedicated_handler = True
if service == 1:
handle_service_1_fsfw_packet(wrapper=verif_wrapper, raw_tm=packet)
@@ -55,7 +59,17 @@ def pus_factory_hook(
dedicated_handler = False
elif service == 200:
tm_packet = Service200FsfwTm.unpack(raw_telemetry=packet)
- dedicated_handler = False
+ if tm_packet.subservice == ModeSubservices.TM_CANT_REACH_MODE:
+ obj_id = tm_packet.object_id
+ obj_id_obj = obj_id_dict.get(obj_id)
+ retval = tm_packet.return_value
+ string_list = generic_retval_printout(retval)
+ pw.dlog(f"Received Mode Reply from {obj_id_obj}: Can't reach mode.")
+ for string in string_list:
+ pw.dlog(f"Reason: {string}")
+ dedicated_handler = True
+ else:
+ dedicated_handler = False
else:
LOGGER.info(f"The service {service} is not implemented in Telemetry Factory")
tm_packet.print_source_data(PrintFormats.HEX)
diff --git a/pus_tm/verification_handler.py b/pus_tm/verification_handler.py
index b3d4a6a..c9671e9 100644
--- a/pus_tm/verification_handler.py
+++ b/pus_tm/verification_handler.py
@@ -1,3 +1,5 @@
+from typing import List, Optional
+
from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm
from tmtccmd.logging import get_console_logger
from tmtccmd.pus import VerificationWrapper
@@ -25,28 +27,37 @@ def handle_service_1_fsfw_packet(wrapper: VerificationWrapper, raw_tm: bytes):
else:
wrapper.log_to_console(tm_packet, res)
wrapper.log_to_file(tm_packet, res)
- retval_dict = get_retval_dict()
if tm_packet.has_failure_notice:
- retval_info = retval_dict.get(tm_packet.error_code.val)
- if retval_info is None:
- raw_err = tm_packet.error_code.val
- LOGGER.info(
- f"No returnvalue information found for error code with subsystem ID"
- f" {(raw_err >> 8) & 0xff} and unique ID {raw_err & 0xff}"
- )
- else:
- retval_string = (
- f"Error Code information for code {tm_packet.error_code.val:#06x} | "
- f"Name: {retval_info.name} | Info: {retval_info.info}"
- )
- error_param_1_str = (
- f"Error Parameter 1: hex {fsfw_wrapper.error_param_1:#010x} "
- f"dec {fsfw_wrapper.error_param_1} "
- )
- error_param_2_str = (
- f"Error Parameter 2: hex {fsfw_wrapper.error_param_2:#010x} "
- f"dec {fsfw_wrapper.error_param_2}"
- )
- wrapper.dlog(retval_string)
- wrapper.dlog(error_param_1_str)
- wrapper.dlog(error_param_2_str)
+ str_list = generic_retval_printout(
+ tm_packet.error_code.val,
+ fsfw_wrapper.error_param_1,
+ fsfw_wrapper.error_param_2,
+ )
+ for string in str_list:
+ wrapper.dlog(string)
+
+
+def generic_retval_printout(
+ retval: int, p1: Optional[int] = None, p2: Optional[int] = None
+) -> List[str]:
+ retval_dict = get_retval_dict()
+ retval_info = retval_dict.get(retval)
+ if retval_info is None:
+ raw_err = retval
+ return [
+ f"No returnvalue information found for error code with "
+ f"subsystem ID {(raw_err >> 8) & 0xff} and unique ID {raw_err & 0xff}"
+ ]
+ else:
+ retval_string = (
+ f"Error Code information for code {retval:#06x} | "
+ f"Name: {retval_info.name} | Info: {retval_info.info}"
+ )
+ string_list = [retval_string]
+ if p1:
+ error_param_1_str = f"Error Parameter 1: hex {p1:#010x} " f"dec {p1} "
+ string_list.append(error_param_1_str)
+ if p2:
+ error_param_2_str = f"Error Parameter 2: hex {p2:#010x} " f"dec {p2}"
+ string_list.append(error_param_2_str)
+ return string_list
diff --git a/tmtc/__init__.py b/tmtc/__init__.py
index e69de29..8d644b7 100644
--- a/tmtc/__init__.py
+++ b/tmtc/__init__.py
@@ -0,0 +1 @@
+from .acs_subsystem import add_acs_subsystem_cmds
diff --git a/pus_tc/system/acs.py b/tmtc/acs_board.py
similarity index 51%
rename from pus_tc/system/acs.py
rename to tmtc/acs_board.py
index 6558fe1..a07b152 100644
--- a/pus_tc/system/acs.py
+++ b/tmtc/acs_board.py
@@ -1,27 +1,27 @@
import enum
-from tmtccmd.tc import DefaultPusQueueHelper
+from config.definitions import CustomServiceList
+from tmtccmd.config.tmtc import (
+ tmtc_definitions_provider,
+ TmtcDefinitionWrapper,
+ OpCodeEntry,
+)
+from tmtccmd.tc import DefaultPusQueueHelper, service_provider
+from tmtccmd.tc.decorator import ServiceProviderParams
from tmtccmd.tc.pus_200_fsfw_modes import Modes
-from config.object_ids import ACS_BOARD_ASS_ID, SUS_BOARD_ASS_ID
+from config.object_ids import ACS_BOARD_ASS_ID
-from .common import command_mode
+from tmtc.common import pack_mode_cmd_with_info
class AcsOpCodes:
ACS_ASS_A_SIDE = ["0", "a"]
ACS_ASS_B_SIDE = ["1", "b"]
- ACS_ASS_DUAL_MODE = ["2", "dual"]
+ ACS_ASS_DUAL_MODE = ["2", "d"]
ACS_ASS_OFF = ["3", "off"]
- ACS_ASS_A_ON = ["4", "a_on"]
- ACS_ASS_B_ON = ["5", "b_on"]
- ACS_ASS_DUAL_ON = ["6", "dual_on"]
-
-
-class SusOpCodes:
- SUS_ASS_NOM_SIDE = ["0", "nom"]
- SUS_ASS_RED_SIDE = ["1", "red"]
- SUS_ASS_DUAL_MODE = ["2", "dual"]
- SUS_ASS_OFF = ["3", "off"]
+ ACS_ASS_A_ON = ["4", "ao"]
+ ACS_ASS_B_ON = ["5", "bo"]
+ ACS_ASS_DUAL_ON = ["6", "do"]
class DualSideSubmodes(enum.IntEnum):
@@ -30,9 +30,12 @@ class DualSideSubmodes(enum.IntEnum):
DUAL_SIDE = 2
-def pack_acs_command(q: DefaultPusQueueHelper, op_code: str):
+@service_provider(CustomServiceList.ACS_BRD_ASS)
+def pack_acs_command(p: ServiceProviderParams):
+ op_code = p.op_code
+ q = p.queue_helper
if op_code in AcsOpCodes.ACS_ASS_A_SIDE:
- command_mode(
+ pack_mode_cmd_with_info(
object_id=ACS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=DualSideSubmodes.A_SIDE,
@@ -40,7 +43,7 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str):
info="Switching to ACS board assembly A side",
)
if op_code in AcsOpCodes.ACS_ASS_B_SIDE:
- command_mode(
+ pack_mode_cmd_with_info(
object_id=ACS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=DualSideSubmodes.B_SIDE,
@@ -48,7 +51,7 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str):
info="Switching to ACS board assembly B side",
)
if op_code in AcsOpCodes.ACS_ASS_DUAL_MODE:
- command_mode(
+ pack_mode_cmd_with_info(
object_id=ACS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=DualSideSubmodes.DUAL_SIDE,
@@ -56,7 +59,7 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str):
info="Switching to ACS board assembly dual mode",
)
if op_code in AcsOpCodes.ACS_ASS_A_ON:
- command_mode(
+ pack_mode_cmd_with_info(
object_id=ACS_BOARD_ASS_ID,
mode=Modes.ON,
submode=DualSideSubmodes.A_SIDE,
@@ -64,7 +67,7 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str):
info="Switching ACS board assembly A side on",
)
if op_code in AcsOpCodes.ACS_ASS_B_ON:
- command_mode(
+ pack_mode_cmd_with_info(
object_id=ACS_BOARD_ASS_ID,
mode=Modes.ON,
submode=DualSideSubmodes.B_SIDE,
@@ -72,7 +75,7 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str):
info="Switching ACS board assembly B side on",
)
if op_code in AcsOpCodes.ACS_ASS_DUAL_ON:
- command_mode(
+ pack_mode_cmd_with_info(
object_id=ACS_BOARD_ASS_ID,
mode=Modes.ON,
submode=DualSideSubmodes.B_SIDE,
@@ -80,7 +83,7 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str):
info="Switching ACS board assembly dual side on",
)
if op_code in AcsOpCodes.ACS_ASS_OFF:
- command_mode(
+ pack_mode_cmd_with_info(
object_id=ACS_BOARD_ASS_ID,
mode=Modes.OFF,
submode=0,
@@ -89,36 +92,39 @@ def pack_acs_command(q: DefaultPusQueueHelper, op_code: str):
)
-def pack_sus_cmds(q: DefaultPusQueueHelper, op_code: str):
- if op_code in SusOpCodes.SUS_ASS_NOM_SIDE:
- command_mode(
- object_id=SUS_BOARD_ASS_ID,
- mode=Modes.NORMAL,
- submode=DualSideSubmodes.A_SIDE,
- q=q,
- info="Switching to SUS board to nominal side",
- )
- if op_code in SusOpCodes.SUS_ASS_RED_SIDE:
- command_mode(
- object_id=SUS_BOARD_ASS_ID,
- mode=Modes.NORMAL,
- submode=DualSideSubmodes.B_SIDE,
- q=q,
- info="Switching to SUS board to redundant side",
- )
- if op_code in SusOpCodes.SUS_ASS_OFF:
- command_mode(
- object_id=SUS_BOARD_ASS_ID,
- mode=Modes.OFF,
- submode=0,
- q=q,
- info="Switching SUS board off",
- )
- if op_code in SusOpCodes.SUS_ASS_DUAL_MODE:
- command_mode(
- object_id=SUS_BOARD_ASS_ID,
- mode=Modes.NORMAL,
- submode=DualSideSubmodes.DUAL_SIDE,
- q=q,
- info="Switching to SUS board to dual side",
- )
+@tmtc_definitions_provider
+def add_acs_board_cmds(defs: TmtcDefinitionWrapper):
+ oce = OpCodeEntry()
+ oce.add(
+ keys=AcsOpCodes.ACS_ASS_A_SIDE,
+ info="Switch to ACS board A side",
+ )
+ oce.add(
+ keys=AcsOpCodes.ACS_ASS_B_SIDE,
+ info="Switch to ACS board B side",
+ )
+ oce.add(
+ keys=AcsOpCodes.ACS_ASS_DUAL_MODE,
+ info="Switch to ACS board dual mode",
+ )
+ oce.add(
+ keys=AcsOpCodes.ACS_ASS_A_ON,
+ info="Switch ACS board A side on",
+ )
+ oce.add(
+ keys=AcsOpCodes.ACS_ASS_B_ON,
+ info="Switch ACS board B side on",
+ )
+ oce.add(
+ keys=AcsOpCodes.ACS_ASS_DUAL_ON,
+ info="Switch ACS board dual mode on",
+ )
+ oce.add(
+ keys=AcsOpCodes.ACS_ASS_OFF,
+ info="Switch off ACS board",
+ )
+ defs.add_service(
+ name=CustomServiceList.ACS_BRD_ASS.value,
+ info="ACS Board Assemblie",
+ op_code_entry=oce,
+ )
diff --git a/tmtc/acs_ctrl.py b/tmtc/acs_ctrl.py
index 71cc1ba..bf8b7fd 100644
--- a/tmtc/acs_ctrl.py
+++ b/tmtc/acs_ctrl.py
@@ -2,7 +2,7 @@ import enum
import socket
import struct
from socket import AF_INET
-from typing import Tuple, Optional
+from typing import Tuple
from config.definitions import CustomServiceList
from config.object_ids import ACS_CONTROLLER
diff --git a/tmtc/acs_subsystem.py b/tmtc/acs_subsystem.py
new file mode 100644
index 0000000..26477ec
--- /dev/null
+++ b/tmtc/acs_subsystem.py
@@ -0,0 +1,85 @@
+import enum
+from typing import Tuple, Dict
+
+from spacepackets.ecss import PusTelecommand
+from .common import pack_mode_cmd_with_info
+from config.object_ids import ACS_SUBSYSTEM_ID
+from config.definitions import CustomServiceList
+from tmtccmd.config.tmtc import (
+ tmtc_definitions_provider,
+ TmtcDefinitionWrapper,
+ OpCodeEntry,
+)
+from tmtccmd.tc.pus_200_fsfw_modes import Subservices as ModeSubservices
+from tmtccmd.tc import service_provider
+from tmtccmd.tc.decorator import ServiceProviderParams
+
+
+class OpCodes(str, enum.Enum):
+ OFF = "off"
+ SAFE = "safe"
+ DETUMBLE = "detumble"
+ IDLE = "idle"
+ TARGET_PT = "target"
+ REPORT_ALL_MODES = "all_modes"
+
+
+class AcsModes(enum.IntEnum):
+ OFF = 0
+ SAFE = 1 << 24
+ DETUMBLE = 2 << 24
+ IDLE = 3 << 24
+ TARGET_PT = 4 << 24
+
+
+class Info(str, enum.Enum):
+ OFF = "Off Command"
+ SAFE = "Safe Mode Command"
+ DETUMBLE = "Detumble Mode Command"
+ IDLE = "Idle Mode Command"
+ TARGET_PT = "Target Pointing Mode Command"
+ REPORT_ALL_MODES = "Report All Modes Recursively"
+
+
+HANDLER_LIST: Dict[str, Tuple[int, str]] = {
+ OpCodes.OFF: (AcsModes.OFF, Info.OFF),
+ OpCodes.IDLE: (AcsModes.IDLE, Info.IDLE),
+ OpCodes.SAFE: (AcsModes.SAFE, Info.SAFE),
+ OpCodes.DETUMBLE: (AcsModes.DETUMBLE, Info.DETUMBLE),
+}
+
+
+@service_provider(CustomServiceList.ACS_SS.value)
+def build_acs_subsystem_cmd(p: ServiceProviderParams):
+ op_code = p.op_code
+ q = p.queue_helper
+ info_prefix = "ACS Subsystem"
+ if op_code in OpCodes.REPORT_ALL_MODES:
+ q.add_log_cmd(f"{info_prefix}: {Info.REPORT_ALL_MODES}")
+ q.add_pus_tc(
+ PusTelecommand(
+ service=200,
+ subservice=ModeSubservices.TC_MODE_ANNOUNCE_RECURSIVE,
+ app_data=ACS_SUBSYSTEM_ID,
+ )
+ )
+ mode_info_tup = HANDLER_LIST.get(op_code)
+ if mode_info_tup is None:
+ return
+ pack_mode_cmd_with_info(
+ object_id=ACS_SUBSYSTEM_ID,
+ info=f"{info_prefix}: {mode_info_tup[1]}",
+ submode=0,
+ mode=mode_info_tup[0],
+ q=q,
+ )
+
+
+@tmtc_definitions_provider
+def add_acs_subsystem_cmds(defs: TmtcDefinitionWrapper):
+ oce = OpCodeEntry()
+ oce.add(OpCodes.OFF, Info.OFF)
+ oce.add(OpCodes.SAFE, Info.SAFE)
+ oce.add(OpCodes.IDLE, Info.IDLE)
+ oce.add(OpCodes.REPORT_ALL_MODES, Info.REPORT_ALL_MODES)
+ defs.add_service(CustomServiceList.ACS_SS, "ACS Subsystem", oce)
diff --git a/pus_tc/system/common.py b/tmtc/common.py
similarity index 59%
rename from pus_tc/system/common.py
rename to tmtc/common.py
index c1bd1a4..e078ce5 100644
--- a/pus_tc/system/common.py
+++ b/tmtc/common.py
@@ -3,18 +3,25 @@ from typing import Union
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes, Subservices
+from tmtccmd.util import ObjectIdU32
-def command_mode(
- object_id: bytes,
+def pack_mode_cmd_with_info(
+ object_id: Union[ObjectIdU32, bytes],
mode: Union[int, Modes],
submode: int,
q: DefaultPusQueueHelper,
info: str,
):
+ if isinstance(object_id, ObjectIdU32):
+ object_id_bytes = object_id.as_bytes
+ elif isinstance(object_id, bytes):
+ object_id_bytes = object_id
+ else:
+ raise ValueError("Invalid Object ID type")
q.add_log_cmd(info)
mode_data = pack_mode_data(
- object_id=object_id,
+ object_id=object_id_bytes,
mode=mode,
submode=submode,
)
diff --git a/tmtc/sus_board.py b/tmtc/sus_board.py
new file mode 100644
index 0000000..6fb0687
--- /dev/null
+++ b/tmtc/sus_board.py
@@ -0,0 +1,83 @@
+from config.definitions import CustomServiceList
+from config.object_ids import SUS_BOARD_ASS_ID
+from tmtc.acs_board import DualSideSubmodes
+from tmtc.common import pack_mode_cmd_with_info
+from tmtccmd.config.tmtc import (
+ tmtc_definitions_provider,
+ TmtcDefinitionWrapper,
+ OpCodeEntry,
+)
+from tmtccmd.tc import service_provider
+from tmtccmd.tc.decorator import ServiceProviderParams
+from tmtccmd.tc.pus_200_fsfw_modes import Modes
+
+
+class SusOpCodes:
+ SUS_ASS_NOM_SIDE = ["0", "nom"]
+ SUS_ASS_RED_SIDE = ["1", "red"]
+ SUS_ASS_DUAL_MODE = ["2", "dual"]
+ SUS_ASS_OFF = ["3", "off"]
+
+
+@service_provider(CustomServiceList.SUS_BRD_ASS)
+def pack_sus_cmds(p: ServiceProviderParams):
+ op_code = p.op_code
+ q = p.queue_helper
+ if op_code in SusOpCodes.SUS_ASS_NOM_SIDE:
+ pack_mode_cmd_with_info(
+ object_id=SUS_BOARD_ASS_ID,
+ mode=Modes.NORMAL,
+ submode=DualSideSubmodes.A_SIDE,
+ q=q,
+ info="Switching to SUS board to nominal side",
+ )
+ if op_code in SusOpCodes.SUS_ASS_RED_SIDE:
+ pack_mode_cmd_with_info(
+ object_id=SUS_BOARD_ASS_ID,
+ mode=Modes.NORMAL,
+ submode=DualSideSubmodes.B_SIDE,
+ q=q,
+ info="Switching to SUS board to redundant side",
+ )
+ if op_code in SusOpCodes.SUS_ASS_OFF:
+ pack_mode_cmd_with_info(
+ object_id=SUS_BOARD_ASS_ID,
+ mode=Modes.OFF,
+ submode=0,
+ q=q,
+ info="Switching SUS board off",
+ )
+ if op_code in SusOpCodes.SUS_ASS_DUAL_MODE:
+ pack_mode_cmd_with_info(
+ object_id=SUS_BOARD_ASS_ID,
+ mode=Modes.NORMAL,
+ submode=DualSideSubmodes.DUAL_SIDE,
+ q=q,
+ info="Switching to SUS board to dual side",
+ )
+
+
+@tmtc_definitions_provider
+def add_sus_board_cmds(defs: TmtcDefinitionWrapper):
+ oce = OpCodeEntry()
+ oce.add(
+ keys=SusOpCodes.SUS_ASS_NOM_SIDE,
+ info="Switch SUS board to nominal side",
+ )
+ oce.add(
+ keys=SusOpCodes.SUS_ASS_RED_SIDE,
+ info="Switch SUS board to redundant side",
+ )
+ oce.add(
+ keys=SusOpCodes.SUS_ASS_OFF,
+ info="Switch off SUS board",
+ )
+ oce.add(
+ keys=SusOpCodes.SUS_ASS_DUAL_MODE,
+ info="Switch SUS board to dual mode",
+ )
+ defs.add_service(
+ name=CustomServiceList.SUS_BRD_ASS.value,
+ info="SUS Board Assembly",
+ op_code_entry=oce,
+ )