added TCS board ASS CMDS

This commit is contained in:
Robin Müller 2022-03-22 19:29:55 +01:00
parent 46c327101e
commit cb479cc2f4
No known key found for this signature in database
GPG Key ID: 71B58F8A3CDFA9AC
9 changed files with 142 additions and 51 deletions

View File

@ -44,3 +44,4 @@ class CustomServiceList(enum.Enum):
SYRLINKS = "syrlinks"
ACS_ASS = "acs-ass"
SUS_ASS = "sus-ass"
TCS_ASS = "tcs-ass"

View File

@ -69,6 +69,7 @@ PL_PCDU_ID = bytes([0x44, 0x30, 0x00, 0x00])
# System and Assembly Objects
ACS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x01])
SUS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x02])
TCS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x03])
def get_object_ids() -> ObjectIdDictT:

View File

@ -600,6 +600,7 @@ def add_ploc_supv_cmds(cmd_dict: ServiceOpCodeDictT):
def add_system_cmds(cmd_dict: ServiceOpCodeDictT):
from pus_tc.system.acs import AcsOpCodes, SusOpCodes
import pus_tc.system.tcs as tcs
default_opts = generate_op_code_options(
enter_listener_mode=False, custom_timeout=8.0
@ -685,3 +686,23 @@ def add_system_cmds(cmd_dict: ServiceOpCodeDictT):
info="SUS Assembly",
op_code_entry=op_code_dict,
)
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict,
keys=tcs.OpCodes.TCS_BOARD_ASS_NORMAL,
info=tcs.Info.TCS_BOARD_ASS_NORMAL,
options=default_opts,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=tcs.OpCodes.TCS_BOARD_ASS_OFF,
info=tcs.Info.TCS_BOARD_ASS_OFF,
options=default_opts,
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name=CustomServiceList.TCS_ASS.value,
info="TCS Board Assembly",
op_code_entry=op_code_dict,
)

View File

