GPS HK Parsing #86

Merged
muellerr merged 16 commits from mueller/gps-hk-parsing into develop 2022-05-28 12:52:01 +02:00
7 changed files with 97 additions and 97 deletions

View File

@ -34,8 +34,7 @@ class CustomServiceList(enum.Enum):
REACTION_WHEEL_4 = "rw-4" REACTION_WHEEL_4 = "rw-4"
RW_ASSEMBLY = "rw-ass" RW_ASSEMBLY = "rw-ass"
RAD_SENSOR = "rad_sensor" RAD_SENSOR = "rad_sensor"
GPS_0 = "gps0" GPS_CTRL = "gnss-ctrl"
GPS_1 = "gps1"
PLOC_MEMORY_DUMPER = "ploc_memory_dumper" PLOC_MEMORY_DUMPER = "ploc_memory_dumper"
CORE = "core" CORE = "core"
STAR_TRACKER = "star_tracker" STAR_TRACKER = "star_tracker"

View File

@ -45,7 +45,6 @@ GYRO_1_L3G_HANDLER_ID = bytes([0x44, 0x12, 0x01, 0x11])
GYRO_2_ADIS_HANDLER_ID = bytes([0x44, 0x12, 0x02, 0x12]) GYRO_2_ADIS_HANDLER_ID = bytes([0x44, 0x12, 0x02, 0x12])
GYRO_3_L3G_HANDLER_ID = bytes([0x44, 0x12, 0x03, 0x13]) GYRO_3_L3G_HANDLER_ID = bytes([0x44, 0x12, 0x03, 0x13])
GPS_CONTROLLER = bytes([0x44, 0x13, 0x00, 0x45]) GPS_CONTROLLER = bytes([0x44, 0x13, 0x00, 0x45])
GPS_HANDLER_1_ID = bytes([0x44, 0x13, 0x01, 0x46])
RW1_ID = bytes([0x44, 0x12, 0x00, 0x47]) RW1_ID = bytes([0x44, 0x12, 0x00, 0x47])
RW2_ID = bytes([0x44, 0x12, 0x01, 0x48]) RW2_ID = bytes([0x44, 0x12, 0x01, 0x48])
RW3_ID = bytes([0x44, 0x12, 0x02, 0x49]) RW3_ID = bytes([0x44, 0x12, 0x02, 0x49])

View File

