Update tmtccmd #21

Merged
meierj merged 32 commits from mueller/master into develop 2021-09-15 14:59:39 +02:00
5 changed files with 127 additions and 104 deletions
Showing only changes of commit abe71c0206 - Show all commits

View File

@ -3,7 +3,6 @@ from typing import Union, Dict, Tuple
from tmtccmd.config.definitions import ServiceOpCodeDictT
from tmtccmd.tm.service_3_base import Service3Base
from tmtccmd.ecss.tm import PusTelemetry
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.core.backend import TmTcHandler
@ -88,6 +87,7 @@ class EiveHookObject(TmTcHookBase):
def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
from pus_tc.pdu1 import Pdu1OpCodes
from pus_tc.pdu2 import Pdu2OpCodes
op_code_dict = {
'reboot': ('Reboot with Prompt', {OpCodeDictKeys.TIMEOUT: 2.0}),
'reboot_self': ('Reboot Self', {OpCodeDictKeys.TIMEOUT: 4.0}),
@ -133,6 +133,8 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
("PDU1: Turn ACS Side A on", {OpCodeDictKeys.TIMEOUT: 2.0}),
Pdu1OpCodes.ACS_A_SIDE_OFF.value:
("PDU1: Turn ACS Side A off", {OpCodeDictKeys.TIMEOUT: 2.0}),
Pdu1OpCodes.PRINT_SWITCH_STATE.value:
("PDU1: Print switch states", {OpCodeDictKeys.TIMEOUT: 2.0})
}
service_pdu1_tuple = ("PDU1 Device", op_code_dict_srv_pdu1)
@ -140,6 +142,8 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
"0": ("PDU2 Tests", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("PDU1: Turn ACS Side B on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("PDU1: Turn ACS Side B off", {OpCodeDictKeys.TIMEOUT: 2.0}),
Pdu2OpCodes.PRINT_SWITCH_STATE.value:
("PDU2: Print switch states", {OpCodeDictKeys.TIMEOUT: 2.0})
}
service_pdu2_tuple = ("PDU2 Device", op_code_dict_srv_pdu2)

View File

@ -6,15 +6,20 @@
@author J. Meier
@date 17.12.2020
"""
import enum
from tmtccmd.tc.service_8_functional_cmd import generate_action_command
from tmtccmd.tc.definitions import PusTelecommand
class GomspaceDeviceActions:
PING = bytearray([0x0, 0x0, 0x0, 0x1])
REBOOT = bytearray([0x0, 0x0, 0x0, 0x4])
PARAM_GET = bytearray([0x0, 0x0, 0x0, 0x00])
PARAM_SET = bytearray([0x0, 0x0, 0x0, 0xFF])
WDT_RESET = bytearray([0x0, 0x0, 0x0, 0x9])
REQUEST_HK_TABLE = bytearray([0x0, 0x0, 0x0, 0x10])
class GomspaceDeviceActionIds(enum.IntEnum):
PING = 1
REBOOT = 4
PARAM_GET = 0
PARAM_SET = 255
WDT_RESET = 9
REQUEST_HK_TABLE = 16
PRINT_SWITCH_STATE = 17
class TableIds:
@ -37,8 +42,9 @@ class Channel:
off = 0
def pack_get_param_command(object_id: bytearray, table_id: int, memory_address: bytearray,
parameter_size: int) -> bytearray:
def pack_get_param_command(
object_id: bytearray, table_id: int, memory_address: bytearray, parameter_size: int
) -> PusTelecommand:
""" Function to generate a command to retrieve parameters like the temperature from a gomspace device.
@param object_id: The object id of the gomspace device handler.
@param table_id: The table id of the gomspace device
@ -46,49 +52,54 @@ def pack_get_param_command(object_id: bytearray, table_id: int, memory_address:
@param parameter_size: Size of the value to read. E.g. temperature is uint16_t and thus parameter_size is 2
@return: The command as bytearray.
"""
action_id = GomspaceDeviceActions.PARAM_GET
command = bytearray()
command = command + object_id + action_id
command.append(table_id)
command = command + memory_address
command.append(parameter_size)
return command
app_data = bytearray()
app_data.append(table_id)
app_data.extend(memory_address)
app_data.append(parameter_size)
return generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.PARAM_GET, app_data=app_data
)
def pack_set_param_command(object_id: bytearray, memory_address: bytearray, parameter_size: int,
parameter: int) -> bytearray:
def pack_set_param_command(
object_id: bytearray, memory_address: bytearray, parameter_size: int, parameter: int,
ssc: int = 0
) -> PusTelecommand:
""" Function to generate a command to set a parameter
@param object_id: The object id of the gomspace device handler.
@param memory_address: Address offset within table of the value to set.
@param parameter: The parameter value to set.
@param parameter_size: Size of the value to set. There are uint8_t, uint16_t and uint32_t in the device tables.
@return: The command as bytearray.
:param object_id: The object id of the gomspace device handler.
:param memory_address: Address offset within table of the value to set.
:param parameter: The parameter value to set.
:param parameter_size: Size of the value to set. There are uint8_t, uint16_t and uint32_t
in the device tables.
:param ssc:
:return: The command as bytearray.
"""
action_id = GomspaceDeviceActions.PARAM_SET
command = bytearray()
command = command + object_id + action_id
command = command + memory_address
command.append(parameter_size)
action_id = GomspaceDeviceActionIds.PARAM_SET
app_data = bytearray()
app_data += memory_address
app_data.append(parameter_size)
if parameter_size == 1:
command.append(parameter)
app_data.append(parameter)
elif parameter_size == 2:
byte_one = 0xFF00 & parameter >> 8
byte_two = 0xFF & parameter
command.append(byte_one)
command.append(byte_two)
app_data.append(byte_one)
app_data.append(byte_two)
elif parameter_size == 4:
byte_one = 0xFF000000 & parameter >> 24
byte_two = 0xFF0000 & parameter >> 16
byte_three = 0xFF00 & parameter >> 8
byte_four = 0xFF & parameter
command.append(byte_one)
command.append(byte_two)
command.append(byte_three)
command.append(byte_four)
return command
app_data.append(byte_one)
app_data.append(byte_two)
app_data.append(byte_three)
app_data.append(byte_four)
return generate_action_command(
object_id=object_id, action_id=action_id, app_data=app_data, ssc=ssc
)
def pack_ping_command(object_id: bytearray, data: bytearray) -> bytearray:
def pack_ping_command(object_id: bytearray, data: bytearray) -> PusTelecommand:
"""" Function to generate the command to ping a gomspace device
@param object_id Object Id of the gomspace device handler.
@param data Bytearray containing the bytes to send to the gomspace device. For now the on board software
@ -96,37 +107,30 @@ def pack_ping_command(object_id: bytearray, data: bytearray) -> bytearray:
@note The ping request sends the specified data to a gompsace device. These
data are simply copied by the device and then sent back.
"""
action_id = GomspaceDeviceActions.PING
command = object_id + action_id + data
return command
return generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.PING, app_data=data
)
def pack_gnd_wdt_reset_command(object_id: bytearray) -> bytearray:
def pack_gnd_wdt_reset_command(object_id: bytearray) -> PusTelecommand:
"""" Function to generate the command to reset the watchdog of a gomspace device.
@param object_id Object Id of the gomspace device handler.
"""
action_id = GomspaceDeviceActions.WDT_RESET
command = bytearray()
command += object_id + action_id
return command
return generate_action_command(object_id=object_id, action_id=GomspaceDeviceActionIds.WDT_RESET)
def pack_reboot_command(object_id: bytearray) -> bytearray:
def pack_reboot_command(object_id: bytearray) -> PusTelecommand:
""" Function to generate the command which triggers a reboot of a gomspace device
@param object_id The object id of the gomspace device handler.
"""
action_id = GomspaceDeviceActions.REBOOT
command = bytearray()
command += object_id + action_id
return command
return generate_action_command(object_id=object_id, action_id=GomspaceDeviceActionIds.REBOOT)
def pack_request_full_hk_table_command(object_id: bytearray) -> bytearray:
def pack_request_full_hk_table_command(object_id: bytearray) -> PusTelecommand:
""" Function to generate the command to request the full housekeeping table from a gomspace
device.
@param object_id The object id of the gomspace device handler.
"""
action_id = GomspaceDeviceActions.REQUEST_HK_TABLE
command = bytearray()
command = object_id + action_id
return command
return generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.REQUEST_HK_TABLE
)

View File

@ -21,6 +21,7 @@ class Pdu1OpCodes(enum.Enum):
STAR_TRACKER_OFF = "4"
ACS_A_SIDE_ON = "6"
ACS_A_SIDE_OFF = "7"
PRINT_SWITCH_STATE = "17"
class PDU1TestProcedure:
@ -46,25 +47,21 @@ def pack_pdu1_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn star tracker on"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "3":
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn SUS nominal on"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_4.parameter_address,
PDUConfigTable.out_en_4.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == Pdu1OpCodes.STAR_TRACKER_OFF.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn star tracker off"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "5":
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn SUS nominal off"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_4.parameter_address,
PDUConfigTable.out_en_4.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=33, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == Pdu1OpCodes.ACS_A_SIDE_ON.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn ACS Side A on"))
@ -72,7 +69,6 @@ def pack_pdu1_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
object_id, PDUConfigTable.out_en_7.parameter_address,
PDUConfigTable.out_en_7.parameter_size, Channel.on
)
command = PusTelecommand(service=8, subservice=128, ssc=34, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == Pdu1OpCodes.ACS_A_SIDE_OFF.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn ACS Side A off"))
@ -80,14 +76,18 @@ def pack_pdu1_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
object_id, PDUConfigTable.out_en_7.parameter_address,
PDUConfigTable.out_en_7.parameter_size, Channel.off
)
command = PusTelecommand(service=8, subservice=128, ssc=35, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == Pdu1OpCodes.PRINT_SWITCH_STATE.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Print Switch Status"))
command = generate_action_command(
object_id=object_id, action_id=int(Pdu1OpCodes.PRINT_SWITCH_STATE.value)
)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.ping:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Ping Test"))
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
command = pack_ping_command(object_id, ping_data)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.read_temperature:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Testing temperature reading"))
@ -95,29 +95,32 @@ def pack_pdu1_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
object_id, TableIds.hk, PDUHkTable.temperature.parameter_address,
PDUHkTable.temperature.parameter_size
)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_2_on:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 2 on (Star Tracker)"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
command = pack_set_param_command(
object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.on
)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_2_off:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 2 off (Star Tracker)"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command)
command = pack_set_param_command(
object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.off
)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_3_on:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 3 on (MTQ)"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_3.parameter_address,
PDUConfigTable.out_en_3.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
command = pack_set_param_command(
object_id, PDUConfigTable.out_en_3.parameter_address,
PDUConfigTable.out_en_3.parameter_size, Channel.on
)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_3_off:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 3 off (MTQ)"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_3.parameter_address,
PDUConfigTable.out_en_3.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=33, app_data=command)
command = pack_set_param_command(
object_id, PDUConfigTable.out_en_3.parameter_address,
PDUConfigTable.out_en_3.parameter_size, Channel.off
)
tc_queue.appendleft(command.pack_command_tuple())

View File

@ -8,10 +8,14 @@
"""
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
from gomspace.gomspace_common import *
from gomspace.gomspace_pdu_definitions import *
from pus_tc.p60dock import P60DockConfigTable
class Pdu2OpCodes(enum.Enum):
ACS_SIDE_B_ON = "1"
ACS_SIDE_B_OFF = "2"
PRINT_SWITCH_STATE = "17"
class PDU2TestProcedure:
@ -39,77 +43,85 @@ class PDU2TestProcedure:
def pack_pdu2_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str) -> TcQueueT:
tc_queue.appendleft((QueueCommands.PRINT, "Testing PDU2"))
if op_code == "1":
if op_code == Pdu2OpCodes.ACS_SIDE_B_ON.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn ACS Side B on"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_7.parameter_address,
PDUConfigTable.out_en_7.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue
if op_code == "2":
if op_code == Pdu2OpCodes.ACS_SIDE_B_OFF.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn ACS Side B off"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_7.parameter_address,
PDUConfigTable.out_en_7.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue
if op_code == Pdu2OpCodes.PRINT_SWITCH_STATE.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Print Switch Status"))
command = generate_action_command(
object_id=object_id, action_id=int(Pdu2OpCodes.PRINT_SWITCH_STATE.value)
)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.reboot:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Reboot"))
command = pack_reboot_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.read_gnd_wdt:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Reading ground watchdog timer value"))
command = pack_get_param_command(object_id, TableIds.hk, PDUHkTable.wdt_gnd_left.parameter_address,
PDUHkTable.wdt_gnd_left.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
command = pack_get_param_command(
object_id, TableIds.hk, PDUHkTable.wdt_gnd_left.parameter_address,
PDUHkTable.wdt_gnd_left.parameter_size
)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.gnd_wdt_reset:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing ground watchdog reset"))
command = pack_gnd_wdt_reset_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.ping:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Ping Test"))
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
command = pack_ping_command(object_id, ping_data)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.channel_2_on:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing setting output channel 2 on (TCS Heater)"))
tc_queue.appendleft(
(QueueCommands.PRINT, "PDU2: Testing setting output channel 2 on (TCS Heater)")
)
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.read_temperature:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing temperature reading"))
command = pack_get_param_command(object_id, TableIds.hk, PDUHkTable.temperature.parameter_address,
PDUHkTable.temperature.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
command = pack_get_param_command(
object_id, TableIds.hk, PDUHkTable.temperature.parameter_address,
PDUHkTable.temperature.parameter_size
)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.read_channel_2_state:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Reading output channel 2 state (TCS Heater)"))
command = pack_get_param_command(object_id, TableIds.config, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command)
tc_queue.appendleft(
(QueueCommands.PRINT, "PDU2: Reading output channel 2 state (TCS Heater)")
)
command = pack_get_param_command(
object_id, TableIds.config, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size
)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.read_cur_lu_lim_0:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Reading current limit value of output channel 0 (OBC)"))
command = pack_get_param_command(object_id, TableIds.config, PDUConfigTable.cur_lu_lim_0.parameter_address,
PDUConfigTable.cur_lu_lim_0.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command)
tc_queue.appendleft(
(QueueCommands.PRINT, "PDU2: Reading current limit value of output channel 0 (OBC)")
)
command = pack_get_param_command(
object_id, TableIds.config, PDUConfigTable.cur_lu_lim_0.parameter_address,
PDUConfigTable.cur_lu_lim_0.parameter_size
)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.channel_2_off:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing setting output channel 2 off"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.request_hk_table:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Requesting housekeeping table"))
command = pack_request_full_hk_table_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue

@ -1 +1 @@
Subproject commit ee3a887a7d7d00cabd1a3868bba27004f880248a
Subproject commit e02559d2d18fa17a119144ee240423b0c7d90e2d