From 6e3bef7ac57dfbcfa9fc07046c4974692f3a0acc Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 19 Mar 2021 18:01:17 +0100 Subject: [PATCH] fixes --- config/definitions.py | 2 +- config/globals_config.py | 21 ++++++--------------- config/object_ids.py | 25 ++++++++++++------------- pus_tc/acu.py | 4 ++-- pus_tc/service_200_mode.py | 16 +++++++--------- pus_tc/tc_packer_hook.py | 25 +++++++++++++------------ pus_tm/hk_handling.py | 7 ------- pus_tm/service_8_hook.py | 11 ++--------- 8 files changed, 43 insertions(+), 68 deletions(-) diff --git a/config/definitions.py b/config/definitions.py index 2fdc4fe..f48c235 100644 --- a/config/definitions.py +++ b/config/definitions.py @@ -7,7 +7,7 @@ import enum -class CustomServiceList(enum.IntEnum): +class CustomServiceList(enum.Enum): P60DOCK = "p60dock" PDU1 = "pdu1" PDU2 = "pdu2" diff --git a/config/globals_config.py b/config/globals_config.py index ce1cde4..05864ef 100644 --- a/config/globals_config.py +++ b/config/globals_config.py @@ -11,9 +11,9 @@ import argparse # All globals can be added here and will be part of a globals dictionary. from config.definitions import CustomServiceList from config.custom_mode_op import CustomModeList -from tmtccmd.core.definitions import CoreComInterfaces +from tmtccmd.core.definitions import CoreComInterfaces, CoreServiceList from tmtccmd.defaults.globals_setup import set_default_globals_pre_args_parsing, \ - set_default_globals_post_args_parsing + set_default_globals_post_args_parsing, get_core_service_dict from tmtccmd.core.globals_manager import update_global from tmtccmd.core.definitions import CoreGlobalIds from tmtccmd.utility.tmtcc_logger import get_logger @@ -27,20 +27,11 @@ class CustomGlobalIds(enum.Enum): def set_globals_pre_args_parsing(gui: bool = False): - set_default_globals_pre_args_parsing(apid=0x65, com_if_id=CoreComInterfaces.EthernetUDP) + set_default_globals_pre_args_parsing(apid=0x65, com_if_id=CoreComInterfaces.TCPIP_UDP) - servicelist = dict() - servicelist[CustomServiceList.SERVICE_2] = ["Service 2 Raw Commanding"] - servicelist[CustomServiceList.SERVICE_3] = ["Service 3 Housekeeping"] - servicelist[CustomServiceList.SERVICE_5] = ["Service 5 Event"] - servicelist[CustomServiceList.SERVICE_8] = ["Service 8 Functional Commanding"] - servicelist[CustomServiceList.SERVICE_9] = ["Service 9 Time"] - servicelist[CustomServiceList.SERVICE_17] = ["Service 17 Test"] - servicelist[CustomServiceList.SERVICE_20] = ["Service 20 Parameters"] - servicelist[CustomServiceList.SERVICE_23] = ["Service 23 File Management"] - servicelist[CustomServiceList.SERVICE_200] = ["Service 200 Mode Management"] - update_global(CoreGlobalIds.SERVICE, CustomServiceList.SERVICE_17) - update_global(CoreGlobalIds.SERVICELIST, servicelist) + servicelist = get_core_service_dict() + update_global(CoreGlobalIds.CURRENT_SERVICE, CoreServiceList.SERVICE_17) + update_global(CoreGlobalIds.SERVICE_DICT, servicelist) def add_globals_post_args_parsing(args: argparse.Namespace): diff --git a/config/object_ids.py b/config/object_ids.py index 9c54650..3734b44 100644 --- a/config/object_ids.py +++ b/config/object_ids.py @@ -20,34 +20,33 @@ PCDU_HANDLER = bytearray([0x44, 0x00, 0x10, 0x00]) SOLAR_ARRAY_DEPLOYMENT = bytearray([0x44, 0x00, 0x10, 0x01]) -class ObjectIds(enum.IntEnum): - from enum import auto - PUS_SERVICE_17 = 0 - TEST_DEVICE = 1 +class CustomObjectIds(enum.IntEnum): + PUS_SERVICE_17_ID = 0 + TEST_DEVICE_ID = 1 P60DOCK_HANDLER_ID = 2 PDU1_HANDLER_ID = 3 PDU2_HANDLER_ID = 4 - PCDU_HANDLER = 5 + PCDU_HANDLER_ID = 5 ACU_HANDLER_ID = 6 TMP1075_1_HANDLER_ID = 7 TMP1075_2_HANDLER_ID = 8 - HEATER = 9 - SOLAR_ARRAY_DEPLOYMENT = 10 + HEATER_ID = 9 + SOLAR_ARRAY_DEPLOYMENT_ID = 10 def set_object_ids() -> Dict[int, bytearray]: - o_ids = ObjectIds + o_ids = CustomObjectIds object_id_dict = ({ - o_ids.PUS_SERVICE_17: PUS_SERVICE_17, - o_ids.TEST_DEVICE: TEST_DEVICE, + o_ids.PUS_SERVICE_17_ID: PUS_SERVICE_17, + o_ids.TEST_DEVICE_ID: TEST_DEVICE, o_ids.P60DOCK_HANDLER_ID: P60_DOCK_HANDLER, o_ids.PDU1_HANDLER_ID: PDU_1_HANDLER, o_ids.PDU2_HANDLER_ID: PDU_2_HANDLER, o_ids.ACU_HANDLER_ID: ACU_HANDLER, o_ids.TMP1075_1_HANDLER_ID: TMP_1075_1_HANDLER, o_ids.TMP1075_2_HANDLER_ID: TMP_1075_2_HANDLER, - o_ids.HEATER: HEATER, - o_ids.PCDU_HANDLER: PCDU_HANDLER, - o_ids.SOLAR_ARRAY_DEPLOYMENT: SOLAR_ARRAY_DEPLOYMENT, + o_ids.HEATER_ID: HEATER, + o_ids.PCDU_HANDLER_ID: PCDU_HANDLER, + o_ids.SOLAR_ARRAY_DEPLOYMENT_ID: SOLAR_ARRAY_DEPLOYMENT, }) return object_id_dict diff --git a/pus_tc/acu.py b/pus_tc/acu.py index cb638ac..41dd9c6 100644 --- a/pus_tc/acu.py +++ b/pus_tc/acu.py @@ -11,7 +11,7 @@ from tmtccmd.pus_tc.base import PusTelecommand from tmtccmd.core.definitions import QueueCommands from gomspace.gomspace_common import * from pus_tc.p60dock import P60DockConfigTable -from config.object_ids import ObjectIds +from config.object_ids import CustomObjectIds from tmtccmd.core.object_id_manager import get_object_id @@ -57,7 +57,7 @@ def pack_acu_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT: tc_queue.appendleft((QueueCommands.PRINT, "Testing ACU")) tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Enabling ACU connected to X1 slot (channel 0)")) - p60dock_object_id = get_object_id(ObjectIds.P60DOCK_HANDLER_ID) + p60dock_object_id = get_object_id(CustomObjectIds.P60DOCK_HANDLER_ID) command = pack_set_param_command(p60dock_object_id, P60DockConfigTable.out_en_0.parameter_address, P60DockConfigTable.out_en_0.parameter_size, Channel.on) command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) diff --git a/pus_tc/service_200_mode.py b/pus_tc/service_200_mode.py index 2e5e608..9c84504 100644 --- a/pus_tc/service_200_mode.py +++ b/pus_tc/service_200_mode.py @@ -8,36 +8,34 @@ from tmtccmd.core.definitions import QueueCommands from tmtccmd.pus_tc.base import PusTelecommand from tmtccmd.pus_tc.packer import TcQueueT -from tmtccmd.core.object_id_manager import get_object_id from tmtccmd.pus_tc.service_200_mode import pack_mode_data -from config.object_ids import ObjectIds -import struct +from config.object_ids import TEST_DEVICE -TEST_DEVICE_ID = get_object_id(ObjectIds.TEST_DEVICE) +TEST_DEVICE_OBJ_ID = TEST_DEVICE def pack_service200_test_into(tc_queue: TcQueueT) -> TcQueueT: tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200")) # Object ID: Dummy Device - object_id = TEST_DEVICE_ID + obj_id = TEST_DEVICE_OBJ_ID # Set On Mode tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On")) - mode_data = pack_mode_data(object_id, 1, 0) + mode_data = pack_mode_data(obj_id, 1, 0) command = PusTelecommand(service=200, subservice=1, ssc=2000, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) # Set Normal mode tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Normal")) - mode_data = pack_mode_data(object_id, 2, 0) + mode_data = pack_mode_data(obj_id, 2, 0) command = PusTelecommand(service=200, subservice=1, ssc=2010, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) # Set Raw Mode tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Raw")) - mode_data = pack_mode_data(object_id, 3, 0) + mode_data = pack_mode_data(obj_id, 3, 0) command = PusTelecommand(service=200, subservice=1, ssc=2020, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) # Set Off Mode tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Off")) - mode_data = pack_mode_data(object_id, 0, 0) + mode_data = pack_mode_data(obj_id, 0, 0) command = PusTelecommand(service=200, subservice=1, ssc=2030, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service200.txt")) diff --git a/pus_tc/tc_packer_hook.py b/pus_tc/tc_packer_hook.py index 9ecd916..26528f4 100644 --- a/pus_tc/tc_packer_hook.py +++ b/pus_tc/tc_packer_hook.py @@ -8,6 +8,7 @@ import os from collections import deque from typing import Union +from tmtccmd.core.definitions import CoreServiceList from tmtccmd.utility.tmtcc_logger import get_logger from tmtccmd.pus_tc.base import TcQueueT from tmtccmd.pus_tc.service_5_event import pack_generic_service5_test_into @@ -21,39 +22,39 @@ from pus_tc.acu import pack_acu_test_into from pus_tc.tmp1075 import pack_tmp1075_test_into from pus_tc.heater import pack_heater_test_into from config.definitions import CustomServiceList -from config.object_ids import ObjectIds +from config.object_ids import CustomObjectIds LOGGER = get_logger() def pack_service_queue_user(service: Union[str, int], op_code: str, service_queue: TcQueueT): - if service == CustomServiceList.SERVICE_5: + if service == CoreServiceList.SERVICE_5: return pack_generic_service5_test_into(service_queue) - if service == CustomServiceList.SERVICE_17: + if service == CoreServiceList.SERVICE_17: return service_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple()) if service == CustomServiceList.P60DOCK.value: - object_id = get_object_id(ObjectIds.P60DOCK_HANDLER_ID) + object_id = get_object_id(CustomObjectIds.P60DOCK_HANDLER_ID) return pack_p60dock_test_into(object_id, service_queue) if service == CustomServiceList.PDU1.value: - pdu1_object_id = get_object_id(ObjectIds.PDU1_HANDLER_ID) - p60dock_object_id = get_object_id(ObjectIds.P60DOCK_HANDLER_ID) + pdu1_object_id = get_object_id(CustomObjectIds.PDU1_HANDLER_ID) + p60dock_object_id = get_object_id(CustomObjectIds.P60DOCK_HANDLER_ID) return pack_pdu1_test_into(pdu1_object_id, p60dock_object_id, service_queue) if service == CustomServiceList.PDU2.value: - pdu2_object_id = get_object_id(ObjectIds.PDU2_HANDLER_ID) - p60dock_object_id = get_object_id(ObjectIds.P60DOCK_HANDLER_ID) + pdu2_object_id = get_object_id(CustomObjectIds.PDU2_HANDLER_ID) + p60dock_object_id = get_object_id(CustomObjectIds.P60DOCK_HANDLER_ID) return pack_pdu2_test_into(pdu2_object_id, p60dock_object_id, service_queue) if service == CustomServiceList.ACU.value: - object_id = get_object_id(ObjectIds.ACU_HANDLER_ID) + object_id = get_object_id(CustomObjectIds.ACU_HANDLER_ID) return pack_acu_test_into(object_id, service_queue) if service == CustomServiceList.TMP1075_1.value: - object_id = get_object_id(ObjectIds.TMP1075_1_HANDLER_ID) + object_id = get_object_id(CustomObjectIds.TMP1075_1_HANDLER_ID) return pack_tmp1075_test_into(object_id, service_queue) if service == CustomServiceList.TMP1075_2.value: - object_id = get_object_id(ObjectIds.TMP1075_2_HANDLER_ID) + object_id = get_object_id(CustomObjectIds.TMP1075_2_HANDLER_ID) return pack_tmp1075_test_into(object_id, service_queue) if service == CustomServiceList.HEATER.value: - object_id = get_object_id(ObjectIds.HEATER) + object_id = get_object_id(CustomObjectIds.HEATER) return pack_heater_test_into(object_id, service_queue) LOGGER.warning("Invalid Service !") diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py index f08f498..b23a3ee 100644 --- a/pus_tm/hk_handling.py +++ b/pus_tm/hk_handling.py @@ -17,13 +17,6 @@ def handle_user_hk_packet(object_id: int, set_id: int, hk_data: bytearray, """ This function is called when a Service 3 Housekeeping packet is received. - Please note that the object IDs should be compared by value because direct comparison of - enumerations does not work in Python. For example use: - - if object_id.value == ObjectIds.TEST_OBJECT.value - - to test equality based on the object ID list. - @param object_id: @param set_id: @param hk_data: diff --git a/pus_tm/service_8_hook.py b/pus_tm/service_8_hook.py index faf9e79..f529b5f 100644 --- a/pus_tm/service_8_hook.py +++ b/pus_tm/service_8_hook.py @@ -1,5 +1,5 @@ from typing import Tuple -from config.object_ids import ObjectIds +from config.object_ids import CustomObjectIds def user_analyze_service_8_data( @@ -10,19 +10,12 @@ def user_analyze_service_8_data( is a list of header strings to print and the second list is a list of values to print. The TMTC core will take care of printing both lists and logging them. - Please note that the object IDs should be compared by value because direct comparison of - enumerations does not work in Python. For example use: - - if object_id.value == ObjectIds.TEST_OBJECT.value - - to test equality based on the object ID list. - @param object_id: @param action_id: @param custom_data: @return: """ - if object_id == ObjectIds.PDU2_HANDLER_ID.value: + if object_id == CustomObjectIds.PDU2_HANDLER_ID.value: header_list = ['PDU2 Service 8 Reply'] data_string = str()