This commit is contained in:
Robin Müller 2021-03-19 18:01:17 +01:00
parent dc999188c0
commit 6e3bef7ac5
8 changed files with 43 additions and 68 deletions

View File

@ -7,7 +7,7 @@
import enum
class CustomServiceList(enum.IntEnum):
class CustomServiceList(enum.Enum):
P60DOCK = "p60dock"
PDU1 = "pdu1"
PDU2 = "pdu2"

View File

@ -11,9 +11,9 @@ import argparse
# All globals can be added here and will be part of a globals dictionary.
from config.definitions import CustomServiceList
from config.custom_mode_op import CustomModeList
from tmtccmd.core.definitions import CoreComInterfaces
from tmtccmd.core.definitions import CoreComInterfaces, CoreServiceList
from tmtccmd.defaults.globals_setup import set_default_globals_pre_args_parsing, \
set_default_globals_post_args_parsing
set_default_globals_post_args_parsing, get_core_service_dict
from tmtccmd.core.globals_manager import update_global
from tmtccmd.core.definitions import CoreGlobalIds
from tmtccmd.utility.tmtcc_logger import get_logger
@ -27,20 +27,11 @@ class CustomGlobalIds(enum.Enum):
def set_globals_pre_args_parsing(gui: bool = False):
set_default_globals_pre_args_parsing(apid=0x65, com_if_id=CoreComInterfaces.EthernetUDP)
set_default_globals_pre_args_parsing(apid=0x65, com_if_id=CoreComInterfaces.TCPIP_UDP)
servicelist = dict()
servicelist[CustomServiceList.SERVICE_2] = ["Service 2 Raw Commanding"]
servicelist[CustomServiceList.SERVICE_3] = ["Service 3 Housekeeping"]
servicelist[CustomServiceList.SERVICE_5] = ["Service 5 Event"]
servicelist[CustomServiceList.SERVICE_8] = ["Service 8 Functional Commanding"]
servicelist[CustomServiceList.SERVICE_9] = ["Service 9 Time"]
servicelist[CustomServiceList.SERVICE_17] = ["Service 17 Test"]
servicelist[CustomServiceList.SERVICE_20] = ["Service 20 Parameters"]
servicelist[CustomServiceList.SERVICE_23] = ["Service 23 File Management"]
servicelist[CustomServiceList.SERVICE_200] = ["Service 200 Mode Management"]
update_global(CoreGlobalIds.SERVICE, CustomServiceList.SERVICE_17)
update_global(CoreGlobalIds.SERVICELIST, servicelist)
servicelist = get_core_service_dict()
update_global(CoreGlobalIds.CURRENT_SERVICE, CoreServiceList.SERVICE_17)
update_global(CoreGlobalIds.SERVICE_DICT, servicelist)
def add_globals_post_args_parsing(args: argparse.Namespace):

View File

@ -20,34 +20,33 @@ PCDU_HANDLER = bytearray([0x44, 0x00, 0x10, 0x00])
SOLAR_ARRAY_DEPLOYMENT = bytearray([0x44, 0x00, 0x10, 0x01])
class ObjectIds(enum.IntEnum):
from enum import auto
PUS_SERVICE_17 = 0
TEST_DEVICE = 1
class CustomObjectIds(enum.IntEnum):
PUS_SERVICE_17_ID = 0
TEST_DEVICE_ID = 1
P60DOCK_HANDLER_ID = 2
PDU1_HANDLER_ID = 3
PDU2_HANDLER_ID = 4
PCDU_HANDLER = 5
PCDU_HANDLER_ID = 5
ACU_HANDLER_ID = 6
TMP1075_1_HANDLER_ID = 7
TMP1075_2_HANDLER_ID = 8
HEATER = 9
SOLAR_ARRAY_DEPLOYMENT = 10
HEATER_ID = 9
SOLAR_ARRAY_DEPLOYMENT_ID = 10
def set_object_ids() -> Dict[int, bytearray]:
o_ids = ObjectIds
o_ids = CustomObjectIds
object_id_dict = ({
o_ids.PUS_SERVICE_17: PUS_SERVICE_17,
o_ids.TEST_DEVICE: TEST_DEVICE,
o_ids.PUS_SERVICE_17_ID: PUS_SERVICE_17,
o_ids.TEST_DEVICE_ID: TEST_DEVICE,
o_ids.P60DOCK_HANDLER_ID: P60_DOCK_HANDLER,
o_ids.PDU1_HANDLER_ID: PDU_1_HANDLER,
o_ids.PDU2_HANDLER_ID: PDU_2_HANDLER,
o_ids.ACU_HANDLER_ID: ACU_HANDLER,
o_ids.TMP1075_1_HANDLER_ID: TMP_1075_1_HANDLER,
o_ids.TMP1075_2_HANDLER_ID: TMP_1075_2_HANDLER,
o_ids.HEATER: HEATER,
o_ids.PCDU_HANDLER: PCDU_HANDLER,
o_ids.SOLAR_ARRAY_DEPLOYMENT: SOLAR_ARRAY_DEPLOYMENT,
o_ids.HEATER_ID: HEATER,
o_ids.PCDU_HANDLER_ID: PCDU_HANDLER,
o_ids.SOLAR_ARRAY_DEPLOYMENT_ID: SOLAR_ARRAY_DEPLOYMENT,
})
return object_id_dict

