meier/plocSupervisor #7

Merged
muellerr merged 7 commits from meier/plocSupervisor into develop 2021-07-24 14:40:57 +02:00
8 changed files with 183 additions and 18 deletions

View File

@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="PLOC SUPV Test" type="PythonConfigurationType" factoryName="Python" folderName="UDP">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-c udp -s ploc_supv -l -t 6 --hk" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@ -28,3 +28,4 @@ class CustomServiceList(enum.Enum):
REACTION_WHEEL_3 = "reaction_wheel_3" REACTION_WHEEL_3 = "reaction_wheel_3"
REACTION_WHEEL_4 = "reaction_wheel_4" REACTION_WHEEL_4 = "reaction_wheel_4"
RAD_SENSOR = "rad_sensor" RAD_SENSOR = "rad_sensor"
PLOC_SUPV = "ploc_supv"

View File

@ -81,6 +81,23 @@ class EiveHookObject(TmTcHookBase):
} }
service_rad_sensor_tuple = ("Radiation Sensor", op_code_dict_srv_rad_sensor) service_rad_sensor_tuple = ("Radiation Sensor", op_code_dict_srv_rad_sensor)
op_code_dict_srv_ploc_supv = {
"0": ("PLOC Supervisor: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("PLOC Supervisor: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("PLOC Supervisor: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("PLOC Supervisor: Get HK Report", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("PLOC Supervisor: Restart MPSoC", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("PLOC Supervisor: Start MPSoC", {OpCodeDictKeys.TIMEOUT: 2.0}),
"6": ("PLOC Supervisor: Shutdown MPSoC", {OpCodeDictKeys.TIMEOUT: 2.0}),
"7": ("PLOC Supervisor: Select MPSoC boot image", {OpCodeDictKeys.TIMEOUT: 2.0}),
"8": ("PLOC Supervisor: Set max restart tries", {OpCodeDictKeys.TIMEOUT: 2.0}),
"9": ("PLOC Supervisor: Reset MPSoC", {OpCodeDictKeys.TIMEOUT: 2.0}),
"10": ("PLOC Supervisor: Set time reference", {OpCodeDictKeys.TIMEOUT: 2.0}),
"11": ("PLOC Supervisor: Set boot timeout", {OpCodeDictKeys.TIMEOUT: 2.0}),
"12": ("PLOC Supervisor: Disable Hk", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
service_ploc_supv_tuple = ("PLOC Supervisor", op_code_dict_srv_ploc_supv)
service_op_code_dict[CustomServiceList.ACU.value] = service_acu_tuple service_op_code_dict[CustomServiceList.ACU.value] = service_acu_tuple
service_op_code_dict[CustomServiceList.TMP1075_1.value] = service_tmp1075_1_tuple service_op_code_dict[CustomServiceList.TMP1075_1.value] = service_tmp1075_1_tuple
service_op_code_dict[CustomServiceList.TMP1075_2.value] = service_tmp1075_2_tuple service_op_code_dict[CustomServiceList.TMP1075_2.value] = service_tmp1075_2_tuple
@ -94,6 +111,7 @@ class EiveHookObject(TmTcHookBase):
service_op_code_dict[CustomServiceList.REACTION_WHEEL_3.value] = service_rw_tuple service_op_code_dict[CustomServiceList.REACTION_WHEEL_3.value] = service_rw_tuple
service_op_code_dict[CustomServiceList.REACTION_WHEEL_4.value] = service_rw_tuple service_op_code_dict[CustomServiceList.REACTION_WHEEL_4.value] = service_rw_tuple
service_op_code_dict[CustomServiceList.RAD_SENSOR.value] = service_rad_sensor_tuple service_op_code_dict[CustomServiceList.RAD_SENSOR.value] = service_rad_sensor_tuple
service_op_code_dict[CustomServiceList.PLOC_SUPV.value] = service_ploc_supv_tuple
return service_op_code_dict return service_op_code_dict
def get_json_config_file_path(self) -> str: def get_json_config_file_path(self) -> str:

View File

@ -17,13 +17,15 @@ HEATER_ID = bytes([0x44, 0x41, 0x00, 0xA4])
PCDU_HANDLER_ID = bytes([0x44, 0x20, 0x00, 0xA1]) PCDU_HANDLER_ID = bytes([0x44, 0x20, 0x00, 0xA1])
SOLAR_ARRAY_DEPLOYMENT_ID = bytes([0x44, 0x41, 0x10, 0xA2]) SOLAR_ARRAY_DEPLOYMENT_ID = bytes([0x44, 0x41, 0x10, 0xA2])
SYRLINKS_HANDLER = bytes([0x44, 0x53, 0x00, 0xA3]) SYRLINKS_HANDLER = bytes([0x44, 0x53, 0x00, 0xA3])
IMTQ_HANDLER_ID = bytearray([0x44, 0x14, 0x00, 0x14]) IMTQ_HANDLER_ID = bytes([0x44, 0x14, 0x00, 0x14])
PLOC_ID = bytearray([0x44, 0x33, 0x00, 0x15]) PLOC_MPSOC_ID = bytes([0x44, 0x33, 0x00, 0x15])
meierj marked this conversation as resolved Outdated

make this and IMTQ_HANDLER_ID bytes type too?

make this and `IMTQ_HANDLER_ID` `bytes` type too?
RW1_ID = bytes([0x44, 0x21, 0x00, 0x1]) RW1_ID = bytes([0x44, 0x12, 0x00, 0x1])
RW2_ID = bytes([0x44, 0x21, 0x00, 0x2]) RW2_ID = bytes([0x44, 0x12, 0x00, 0x2])
RW3_ID = bytes([0x44, 0x21, 0x00, 0x3]) RW3_ID = bytes([0x44, 0x12, 0x00, 0x3])
RW4_ID = bytes([0x44, 0x21, 0x00, 0x4]) RW4_ID = bytes([0x44, 0x12, 0x00, 0x4])
START_TRACKER_ID = bytes([0x44, 0x13, 0x00, 0x1])
RAD_SENSOR_ID = bytes([0x44, 0x32, 0x00, 0xA5]) RAD_SENSOR_ID = bytes([0x44, 0x32, 0x00, 0xA5])
PLOC_SUPV_ID = bytes([0x44, 0x33, 0x00, 0x16])
def get_object_ids() -> Dict[bytes, list]: def get_object_ids() -> Dict[bytes, list]:
@ -44,5 +46,6 @@ def get_object_ids() -> Dict[bytes, list]:
RW3_ID: "Reaction Wheel 3", RW3_ID: "Reaction Wheel 3",
RW4_ID: "Reaction Wheel 4", RW4_ID: "Reaction Wheel 4",
RAD_SENSOR_ID: "Radiation Sensor", RAD_SENSOR_ID: "Radiation Sensor",
PLOC_SUPV_ID: "PLOC Supervisor",
}) })
return object_id_dict return object_id_dict

