GPS HK Parsing #86
@ -34,8 +34,7 @@ class CustomServiceList(enum.Enum):
|
|||||||
REACTION_WHEEL_4 = "rw-4"
|
REACTION_WHEEL_4 = "rw-4"
|
||||||
RW_ASSEMBLY = "rw-ass"
|
RW_ASSEMBLY = "rw-ass"
|
||||||
RAD_SENSOR = "rad_sensor"
|
RAD_SENSOR = "rad_sensor"
|
||||||
GPS_0 = "gps0"
|
GPS_CTRL = "gnss-ctrl"
|
||||||
GPS_1 = "gps1"
|
|
||||||
PLOC_MEMORY_DUMPER = "ploc_memory_dumper"
|
PLOC_MEMORY_DUMPER = "ploc_memory_dumper"
|
||||||
CORE = "core"
|
CORE = "core"
|
||||||
STAR_TRACKER = "star_tracker"
|
STAR_TRACKER = "star_tracker"
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
from pus_tc.devs.gps import GpsOpCodes
|
from pus_tc.devs.gps import OpCodes
|
||||||
from pus_tc.devs.pcdu import add_pcdu_cmds
|
from pus_tc.devs.pcdu import add_pcdu_cmds
|
||||||
from pus_tc.devs.rad_sensor import add_rad_sens_cmds
|
from pus_tc.devs.rad_sensor import add_rad_sens_cmds
|
||||||
from tmtccmd.config import (
|
from tmtccmd.config import (
|
||||||
@ -89,15 +89,6 @@ def add_ccsds_cmds(cmd_dict: ServiceOpCodeDictT):
|
|||||||
cmd_dict[CustomServiceList.CCSDS_HANDLER.value] = service_ccsds_handler_tuple
|
cmd_dict[CustomServiceList.CCSDS_HANDLER.value] = service_ccsds_handler_tuple
|
||||||
|
|
||||||
|
|
||||||
def add_gps_cmds(cmd_dict: ServiceOpCodeDictT):
|
|
||||||
op_code_dict = {
|
|
||||||
GpsOpCodes.RESET_GNSS.value: ("Reset GPS", {OpCodeDictKeys.TIMEOUT: 2.0})
|
|
||||||
}
|
|
||||||
service_tuple = ("GPS 0", op_code_dict)
|
|
||||||
cmd_dict[CustomServiceList.GPS_0.value] = service_tuple
|
|
||||||
cmd_dict[CustomServiceList.GPS_1.value] = service_tuple
|
|
||||||
|
|
||||||
|
|
||||||
def add_str_cmds(cmd_dict: ServiceOpCodeDictT):
|
def add_str_cmds(cmd_dict: ServiceOpCodeDictT):
|
||||||
op_code_dict_srv_star_tracker = {
|
op_code_dict_srv_star_tracker = {
|
||||||
"0": (
|
"0": (
|
||||||
|
@ -1,22 +1,63 @@
|
|||||||
import enum
|
import enum
|
||||||
|
|
||||||
from tmtccmd.config.definitions import QueueCommands
|
from config.definitions import CustomServiceList
|
||||||
|
from tmtccmd.config import add_op_code_entry, add_service_op_code_entry
|
||||||
|
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
|
||||||
|
from tmtccmd.config.definitions import QueueCommands, ServiceOpCodeDictT
|
||||||
|
from tmtccmd.logging import get_console_logger
|
||||||
from tmtccmd.tc.definitions import TcQueueT
|
from tmtccmd.tc.definitions import TcQueueT
|
||||||
from tmtccmd.tc.pus_8_funccmd import generate_action_command
|
from tmtccmd.tc.pus_8_funccmd import generate_action_command
|
||||||
|
|
||||||
|
|
||||||
from config.object_ids import GPS_HANDLER_1_ID, GPS_HANDLER_0_ID
|
from config.object_ids import GPS_HANDLER_1_ID, GPS_HANDLER_0_ID
|
||||||
|
|
||||||
|
LOGGER = get_console_logger()
|
||||||
|
|
||||||
class GpsOpCodes(enum.Enum):
|
|
||||||
RESET_GNSS = "5"
|
class OpCodes:
|
||||||
|
REQ_OS_HK = ["0", "hk-os"]
|
||||||
|
RESET_GNSS = ["5", "reset"]
|
||||||
|
|
||||||
|
|
||||||
|
class Info:
|
||||||
|
REQ_OS_HK = "Request One-Shot HK"
|
||||||
|
RESET_GNSS = "Reset GNSS using reset pin"
|
||||||
|
|
||||||
|
|
||||||
|
class SetIds:
|
||||||
|
HK = 0
|
||||||
|
|
||||||
|
|
||||||
|
def add_gps_cmds(cmd_dict: ServiceOpCodeDictT):
|
||||||
|
op_code_dict = dict()
|
||||||
|
add_op_code_entry(
|
||||||
|
op_code_dict=op_code_dict, keys=OpCodes.RESET_GNSS, info=Info.RESET_GNSS
|
||||||
|
)
|
||||||
|
add_op_code_entry(
|
||||||
|
op_code_dict=op_code_dict, keys=OpCodes.REQ_OS_HK, info=Info.REQ_OS_HK
|
||||||
|
)
|
||||||
|
add_service_op_code_entry(
|
||||||
|
srv_op_code_dict=cmd_dict,
|
||||||
|
op_code_entry=op_code_dict,
|
||||||
|
name=CustomServiceList.GPS_CTRL.value,
|
||||||
|
info="GPS/GNSS Controller"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def pack_gps_command(object_id: bytes, tc_queue: TcQueueT, op_code: str):
|
def pack_gps_command(object_id: bytes, tc_queue: TcQueueT, op_code: str):
|
||||||
if op_code == GpsOpCodes.RESET_GNSS.value:
|
if op_code in OpCodes.RESET_GNSS:
|
||||||
if object_id == GPS_HANDLER_0_ID:
|
if object_id == GPS_HANDLER_0_ID:
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "Resetting GPS device 0"))
|
# TODO: This needs to be re-implemented
|
||||||
|
LOGGER.warning("Reset pin handling needs to be re-implemented")
|
||||||
|
return
|
||||||
|
# tc_queue.appendleft((QueueCommands.PRINT, "Resetting GPS device 0"))
|
||||||
elif object_id == GPS_HANDLER_1_ID:
|
elif object_id == GPS_HANDLER_1_ID:
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "Resetting GPS device 1"))
|
LOGGER.warning("Reset pin handling needs to be re-implemented")
|
||||||
|
return
|
||||||
|
# tc_queue.appendleft((QueueCommands.PRINT, "Resetting GPS device 1"))
|
||||||
cmd = generate_action_command(object_id=object_id, action_id=int(op_code))
|
cmd = generate_action_command(object_id=object_id, action_id=int(op_code))
|
||||||
tc_queue.appendleft(cmd.pack_command_tuple())
|
tc_queue.appendleft(cmd.pack_command_tuple())
|
||||||
|
if op_code in OpCodes.REQ_OS_HK:
|
||||||
|
tc_queue.appendleft((QueueCommands.PRINT, f"GMSS: {Info.REQ_OS_HK}"))
|
||||||
|
cmd = generate_one_hk_command(sid=make_sid(object_id=object_id, set_id=SetIds.HK), ssc=0)
|
||||||
|
tc_queue.appendleft(cmd.pack_command_tuple())
|
||||||
|
@ -1,6 +1,4 @@
|
|||||||
import os
|
|
||||||
import struct
|
import struct
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
from pus_tm.defs import PrintWrapper
|
from pus_tm.defs import PrintWrapper
|
||||||
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
|
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
|
||||||
@ -9,51 +7,31 @@ from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
|
|||||||
def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||||
pw = PrintWrapper(printer)
|
pw = PrintWrapper(printer)
|
||||||
pw.dlog(f"Received GPS data, HK data length {len(hk_data)}")
|
pw.dlog(f"Received GPS data, HK data length {len(hk_data)}")
|
||||||
var_index = 0
|
current_idx = 0
|
||||||
header_list = [
|
fmt_str = "!ddddBBBHBBBBBI"
|
||||||
"Latitude",
|
inc_len = struct.calcsize(fmt_str)
|
||||||
"Longitude",
|
(
|
||||||
"Altitude",
|
lat,
|
||||||
"Fix Mode",
|
long,
|
||||||
"Sats in Use",
|
alt,
|
||||||
"Date",
|
speed,
|
||||||
"Unix Seconds",
|
fix,
|
||||||
]
|
sats_in_use,
|
||||||
latitude = struct.unpack("!d", hk_data[0:8])[0]
|
sats_in_view,
|
||||||
longitude = struct.unpack("!d", hk_data[8:16])[0]
|
year,
|
||||||
altitude = struct.unpack("!d", hk_data[16:24])[0]
|
month,
|
||||||
fix_mode = hk_data[24]
|
day,
|
||||||
sat_in_use = hk_data[25]
|
hours,
|
||||||
year = struct.unpack("!H", hk_data[26:28])[0]
|
minutes,
|
||||||
month = hk_data[28]
|
seconds,
|
||||||
day = hk_data[29]
|
unix_seconds
|
||||||
hours = hk_data[30]
|
) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
|
||||||
minutes = hk_data[31]
|
current_idx += inc_len
|
||||||
seconds = hk_data[32]
|
|
||||||
date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}"
|
date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}"
|
||||||
unix_seconds = struct.unpack("!I", hk_data[33:37])[0]
|
pw.dlog(f"Lat: {lat} deg")
|
||||||
content_list = [
|
pw.dlog(f"Long: {long} deg")
|
||||||
latitude,
|
pw.dlog(f"Altitude: {alt} m | Speed: {speed} m/s")
|
||||||
longitude,
|
pw.dlog(f"Fix Type: {fix} | Sats in View {sats_in_view} | Sats in Use {sats_in_use}")
|
||||||
altitude,
|
pw.dlog(f"GNSS Date: {date_string}")
|
||||||
fix_mode,
|
pw.dlog(f"Unix seconds {unix_seconds}")
|
||||||
sat_in_use,
|
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=14)
|
||||||
date_string,
|
|
||||||
unix_seconds,
|
|
||||||
]
|
|
||||||
var_index += 13
|
|
||||||
if not os.path.isfile("gps_log.txt"):
|
|
||||||
with open("gps_log.txt", "w") as gps_file:
|
|
||||||
gps_file.write(
|
|
||||||
"Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, "
|
|
||||||
"Date, Unix Seconds\n"
|
|
||||||
)
|
|
||||||
with open("gps_log.txt", "a") as gps_file:
|
|
||||||
gps_file.write(
|
|
||||||
f"{datetime.now()}, {latitude}, {longitude}, {altitude}, "
|
|
||||||
f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n"
|
|
||||||
)
|
|
||||||
validity_buffer = hk_data[37:39]
|
|
||||||
pw.dlog(str(header_list))
|
|
||||||
pw.dlog(str(content_list))
|
|
||||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
|
|
||||||
|
Loading…
Reference in New Issue
Block a user