View File

@ -11,7 +11,7 @@ from tmtccmd.pus_tc.base import PusTelecommand
from tmtccmd.core.definitions import QueueCommands
from gomspace.gomspace_common import *
from pus_tc.p60dock import P60DockConfigTable
from config.object_ids import ObjectIds
from config.object_ids import CustomObjectIds
from tmtccmd.core.object_id_manager import get_object_id
@ -57,7 +57,7 @@ def pack_acu_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft((QueueCommands.PRINT, "Testing ACU"))
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Enabling ACU connected to X1 slot (channel 0)"))
p60dock_object_id = get_object_id(ObjectIds.P60DOCK_HANDLER_ID)
p60dock_object_id = get_object_id(CustomObjectIds.P60DOCK_HANDLER_ID)
command = pack_set_param_command(p60dock_object_id, P60DockConfigTable.out_en_0.parameter_address,
P60DockConfigTable.out_en_0.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)

View File

@ -8,36 +8,34 @@
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.pus_tc.base import PusTelecommand
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.core.object_id_manager import get_object_id
from tmtccmd.pus_tc.service_200_mode import pack_mode_data
from config.object_ids import ObjectIds
import struct
from config.object_ids import TEST_DEVICE
TEST_DEVICE_ID = get_object_id(ObjectIds.TEST_DEVICE)
TEST_DEVICE_OBJ_ID = TEST_DEVICE
def pack_service200_test_into(tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200"))
# Object ID: Dummy Device
object_id = TEST_DEVICE_ID
obj_id = TEST_DEVICE_OBJ_ID
# Set On Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On"))
mode_data = pack_mode_data(object_id, 1, 0)
mode_data = pack_mode_data(obj_id, 1, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2000, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# Set Normal mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Normal"))
mode_data = pack_mode_data(object_id, 2, 0)
mode_data = pack_mode_data(obj_id, 2, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2010, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# Set Raw Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Raw"))
mode_data = pack_mode_data(object_id, 3, 0)
mode_data = pack_mode_data(obj_id, 3, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2020, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# Set Off Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Off"))
mode_data = pack_mode_data(object_id, 0, 0)
mode_data = pack_mode_data(obj_id, 0, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2030, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service200.txt"))

View File

@ -8,6 +8,7 @@ import os
from collections import deque
from typing import Union
from tmtccmd.core.definitions import CoreServiceList
from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.pus_tc.base import TcQueueT
from tmtccmd.pus_tc.service_5_event import pack_generic_service5_test_into
@ -21,39 +22,39 @@ from pus_tc.acu import pack_acu_test_into
from pus_tc.tmp1075 import pack_tmp1075_test_into
from pus_tc.heater import pack_heater_test_into
from config.definitions import CustomServiceList
from config.object_ids import ObjectIds
from config.object_ids import CustomObjectIds
LOGGER = get_logger()
def pack_service_queue_user(service: Union[str, int], op_code: str, service_queue: TcQueueT):
if service == CustomServiceList.SERVICE_5:
if service == CoreServiceList.SERVICE_5:
return pack_generic_service5_test_into(service_queue)
if service == CustomServiceList.SERVICE_17:
if service == CoreServiceList.SERVICE_17:
return service_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple())
if service == CustomServiceList.P60DOCK.value:
object_id = get_object_id(ObjectIds.P60DOCK_HANDLER_ID)
object_id = get_object_id(CustomObjectIds.P60DOCK_HANDLER_ID)
return pack_p60dock_test_into(object_id, service_queue)
if service == CustomServiceList.PDU1.value:
pdu1_object_id = get_object_id(ObjectIds.PDU1_HANDLER_ID)
p60dock_object_id = get_object_id(ObjectIds.P60DOCK_HANDLER_ID)
pdu1_object_id = get_object_id(CustomObjectIds.PDU1_HANDLER_ID)
p60dock_object_id = get_object_id(CustomObjectIds.P60DOCK_HANDLER_ID)
return pack_pdu1_test_into(pdu1_object_id, p60dock_object_id, service_queue)
if service == CustomServiceList.PDU2.value:
pdu2_object_id = get_object_id(ObjectIds.PDU2_HANDLER_ID)
p60dock_object_id = get_object_id(ObjectIds.P60DOCK_HANDLER_ID)
pdu2_object_id = get_object_id(CustomObjectIds.PDU2_HANDLER_ID)
p60dock_object_id = get_object_id(CustomObjectIds.P60DOCK_HANDLER_ID)
return pack_pdu2_test_into(pdu2_object_id, p60dock_object_id, service_queue)
if service == CustomServiceList.ACU.value:
object_id = get_object_id(ObjectIds.ACU_HANDLER_ID)
object_id = get_object_id(CustomObjectIds.ACU_HANDLER_ID)
return pack_acu_test_into(object_id, service_queue)
if service == CustomServiceList.TMP1075_1.value:
object_id = get_object_id(ObjectIds.TMP1075_1_HANDLER_ID)
object_id = get_object_id(CustomObjectIds.TMP1075_1_HANDLER_ID)
return pack_tmp1075_test_into(object_id, service_queue)
if service == CustomServiceList.TMP1075_2.value:
object_id = get_object_id(ObjectIds.TMP1075_2_HANDLER_ID)
object_id = get_object_id(CustomObjectIds.TMP1075_2_HANDLER_ID)
return pack_tmp1075_test_into(object_id, service_queue)
if service == CustomServiceList.HEATER.value:
object_id = get_object_id(ObjectIds.HEATER)
object_id = get_object_id(CustomObjectIds.HEATER)
return pack_heater_test_into(object_id, service_queue)
LOGGER.warning("Invalid Service !")

View File

@ -17,13 +17,6 @@ def handle_user_hk_packet(object_id: int, set_id: int, hk_data: bytearray,
"""
This function is called when a Service 3 Housekeeping packet is received.
Please note that the object IDs should be compared by value because direct comparison of
enumerations does not work in Python. For example use:
if object_id.value == ObjectIds.TEST_OBJECT.value
to test equality based on the object ID list.
@param object_id:
@param set_id:
@param hk_data:

View File

@ -1,5 +1,5 @@
from typing import Tuple
from config.object_ids import ObjectIds
from config.object_ids import CustomObjectIds
def user_analyze_service_8_data(
@ -10,19 +10,12 @@ def user_analyze_service_8_data(
is a list of header strings to print and the second list is a list of values to print.
The TMTC core will take care of printing both lists and logging them.
Please note that the object IDs should be compared by value because direct comparison of
enumerations does not work in Python. For example use:
if object_id.value == ObjectIds.TEST_OBJECT.value
to test equality based on the object ID list.
@param object_id:
@param action_id:
@param custom_data:
@return:
"""
if object_id == ObjectIds.PDU2_HANDLER_ID.value:
if object_id == CustomObjectIds.PDU2_HANDLER_ID.value:
header_list = ['PDU2 Service 8 Reply']
data_string = str()