View File

@ -1,9 +1,10 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
@file ploc.py @file ploc_mpsoc.py
@brief TMP1075 tests @brief Tests for commanding the MPSoC of the PLOC.
The MPSoC is programmed by the ILH.
@author J. Meier @author J. Meier
@date 06.01.2021 @date 06.03.2021
""" """
import struct import struct
@ -33,10 +34,10 @@ class PlocReplyIds:
tm_mem_read_report = 6 tm_mem_read_report = 6
def pack_ploc_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT: def pack_ploc_mpsoc_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft( tc_queue.appendleft(
(QueueCommands.PRINT, (QueueCommands.PRINT,
"Testing PLOC Handler with object id: 0x" + object_id.hex()) "Testing PLOC MPSoC with object id: 0x" + object_id.hex())
) )
if PlocTestProcedure.all or PlocTestProcedure.test_tc_mem_write: if PlocTestProcedure.all or PlocTestProcedure.test_tc_mem_write:

114
pus_tc/ploc_supervisor.py Normal file
View File

@ -0,0 +1,114 @@
# -*- coding: utf-8 -*-
"""
@file ploc_supervisor.py
@brief Tests for commanding the supervisor of the PLOC.
The supervisor is programmed by Thales.
@author J. Meier
@date 10.07.2021
"""
import struct
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
class SupvActionIds:
HK_REPORT = 1
RESTART_MPSOC = 2
START_MPSOC = 3
SHUTWOWN_MPSOC = 4
SEL_MPSOC_BOOT_IMAGE = 5
SET_BOOT_TIMEOUT = 6
SET_MAX_RESTART_TRIES = 7
RESET_MPSOC = 8
SET_TIME_REF = 9
DISABLE_HK = 10
class SupvHkIds:
HK_REPORT = 52
def pack_ploc_supv_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str) -> TcQueueT:
tc_queue.appendleft(
(QueueCommands.PRINT,
"Testing PLOC Supervisor with object id: 0x" + object_id.hex())
)
if op_code == "3":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: TC Get Hk Report"))
command = object_id + struct.pack('!I', SupvActionIds.HK_REPORT)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "4":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Restart MPSoC"))
command = object_id + struct.pack('!I', SupvActionIds.RESTART_MPSOC)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "5":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Start MPSoC"))
command = object_id + struct.pack('!I', SupvActionIds.START_MPSOC)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "6":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Shutdown MPSoC"))
command = object_id + struct.pack('!I', SupvActionIds.SHUTWOWN_MPSOC)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "7":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Select MPSoC boot image"))
mem = int(input("MEM (NVM0 - 0 or NVM1 - 1):"))
bp0 = int(input("BP0 (0 or 1):"))
bp1 = int(input("BP1 (0 or 1):"))
bp2 = int(input("BP2 (0 or 1):"))
command = pack_sel_boot_image_cmd(object_id, mem, bp0, bp1, bp2)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "8":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Set max restart tries"))
restart_tries = int(input("Set maximum restart tries:"))
command = object_id + struct.pack('!I', SupvActionIds.SET_MAX_RESTART_TRIES) + struct.pack('!B', restart_tries)
command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "9":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Reset MPSoC"))
command = object_id + struct.pack('!I', SupvActionIds.RESET_MPSOC)
command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "10":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Set time reference"))
command = object_id + struct.pack('!I', SupvActionIds.SET_TIME_REF)
command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "11":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Set boot timeout"))
boot_timeout = int(input("Specify boot timeout [ms]:"))
command = object_id + struct.pack('!I', SupvActionIds.SET_BOOT_TIMEOUT) + struct.pack('!I', boot_timeout)
command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "12":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Disable HK"))
command = object_id + struct.pack('!I', SupvActionIds.DISABLE_HK)
command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue
def pack_sel_boot_image_cmd(object_id: bytearray, mem: int, bp0: int, bp1: int, bp2: int) -> bytearray:
""" This function can be used to generate the command to select the image from which the MPSoC will boot
@param object_id The object id of the PLOC supervisor handler.
@param mem The memory from which the MPSoC shall boot (NVM0 - 0, NVM1 - 1)
@param bp0 Partition pin 0
@param bp1 Partition pin 1
@param bp2 Partition pin 2
"""
command = bytearray()
command = object_id + struct.pack('!I', SupvActionIds.SEL_MPSOC_BOOT_IMAGE)
command = command + struct.pack('!B', mem)
command = command + struct.pack('!B', bp0)
command = command + struct.pack('!B', bp1)
command = command + struct.pack('!B', bp2)
return command

