diff --git a/config/definitions.py b/config/definitions.py index 5aa4174..c12a6d2 100644 --- a/config/definitions.py +++ b/config/definitions.py @@ -34,8 +34,7 @@ class CustomServiceList(enum.Enum): REACTION_WHEEL_4 = "rw-4" RW_ASSEMBLY = "rw-ass" RAD_SENSOR = "rad_sensor" - GPS_0 = "gps0" - GPS_1 = "gps1" + GPS_CTRL = "gnss-ctrl" PLOC_MEMORY_DUMPER = "ploc_memory_dumper" CORE = "core" STAR_TRACKER = "star_tracker" diff --git a/config/object_ids.py b/config/object_ids.py index fcd9e9c..826da95 100644 --- a/config/object_ids.py +++ b/config/object_ids.py @@ -45,7 +45,6 @@ GYRO_1_L3G_HANDLER_ID = bytes([0x44, 0x12, 0x01, 0x11]) GYRO_2_ADIS_HANDLER_ID = bytes([0x44, 0x12, 0x02, 0x12]) GYRO_3_L3G_HANDLER_ID = bytes([0x44, 0x12, 0x03, 0x13]) GPS_CONTROLLER = bytes([0x44, 0x13, 0x00, 0x45]) -GPS_HANDLER_1_ID = bytes([0x44, 0x13, 0x01, 0x46]) RW1_ID = bytes([0x44, 0x12, 0x00, 0x47]) RW2_ID = bytes([0x44, 0x12, 0x01, 0x48]) RW3_ID = bytes([0x44, 0x12, 0x02, 0x49]) diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index e8f8266..061884f 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -8,7 +8,7 @@ from tmtccmd.config import ( ) from tmtccmd.config.globals import get_default_service_op_code_dict -from pus_tc.devs.gps import GpsOpCodes +from pus_tc.devs.gps import add_gps_cmds from pus_tc.devs.pcdu import add_pcdu_cmds from pus_tc.devs.plpcdu import add_pl_pcdu_cmds from pus_tc.devs.rad_sensor import add_rad_sens_cmds @@ -93,15 +93,6 @@ def add_ccsds_cmds(cmd_dict: ServiceOpCodeDictT): cmd_dict[CustomServiceList.CCSDS_HANDLER.value] = service_ccsds_handler_tuple -def add_gps_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict = { - GpsOpCodes.RESET_GNSS.value: ("Reset GPS", {OpCodeDictKeys.TIMEOUT: 2.0}) - } - service_tuple = ("GPS 0", op_code_dict) - cmd_dict[CustomServiceList.GPS_0.value] = service_tuple - cmd_dict[CustomServiceList.GPS_1.value] = service_tuple - - def add_str_cmds(cmd_dict: ServiceOpCodeDictT): op_code_dict_srv_star_tracker = { "0": ( diff --git a/pus_tc/devs/gps.py b/pus_tc/devs/gps.py index 9f308ed..96f89f2 100644 --- a/pus_tc/devs/gps.py +++ b/pus_tc/devs/gps.py @@ -1,15 +1,46 @@ import enum -from tmtccmd.config.definitions import QueueCommands +from config.definitions import CustomServiceList +from tmtccmd.config import add_op_code_entry, add_service_op_code_entry +from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command +from tmtccmd.config.definitions import QueueCommands, ServiceOpCodeDictT +from tmtccmd.logging import get_console_logger from tmtccmd.tc.definitions import TcQueueT from tmtccmd.tc.pus_8_funccmd import generate_action_command -from config.object_ids import GPS_HANDLER_1_ID, GPS_CONTROLLER +LOGGER = get_console_logger() -class GpsOpCodes(enum.Enum): - RESET_GNSS = "5" + +class OpCodes: + REQ_OS_HK = ["0", "hk-os"] + RESET_GNSS = ["5", "reset"] + + +class Info: + REQ_OS_HK = "Request One-Shot HK" + RESET_GNSS = "Reset GNSS using reset pin" + + +class SetIds: + HK = 0 + + +def add_gps_cmds(cmd_dict: ServiceOpCodeDictT): + op_code_dict = dict() + add_op_code_entry( + op_code_dict=op_code_dict, keys=OpCodes.RESET_GNSS, info=Info.RESET_GNSS + ) + add_op_code_entry( + op_code_dict=op_code_dict, keys=OpCodes.REQ_OS_HK, info=Info.REQ_OS_HK + ) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + op_code_entry=op_code_dict, + name=CustomServiceList.GPS_CTRL.value, + info="GPS/GNSS Controller", + ) class SetIds: @@ -17,10 +48,12 @@ class SetIds: def pack_gps_command(object_id: bytes, tc_queue: TcQueueT, op_code: str): - if op_code == GpsOpCodes.RESET_GNSS.value: - if object_id == GPS_CONTROLLER: - tc_queue.appendleft((QueueCommands.PRINT, "Resetting GPS device 0")) - elif object_id == GPS_HANDLER_1_ID: - tc_queue.appendleft((QueueCommands.PRINT, "Resetting GPS device 1")) - cmd = generate_action_command(object_id=object_id, action_id=int(op_code)) + if op_code in OpCodes.RESET_GNSS: + # TODO: This needs to be re-implemented + LOGGER.warning("Reset pin handling needs to be re-implemented") + if op_code in OpCodes.REQ_OS_HK: + tc_queue.appendleft((QueueCommands.PRINT, f"GMSS: {Info.REQ_OS_HK}")) + cmd = generate_one_hk_command( + sid=make_sid(object_id=object_id, set_id=SetIds.HK), ssc=0 + ) tc_queue.appendleft(cmd.pack_command_tuple()) diff --git a/pus_tc/tc_packer_hook.py b/pus_tc/tc_packer_hook.py index 752e19a..be637d4 100644 --- a/pus_tc/tc_packer_hook.py +++ b/pus_tc/tc_packer_hook.py @@ -64,7 +64,6 @@ from config.object_ids import ( STAR_TRACKER_ID, PLOC_MEMORY_DUMPER_ID, GPS_CONTROLLER, - GPS_HANDLER_1_ID, CCSDS_HANDLER_ID, PDEC_HANDLER_ID, STR_IMG_HELPER_ID, @@ -73,7 +72,7 @@ from config.object_ids import ( RW_ASSEMBLY, get_object_ids, ) - +import config.object_ids as oids LOGGER = get_console_logger() @@ -206,13 +205,9 @@ def pack_service_queue_user( ) if service == CustomServiceList.ACS.value: return pack_acs_command(tc_queue=service_queue, op_code=op_code) - if service == CustomServiceList.GPS_0.value: + if service == CustomServiceList.GPS_CTRL.value: return pack_gps_command( - object_id=GPS_CONTROLLER, tc_queue=service_queue, op_code=op_code - ) - if service == CustomServiceList.GPS_1.value: - return pack_gps_command( - object_id=GPS_HANDLER_1_ID, tc_queue=service_queue, op_code=op_code + object_id=oids.GPS_CONTROLLER, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.CCSDS_HANDLER.value: return pack_ccsds_handler_test( @@ -247,8 +242,8 @@ def pack_service_queue_user( tc_queue=service_queue, object_id=RW_ASSEMBLY, op_code=op_code ) if service == CustomServiceList.CONTROLLERS.value: - return pack_cmd_ctrl_to_prompted_mode(tc_queue=service_queue, op_code=op_code) - LOGGER.warning("Invalid Service !") + return pack_controller_commands(tc_queue=service_queue, op_code=op_code) + LOGGER.warning(f"Invalid Service {service}") def create_total_tc_queue_user() -> TcQueueT: diff --git a/pus_tm/devs/gps.py b/pus_tm/devs/gps.py index d60828d..070a7d6 100644 --- a/pus_tm/devs/gps.py +++ b/pus_tm/devs/gps.py @@ -1,6 +1,4 @@ -import os import struct -from datetime import datetime from pus_tm.defs import PrintWrapper from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter @@ -9,51 +7,33 @@ from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes): pw = PrintWrapper(printer) pw.dlog(f"Received GPS data, HK data length {len(hk_data)}") - var_index = 0 - header_list = [ - "Latitude", - "Longitude", - "Altitude", - "Fix Mode", - "Sats in Use", - "Date", - "Unix Seconds", - ] - latitude = struct.unpack("!d", hk_data[0:8])[0] - longitude = struct.unpack("!d", hk_data[8:16])[0] - altitude = struct.unpack("!d", hk_data[16:24])[0] - fix_mode = hk_data[24] - sat_in_use = hk_data[25] - year = struct.unpack("!H", hk_data[26:28])[0] - month = hk_data[28] - day = hk_data[29] - hours = hk_data[30] - minutes = hk_data[31] - seconds = hk_data[32] - date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}" - unix_seconds = struct.unpack("!I", hk_data[33:37])[0] - content_list = [ - latitude, - longitude, - altitude, - fix_mode, - sat_in_use, - date_string, + current_idx = 0 + fmt_str = "!ddddBBBHBBBBBI" + inc_len = struct.calcsize(fmt_str) + ( + lat, + long, + alt, + speed, + fix, + sats_in_use, + sats_in_view, + year, + month, + day, + hours, + minutes, + seconds, unix_seconds, - ] - var_index += 13 - if not os.path.isfile("gps_log.txt"): - with open("gps_log.txt", "w") as gps_file: - gps_file.write( - "Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, " - "Date, Unix Seconds\n" - ) - with open("gps_log.txt", "a") as gps_file: - gps_file.write( - f"{datetime.now()}, {latitude}, {longitude}, {altitude}, " - f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n" - ) - validity_buffer = hk_data[37:39] - pw.dlog(str(header_list)) - pw.dlog(str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) + ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}" + pw.dlog(f"Lat: {lat} deg") + pw.dlog(f"Long: {long} deg") + pw.dlog(f"Altitude: {alt} m | Speed: {speed} m/s") + pw.dlog( + f"Fix Type: {fix} | Sats in View {sats_in_view} | Sats in Use {sats_in_use}" + ) + pw.dlog(f"GNSS Date: {date_string}") + pw.dlog(f"Unix seconds {unix_seconds}") + printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=14) diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py index c5f0e46..2d7397e 100644 --- a/pus_tm/hk_handling.py +++ b/pus_tm/hk_handling.py @@ -82,9 +82,9 @@ def handle_regular_hk_print( """This function is called when a Service 3 Housekeeping packet is received.""" if object_id in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: handle_rw_hk_data(printer, object_id, set_id, hk_data) - if objb == obj_ids.SYRLINKS_HANDLER_ID: + elif objb == obj_ids.SYRLINKS_HANDLER_ID: handle_syrlinks_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) - if objb == obj_ids.IMTQ_HANDLER_ID: + elif objb == obj_ids.IMTQ_HANDLER_ID: if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and ( set_id <= ImtqSetIds.NEGATIVE_Z_TEST ): @@ -97,25 +97,25 @@ def handle_regular_hk_print( return handle_raw_mtm_measurement(printer, hk_data) else: LOGGER.info("Service 3 TM: IMTQ handler reply with unknown set id") - if objb == obj_ids.GPS_CONTROLLER or object_id == obj_ids.GPS_HANDLER_1_ID: - handle_gps_data(printer=printer, hk_data=hk_data) - if objb == obj_ids.BPX_HANDLER_ID: + elif objb == obj_ids.GPS_CONTROLLER: + return handle_gps_data(printer=printer, hk_data=hk_data) + elif objb == obj_ids.BPX_HANDLER_ID: handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer) - if objb == obj_ids.CORE_CONTROLLER_ID: + elif objb == obj_ids.CORE_CONTROLLER_ID: return handle_core_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) - if objb == obj_ids.PDU_1_HANDLER_ID: + elif objb == obj_ids.PDU_1_HANDLER_ID: return handle_pdu_data( printer=printer, pdu_idx=1, set_id=set_id, hk_data=hk_data ) - if objb == obj_ids.PDU_2_HANDLER_ID: + elif objb == obj_ids.PDU_2_HANDLER_ID: return handle_pdu_data( printer=printer, pdu_idx=2, set_id=set_id, hk_data=hk_data ) - if objb == obj_ids.ACU_HANDLER_ID: + elif objb == obj_ids.ACU_HANDLER_ID: return handle_acu_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) - if objb == obj_ids.RAD_SENSOR_ID: + elif objb == obj_ids.RAD_SENSOR_ID: return handle_rad_sensor_data(printer=printer, hk_data=hk_data, set_id=set_id) - if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: + elif objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: return handle_rw_hk_data( printer=printer, object_id=object_id, set_id=set_id, hk_data=hk_data ) @@ -136,9 +136,9 @@ def handle_regular_hk_print( handle_sus_hk( object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id ) - if objb == obj_ids.P60_DOCK_HANDLER: + elif objb == obj_ids.P60_DOCK_HANDLER: handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data) - if objb in [ + elif objb in [ obj_ids.GYRO_0_ADIS_HANDLER_ID, obj_ids.GYRO_1_L3G_HANDLER_ID, obj_ids.GYRO_2_ADIS_HANDLER_ID, @@ -147,7 +147,7 @@ def handle_regular_hk_print( handle_gyros_hk_data( object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id ) - if objb in [ + elif objb in [ obj_ids.MGM_0_LIS3_HANDLER_ID, obj_ids.MGM_1_RM3100_HANDLER_ID, obj_ids.MGM_2_LIS3_HANDLER_ID, @@ -163,4 +163,7 @@ def handle_regular_hk_print( object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data ) else: - LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.") + LOGGER.info( + f"Service 3 TM: Parsing for object {object_id} and set ID {set_id} " + f"has not been implemented." + )