diff --git a/.idea/runConfigurations/PLOC_SUPV_Test.xml b/.idea/runConfigurations/PLOC_SUPV_Test.xml new file mode 100644 index 0000000..843dd32 --- /dev/null +++ b/.idea/runConfigurations/PLOC_SUPV_Test.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/config/definitions.py b/config/definitions.py index f5ca0d0..771eb06 100644 --- a/config/definitions.py +++ b/config/definitions.py @@ -28,3 +28,4 @@ class CustomServiceList(enum.Enum): REACTION_WHEEL_3 = "reaction_wheel_3" REACTION_WHEEL_4 = "reaction_wheel_4" RAD_SENSOR = "rad_sensor" + PLOC_SUPV = "ploc_supv" diff --git a/config/hook_implementations.py b/config/hook_implementations.py index 2e50d6e..83c34e6 100644 --- a/config/hook_implementations.py +++ b/config/hook_implementations.py @@ -81,6 +81,23 @@ class EiveHookObject(TmTcHookBase): } service_rad_sensor_tuple = ("Radiation Sensor", op_code_dict_srv_rad_sensor) + op_code_dict_srv_ploc_supv = { + "0": ("PLOC Supervisor: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}), + "1": ("PLOC Supervisor: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}), + "2": ("PLOC Supervisor: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}), + "3": ("PLOC Supervisor: Get HK Report", {OpCodeDictKeys.TIMEOUT: 2.0}), + "4": ("PLOC Supervisor: Restart MPSoC", {OpCodeDictKeys.TIMEOUT: 2.0}), + "5": ("PLOC Supervisor: Start MPSoC", {OpCodeDictKeys.TIMEOUT: 2.0}), + "6": ("PLOC Supervisor: Shutdown MPSoC", {OpCodeDictKeys.TIMEOUT: 2.0}), + "7": ("PLOC Supervisor: Select MPSoC boot image", {OpCodeDictKeys.TIMEOUT: 2.0}), + "8": ("PLOC Supervisor: Set max restart tries", {OpCodeDictKeys.TIMEOUT: 2.0}), + "9": ("PLOC Supervisor: Reset MPSoC", {OpCodeDictKeys.TIMEOUT: 2.0}), + "10": ("PLOC Supervisor: Set time reference", {OpCodeDictKeys.TIMEOUT: 2.0}), + "11": ("PLOC Supervisor: Set boot timeout", {OpCodeDictKeys.TIMEOUT: 2.0}), + "12": ("PLOC Supervisor: Disable Hk", {OpCodeDictKeys.TIMEOUT: 2.0}), + } + service_ploc_supv_tuple = ("PLOC Supervisor", op_code_dict_srv_ploc_supv) + service_op_code_dict[CustomServiceList.ACU.value] = service_acu_tuple service_op_code_dict[CustomServiceList.TMP1075_1.value] = service_tmp1075_1_tuple service_op_code_dict[CustomServiceList.TMP1075_2.value] = service_tmp1075_2_tuple @@ -94,6 +111,7 @@ class EiveHookObject(TmTcHookBase): service_op_code_dict[CustomServiceList.REACTION_WHEEL_3.value] = service_rw_tuple service_op_code_dict[CustomServiceList.REACTION_WHEEL_4.value] = service_rw_tuple service_op_code_dict[CustomServiceList.RAD_SENSOR.value] = service_rad_sensor_tuple + service_op_code_dict[CustomServiceList.PLOC_SUPV.value] = service_ploc_supv_tuple return service_op_code_dict def get_json_config_file_path(self) -> str: diff --git a/config/object_ids.py b/config/object_ids.py index 92412fe..5d6b3e7 100644 --- a/config/object_ids.py +++ b/config/object_ids.py @@ -17,13 +17,15 @@ HEATER_ID = bytes([0x44, 0x41, 0x00, 0xA4]) PCDU_HANDLER_ID = bytes([0x44, 0x20, 0x00, 0xA1]) SOLAR_ARRAY_DEPLOYMENT_ID = bytes([0x44, 0x41, 0x10, 0xA2]) SYRLINKS_HANDLER = bytes([0x44, 0x53, 0x00, 0xA3]) -IMTQ_HANDLER_ID = bytearray([0x44, 0x14, 0x00, 0x14]) -PLOC_ID = bytearray([0x44, 0x33, 0x00, 0x15]) -RW1_ID = bytes([0x44, 0x21, 0x00, 0x1]) -RW2_ID = bytes([0x44, 0x21, 0x00, 0x2]) -RW3_ID = bytes([0x44, 0x21, 0x00, 0x3]) -RW4_ID = bytes([0x44, 0x21, 0x00, 0x4]) +IMTQ_HANDLER_ID = bytes([0x44, 0x14, 0x00, 0x14]) +PLOC_MPSOC_ID = bytes([0x44, 0x33, 0x00, 0x15]) +RW1_ID = bytes([0x44, 0x12, 0x00, 0x1]) +RW2_ID = bytes([0x44, 0x12, 0x00, 0x2]) +RW3_ID = bytes([0x44, 0x12, 0x00, 0x3]) +RW4_ID = bytes([0x44, 0x12, 0x00, 0x4]) +START_TRACKER_ID = bytes([0x44, 0x13, 0x00, 0x1]) RAD_SENSOR_ID = bytes([0x44, 0x32, 0x00, 0xA5]) +PLOC_SUPV_ID = bytes([0x44, 0x33, 0x00, 0x16]) def get_object_ids() -> Dict[bytes, list]: @@ -44,5 +46,6 @@ def get_object_ids() -> Dict[bytes, list]: RW3_ID: "Reaction Wheel 3", RW4_ID: "Reaction Wheel 4", RAD_SENSOR_ID: "Radiation Sensor", + PLOC_SUPV_ID: "PLOC Supervisor", }) return object_id_dict diff --git a/pus_tc/ploc.py b/pus_tc/ploc_mpsoc.py similarity index 89% rename from pus_tc/ploc.py rename to pus_tc/ploc_mpsoc.py index b8d593a..b8613ea 100644 --- a/pus_tc/ploc.py +++ b/pus_tc/ploc_mpsoc.py @@ -1,9 +1,10 @@ # -*- coding: utf-8 -*- """ -@file ploc.py -@brief TMP1075 tests +@file ploc_mpsoc.py +@brief Tests for commanding the MPSoC of the PLOC. + The MPSoC is programmed by the ILH. @author J. Meier -@date 06.01.2021 +@date 06.03.2021 """ import struct @@ -33,10 +34,10 @@ class PlocReplyIds: tm_mem_read_report = 6 -def pack_ploc_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT: +def pack_ploc_mpsoc_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT: tc_queue.appendleft( (QueueCommands.PRINT, - "Testing PLOC Handler with object id: 0x" + object_id.hex()) + "Testing PLOC MPSoC with object id: 0x" + object_id.hex()) ) if PlocTestProcedure.all or PlocTestProcedure.test_tc_mem_write: diff --git a/pus_tc/ploc_supervisor.py b/pus_tc/ploc_supervisor.py new file mode 100644 index 0000000..7d80905 --- /dev/null +++ b/pus_tc/ploc_supervisor.py @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- +""" +@file ploc_supervisor.py +@brief Tests for commanding the supervisor of the PLOC. + The supervisor is programmed by Thales. +@author J. Meier +@date 10.07.2021 +""" +import struct + +from tmtccmd.config.definitions import QueueCommands + +from tmtccmd.pus_tc.packer import TcQueueT +from tmtccmd.ecss.tc import PusTelecommand + + +class SupvActionIds: + HK_REPORT = 1 + RESTART_MPSOC = 2 + START_MPSOC = 3 + SHUTWOWN_MPSOC = 4 + SEL_MPSOC_BOOT_IMAGE = 5 + SET_BOOT_TIMEOUT = 6 + SET_MAX_RESTART_TRIES = 7 + RESET_MPSOC = 8 + SET_TIME_REF = 9 + DISABLE_HK = 10 + + +class SupvHkIds: + HK_REPORT = 52 + + +def pack_ploc_supv_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str) -> TcQueueT: + tc_queue.appendleft( + (QueueCommands.PRINT, + "Testing PLOC Supervisor with object id: 0x" + object_id.hex()) + ) + + if op_code == "3": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: TC Get Hk Report")) + command = object_id + struct.pack('!I', SupvActionIds.HK_REPORT) + command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "4": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Restart MPSoC")) + command = object_id + struct.pack('!I', SupvActionIds.RESTART_MPSOC) + command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "5": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Start MPSoC")) + command = object_id + struct.pack('!I', SupvActionIds.START_MPSOC) + command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "6": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Shutdown MPSoC")) + command = object_id + struct.pack('!I', SupvActionIds.SHUTWOWN_MPSOC) + command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "7": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Select MPSoC boot image")) + mem = int(input("MEM (NVM0 - 0 or NVM1 - 1):")) + bp0 = int(input("BP0 (0 or 1):")) + bp1 = int(input("BP1 (0 or 1):")) + bp2 = int(input("BP2 (0 or 1):")) + command = pack_sel_boot_image_cmd(object_id, mem, bp0, bp1, bp2) + command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "8": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Set max restart tries")) + restart_tries = int(input("Set maximum restart tries:")) + command = object_id + struct.pack('!I', SupvActionIds.SET_MAX_RESTART_TRIES) + struct.pack('!B', restart_tries) + command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "9": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Reset MPSoC")) + command = object_id + struct.pack('!I', SupvActionIds.RESET_MPSOC) + command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "10": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Set time reference")) + command = object_id + struct.pack('!I', SupvActionIds.SET_TIME_REF) + command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "11": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Set boot timeout")) + boot_timeout = int(input("Specify boot timeout [ms]:")) + command = object_id + struct.pack('!I', SupvActionIds.SET_BOOT_TIMEOUT) + struct.pack('!I', boot_timeout) + command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "12": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Disable HK")) + command = object_id + struct.pack('!I', SupvActionIds.DISABLE_HK) + command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + return tc_queue + + +def pack_sel_boot_image_cmd(object_id: bytearray, mem: int, bp0: int, bp1: int, bp2: int) -> bytearray: + """ This function can be used to generate the command to select the image from which the MPSoC will boot + @param object_id The object id of the PLOC supervisor handler. + @param mem The memory from which the MPSoC shall boot (NVM0 - 0, NVM1 - 1) + @param bp0 Partition pin 0 + @param bp1 Partition pin 1 + @param bp2 Partition pin 2 + """ + command = bytearray() + command = object_id + struct.pack('!I', SupvActionIds.SEL_MPSOC_BOOT_IMAGE) + command = command + struct.pack('!B', mem) + command = command + struct.pack('!B', bp0) + command = command + struct.pack('!B', bp1) + command = command + struct.pack('!B', bp2) + return command diff --git a/pus_tc/tc_packer_hook.py b/pus_tc/tc_packer_hook.py index 56ea866..236740c 100644 --- a/pus_tc/tc_packer_hook.py +++ b/pus_tc/tc_packer_hook.py @@ -20,14 +20,15 @@ from pus_tc.pdu1 import pack_pdu1_test_into from pus_tc.acu import pack_acu_test_into from pus_tc.imtq import pack_imtq_test_into from pus_tc.tmp1075 import pack_tmp1075_test_into -from pus_tc.ploc import pack_ploc_test_into +from pus_tc.ploc_mpsoc import pack_ploc_mpsoc_test_into +from pus_tc.ploc_supervisor import pack_ploc_supv_test_into from pus_tc.heater import pack_heater_test_into from pus_tc.reaction_wheels import pack_single_rw_test_into from pus_tc.rad_sensor import pack_rad_sensor_test_into from config.definitions import CustomServiceList from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, \ - TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, PLOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, \ - RAD_SENSOR_ID + TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, PLOC_MPSOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, \ + RAD_SENSOR_ID, PLOC_SUPV_ID LOGGER = get_console_logger() @@ -69,8 +70,8 @@ def pack_service_queue_user(service: Union[str, int], op_code: str, service_queu object_id = IMTQ_HANDLER_ID return pack_imtq_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.PLOC.value: - object_id = PLOC_ID - return pack_ploc_test_into(object_id=object_id, tc_queue=service_queue) + object_id = PLOC_MPSOC_ID + return pack_ploc_mpsoc_test_into(object_id=object_id, tc_queue=service_queue) if service == CustomServiceList.REACTION_WHEEL_1.value: object_id = RW1_ID return pack_single_rw_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) @@ -86,6 +87,9 @@ def pack_service_queue_user(service: Union[str, int], op_code: str, service_queu if service == CustomServiceList.RAD_SENSOR.value: object_id = RAD_SENSOR_ID return pack_rad_sensor_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) + if service == CustomServiceList.PLOC_SUPV.value: + object_id = PLOC_SUPV_ID + return pack_ploc_supv_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) LOGGER.warning("Invalid Service !") diff --git a/pus_tm/service_8_hook.py b/pus_tm/service_8_hook.py index 6b491c0..4d50c4a 100644 --- a/pus_tm/service_8_hook.py +++ b/pus_tm/service_8_hook.py @@ -2,7 +2,7 @@ import struct from typing import Tuple from config.object_ids import * from pus_tc.imtq import ImtqActionIds -from pus_tc.ploc import PlocReplyIds +from pus_tc.ploc_mpsoc import PlocReplyIds def user_analyze_service_8_data( @@ -30,7 +30,7 @@ def user_analyze_service_8_data( content_list = [data_string] elif object_id == IMTQ_HANDLER_ID: return handle_imtq_replies(action_id, custom_data) - elif object_id == PLOC_ID: + elif object_id == PLOC_MPSOC_ID: return handle_ploc_replies(action_id, custom_data) else: header_list = []