View File

@ -20,14 +20,15 @@ from pus_tc.pdu1 import pack_pdu1_test_into
from pus_tc.acu import pack_acu_test_into from pus_tc.acu import pack_acu_test_into
from pus_tc.imtq import pack_imtq_test_into from pus_tc.imtq import pack_imtq_test_into
from pus_tc.tmp1075 import pack_tmp1075_test_into from pus_tc.tmp1075 import pack_tmp1075_test_into
from pus_tc.ploc import pack_ploc_test_into from pus_tc.ploc_mpsoc import pack_ploc_mpsoc_test_into
from pus_tc.ploc_supervisor import pack_ploc_supv_test_into
from pus_tc.heater import pack_heater_test_into from pus_tc.heater import pack_heater_test_into
from pus_tc.reaction_wheels import pack_single_rw_test_into from pus_tc.reaction_wheels import pack_single_rw_test_into
from pus_tc.rad_sensor import pack_rad_sensor_test_into from pus_tc.rad_sensor import pack_rad_sensor_test_into
from config.definitions import CustomServiceList from config.definitions import CustomServiceList
from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, \ from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, \
TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, PLOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, \ TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, PLOC_MPSOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, \
RAD_SENSOR_ID RAD_SENSOR_ID, PLOC_SUPV_ID
LOGGER = get_console_logger() LOGGER = get_console_logger()
@ -69,8 +70,8 @@ def pack_service_queue_user(service: Union[str, int], op_code: str, service_queu
object_id = IMTQ_HANDLER_ID object_id = IMTQ_HANDLER_ID
return pack_imtq_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) return pack_imtq_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.PLOC.value: if service == CustomServiceList.PLOC.value:
object_id = PLOC_ID object_id = PLOC_MPSOC_ID
return pack_ploc_test_into(object_id=object_id, tc_queue=service_queue) return pack_ploc_mpsoc_test_into(object_id=object_id, tc_queue=service_queue)
if service == CustomServiceList.REACTION_WHEEL_1.value: if service == CustomServiceList.REACTION_WHEEL_1.value:
object_id = RW1_ID object_id = RW1_ID
return pack_single_rw_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) return pack_single_rw_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
@ -86,6 +87,9 @@ def pack_service_queue_user(service: Union[str, int], op_code: str, service_queu
if service == CustomServiceList.RAD_SENSOR.value: if service == CustomServiceList.RAD_SENSOR.value:
object_id = RAD_SENSOR_ID object_id = RAD_SENSOR_ID
return pack_rad_sensor_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) return pack_rad_sensor_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.PLOC_SUPV.value:
object_id = PLOC_SUPV_ID
return pack_ploc_supv_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
LOGGER.warning("Invalid Service !") LOGGER.warning("Invalid Service !")

View File

@ -2,7 +2,7 @@ import struct
from typing import Tuple from typing import Tuple
from config.object_ids import * from config.object_ids import *
from pus_tc.imtq import ImtqActionIds from pus_tc.imtq import ImtqActionIds
from pus_tc.ploc import PlocReplyIds from pus_tc.ploc_mpsoc import PlocReplyIds
def user_analyze_service_8_data( def user_analyze_service_8_data(
@ -30,7 +30,7 @@ def user_analyze_service_8_data(
content_list = [data_string] content_list = [data_string]
elif object_id == IMTQ_HANDLER_ID: elif object_id == IMTQ_HANDLER_ID:
return handle_imtq_replies(action_id, custom_data) return handle_imtq_replies(action_id, custom_data)
elif object_id == PLOC_ID: elif object_id == PLOC_MPSOC_ID:
return handle_ploc_replies(action_id, custom_data) return handle_ploc_replies(action_id, custom_data)
else: else:
header_list = [] header_list = []