@ -8,7 +8,7 @@ from tmtccmd.config import (
) )
from tmtccmd.config.globals import get_default_service_op_code_dict from tmtccmd.config.globals import get_default_service_op_code_dict
from pus_tc.devs.gps import GpsOpCodes from pus_tc.devs.gps import add_gps_cmds
from pus_tc.devs.pcdu import add_pcdu_cmds from pus_tc.devs.pcdu import add_pcdu_cmds
from pus_tc.devs.plpcdu import add_pl_pcdu_cmds from pus_tc.devs.plpcdu import add_pl_pcdu_cmds
from pus_tc.devs.rad_sensor import add_rad_sens_cmds from pus_tc.devs.rad_sensor import add_rad_sens_cmds
@ -93,15 +93,6 @@ def add_ccsds_cmds(cmd_dict: ServiceOpCodeDictT):
cmd_dict[CustomServiceList.CCSDS_HANDLER.value] = service_ccsds_handler_tuple cmd_dict[CustomServiceList.CCSDS_HANDLER.value] = service_ccsds_handler_tuple
def add_gps_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict = {
GpsOpCodes.RESET_GNSS.value: ("Reset GPS", {OpCodeDictKeys.TIMEOUT: 2.0})
}
service_tuple = ("GPS 0", op_code_dict)
cmd_dict[CustomServiceList.GPS_0.value] = service_tuple
cmd_dict[CustomServiceList.GPS_1.value] = service_tuple
def add_str_cmds(cmd_dict: ServiceOpCodeDictT): def add_str_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict_srv_star_tracker = { op_code_dict_srv_star_tracker = {
"0": ( "0": (

View File

@ -1,15 +1,46 @@
import enum import enum
from tmtccmd.config.definitions import QueueCommands from config.definitions import CustomServiceList
from tmtccmd.config import add_op_code_entry, add_service_op_code_entry
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
from tmtccmd.config.definitions import QueueCommands, ServiceOpCodeDictT
from tmtccmd.logging import get_console_logger
from tmtccmd.tc.definitions import TcQueueT from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.pus_8_funccmd import generate_action_command from tmtccmd.tc.pus_8_funccmd import generate_action_command
from config.object_ids import GPS_HANDLER_1_ID, GPS_CONTROLLER LOGGER = get_console_logger()
class GpsOpCodes(enum.Enum):
RESET_GNSS = "5" class OpCodes:
REQ_OS_HK = ["0", "hk-os"]
RESET_GNSS = ["5", "reset"]
class Info:
REQ_OS_HK = "Request One-Shot HK"
RESET_GNSS = "Reset GNSS using reset pin"
class SetIds:
HK = 0
def add_gps_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.RESET_GNSS, info=Info.RESET_GNSS
)
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.REQ_OS_HK, info=Info.REQ_OS_HK
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
op_code_entry=op_code_dict,
name=CustomServiceList.GPS_CTRL.value,
info="GPS/GNSS Controller",
)
class SetIds: class SetIds:
@ -17,10 +48,12 @@ class SetIds:
def pack_gps_command(object_id: bytes, tc_queue: TcQueueT, op_code: str): def pack_gps_command(object_id: bytes, tc_queue: TcQueueT, op_code: str):
if op_code == GpsOpCodes.RESET_GNSS.value: if op_code in OpCodes.RESET_GNSS:
if object_id == GPS_CONTROLLER: # TODO: This needs to be re-implemented
tc_queue.appendleft((QueueCommands.PRINT, "Resetting GPS device 0")) LOGGER.warning("Reset pin handling needs to be re-implemented")
elif object_id == GPS_HANDLER_1_ID: if op_code in OpCodes.REQ_OS_HK:
tc_queue.appendleft((QueueCommands.PRINT, "Resetting GPS device 1")) tc_queue.appendleft((QueueCommands.PRINT, f"GMSS: {Info.REQ_OS_HK}"))
cmd = generate_action_command(object_id=object_id, action_id=int(op_code)) cmd = generate_one_hk_command(
sid=make_sid(object_id=object_id, set_id=SetIds.HK), ssc=0
)
tc_queue.appendleft(cmd.pack_command_tuple()) tc_queue.appendleft(cmd.pack_command_tuple())

View File

@ -64,7 +64,6 @@ from config.object_ids import (
STAR_TRACKER_ID, STAR_TRACKER_ID,
PLOC_MEMORY_DUMPER_ID, PLOC_MEMORY_DUMPER_ID,
GPS_CONTROLLER, GPS_CONTROLLER,
GPS_HANDLER_1_ID,
CCSDS_HANDLER_ID, CCSDS_HANDLER_ID,
PDEC_HANDLER_ID, PDEC_HANDLER_ID,
STR_IMG_HELPER_ID, STR_IMG_HELPER_ID,
@ -73,7 +72,7 @@ from config.object_ids import (
RW_ASSEMBLY, RW_ASSEMBLY,
get_object_ids, get_object_ids,
) )
import config.object_ids as oids
LOGGER = get_console_logger() LOGGER = get_console_logger()
@ -206,13 +205,9 @@ def pack_service_queue_user(
) )
if service == CustomServiceList.ACS.value: if service == CustomServiceList.ACS.value:
return pack_acs_command(tc_queue=service_queue, op_code=op_code) return pack_acs_command(tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.GPS_0.value: if service == CustomServiceList.GPS_CTRL.value:
return pack_gps_command( return pack_gps_command(
object_id=GPS_CONTROLLER, tc_queue=service_queue, op_code=op_code object_id=oids.GPS_CONTROLLER, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.GPS_1.value:
return pack_gps_command(
object_id=GPS_HANDLER_1_ID, tc_queue=service_queue, op_code=op_code
) )
if service == CustomServiceList.CCSDS_HANDLER.value: if service == CustomServiceList.CCSDS_HANDLER.value:
return pack_ccsds_handler_test( return pack_ccsds_handler_test(
@ -247,8 +242,8 @@ def pack_service_queue_user(
tc_queue=service_queue, object_id=RW_ASSEMBLY, op_code=op_code tc_queue=service_queue, object_id=RW_ASSEMBLY, op_code=op_code
) )
if service == CustomServiceList.CONTROLLERS.value: if service == CustomServiceList.CONTROLLERS.value:
return pack_cmd_ctrl_to_prompted_mode(tc_queue=service_queue, op_code=op_code) return pack_controller_commands(tc_queue=service_queue, op_code=op_code)
LOGGER.warning("Invalid Service !") LOGGER.warning(f"Invalid Service {service}")
def create_total_tc_queue_user() -> TcQueueT: def create_total_tc_queue_user() -> TcQueueT:

View File

@ -1,6 +1,4 @@
import os
import struct import struct
from datetime import datetime
from pus_tm.defs import PrintWrapper from pus_tm.defs import PrintWrapper
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
@ -9,51 +7,33 @@ from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes): def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
pw = PrintWrapper(printer) pw = PrintWrapper(printer)
pw.dlog(f"Received GPS data, HK data length {len(hk_data)}") pw.dlog(f"Received GPS data, HK data length {len(hk_data)}")
var_index = 0 current_idx = 0
header_list = [ fmt_str = "!ddddBBBHBBBBBI"
"Latitude", inc_len = struct.calcsize(fmt_str)
"Longitude", (
"Altitude", lat,
"Fix Mode", long,
"Sats in Use", alt,
"Date", speed,
"Unix Seconds", fix,
] sats_in_use,
latitude = struct.unpack("!d", hk_data[0:8])[0] sats_in_view,
longitude = struct.unpack("!d", hk_data[8:16])[0] year,
altitude = struct.unpack("!d", hk_data[16:24])[0] month,
fix_mode = hk_data[24] day,
sat_in_use = hk_data[25] hours,
year = struct.unpack("!H", hk_data[26:28])[0] minutes,
month = hk_data[28] seconds,
day = hk_data[29]
hours = hk_data[30]
minutes = hk_data[31]
seconds = hk_data[32]
date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}"
unix_seconds = struct.unpack("!I", hk_data[33:37])[0]
content_list = [
latitude,
longitude,
altitude,
fix_mode,
sat_in_use,
date_string,
unix_seconds, unix_seconds,
] ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
var_index += 13 current_idx += inc_len
if not os.path.isfile("gps_log.txt"): date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}"
with open("gps_log.txt", "w") as gps_file: pw.dlog(f"Lat: {lat} deg")
gps_file.write( pw.dlog(f"Long: {long} deg")
"Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, " pw.dlog(f"Altitude: {alt} m | Speed: {speed} m/s")
"Date, Unix Seconds\n" pw.dlog(
) f"Fix Type: {fix} | Sats in View {sats_in_view} | Sats in Use {sats_in_use}"
with open("gps_log.txt", "a") as gps_file: )
gps_file.write( pw.dlog(f"GNSS Date: {date_string}")
f"{datetime.now()}, {latitude}, {longitude}, {altitude}, " pw.dlog(f"Unix seconds {unix_seconds}")
f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n" printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=14)
)
validity_buffer = hk_data[37:39]
pw.dlog(str(header_list))
pw.dlog(str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)

View File

@ -82,9 +82,9 @@ def handle_regular_hk_print(
"""This function is called when a Service 3 Housekeeping packet is received.""" """This function is called when a Service 3 Housekeeping packet is received."""
if object_id in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: if object_id in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
handle_rw_hk_data(printer, object_id, set_id, hk_data) handle_rw_hk_data(printer, object_id, set_id, hk_data)
if objb == obj_ids.SYRLINKS_HANDLER_ID: elif objb == obj_ids.SYRLINKS_HANDLER_ID:
handle_syrlinks_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) handle_syrlinks_hk_data(printer=printer, hk_data=hk_data, set_id=set_id)
if objb == obj_ids.IMTQ_HANDLER_ID: elif objb == obj_ids.IMTQ_HANDLER_ID:
if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and ( if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and (
set_id <= ImtqSetIds.NEGATIVE_Z_TEST set_id <= ImtqSetIds.NEGATIVE_Z_TEST
): ):
@ -97,25 +97,25 @@ def handle_regular_hk_print(
return handle_raw_mtm_measurement(printer, hk_data) return handle_raw_mtm_measurement(printer, hk_data)
else: else:
LOGGER.info("Service 3 TM: IMTQ handler reply with unknown set id") LOGGER.info("Service 3 TM: IMTQ handler reply with unknown set id")
if objb == obj_ids.GPS_CONTROLLER or object_id == obj_ids.GPS_HANDLER_1_ID: elif objb == obj_ids.GPS_CONTROLLER:
handle_gps_data(printer=printer, hk_data=hk_data) return handle_gps_data(printer=printer, hk_data=hk_data)
if objb == obj_ids.BPX_HANDLER_ID: elif objb == obj_ids.BPX_HANDLER_ID:
handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer) handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer)
if objb == obj_ids.CORE_CONTROLLER_ID: elif objb == obj_ids.CORE_CONTROLLER_ID:
return handle_core_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) return handle_core_hk_data(printer=printer, hk_data=hk_data, set_id=set_id)
if objb == obj_ids.PDU_1_HANDLER_ID: elif objb == obj_ids.PDU_1_HANDLER_ID:
return handle_pdu_data( return handle_pdu_data(
printer=printer, pdu_idx=1, set_id=set_id, hk_data=hk_data printer=printer, pdu_idx=1, set_id=set_id, hk_data=hk_data
) )
if objb == obj_ids.PDU_2_HANDLER_ID: elif objb == obj_ids.PDU_2_HANDLER_ID:
return handle_pdu_data( return handle_pdu_data(
printer=printer, pdu_idx=2, set_id=set_id, hk_data=hk_data printer=printer, pdu_idx=2, set_id=set_id, hk_data=hk_data
) )
if objb == obj_ids.ACU_HANDLER_ID: elif objb == obj_ids.ACU_HANDLER_ID:
return handle_acu_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) return handle_acu_hk_data(printer=printer, hk_data=hk_data, set_id=set_id)
if objb == obj_ids.RAD_SENSOR_ID: elif objb == obj_ids.RAD_SENSOR_ID:
return handle_rad_sensor_data(printer=printer, hk_data=hk_data, set_id=set_id) return handle_rad_sensor_data(printer=printer, hk_data=hk_data, set_id=set_id)
if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: elif objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
return handle_rw_hk_data( return handle_rw_hk_data(
printer=printer, object_id=object_id, set_id=set_id, hk_data=hk_data printer=printer, object_id=object_id, set_id=set_id, hk_data=hk_data
) )
@ -136,9 +136,9 @@ def handle_regular_hk_print(
handle_sus_hk( handle_sus_hk(
object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id
) )
if objb == obj_ids.P60_DOCK_HANDLER: elif objb == obj_ids.P60_DOCK_HANDLER:
handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data) handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data)
if objb in [ elif objb in [
obj_ids.GYRO_0_ADIS_HANDLER_ID, obj_ids.GYRO_0_ADIS_HANDLER_ID,
obj_ids.GYRO_1_L3G_HANDLER_ID, obj_ids.GYRO_1_L3G_HANDLER_ID,
obj_ids.GYRO_2_ADIS_HANDLER_ID, obj_ids.GYRO_2_ADIS_HANDLER_ID,
@ -147,7 +147,7 @@ def handle_regular_hk_print(
handle_gyros_hk_data( handle_gyros_hk_data(
object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id
) )
if objb in [ elif objb in [
obj_ids.MGM_0_LIS3_HANDLER_ID, obj_ids.MGM_0_LIS3_HANDLER_ID,
obj_ids.MGM_1_RM3100_HANDLER_ID, obj_ids.MGM_1_RM3100_HANDLER_ID,
obj_ids.MGM_2_LIS3_HANDLER_ID, obj_ids.MGM_2_LIS3_HANDLER_ID,
@ -163,4 +163,7 @@ def handle_regular_hk_print(
object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data
) )
else: else:
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.") LOGGER.info(
f"Service 3 TM: Parsing for object {object_id} and set ID {set_id} "
f"has not been implemented."
)