Merge pull request 'meier/plocSupervisor' (#14) from meier/plocSupervisor into develop
Reviewed-on: #14
This commit is contained in:
commit
f00cd8f6bd
@ -29,4 +29,5 @@ class CustomServiceList(enum.Enum):
|
|||||||
REACTION_WHEEL_4 = "reaction_wheel_4"
|
REACTION_WHEEL_4 = "reaction_wheel_4"
|
||||||
RAD_SENSOR = "rad_sensor"
|
RAD_SENSOR = "rad_sensor"
|
||||||
PLOC_SUPV = "ploc_supv"
|
PLOC_SUPV = "ploc_supv"
|
||||||
|
PLOC_UPDATER = "ploc_updater"
|
||||||
CORE = 'core'
|
CORE = 'core'
|
||||||
|
@ -131,9 +131,24 @@ class EiveHookObject(TmTcHookBase):
|
|||||||
"32": ("PLOC Supervisor: MRAM Dump", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
"32": ("PLOC Supervisor: MRAM Dump", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
"33": ("PLOC Supervisor: Print CPU stats", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
"33": ("PLOC Supervisor: Print CPU stats", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
"34": ("PLOC Supervisor: Set debug verbosity", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
"34": ("PLOC Supervisor: Set debug verbosity", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
|
"35": ("PLOC Supervisor: Set GPIO", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
|
"36": ("PLOC Supervisor: Read GPIO", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
|
"37": ("PLOC Supervisor: Restart supervisor", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
|
"38": ("PLOC Supervisor: Factory reset clear all", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
|
"39": ("PLOC Supervisor: Factory reset clear mirror entries", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
|
"40": ("PLOC Supervisor: Factory reset clear circular entries", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
|
"41": ("PLOC Supervisor: CAN loopback test", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
}
|
}
|
||||||
service_ploc_supv_tuple = ("PLOC Supervisor", op_code_dict_srv_ploc_supv)
|
service_ploc_supv_tuple = ("PLOC Supervisor", op_code_dict_srv_ploc_supv)
|
||||||
|
|
||||||
|
op_code_dict_srv_ploc_updater = {
|
||||||
|
"0": ("Ploc Updater: Update partion A on NVM0", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
|
"1": ("Ploc Updater: Update partion B on NVM0", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
|
"2": ("Ploc Updater: Update partion A on NVM1", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
|
"3": ("Ploc Updater: Update partion B on NVM1", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||||
|
}
|
||||||
|
service_ploc_updater_tuple = ("Ploc Updater", op_code_dict_srv_ploc_updater)
|
||||||
|
|
||||||
service_op_code_dict[CustomServiceList.P60DOCK.value] = service_p60_tuple
|
service_op_code_dict[CustomServiceList.P60DOCK.value] = service_p60_tuple
|
||||||
service_op_code_dict[CustomServiceList.PDU1.value] = service_pdu1_tuple
|
service_op_code_dict[CustomServiceList.PDU1.value] = service_pdu1_tuple
|
||||||
service_op_code_dict[CustomServiceList.PDU1.value] = service_pdu2_tuple
|
service_op_code_dict[CustomServiceList.PDU1.value] = service_pdu2_tuple
|
||||||
@ -145,6 +160,7 @@ class EiveHookObject(TmTcHookBase):
|
|||||||
service_op_code_dict[CustomServiceList.REACTION_WHEEL_4.value] = service_rw_tuple
|
service_op_code_dict[CustomServiceList.REACTION_WHEEL_4.value] = service_rw_tuple
|
||||||
service_op_code_dict[CustomServiceList.RAD_SENSOR.value] = service_rad_sensor_tuple
|
service_op_code_dict[CustomServiceList.RAD_SENSOR.value] = service_rad_sensor_tuple
|
||||||
service_op_code_dict[CustomServiceList.PLOC_SUPV.value] = service_ploc_supv_tuple
|
service_op_code_dict[CustomServiceList.PLOC_SUPV.value] = service_ploc_supv_tuple
|
||||||
|
service_op_code_dict[CustomServiceList.PLOC_UPDATER.value] = service_ploc_updater_tuple
|
||||||
return service_op_code_dict
|
return service_op_code_dict
|
||||||
|
|
||||||
def get_json_config_file_path(self) -> str:
|
def get_json_config_file_path(self) -> str:
|
||||||
|
@ -26,6 +26,7 @@ RW4_ID = bytes([0x44, 0x12, 0x00, 0x4])
|
|||||||
START_TRACKER_ID = bytes([0x44, 0x13, 0x00, 0x1])
|
START_TRACKER_ID = bytes([0x44, 0x13, 0x00, 0x1])
|
||||||
RAD_SENSOR_ID = bytes([0x44, 0x32, 0x00, 0xA5])
|
RAD_SENSOR_ID = bytes([0x44, 0x32, 0x00, 0xA5])
|
||||||
PLOC_SUPV_ID = bytes([0x44, 0x33, 0x00, 0x16])
|
PLOC_SUPV_ID = bytes([0x44, 0x33, 0x00, 0x16])
|
||||||
|
PLOC_UPDATER_ID = bytes([0x44, 0x33, 0x00, 0x00])
|
||||||
CORE_CONTROLLER_ID = bytes([0x43, 0x00, 0x00, 0x03])
|
CORE_CONTROLLER_ID = bytes([0x43, 0x00, 0x00, 0x03])
|
||||||
|
|
||||||
|
|
||||||
|
@ -12,6 +12,9 @@ from tmtccmd.config.definitions import QueueCommands
|
|||||||
|
|
||||||
from tmtccmd.tc.packer import TcQueueT
|
from tmtccmd.tc.packer import TcQueueT
|
||||||
from tmtccmd.ecss.tc import PusTelecommand
|
from tmtccmd.ecss.tc import PusTelecommand
|
||||||
|
from tmtccmd.utility.logger import get_console_logger
|
||||||
|
|
||||||
|
LOGGER = get_console_logger()
|
||||||
|
|
||||||
latchup_id_dict = {
|
latchup_id_dict = {
|
||||||
"0": "0.85V",
|
"0": "0.85V",
|
||||||
@ -60,9 +63,11 @@ class SupvActionIds:
|
|||||||
SET_GPIO = 34
|
SET_GPIO = 34
|
||||||
READ_GPIO = 35
|
READ_GPIO = 35
|
||||||
RESTART_SUPERVISOR = 36
|
RESTART_SUPERVISOR = 36
|
||||||
FACTORY_RESET = 37
|
FACTORY_RESET_CLEAR_ALL = 37
|
||||||
REQUEST_LOGGING_DATA = 38
|
REQUEST_LOGGING_DATA = 38
|
||||||
UPDATE_IMAGE_DATA = 39
|
UPDATE_IMAGE_DATA = 39
|
||||||
|
FACTORY_RESET_CLEAR_MIRROR = 40
|
||||||
|
FACTORY_RESET_CLEAR_CIRCULAR = 41
|
||||||
|
|
||||||
|
|
||||||
class SupvHkIds:
|
class SupvHkIds:
|
||||||
@ -242,6 +247,41 @@ def pack_ploc_supv_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code:
|
|||||||
command = pack_set_debug_verbosity_cmd(object_id)
|
command = pack_set_debug_verbosity_cmd(object_id)
|
||||||
command = PusTelecommand(service=8, subservice=128, ssc=49, app_data=command)
|
command = PusTelecommand(service=8, subservice=128, ssc=49, app_data=command)
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
elif op_code == "35":
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Set GPIO command"))
|
||||||
|
command = pack_set_gpio_cmd(object_id)
|
||||||
|
command = PusTelecommand(service=8, subservice=128, ssc=50, app_data=command)
|
||||||
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
elif op_code == "36":
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Read GPIO command"))
|
||||||
|
command = pack_read_gpio_cmd(object_id)
|
||||||
|
command = PusTelecommand(service=8, subservice=128, ssc=51, app_data=command)
|
||||||
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
elif op_code == "37":
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Restart supervisor"))
|
||||||
|
command = command = object_id + struct.pack('!I', SupvActionIds.RESTART_SUPERVISOR)
|
||||||
|
command = PusTelecommand(service=8, subservice=128, ssc=52, app_data=command)
|
||||||
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
elif op_code == "38":
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Factory reset clear all"))
|
||||||
|
command = command = object_id + struct.pack('!I', SupvActionIds.FACTORY_RESET_CLEAR_ALL)
|
||||||
|
command = PusTelecommand(service=8, subservice=128, ssc=53, app_data=command)
|
||||||
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
elif op_code == "39":
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Factory reset clear mirror entries"))
|
||||||
|
command = command = object_id + struct.pack('!I', SupvActionIds.FACTORY_RESET_CLEAR_MIRROR)
|
||||||
|
command = PusTelecommand(service=8, subservice=128, ssc=54, app_data=command)
|
||||||
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
elif op_code == "40":
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Factory reset clear circular entries"))
|
||||||
|
command = command = object_id + struct.pack('!I', SupvActionIds.FACTORY_RESET_CLEAR_CIRCULAR)
|
||||||
|
command = PusTelecommand(service=8, subservice=128, ssc=55, app_data=command)
|
||||||
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
elif op_code == "41":
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: CAN loopback test"))
|
||||||
|
command = command = object_id + struct.pack('!I', SupvActionIds.CAN_LOOPBACK_TEST)
|
||||||
|
command = PusTelecommand(service=8, subservice=128, ssc=56, app_data=command)
|
||||||
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
|
||||||
return tc_queue
|
return tc_queue
|
||||||
|
|
||||||
@ -488,17 +528,56 @@ def pack_print_cpu_stats_cmd(object_id: bytearray) -> bytearray:
|
|||||||
|
|
||||||
def pack_set_debug_verbosity_cmd(object_id: bytearray) -> bytearray:
|
def pack_set_debug_verbosity_cmd(object_id: bytearray) -> bytearray:
|
||||||
command = bytearray()
|
command = bytearray()
|
||||||
print("Specify debug verbosity")
|
verbosity = get_debug_verbosity()
|
||||||
verbosity_options_dict = {
|
|
||||||
0: "None",
|
|
||||||
1: "Error",
|
|
||||||
2: "Warn",
|
|
||||||
3: "Info",
|
|
||||||
}
|
|
||||||
print("{:<6} | {}".format('Key', 'Description'))
|
|
||||||
for entry in verbosity_options_dict.items():
|
|
||||||
print("{:<6} | {}".format(entry[0], entry[1]))
|
|
||||||
verbosity = int(input("Specify verbosity key: "))
|
|
||||||
command = object_id + struct.pack('!I', SupvActionIds.SET_DBG_VERBOSITY)
|
command = object_id + struct.pack('!I', SupvActionIds.SET_DBG_VERBOSITY)
|
||||||
command = command + struct.pack('!B', verbosity)
|
command = command + struct.pack('!B', verbosity)
|
||||||
return command
|
return command
|
||||||
|
|
||||||
|
|
||||||
|
def get_debug_verbosity() -> int:
|
||||||
|
tries = 0
|
||||||
|
while tries < 3:
|
||||||
|
try:
|
||||||
|
print("Debug verbosity options")
|
||||||
|
verbosity_options_dict = {
|
||||||
|
0: "None",
|
||||||
|
1: "Error",
|
||||||
|
2: "Warn",
|
||||||
|
3: "Info",
|
||||||
|
}
|
||||||
|
print("{:<6} | {}".format('Key', 'Description'))
|
||||||
|
for entry in verbosity_options_dict.items():
|
||||||
|
print("{:<6} | {}".format(entry[0], entry[1]))
|
||||||
|
verbosity = int(input("Specify verbosity key: "))
|
||||||
|
if verbosity > len(verbosity_options_dict) - 1:
|
||||||
|
raise ValueError
|
||||||
|
return verbosity
|
||||||
|
except ValueError:
|
||||||
|
LOGGER.warning("Invalid verbosity key specified")
|
||||||
|
tries = tries + 1
|
||||||
|
LOGGER.error("get_debug_verbosity: Exceeded max tries to input verbosity key")
|
||||||
|
quit()
|
||||||
|
|
||||||
|
|
||||||
|
def pack_set_gpio_cmd(object_id: bytearray) -> bytearray:
|
||||||
|
port = int(input("Specify port: "))
|
||||||
|
pin = int(input("Specify pin: "))
|
||||||
|
val = int(input("Specify val: "))
|
||||||
|
command = bytearray()
|
||||||
|
command = object_id + struct.pack('!I', SupvActionIds.SET_GPIO)
|
||||||
|
command = command + struct.pack('!B', port)
|
||||||
|
command = command + struct.pack('!B', pin)
|
||||||
|
command = command + struct.pack('!B', val)
|
||||||
|
return command
|
||||||
|
|
||||||
|
|
||||||
|
def pack_read_gpio_cmd(object_id: bytearray) -> bytearray:
|
||||||
|
port = int(input("Specify port: "))
|
||||||
|
pin = int(input("Specify pin: "))
|
||||||
|
command = bytearray()
|
||||||
|
command = object_id + struct.pack('!I', SupvActionIds.READ_GPIO)
|
||||||
|
command = command + struct.pack('!B', port)
|
||||||
|
command = command + struct.pack('!B', pin)
|
||||||
|
return command
|
||||||
|
|
||||||
|
|
||||||
|
73
pus_tc/ploc_upater.py
Normal file
73
pus_tc/ploc_upater.py
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
"""
|
||||||
|
@file ploc_supervisor.py
|
||||||
|
@brief Tests for commanding the supervisor of the PLOC.
|
||||||
|
The supervisor is programmed by Thales.
|
||||||
|
@author J. Meier
|
||||||
|
@date 10.07.2021
|
||||||
|
"""
|
||||||
|
import struct
|
||||||
|
|
||||||
|
from tmtccmd.config.definitions import QueueCommands
|
||||||
|
|
||||||
|
from tmtccmd.tc.packer import TcQueueT
|
||||||
|
from tmtccmd.ecss.tc import PusTelecommand
|
||||||
|
from tmtccmd.utility.logger import get_console_logger
|
||||||
|
|
||||||
|
LOGGER = get_console_logger()
|
||||||
|
|
||||||
|
latchup_id_dict = {
|
||||||
|
"0": "0.85V",
|
||||||
|
"1": "1.8V",
|
||||||
|
"2": "MISC",
|
||||||
|
"3": "3.3V",
|
||||||
|
"4": "NVM_4XO",
|
||||||
|
"5": "MISSION",
|
||||||
|
"6": "SAFECOTS"
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class UpdaterActionIds:
|
||||||
|
UPDATE_NVM0_A = 0
|
||||||
|
UPDATE_NVM0_B = 1
|
||||||
|
UPDATE_NVM1_A = 2
|
||||||
|
UPDATE_NVM1_B = 3
|
||||||
|
|
||||||
|
|
||||||
|
class ImagePathDefs:
|
||||||
|
imageNvm0A = "/mnt/sd0/ploc/updateNvm0A.bin"
|
||||||
|
imageNvm0B = "/mnt/sd0/ploc/updateNvm0B.bin"
|
||||||
|
imageNvm1A = "/mnt/sd0/ploc/updateNvm1A.bin"
|
||||||
|
imageNvm1B = "/mnt/sd0/ploc/updateNvm1B.bin"
|
||||||
|
|
||||||
|
|
||||||
|
def pack_ploc_updater_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str) -> TcQueueT:
|
||||||
|
tc_queue.appendleft(
|
||||||
|
(QueueCommands.PRINT,
|
||||||
|
"Testing PLOC updater with object id: 0x" + object_id.hex())
|
||||||
|
)
|
||||||
|
|
||||||
|
if op_code == "0":
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Update partition A on NVM0"))
|
||||||
|
command = object_id + struct.pack('!I', UpdaterActionIds.UPDATE_NVM0_A) + \
|
||||||
|
bytearray(ImagePathDefs.imageNvm0A, 'utf-8')
|
||||||
|
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
|
||||||
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
if op_code == "1":
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Update partition B on NVM0"))
|
||||||
|
command = object_id + struct.pack('!I', UpdaterActionIds.UPDATE_NVM0_B) + \
|
||||||
|
bytearray(ImagePathDefs.imageNvm0B, 'utf-8')
|
||||||
|
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command)
|
||||||
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
if op_code == "2":
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Update partition A on NVM1"))
|
||||||
|
command = object_id + struct.pack('!I', UpdaterActionIds.UPDATE_NVM1_A) + \
|
||||||
|
bytearray(ImagePathDefs.imageNvm1A, 'utf-8')
|
||||||
|
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
|
||||||
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
if op_code == "3":
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Update partition B on NVM1"))
|
||||||
|
command = object_id + struct.pack('!I', UpdaterActionIds.UPDATE_NVM1_B) + \
|
||||||
|
bytearray(ImagePathDefs.imageNvm1B, 'utf-8')
|
||||||
|
command = PusTelecommand(service=8, subservice=128, ssc=33, app_data=command)
|
||||||
|
tc_queue.appendleft(command.pack_command_tuple())
|
@ -26,11 +26,12 @@ from pus_tc.ploc_supervisor import pack_ploc_supv_test_into
|
|||||||
from pus_tc.heater import pack_heater_test_into
|
from pus_tc.heater import pack_heater_test_into
|
||||||
from pus_tc.reaction_wheels import pack_single_rw_test_into
|
from pus_tc.reaction_wheels import pack_single_rw_test_into
|
||||||
from pus_tc.rad_sensor import pack_rad_sensor_test_into
|
from pus_tc.rad_sensor import pack_rad_sensor_test_into
|
||||||
|
from pus_tc.ploc_upater import pack_ploc_updater_test_into
|
||||||
from pus_tc.core import pack_core_commands
|
from pus_tc.core import pack_core_commands
|
||||||
from config.definitions import CustomServiceList
|
from config.definitions import CustomServiceList
|
||||||
from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, \
|
from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, \
|
||||||
TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, PLOC_MPSOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, \
|
TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, PLOC_MPSOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, \
|
||||||
RAD_SENSOR_ID, PLOC_SUPV_ID
|
RAD_SENSOR_ID, PLOC_SUPV_ID, PLOC_UPDATER_ID
|
||||||
|
|
||||||
|
|
||||||
LOGGER = get_console_logger()
|
LOGGER = get_console_logger()
|
||||||
@ -94,6 +95,9 @@ def pack_service_queue_user(service: Union[str, int], op_code: str, service_queu
|
|||||||
if service == CustomServiceList.PLOC_SUPV.value:
|
if service == CustomServiceList.PLOC_SUPV.value:
|
||||||
object_id = PLOC_SUPV_ID
|
object_id = PLOC_SUPV_ID
|
||||||
return pack_ploc_supv_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
|
return pack_ploc_supv_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
|
||||||
|
if service == CustomServiceList.PLOC_UPDATER.value:
|
||||||
|
object_id = PLOC_UPDATER_ID
|
||||||
|
return pack_ploc_updater_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
|
||||||
if service == CustomServiceList.CORE.value:
|
if service == CustomServiceList.CORE.value:
|
||||||
return pack_core_commands(tc_queue=service_queue, op_code=op_code)
|
return pack_core_commands(tc_queue=service_queue, op_code=op_code)
|
||||||
LOGGER.warning("Invalid Service !")
|
LOGGER.warning("Invalid Service !")
|
||||||
|
2
tmtccmd
2
tmtccmd
@ -1 +1 @@
|
|||||||
Subproject commit dbf6034513616e447cb980efe7cf0ac255c205a4
|
Subproject commit 2058b28fa167fa1cc3eb7dd69b18b4968e86ddb1
|
Loading…
Reference in New Issue
Block a user