Compare commits

...

10 Commits

Author SHA1 Message Date
Robin Mueller
ecb973c37f hk handling working for both core and aux from pdus 2022-04-12 16:35:34 +02:00
Robin Mueller
c995ca2dda core hk handling working 2022-04-12 16:10:51 +02:00
Robin Mueller
259cd25b6e continued pdu1 pdu2 handling 2022-04-12 16:00:37 +02:00
Robin Mueller
a966471fe5 add pdu HK handler 2022-04-12 15:36:04 +02:00
Robin Mueller
a6c91640a1 Merge remote-tracking branch 'origin/develop' into mueller/master 2022-04-11 14:43:09 +02:00
98783a0981 Merge pull request 'added new time commands' (#56) from mueller/time-cmd into develop
Reviewed-on: #56
2022-04-11 09:00:29 +02:00
19768cb1b1 added new time commands 2022-04-09 18:44:04 +02:00
9055281da4 Merge remote-tracking branch 'origin/develop' into mueller/master 2022-04-09 17:59:56 +02:00
Robin Mueller
633bc406c7 submodule updates 2022-04-08 14:10:44 +02:00
2accfef011 longer ping timeout 2022-04-08 09:59:21 +02:00
10 changed files with 270 additions and 63 deletions

View File

@@ -45,3 +45,4 @@ class CustomServiceList(enum.Enum):
ACS_ASS = "acs-ass"
SUS_ASS = "sus-ass"
TCS_ASS = "tcs-ass"
TIME = "time"

View File

@@ -1,19 +1,14 @@
import argparse
from typing import Union, Dict
from typing import Union
from tmtccmd.config.definitions import (
ServiceOpCodeDictT,
HkReplyUnpacked,
DataReplyUnpacked,
)
from tmtccmd.tm.service_3_base import Service3Base
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.utility.retval import RetvalDictT
from tmtccmd.pus.obj_id import ObjectIdDictT
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.config.hook import TmTcHookBase
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.config.globals import OpCodeDictKeys
from config.definitions import CustomServiceList
@@ -75,6 +70,7 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
add_ploc_mpsoc_cmds,
add_ploc_supv_cmds,
add_system_cmds,
add_time_cmds,
)
from pus_tc.devs.gps import GpsOpCodes
@@ -88,6 +84,7 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
add_ploc_mpsoc_cmds(cmd_dict=service_op_code_dict)
add_ploc_supv_cmds(cmd_dict=service_op_code_dict)
add_system_cmds(cmd_dict=service_op_code_dict)
add_time_cmds(cmd_dict=service_op_code_dict)
op_code_dict = {
GpsOpCodes.RESET_GNSS.value: ("Reset GPS", {OpCodeDictKeys.TIMEOUT: 2.0})
@@ -125,7 +122,7 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
"2": ("Star Tracker: Mode Normal", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("Star Tracker: Mode Off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("Star Tracker: Mode Raw", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("Star Tracker: Ping", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("Star Tracker: Ping", {OpCodeDictKeys.TIMEOUT: 5.0}),
"6": (
"Star Tracker: Switch to bootloader program",
{OpCodeDictKeys.TIMEOUT: 2.0},

View File

@@ -37,11 +37,13 @@ class Info:
class SetIds:
PDU_1 = 1
PDU_2 = 2
P60_CORE = 3
P60_AUX = 4
ACU = 5
PDU_1_CORE = 1
PDU_1_AUX = 2
PDU_2_CORE = 3
PDU_2_AUX = 4
P60_CORE = 5
P60_AUX = 6
ACU = 7
class TableIds:

View File

@@ -179,6 +179,23 @@ def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
)
def add_time_cmds(cmd_dict: ServiceOpCodeDictT):
from pus_tc.system.time import OpCodes, Info
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.SET_CURRENT_TIME,
info=Info.SET_CURRENT_TIME,
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name=CustomServiceList.TIME.value,
info="Time Service",
op_code_entry=op_code_dict,
)
def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
from pus_tc.devs.p60dock import P60OpCodes, GomspaceOpCodes, Info
from pus_tc.devs.pdu1 import Pdu1OpCodes
@@ -318,7 +335,12 @@ def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE,
info="PDU1: Request HK once",
info=GsInfo.REQUEST_CORE_HK_ONCE,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE,
info=GsInfo.REQUEST_AUX_HK_ONCE,
)
add_op_code_entry(
op_code_dict=op_code_dict,
@@ -425,7 +447,12 @@ def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE,
info="PDU2: Request HK once",
info=GsInfo.REQUEST_CORE_HK_ONCE,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE,
info=GsInfo.REQUEST_AUX_HK_ONCE,
)
add_op_code_entry(
op_code_dict=op_code_dict,

View File

@@ -5,7 +5,11 @@
"""
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.tc.service_3_housekeeping import generate_one_hk_command, make_sid
from tmtccmd.tc.service_3_housekeeping import (
generate_one_hk_command,
make_sid,
generate_one_diag_command,
)
from gomspace.gomspace_common import *
from gomspace.gomspace_pdu_definitions import *
from config.object_ids import PDU_1_HANDLER_ID
@@ -211,8 +215,13 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code in GomspaceOpCodes.REQUEST_CORE_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Requesting HK Table Once"))
hk_sid = make_sid(object_id=PDU_1_HANDLER_ID, set_id=SetIds.PDU_1)
tc_queue.appendleft((QueueCommands.PRINT, f"PDU1: {Info.REQUEST_CORE_HK_ONCE}"))
hk_sid = make_sid(object_id=PDU_1_HANDLER_ID, set_id=SetIds.PDU_1_CORE)
command = generate_one_diag_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple())
if op_code in GomspaceOpCodes.REQUEST_AUX_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, f"PDU1: {Info.REQUEST_AUX_HK_ONCE}"))
hk_sid = make_sid(object_id=PDU_1_HANDLER_ID, set_id=SetIds.PDU_1_AUX)
command = generate_one_hk_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple())
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:

View File

@@ -8,7 +8,11 @@
"""
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.tc.service_3_housekeeping import generate_one_hk_command, make_sid
from tmtccmd.tc.service_3_housekeeping import (
generate_one_hk_command,
generate_one_diag_command,
make_sid,
)
from gomspace.gomspace_common import *
from gomspace.gomspace_pdu_definitions import *
from config.object_ids import PDU_2_HANDLER_ID
@@ -233,8 +237,13 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code in GomspaceOpCodes.REQUEST_CORE_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Requesting HK Table Once"))
hk_sid = make_sid(object_id=PDU_2_HANDLER_ID, set_id=SetIds.PDU_2)
tc_queue.appendleft((QueueCommands.PRINT, f"PDU2: {Info.REQUEST_CORE_HK_ONCE}"))
hk_sid = make_sid(object_id=PDU_2_HANDLER_ID, set_id=SetIds.PDU_2_CORE)
command = generate_one_diag_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple())
if op_code in GomspaceOpCodes.REQUEST_AUX_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, f"PDU2: {Info.REQUEST_AUX_HK_ONCE}"))
hk_sid = make_sid(object_id=PDU_2_HANDLER_ID, set_id=SetIds.PDU_2_AUX)
command = generate_one_hk_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple())
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:

