Merge pull request 'TCS ASS commands' (#45) from mueller/master into develop
Reviewed-on: #45
This commit is contained in:
commit
8243011529
@ -44,3 +44,4 @@ class CustomServiceList(enum.Enum):
|
|||||||
SYRLINKS = "syrlinks"
|
SYRLINKS = "syrlinks"
|
||||||
ACS_ASS = "acs-ass"
|
ACS_ASS = "acs-ass"
|
||||||
SUS_ASS = "sus-ass"
|
SUS_ASS = "sus-ass"
|
||||||
|
TCS_ASS = "tcs-ass"
|
||||||
|
@ -161,6 +161,7 @@
|
|||||||
12301;0x300d;NOT_ENOUGH_DEVICES_DUAL_MODE;HIGH;;mission/system/SusAssembly.h
|
12301;0x300d;NOT_ENOUGH_DEVICES_DUAL_MODE;HIGH;;mission/system/SusAssembly.h
|
||||||
12302;0x300e;POWER_STATE_MACHINE_TIMEOUT;MEDIUM;;mission/system/SusAssembly.h
|
12302;0x300e;POWER_STATE_MACHINE_TIMEOUT;MEDIUM;;mission/system/SusAssembly.h
|
||||||
12303;0x300f;SIDE_SWITCH_TRANSITION_NOT_ALLOWED;LOW;Not implemented, would increase already high complexity. Operator should instead command the assembly off first and then command the assembly on into the desired mode/submode combination;mission/system/SusAssembly.h
|
12303;0x300f;SIDE_SWITCH_TRANSITION_NOT_ALLOWED;LOW;Not implemented, would increase already high complexity. Operator should instead command the assembly off first and then command the assembly on into the desired mode/submode combination;mission/system/SusAssembly.h
|
||||||
|
12400;0x3070;CHILDREN_LOST_MODE;MEDIUM;;mission/system/TcsBoardAssembly.h
|
||||||
13600;0x3520;ALLOC_FAILURE;MEDIUM;;bsp_q7s/core/CoreController.h
|
13600;0x3520;ALLOC_FAILURE;MEDIUM;;bsp_q7s/core/CoreController.h
|
||||||
13601;0x3521;REBOOT_SW;MEDIUM; Software reboot occured. Can also be a systemd reboot. P1: Current Chip, P2: Current Copy;bsp_q7s/core/CoreController.h
|
13601;0x3521;REBOOT_SW;MEDIUM; Software reboot occured. Can also be a systemd reboot. P1: Current Chip, P2: Current Copy;bsp_q7s/core/CoreController.h
|
||||||
13602;0x3522;REBOOT_MECHANISM_TRIGGERED;MEDIUM;The reboot mechanism was triggered. P1: First 16 bits: Last Chip, Last 16 bits: Last Copy, P2: Each byte is the respective reboot count for the slots;bsp_q7s/core/CoreController.h
|
13602;0x3522;REBOOT_MECHANISM_TRIGGERED;MEDIUM;The reboot mechanism was triggered. P1: First 16 bits: Last Chip, Last 16 bits: Last Copy, P2: Each byte is the respective reboot count for the slots;bsp_q7s/core/CoreController.h
|
||||||
|
|
@ -69,6 +69,7 @@ PL_PCDU_ID = bytes([0x44, 0x30, 0x00, 0x00])
|
|||||||
# System and Assembly Objects
|
# System and Assembly Objects
|
||||||
ACS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x01])
|
ACS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x01])
|
||||||
SUS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x02])
|
SUS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x02])
|
||||||
|
TCS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x03])
|
||||||
|
|
||||||
|
|
||||||
def get_object_ids() -> ObjectIdDictT:
|
def get_object_ids() -> ObjectIdDictT:
|
||||||
|
@ -109,6 +109,7 @@
|
|||||||
0x54694269;TEST_TASK
|
0x54694269;TEST_TASK
|
||||||
0x73000001;ACS_BOARD_ASS
|
0x73000001;ACS_BOARD_ASS
|
||||||
0x73000002;SUS_BOARD_ASS
|
0x73000002;SUS_BOARD_ASS
|
||||||
|
0x73000003;TCS_BOARD_ASS
|
||||||
0x73000100;TM_FUNNEL
|
0x73000100;TM_FUNNEL
|
||||||
0x73500000;CCSDS_IP_CORE_BRIDGE
|
0x73500000;CCSDS_IP_CORE_BRIDGE
|
||||||
0xFFFFFFFF;NO_OBJECT
|
0xFFFFFFFF;NO_OBJECT
|
||||||
|
|
@ -600,6 +600,7 @@ def add_ploc_supv_cmds(cmd_dict: ServiceOpCodeDictT):
|
|||||||
|
|
||||||
def add_system_cmds(cmd_dict: ServiceOpCodeDictT):
|
def add_system_cmds(cmd_dict: ServiceOpCodeDictT):
|
||||||
from pus_tc.system.acs import AcsOpCodes, SusOpCodes
|
from pus_tc.system.acs import AcsOpCodes, SusOpCodes
|
||||||
|
import pus_tc.system.tcs as tcs
|
||||||
|
|
||||||
default_opts = generate_op_code_options(
|
default_opts = generate_op_code_options(
|
||||||
enter_listener_mode=False, custom_timeout=8.0
|
enter_listener_mode=False, custom_timeout=8.0
|
||||||
@ -685,3 +686,23 @@ def add_system_cmds(cmd_dict: ServiceOpCodeDictT):
|
|||||||
info="SUS Assembly",
|
info="SUS Assembly",
|
||||||
op_code_entry=op_code_dict,
|
op_code_entry=op_code_dict,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
op_code_dict = dict()
|
||||||
|
add_op_code_entry(
|
||||||
|
op_code_dict=op_code_dict,
|
||||||
|
keys=tcs.OpCodes.TCS_BOARD_ASS_NORMAL,
|
||||||
|
info=tcs.Info.TCS_BOARD_ASS_NORMAL,
|
||||||
|
options=default_opts,
|
||||||
|
)
|
||||||
|
add_op_code_entry(
|
||||||
|
op_code_dict=op_code_dict,
|
||||||
|
keys=tcs.OpCodes.TCS_BOARD_ASS_OFF,
|
||||||
|
info=tcs.Info.TCS_BOARD_ASS_OFF,
|
||||||
|
options=default_opts,
|
||||||
|
)
|
||||||
|
add_service_op_code_entry(
|
||||||
|
srv_op_code_dict=cmd_dict,
|
||||||
|
name=CustomServiceList.TCS_ASS.value,
|
||||||
|
info="TCS Board Assembly",
|
||||||
|
op_code_entry=op_code_dict,
|
||||||
|
)
|
||||||
|
@ -47,7 +47,7 @@ class PlocReplyIds:
|
|||||||
|
|
||||||
|
|
||||||
def pack_ploc_mpsoc_commands(
|
def pack_ploc_mpsoc_commands(
|
||||||
object_id: bytearray, tc_queue: TcQueueT, op_code: str
|
object_id: bytearray, tc_queue: TcQueueT, op_code: str
|
||||||
) -> TcQueueT:
|
) -> TcQueueT:
|
||||||
tc_queue.appendleft(
|
tc_queue.appendleft(
|
||||||
(
|
(
|
||||||
@ -58,7 +58,9 @@ def pack_ploc_mpsoc_commands(
|
|||||||
|
|
||||||
if op_code == "0":
|
if op_code == "0":
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: TC mem write test"))
|
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: TC mem write test"))
|
||||||
memory_address = int(input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16)
|
memory_address = int(
|
||||||
|
input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16
|
||||||
|
)
|
||||||
memory_data = int(input("PLOC MPSoC: Tc Mem Write: Type memory data: 0x"), 16)
|
memory_data = int(input("PLOC MPSoC: Tc Mem Write: Type memory data: 0x"), 16)
|
||||||
# TODO: implement variable length mem write command
|
# TODO: implement variable length mem write command
|
||||||
mem_len = 1 # 1 32-bit word
|
mem_len = 1 # 1 32-bit word
|
||||||
@ -89,7 +91,7 @@ def pack_ploc_mpsoc_commands(
|
|||||||
tc_queue.appendleft(command.pack_command_tuple())
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
elif op_code == "5":
|
elif op_code == "5":
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay stop"))
|
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay stop"))
|
||||||
command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_STOP)
|
command = object_id + struct.pack("!I", CommandIds.TC_REPLAY_STOP)
|
||||||
command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command)
|
command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command)
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
elif op_code == "6":
|
elif op_code == "6":
|
||||||
@ -99,7 +101,7 @@ def pack_ploc_mpsoc_commands(
|
|||||||
tc_queue.appendleft(command.pack_command_tuple())
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
elif op_code == "7":
|
elif op_code == "7":
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Downlink pwr off"))
|
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Downlink pwr off"))
|
||||||
command = object_id + struct.pack('!I', CommandIds.TC_DOWNLINK_PWR_OFF)
|
command = object_id + struct.pack("!I", CommandIds.TC_DOWNLINK_PWR_OFF)
|
||||||
command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command)
|
command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command)
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
elif op_code == "8":
|
elif op_code == "8":
|
||||||
@ -112,7 +114,7 @@ def pack_ploc_mpsoc_commands(
|
|||||||
|
|
||||||
|
|
||||||
def generate_write_mem_command(
|
def generate_write_mem_command(
|
||||||
object_id: bytearray, memory_address: int, memory_data: int, mem_len: int
|
object_id: bytearray, memory_address: int, memory_data: int, mem_len: int
|
||||||
) -> bytearray:
|
) -> bytearray:
|
||||||
"""This function generates the command to write to a memory address within the PLOC
|
"""This function generates the command to write to a memory address within the PLOC
|
||||||
@param object_id The object id of the PlocHandler
|
@param object_id The object id of the PlocHandler
|
||||||
@ -120,11 +122,11 @@ def generate_write_mem_command(
|
|||||||
@param memory_data The data to write to the memory address specified by the bytearray memory_address.
|
@param memory_data The data to write to the memory address specified by the bytearray memory_address.
|
||||||
"""
|
"""
|
||||||
command = (
|
command = (
|
||||||
object_id
|
object_id
|
||||||
+ struct.pack('!I', CommandIds.TC_MEM_WRITE)
|
+ struct.pack("!I", CommandIds.TC_MEM_WRITE)
|
||||||
+ struct.pack('!I', memory_address)
|
+ struct.pack("!I", memory_address)
|
||||||
+ struct.pack('!H', mem_len)
|
+ struct.pack("!H", mem_len)
|
||||||
+ struct.pack("!I", memory_data)
|
+ struct.pack("!I", memory_data)
|
||||||
)
|
)
|
||||||
return command
|
return command
|
||||||
|
|
||||||
@ -133,43 +135,63 @@ def prepare_mem_read_command(object_id: bytearray) -> bytearray:
|
|||||||
memory_address = int(input("PLOC MPSoC Tc Mem Read: Type memory address: 0x"), 16)
|
memory_address = int(input("PLOC MPSoC Tc Mem Read: Type memory address: 0x"), 16)
|
||||||
num_words = int(input("PLOC MPSoC specify number of words (32-bit) to read: "))
|
num_words = int(input("PLOC MPSoC specify number of words (32-bit) to read: "))
|
||||||
command = (
|
command = (
|
||||||
object_id + struct.pack('!I', CommandIds.TC_MEM_READ) + struct.pack("!I", memory_address) + struct.pack(
|
object_id
|
||||||
'!H', num_words)
|
+ struct.pack("!I", CommandIds.TC_MEM_READ)
|
||||||
|
+ struct.pack("!I", memory_address)
|
||||||
|
+ struct.pack("!H", num_words)
|
||||||
)
|
)
|
||||||
return command
|
return command
|
||||||
|
|
||||||
|
|
||||||
def prepare_flash_write_cmd(object_id: bytearray) -> bytearray:
|
def prepare_flash_write_cmd(object_id: bytearray) -> bytearray:
|
||||||
file = get_flash_write_file()
|
file = get_flash_write_file()
|
||||||
command = object_id + struct.pack('!I', CommandIds.FLASH_WRITE) + bytearray(file, 'utf-8')
|
command = (
|
||||||
|
object_id + struct.pack("!I", CommandIds.FLASH_WRITE) + bytearray(file, "utf-8")
|
||||||
|
)
|
||||||
return command
|
return command
|
||||||
|
|
||||||
|
|
||||||
def prepare_flash_delete_cmd(object_id: bytearray) -> bytearray:
|
def prepare_flash_delete_cmd(object_id: bytearray) -> bytearray:
|
||||||
file = get_mpsoc_file()
|
file = get_mpsoc_file()
|
||||||
command = object_id + struct.pack('!I', CommandIds.TC_FLASH_DELETE) + bytearray(file, 'utf-8')
|
command = (
|
||||||
|
object_id
|
||||||
|
+ struct.pack("!I", CommandIds.TC_FLASH_DELETE)
|
||||||
|
+ bytearray(file, "utf-8")
|
||||||
|
)
|
||||||
return command
|
return command
|
||||||
|
|
||||||
|
|
||||||
def prepare_replay_start_cmd(object_id: bytearray) -> bytearray:
|
def prepare_replay_start_cmd(object_id: bytearray) -> bytearray:
|
||||||
replay = int(input("Specify replay mode (0 - repeated, 1 - once): "))
|
replay = int(input("Specify replay mode (0 - repeated, 1 - once): "))
|
||||||
command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_START) + struct.pack('!B', replay)
|
command = (
|
||||||
|
object_id
|
||||||
|
+ struct.pack("!I", CommandIds.TC_REPLAY_START)
|
||||||
|
+ struct.pack("!B", replay)
|
||||||
|
)
|
||||||
return command
|
return command
|
||||||
|
|
||||||
|
|
||||||
def prepare_downlink_pwr_on_cmd(object_id: bytearray) -> bytearray:
|
def prepare_downlink_pwr_on_cmd(object_id: bytearray) -> bytearray:
|
||||||
mode = int(input("Specify JESD mode (0 - 5): "))
|
mode = int(input("Specify JESD mode (0 - 5): "))
|
||||||
lane_rate = int(input("Specify lane rate (0 - 9): "))
|
lane_rate = int(input("Specify lane rate (0 - 9): "))
|
||||||
command = object_id + struct.pack('!I', CommandIds.TC_DOWNLINK_PWR_ON) + struct.pack('!B', mode) \
|
command = (
|
||||||
+ struct.pack('!B', lane_rate)
|
object_id
|
||||||
|
+ struct.pack("!I", CommandIds.TC_DOWNLINK_PWR_ON)
|
||||||
|
+ struct.pack("!B", mode)
|
||||||
|
+ struct.pack("!B", lane_rate)
|
||||||
|
)
|
||||||
return command
|
return command
|
||||||
|
|
||||||
|
|
||||||
def prepare_replay_write_sequence_cmd(object_id: bytearray) -> bytearray:
|
def prepare_replay_write_sequence_cmd(object_id: bytearray) -> bytearray:
|
||||||
use_decoding = int(input("Use decoding (set to 1): "))
|
use_decoding = int(input("Use decoding (set to 1): "))
|
||||||
file = get_mpsoc_file()
|
file = get_mpsoc_file()
|
||||||
command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_WRITE_SEQUENCE) + struct.pack('!B', use_decoding) + \
|
command = (
|
||||||
bytearray(file, 'utf-8')
|
object_id
|
||||||
|
+ struct.pack("!I", CommandIds.TC_REPLAY_WRITE_SEQUENCE)
|
||||||
|
+ struct.pack("!B", use_decoding)
|
||||||
|
+ bytearray(file, "utf-8")
|
||||||
|
)
|
||||||
return command
|
return command
|
||||||
|
|
||||||
|
|
||||||
|
@ -1,9 +1,10 @@
|
|||||||
import enum
|
import enum
|
||||||
from tmtccmd.tc.definitions import TcQueueT, QueueCommands
|
from tmtccmd.tc.definitions import TcQueueT
|
||||||
from spacepackets.ecss.tc import PusTelecommand
|
from tmtccmd.tc.service_200_mode import Modes
|
||||||
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes, Subservices
|
|
||||||
from config.object_ids import ACS_BOARD_ASS_ID, SUS_BOARD_ASS_ID
|
from config.object_ids import ACS_BOARD_ASS_ID, SUS_BOARD_ASS_ID
|
||||||
|
|
||||||
|
from .common import command_assembly
|
||||||
|
|
||||||
|
|
||||||
class AcsOpCodes:
|
class AcsOpCodes:
|
||||||
ACS_ASS_A_SIDE = ["0", "acs-a"]
|
ACS_ASS_A_SIDE = ["0", "acs-a"]
|
||||||
@ -30,7 +31,7 @@ class DualSideSubmodes(enum.IntEnum):
|
|||||||
|
|
||||||
def pack_acs_command(tc_queue: TcQueueT, op_code: str):
|
def pack_acs_command(tc_queue: TcQueueT, op_code: str):
|
||||||
if op_code in AcsOpCodes.ACS_ASS_A_SIDE:
|
if op_code in AcsOpCodes.ACS_ASS_A_SIDE:
|
||||||
command_acs_board(
|
command_assembly(
|
||||||
object_id=ACS_BOARD_ASS_ID,
|
object_id=ACS_BOARD_ASS_ID,
|
||||||
mode=Modes.NORMAL,
|
mode=Modes.NORMAL,
|
||||||
submode=DualSideSubmodes.A_SIDE,
|
submode=DualSideSubmodes.A_SIDE,
|
||||||
@ -38,7 +39,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
|
|||||||
info="Switching to ACS board assembly A side",
|
info="Switching to ACS board assembly A side",
|
||||||
)
|
)
|
||||||
if op_code in AcsOpCodes.ACS_ASS_B_SIDE:
|
if op_code in AcsOpCodes.ACS_ASS_B_SIDE:
|
||||||
command_acs_board(
|
command_assembly(
|
||||||
object_id=ACS_BOARD_ASS_ID,
|
object_id=ACS_BOARD_ASS_ID,
|
||||||
mode=Modes.NORMAL,
|
mode=Modes.NORMAL,
|
||||||
submode=DualSideSubmodes.B_SIDE,
|
submode=DualSideSubmodes.B_SIDE,
|
||||||
@ -46,7 +47,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
|
|||||||
info="Switching to ACS board assembly B side",
|
info="Switching to ACS board assembly B side",
|
||||||
)
|
)
|
||||||
if op_code in AcsOpCodes.ACS_ASS_DUAL_MODE:
|
if op_code in AcsOpCodes.ACS_ASS_DUAL_MODE:
|
||||||
command_acs_board(
|
command_assembly(
|
||||||
object_id=ACS_BOARD_ASS_ID,
|
object_id=ACS_BOARD_ASS_ID,
|
||||||
mode=Modes.NORMAL,
|
mode=Modes.NORMAL,
|
||||||
submode=DualSideSubmodes.DUAL_SIDE,
|
submode=DualSideSubmodes.DUAL_SIDE,
|
||||||
@ -54,7 +55,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
|
|||||||
info="Switching to ACS board assembly dual mode",
|
info="Switching to ACS board assembly dual mode",
|
||||||
)
|
)
|
||||||
if op_code in AcsOpCodes.ACS_ASS_A_ON:
|
if op_code in AcsOpCodes.ACS_ASS_A_ON:
|
||||||
command_acs_board(
|
command_assembly(
|
||||||
object_id=ACS_BOARD_ASS_ID,
|
object_id=ACS_BOARD_ASS_ID,
|
||||||
mode=Modes.ON,
|
mode=Modes.ON,
|
||||||
submode=DualSideSubmodes.A_SIDE,
|
submode=DualSideSubmodes.A_SIDE,
|
||||||
@ -62,7 +63,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
|
|||||||
info="Switching ACS board assembly A side on",
|
info="Switching ACS board assembly A side on",
|
||||||
)
|
)
|
||||||
if op_code in AcsOpCodes.ACS_ASS_B_ON:
|
if op_code in AcsOpCodes.ACS_ASS_B_ON:
|
||||||
command_acs_board(
|
command_assembly(
|
||||||
object_id=ACS_BOARD_ASS_ID,
|
object_id=ACS_BOARD_ASS_ID,
|
||||||
mode=Modes.ON,
|
mode=Modes.ON,
|
||||||
submode=DualSideSubmodes.B_SIDE,
|
submode=DualSideSubmodes.B_SIDE,
|
||||||
@ -70,7 +71,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
|
|||||||
info="Switching ACS board assembly B side on",
|
info="Switching ACS board assembly B side on",
|
||||||
)
|
)
|
||||||
if op_code in AcsOpCodes.ACS_ASS_DUAL_ON:
|
if op_code in AcsOpCodes.ACS_ASS_DUAL_ON:
|
||||||
command_acs_board(
|
command_assembly(
|
||||||
object_id=ACS_BOARD_ASS_ID,
|
object_id=ACS_BOARD_ASS_ID,
|
||||||
mode=Modes.ON,
|
mode=Modes.ON,
|
||||||
submode=DualSideSubmodes.B_SIDE,
|
submode=DualSideSubmodes.B_SIDE,
|
||||||
@ -78,7 +79,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
|
|||||||
info="Switching ACS board assembly dual side on",
|
info="Switching ACS board assembly dual side on",
|
||||||
)
|
)
|
||||||
if op_code in AcsOpCodes.ACS_ASS_OFF:
|
if op_code in AcsOpCodes.ACS_ASS_OFF:
|
||||||
command_acs_board(
|
command_assembly(
|
||||||
object_id=ACS_BOARD_ASS_ID,
|
object_id=ACS_BOARD_ASS_ID,
|
||||||
mode=Modes.OFF,
|
mode=Modes.OFF,
|
||||||
submode=0,
|
submode=0,
|
||||||
@ -89,7 +90,7 @@ def pack_acs_command(tc_queue: TcQueueT, op_code: str):
|
|||||||
|
|
||||||
def pack_sus_cmds(tc_queue: TcQueueT, op_code: str):
|
def pack_sus_cmds(tc_queue: TcQueueT, op_code: str):
|
||||||
if op_code in SusOpCodes.SUS_ASS_NOM_SIDE:
|
if op_code in SusOpCodes.SUS_ASS_NOM_SIDE:
|
||||||
command_acs_board(
|
command_assembly(
|
||||||
object_id=SUS_BOARD_ASS_ID,
|
object_id=SUS_BOARD_ASS_ID,
|
||||||
mode=Modes.NORMAL,
|
mode=Modes.NORMAL,
|
||||||
submode=DualSideSubmodes.A_SIDE,
|
submode=DualSideSubmodes.A_SIDE,
|
||||||
@ -97,7 +98,7 @@ def pack_sus_cmds(tc_queue: TcQueueT, op_code: str):
|
|||||||
info="Switching to SUS board to nominal side",
|
info="Switching to SUS board to nominal side",
|
||||||
)
|
)
|
||||||
if op_code in SusOpCodes.SUS_ASS_RED_SIDE:
|
if op_code in SusOpCodes.SUS_ASS_RED_SIDE:
|
||||||
command_acs_board(
|
command_assembly(
|
||||||
object_id=SUS_BOARD_ASS_ID,
|
object_id=SUS_BOARD_ASS_ID,
|
||||||
mode=Modes.NORMAL,
|
mode=Modes.NORMAL,
|
||||||
submode=DualSideSubmodes.B_SIDE,
|
submode=DualSideSubmodes.B_SIDE,
|
||||||
@ -105,7 +106,7 @@ def pack_sus_cmds(tc_queue: TcQueueT, op_code: str):
|
|||||||
info="Switching to SUS board to redundant side",
|
info="Switching to SUS board to redundant side",
|
||||||
)
|
)
|
||||||
if op_code in SusOpCodes.SUS_ASS_OFF:
|
if op_code in SusOpCodes.SUS_ASS_OFF:
|
||||||
command_acs_board(
|
command_assembly(
|
||||||
object_id=SUS_BOARD_ASS_ID,
|
object_id=SUS_BOARD_ASS_ID,
|
||||||
mode=Modes.OFF,
|
mode=Modes.OFF,
|
||||||
submode=0,
|
submode=0,
|
||||||
@ -113,25 +114,10 @@ def pack_sus_cmds(tc_queue: TcQueueT, op_code: str):
|
|||||||
info="Switching SUS board off",
|
info="Switching SUS board off",
|
||||||
)
|
)
|
||||||
if op_code in SusOpCodes.SUS_ASS_DUAL_MODE:
|
if op_code in SusOpCodes.SUS_ASS_DUAL_MODE:
|
||||||
command_acs_board(
|
command_assembly(
|
||||||
object_id=SUS_BOARD_ASS_ID,
|
object_id=SUS_BOARD_ASS_ID,
|
||||||
mode=Modes.NORMAL,
|
mode=Modes.NORMAL,
|
||||||
submode=DualSideSubmodes.DUAL_SIDE,
|
submode=DualSideSubmodes.DUAL_SIDE,
|
||||||
tc_queue=tc_queue,
|
tc_queue=tc_queue,
|
||||||
info="Switching to SUS board to dual side",
|
info="Switching to SUS board to dual side",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def command_acs_board(
|
|
||||||
object_id: bytes, mode: Modes, submode: int, tc_queue: TcQueueT, info: str
|
|
||||||
):
|
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, info))
|
|
||||||
mode_data = pack_mode_data(
|
|
||||||
object_id=object_id,
|
|
||||||
mode=mode,
|
|
||||||
submode=submode,
|
|
||||||
)
|
|
||||||
cmd = PusTelecommand(
|
|
||||||
service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data
|
|
||||||
)
|
|
||||||
tc_queue.appendleft(cmd.pack_command_tuple())
|
|
||||||
|
18
pus_tc/system/common.py
Normal file
18
pus_tc/system/common.py
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
from tmtccmd.tc.definitions import TcQueueT, QueueCommands
|
||||||
|
from spacepackets.ecss.tc import PusTelecommand
|
||||||
|
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes, Subservices
|
||||||
|
|
||||||
|
|
||||||
|
def command_assembly(
|
||||||
|
object_id: bytes, mode: Modes, submode: int, tc_queue: TcQueueT, info: str
|
||||||
|
):
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, info))
|
||||||
|
mode_data = pack_mode_data(
|
||||||
|
object_id=object_id,
|
||||||
|
mode=mode,
|
||||||
|
submode=submode,
|
||||||
|
)
|
||||||
|
cmd = PusTelecommand(
|
||||||
|
service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data
|
||||||
|
)
|
||||||
|
tc_queue.appendleft(cmd.pack_command_tuple())
|
34
pus_tc/system/tcs.py
Normal file
34
pus_tc/system/tcs.py
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
from tmtccmd.tc.definitions import TcQueueT, QueueCommands
|
||||||
|
from tmtccmd.tc.service_200_mode import Modes
|
||||||
|
|
||||||
|
from .common import command_assembly
|
||||||
|
from config.object_ids import TCS_BOARD_ASS_ID
|
||||||
|
|
||||||
|
|
||||||
|
class OpCodes:
|
||||||
|
TCS_BOARD_ASS_NORMAL = ["0", "tcs-normal"]
|
||||||
|
TCS_BOARD_ASS_OFF = ["1", "tcs-off"]
|
||||||
|
|
||||||
|
|
||||||
|
class Info:
|
||||||
|
TCS_BOARD_ASS_NORMAL = "Switching TCS board assembly on"
|
||||||
|
TCS_BOARD_ASS_OFF = "Switching TCS board assembly off"
|
||||||
|
|
||||||
|
|
||||||
|
def pack_tcs_sys_commands(tc_queue: TcQueueT, op_code: str):
|
||||||
|
if op_code in OpCodes.TCS_BOARD_ASS_NORMAL:
|
||||||
|
command_assembly(
|
||||||
|
object_id=TCS_BOARD_ASS_ID,
|
||||||
|
mode=Modes.NORMAL,
|
||||||
|
submode=0,
|
||||||
|
tc_queue=tc_queue,
|
||||||
|
info=Info.TCS_BOARD_ASS_NORMAL,
|
||||||
|
)
|
||||||
|
if op_code in OpCodes.TCS_BOARD_ASS_OFF:
|
||||||
|
command_assembly(
|
||||||
|
object_id=TCS_BOARD_ASS_ID,
|
||||||
|
mode=Modes.OFF,
|
||||||
|
submode=0,
|
||||||
|
tc_queue=tc_queue,
|
||||||
|
info=Info.TCS_BOARD_ASS_OFF,
|
||||||
|
)
|
@ -34,6 +34,7 @@ from pus_tc.devs.gps import pack_gps_command
|
|||||||
from pus_tc.system.acs import pack_acs_command, pack_sus_cmds
|
from pus_tc.system.acs import pack_acs_command, pack_sus_cmds
|
||||||
from pus_tc.devs.plpcdu import pack_pl_pcdu_commands
|
from pus_tc.devs.plpcdu import pack_pl_pcdu_commands
|
||||||
from pus_tc.devs.str_img_helper import pack_str_img_helper_command
|
from pus_tc.devs.str_img_helper import pack_str_img_helper_command
|
||||||
|
from pus_tc.system.tcs import pack_tcs_sys_commands
|
||||||
from config.definitions import CustomServiceList
|
from config.definitions import CustomServiceList
|
||||||
from config.object_ids import (
|
from config.object_ids import (
|
||||||
P60_DOCK_HANDLER,
|
P60_DOCK_HANDLER,
|
||||||
@ -120,7 +121,9 @@ def pack_service_queue_user(
|
|||||||
)
|
)
|
||||||
if service == CustomServiceList.PLOC_MPSOC.value:
|
if service == CustomServiceList.PLOC_MPSOC.value:
|
||||||
object_id = PLOC_MPSOC_ID
|
object_id = PLOC_MPSOC_ID
|
||||||
return pack_ploc_mpsoc_commands(object_id=object_id, tc_queue=service_queue, op_code=op_code)
|
return pack_ploc_mpsoc_commands(
|
||||||
|
object_id=object_id, tc_queue=service_queue, op_code=op_code
|
||||||
|
)
|
||||||
if service == CustomServiceList.REACTION_WHEEL_1.value:
|
if service == CustomServiceList.REACTION_WHEEL_1.value:
|
||||||
object_id = RW1_ID
|
object_id = RW1_ID
|
||||||
return pack_single_rw_test_into(
|
return pack_single_rw_test_into(
|
||||||
@ -205,6 +208,8 @@ def pack_service_queue_user(
|
|||||||
return pack_pl_pcdu_commands(tc_queue=service_queue, op_code=op_code)
|
return pack_pl_pcdu_commands(tc_queue=service_queue, op_code=op_code)
|
||||||
if service == CustomServiceList.ACS_ASS.value:
|
if service == CustomServiceList.ACS_ASS.value:
|
||||||
return pack_acs_command(tc_queue=service_queue, op_code=op_code)
|
return pack_acs_command(tc_queue=service_queue, op_code=op_code)
|
||||||
|
if service == CustomServiceList.TCS_ASS.value:
|
||||||
|
return pack_tcs_sys_commands(tc_queue=service_queue, op_code=op_code)
|
||||||
LOGGER.warning("Invalid Service !")
|
LOGGER.warning("Invalid Service !")
|
||||||
|
|
||||||
|
|
||||||
|
@ -69,8 +69,11 @@ def handle_ploc_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpa
|
|||||||
"PLOC Mem Len",
|
"PLOC Mem Len",
|
||||||
"PLOC Read Memory Data",
|
"PLOC Read Memory Data",
|
||||||
]
|
]
|
||||||
reply.content_list = ["0x" + custom_data[:4].hex(), struct.unpack('!H', custom_data[4:6])[0],
|
reply.content_list = [
|
||||||
"0x" + custom_data[6:10].hex()]
|
"0x" + custom_data[:4].hex(),
|
||||||
|
struct.unpack("!H", custom_data[4:6])[0],
|
||||||
|
"0x" + custom_data[6:10].hex(),
|
||||||
|
]
|
||||||
return reply
|
return reply
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user