diff --git a/.idea/runConfigurations/BPX.xml b/.idea/runConfigurations/BPX.xml
new file mode 100644
index 0000000..a1fc5d3
--- /dev/null
+++ b/.idea/runConfigurations/BPX.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/runConfigurations/BPX_Request_HK.xml b/.idea/runConfigurations/BPX_Request_HK.xml
new file mode 100644
index 0000000..9783034
--- /dev/null
+++ b/.idea/runConfigurations/BPX_Request_HK.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/runConfigurations/BPX_Reset_Reboot_Counter.xml b/.idea/runConfigurations/BPX_Reset_Reboot_Counter.xml
new file mode 100644
index 0000000..6d0b32d
--- /dev/null
+++ b/.idea/runConfigurations/BPX_Reset_Reboot_Counter.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/config/cmd_definitions.py b/config/cmd_definitions.py
new file mode 100644
index 0000000..589e581
--- /dev/null
+++ b/config/cmd_definitions.py
@@ -0,0 +1,239 @@
+from tmtccmd.config import (
+ add_op_code_entry,
+ add_service_op_code_entry,
+ ServiceOpCodeDictT,
+ OpCodeDictKeys,
+)
+from config.definitions import CustomServiceList
+from pus_tc.bpx_batt import BpxOpCodes
+
+
+def add_bpx_cmd_definitions(cmd_dict: ServiceOpCodeDictT):
+ op_code_dict = dict()
+ add_op_code_entry(
+ op_code_dict=op_code_dict, keys=BpxOpCodes.HK, info="Request BPX HK"
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict, keys=BpxOpCodes.RST_BOOT_CNT, info="Reset Boot Count"
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=BpxOpCodes.REQUEST_CFG,
+ info="Request Configuration Struct (Step 1)",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=BpxOpCodes.REQUEST_CFG_HK,
+ info="Request Configuration Struct HK (Step 2)",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict, keys=BpxOpCodes.REBOOT, info="Reboot Command"
+ )
+ add_service_op_code_entry(
+ srv_op_code_dict=cmd_dict,
+ name=CustomServiceList.BPX_BATTERY.value,
+ info="BPX Battery Handler",
+ op_code_entry=op_code_dict,
+ )
+
+
+def add_core_controller_definitions(cmd_dict: ServiceOpCodeDictT):
+ od = dict()
+ add_op_code_entry(op_code_dict=od, keys=["0", "reboot"], info="Reboot with Prompt")
+ add_op_code_entry(op_code_dict=od, keys=["1", "reboot_0_0"], info="Reboot 0 0")
+ add_op_code_entry(op_code_dict=od, keys=["2", "reboot_0_1"], info="Reboot 0 1")
+ add_op_code_entry(op_code_dict=od, keys=["3", "reboot_1_0"], info="Reboot 1 0")
+ add_op_code_entry(op_code_dict=od, keys=["4", "reboot_1_1"], info="Reboot 1 1")
+ add_service_op_code_entry(
+ srv_op_code_dict=cmd_dict,
+ name=CustomServiceList.CORE.value,
+ info="Reboot Self",
+ op_code_entry=od,
+ )
+
+
+def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
+ from pus_tc.p60dock import P60OpCodes, GomspaceOpCodes
+ from pus_tc.pdu1 import Pdu1OpCodes
+ from pus_tc.pdu2 import Pdu2OpCodes
+
+ op_code_dict = dict()
+ add_op_code_entry(op_code_dict=op_code_dict, keys="0", info="P60 Tests")
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=P60OpCodes.STACK_3V3_ON.value,
+ info="P60 Dock: Turn stack 3V3 on",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=P60OpCodes.STACK_3V3_OFF.value,
+ info="P60 Dock: Turn stack 3V3 off",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=P60OpCodes.STACK_5V_ON.value,
+ info="P60 Dock: Turn stack 5V on",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=P60OpCodes.STACK_5V_OFF.value,
+ info="P60 Dock: Turn stack 5V off",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=GomspaceOpCodes.PRINT_SWITCH_V_I.value,
+ info="P60 Dock: Print Switches, Voltages, Currents",
+ )
+ add_service_op_code_entry(
+ srv_op_code_dict=cmd_dict,
+ name=CustomServiceList.P60DOCK.value,
+ info="P60 Device",
+ op_code_entry=op_code_dict,
+ )
+
+ op_code_dict = dict()
+ add_op_code_entry(op_code_dict=op_code_dict, keys="0", info="PDU1 Tests")
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu1OpCodes.TCS_BOARD_ON.value,
+ info="PDU1: Turn TCS board on",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu1OpCodes.TCS_BOARD_OFF.value,
+ info="PDU1: Turn TCS board off",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu1OpCodes.STAR_TRACKER_ON.value,
+ info="PDU1: Turn star tracker on",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu1OpCodes.STAR_TRACKER_OFF.value,
+ info="PDU1: Turn star tracker off",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu1OpCodes.SUS_NOMINAL_ON.value,
+ info="PDU1: Turn SUS nominal on",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu1OpCodes.SUS_NOMINAL_OFF.value,
+ info="PDU1: Turn SUS nominal off",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu1OpCodes.ACS_A_SIDE_ON.value,
+ info="PDU1: Turn ACS A side on",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu1OpCodes.ACS_A_SIDE_OFF.value,
+ info="PDU1: Turn ACS A side off",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu1OpCodes.SYRLINKS_ON.value,
+ info="PDU1: Turn Syrlinks on",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu1OpCodes.SYRLINKS_OFF.value,
+ info="PDU1: Turn Syrlinks off",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu1OpCodes.MGT_ON.value,
+ info="PDU1: Turn MGT on",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu1OpCodes.MGT_OFF.value,
+ info="PDU1: Turn MGT on",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu1OpCodes.SCEX_ON.value,
+ info="PDU1: Turn Solar Cell Experiment on",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu1OpCodes.SCEX_OFF.value,
+ info="PDU1: Turn Solar Cell Experiment off",
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=GomspaceOpCodes.PRINT_SWITCH_V_I.value,
+ info="PDU1: Print Switches, Voltages, Currents",
+ )
+ add_service_op_code_entry(
+ srv_op_code_dict=cmd_dict,
+ name=CustomServiceList.PDU1.value,
+ info="PDU1 Device",
+ op_code_entry=op_code_dict,
+ )
+
+ op_code_dict = dict()
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys="0",
+ info="PDU2 Tests",
+ options={OpCodeDictKeys.TIMEOUT: 2.0},
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu2OpCodes.ACS_SIDE_B_ON.value,
+ info="PDU2: Turn ACS Side B on",
+ options={OpCodeDictKeys.TIMEOUT: 2.0},
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu2OpCodes.ACS_SIDE_B_OFF.value,
+ info="PDU2: Turn ACS Side B off",
+ options={OpCodeDictKeys.TIMEOUT: 2.0},
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu2OpCodes.SUS_REDUNDANT_ON.value,
+ info="PDU2: Turn SUS redundant on",
+ options={OpCodeDictKeys.TIMEOUT: 2.0},
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu2OpCodes.SUS_REDUNDANT_OFF.value,
+ info="PDU2: Turn SUS redundant off",
+ options={OpCodeDictKeys.TIMEOUT: 2.0},
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu2OpCodes.RW_ON.value,
+ info="PDU2: Turn reaction wheels on",
+ options={OpCodeDictKeys.TIMEOUT: 2.0},
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu2OpCodes.RW_OFF.value,
+ info="PDU2: Turn reaction wheels off",
+ options={OpCodeDictKeys.TIMEOUT: 2.0},
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=Pdu2OpCodes.Q7S_OFF.value,
+ info="Q7S Off",
+ options={OpCodeDictKeys.TIMEOUT: 2.0},
+ )
+ add_op_code_entry(
+ op_code_dict=op_code_dict,
+ keys=GomspaceOpCodes.PRINT_SWITCH_V_I.value,
+ info="PDU2: Print Switches, Voltages, Currents",
+ options={OpCodeDictKeys.TIMEOUT: 2.0},
+ )
+
+ add_service_op_code_entry(
+ srv_op_code_dict=cmd_dict,
+ name="pdu2",
+ info="PDU2 Device",
+ op_code_entry=op_code_dict,
+ )
diff --git a/config/definitions.py b/config/definitions.py
index 990b4c1..3a104aa 100644
--- a/config/definitions.py
+++ b/config/definitions.py
@@ -17,6 +17,7 @@ class CustomServiceList(enum.Enum):
PDU2 = "pdu2"
ACU = "acu"
ACS = "acs"
+ BPX_BATTERY = "bpx"
TMP1075_1 = "tmp1075_1"
TMP1075_2 = "tmp1075_2"
HEATER = "heater"
diff --git a/config/hook_implementations.py b/config/hook_implementations.py
index c3c7663..9fa73ef 100644
--- a/config/hook_implementations.py
+++ b/config/hook_implementations.py
@@ -1,18 +1,18 @@
import argparse
from typing import Union, Dict, Tuple
-from tmtccmd.config.definitions import ServiceOpCodeDictT
+from tmtccmd.config.definitions import (
+ ServiceOpCodeDictT,
+ HkReplyUnpacked,
+ DataReplyUnpacked,
+)
from tmtccmd.tm.service_3_base import Service3Base
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.config.hook import TmTcHookBase
from tmtccmd.utility.tmtc_printer import TmTcPrinter
-from tmtccmd.config.globals import (
- OpCodeDictKeys,
- add_op_code_entry,
- add_service_op_code_entry,
-)
+from tmtccmd.config.globals import OpCodeDictKeys
from config.object_ids import RW1_ID
from config.definitions import CustomServiceList
@@ -79,7 +79,7 @@ class EiveHookObject(TmTcHookBase):
@staticmethod
def handle_service_8_telemetry(
object_id: bytes, action_id: int, custom_data: bytearray
- ) -> Tuple[list, list]:
+ ) -> DataReplyUnpacked:
from pus_tm.service_8_hook import user_analyze_service_8_data
return user_analyze_service_8_data(
@@ -89,7 +89,7 @@ class EiveHookObject(TmTcHookBase):
@staticmethod
def handle_service_3_housekeeping(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
- ) -> Tuple[list, list, bytearray, int]:
+ ) -> HkReplyUnpacked:
from pus_tm.hk_handling import handle_user_hk_packet
return handle_user_hk_packet(
@@ -110,22 +110,15 @@ class EiveHookObject(TmTcHookBase):
def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
- from pus_tc.pdu1 import Pdu1OpCodes
- from pus_tc.pdu2 import Pdu2OpCodes
- from pus_tc.p60dock import P60OpCodes
- from gomspace.gomspace_common import GomspaceOpCodes
+ from config.cmd_definitions import (
+ add_bpx_cmd_definitions,
+ add_core_controller_definitions,
+ add_pcdu_cmds,
+ )
from pus_tc.gps import GpsOpCodes
- op_code_dict = {
- "reboot": ("Reboot with Prompt", {OpCodeDictKeys.TIMEOUT: 2.0}),
- "reboot_self": ("Reboot Self", {OpCodeDictKeys.TIMEOUT: 4.0}),
- "reboot_0_0": ("Reboot 0 0", {OpCodeDictKeys.TIMEOUT: 4.0}),
- "reboot_0_1": ("Reboot 0 1", {OpCodeDictKeys.TIMEOUT: 4.0}),
- "reboot_1_0": ("Reboot 1 0", {OpCodeDictKeys.TIMEOUT: 4.0}),
- "reboot_1_1": ("Reboot 1 1", {OpCodeDictKeys.TIMEOUT: 4.0}),
- }
- service_tuple = ("Core Controller", op_code_dict)
- service_op_code_dict[CustomServiceList.CORE.value] = service_tuple
+ add_bpx_cmd_definitions(cmd_dict=service_op_code_dict)
+ add_core_controller_definitions(cmd_dict=service_op_code_dict)
op_code_dict = {
GpsOpCodes.RESET_GNSS.value: ("Reset GPS", {OpCodeDictKeys.TIMEOUT: 2.0})
@@ -149,170 +142,7 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
service_tuple = ("TMP1075 2", op_code_dict)
service_op_code_dict[CustomServiceList.TMP1075_2.value] = service_tuple
- op_code_dict = dict()
- add_op_code_entry(
- op_code_dict=op_code_dict,
- keys="0",
- info="P60 Tests",
- options={OpCodeDictKeys.TIMEOUT: 2.0},
- )
- add_op_code_entry(
- op_code_dict=op_code_dict,
- keys=P60OpCodes.STACK_3V3_ON.value,
- info="P60 Dock: Turn stack 3V3 on",
- options={OpCodeDictKeys.TIMEOUT: 2.0},
- )
- add_op_code_entry(
- op_code_dict=op_code_dict,
- keys=P60OpCodes.STACK_3V3_OFF.value,
- info="P60 Dock: Turn stack 3V3 off",
- options={OpCodeDictKeys.TIMEOUT: 2.0},
- )
- add_op_code_entry(
- op_code_dict=op_code_dict,
- keys=P60OpCodes.STACK_5V_ON.value,
- info="P60 Dock: Turn stack 5V on",
- options={OpCodeDictKeys.TIMEOUT: 2.0},
- )
- add_op_code_entry(
- op_code_dict=op_code_dict,
- keys=P60OpCodes.STACK_5V_OFF.value,
- info="P60 Dock: Turn stack 5V off",
- options={OpCodeDictKeys.TIMEOUT: 2.0},
- )
- add_op_code_entry(
- op_code_dict=op_code_dict,
- keys=GomspaceOpCodes.PRINT_SWITCH_V_I.value,
- info="P60 Dock: Print Switches, Voltages, Currents",
- options={OpCodeDictKeys.TIMEOUT: 2.0},
- )
- add_service_op_code_entry(
- srv_op_code_dict=service_op_code_dict,
- name=CustomServiceList.P60DOCK.value,
- info="P60 Device",
- op_code_entry=op_code_dict,
- )
-
- op_code_dict_srv_pdu1 = {
- "0": ("PDU1 Tests", {OpCodeDictKeys.TIMEOUT: 2.0}),
- Pdu1OpCodes.TCS_BOARD_ON.value: (
- "PDU1: Turn TCS board on",
- {OpCodeDictKeys.TIMEOUT: 2.0},
- ),
- Pdu1OpCodes.TCS_BOARD_OFF.value: (
- "PDU1: Turn TCS board off",
- {OpCodeDictKeys.TIMEOUT: 2.0},
- ),
- Pdu1OpCodes.STAR_TRACKER_ON.value: (
- "PDU1: Turn star tracker on",
- {OpCodeDictKeys.TIMEOUT: 2.0},
- ),
- Pdu1OpCodes.STAR_TRACKER_OFF.value: (
- "PDU1: Turn star tracker off",
- {OpCodeDictKeys.TIMEOUT: 2.0},
- ),
- Pdu1OpCodes.SUS_NOMINAL_ON.value: (
- "PDU1: Turn SUS nominal on",
- {OpCodeDictKeys.TIMEOUT: 2.0},
- ),
- Pdu1OpCodes.SUS_NOMINAL_OFF.value: (
- "PDU1: Turn SUS nominal off",
- {OpCodeDictKeys.TIMEOUT: 2.0},
- ),
- Pdu1OpCodes.ACS_A_SIDE_ON.value: (
- "PDU1: Turn ACS Side A on",
- {OpCodeDictKeys.TIMEOUT: 2.0},
- ),
- Pdu1OpCodes.ACS_A_SIDE_OFF.value: (
- "PDU1: Turn ACS Side A off",
- {OpCodeDictKeys.TIMEOUT: 2.0},
- ),
- Pdu1OpCodes.SYRLINKS_ON.value: (
- "PDU1: Turn Syrlinks on",
- {OpCodeDictKeys.TIMEOUT: 2.0},
- ),
- Pdu1OpCodes.SYRLINKS_OFF.value: (
- "PDU1: Turn Syrlinks off",
- {OpCodeDictKeys.TIMEOUT: 2.0},
- ),
- Pdu1OpCodes.MGT_ON.value: (
- "PDU1: Turn MGT on",
- {OpCodeDictKeys.TIMEOUT: 2.0},
- ),
- Pdu1OpCodes.MGT_OFF.value: (
- "PDU1: Turn MGT off",
- {OpCodeDictKeys.TIMEOUT: 2.0},
- ),
- GomspaceOpCodes.PRINT_SWITCH_V_I.value: (
- "PDU1: Print Switches, Voltages, Currents",
- {OpCodeDictKeys.TIMEOUT: 2.0},
- ),
- }
- service_pdu1_tuple = ("PDU1 Device", op_code_dict_srv_pdu1)
-
- op_code_dict = dict()
- add_op_code_entry(
- op_code_dict=op_code_dict,
- keys="0",
- info="PDU2 Tests",
- options={OpCodeDictKeys.TIMEOUT: 2.0},
- )
- add_op_code_entry(
- op_code_dict=op_code_dict,
- keys=Pdu2OpCodes.ACS_SIDE_B_ON.value,
- info="PDU2: Turn ACS Side B on",
- options={OpCodeDictKeys.TIMEOUT: 2.0},
- )
- add_op_code_entry(
- op_code_dict=op_code_dict,
- keys=Pdu2OpCodes.ACS_SIDE_B_OFF.value,
- info="PDU2: Turn ACS Side B off",
- options={OpCodeDictKeys.TIMEOUT: 2.0},
- )
- add_op_code_entry(
- op_code_dict=op_code_dict,
- keys=Pdu2OpCodes.SUS_REDUNDANT_ON.value,
- info="PDU2: Turn SUS redundant on",
- options={OpCodeDictKeys.TIMEOUT: 2.0},
- )
- add_op_code_entry(
- op_code_dict=op_code_dict,
- keys=Pdu2OpCodes.SUS_REDUNDANT_OFF.value,
- info="PDU2: Turn SUS redundant off",
- options={OpCodeDictKeys.TIMEOUT: 2.0},
- )
- add_op_code_entry(
- op_code_dict=op_code_dict,
- keys=Pdu2OpCodes.RW_ON.value,
- info="PDU2: Turn reaction wheels on",
- options={OpCodeDictKeys.TIMEOUT: 2.0},
- )
- add_op_code_entry(
- op_code_dict=op_code_dict,
- keys=Pdu2OpCodes.RW_OFF.value,
- info="PDU2: Turn reaction wheels off",
- options={OpCodeDictKeys.TIMEOUT: 2.0},
- )
- add_op_code_entry(
- op_code_dict=op_code_dict,
- keys=Pdu2OpCodes.Q7S_OFF.value,
- info="Q7S Off",
- options={OpCodeDictKeys.TIMEOUT: 2.0},
- )
- add_op_code_entry(
- op_code_dict=op_code_dict,
- keys=GomspaceOpCodes.PRINT_SWITCH_V_I.value,
- info="PDU2: Print Switches, Voltages, Currents",
- options={OpCodeDictKeys.TIMEOUT: 2.0},
- )
-
- add_service_op_code_entry(
- srv_op_code_dict=service_op_code_dict,
- name="pdu2",
- info="PDU2 Device",
- op_code_entry=op_code_dict,
- )
- # service_pdu2_tuple = ("PDU2 Device", op_code_dict_srv_pdu2)
+ add_pcdu_cmds(cmd_dict=service_op_code_dict)
op_code_dict_srv_heater = {
"0": ("Heater Tests", {OpCodeDictKeys.TIMEOUT: 2.0}),
@@ -538,7 +368,10 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
"48": ("Star Tracker: Request camera parameter", {OpCodeDictKeys.TIMEOUT: 2.0}),
"49": ("Star Tracker: Request limits", {OpCodeDictKeys.TIMEOUT: 2.0}),
"50": ("Star Tracker: Request blob parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
- "51": ("Star Tracker: Set image processor parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
+ "51": (
+ "Star Tracker: Set image processor parameters",
+ {OpCodeDictKeys.TIMEOUT: 2.0},
+ ),
}
service_star_tracker_tuple = ("Star tracker", op_code_dict_srv_star_tracker)
@@ -548,10 +381,22 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
"2": ("CCSDS Handler: Enable transmitter", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("CCSDS Handler: Disable transmitter", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("CCSDS Handler: Set arbitrary bitrate", {OpCodeDictKeys.TIMEOUT: 2.0}),
- "5": ("CCSDS Handler: Enable tx clock manipulator", {OpCodeDictKeys.TIMEOUT: 2.0}),
- "6": ("CCSDS Handler: Disable tx clock manipulator", {OpCodeDictKeys.TIMEOUT: 2.0}),
- "7": ("CCSDS Handler: Update tx data on rising edge", {OpCodeDictKeys.TIMEOUT: 2.0}),
- "8": ("CCSDS Handler: Update tx data on falling edge", {OpCodeDictKeys.TIMEOUT: 2.0}),
+ "5": (
+ "CCSDS Handler: Enable tx clock manipulator",
+ {OpCodeDictKeys.TIMEOUT: 2.0},
+ ),
+ "6": (
+ "CCSDS Handler: Disable tx clock manipulator",
+ {OpCodeDictKeys.TIMEOUT: 2.0},
+ ),
+ "7": (
+ "CCSDS Handler: Update tx data on rising edge",
+ {OpCodeDictKeys.TIMEOUT: 2.0},
+ ),
+ "8": (
+ "CCSDS Handler: Update tx data on falling edge",
+ {OpCodeDictKeys.TIMEOUT: 2.0},
+ ),
}
service_ccsds_handler_tuple = ("CCSDS Handler", op_code_dict_srv_ccsds_handler)
@@ -580,9 +425,6 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
"Syrlinks Handler",
op_code_dict_srv_syrlinks_handler,
)
-
- service_op_code_dict[CustomServiceList.PDU1.value] = service_pdu1_tuple
- # service_op_code_dict[CustomServiceList.PDU2.value] = service_pdu2_tuple
service_op_code_dict[CustomServiceList.HEATER.value] = service_heater_tuple
service_op_code_dict[CustomServiceList.IMTQ.value] = service_imtq_tuple
service_op_code_dict[CustomServiceList.REACTION_WHEEL_1.value] = service_rw_tuple
diff --git a/config/object_ids.py b/config/object_ids.py
index 387418a..2bbe62d 100644
--- a/config/object_ids.py
+++ b/config/object_ids.py
@@ -15,6 +15,7 @@ P60_DOCK_HANDLER = bytes([0x44, 0x25, 0x00, 0x00])
PDU_1_HANDLER_ID = bytes([0x44, 0x25, 0x00, 0x01])
PDU_2_HANDLER_ID = bytes([0x44, 0x25, 0x00, 0x02])
ACU_HANDLER_ID = bytes([0x44, 0x25, 0x00, 0x03])
+BPX_HANDLER_ID = bytes([0x44, 0x26, 0x00, 0x00])
# Thermal Object IDs
HEATER_ID = bytes([0x44, 0x41, 0x00, 0xA4])
diff --git a/pus_tc/bpx_batt.py b/pus_tc/bpx_batt.py
new file mode 100644
index 0000000..a9f6311
--- /dev/null
+++ b/pus_tc/bpx_batt.py
@@ -0,0 +1,56 @@
+from tmtccmd.tc.definitions import TcQueueT, QueueCommands
+from config.object_ids import BPX_HANDLER_ID
+from tmtccmd.tc.service_8_functional_cmd import generate_action_command
+from tmtccmd.tc.service_3_housekeeping import generate_one_hk_command, make_sid
+
+
+class BpxSetIds:
+ GET_HK_SET = 0
+ GET_CFG_SET = 5
+
+
+class BpxActionIds:
+ REBOOT = 2
+ RESET_COUNTERS = 3
+ SET_CFG = 4
+ GET_CFG = 5
+
+
+class BpxOpCodes:
+ HK = ["0", "hk"]
+ RST_BOOT_CNT = ["1", "rst_boot_cnt"]
+ REQUEST_CFG = ["2", "cfg"]
+ REQUEST_CFG_HK = ["3", "cfg_hk"]
+ REBOOT = ["4", "reboot"]
+
+
+def pack_bpx_commands(tc_queue: TcQueueT, op_code: str):
+ if op_code in BpxOpCodes.HK:
+ tc_queue.appendleft((QueueCommands.PRINT, "Requesting BPX battery HK set"))
+ sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetIds.GET_HK_SET)
+ cmd = generate_one_hk_command(sid=sid, ssc=0)
+ tc_queue.appendleft(cmd.pack_command_tuple())
+ if op_code in BpxOpCodes.RST_BOOT_CNT:
+ tc_queue.appendleft((QueueCommands.PRINT, "Resetting reboot counters"))
+ cmd = generate_action_command(
+ object_id=BPX_HANDLER_ID, action_id=BpxActionIds.RESET_COUNTERS
+ )
+ tc_queue.appendleft(cmd.pack_command_tuple())
+ if op_code in BpxOpCodes.REQUEST_CFG:
+ tc_queue.appendleft((QueueCommands.PRINT, "Requesting configuration struct"))
+ cmd = generate_action_command(
+ object_id=BPX_HANDLER_ID, action_id=BpxActionIds.GET_CFG
+ )
+ tc_queue.appendleft(cmd.pack_command_tuple())
+ if op_code in BpxOpCodes.REQUEST_CFG_HK:
+ tc_queue.appendleft((QueueCommands.PRINT, "Requesting configuration struct HK"))
+ sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetIds.GET_CFG_SET)
+ cmd = generate_one_hk_command(sid=sid, ssc=0)
+ tc_queue.appendleft(cmd.pack_command_tuple())
+ if op_code in BpxOpCodes.REBOOT:
+ tc_queue.appendleft((QueueCommands.PRINT, "Rebooting BPX battery"))
+ cmd = generate_action_command(
+ object_id=BPX_HANDLER_ID, action_id=BpxActionIds.REBOOT
+ )
+ tc_queue.appendleft(cmd.pack_command_tuple())
+ pass
diff --git a/pus_tc/ccsds_handler.py b/pus_tc/ccsds_handler.py
index acf004f..a7e400d 100644
--- a/pus_tc/ccsds_handler.py
+++ b/pus_tc/ccsds_handler.py
@@ -43,22 +43,26 @@ def pack_ccsds_handler_test(
)
if op_code == "0":
tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Set low rate"))
- command = object_id + struct.pack('!I', CommandIds.SET_LOW_RATE)
+ command = object_id + struct.pack("!I", CommandIds.SET_LOW_RATE)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "1":
tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Set high rate"))
- command = object_id + struct.pack('!I', CommandIds.SET_HIGH_RATE)
+ command = object_id + struct.pack("!I", CommandIds.SET_HIGH_RATE)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "2":
- tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Enables the transmitter"))
- command = object_id + struct.pack('!I', CommandIds.EN_TRANSMITTER)
+ tc_queue.appendleft(
+ (QueueCommands.PRINT, "CCSDS Handler: Enables the transmitter")
+ )
+ command = object_id + struct.pack("!I", CommandIds.EN_TRANSMITTER)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "3":
- tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Disables the transmitter"))
- command = object_id + struct.pack('!I', CommandIds.DIS_TRANSMITTER)
+ tc_queue.appendleft(
+ (QueueCommands.PRINT, "CCSDS Handler: Disables the transmitter")
+ )
+ command = object_id + struct.pack("!I", CommandIds.DIS_TRANSMITTER)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "4":
@@ -66,27 +70,45 @@ def pack_ccsds_handler_test(
(QueueCommands.PRINT, "CCSDS Handler: Set arbitrary bitrate")
)
bitrate = int(input("Specify bit rate (bps): "))
- command = object_id + struct.pack('!I', CommandIds.ARBITRARY_BITRATE) + struct.pack('!I', bitrate)
+ command = (
+ object_id
+ + struct.pack("!I", CommandIds.ARBITRARY_BITRATE)
+ + struct.pack("!I", bitrate)
+ )
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "5":
- tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Enable tx clock manipulator"))
- command = object_id + struct.pack('!I', CommandIds.ENABLE_TX_CLK_MANIPULATOR)
+ tc_queue.appendleft(
+ (QueueCommands.PRINT, "CCSDS Handler: Enable tx clock manipulator")
+ )
+ command = object_id + struct.pack("!I", CommandIds.ENABLE_TX_CLK_MANIPULATOR)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "6":
- tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Disable tx clock manipulator"))
- command = object_id + struct.pack('!I', CommandIds.DISABLE_TX_CLK_MANIPULATOR)
+ tc_queue.appendleft(
+ (QueueCommands.PRINT, "CCSDS Handler: Disable tx clock manipulator")
+ )
+ command = object_id + struct.pack("!I", CommandIds.DISABLE_TX_CLK_MANIPULATOR)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "7":
- tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Update tx data on rising edge of tx clock"))
- command = object_id + struct.pack('!I', CommandIds.UPDATE_ON_RISING_EDGE)
+ tc_queue.appendleft(
+ (
+ QueueCommands.PRINT,
+ "CCSDS Handler: Update tx data on rising edge of tx clock",
+ )
+ )
+ command = object_id + struct.pack("!I", CommandIds.UPDATE_ON_RISING_EDGE)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "8":
- tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Update tx data on falling edge of tx clock"))
- command = object_id + struct.pack('!I', CommandIds.UPDATE_ON_FALLING_EDGE)
+ tc_queue.appendleft(
+ (
+ QueueCommands.PRINT,
+ "CCSDS Handler: Update tx data on falling edge of tx clock",
+ )
+ )
+ command = object_id + struct.pack("!I", CommandIds.UPDATE_ON_FALLING_EDGE)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
diff --git a/pus_tc/pdu1.py b/pus_tc/pdu1.py
index a7bd943..95419f6 100644
--- a/pus_tc/pdu1.py
+++ b/pus_tc/pdu1.py
@@ -27,6 +27,9 @@ class Pdu1OpCodes(enum.Enum):
SYRLINKS_OFF = "10"
MGT_ON = "11"
MGT_OFF = "12"
+ # Solar Cell Experiment
+ SCEX_ON = "13"
+ SCEX_OFF = "14"
class PDU1TestProcedure:
@@ -130,6 +133,24 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
Channel.off,
)
tc_queue.appendleft(command.pack_command_tuple())
+ if op_code == Pdu1OpCodes.SCEX_ON.value:
+ tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn Solar Cell Experiment on"))
+ command = pack_set_param_command(
+ object_id,
+ PDUConfigTable.out_en_5.parameter_address,
+ PDUConfigTable.out_en_5.parameter_size,
+ Channel.on,
+ )
+ tc_queue.appendleft(command.pack_command_tuple())
+ if op_code == Pdu1OpCodes.SCEX_OFF.value:
+ tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn Solar Cell Experiment off"))
+ command = pack_set_param_command(
+ object_id,
+ PDUConfigTable.out_en_5.parameter_address,
+ PDUConfigTable.out_en_5.parameter_size,
+ Channel.off,
+ )
+ tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.PRINT_SWITCH_V_I.value:
tc_queue.appendleft(
(QueueCommands.PRINT, "PDU1: Print Switches, Voltages, Currents")
diff --git a/pus_tc/star_tracker.py b/pus_tc/star_tracker.py
index 8112271..148d4a6 100644
--- a/pus_tc/star_tracker.py
+++ b/pus_tc/star_tracker.py
@@ -501,9 +501,14 @@ def pack_star_tracker_commands(
command = PusTelecommand(service=8, subservice=128, ssc=73, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "51":
- tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Set image processor parameters"))
- command = object_id + struct.pack('!I', StarTrackerActionIds.IMAGE_PROCESSOR) + \
- bytearray(ImagePathDefs.jsonFile, 'utf-8')
+ tc_queue.appendleft(
+ (QueueCommands.PRINT, "Star tracker: Set image processor parameters")
+ )
+ command = (
+ object_id
+ + struct.pack("!I", StarTrackerActionIds.IMAGE_PROCESSOR)
+ + bytearray(ImagePathDefs.jsonFile, "utf-8")
+ )
command = PusTelecommand(service=8, subservice=128, ssc=70, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
diff --git a/pus_tc/tc_packer_hook.py b/pus_tc/tc_packer_hook.py
index 853953c..0989deb 100644
--- a/pus_tc/tc_packer_hook.py
+++ b/pus_tc/tc_packer_hook.py
@@ -14,6 +14,7 @@ from pus_tc.service_200_mode import pack_service200_test_into
from pus_tc.p60dock import pack_p60dock_test_into
from pus_tc.pdu2 import pack_pdu2_commands
from pus_tc.pdu1 import pack_pdu1_commands
+from pus_tc.bpx_batt import pack_bpx_commands
from pus_tc.acu import pack_acu_test_into
from pus_tc.imtq import pack_imtq_test_into
from pus_tc.tmp1075 import pack_tmp1075_test_into
@@ -94,6 +95,8 @@ def pack_service_queue_user(
return pack_acu_test_into(
object_id=object_id, tc_queue=service_queue, op_code=op_code
)
+ if service == CustomServiceList.BPX_BATTERY.value:
+ return pack_bpx_commands(tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.TMP1075_1.value:
object_id = TMP_1075_1_HANDLER_ID
return pack_tmp1075_test_into(
diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py
index cb6896a..311a7eb 100644
--- a/pus_tm/hk_handling.py
+++ b/pus_tm/hk_handling.py
@@ -2,10 +2,11 @@
import struct
import os
import datetime
-from typing import Tuple
+from tmtccmd.config.definitions import HkReplyUnpacked
from tmtccmd.tm.service_3_housekeeping import Service3Base
from tmtccmd.utility.logger import get_console_logger
+from pus_tc.bpx_batt import BpxSetIds
from pus_tc.syrlinks_hk_handler import SetIds
from pus_tc.imtq import ImtqSetIds
from config.object_ids import (
@@ -13,6 +14,7 @@ from config.object_ids import (
IMTQ_HANDLER_ID,
GPS_HANDLER_0_ID,
GPS_HANDLER_1_ID,
+ BPX_HANDLER_ID,
)
LOGGER = get_console_logger()
@@ -20,7 +22,7 @@ LOGGER = get_console_logger()
def handle_user_hk_packet(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
-) -> Tuple[list, list, bytearray, int]:
+) -> HkReplyUnpacked:
"""This function is called when a Service 3 Housekeeping packet is received."""
if object_id == SYRLINKS_HANDLER_ID:
if set_id == SetIds.RX_REGISTERS_DATASET:
@@ -29,7 +31,6 @@ def handle_user_hk_packet(
return handle_syrlinks_tx_registers_dataset(hk_data)
else:
LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
- return [], [], bytearray(), 0
elif object_id == IMTQ_HANDLER_ID:
if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and (
set_id <= ImtqSetIds.NEGATIVE_Z_TEST
@@ -37,21 +38,20 @@ def handle_user_hk_packet(
return handle_self_test_data(hk_data)
else:
LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
- return [], [], bytearray(), 0
elif object_id == GPS_HANDLER_0_ID or object_id == GPS_HANDLER_1_ID:
return handle_gps_data(hk_data=hk_data)
+ elif object_id == BPX_HANDLER_ID:
+ return handle_bpx_hk_data(hk_data=hk_data, set_id=set_id)
else:
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
- return [], [], bytearray(), 0
+ return HkReplyUnpacked()
def handle_syrlinks_rx_registers_dataset(
hk_data: bytearray,
-) -> Tuple[list, list, bytearray, int]:
- hk_header = []
- hk_content = []
- validity_buffer = bytearray()
- hk_header = [
+) -> HkReplyUnpacked:
+ reply = HkReplyUnpacked()
+ reply.header_list = [
"RX Status",
"RX Sensitivity",
"RX Frequency Shift",
@@ -69,7 +69,7 @@ def handle_syrlinks_rx_registers_dataset(
rx_demod_eb = struct.unpack("!I", hk_data[13:17])
rx_demod_n0 = struct.unpack("!I", hk_data[17:21])
rx_data_rate = hk_data[21]
- hk_content = [
+ reply.content_list = [
rx_status,
rx_sensitivity,
rx_frequency_shift,
@@ -79,28 +79,28 @@ def handle_syrlinks_rx_registers_dataset(
rx_demod_n0,
rx_data_rate,
]
- return hk_header, hk_content, validity_buffer, 8
+ reply.validity_buffer = hk_data[22:]
+ reply.num_of_vars = 8
+ return reply
def handle_syrlinks_tx_registers_dataset(
hk_data: bytearray,
-) -> Tuple[list, list, bytearray, int]:
- hk_header = []
- hk_content = []
- validity_buffer = bytearray()
- hk_header = ["TX Status", "TX Waveform", "TX AGC value"]
+) -> HkReplyUnpacked:
+ reply = HkReplyUnpacked()
+ reply.header_list = ["TX Status", "TX Waveform", "TX AGC value"]
tx_status = hk_data[0]
tx_waveform = hk_data[1]
tx_agc_value = struct.unpack("!H", hk_data[2:4])
- hk_content = [tx_status, tx_waveform, tx_agc_value]
- return hk_header, hk_content, validity_buffer, 3
+ reply.content_list = [tx_status, tx_waveform, tx_agc_value]
+ reply.validity_buffer = hk_data[4:]
+ reply.num_of_vars = 3
+ return reply
-def handle_self_test_data(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
- hk_header = []
- hk_content = []
- validity_buffer = bytearray()
- hk_header = [
+def handle_self_test_data(hk_data: bytearray) -> HkReplyUnpacked:
+ reply = HkReplyUnpacked()
+ reply.hk_header = [
"Init Err",
"Init Raw Mag X [nT]",
"Init Raw Mag Y [nT]",
@@ -186,7 +186,8 @@ def handle_self_test_data(hk_data: bytearray) -> Tuple[list, list, bytearray, in
fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0]
fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0]
- hk_content = [
+ reply.validity_buffer = hk_data[129:]
+ reply.content_list = [
init_err,
init_raw_mag_x,
init_raw_mag_y,
@@ -227,43 +228,49 @@ def handle_self_test_data(hk_data: bytearray) -> Tuple[list, list, bytearray, in
fina_coil_y_temperature,
fina_coil_z_temperature,
]
-
- return hk_header, hk_content, validity_buffer, len(hk_header)
+ reply.num_of_vars = len(reply.hk_header)
+ return reply
-def handle_gps_data(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
+def handle_gps_data(hk_data: bytearray) -> HkReplyUnpacked:
LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}")
+ reply = HkReplyUnpacked()
var_index = 0
header_array = []
content_array = []
+ reply.header_list = [
+ "Latitude",
+ "Longitude",
+ "Altitude",
+ "Fix Mode",
+ "Sats in Use",
+ "Date",
+ "Unix Seconds",
+ ]
latitude = struct.unpack("!d", hk_data[0:8])[0]
- header_array.append("Latitude")
- content_array.append(latitude)
longitude = struct.unpack("!d", hk_data[8:16])[0]
- header_array.append("Longitude")
- content_array.append(longitude)
altitude = struct.unpack("!d", hk_data[16:24])[0]
- header_array.append("Altitude")
- content_array.append(altitude)
fix_mode = hk_data[24]
- header_array.append("Fix Mode")
- content_array.append(fix_mode)
sat_in_use = hk_data[25]
- header_array.append("Sats in Use")
- content_array.append(sat_in_use)
year = struct.unpack("!H", hk_data[26:28])[0]
month = hk_data[28]
day = hk_data[29]
hours = hk_data[30]
minutes = hk_data[31]
seconds = hk_data[32]
- header_array.append("Date")
date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}"
- content_array.append(date_string)
unix_seconds = struct.unpack("!I", hk_data[33:37])[0]
- header_array.append("Unix Seconds")
- content_array.append(unix_seconds)
+ content_array = [
+ latitude,
+ longitude,
+ altitude,
+ fix_mode,
+ sat_in_use,
+ date_string,
+ unix_seconds,
+ ]
var_index += 13
+ reply.num_of_vars = var_index
if not os.path.isfile("gps_log.txt"):
with open("gps_log.txt", "w") as gps_file:
gps_file.write(
@@ -275,5 +282,58 @@ def handle_gps_data(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
f"{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, "
f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n"
)
- validity_buffer = hk_data[37:39]
- return header_array, content_array, validity_buffer, var_index
+ reply.validity_buffer = hk_data[37:39]
+ return reply
+
+
+def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked:
+ LOGGER.info(f"Received BPX data, HK data length {len(hk_data)}")
+ reply = HkReplyUnpacked()
+ if set_id == BpxSetIds.GET_HK_SET:
+ charge_current = struct.unpack("!H", hk_data[0:2])[0]
+ discharge_current = struct.unpack("!H", hk_data[2:4])[0]
+ heater_current = struct.unpack("!H", hk_data[4:6])[0]
+ batt_voltage = struct.unpack("!H", hk_data[6:8])[0]
+ batt_temp_1 = struct.unpack("!h", hk_data[8:10])[0]
+ batt_temp_2 = struct.unpack("!h", hk_data[10:12])[0]
+ batt_temp_3 = struct.unpack("!h", hk_data[12:14])[0]
+ batt_temp_4 = struct.unpack("!h", hk_data[14:16])[0]
+ reboot_cntr = struct.unpack("!I", hk_data[16:20])[0]
+ boot_cause = hk_data[20]
+ reply.header_list = [
+ "Charge Current",
+ "Discharge Current",
+ "Heater Current",
+ "Battery Voltage",
+ "Batt Temp 1",
+ "Batt Temp 2",
+ "Batt Temp 3",
+ "Batt Temp 4",
+ "Reboot Counter",
+ "Boot Cause",
+ ]
+ reply.content_list = [
+ charge_current,
+ discharge_current,
+ heater_current,
+ batt_voltage,
+ batt_temp_1,
+ batt_temp_2,
+ batt_temp_3,
+ batt_temp_4,
+ reboot_cntr,
+ boot_cause,
+ ]
+ reply.validity_buffer = hk_data[21:]
+ elif set_id == BpxSetIds.GET_CFG_SET:
+ battheat_mode = hk_data[0]
+ battheat_low = struct.unpack("!b", hk_data[1:2])[0]
+ battheat_high = struct.unpack("!b", hk_data[2:3])[0]
+ reply.header_list = [
+ "Battery Heater Mode",
+ "Battery Heater Low Limit",
+ "Battery Heater High Limit",
+ ]
+ reply.content_list = [battheat_mode, battheat_low, battheat_high]
+ reply.validity_buffer = hk_data[3:]
+ return reply
diff --git a/pus_tm/service_8_hook.py b/pus_tm/service_8_hook.py
index f1910b6..735e64f 100644
--- a/pus_tm/service_8_hook.py
+++ b/pus_tm/service_8_hook.py
@@ -1,18 +1,18 @@
import struct
-from typing import Tuple
from config.object_ids import *
from pus_tc.imtq import ImtqActionIds
from pus_tc.ploc_mpsoc import PlocReplyIds
from pus_tc.ploc_supervisor import SupvActionIds
from pus_tc.star_tracker import StarTrackerActionIds
from tmtccmd.utility.logger import get_console_logger
+from tmtccmd.config.definitions import DataReplyUnpacked
LOGGER = get_console_logger()
def user_analyze_service_8_data(
object_id: bytes, action_id: int, custom_data: bytearray
-) -> Tuple[list, list]:
+) -> DataReplyUnpacked:
"""
This function is called by the TMTC core if a Service 8 data reply (subservice 130)
is received. The user can return a tuple of two lists, where the first list
@@ -25,15 +25,16 @@ def user_analyze_service_8_data(
@return:
"""
if object_id == PDU_2_HANDLER_ID:
- header_list = ["PDU2 Service 8 Reply"]
-
+ reply = DataReplyUnpacked()
+ reply.header_list = ["PDU2 Service 8 Reply"]
data_string = str()
for index in range(len(custom_data)):
data_string += str(hex(custom_data[index])) + " , "
data_string = data_string.rstrip()
data_string = data_string.rstrip(",")
data_string = data_string.rstrip()
- content_list = [data_string]
+ reply.content_list = [data_string]
+ return reply
elif object_id == IMTQ_HANDLER_ID:
return handle_imtq_replies(action_id, custom_data)
elif object_id == PLOC_MPSOC_ID:
@@ -42,56 +43,58 @@ def user_analyze_service_8_data(
return handle_supervisor_replies(action_id, custom_data)
elif object_id == STAR_TRACKER_ID:
return handle_startracker_replies(action_id, custom_data)
- else:
- header_list = []
- content_list = []
- return header_list, content_list
+ return DataReplyUnpacked()
-def handle_imtq_replies(action_id: int, custom_data: bytearray) -> Tuple[list, list]:
- header_list = []
- content_list = []
+def handle_imtq_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpacked:
+ reply = DataReplyUnpacked()
if action_id == struct.unpack("!I", ImtqActionIds.get_commanded_dipole)[0]:
- header_list = ["Commanded X-Dipole", "Commanded Y-Dipole", "Commanded Z-Dipole"]
+ reply.header_list = [
+ "Commanded X-Dipole",
+ "Commanded Y-Dipole",
+ "Commanded Z-Dipole",
+ ]
x_dipole = struct.unpack("!H", custom_data[:2])
y_dipole = struct.unpack("!H", custom_data[2:4])
z_dipole = struct.unpack("!H", custom_data[4:6])
- content_list = [x_dipole[0], y_dipole[0], z_dipole[0]]
+ reply.content_list = [x_dipole[0], y_dipole[0], z_dipole[0]]
+ return reply
-def handle_ploc_replies(action_id: int, custom_data: bytearray) -> Tuple[list, list]:
- header_list = []
- content_list = []
+def handle_ploc_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpacked:
+ reply = DataReplyUnpacked()
if action_id == PlocReplyIds.tm_mem_read_report:
- header_list = ["PLOC Memory Address", "PLOC Mem Len", "PLOC Read Memory Data"]
- content_list = [custom_data[:4], custom_data[4:6], custom_data[6:10]]
- return header_list, content_list
+ reply.header_list = [
+ "PLOC Memory Address",
+ "PLOC Mem Len",
+ "PLOC Read Memory Data",
+ ]
+ reply.content_list = [custom_data[:4], custom_data[4:6], custom_data[6:10]]
+ return reply
def handle_supervisor_replies(
action_id: int, custom_data: bytearray
-) -> Tuple[list, list]:
- header_list = []
- content_list = []
+) -> DataReplyUnpacked:
+ reply = DataReplyUnpacked()
if action_id == SupvActionIds.DUMP_MRAM:
- header_list = ["MRAM Dump"]
- content_list = [custom_data[: len(custom_data)]]
- return header_list, content_list
+ reply.header_list = ["MRAM Dump"]
+ reply.content_list = [custom_data[: len(custom_data)]]
+ return reply
def handle_startracker_replies(
action_id: int, custom_data: bytearray
-) -> Tuple[list, list]:
- header_list = []
- content_list = []
+) -> DataReplyUnpacked:
+ reply = DataReplyUnpacked()
if action_id == StarTrackerActionIds.CHECKSUM:
if len(custom_data) != 5:
LOGGER.warning(
"Star tracker reply has invalid length {0}".format(len(custom_data))
)
- return header_list, content_list
- header_list = ["Checksum", "Checksum valid"]
+ return reply
+ reply.header_list = ["Checksum", "Checksum valid"]
print(custom_data[4])
checksum_valid_flag = custom_data[4] >> 8
- content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag]
- return header_list, content_list
+ reply.content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag]
+ return reply
diff --git a/requirements.txt b/requirements.txt
index 344657c..4545207 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1 @@
-tmtccmd>=1.10.2
+tmtccmd>=1.11.0
diff --git a/spacepackets b/spacepackets
index 0039a0e..3265de6 160000
--- a/spacepackets
+++ b/spacepackets
@@ -1 +1 @@
-Subproject commit 0039a0ec67217765b9dabfbc35dcb34b6ff81c08
+Subproject commit 3265de69717e2f718f4c740d77a823f9811f8348
diff --git a/tmtccmd b/tmtccmd
index bfa459c..49cf288 160000
--- a/tmtccmd
+++ b/tmtccmd
@@ -1 +1 @@
-Subproject commit bfa459ccc3c7189a77374a68f0217d448525b34b
+Subproject commit 49cf288831216c0680aedab88e31d684ba5b8da8