View File

@@ -146,9 +146,7 @@ class Submode:
FIRMWARE = 2
def pack_star_tracker_commands(
object_id: bytearray, tc_queue: TcQueueT, op_code: str
) -> TcQueueT:
def pack_star_tracker_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
tc_queue.appendleft(
(
QueueCommands.PRINT,

28
pus_tc/system/time.py Normal file
View File

@@ -0,0 +1,28 @@
from datetime import datetime
from spacepackets.ecss import PusTelecommand
from tmtccmd.config import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.logging import get_console_logger
LOGGER = get_console_logger()
class OpCodes:
SET_CURRENT_TIME = ["0", "set-curr-time"]
class Info:
SET_CURRENT_TIME = "Setting current time in ASCII format"
def pack_set_current_time_ascii_command(tc_queue: TcQueueT, ssc: int):
time_test_current_time = datetime.utcnow().isoformat() + "Z" + "\0"
current_time_ascii = time_test_current_time.encode("ascii")
LOGGER.info(f"Current time in ASCII format: {current_time_ascii}")
tc_queue.appendleft((QueueCommands.PRINT, Info.SET_CURRENT_TIME))
command = PusTelecommand(
service=9, subservice=128, ssc=ssc, app_data=current_time_ascii
)
tc_queue.appendleft(command.pack_command_tuple())

View File

@@ -36,6 +36,7 @@ from pus_tc.system.core import pack_core_commands
from pus_tc.devs.star_tracker import pack_star_tracker_commands
from pus_tc.devs.syrlinks_hk_handler import pack_syrlinks_command
from pus_tc.devs.gps import pack_gps_command
from pus_tc.system.time import pack_set_current_time_ascii_command
from pus_tc.system.acs import pack_acs_command, pack_sus_cmds
from pus_tc.devs.plpcdu import pack_pl_pcdu_commands
from pus_tc.devs.str_img_helper import pack_str_img_helper_command
@@ -230,6 +231,8 @@ def pack_service_queue_user(
return pack_acs_command(tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.TCS_ASS.value:
return pack_tcs_sys_commands(tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.TIME.value:
return pack_set_current_time_ascii_command(tc_queue=service_queue, ssc=0)
LOGGER.warning("Invalid Service !")

View File

@@ -16,16 +16,8 @@ from pus_tc.devs.syrlinks_hk_handler import SetIds
from pus_tc.devs.p60dock import SetIds
from pus_tc.devs.imtq import ImtqSetIds
from tmtccmd.pus.obj_id import ObjectId, ObjectIdDictT
from config.object_ids import (
SYRLINKS_HANDLER_ID,
IMTQ_HANDLER_ID,
GPS_HANDLER_0_ID,
GPS_HANDLER_1_ID,
BPX_HANDLER_ID,
CORE_CONTROLLER_ID,
P60_DOCK_HANDLER,
PL_PCDU_ID,
)
import config.object_ids as obj_ids
LOGGER = get_console_logger()
@@ -66,29 +58,37 @@ def handle_regular_hk_print(
object_id = object_id.as_bytes
set_id = hk_packet.set_id
"""This function is called when a Service 3 Housekeeping packet is received."""
if object_id == SYRLINKS_HANDLER_ID:
if object_id == obj_ids.SYRLINKS_HANDLER_ID:
if set_id == SetIds.RX_REGISTERS_DATASET:
return handle_syrlinks_rx_registers_dataset(printer, hk_data)
elif set_id == SetIds.TX_REGISTERS_DATASET:
return handle_syrlinks_tx_registers_dataset(printer, hk_data)
else:
LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id")
elif object_id == IMTQ_HANDLER_ID:
elif object_id == obj_ids.IMTQ_HANDLER_ID:
if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and (
set_id <= ImtqSetIds.NEGATIVE_Z_TEST
):
return handle_self_test_data(printer, hk_data)
else:
LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id")
elif object_id == GPS_HANDLER_0_ID or object_id == GPS_HANDLER_1_ID:
elif object_id == obj_ids.GPS_HANDLER_0_ID or object_id == obj_ids.GPS_HANDLER_1_ID:
handle_gps_data(printer=printer, hk_data=hk_data)
elif object_id == BPX_HANDLER_ID:
elif object_id == obj_ids.BPX_HANDLER_ID:
handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer)
elif object_id == CORE_CONTROLLER_ID:
elif object_id == obj_ids.CORE_CONTROLLER_ID:
return handle_core_hk_data(printer=printer, hk_data=hk_data)
elif object_id == P60_DOCK_HANDLER:
elif object_id == obj_ids.PDU_1_HANDLER_ID:
return handle_pdu_data(
printer=printer, pdu_idx=1, set_id=set_id, hk_data=hk_data
)
elif object_id == obj_ids.PDU_2_HANDLER_ID:
return handle_pdu_data(
printer=printer, pdu_idx=2, set_id=set_id, hk_data=hk_data
)
elif object_id == obj_ids.P60_DOCK_HANDLER:
handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data)
elif object_id == PL_PCDU_ID:
elif object_id == obj_ids.PL_PCDU_ID:
log_to_both(printer, "Received PL PCDU HK data")
else:
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
@@ -429,6 +429,159 @@ P60_INDEX_LIST = [
WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"]
PDU1_CHANNELS_NAMES = [
"TCS Board",
"Syrlinks",
"Startracker",
"MGT",
"SUS Nominal",
"SCEX",
"PLOC",
"ACS A Side",
"Unused Channel 8",
]
PDU2_CHANNELS_NAMES = [
"Q7S",
"Payload PCDU CH1",
"RW",
"TCS Heater In",
"SUS Redundant",
"Deployment Mechanism",
"Payload PCDU CH6",
"ACS B Side",
"Payload Camera",
]
PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES]
class WdtInfo:
def __init__(self):
self.wdt_reboots_list = []
self.time_pings_left_list = []
def print(self, printer: FsfwTmTcPrinter):
wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)"
log_to_both(printer, wdt_info)
for idx in range(len(self.wdt_reboots_list)):
log_to_both(
printer,
f"{WDT_LIST[idx].ljust(5)} | "
f"{self.wdt_reboots_list[idx]:010} | {self.time_pings_left_list[idx]:010}",
)
def parse(self, wdt_data: bytes, current_idx: int) -> int:
priv_idx = 0
self.wdt_reboots_list = []
self.time_pings_left_list = []
for idx in range(5):
self.wdt_reboots_list.append(
struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0]
)
priv_idx += 4
current_idx += 4
for idx in range(3):
self.time_pings_left_list.append(
struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0]
)
priv_idx += 4
current_idx += 4
for idx in range(2):
self.time_pings_left_list.append(wdt_data[priv_idx])
current_idx += 1
priv_idx += 1
return current_idx
def handle_pdu_data(
printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes
):
current_idx = 0
priv_idx = pdu_idx - 1
if set_id == SetIds.PDU_1_AUX or set_id == SetIds.PDU_2_AUX:
fmt_str = "!hhBBBIIH"
inc_len = struct.calcsize(fmt_str)
(
vcc,
vbat,
conv_enb_0,
conv_enb_1,
conv_enb_2,
boot_cause,
uptime,
reset_cause,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
log_to_both(printer, f"VCC {vcc} mV | VBAT {vbat} mV")
log_to_both(
printer, f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]"
)
log_to_both(
printer,
f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}",
)
current_idx += inc_len
latchup_list = []
log_to_both(printer, "Latchups")
for idx in range(len(PDU1_CHANNELS_NAMES)):
latchup_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
)
content_line = (
f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {latchup_list[idx]}"
)
log_to_both(printer, content_line)
current_idx += 2
device_types = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
device_types.append(hk_data[current_idx])
current_idx += 1
device_statuses = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
device_statuses.append(hk_data[current_idx])
current_idx += 1
wdt = WdtInfo()
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
wdt.print(printer=printer)
if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE:
log_to_both(printer, f"Received PDU HK from PDU {pdu_idx}")
current_list = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
current_list.append(
struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0]
)
current_idx += 2
voltage_list = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
voltage_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
)
current_idx += 2
output_enb_list = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
output_enb_list.append(hk_data[current_idx])
current_idx += 1
header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]"
print(header_str)
printer.file_logger.info(header_str)
for idx in range(len(PDU1_CHANNELS_NAMES)):
out_enb = f"{output_enb_list[idx]}".ljust(6)
content_line = (
f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {out_enb} | "
f"{voltage_list[idx]:05} | {current_list[idx]:04}"
)
log_to_both(printer, content_line)
fmt_str = "!IBh"
inc_len = struct.calcsize(fmt_str)
(boot_count, batt_mode, temperature) = struct.unpack(
fmt_str, hk_data[current_idx : current_idx + inc_len]
)
info = (
f"Boot Count {boot_count} | Battery Mode {batt_mode} | "
f"Temperature {temperature / 10.0}"
)
log_to_both(printer, info)
def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
if set_id == SetIds.P60_CORE:
@@ -506,21 +659,8 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
dearm_status,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
current_idx += inc_len
wdt_reboots_list = []
for idx in range(5):
wdt_reboots_list.append(
struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]
)
current_idx += 4
time_pings_left_list = []
for idx in range(3):
time_pings_left_list.append(
struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]
)
current_idx += 4
for idx in range(2):
time_pings_left_list.append(hk_data[current_idx])
current_idx += 1
wdt = WdtInfo()
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
fmt_str = "!hhbb"
inc_len = struct.calcsize(fmt_str)
(
@@ -547,14 +687,7 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
)
log_to_both(printer, util_info)
log_to_both(printer, util_info_2)
wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)"
log_to_both(printer, wdt_info)
for idx in range(len(wdt_reboots_list)):
log_to_both(
printer,
f"{WDT_LIST[idx].ljust(5)} | "
f"{wdt_reboots_list[idx]:010} | {time_pings_left_list[idx]:010}",
)
wdt.print(printer)
misc_info = (
f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}"
)