Merge branch 'mohr/looooop' into mohr/thermal
This commit is contained in:
@ -12,7 +12,9 @@ from pus_tc.devs.reaction_wheels import add_rw_cmds
|
||||
from pus_tc.devs.bpx_batt import BpxOpCodes
|
||||
|
||||
|
||||
def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
|
||||
def get_eive_service_op_code_dict() -> ServiceOpCodeDictT:
|
||||
from tmtccmd.config.globals import get_default_service_op_code_dict
|
||||
service_op_code_dict = get_default_service_op_code_dict()
|
||||
add_bpx_cmd_definitions(cmd_dict=service_op_code_dict)
|
||||
add_core_controller_definitions(cmd_dict=service_op_code_dict)
|
||||
add_pl_pcdu_cmds(cmd_dict=service_op_code_dict)
|
||||
@ -31,6 +33,7 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
|
||||
add_pdec_cmds(cmd_dict=service_op_code_dict)
|
||||
add_heater_cmds(cmd_dict=service_op_code_dict)
|
||||
add_tmp_sens_cmds(cmd_dict=service_op_code_dict)
|
||||
return service_op_code_dict
|
||||
|
||||
|
||||
def add_tmp_sens_cmds(cmd_dict: ServiceOpCodeDictT):
|
||||
@ -803,6 +806,8 @@ def add_ploc_mpsoc_cmds(cmd_dict: ServiceOpCodeDictT):
|
||||
"14": ("Ploc MPSoC: Mode replay", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
"15": ("Ploc MPSoC: Mode idle", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
"16": ("Ploc MPSoC: Tc cam command send", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
"17": ("Ploc MPSoC: Set UART TX tristate" , {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
"18": ("Ploc MPSoC: Relesase UART TX", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
}
|
||||
service_ploc_mpsoc_tuple = ("Ploc MPSoC", op_code_dict_srv_ploc_mpsoc)
|
||||
cmd_dict[CustomServiceList.PLOC_MPSOC.value] = service_ploc_mpsoc_tuple
|
||||
@ -896,6 +901,7 @@ def add_ploc_supv_cmds(cmd_dict: ServiceOpCodeDictT):
|
||||
),
|
||||
"55": ("PLOC Supervisor: Request ADC Report", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
"56": ("PLOC Supervisor: Reset PL", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
"57": ("PLOC Supervisor: Enable NVMs", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
}
|
||||
service_ploc_supv_tuple = ("PLOC Supervisor", op_code_dict_srv_ploc_supv)
|
||||
|
||||
@ -933,9 +939,7 @@ def add_ploc_supv_cmds(cmd_dict: ServiceOpCodeDictT):
|
||||
{OpCodeDictKeys.TIMEOUT: 2.0},
|
||||
),
|
||||
}
|
||||
service_ploc_updater_tuple = ("Ploc Updater", op_code_dict_srv_ploc_updater)
|
||||
cmd_dict[CustomServiceList.PLOC_SUPV.value] = service_ploc_supv_tuple
|
||||
cmd_dict[CustomServiceList.PLOC_UPDATER.value] = service_ploc_updater_tuple
|
||||
cmd_dict[
|
||||
CustomServiceList.PLOC_MEMORY_DUMPER.value
|
||||
] = service_ploc_memory_dumper_tuple
|
||||
|
@ -46,6 +46,10 @@ class Info:
|
||||
HEATER_HEALTHY_CMD = "Set to healthy"
|
||||
|
||||
|
||||
# Needed in OBSW to differentiate between external and internal heater commands
|
||||
COMMAND_SOURCE_PARAM_EXTERNAL = 1
|
||||
|
||||
|
||||
class ActionIds(enum.IntEnum):
|
||||
SWITCH_HEATER = 0
|
||||
|
||||
@ -214,6 +218,7 @@ def pack_switch_heater_command(
|
||||
command = bytearray()
|
||||
command.append(switch_nr)
|
||||
command.append(switch_action)
|
||||
command.append(COMMAND_SOURCE_PARAM_EXTERNAL)
|
||||
return generate_action_command(
|
||||
object_id=object_id, action_id=ActionIds.SWITCH_HEATER, app_data=command
|
||||
)
|
||||
|
@ -53,6 +53,8 @@ class CommandIds(enum.IntEnum):
|
||||
TC_MODE_REPLAY = 16
|
||||
TC_CAM_CMD_SEND = 17
|
||||
TC_MODE_IDLE = 18
|
||||
SET_UART_TX_TRISTATE = 20
|
||||
RELEASE_UART_TX = 21
|
||||
|
||||
|
||||
class MemAddresses(enum.IntEnum):
|
||||
@ -180,6 +182,16 @@ def pack_ploc_mpsoc_commands(
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
elif op_code == "17":
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Set UART TX tristate"))
|
||||
command = object_id + struct.pack("!I", CommandIds.SET_UART_TX_TRISTATE)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=33, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
elif op_code == "18":
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Release UART TX"))
|
||||
command = object_id + struct.pack("!I", CommandIds.RELEASE_UART_TX)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=33, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
|
||||
return tc_queue
|
||||
|
||||
|
@ -40,6 +40,10 @@ update_file_dict = {
|
||||
"/mnt/sd0/ploc/supervisor/update-small.bin",
|
||||
"/mnt/sd0/ploc/supervisor/update-small.bin",
|
||||
],
|
||||
"5": [
|
||||
"/mnt/sd0/ploc/supervisor/mpsoc-uart-working.bin",
|
||||
"/mnt/sd0/ploc/supervisor/mpsoc-uart-working.bin",
|
||||
],
|
||||
}
|
||||
|
||||
event_buffer_path_dict = {
|
||||
@ -93,6 +97,7 @@ class SupvActionIds:
|
||||
LOGGING_SET_TOPIC = 56
|
||||
REQUEST_ADC_REPORT = 57
|
||||
RESET_PL = 58
|
||||
ENABLE_NVMS = 59
|
||||
|
||||
|
||||
class SupvHkIds:
|
||||
@ -272,7 +277,7 @@ def pack_ploc_supv_commands(
|
||||
tc_queue.appendleft(
|
||||
(QueueCommands.PRINT, "PLOC Supervisor: Restart supervisor")
|
||||
)
|
||||
command = command = object_id + struct.pack(
|
||||
command = object_id + struct.pack(
|
||||
"!I", SupvActionIds.RESTART_SUPERVISOR
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=52, app_data=command)
|
||||
@ -281,7 +286,7 @@ def pack_ploc_supv_commands(
|
||||
tc_queue.appendleft(
|
||||
(QueueCommands.PRINT, "PLOC Supervisor: Factory reset clear all")
|
||||
)
|
||||
command = command = object_id + struct.pack(
|
||||
command = object_id + struct.pack(
|
||||
"!I", SupvActionIds.FACTORY_RESET_CLEAR_ALL
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=53, app_data=command)
|
||||
@ -302,7 +307,7 @@ def pack_ploc_supv_commands(
|
||||
"PLOC Supervisor: Factory reset clear circular entries",
|
||||
)
|
||||
)
|
||||
command = command = object_id + struct.pack(
|
||||
command = object_id + struct.pack(
|
||||
"!I", SupvActionIds.FACTORY_RESET_CLEAR_CIRCULAR
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=55, app_data=command)
|
||||
@ -384,6 +389,14 @@ def pack_ploc_supv_commands(
|
||||
command = object_id + struct.pack("!I", SupvActionIds.RESET_PL)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=71, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
elif op_code == "57":
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Enable NVMs"))
|
||||
nvm01 = int(input("Enable (1) or disable(0) NVM 0 and 1: "))
|
||||
nvm3 = int(input("Enable (1) or disable(0) NVM 3: "))
|
||||
command = object_id + struct.pack('!I', SupvActionIds.ENABLE_NVMS) + struct.pack('B', nvm01) + \
|
||||
struct.pack('B', nvm3)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=72, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
|
||||
return tc_queue
|
||||
|
||||
@ -605,7 +618,7 @@ def pack_logging_buffer_request(object_id: bytearray) -> bytearray:
|
||||
|
||||
|
||||
def pack_set_gpio_cmd(object_id: bytearray) -> bytearray:
|
||||
port = int(input("Specify port : 0x"), 16)
|
||||
port = int(input("Specify port: 0x"), 16)
|
||||
pin = int(input("Specify pin: 0x"), 16)
|
||||
val = int(input("Specify val: 0x"), 16)
|
||||
command = object_id + struct.pack("!I", SupvActionIds.SET_GPIO)
|
||||
|
@ -1,148 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
@file ploc_udpater.py
|
||||
@brief Commands to initiate update transfer to ploc supervisor. This only updates the software of the MPSoC, it is not
|
||||
possible to update the software of the supervisor.
|
||||
The supervisor is programmed by Thales.
|
||||
@author J. Meier
|
||||
@date 10.07.2021
|
||||
"""
|
||||
import struct
|
||||
|
||||
from tmtccmd.config.definitions import QueueCommands
|
||||
|
||||
from tmtccmd.tc.packer import TcQueueT
|
||||
from spacepackets.ecss.tc import PusTelecommand
|
||||
|
||||
|
||||
latchup_id_dict = {
|
||||
"0": "0.85V",
|
||||
"1": "1.8V",
|
||||
"2": "MISC",
|
||||
"3": "3.3V",
|
||||
"4": "NVM_4XO",
|
||||
"5": "MISSION",
|
||||
"6": "SAFECOTS",
|
||||
}
|
||||
|
||||
|
||||
class UpdaterActionIds:
|
||||
UPDATE_A_UBOOT = 0
|
||||
UPDATE_A_BITSTREAM = 1
|
||||
UPDATE_A_LINUX = 2
|
||||
UPDATE_A_APP_SW = 3
|
||||
UPDATE_B_UBOOT = 4
|
||||
UPDATE_B_BITSTREAM = 5
|
||||
UPDATE_B_LINUX = 6
|
||||
UPDATE_B_LINUX = 7
|
||||
|
||||
|
||||
class ImagePathDefs:
|
||||
imageAuboot = "/mnt/sd0/ploc/updateAuboot.bin"
|
||||
imageAbitsream = "/mnt/sd0/ploc/updateAbitstream.bin"
|
||||
imageAlinux = "/mnt/sd0/ploc/updateAlinux.bin"
|
||||
imageAappsw = "/mnt/sd0/ploc/updateAappsw.bin"
|
||||
imageBuboot = "/mnt/sd0/ploc/updateBuboot.bin"
|
||||
imageBbitsream = "/mnt/sd0/ploc/updateBbitstream.bin"
|
||||
imageBlinux = "/mnt/sd0/ploc/updateBlinux.bin"
|
||||
imageBappsw = "/mnt/sd0/ploc/updateBappsw.bin"
|
||||
|
||||
|
||||
def pack_ploc_updater_test_into(
|
||||
object_id: bytearray, tc_queue: TcQueueT, op_code: str
|
||||
) -> TcQueueT:
|
||||
tc_queue.appendleft(
|
||||
(
|
||||
QueueCommands.PRINT,
|
||||
"Testing PLOC updater with object id: 0x" + object_id.hex(),
|
||||
)
|
||||
)
|
||||
|
||||
if op_code == "0":
|
||||
tc_queue.appendleft(
|
||||
(QueueCommands.PRINT, "PLOC Supervisor: Update uboot on partition A")
|
||||
)
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", UpdaterActionIds.UPDATE_A_UBOOT)
|
||||
+ bytearray(ImagePathDefs.imageAuboot, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
if op_code == "1":
|
||||
tc_queue.appendleft(
|
||||
(QueueCommands.PRINT, "PLOC Supervisor: Update bitstream on parition A")
|
||||
)
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", UpdaterActionIds.UPDATE_A_BITSTREAM)
|
||||
+ bytearray(ImagePathDefs.imageAbitsream, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
if op_code == "2":
|
||||
tc_queue.appendleft(
|
||||
(QueueCommands.PRINT, "PLOC Supervisor: Update linux on partition A")
|
||||
)
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", UpdaterActionIds.UPDATE_A_LINUX)
|
||||
+ bytearray(ImagePathDefs.imageAlinux, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
if op_code == "3":
|
||||
tc_queue.appendleft(
|
||||
(QueueCommands.PRINT, "PLOC Supervisor: Update application on partition A")
|
||||
)
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", UpdaterActionIds.UPDATE_A_APP_SW)
|
||||
+ bytearray(ImagePathDefs.imageAappsw, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=33, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
if op_code == "4":
|
||||
tc_queue.appendleft(
|
||||
(QueueCommands.PRINT, "PLOC Supervisor: Update uboot on partition B")
|
||||
)
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", UpdaterActionIds.UPDATE_B_UBOOT)
|
||||
+ bytearray(ImagePathDefs.imageBuboot, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
if op_code == "5":
|
||||
tc_queue.appendleft(
|
||||
(QueueCommands.PRINT, "PLOC Supervisor: Update bitstream on parition B")
|
||||
)
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", UpdaterActionIds.UPDATE_B_BITSTREAM)
|
||||
+ bytearray(ImagePathDefs.imageBbitsream, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
if op_code == "6":
|
||||
tc_queue.appendleft(
|
||||
(QueueCommands.PRINT, "PLOC Supervisor: Update linux on partition B")
|
||||
)
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", UpdaterActionIds.UPDATE_B_LINUX)
|
||||
+ bytearray(ImagePathDefs.imageBlinux, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
if op_code == "7":
|
||||
tc_queue.appendleft(
|
||||
(QueueCommands.PRINT, "PLOC Supervisor: Update application on partition B")
|
||||
)
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack("!I", UpdaterActionIds.UPDATE_B_APP_SW)
|
||||
+ bytearray(ImagePathDefs.imageBappsw, "utf-8")
|
||||
)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=33, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
@ -38,5 +38,4 @@ def pack_service200_test_into(tc_queue: TcQueueT) -> TcQueueT:
|
||||
mode_data = pack_mode_data(obj_id, Modes.OFF, 0)
|
||||
command = PusTelecommand(service=200, subservice=1, ssc=2030, app_data=mode_data)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service200.txt"))
|
||||
return tc_queue
|
||||
|
@ -29,7 +29,6 @@ from pus_tc.devs.ploc_supervisor import pack_ploc_supv_commands
|
||||
from pus_tc.devs.heater import pack_heater_cmds
|
||||
from pus_tc.devs.reaction_wheels import pack_single_rw_test_into, pack_rw_ass_cmds
|
||||
from pus_tc.devs.rad_sensor import pack_rad_sensor_test_into
|
||||
from pus_tc.devs.ploc_upater import pack_ploc_updater_test_into
|
||||
from pus_tc.devs.ploc_memory_dumper import pack_ploc_memory_dumper_cmd
|
||||
from pus_tc.devs.ccsds_handler import pack_ccsds_handler_test
|
||||
from pus_tc.system.core import pack_core_commands
|
||||
@ -59,7 +58,6 @@ from config.object_ids import (
|
||||
RW4_ID,
|
||||
RAD_SENSOR_ID,
|
||||
PLOC_SUPV_ID,
|
||||
PLOC_UPDATER_ID,
|
||||
STAR_TRACKER_ID,
|
||||
PLOC_MEMORY_DUMPER_ID,
|
||||
GPS_HANDLER_0_ID,
|
||||
@ -84,7 +82,8 @@ def pre_tc_send_cb(
|
||||
):
|
||||
if isinstance(queue_entry, bytes) or isinstance(queue_entry, bytearray):
|
||||
log_raw_pus_tc(
|
||||
packet=queue_entry, srv_subservice=(queue_info.service, queue_info.subservice)
|
||||
packet=queue_entry,
|
||||
srv_subservice=(queue_info.service, queue_info.subservice),
|
||||
)
|
||||
tc_info_string = f"Sent {queue_info}"
|
||||
LOGGER.info(tc_info_string)
|
||||
@ -94,6 +93,7 @@ def pre_tc_send_cb(
|
||||
if queue_entry == QueueCommands.PRINT:
|
||||
file_logger.info(queue_info)
|
||||
|
||||
|
||||
def pack_service_queue_user(
|
||||
service: Union[str, int], op_code: str, service_queue: TcQueueT
|
||||
):
|
||||
@ -178,11 +178,6 @@ def pack_service_queue_user(
|
||||
return pack_ploc_supv_commands(
|
||||
object_id=object_id, tc_queue=service_queue, op_code=op_code
|
||||
)
|
||||
if service == CustomServiceList.PLOC_UPDATER.value:
|
||||
object_id = PLOC_UPDATER_ID
|
||||
return pack_ploc_updater_test_into(
|
||||
object_id=object_id, tc_queue=service_queue, op_code=op_code
|
||||
)
|
||||
if service == CustomServiceList.STAR_TRACKER.value:
|
||||
object_id = STAR_TRACKER_ID
|
||||
return pack_star_tracker_commands(
|
||||
|
Reference in New Issue
Block a user