meier/plocSupervisor #13

Closed
meierj wants to merge 4 commits from meier/plocSupervisor into develop
6 changed files with 188 additions and 14 deletions

View File

@ -29,3 +29,4 @@ class CustomServiceList(enum.Enum):
REACTION_WHEEL_4 = "reaction_wheel_4" REACTION_WHEEL_4 = "reaction_wheel_4"
RAD_SENSOR = "rad_sensor" RAD_SENSOR = "rad_sensor"
PLOC_SUPV = "ploc_supv" PLOC_SUPV = "ploc_supv"
PLOC_UPDATER = "ploc_updater"

View File

@ -117,9 +117,24 @@ class EiveHookObject(TmTcHookBase):
"32": ("PLOC Supervisor: MRAM Dump", {OpCodeDictKeys.TIMEOUT: 2.0}), "32": ("PLOC Supervisor: MRAM Dump", {OpCodeDictKeys.TIMEOUT: 2.0}),
"33": ("PLOC Supervisor: Print CPU stats", {OpCodeDictKeys.TIMEOUT: 2.0}), "33": ("PLOC Supervisor: Print CPU stats", {OpCodeDictKeys.TIMEOUT: 2.0}),
"34": ("PLOC Supervisor: Set debug verbosity", {OpCodeDictKeys.TIMEOUT: 2.0}), "34": ("PLOC Supervisor: Set debug verbosity", {OpCodeDictKeys.TIMEOUT: 2.0}),
"35": ("PLOC Supervisor: Set GPIO", {OpCodeDictKeys.TIMEOUT: 2.0}),
"36": ("PLOC Supervisor: Read GPIO", {OpCodeDictKeys.TIMEOUT: 2.0}),
"37": ("PLOC Supervisor: Restart supervisor", {OpCodeDictKeys.TIMEOUT: 2.0}),
"38": ("PLOC Supervisor: Factory reset clear all", {OpCodeDictKeys.TIMEOUT: 2.0}),
"39": ("PLOC Supervisor: Factory reset clear mirror entries", {OpCodeDictKeys.TIMEOUT: 2.0}),
"40": ("PLOC Supervisor: Factory reset clear circular entries", {OpCodeDictKeys.TIMEOUT: 2.0}),
"41": ("PLOC Supervisor: CAN loopback test", {OpCodeDictKeys.TIMEOUT: 2.0}),
} }
service_ploc_supv_tuple = ("PLOC Supervisor", op_code_dict_srv_ploc_supv) service_ploc_supv_tuple = ("PLOC Supervisor", op_code_dict_srv_ploc_supv)
op_code_dict_srv_ploc_updater = {
"0": ("Ploc Updater: Update partion A on NVM0", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("Ploc Updater: Update partion B on NVM0", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("Ploc Updater: Update partion A on NVM1", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("Ploc Updater: Update partion B on NVM1", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
service_ploc_updater_tuple = ("Ploc Updater", op_code_dict_srv_ploc_updater)
service_op_code_dict[CustomServiceList.ACU.value] = service_acu_tuple service_op_code_dict[CustomServiceList.ACU.value] = service_acu_tuple
service_op_code_dict[CustomServiceList.TMP1075_1.value] = service_tmp1075_1_tuple service_op_code_dict[CustomServiceList.TMP1075_1.value] = service_tmp1075_1_tuple
service_op_code_dict[CustomServiceList.TMP1075_2.value] = service_tmp1075_2_tuple service_op_code_dict[CustomServiceList.TMP1075_2.value] = service_tmp1075_2_tuple
@ -134,6 +149,7 @@ class EiveHookObject(TmTcHookBase):
service_op_code_dict[CustomServiceList.REACTION_WHEEL_4.value] = service_rw_tuple service_op_code_dict[CustomServiceList.REACTION_WHEEL_4.value] = service_rw_tuple
service_op_code_dict[CustomServiceList.RAD_SENSOR.value] = service_rad_sensor_tuple service_op_code_dict[CustomServiceList.RAD_SENSOR.value] = service_rad_sensor_tuple
service_op_code_dict[CustomServiceList.PLOC_SUPV.value] = service_ploc_supv_tuple service_op_code_dict[CustomServiceList.PLOC_SUPV.value] = service_ploc_supv_tuple
service_op_code_dict[CustomServiceList.PLOC_UPDATER.value] = service_ploc_updater_tuple
return service_op_code_dict return service_op_code_dict
def get_json_config_file_path(self) -> str: def get_json_config_file_path(self) -> str:

View File

@ -26,6 +26,7 @@ RW4_ID = bytes([0x44, 0x12, 0x00, 0x4])
START_TRACKER_ID = bytes([0x44, 0x13, 0x00, 0x1]) START_TRACKER_ID = bytes([0x44, 0x13, 0x00, 0x1])
RAD_SENSOR_ID = bytes([0x44, 0x32, 0x00, 0xA5]) RAD_SENSOR_ID = bytes([0x44, 0x32, 0x00, 0xA5])
PLOC_SUPV_ID = bytes([0x44, 0x33, 0x00, 0x16]) PLOC_SUPV_ID = bytes([0x44, 0x33, 0x00, 0x16])
PLOC_UPDATER_ID = bytes([0x44, 0x33, 0x00, 0x00])
def get_object_ids() -> Dict[bytes, list]: def get_object_ids() -> Dict[bytes, list]:

View File

@ -12,6 +12,9 @@ from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT from tmtccmd.tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.utility.logger import get_console_logger
LOGGER = get_console_logger()
latchup_id_dict = { latchup_id_dict = {
"0": "0.85V", "0": "0.85V",
@ -60,9 +63,11 @@ class SupvActionIds:
SET_GPIO = 34 SET_GPIO = 34
READ_GPIO = 35 READ_GPIO = 35
RESTART_SUPERVISOR = 36 RESTART_SUPERVISOR = 36
FACTORY_RESET = 37 FACTORY_RESET_CLEAR_ALL = 37
REQUEST_LOGGING_DATA = 38 REQUEST_LOGGING_DATA = 38
UPDATE_IMAGE_DATA = 39 UPDATE_IMAGE_DATA = 39
FACTORY_RESET_CLEAR_MIRROR = 40
FACTORY_RESET_CLEAR_CIRCULAR = 41
class SupvHkIds: class SupvHkIds:
@ -242,6 +247,41 @@ def pack_ploc_supv_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code:
command = pack_set_debug_verbosity_cmd(object_id) command = pack_set_debug_verbosity_cmd(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=49, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=49, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "35":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Set GPIO command"))
command = pack_set_gpio_cmd(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=50, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "36":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Read GPIO command"))
command = pack_read_gpio_cmd(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=51, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "37":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Restart supervisor"))
command = command = object_id + struct.pack('!I', SupvActionIds.RESTART_SUPERVISOR)
command = PusTelecommand(service=8, subservice=128, ssc=52, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "38":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Factory reset clear all"))
command = command = object_id + struct.pack('!I', SupvActionIds.FACTORY_RESET_CLEAR_ALL)
command = PusTelecommand(service=8, subservice=128, ssc=53, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "39":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Factory reset clear mirror entries"))
command = command = object_id + struct.pack('!I', SupvActionIds.FACTORY_RESET_CLEAR_MIRROR)
command = PusTelecommand(service=8, subservice=128, ssc=54, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "40":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Factory reset clear circular entries"))
command = command = object_id + struct.pack('!I', SupvActionIds.FACTORY_RESET_CLEAR_CIRCULAR)
command = PusTelecommand(service=8, subservice=128, ssc=55, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "41":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: CAN loopback test"))
command = command = object_id + struct.pack('!I', SupvActionIds.CAN_LOOPBACK_TEST)
command = PusTelecommand(service=8, subservice=128, ssc=56, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue return tc_queue
@ -488,7 +528,17 @@ def pack_print_cpu_stats_cmd(object_id: bytearray) -> bytearray:
def pack_set_debug_verbosity_cmd(object_id: bytearray) -> bytearray: def pack_set_debug_verbosity_cmd(object_id: bytearray) -> bytearray:
command = bytearray() command = bytearray()
print("Specify debug verbosity") verbosity = get_debug_verbosity()
command = object_id + struct.pack('!I', SupvActionIds.SET_DBG_VERBOSITY)
command = command + struct.pack('!B', verbosity)
return command
def get_debug_verbosity() -> int:
tries = 0
while tries < 3:
try:
print("Debug verbosity options")
verbosity_options_dict = { verbosity_options_dict = {
0: "None", 0: "None",
1: "Error", 1: "Error",
@ -499,6 +549,35 @@ def pack_set_debug_verbosity_cmd(object_id: bytearray) -> bytearray:
for entry in verbosity_options_dict.items(): for entry in verbosity_options_dict.items():
print("{:<6} | {}".format(entry[0], entry[1])) print("{:<6} | {}".format(entry[0], entry[1]))
verbosity = int(input("Specify verbosity key: ")) verbosity = int(input("Specify verbosity key: "))
command = object_id + struct.pack('!I', SupvActionIds.SET_DBG_VERBOSITY) if verbosity > len(verbosity_options_dict) - 1:
command = command + struct.pack('!B', verbosity) raise ValueError
return verbosity
except ValueError:
LOGGER.warning("Invalid verbosity key specified")
tries = tries + 1
LOGGER.error("get_debug_verbosity: Exceeded max tries to input verbosity key")
quit()
def pack_set_gpio_cmd(object_id: bytearray) -> bytearray:
port = int(input("Specify port: "))
pin = int(input("Specify pin: "))
val = int(input("Specify val: "))
command = bytearray()
command = object_id + struct.pack('!I', SupvActionIds.SET_GPIO)
command = command + struct.pack('!B', port)
command = command + struct.pack('!B', pin)
command = command + struct.pack('!B', val)
return command return command
def pack_read_gpio_cmd(object_id: bytearray) -> bytearray:
port = int(input("Specify port: "))
pin = int(input("Specify pin: "))
command = bytearray()
command = object_id + struct.pack('!I', SupvActionIds.READ_GPIO)
command = command + struct.pack('!B', port)
command = command + struct.pack('!B', pin)
return command

73
pus_tc/ploc_upater.py Normal file
View File

@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
"""
@file ploc_supervisor.py
@brief Tests for commanding the supervisor of the PLOC.
The supervisor is programmed by Thales.
@author J. Meier
@date 10.07.2021
"""
import struct
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.utility.logger import get_console_logger
LOGGER = get_console_logger()
latchup_id_dict = {
"0": "0.85V",
"1": "1.8V",
"2": "MISC",
"3": "3.3V",
"4": "NVM_4XO",
"5": "MISSION",
"6": "SAFECOTS"
}
class UpdaterActionIds:
UPDATE_NVM0_A = 0
UPDATE_NVM0_B = 1
UPDATE_NVM1_A = 2
UPDATE_NVM1_B = 3
class ImagePathDefs:
imageNvm0A = "/mnt/sd0/ploc/updateNvm0A.bin"
imageNvm0B = "/mnt/sd0/ploc/updateNvm0B.bin"
imageNvm1A = "/mnt/sd0/ploc/updateNvm1A.bin"
imageNvm1B = "/mnt/sd0/ploc/updateNvm1B.bin"
def pack_ploc_updater_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str) -> TcQueueT:
tc_queue.appendleft(
(QueueCommands.PRINT,
"Testing PLOC updater with object id: 0x" + object_id.hex())
)
if op_code == "0":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Update partition A on NVM0"))
command = object_id + struct.pack('!I', UpdaterActionIds.UPDATE_NVM0_A) + \
bytearray(ImagePathDefs.imageNvm0A, 'utf-8')
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "1":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Update partition B on NVM0"))
command = object_id + struct.pack('!I', UpdaterActionIds.UPDATE_NVM0_B) + \
bytearray(ImagePathDefs.imageNvm0B, 'utf-8')
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "2":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Update partition A on NVM1"))
command = object_id + struct.pack('!I', UpdaterActionIds.UPDATE_NVM1_A) + \
bytearray(ImagePathDefs.imageNvm1A, 'utf-8')
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "3":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Update partition B on NVM1"))
command = object_id + struct.pack('!I', UpdaterActionIds.UPDATE_NVM1_B) + \
bytearray(ImagePathDefs.imageNvm1B, 'utf-8')
command = PusTelecommand(service=8, subservice=128, ssc=33, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())

View File

@ -26,10 +26,11 @@ from pus_tc.ploc_supervisor import pack_ploc_supv_test_into
from pus_tc.heater import pack_heater_test_into from pus_tc.heater import pack_heater_test_into
from pus_tc.reaction_wheels import pack_single_rw_test_into from pus_tc.reaction_wheels import pack_single_rw_test_into
from pus_tc.rad_sensor import pack_rad_sensor_test_into from pus_tc.rad_sensor import pack_rad_sensor_test_into
from pus_tc.ploc_upater import pack_ploc_updater_test_into
from config.definitions import CustomServiceList from config.definitions import CustomServiceList
from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, \ from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, \
TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, PLOC_MPSOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, \ TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, PLOC_MPSOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, \
RAD_SENSOR_ID, PLOC_SUPV_ID RAD_SENSOR_ID, PLOC_SUPV_ID, PLOC_UPDATER_ID
LOGGER = get_console_logger() LOGGER = get_console_logger()
@ -93,6 +94,9 @@ def pack_service_queue_user(service: Union[str, int], op_code: str, service_queu
if service == CustomServiceList.PLOC_SUPV.value: if service == CustomServiceList.PLOC_SUPV.value:
object_id = PLOC_SUPV_ID object_id = PLOC_SUPV_ID
return pack_ploc_supv_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) return pack_ploc_supv_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.PLOC_UPDATER.value:
object_id = PLOC_UPDATER_ID
return pack_ploc_updater_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
LOGGER.warning("Invalid Service !") LOGGER.warning("Invalid Service !")