@ -47,7 +47,7 @@ class PlocReplyIds:
def pack_ploc_mpsoc_commands(
object_id: bytearray, tc_queue: TcQueueT, op_code: str
object_id: bytearray, tc_queue: TcQueueT, op_code: str
) -> TcQueueT:
tc_queue.appendleft(
(
@ -58,7 +58,9 @@ def pack_ploc_mpsoc_commands(
if op_code == "0":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: TC mem write test"))
memory_address = int(input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16)
memory_address = int(
input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16
)
memory_data = int(input("PLOC MPSoC: Tc Mem Write: Type memory data: 0x"), 16)
# TODO: implement variable length mem write command
mem_len = 1 # 1 32-bit word
@ -89,7 +91,7 @@ def pack_ploc_mpsoc_commands(
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "5":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay stop"))
command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_STOP)
command = object_id + struct.pack("!I", CommandIds.TC_REPLAY_STOP)
command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "6":
@ -99,7 +101,7 @@ def pack_ploc_mpsoc_commands(
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "7":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Downlink pwr off"))
command = object_id + struct.pack('!I', CommandIds.TC_DOWNLINK_PWR_OFF)
command = object_id + struct.pack("!I", CommandIds.TC_DOWNLINK_PWR_OFF)
command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "8":
@ -112,7 +114,7 @@ def pack_ploc_mpsoc_commands(
def generate_write_mem_command(
object_id: bytearray, memory_address: int, memory_data: int, mem_len: int
object_id: bytearray, memory_address: int, memory_data: int, mem_len: int
) -> bytearray:
"""This function generates the command to write to a memory address within the PLOC
@param object_id The object id of the PlocHandler
@ -120,11 +122,11 @@ def generate_write_mem_command(
@param memory_data The data to write to the memory address specified by the bytearray memory_address.
"""
command = (
object_id
+ struct.pack('!I', CommandIds.TC_MEM_WRITE)
+ struct.pack('!I', memory_address)
+ struct.pack('!H', mem_len)
+ struct.pack("!I", memory_data)
object_id
+ struct.pack("!I", CommandIds.TC_MEM_WRITE)
+ struct.pack("!I", memory_address)
+ struct.pack("!H", mem_len)
+ struct.pack("!I", memory_data)
)
return command
@ -133,43 +135,63 @@ def prepare_mem_read_command(object_id: bytearray) -> bytearray:
memory_address = int(input("PLOC MPSoC Tc Mem Read: Type memory address: 0x"), 16)
num_words = int(input("PLOC MPSoC specify number of words (32-bit) to read: "))
command = (
object_id + struct.pack('!I', CommandIds.TC_MEM_READ) + struct.pack("!I", memory_address) + struct.pack(
'!H', num_words)
object_id
+ struct.pack("!I", CommandIds.TC_MEM_READ)
+ struct.pack("!I", memory_address)
+ struct.pack("!H", num_words)
)
return command
def prepare_flash_write_cmd(object_id: bytearray) -> bytearray:
file = get_flash_write_file()
command = object_id + struct.pack('!I', CommandIds.FLASH_WRITE) + bytearray(file, 'utf-8')
command = (
object_id + struct.pack("!I", CommandIds.FLASH_WRITE) + bytearray(file, "utf-8")
)
return command
def prepare_flash_delete_cmd(object_id: bytearray) -> bytearray:
file = get_mpsoc_file()
command = object_id + struct.pack('!I', CommandIds.TC_FLASH_DELETE) + bytearray(file, 'utf-8')
command = (
object_id
+ struct.pack("!I", CommandIds.TC_FLASH_DELETE)
+ bytearray(file, "utf-8")
)
return command
def prepare_replay_start_cmd(object_id: bytearray) -> bytearray:
replay = int(input("Specify replay mode (0 - repeated, 1 - once): "))
command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_START) + struct.pack('!B', replay)
command = (
object_id
+ struct.pack("!I", CommandIds.TC_REPLAY_START)
+ struct.pack("!B", replay)
)
return command
def prepare_downlink_pwr_on_cmd(object_id: bytearray) -> bytearray:
mode = int(input("Specify JESD mode (0 - 5): "))
lane_rate = int(input("Specify lane rate (0 - 9): "))
command = object_id + struct.pack('!I', CommandIds.TC_DOWNLINK_PWR_ON) + struct.pack('!B', mode) \
+ struct.pack('!B', lane_rate)
command = (
object_id
+ struct.pack("!I", CommandIds.TC_DOWNLINK_PWR_ON)
+ struct.pack("!B", mode)
+ struct.pack("!B", lane_rate)
)
return command
def prepare_replay_write_sequence_cmd(object_id: bytearray) -> bytearray:
use_decoding = int(input("Use decoding (set to 1): "))
file = get_mpsoc_file()
command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_WRITE_SEQUENCE) + struct.pack('!B', use_decoding) + \
bytearray(file, 'utf-8')
command = (
object_id
+ struct.pack("!I", CommandIds.TC_REPLAY_WRITE_SEQUENCE)
+ struct.pack("!B", use_decoding)
+ bytearray(file, "utf-8")
)
return command

View File

@ -1,9 +1,10 @@
import enum
from tmtccmd.tc.definitions import TcQueueT, QueueCommands
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes, Subservices
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_200_mode import Modes
from config.object_ids import ACS_BOARD_ASS_ID, SUS_BOARD_ASS_ID
from .common import command_assembly
class AcsOpCodes:
ACS_ASS_A_SIDE = ["0", "acs-a"]
@ -30,7 +31,7 @@ class DualSideSubmodes(enum.IntEnum):
def pack_acs_command(tc_queue: TcQueueT, op_code: str):
if op_code in AcsOpCodes.ACS_ASS_A_SIDE:
command_acs_board(
command_assembly(
object_id=ACS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=DualSideSubmodes.A_SIDE,
@ -38,7 +39,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
info="Switching to ACS board assembly A side",
)
if op_code in AcsOpCodes.ACS_ASS_B_SIDE:
command_acs_board(
command_assembly(
object_id=ACS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=DualSideSubmodes.B_SIDE,
@ -46,7 +47,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
info="Switching to ACS board assembly B side",
)
if op_code in AcsOpCodes.ACS_ASS_DUAL_MODE:
command_acs_board(
command_assembly(
object_id=ACS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=DualSideSubmodes.DUAL_SIDE,
@ -54,7 +55,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
info="Switching to ACS board assembly dual mode",
)
if op_code in AcsOpCodes.ACS_ASS_A_ON:
command_acs_board(
command_assembly(
object_id=ACS_BOARD_ASS_ID,
mode=Modes.ON,
submode=DualSideSubmodes.A_SIDE,
@ -62,7 +63,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
info="Switching ACS board assembly A side on",
)
if op_code in AcsOpCodes.ACS_ASS_B_ON:
command_acs_board(
command_assembly(
object_id=ACS_BOARD_ASS_ID,
mode=Modes.ON,
submode=DualSideSubmodes.B_SIDE,
@ -70,7 +71,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
info="Switching ACS board assembly B side on",
)
if op_code in AcsOpCodes.ACS_ASS_DUAL_ON:
command_acs_board(
command_assembly(
object_id=ACS_BOARD_ASS_ID,
mode=Modes.ON,
submode=DualSideSubmodes.B_SIDE,
@ -78,7 +79,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
info="Switching ACS board assembly dual side on",
)
if op_code in AcsOpCodes.ACS_ASS_OFF:
command_acs_board(
command_assembly(
object_id=ACS_BOARD_ASS_ID,
mode=Modes.OFF,
submode=0,
@ -89,7 +90,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
def pack_sus_cmds(tc_queue: TcQueueT, op_code: str):
if op_code in SusOpCodes.SUS_ASS_NOM_SIDE:
command_acs_board(
command_assembly(
object_id=SUS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=DualSideSubmodes.A_SIDE,
@ -97,7 +98,7 @@ def pack_sus_cmds(tc_queue: TcQueueT, op_code: str):
info="Switching to SUS board to nominal side",
)
if op_code in SusOpCodes.SUS_ASS_RED_SIDE:
command_acs_board(
command_assembly(
object_id=SUS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=DualSideSubmodes.B_SIDE,
@ -105,7 +106,7 @@ def pack_sus_cmds(tc_queue: TcQueueT, op_code: str):
info="Switching to SUS board to redundant side",
)
if op_code in SusOpCodes.SUS_ASS_OFF:
command_acs_board(
command_assembly(
object_id=SUS_BOARD_ASS_ID,
mode=Modes.OFF,
submode=0,
@ -113,25 +114,10 @@ def pack_sus_cmds(tc_queue: TcQueueT, op_code: str):
info="Switching SUS board off",
)
if op_code in SusOpCodes.SUS_ASS_DUAL_MODE:
command_acs_board(
command_assembly(
object_id=SUS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=DualSideSubmodes.DUAL_SIDE,
tc_queue=tc_queue,
info="Switching to SUS board to dual side",
)
def command_acs_board(
object_id: bytes, mode: Modes, submode: int, tc_queue: TcQueueT, info: str
):
tc_queue.appendleft((QueueCommands.PRINT, info))
mode_data = pack_mode_data(
object_id=object_id,
mode=mode,
submode=submode,
)
cmd = PusTelecommand(
service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data
)
tc_queue.appendleft(cmd.pack_command_tuple())

18
pus_tc/system/common.py Normal file
View File

@ -0,0 +1,18 @@
from tmtccmd.tc.definitions import TcQueueT, QueueCommands
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes, Subservices
def command_assembly(
object_id: bytes, mode: Modes, submode: int, tc_queue: TcQueueT, info: str
):
tc_queue.appendleft((QueueCommands.PRINT, info))
mode_data = pack_mode_data(
object_id=object_id,
mode=mode,
submode=submode,
)
cmd = PusTelecommand(
service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data
)
tc_queue.appendleft(cmd.pack_command_tuple())

34
pus_tc/system/tcs.py Normal file
View File

@ -0,0 +1,34 @@
from tmtccmd.tc.definitions import TcQueueT, QueueCommands
from tmtccmd.tc.service_200_mode import Modes
from .common import command_assembly
from config.object_ids import TCS_BOARD_ASS_ID
class OpCodes:
TCS_BOARD_ASS_NORMAL = ["0", "tcs-normal"]
TCS_BOARD_ASS_OFF = ["1", "tcs-off"]
class Info:
TCS_BOARD_ASS_NORMAL = "Switching TCS board assembly on"
TCS_BOARD_ASS_OFF = "Switching TCS board assembly off"
def pack_tcs_sys_commands(tc_queue: TcQueueT, op_code: str):
if op_code in OpCodes.TCS_BOARD_ASS_NORMAL:
command_assembly(
object_id=TCS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=0,
tc_queue=tc_queue,
info=Info.TCS_BOARD_ASS_NORMAL,
)
if op_code in OpCodes.TCS_BOARD_ASS_OFF:
command_assembly(
object_id=TCS_BOARD_ASS_ID,
mode=Modes.NORMAL,
submode=0,
tc_queue=tc_queue,
info=Info.TCS_BOARD_ASS_OFF,
)

View File

@ -34,6 +34,7 @@ from pus_tc.devs.gps import pack_gps_command
from pus_tc.system.acs import pack_acs_command, pack_sus_cmds
from pus_tc.devs.plpcdu import pack_pl_pcdu_commands
from pus_tc.devs.str_img_helper import pack_str_img_helper_command
from pus_tc.system.tcs import pack_tcs_sys_commands
from config.definitions import CustomServiceList
from config.object_ids import (
P60_DOCK_HANDLER,
@ -120,7 +121,9 @@ def pack_service_queue_user(
)
if service == CustomServiceList.PLOC_MPSOC.value:
object_id = PLOC_MPSOC_ID
return pack_ploc_mpsoc_commands(object_id=object_id, tc_queue=service_queue, op_code=op_code)
return pack_ploc_mpsoc_commands(
object_id=object_id, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.REACTION_WHEEL_1.value:
object_id = RW1_ID
return pack_single_rw_test_into(
@ -205,6 +208,8 @@ def pack_service_queue_user(
return pack_pl_pcdu_commands(tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.ACS_ASS.value:
return pack_acs_command(tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.TCS_ASS.value:
return pack_tcs_sys_commands(tc_queue=service_queue, op_code=op_code)
LOGGER.warning("Invalid Service !")

View File

@ -69,8 +69,11 @@ def handle_ploc_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpa
"PLOC Mem Len",
"PLOC Read Memory Data",
]
reply.content_list = ["0x" + custom_data[:4].hex(), struct.unpack('!H', custom_data[4:6])[0],
"0x" + custom_data[6:10].hex()]
reply.content_list = [
"0x" + custom_data[:4].hex(),
struct.unpack("!H", custom_data[4:6])[0],
"0x" + custom_data[6:10].hex(),
]
return reply