diff --git a/.idea/runConfigurations/Unittests_in_tmtccmd.xml b/.idea/runConfigurations/Unittests_in_tmtccmd.xml
index f6549f0..5cdbb08 100644
--- a/.idea/runConfigurations/Unittests_in_tmtccmd.xml
+++ b/.idea/runConfigurations/Unittests_in_tmtccmd.xml
@@ -4,7 +4,7 @@
-
+
diff --git a/.idea/runConfigurations/tmtccli_example.xml b/.idea/runConfigurations/tmtccli_example.xml
new file mode 100644
index 0000000..4760217
--- /dev/null
+++ b/.idea/runConfigurations/tmtccli_example.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/runConfigurations/tmtcgui_example.xml b/.idea/runConfigurations/tmtcgui_example.xml
new file mode 100644
index 0000000..7234a88
--- /dev/null
+++ b/.idea/runConfigurations/tmtcgui_example.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/config/custom_mode_op.py b/config/custom_mode_op.py
index 159e350..64b9033 100644
--- a/config/custom_mode_op.py
+++ b/config/custom_mode_op.py
@@ -6,7 +6,7 @@
import enum
from tmtccmd.core.backend import TmTcHandler
-from tmtccmd.utility.logger import get_console_logger
+from tmtccmd.logging import get_console_logger
LOGGER = get_console_logger()
diff --git a/config/globals_config.py b/config/globals_config.py
index 947558b..bb2640b 100644
--- a/config/globals_config.py
+++ b/config/globals_config.py
@@ -12,11 +12,8 @@ import argparse
from config.definitions import CustomServiceList, PUS_APID
from config.custom_mode_op import CustomModeList
from tmtccmd.config.definitions import CoreComInterfaces
-from tmtccmd.config.globals import (
- set_default_globals_pre_args_parsing,
- set_default_globals_post_args_parsing,
-)
-from tmtccmd.utility.logger import get_console_logger
+from tmtccmd.config.globals import set_default_globals_pre_args_parsing
+from tmtccmd.logging import get_console_logger
LOGGER = get_console_logger()
@@ -34,12 +31,3 @@ def set_globals_pre_args_parsing(gui: bool = False):
tm_apid=PUS_APID,
com_if_id=CoreComInterfaces.TCPIP_UDP.value,
)
-
-
-def add_globals_post_args_parsing(args: argparse.Namespace, json_cfg_path: str):
- set_default_globals_post_args_parsing(
- args=args,
- custom_services_list=[CustomServiceList],
- custom_modes_list=[CustomModeList],
- json_cfg_path=json_cfg_path,
- )
diff --git a/config/hook_implementations.py b/config/hook_implementations.py
index 177e6f9..aa7bdf8 100644
--- a/config/hook_implementations.py
+++ b/config/hook_implementations.py
@@ -13,7 +13,7 @@ from tmtccmd.pus.obj_id import ObjectIdDictT
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.config.hook import TmTcHookBase
-from tmtccmd.utility.tmtc_printer import TmTcPrinter
+from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.config.globals import OpCodeDictKeys
from config.definitions import CustomServiceList
@@ -21,43 +21,24 @@ from config.retvals import get_retval_dict
class EiveHookObject(TmTcHookBase):
+ def __init__(self, json_cfg_path: str):
+ super().__init__(json_cfg_path=json_cfg_path)
+
def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT:
from tmtccmd.config.globals import get_default_service_op_code_dict
service_op_code_dict = get_default_service_op_code_dict()
-
get_eive_service_op_code_dict(service_op_code_dict=service_op_code_dict)
-
return service_op_code_dict
- def get_json_config_file_path(self) -> str:
- """The user can specify a path and filename for the JSON configuration file by overriding
- this function.
- :return:
- """
- return "config/tmtc_config.json"
-
- def add_globals_pre_args_parsing(self, gui: bool = False):
- from config.globals_config import set_globals_pre_args_parsing
-
- set_globals_pre_args_parsing(gui=gui)
-
- def add_globals_post_args_parsing(self, args: argparse.Namespace):
- from config.globals_config import add_globals_post_args_parsing
-
- add_globals_post_args_parsing(
- args=args, json_cfg_path=self.get_json_config_file_path()
- )
-
def assign_communication_interface(
- self, com_if_key: str, tmtc_printer: TmTcPrinter
+ self, com_if_key: str
) -> Union[CommunicationInterface, None]:
from tmtccmd.config.com_if import create_communication_interface_default
return create_communication_interface_default(
com_if_key=com_if_key,
- tmtc_printer=tmtc_printer,
- json_cfg_path=self.get_json_config_file_path(),
+ json_cfg_path=self.json_cfg_path,
space_packet_ids=(0x0865,),
)
@@ -78,39 +59,6 @@ class EiveHookObject(TmTcHookBase):
return get_object_ids()
- @staticmethod
- def handle_service_8_telemetry(
- object_id: bytes, action_id: int, custom_data: bytearray
- ) -> DataReplyUnpacked:
- from pus_tm.service_8_hook import user_analyze_service_8_data
-
- return user_analyze_service_8_data(
- object_id=object_id, action_id=action_id, custom_data=custom_data
- )
-
- @staticmethod
- def handle_service_3_housekeeping(
- object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
- ) -> HkReplyUnpacked:
- from pus_tm.hk_handling import handle_user_hk_packet
-
- return handle_user_hk_packet(
- object_id=object_id,
- set_id=set_id,
- hk_data=hk_data,
- service3_packet=service3_packet,
- )
-
- @staticmethod
- def handle_service_5_event(
- object_id: bytes, event_id: int, param_1: int, param_2: int
- ) -> str:
- from pus_tm.event_handler import handle_event_packet
-
- return handle_event_packet(
- object_id=object_id, event_id=event_id, param_1=param_1, param_2=param_2
- )
-
def get_retval_dict(self) -> RetvalDictT:
return get_retval_dict()
diff --git a/config/object_ids.py b/config/object_ids.py
index 08ecc6c..a70adb6 100644
--- a/config/object_ids.py
+++ b/config/object_ids.py
@@ -5,8 +5,9 @@
"""
import os.path
from tmtccmd.pus.obj_id import ObjectIdDictT
-from tmtccmd.utility.fsfw import parse_fsfw_objects_csv
-from tmtccmd.utility.logger import get_console_logger
+from tmtccmd.fsfw import parse_fsfw_objects_csv
+from tmtccmd.logging import get_console_logger
+
LOGGER = get_console_logger()
DEFAULT_OBJECTS_CSV_PATH = "config/objects.csv"
diff --git a/config/retvals.py b/config/retvals.py
index ab1a78d..d6f7be9 100644
--- a/config/retvals.py
+++ b/config/retvals.py
@@ -1,6 +1,6 @@
import os
-from tmtccmd.utility.fsfw import parse_fsfw_returnvalues_csv, RetvalDictT
-from tmtccmd.utility.logger import get_console_logger
+from tmtccmd.fsfw import parse_fsfw_returnvalues_csv, RetvalDictT
+from tmtccmd.logging import get_console_logger
DEFAULT_RETVAL_CSV_NAME = "config/returnvalues.csv"
__RETVAL_DICT = None
diff --git a/config/version.py b/config/version.py
index eb51c97..56db5af 100644
--- a/config/version.py
+++ b/config/version.py
@@ -1,6 +1,6 @@
SW_NAME = "eive"
VERSION_MAJOR = 1
-VERSION_MINOR = 8
+VERSION_MINOR = 9
VERSION_SUBMINOR = 0
-__version__ = "1.8.0"
+__version__ = "1.9.0"
diff --git a/gomspace/gomspace_common.py b/gomspace/gomspace_common.py
index 4130665..546730f 100644
--- a/gomspace/gomspace_common.py
+++ b/gomspace/gomspace_common.py
@@ -23,11 +23,11 @@ class GomspaceDeviceActionIds(enum.IntEnum):
PRINT_LATCHUPS = 33
-class GomspaceOpCodes(enum.Enum):
+class GomspaceOpCodes:
# Request HK
- REQUEST_HK_ONCE = "128"
- PRINT_SWITCH_V_I = "129"
- PRINT_LATCHUPS = "130"
+ REQUEST_HK_ONCE = ["req-hk-once", "128"]
+ PRINT_SWITCH_V_I = ["print-switch-vi", "129"]
+ PRINT_LATCHUPS = ["print-latchups", "130"]
class SetIds:
diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py
index a41d75e..fafaa78 100644
--- a/pus_tc/cmd_definitions.py
+++ b/pus_tc/cmd_definitions.py
@@ -180,47 +180,47 @@ def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
- from pus_tc.devs.p60dock import P60OpCodes, GomspaceOpCodes
+ from pus_tc.devs.p60dock import P60OpCodes, GomspaceOpCodes, Info
from pus_tc.devs.pdu1 import Pdu1OpCodes
from pus_tc.devs.pdu2 import Pdu2OpCodes
op_code_dict = dict()
- add_op_code_entry(op_code_dict=op_code_dict, keys="0", info="P60 Tests")
add_op_code_entry(
op_code_dict=op_code_dict,
- keys=P60OpCodes.STACK_3V3_ON.value,
- info="P60 Dock: Turn stack 3V3 on",
+ keys=P60OpCodes.STACK_3V3_ON,
+ info=Info.STACK_3V3_ON,
)
add_op_code_entry(
op_code_dict=op_code_dict,
- keys=P60OpCodes.STACK_3V3_OFF.value,
- info="P60 Dock: Turn stack 3V3 off",
+ keys=P60OpCodes.STACK_3V3_OFF,
+ info=Info.STACK_3V3_OFF,
)
add_op_code_entry(
op_code_dict=op_code_dict,
- keys=P60OpCodes.STACK_5V_ON.value,
- info="P60 Dock: Turn stack 5V on",
+ keys=P60OpCodes.STACK_5V_ON,
+ info=Info.STACK_5V_ON,
)
add_op_code_entry(
op_code_dict=op_code_dict,
- keys=P60OpCodes.STACK_5V_OFF.value,
- info="P60 Dock: Turn stack 5V off",
+ keys=P60OpCodes.STACK_5V_OFF,
+ info=Info.STACK_5V_OFF,
)
add_op_code_entry(
op_code_dict=op_code_dict,
- keys=GomspaceOpCodes.REQUEST_HK_ONCE.value,
+ keys=GomspaceOpCodes.REQUEST_HK_ONCE,
info="P60 Dock: Request HK once",
)
add_op_code_entry(
op_code_dict=op_code_dict,
- keys=GomspaceOpCodes.PRINT_SWITCH_V_I.value,
+ keys=GomspaceOpCodes.PRINT_SWITCH_V_I,
info="P60 Dock: Print Switches, Voltages, Currents",
)
add_op_code_entry(
op_code_dict=op_code_dict,
- keys=GomspaceOpCodes.PRINT_LATCHUPS.value,
+ keys=GomspaceOpCodes.PRINT_LATCHUPS,
info="P60 Dock: Print Latchups",
)
+ add_op_code_entry(op_code_dict=op_code_dict, keys=P60OpCodes.TEST, info="P60 Tests")
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name=CustomServiceList.P60DOCK.value,
@@ -311,17 +311,17 @@ def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
)
add_op_code_entry(
op_code_dict=op_code_dict,
- keys=GomspaceOpCodes.REQUEST_HK_ONCE.value,
+ keys=GomspaceOpCodes.REQUEST_HK_ONCE,
info="PDU1: Request HK once",
)
add_op_code_entry(
op_code_dict=op_code_dict,
- keys=GomspaceOpCodes.PRINT_SWITCH_V_I.value,
+ keys=GomspaceOpCodes.PRINT_SWITCH_V_I,
info="PDU1: Print Switches, Voltages, Currents",
)
add_op_code_entry(
op_code_dict=op_code_dict,
- keys=GomspaceOpCodes.PRINT_LATCHUPS.value,
+ keys=GomspaceOpCodes.PRINT_LATCHUPS,
info="PDU1: Print Latchups",
)
add_op_code_entry(
@@ -418,18 +418,18 @@ def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
)
add_op_code_entry(
op_code_dict=op_code_dict,
- keys=GomspaceOpCodes.REQUEST_HK_ONCE.value,
+ keys=GomspaceOpCodes.REQUEST_HK_ONCE,
info="PDU2: Request HK once",
)
add_op_code_entry(
op_code_dict=op_code_dict,
- keys=GomspaceOpCodes.PRINT_SWITCH_V_I.value,
+ keys=GomspaceOpCodes.PRINT_SWITCH_V_I,
info="PDU2: Print Switches, Voltages, Currents",
options={OpCodeDictKeys.TIMEOUT: 2.0},
)
add_op_code_entry(
op_code_dict=op_code_dict,
- keys=GomspaceOpCodes.PRINT_LATCHUPS.value,
+ keys=GomspaceOpCodes.PRINT_LATCHUPS,
info="PDU2: Print Latchups",
)
add_service_op_code_entry(
diff --git a/pus_tc/devs/p60dock.py b/pus_tc/devs/p60dock.py
index e9d08ee..2f358ca 100644
--- a/pus_tc/devs/p60dock.py
+++ b/pus_tc/devs/p60dock.py
@@ -7,17 +7,32 @@
"""
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
-from tmtccmd.tc.service_3_housekeeping import generate_one_hk_command, make_sid
+from tmtccmd.tc.service_3_housekeeping import (
+ generate_one_hk_command,
+ make_sid,
+ generate_one_diag_command,
+)
from gomspace.gomspace_common import *
from config.object_ids import P60_DOCK_HANDLER
-class P60OpCodes(enum.Enum):
- TEST = "0"
- STACK_3V3_ON = "1"
- STACK_3V3_OFF = "2"
- STACK_5V_ON = "3"
- STACK_5V_OFF = "4"
+HK_SET_ID = 0x3
+
+
+class P60OpCodes:
+ STACK_3V3_ON = ["stack-3v3-on", "1"]
+ STACK_3V3_OFF = ["stack-3v3-off", "2"]
+ STACK_5V_ON = ["stack-5v-on", "3"]
+ STACK_5V_OFF = ["stack-5v-off", "4"]
+ TEST = ["test", "0"]
+
+
+class Info:
+ PREFIX = "P60 Dock"
+ STACK_3V3_ON = f"{PREFIX}: Turn Stack 3V3 on"
+ STACK_3V3_OFF = f"{PREFIX}: Turn Stack 3V3 off"
+ STACK_5V_ON = f"{PREFIX}: Turn Stack 5V on"
+ STACK_5V_OFF = f"{PREFIX}: Turn Stack 5V off"
class P60DockTestProcedure:
@@ -76,8 +91,8 @@ class P60DockHkTable:
def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
- if op_code == P60OpCodes.STACK_3V3_ON.value:
- tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 3V3 on"))
+ if op_code in P60OpCodes.STACK_3V3_ON:
+ tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_3V3_ON))
command = pack_set_param_command(
object_id,
P60DockConfigTable.out_en_9.parameter_address,
@@ -85,8 +100,8 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
Channel.on,
)
tc_queue.appendleft(command.pack_command_tuple())
- if op_code == P60OpCodes.STACK_3V3_OFF.value:
- tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 3V3 off"))
+ if op_code in P60OpCodes.STACK_3V3_OFF:
+ tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_3V3_OFF))
command = pack_set_param_command(
object_id,
P60DockConfigTable.out_en_9.parameter_address,
@@ -94,8 +109,8 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
Channel.off,
)
tc_queue.appendleft(command.pack_command_tuple())
- if op_code == P60OpCodes.STACK_5V_ON.value:
- tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 5V on"))
+ if op_code in P60OpCodes.STACK_5V_ON:
+ tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_5V_ON))
command = pack_set_param_command(
object_id,
P60DockConfigTable.out_en_10.parameter_address,
@@ -103,8 +118,8 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
Channel.on,
)
tc_queue.appendleft(command.pack_command_tuple())
- if op_code == P60OpCodes.STACK_5V_OFF.value:
- tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 5V off"))
+ if op_code in P60OpCodes.STACK_5V_OFF:
+ tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_5V_OFF))
command = pack_set_param_command(
object_id,
P60DockConfigTable.out_en_10.parameter_address,
@@ -112,12 +127,12 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
Channel.off,
)
tc_queue.appendleft(command.pack_command_tuple())
- if op_code == GomspaceOpCodes.REQUEST_HK_ONCE.value:
+ if op_code in GomspaceOpCodes.REQUEST_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Requesting HK Table Once"))
hk_sid = make_sid(object_id=P60_DOCK_HANDLER, set_id=SetIds.P60_DOCK)
command = generate_one_hk_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple())
- if op_code == GomspaceOpCodes.PRINT_SWITCH_V_I.value:
+ if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
tc_queue.appendleft(
(QueueCommands.PRINT, "P60 Dock: Print Switches, Voltages, Currents")
)
@@ -125,7 +140,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I
)
tc_queue.appendleft(command.pack_command_tuple())
- if op_code == GomspaceOpCodes.PRINT_LATCHUPS.value:
+ if op_code in GomspaceOpCodes.PRINT_LATCHUPS:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Print Latchups"))
command = generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS
diff --git a/pus_tc/devs/pdu1.py b/pus_tc/devs/pdu1.py
index eaaa3c1..80e3696 100644
--- a/pus_tc/devs/pdu1.py
+++ b/pus_tc/devs/pdu1.py
@@ -210,12 +210,12 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
Channel.off,
)
tc_queue.appendleft(command.pack_command_tuple())
- if op_code == GomspaceOpCodes.REQUEST_HK_ONCE.value:
+ if op_code in GomspaceOpCodes.REQUEST_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Requesting HK Table Once"))
hk_sid = make_sid(object_id=PDU_1_HANDLER_ID, set_id=SetIds.PDU_1)
command = generate_one_hk_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple())
- if op_code == GomspaceOpCodes.PRINT_SWITCH_V_I.value:
+ if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
tc_queue.appendleft(
(QueueCommands.PRINT, "PDU1: Print Switches, Voltages, Currents")
)
@@ -223,7 +223,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I
)
tc_queue.appendleft(command.pack_command_tuple())
- if op_code == GomspaceOpCodes.PRINT_LATCHUPS.value:
+ if op_code in GomspaceOpCodes.PRINT_LATCHUPS:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Print Latchups"))
command = generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS
diff --git a/pus_tc/devs/pdu2.py b/pus_tc/devs/pdu2.py
index db800fa..5ac74af 100644
--- a/pus_tc/devs/pdu2.py
+++ b/pus_tc/devs/pdu2.py
@@ -215,9 +215,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == Pdu2OpCodes.PL_CAMERA_ON.value:
- tc_queue.appendleft(
- (QueueCommands.PRINT, "PDU2: Turn payload camera on")
- )
+ tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn payload camera on"))
command = pack_set_param_command(
object_id,
PDUConfigTable.out_en_8.parameter_address,
@@ -226,9 +224,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == Pdu2OpCodes.PL_CAMERA_OFF.value:
- tc_queue.appendleft(
- (QueueCommands.PRINT, "PDU2: Turn payload camera off")
- )
+ tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn payload camera off"))
command = pack_set_param_command(
object_id,
PDUConfigTable.out_en_8.parameter_address,
@@ -236,12 +232,12 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
Channel.off,
)
tc_queue.appendleft(command.pack_command_tuple())
- if op_code == GomspaceOpCodes.REQUEST_HK_ONCE.value:
+ if op_code in GomspaceOpCodes.REQUEST_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Requesting HK Table Once"))
hk_sid = make_sid(object_id=PDU_2_HANDLER_ID, set_id=SetIds.PDU_2)
command = generate_one_hk_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple())
- if op_code == GomspaceOpCodes.PRINT_SWITCH_V_I.value:
+ if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
tc_queue.appendleft(
(QueueCommands.PRINT, "PDU2: Print Switches, Currents, Voltahes")
)
@@ -249,7 +245,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I
)
tc_queue.appendleft(command.pack_command_tuple())
- if op_code == GomspaceOpCodes.PRINT_LATCHUPS.value:
+ if op_code in GomspaceOpCodes.PRINT_LATCHUPS:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Print Latchups"))
command = generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS
diff --git a/pus_tc/devs/ploc_mpsoc.py b/pus_tc/devs/ploc_mpsoc.py
index 8872b2a..4806e15 100644
--- a/pus_tc/devs/ploc_mpsoc.py
+++ b/pus_tc/devs/ploc_mpsoc.py
@@ -10,7 +10,7 @@ import struct
import enum
from tmtccmd.config.definitions import QueueCommands
-from tmtccmd.utility.logger import get_console_logger
+from tmtccmd.logging import get_console_logger
from tmtccmd.tc.packer import TcQueueT
from spacepackets.ecss.tc import PusTelecommand
from utility.input_helper import InputHelper
@@ -70,16 +70,12 @@ def pack_ploc_mpsoc_commands(
)
if op_code == "0":
- tc_queue.appendleft(
- (QueueCommands.PRINT, "PLOC MPSoC: Set mode off")
- )
+ tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Set mode off"))
command = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=9, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "1":
- tc_queue.appendleft(
- (QueueCommands.PRINT, "PLOC MPSoC: Set mode on")
- )
+ tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Set mode on"))
command = pack_mode_data(object_id, Modes.ON, 0)
command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
@@ -142,8 +138,10 @@ def pack_ploc_mpsoc_commands(
command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "12":
- tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Reset OBSW sequence count"))
- command = object_id + struct.pack('!I', CommandIds.OBSW_RESET_SEQ_COUNT)
+ tc_queue.appendleft(
+ (QueueCommands.PRINT, "PLOC MPSoC: Reset OBSW sequence count")
+ )
+ command = object_id + struct.pack("!I", CommandIds.OBSW_RESET_SEQ_COUNT)
command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "13":
diff --git a/pus_tc/devs/ploc_supervisor.py b/pus_tc/devs/ploc_supervisor.py
index afc9170..25fb543 100644
--- a/pus_tc/devs/ploc_supervisor.py
+++ b/pus_tc/devs/ploc_supervisor.py
@@ -11,7 +11,7 @@ import struct
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
-from tmtccmd.utility.logger import get_console_logger
+from tmtccmd.logging import get_console_logger
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
LOGGER = get_console_logger()
diff --git a/pus_tc/devs/plpcdu.py b/pus_tc/devs/plpcdu.py
index a3d04aa..7fddabc 100644
--- a/pus_tc/devs/plpcdu.py
+++ b/pus_tc/devs/plpcdu.py
@@ -9,7 +9,7 @@ from tmtccmd.tc.service_20_parameter import (
pack_fsfw_load_param_cmd,
pack_boolean_parameter_app_data,
)
-from tmtccmd.utility.logger import get_console_logger
+from tmtccmd.logging import get_console_logger
from spacepackets.ecss.tc import PusTelecommand
from config.object_ids import PL_PCDU_ID
@@ -52,14 +52,13 @@ class Info:
NORMAL_HPA = f"{NORMAL}, HPA on"
-class NormalSubmodes(enum.IntEnum):
- ALL_OFF = 0
- SOLID_STATE_RELAYS_ADC_ON = 1
- DRO_ON = 2
- X8_ON = 3
- TX_ON = 4
- MPA_ON = 5
- HPA_ON = 6
+class NormalSubmodesMask(enum.IntEnum):
+ SOLID_STATE_RELAYS_ADC_ON = 0
+ DRO_ON = 1
+ X8_ON = 2
+ TX_ON = 3
+ MPA_ON = 4
+ HPA_ON = 5
class ParamIds(enum.IntEnum):
@@ -114,42 +113,67 @@ def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str):
tc_queue=tc_queue,
info=Info.NORMAL_SSR,
mode=Modes.NORMAL,
- submode=NormalSubmodes.SOLID_STATE_RELAYS_ADC_ON,
+ submode=(1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON),
)
if op_code in OpCodes.NORMAL_DRO:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
info=Info.NORMAL_DRO,
mode=Modes.NORMAL,
- submode=NormalSubmodes.DRO_ON,
+ submode=(
+ 1 << NormalSubmodesMask.DRO_ON
+ | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
+ ),
)
if op_code in OpCodes.NORMAL_X8:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
info=Info.NORMAL_X8,
mode=Modes.NORMAL,
- submode=NormalSubmodes.X8_ON,
+ submode=(
+ 1 << NormalSubmodesMask.DRO_ON
+ | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
+ | (1 << NormalSubmodesMask.X8_ON)
+ ),
)
if op_code in OpCodes.NORMAL_TX:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
info=Info.NORMAL_TX,
mode=Modes.NORMAL,
- submode=NormalSubmodes.TX_ON,
+ submode=(
+ 1 << NormalSubmodesMask.DRO_ON
+ | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
+ | (1 << NormalSubmodesMask.X8_ON)
+ | (1 << NormalSubmodesMask.TX_ON)
+ ),
)
if op_code in OpCodes.NORMAL_MPA:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
info=Info.NORMAL_MPA,
mode=Modes.NORMAL,
- submode=NormalSubmodes.MPA_ON,
+ submode=(
+ 1 << NormalSubmodesMask.DRO_ON
+ | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
+ | (1 << NormalSubmodesMask.X8_ON)
+ | (1 << NormalSubmodesMask.TX_ON)
+ | (1 << NormalSubmodesMask.MPA_ON)
+ ),
)
if op_code in OpCodes.NORMAL_HPA:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
info=Info.NORMAL_HPA,
mode=Modes.NORMAL,
- submode=NormalSubmodes.HPA_ON,
+ submode=(
+ 1 << NormalSubmodesMask.DRO_ON
+ | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
+ | (1 << NormalSubmodesMask.X8_ON)
+ | (1 << NormalSubmodesMask.TX_ON)
+ | (1 << NormalSubmodesMask.MPA_ON)
+ | (1 << NormalSubmodesMask.HPA_ON)
+ ),
)
if op_code in OpCodes.INJECT_ALL_ON_FAILURE:
pack_failure_injection_cmd(
diff --git a/pus_tc/devs/rad_sensor.py b/pus_tc/devs/rad_sensor.py
index 4216a23..cd66930 100644
--- a/pus_tc/devs/rad_sensor.py
+++ b/pus_tc/devs/rad_sensor.py
@@ -49,24 +49,24 @@ def pack_rad_sensor_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code:
if op_code == "3":
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Start conversions"))
- command = object_id + struct.pack('!I', CommandIds.START_CONVERSIONS)
+ command = object_id + struct.pack("!I", CommandIds.START_CONVERSIONS)
command = PusTelecommand(service=8, subservice=128, ssc=43, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "4":
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Read conversions"))
- command = object_id + struct.pack('!I', CommandIds.READ_CONVERSIONS)
+ command = object_id + struct.pack("!I", CommandIds.READ_CONVERSIONS)
command = PusTelecommand(service=8, subservice=128, ssc=44, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "5":
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Enable debug output"))
- command = object_id + struct.pack('!I', CommandIds.ENABLE_DEBUG_OUTPUT)
+ command = object_id + struct.pack("!I", CommandIds.ENABLE_DEBUG_OUTPUT)
command = PusTelecommand(service=8, subservice=128, ssc=45, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "6":
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Disable debug output"))
- command = object_id + struct.pack('!I', CommandIds.DISABLE_DEBUG_OUTPUT)
+ command = object_id + struct.pack("!I", CommandIds.DISABLE_DEBUG_OUTPUT)
command = PusTelecommand(service=8, subservice=128, ssc=45, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
diff --git a/pus_tc/devs/star_tracker.py b/pus_tc/devs/star_tracker.py
index 6afccbe..47a06b0 100644
--- a/pus_tc/devs/star_tracker.py
+++ b/pus_tc/devs/star_tracker.py
@@ -12,7 +12,7 @@ from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
-from tmtccmd.utility.logger import get_console_logger
+from tmtccmd.logging import get_console_logger
from utility.input_helper import InputHelper
diff --git a/pus_tc/devs/syrlinks_hk_handler.py b/pus_tc/devs/syrlinks_hk_handler.py
index 5665970..382177b 100644
--- a/pus_tc/devs/syrlinks_hk_handler.py
+++ b/pus_tc/devs/syrlinks_hk_handler.py
@@ -48,16 +48,12 @@ def pack_syrlinks_command(
)
if op_code == "0":
- tc_queue.appendleft(
- (QueueCommands.PRINT, "Syrlinks: Set mode off")
- )
+ tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set mode off"))
command = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=9, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "1":
- tc_queue.appendleft(
- (QueueCommands.PRINT, "Syrlinks: Set mode on")
- )
+ tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set mode on"))
command = pack_mode_data(object_id, Modes.ON, 0)
command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
@@ -116,61 +112,42 @@ def pack_syrlinks_command(
command = PusTelecommand(service=8, subservice=128, ssc=16, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "12":
- tc_queue.appendleft(
- (QueueCommands.PRINT, "Syrlinks: Write LCL config")
- )
- command = object_id + struct.pack('!I', CommandIds.WRITE_LCL_CONFIG)
+ tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Write LCL config"))
+ command = object_id + struct.pack("!I", CommandIds.WRITE_LCL_CONFIG)
command = PusTelecommand(service=8, subservice=128, ssc=17, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "13":
- tc_queue.appendleft(
- (QueueCommands.PRINT, "Syrlinks: Read RX status registers")
- )
- command = object_id + struct.pack('!I', CommandIds.READ_RX_STATUS_REGISTERS)
+ tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read RX status registers"))
+ command = object_id + struct.pack("!I", CommandIds.READ_RX_STATUS_REGISTERS)
command = PusTelecommand(service=8, subservice=128, ssc=18, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "14":
- tc_queue.appendleft(
- (QueueCommands.PRINT, "Syrlinks: Read LCL config register")
- )
- command = object_id + struct.pack('!I', CommandIds.READ_LCL_CONFIG_REGISTER)
+ tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read LCL config register"))
+ command = object_id + struct.pack("!I", CommandIds.READ_LCL_CONFIG_REGISTER)
command = PusTelecommand(service=8, subservice=128, ssc=19, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
-<<<<<<< HEAD
if op_code == "15":
- tc_queue.appendleft(
- (QueueCommands.PRINT, "Syrlinks: Set waveform OQPSK")
- )
- command = object_id + struct.pack('!I', CommandIds.SET_WAVEFORM_OQPSK)
+ tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set waveform OQPSK"))
+ command = object_id + struct.pack("!I", CommandIds.SET_WAVEFORM_OQPSK)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "16":
- tc_queue.appendleft(
- (QueueCommands.PRINT, "Syrlinks: Set waveform BPSK")
- )
- command = object_id + struct.pack('!I', CommandIds.SET_WAVEFORM_BPSK)
+ tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set waveform BPSK"))
+ command = object_id + struct.pack("!I", CommandIds.SET_WAVEFORM_BPSK)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "17":
- tc_queue.appendleft(
- (QueueCommands.PRINT, "Syrlinks: Set second config")
- )
- command = object_id + struct.pack('!I', CommandIds.SET_SECOND_CONFIG)
+ tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set second config"))
+ command = object_id + struct.pack("!I", CommandIds.SET_SECOND_CONFIG)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "18":
- tc_queue.appendleft(
- (QueueCommands.PRINT, "Syrlinks: Enable debug printout")
- )
- command = object_id + struct.pack('!I', CommandIds.ENABLE_DEBUG)
+ tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Enable debug printout"))
+ command = object_id + struct.pack("!I", CommandIds.ENABLE_DEBUG)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "19":
- tc_queue.appendleft(
- (QueueCommands.PRINT, "Syrlinks: Disable debug printout")
- )
- command = object_id + struct.pack('!I', CommandIds.DISABLE_DEBUG)
+ tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Disable debug printout"))
+ command = object_id + struct.pack("!I", CommandIds.DISABLE_DEBUG)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
-=======
->>>>>>> develop
diff --git a/pus_tc/system/core.py b/pus_tc/system/core.py
index 61103f9..132ab8a 100644
--- a/pus_tc/system/core.py
+++ b/pus_tc/system/core.py
@@ -3,7 +3,7 @@ import enum
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_8_functional_cmd import generate_action_command
-from tmtccmd.utility.logger import get_console_logger
+from tmtccmd.logging import get_console_logger
from tmtccmd.tc.service_3_housekeeping import make_sid, generate_one_hk_command
from config.object_ids import CORE_CONTROLLER_ID
diff --git a/pus_tc/tc_packer_hook.py b/pus_tc/tc_packer_hook.py
index bda998f..c53c6bb 100644
--- a/pus_tc/tc_packer_hook.py
+++ b/pus_tc/tc_packer_hook.py
@@ -1,14 +1,19 @@
"""Hook function which packs telecommands based on service and operation code string
"""
+import logging
import os
from collections import deque
from typing import Union
+from spacepackets.ecss import PusTelecommand
+from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.config.definitions import CoreServiceList
-from tmtccmd.utility.logger import get_console_logger
+from tmtccmd.logging import get_console_logger
+from tmtccmd.logging.pus import log_raw_pus_tc
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_5_event import pack_generic_service5_test_into
from tmtccmd.pus.service_17_test import pack_service_17_ping_command
+from tmtccmd.logging import get_current_time_string
from pus_tc.service_200_mode import pack_service200_test_into
from pus_tc.devs.p60dock import pack_p60dock_test_into
@@ -68,6 +73,21 @@ from config.object_ids import (
LOGGER = get_console_logger()
+def pre_tc_send_cb(
+ packet: bytes,
+ com_if: CommunicationInterface,
+ pus_info: Union[PusTelecommand, any],
+ file_logger: logging.Logger,
+):
+ log_raw_pus_tc(
+ packet=packet, srv_subservice=(pus_info.service, pus_info.subservice)
+ )
+ tc_info_string = f"Sent {pus_info}"
+ LOGGER.info(tc_info_string)
+ file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}")
+ com_if.send(data=packet)
+
+
def pack_service_queue_user(
service: Union[str, int], op_code: str, service_queue: TcQueueT
):
diff --git a/pus_tm/action_reply_handler.py b/pus_tm/action_reply_handler.py
new file mode 100644
index 0000000..cee50e1
--- /dev/null
+++ b/pus_tm/action_reply_handler.py
@@ -0,0 +1,107 @@
+import struct
+from config.object_ids import *
+from pus_tc.devs.imtq import ImtqActionIds
+from pus_tc.devs.ploc_mpsoc import PlocReplyIds
+from pus_tc.devs.ploc_supervisor import SupvActionIds
+from pus_tc.devs.star_tracker import StarTrackerActionIds
+from tmtccmd.logging import get_console_logger
+from tmtccmd.config.definitions import DataReplyUnpacked
+from tmtccmd.tm import Service8FsfwTm
+from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
+
+LOGGER = get_console_logger()
+
+
+def handle_action_reply(
+ raw_tm: bytes, printer: FsfwTmTcPrinter, obj_id_dict: ObjectIdDictT
+):
+ """Core Action reply handler
+ :return:
+ """
+ tm_packet = Service8FsfwTm.unpack(raw_telemetry=raw_tm)
+ printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
+ object_id = obj_id_dict.get(tm_packet.source_object_id)
+ custom_data = tm_packet.custom_data
+ action_id = tm_packet.action_id
+ generic_print_str = printer.generic_action_packet_tm_print(
+ packet=tm_packet, obj_id=object_id
+ )
+ print(generic_print_str)
+ printer.file_logger.info(generic_print_str)
+ if object_id == IMTQ_HANDLER_ID:
+ return handle_imtq_replies(action_id, printer, custom_data)
+ elif object_id == PLOC_MPSOC_ID:
+ return handle_ploc_replies(action_id, printer, custom_data)
+ elif object_id == PLOC_SUPV_ID:
+ return handle_supervisor_replies(action_id, printer, custom_data)
+ elif object_id == STAR_TRACKER_ID:
+ return handle_startracker_replies(action_id, printer, custom_data)
+
+
+def handle_imtq_replies(
+ action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
+):
+ if action_id == struct.unpack("!I", ImtqActionIds.get_commanded_dipole)[0]:
+ header_list = [
+ "Commanded X-Dipole",
+ "Commanded Y-Dipole",
+ "Commanded Z-Dipole",
+ ]
+ [x_dipole, y_dipole, z_dipole] = struct.unpack("!HHH", custom_data[0:6])
+ content_list = [x_dipole, y_dipole, z_dipole]
+ print(header_list)
+ print(content_list)
+ printer.file_logger.info(header_list)
+ printer.file_logger.info(content_list)
+
+
+def handle_ploc_replies(
+ action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
+):
+ if action_id == PlocReplyIds.tm_mem_read_report:
+ header_list = [
+ "PLOC Memory Address",
+ "PLOC Mem Len",
+ "PLOC Read Memory Data",
+ ]
+ content_list = [
+ "0x" + custom_data[:4].hex(),
+ struct.unpack("!H", custom_data[4:6])[0],
+ "0x" + custom_data[6:10].hex(),
+ ]
+ print(header_list)
+ print(content_list)
+ printer.file_logger.info(header_list)
+ printer.file_logger.info(content_list)
+
+
+def handle_supervisor_replies(
+ action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
+):
+ reply = DataReplyUnpacked()
+ if action_id == SupvActionIds.DUMP_MRAM:
+ header_list = ["MRAM Dump"]
+ content_list = [custom_data[: len(custom_data)]]
+ print(header_list)
+ print(content_list)
+ printer.file_logger.info(header_list)
+ printer.file_logger.info(content_list)
+
+
+def handle_startracker_replies(
+ action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
+):
+ if action_id == StarTrackerActionIds.CHECKSUM:
+ if len(custom_data) != 5:
+ LOGGER.warning(
+ "Star tracker reply has invalid length {0}".format(len(custom_data))
+ )
+ return
+ header_list = ["Checksum", "Checksum valid"]
+ print(custom_data[4])
+ checksum_valid_flag = custom_data[4] >> 8
+ content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag]
+ print(header_list)
+ print(content_list)
+ printer.file_logger.info(header_list)
+ printer.file_logger.info(content_list)
diff --git a/pus_tm/event_handler.py b/pus_tm/event_handler.py
index 8d64db5..9d0426d 100644
--- a/pus_tm/event_handler.py
+++ b/pus_tm/event_handler.py
@@ -1,8 +1,13 @@
+import logging
import os.path
-
+from datetime import datetime
from config.object_ids import get_object_ids
-from tmtccmd.utility.logger import get_console_logger
-from tmtccmd.utility.fsfw import parse_fsfw_events_csv, EventDictT, EventInfo
+
+from tmtccmd.tm import Service5Tm
+from tmtccmd.logging import get_console_logger
+from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
+from tmtccmd.fsfw import parse_fsfw_events_csv, EventDictT, EventInfo
+
LOGGER = get_console_logger()
DEFAULT_EVENTS_CSV_PATH = "config/events.csv"
@@ -21,25 +26,36 @@ def get_event_dict() -> EventDictT:
def handle_event_packet(
- object_id: bytes, event_id: int, param_1: int, param_2: int
+ raw_tm: bytes, printer: FsfwTmTcPrinter, file_logger: logging.Logger
) -> str:
+ tm = Service5Tm.unpack(raw_telemetry=raw_tm)
+ printer.handle_long_tm_print(packet_if=tm, info_if=tm)
additional_event_info = ""
event_dict = get_event_dict()
- info = event_dict.get(event_id)
+ info = event_dict.get(tm.event_id)
if info is None:
- LOGGER.warning(f"Event ID {event_id} has no information")
+ LOGGER.warning(f"Event ID {tm.event_id} has no information")
info = EventInfo()
info.name = "Unknown event"
obj_ids = get_object_ids()
- obj_id_obj = obj_ids.get(bytes(object_id))
+ obj_id_obj = obj_ids.get(tm.reporter_id.as_bytes)
if obj_id_obj is None:
- LOGGER.warning(f"Object ID 0x{object_id.hex()} has no name")
- obj_name = object_id.hex()
+ LOGGER.warning(f"Object ID 0x{tm.reporter_id.as_string} has no name")
+ obj_name = tm.reporter_id.as_string
else:
obj_name = obj_id_obj.name
- generic_event_string = f"Object {obj_name} generated Event {event_id} | {info.name}"
+ generic_event_string = (
+ f"Object {obj_name} generated Event {tm.event_id} | {info.name}"
+ )
if info.info != "":
additional_event_info = (
- f" | Additional info: {info.info} | P1: {param_1} | P2: {param_2}"
+ f"Additional info: {info.info} | P1: {tm.param_1} | P2: {tm.param_2}"
)
- return generic_event_string + additional_event_info
+ file_logger.info(
+ f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {generic_event_string}"
+ )
+ LOGGER.info(generic_event_string)
+ if additional_event_info != "":
+ file_logger.info(additional_event_info)
+ print(additional_event_info)
+ return generic_event_string + " | " + additional_event_info
diff --git a/pus_tm/factory_hook.py b/pus_tm/factory_hook.py
index 4532bd1..b5a318a 100644
--- a/pus_tm/factory_hook.py
+++ b/pus_tm/factory_hook.py
@@ -1,64 +1,80 @@
+"""Core EIVE TM handler module
"""
-@brief This file transfers control of TM parsing to the user
-@details Template configuration file. Copy this folder to the TMTC commander root and adapt
- it to your needs.
-"""
-from tmtccmd.tm.service_8_fsfw_functional_cmd import Service8FsfwTm
from spacepackets.ecss.tm import PusTelemetry
-from tmtccmd.utility.logger import get_console_logger
+from tmtccmd.logging import get_console_logger
+from tmtccmd.logging.pus import (
+ log_raw_pus_tm,
+ log_raw_unknown_packet,
+ PacketTypes,
+ create_tmtc_logger,
+)
-from tmtccmd.pus.service_1_verification import Service1TMExtended
from tmtccmd.pus.service_17_test import Service17TMExtended
-from tmtccmd.tm.service_3_fsfw_housekeeping import Service3FsfwTm
from tmtccmd.tm.service_20_fsfw_parameters import Service20FsfwTm
-from tmtccmd.tm.service_5_event import Service5Tm
from tmtccmd.tm.service_200_fsfw_mode import Service200FsfwTm
-from tmtccmd.utility.tmtc_printer import TmTcPrinter, PrintFormats
+from tmtccmd.utility.tmtc_printer import PrintFormats, FsfwTmTcPrinter
from config.definitions import PUS_APID
+from config.object_ids import get_object_ids
+
+from .event_handler import handle_event_packet
+from .verification_handler import handle_service_1_packet
+from .hk_handling import handle_hk_packet
+from .action_reply_handler import handle_action_reply
LOGGER = get_console_logger()
-def ccsds_tm_handler(
- apid: int, raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter
-) -> None:
+FSFW_PRINTER = FsfwTmTcPrinter(file_logger=create_tmtc_logger())
+
+
+def ccsds_tm_handler(apid: int, raw_tm_packet: bytes, _user_args: any) -> None:
if apid == PUS_APID:
- pus_factory_hook(raw_tm_packet=raw_tm_packet, tmtc_printer=tmtc_printer)
+ pus_factory_hook(raw_tm_packet=raw_tm_packet)
-def pus_factory_hook(raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter):
+def pus_factory_hook(raw_tm_packet: bytes):
if len(raw_tm_packet) < 8:
LOGGER.warning("Detected packet shorter than 8 bytes!")
return
service_type = raw_tm_packet[7]
- tm_packet = None
+ subservice_type = raw_tm_packet[8]
+ file_logger = FSFW_PRINTER.file_logger
+ obj_id_dict = get_object_ids()
try:
if service_type == 1:
- tm_packet = Service1TMExtended.unpack(raw_telemetry=raw_tm_packet)
- if service_type == 3:
- tm_packet = Service3FsfwTm.unpack(
- raw_telemetry=raw_tm_packet, custom_hk_handling=False
+ handle_service_1_packet(printer=FSFW_PRINTER, raw_tm=raw_tm_packet)
+ elif service_type == 3:
+ handle_hk_packet(
+ printer=FSFW_PRINTER, raw_tm=raw_tm_packet, obj_id_dict=obj_id_dict
)
- if service_type == 5:
- tm_packet = Service5Tm.unpack(raw_telemetry=raw_tm_packet)
- if service_type == 8:
- tm_packet = Service8FsfwTm.unpack(raw_telemetry=raw_tm_packet)
- if service_type == 17:
+ elif service_type == 5:
+ handle_event_packet(
+ raw_tm=raw_tm_packet, printer=FSFW_PRINTER, file_logger=file_logger
+ )
+ elif service_type == 8:
+ handle_action_reply(
+ raw_tm=raw_tm_packet, printer=FSFW_PRINTER, obj_id_dict=obj_id_dict
+ )
+ elif service_type == 17:
tm_packet = Service17TMExtended.unpack(raw_telemetry=raw_tm_packet)
- if service_type == 20:
+ FSFW_PRINTER.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
+ elif service_type == 20:
tm_packet = Service20FsfwTm.unpack(raw_telemetry=raw_tm_packet)
- if service_type == 200:
+ FSFW_PRINTER.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
+ elif service_type == 200:
tm_packet = Service200FsfwTm.unpack(raw_telemetry=raw_tm_packet)
- if tm_packet is None:
+ FSFW_PRINTER.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
+ else:
LOGGER.info(
f"The service {service_type} is not implemented in Telemetry Factory"
)
tm_packet = PusTelemetry.unpack(raw_telemetry=raw_tm_packet)
tm_packet.print_source_data(PrintFormats.HEX)
- tmtc_printer.print_telemetry(
- packet_if=tm_packet, info_if=tm_packet, print_raw_tm=False
+ log_raw_pus_tm(
+ packet=raw_tm_packet, srv_subservice=(service_type, subservice_type)
)
except ValueError:
# TODO: Log faulty packet
LOGGER.warning("Invalid packet format detected")
+ log_raw_unknown_packet(packet=raw_tm_packet, packet_type=PacketTypes.TM)
diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py
index 6e23ac0..7fb2103 100644
--- a/pus_tm/hk_handling.py
+++ b/pus_tm/hk_handling.py
@@ -3,12 +3,18 @@ import struct
import os
import datetime
+from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.config.definitions import HkReplyUnpacked
-from tmtccmd.tm.service_3_fsfw_housekeeping import Service3Base
-from tmtccmd.utility.logger import get_console_logger
+from tmtccmd.tm.service_3_fsfw_housekeeping import (
+ Service3Base,
+ HkContentType,
+ Service3FsfwTm,
+)
+from tmtccmd.logging import get_console_logger
from pus_tc.devs.bpx_batt import BpxSetIds
from pus_tc.devs.syrlinks_hk_handler import SetIds
from pus_tc.devs.imtq import ImtqSetIds
+from tmtccmd.pus.obj_id import ObjectId, ObjectIdDictT
from config.object_ids import (
SYRLINKS_HANDLER_ID,
IMTQ_HANDLER_ID,
@@ -16,45 +22,81 @@ from config.object_ids import (
GPS_HANDLER_1_ID,
BPX_HANDLER_ID,
CORE_CONTROLLER_ID,
+ P60_DOCK_HANDLER,
+ PL_PCDU_ID
)
LOGGER = get_console_logger()
-def handle_user_hk_packet(
- object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
-) -> HkReplyUnpacked:
+def handle_hk_packet(
+ raw_tm: bytes,
+ obj_id_dict: ObjectIdDictT,
+ printer: FsfwTmTcPrinter,
+):
+ tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False)
+ named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes)
+ if named_obj_id is None:
+ named_obj_id = tm_packet.object_id
+ if tm_packet.subservice == 25 or tm_packet.subservice == 26:
+ hk_data = tm_packet.tm_data[8:]
+ printer.generic_hk_tm_print(
+ content_type=HkContentType.HK,
+ object_id=named_obj_id,
+ set_id=tm_packet.set_id,
+ hk_data=hk_data,
+ )
+ handle_regular_hk_print(
+ printer=printer,
+ object_id=named_obj_id,
+ hk_packet=tm_packet,
+ hk_data=hk_data,
+ )
+ if tm_packet.subservice == 10 or tm_packet.subservice == 12:
+ LOGGER.warning("HK definitions printout not implemented yet")
+
+
+def handle_regular_hk_print(
+ printer: FsfwTmTcPrinter,
+ object_id: ObjectId,
+ hk_packet: Service3Base,
+ hk_data: bytes,
+):
+ object_id = object_id.as_bytes
+ set_id = hk_packet.set_id
"""This function is called when a Service 3 Housekeeping packet is received."""
if object_id == SYRLINKS_HANDLER_ID:
if set_id == SetIds.RX_REGISTERS_DATASET:
- return handle_syrlinks_rx_registers_dataset(hk_data)
+ return handle_syrlinks_rx_registers_dataset(printer, hk_data)
elif set_id == SetIds.TX_REGISTERS_DATASET:
- return handle_syrlinks_tx_registers_dataset(hk_data)
+ return handle_syrlinks_tx_registers_dataset(printer, hk_data)
else:
- LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
+ LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id")
elif object_id == IMTQ_HANDLER_ID:
if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and (
set_id <= ImtqSetIds.NEGATIVE_Z_TEST
):
- return handle_self_test_data(hk_data)
+ return handle_self_test_data(printer, hk_data)
else:
- LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
+ LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id")
elif object_id == GPS_HANDLER_0_ID or object_id == GPS_HANDLER_1_ID:
- return handle_gps_data(hk_data=hk_data)
+ handle_gps_data(printer=printer, hk_data=hk_data)
elif object_id == BPX_HANDLER_ID:
- return handle_bpx_hk_data(hk_data=hk_data, set_id=set_id)
+ handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer)
elif object_id == CORE_CONTROLLER_ID:
- return handle_core_hk_data(hk_data=hk_data, set_id=set_id)
+ return handle_core_hk_data(printer=printer, hk_data=hk_data)
+ elif object_id == P60_DOCK_HANDLER:
+ handle_p60_hk_data(printer=printer, hk_data=hk_data)
+ elif object_id == PL_PCDU_ID:
+ log_to_both(printer, "Received PL PCDU HK data")
else:
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
return HkReplyUnpacked()
-def handle_syrlinks_rx_registers_dataset(
- hk_data: bytearray,
-) -> HkReplyUnpacked:
+def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: bytes):
reply = HkReplyUnpacked()
- reply.header_list = [
+ header_list = [
"RX Status",
"RX Sensitivity",
"RX Frequency Shift",
@@ -72,7 +114,7 @@ def handle_syrlinks_rx_registers_dataset(
rx_demod_eb = struct.unpack("!I", hk_data[13:17])
rx_demod_n0 = struct.unpack("!I", hk_data[17:21])
rx_data_rate = hk_data[21]
- reply.content_list = [
+ content_list = [
rx_status,
rx_sensitivity,
rx_frequency_shift,
@@ -82,28 +124,30 @@ def handle_syrlinks_rx_registers_dataset(
rx_demod_n0,
rx_data_rate,
]
- reply.validity_buffer = hk_data[22:]
- reply.num_of_vars = 8
- return reply
+ validity_buffer = hk_data[22:]
+ log_to_both(printer, str(header_list))
+ log_to_both(printer, str(content_list))
+ printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8)
def handle_syrlinks_tx_registers_dataset(
- hk_data: bytearray,
-) -> HkReplyUnpacked:
+ printer: FsfwTmTcPrinter,
+ hk_data: bytes,
+):
reply = HkReplyUnpacked()
- reply.header_list = ["TX Status", "TX Waveform", "TX AGC value"]
+ header_list = ["TX Status", "TX Waveform", "TX AGC value"]
tx_status = hk_data[0]
tx_waveform = hk_data[1]
tx_agc_value = struct.unpack("!H", hk_data[2:4])
- reply.content_list = [tx_status, tx_waveform, tx_agc_value]
- reply.validity_buffer = hk_data[4:]
- reply.num_of_vars = 3
- return reply
+ content_list = [tx_status, tx_waveform, tx_agc_value]
+ validity_buffer = hk_data[4:]
+ log_to_both(printer, str(header_list))
+ log_to_both(printer, str(content_list))
+ printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3)
-def handle_self_test_data(hk_data: bytearray) -> HkReplyUnpacked:
- reply = HkReplyUnpacked()
- reply.hk_header = [
+def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes):
+ header_list = [
"Init Err",
"Init Raw Mag X [nT]",
"Init Raw Mag Y [nT]",
@@ -189,8 +233,8 @@ def handle_self_test_data(hk_data: bytearray) -> HkReplyUnpacked:
fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0]
fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0]
- reply.validity_buffer = hk_data[129:]
- reply.content_list = [
+ validity_buffer = hk_data[129:]
+ content_list = [
init_err,
init_raw_mag_x,
init_raw_mag_y,
@@ -231,17 +275,17 @@ def handle_self_test_data(hk_data: bytearray) -> HkReplyUnpacked:
fina_coil_y_temperature,
fina_coil_z_temperature,
]
- reply.num_of_vars = len(reply.hk_header)
- return reply
+ num_of_vars = len(header_list)
+ log_to_both(printer, str(header_list))
+ log_to_both(printer, str(content_list))
+ printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
-def handle_gps_data(hk_data: bytearray) -> HkReplyUnpacked:
+def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}")
reply = HkReplyUnpacked()
var_index = 0
- header_array = []
- content_array = []
- reply.header_list = [
+ header_list = [
"Latitude",
"Longitude",
"Altitude",
@@ -263,7 +307,7 @@ def handle_gps_data(hk_data: bytearray) -> HkReplyUnpacked:
seconds = hk_data[32]
date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}"
unix_seconds = struct.unpack("!I", hk_data[33:37])[0]
- content_array = [
+ content_list = [
latitude,
longitude,
altitude,
@@ -285,27 +329,29 @@ def handle_gps_data(hk_data: bytearray) -> HkReplyUnpacked:
f"{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, "
f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n"
)
- reply.header_list = header_array
- reply.content_list = content_array
- reply.validity_buffer = hk_data[37:39]
- return reply
+ validity_buffer = hk_data[37:39]
+ log_to_both(printer, str(header_list))
+ log_to_both(printer, str(content_list))
+ printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
-def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked:
- LOGGER.info(f"Received BPX data, HK data length {len(hk_data)}")
- reply = HkReplyUnpacked()
+def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
if set_id == BpxSetIds.GET_HK_SET:
- charge_current = struct.unpack("!H", hk_data[0:2])[0]
- discharge_current = struct.unpack("!H", hk_data[2:4])[0]
- heater_current = struct.unpack("!H", hk_data[4:6])[0]
- batt_voltage = struct.unpack("!H", hk_data[6:8])[0]
- batt_temp_1 = struct.unpack("!h", hk_data[8:10])[0]
- batt_temp_2 = struct.unpack("!h", hk_data[10:12])[0]
- batt_temp_3 = struct.unpack("!h", hk_data[12:14])[0]
- batt_temp_4 = struct.unpack("!h", hk_data[14:16])[0]
- reboot_cntr = struct.unpack("!I", hk_data[16:20])[0]
- boot_cause = hk_data[20]
- reply.header_list = [
+ fmt_str = "!HHHHhhhhIB"
+ inc_len = struct.calcsize(fmt_str)
+ (
+ charge_current,
+ discharge_current,
+ heater_current,
+ batt_voltage,
+ batt_temp_1,
+ batt_temp_2,
+ batt_temp_3,
+ batt_temp_4,
+ reboot_cntr,
+ boot_cause,
+ ) = struct.unpack(fmt_str, hk_data[0:inc_len])
+ header_list = [
"Charge Current",
"Discharge Current",
"Heater Current",
@@ -317,7 +363,7 @@ def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked:
"Reboot Counter",
"Boot Cause",
]
- reply.content_list = [
+ content_list = [
charge_current,
discharge_current,
heater_current,
@@ -329,29 +375,189 @@ def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked:
reboot_cntr,
boot_cause,
]
- reply.validity_buffer = hk_data[21:]
+ validity_buffer = hk_data[inc_len:]
+ log_to_both(printer, str(header_list))
+ log_to_both(printer, str(content_list))
+ printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
elif set_id == BpxSetIds.GET_CFG_SET:
battheat_mode = hk_data[0]
battheat_low = struct.unpack("!b", hk_data[1:2])[0]
battheat_high = struct.unpack("!b", hk_data[2:3])[0]
- reply.header_list = [
+ header_list = [
"Battery Heater Mode",
"Battery Heater Low Limit",
"Battery Heater High Limit",
]
- reply.content_list = [battheat_mode, battheat_low, battheat_high]
- reply.validity_buffer = hk_data[3:]
- return reply
+ content_list = [battheat_mode, battheat_low, battheat_high]
+ validity_buffer = hk_data[3:]
+ log_to_both(printer, str(header_list))
+ log_to_both(printer, str(content_list))
+ printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
-def handle_core_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked:
- reply = HkReplyUnpacked()
- reply.header_list = ["Chip Temperature [°C]", "PS Voltage [mV]", "PL Voltage [mV]"]
- temperature = struct.unpack("!f", hk_data[0:4])
- ps_voltage = struct.unpack("!f", hk_data[4:8])
- pl_voltage = struct.unpack("!f", hk_data[8:12])
- tx_agc_value = struct.unpack("!H", hk_data[2:4])
- reply.content_list = [temperature, ps_voltage, pl_voltage]
- reply.validity_buffer = hk_data[12:]
- reply.num_of_vars = 3
- return reply
+def handle_core_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes):
+
+ fmt_str = "!fffH"
+ inc_len = struct.calcsize(fmt_str)
+ (temperature, ps_voltage, pl_voltage, tx_agc_value) = struct.unpack(
+ fmt_str, hk_data[0 : 0 + inc_len]
+ )
+ printout = (
+ f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | "
+ f"PL Voltage [mV] {pl_voltage} | TX AGC {tx_agc_value}"
+ )
+ log_to_both(printer, printout)
+ printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=4)
+
+
+P60_INDEX_LIST = [
+ "ACU VCC",
+ "PDU1 VCC",
+ "X3 IDLE VCC",
+ "PDU2 VCC",
+ "ACU VBAT",
+ "PDU1 VBAT",
+ "X3 IDLE VBAT",
+ "PDU2 VBAT",
+ "STACK VBAT",
+ "STACK 3V3",
+ "STACK 5V",
+ "GS3V3",
+ "GS5V",
+]
+
+WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"]
+
+
+def handle_p60_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes):
+ current_idx = 0
+ current_list = []
+ for idx in range(13):
+ current_list.append(
+ struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0]
+ )
+ current_idx += 2
+ voltage_list = []
+ for idx in range(13):
+ voltage_list.append(
+ struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
+ )
+ current_idx += 2
+ out_enb_list = []
+ for idx in range(13):
+ out_enb_list.append(hk_data[current_idx])
+ current_idx += 1
+ header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]"
+ print(header_str)
+ printer.file_logger.info(header_str)
+ for idx in range(13):
+ out_enb = f"{out_enb_list[idx]}".ljust(6)
+ content_line = (
+ f"{P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | "
+ f"{voltage_list[idx]:05} | {current_list[idx]:04}"
+ )
+ print(content_line)
+ printer.file_logger.info(content_line)
+ fmt_str = "!hhIIIhBBB"
+ inc_len = struct.calcsize(fmt_str)
+ (
+ temp0,
+ temp1,
+ boot_cause,
+ boot_count,
+ uptime,
+ reset_cause,
+ batt_mode,
+ heater_on,
+ conv_5v_on,
+ ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
+ current_idx += inc_len
+ util_info = (
+ f"Batt Mode {batt_mode} | Boot Count {boot_count} | Heater On {heater_on}"
+ )
+ util_info2 = (
+ f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime} | "
+ f"Conv 5V on {conv_5v_on}"
+ )
+ print(util_info)
+ print(util_info2)
+ printer.file_logger.info(util_info)
+ printer.file_logger.info(util_info2)
+ latchup_list = []
+ for idx in range(0, 13):
+ latchup_list.append(
+ struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
+ )
+ current_idx += 2
+ fmt_str = "!HhhHhh"
+ inc_len = struct.calcsize(fmt_str)
+ (
+ dock_vbat,
+ dock_vcc_current,
+ batt_current,
+ batt_voltage,
+ batt_temp_0,
+ batt_temp_1,
+ ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
+ current_idx += inc_len
+ device_types = []
+ device_statuses = []
+ for idx in range(8):
+ device_types.append(hk_data[current_idx])
+ current_idx += 1
+ for idx in range(8):
+ device_statuses.append(hk_data[current_idx])
+ current_idx += 1
+ dearm_status = hk_data[current_idx]
+ current_idx += 1
+ wdt_reboots_list = []
+ for idx in range(5):
+ wdt_reboots_list.append(
+ struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]
+ )
+ current_idx += 4
+ time_pings_left_list = []
+ for idx in range(3):
+ time_pings_left_list.append(
+ struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]
+ )
+ current_idx += 4
+ for idx in range(2):
+ time_pings_left_list.append(hk_data[current_idx])
+ current_idx += 1
+ batt_charge_current = struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0]
+ current_idx += 2
+ batt_discharge_current = struct.unpack(
+ "!h", hk_data[current_idx : current_idx + 2]
+ )[0]
+ current_idx += 2
+ ant6_depl_status = struct.unpack("!b", hk_data[current_idx : current_idx + 1])[0]
+ current_idx += 1
+ ar6_depl_status = struct.unpack("!b", hk_data[current_idx : current_idx + 1])[0]
+ current_idx += 1
+ wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)"
+ log_to_both(printer, wdt_info)
+ for idx in range(len(wdt_reboots_list)):
+ log_to_both(
+ printer,
+ f"{WDT_LIST[idx].ljust(5)} | "
+ f"{wdt_reboots_list[idx]:010} | {time_pings_left_list[idx]:010}",
+ )
+ temps = (
+ f"In C: Temp 0 {temp0 / 10.0} | Temp 1 {temp1 / 10.0} | "
+ f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0}"
+ )
+ batt_info = (
+ f"Batt: Current {batt_current} | Volt {batt_voltage} | "
+ f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}"
+ )
+ log_to_both(printer, temps)
+ log_to_both(printer, batt_info)
+ misc_info = f"Dearm {dearm_status} | ANT6 Depl {ant6_depl_status} | AR6 Deply {ar6_depl_status}"
+ log_to_both(printer, misc_info)
+ printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=36)
+
+
+def log_to_both(printer: FsfwTmTcPrinter, string: str):
+ print(string)
+ printer.file_logger.info(string)
diff --git a/pus_tm/service_8_hook.py b/pus_tm/service_8_hook.py
deleted file mode 100644
index 60d1d97..0000000
--- a/pus_tm/service_8_hook.py
+++ /dev/null
@@ -1,104 +0,0 @@
-import struct
-from config.object_ids import *
-from pus_tc.devs.imtq import ImtqActionIds
-from pus_tc.devs.ploc_mpsoc import PlocReplyIds
-from pus_tc.devs.ploc_supervisor import SupvActionIds
-from pus_tc.devs.star_tracker import StarTrackerActionIds
-from tmtccmd.utility.logger import get_console_logger
-from tmtccmd.config.definitions import DataReplyUnpacked
-
-LOGGER = get_console_logger()
-
-
-def user_analyze_service_8_data(
- object_id: bytes, action_id: int, custom_data: bytearray
-) -> DataReplyUnpacked:
- """
- This function is called by the TMTC core if a Service 8 data reply (subservice 130)
- is received. The user can return a tuple of two lists, where the first list
- is a list of header strings to print and the second list is a list of values to print.
- The TMTC core will take care of printing both lists and logging them.
-
- @param object_id:
- @param action_id:
- @param custom_data:
- @return:
- """
- if object_id == PDU_2_HANDLER_ID:
- reply = DataReplyUnpacked()
- reply.header_list = ["PDU2 Service 8 Reply"]
- data_string = str()
- for index in range(len(custom_data)):
- data_string += str(hex(custom_data[index])) + " , "
- data_string = data_string.rstrip()
- data_string = data_string.rstrip(",")
- data_string = data_string.rstrip()
- reply.content_list = [data_string]
- return reply
- elif object_id == IMTQ_HANDLER_ID:
- return handle_imtq_replies(action_id, custom_data)
- elif object_id == PLOC_MPSOC_ID:
- return handle_ploc_replies(action_id, custom_data)
- elif object_id == PLOC_SUPV_ID:
- return handle_supervisor_replies(action_id, custom_data)
- elif object_id == STAR_TRACKER_ID:
- return handle_startracker_replies(action_id, custom_data)
- return DataReplyUnpacked()
-
-
-def handle_imtq_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpacked:
- reply = DataReplyUnpacked()
- if action_id == struct.unpack("!I", ImtqActionIds.get_commanded_dipole)[0]:
- reply.header_list = [
- "Commanded X-Dipole",
- "Commanded Y-Dipole",
- "Commanded Z-Dipole",
- ]
- x_dipole = struct.unpack("!H", custom_data[:2])
- y_dipole = struct.unpack("!H", custom_data[2:4])
- z_dipole = struct.unpack("!H", custom_data[4:6])
- reply.content_list = [x_dipole[0], y_dipole[0], z_dipole[0]]
- return reply
-
-
-def handle_ploc_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpacked:
- reply = DataReplyUnpacked()
- if action_id == PlocReplyIds.tm_mem_read_report:
- reply.header_list = [
- "PLOC Memory Address",
- "PLOC Mem Len",
- "PLOC Read Memory Data",
- ]
- reply.content_list = [
- "0x" + custom_data[:4].hex(),
- struct.unpack("!H", custom_data[4:6])[0],
- "0x" + custom_data[6:10].hex(),
- ]
- return reply
-
-
-def handle_supervisor_replies(
- action_id: int, custom_data: bytearray
-) -> DataReplyUnpacked:
- reply = DataReplyUnpacked()
- if action_id == SupvActionIds.DUMP_MRAM:
- reply.header_list = ["MRAM Dump"]
- reply.content_list = [custom_data[: len(custom_data)]]
- return reply
-
-
-def handle_startracker_replies(
- action_id: int, custom_data: bytearray
-) -> DataReplyUnpacked:
- reply = DataReplyUnpacked()
- if action_id == StarTrackerActionIds.CHECKSUM:
- if len(custom_data) != 5:
- LOGGER.warning(
- "Star tracker reply has invalid length {0}".format(len(custom_data))
- )
- return reply
- reply.header_list = ["Checksum", "Checksum valid"]
- print(custom_data[4])
- checksum_valid_flag = custom_data[4] >> 8
- reply.content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag]
- return reply
diff --git a/pus_tm/verification_handler.py b/pus_tm/verification_handler.py
new file mode 100644
index 0000000..2047f1d
--- /dev/null
+++ b/pus_tm/verification_handler.py
@@ -0,0 +1,30 @@
+import logging
+from datetime import datetime
+from typing import cast
+
+from tmtccmd.pus.service_1_verification import Service1TMExtended
+from tmtccmd.logging import get_console_logger
+from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
+from config.retvals import get_retval_dict
+
+LOGGER = get_console_logger()
+
+
+def handle_service_1_packet(printer: FsfwTmTcPrinter, raw_tm: bytes):
+ tm_packet = Service1TMExtended.unpack(raw_telemetry=raw_tm)
+ printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
+ srv1_packet = cast(Service1TMExtended, tm_packet)
+ retval_dict = get_retval_dict()
+ if srv1_packet.has_tc_error_code:
+ retval_info = retval_dict.get(srv1_packet.error_code)
+ if retval_info is None:
+ LOGGER.info(
+ f"No returnvalue information found for error code {srv1_packet.error_code}"
+ )
+ else:
+ retval_string = (
+ f"Error Code information for code {srv1_packet.error_code}| "
+ f"Name: {retval_info.name} | Info: {retval_info.info}"
+ )
+ LOGGER.info(retval_string)
+ printer.file_logger.info(retval_string)
diff --git a/requirements.txt b/requirements.txt
index 9a6f784..7b75c9d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1 @@
-tmtccmd>=1.13.0
+tmtccmd>=2.0.1
diff --git a/spacepackets b/spacepackets
index 19e8a58..0501a26 160000
--- a/spacepackets
+++ b/spacepackets
@@ -1 +1 @@
-Subproject commit 19e8a588fa0723a5991f80bb2fd52dfc64f0ac64
+Subproject commit 0501a26b6f347cf48e0875658ac4eaca8cc7d819
diff --git a/tmtccli.py b/tmtccli.py
index 9561eb9..0ae3990 100755
--- a/tmtccli.py
+++ b/tmtccli.py
@@ -30,13 +30,16 @@ import sys
import traceback
try:
- from tmtccmd.runner import (
- initialize_tmtc_commander,
- run_tmtc_commander,
- add_ccsds_handler,
+ import tmtccmd.runner as tmtccmd
+ from tmtccmd.config import default_json_path, SetupArgs
+ from tmtccmd.config.args import (
+ create_default_args_parser,
+ add_default_tmtccmd_args,
+ parse_default_input_arguments,
)
- from tmtccmd.ccsds.handler import CcsdsTmHandler
- from tmtccmd.utility.logger import TMTC_LOGGER_NAME
+ from tmtccmd.ccsds.handler import CcsdsTmHandler, ApidHandler
+ from tmtccmd.logging import init_console_logger
+ from tmtccmd.logging.pus import create_tmtc_logger
except ImportError as error:
run_tmtc_commander = None
initialize_tmtc_commander = None
@@ -47,7 +50,6 @@ except ImportError as error:
try:
import spacepackets
- from spacepackets.log import set_custom_console_logger_name
except ImportError as error:
print(error)
print("Python spacepackets module could not be imported")
@@ -59,23 +61,33 @@ except ImportError as error:
from config.hook_implementations import EiveHookObject
from config.version import __version__
from config.definitions import PUS_APID
+from pus_tc.tc_packer_hook import pre_tc_send_cb
from pus_tm.factory_hook import ccsds_tm_handler
def main():
- from pus_tm.event_handler import handle_event_packet
-
- hook_obj = EiveHookObject()
print(f"-- eive tmtc version {__version__} --")
print(f"-- spacepackets version {spacepackets.__version__} --")
- set_custom_console_logger_name(logger_name=TMTC_LOGGER_NAME)
- initialize_tmtc_commander(hook_object=hook_obj)
- ccsds_handler = CcsdsTmHandler()
- ccsds_handler.add_tm_handler(
- apid=PUS_APID, pus_tm_handler=ccsds_tm_handler, max_queue_len=50
+ tmtccmd.init_printout(False)
+ tmtc_file_logger = create_tmtc_logger()
+ hook_obj = EiveHookObject(json_cfg_path=default_json_path())
+ arg_parser = create_default_args_parser()
+ add_default_tmtccmd_args(arg_parser)
+ args = parse_default_input_arguments(arg_parser, hook_obj)
+ setup_args = SetupArgs(
+ hook_obj=hook_obj, use_gui=False, apid=PUS_APID, cli_args=args
)
- add_ccsds_handler(ccsds_handler)
- run_tmtc_commander(False)
+ apid_handler = ApidHandler(cb=ccsds_tm_handler, queue_len=50, user_args=None)
+ ccsds_handler = CcsdsTmHandler()
+ ccsds_handler.add_tm_handler(apid=PUS_APID, handler=apid_handler)
+ tmtccmd.setup(setup_args=setup_args)
+ tmtccmd.add_ccsds_handler(ccsds_handler)
+ tmtc_backend = tmtccmd.create_default_tmtc_backend(
+ setup_args=setup_args,
+ tm_handler=ccsds_handler,
+ )
+ tmtc_backend.usr_send_wrapper = (pre_tc_send_cb, tmtc_file_logger)
+ tmtccmd.run(tmtc_backend=tmtc_backend)
if __name__ == "__main__":
diff --git a/tmtccmd b/tmtccmd
index 06aed2f..48b6b83 160000
--- a/tmtccmd
+++ b/tmtccmd
@@ -1 +1 @@
-Subproject commit 06aed2f309a105f8f0e183d359a432301eb6947d
+Subproject commit 48b6b8396eb3ea5ec4527ccb96f5909a29cd95f6
diff --git a/tmtcgui.py b/tmtcgui.py
index c5a7978..16759bc 100755
--- a/tmtcgui.py
+++ b/tmtcgui.py
@@ -35,8 +35,8 @@ from pus_tm.factory_hook import ccsds_tm_handler
try:
from tmtccmd.runner import (
- initialize_tmtc_commander,
- run_tmtc_commander,
+ init_tmtccmd,
+ run_tmtccmd,
add_ccsds_handler,
)
from tmtccmd.ccsds.handler import CcsdsTmHandler
@@ -56,13 +56,13 @@ def main():
hook_obj = EiveHookObject()
print(f"-- eive tmtc version {__version__}")
print(f"-- spacepackets version {spacepackets.__version__} --")
- initialize_tmtc_commander(hook_object=hook_obj)
+ init_tmtccmd(hook_object=hook_obj)
ccsds_handler = CcsdsTmHandler()
ccsds_handler.add_tm_handler(
apid=PUS_APID, pus_tm_handler=ccsds_tm_handler, max_queue_len=50
)
add_ccsds_handler(ccsds_handler)
- run_tmtc_commander(use_gui=True)
+ run_tmtccmd(use_gui=True)
if __name__ == "__main__":
diff --git a/utility/input_helper.py b/utility/input_helper.py
index 96c10e3..5c00856 100644
--- a/utility/input_helper.py
+++ b/utility/input_helper.py
@@ -6,7 +6,7 @@
@date 13.02.2021
"""
-from tmtccmd.utility.logger import get_console_logger
+from tmtccmd.logging import get_console_logger
LOGGER = get_console_logger()