Compare commits
11 Commits
69fda96d7a
...
dfa45dbdba
Author | SHA1 | Date | |
---|---|---|---|
dfa45dbdba
|
|||
36d9323b57
|
|||
e04b5bf9d7
|
|||
3c0ac91227 | |||
f61c485979 | |||
2ebbf750bd
|
|||
847fccbe66 | |||
c8292f4ee1 | |||
1cafdc817f
|
|||
5967dede97
|
|||
344f16099e |
@@ -10,10 +10,14 @@ list yields a list of all related PRs for each release.
|
||||
|
||||
# [unreleased]
|
||||
|
||||
# [v7.0.0] 2024-04-17
|
||||
# [v7.0.0] 2024-04-25
|
||||
|
||||
- Reworked PLOC MPSoC commanding to be inline with OBSW update.
|
||||
|
||||
## Added
|
||||
|
||||
- Added handling for new clock events.
|
||||
|
||||
# [v6.2.0] 2024-04-10
|
||||
|
||||
## Added
|
||||
|
@@ -15,6 +15,10 @@ TM_DB_PATH = "tm.db"
|
||||
# Separate DB or not? Not sure..
|
||||
# RAW_TM_PATH = "raw_tm.db"
|
||||
|
||||
# TODO: The cleanest way would be to load those from the config file..
|
||||
PRINT_RAW_HK_B64_STR = False
|
||||
PRINT_RAW_EVENTS_B64_STR = False
|
||||
|
||||
PUS_APID = 0x65
|
||||
CFDP_APID = 0x66
|
||||
PUS_PACKET_ID = PacketId(PacketType.TM, True, PUS_APID)
|
||||
|
@@ -1,7 +1,9 @@
|
||||
import logging
|
||||
import datetime
|
||||
import sys
|
||||
import base64
|
||||
|
||||
from eive_tmtc.config.definitions import PRINT_RAW_EVENTS_B64_STR
|
||||
from eive_tmtc.config.events import get_event_dict
|
||||
from eive_tmtc.config.object_ids import get_object_ids
|
||||
from eive_tmtc.pus_tm.defs import PrintWrapper
|
||||
@@ -21,6 +23,8 @@ _LOGGER = logging.getLogger(__name__)
|
||||
def handle_event_packet( # noqa C901: Complexity okay here
|
||||
raw_tm: bytes, pw: PrintWrapper
|
||||
): # noqa C901: Complexity okay here
|
||||
if PRINT_RAW_EVENTS_B64_STR:
|
||||
print(f"PUS Event TM Base64: {base64.b64encode(raw_tm)}")
|
||||
tm = Service5Tm.unpack(data=raw_tm, time_reader=CdsShortTimestamp.empty())
|
||||
event_dict = get_event_dict()
|
||||
event_def = tm.event_definition
|
||||
@@ -36,6 +40,7 @@ def handle_event_packet( # noqa C901: Complexity okay here
|
||||
obj_name = event_def.reporter_id.hex(sep=",")
|
||||
else:
|
||||
obj_name = obj_id_obj.name
|
||||
assert tm.time_provider is not None
|
||||
generic_event_string = (
|
||||
f"Object {obj_name} generated Event {info.name} (ID: {event_def.event_id:#04x})"
|
||||
f" at {tm.time_provider.as_date_time()}"
|
||||
@@ -128,12 +133,16 @@ def handle_event_packet( # noqa C901: Complexity okay here
|
||||
time = event_def.param1 + event_def.param2 / 1000.0
|
||||
time_dt = datetime.datetime.fromtimestamp(time, datetime.timezone.utc)
|
||||
pw.dlog(f"Current time: {time_dt}")
|
||||
if info.name == "CLOCK_DUMP":
|
||||
if (
|
||||
info.name == "CLOCK_DUMP"
|
||||
or info.name == "CLOCK_DUMP_BEFORE_SETTING_TIME"
|
||||
or info.name == "CLOCK_DUMP_AFTER_SETTING_TIME"
|
||||
):
|
||||
specific_handler = True
|
||||
# param 1 is timeval seconds, param 2 is timeval subsecond microseconds
|
||||
time = event_def.param1 + event_def.param2 / 1000000.0
|
||||
time_dt = datetime.datetime.fromtimestamp(time, datetime.timezone.utc)
|
||||
pw.dlog(f"Current time: {time_dt}")
|
||||
pw.dlog(f"Clock dump event {info.name}. Current time: {time_dt}")
|
||||
if info.name == "ACTIVE_SD_INFO":
|
||||
sd_0_state = (event_def.param2 >> 16) & 0xFFFF
|
||||
sd_1_state = event_def.param2 & 0xFFFF
|
||||
|
@@ -6,6 +6,7 @@ import base64 # noqa
|
||||
import sqlite3
|
||||
from typing import List, cast
|
||||
from uuid import UUID
|
||||
from eive_tmtc.config.definitions import PRINT_RAW_HK_B64_STR
|
||||
from eive_tmtc.pus_tm.hk import HkTmInfo
|
||||
|
||||
from eive_tmtc.tmtc.acs.acs_ctrl import handle_acs_ctrl_hk_data
|
||||
@@ -74,7 +75,8 @@ def handle_hk_packet(
|
||||
if tm_packet.subservice == 25 or tm_packet.subservice == 26:
|
||||
hk_data = tm_packet.tm_data[8:]
|
||||
if named_obj_id.as_bytes in hk_filter.object_ids:
|
||||
# print(f"PUS TM Base64: {base64.b64encode(raw_tm)}")
|
||||
if PRINT_RAW_HK_B64_STR:
|
||||
print(f"PUS TM Base64: {base64.b64encode(raw_tm)}")
|
||||
handle_regular_hk_print(
|
||||
printer=printer,
|
||||
packet_uuid=packet_uuid,
|
||||
|
@@ -27,6 +27,8 @@ _LOGGER = logging.getLogger(__name__)
|
||||
|
||||
MANUAL_INPUT = "1"
|
||||
|
||||
CRIT_CMD_APID_DICT = {"1": ("flash_mkfs", 0x12A)}
|
||||
|
||||
OBC_WRITE_FILE_DICT = {
|
||||
MANUAL_INPUT: ("manual input", ""),
|
||||
"2": ("/mnt/sd0/ploc/mpsoc/flash_write.bin", "/mnt/sd0/ploc/mpsoc/flash_write.bin"),
|
||||
@@ -90,6 +92,8 @@ class ActionId(enum.IntEnum):
|
||||
TC_FLASH_READ_FULL_FILE = 30
|
||||
TC_SIMPLEX_STORE_FILE = 31
|
||||
TC_VERIFY_BOOT = 32
|
||||
TC_ENABLE_TC_EXECUTION = 33
|
||||
TC_FLASH_MKFS = 34
|
||||
|
||||
|
||||
class Submode(enum.IntEnum):
|
||||
@@ -128,6 +132,8 @@ class OpCode:
|
||||
MODE_IDLE = "mode_idle"
|
||||
MODE_REPLAY = "mode_replay"
|
||||
MODE_SNAPSHOT = "mode_snapshot"
|
||||
ENABLE_TC_EXECUTION = "enable_tc_exec"
|
||||
FLASH_MKFS = "flash_mkfs"
|
||||
|
||||
|
||||
class Info:
|
||||
@@ -158,6 +164,8 @@ class Info:
|
||||
DISABLE_PLOC_SUPV_COMMANDING_TO_ON = (
|
||||
"Disable PLOC SUPV commanding when switching ON"
|
||||
)
|
||||
ENABLE_TC_EXECUTION = "Enable execution of critical commands"
|
||||
FLASH_MKFS = "Flash MKFS command"
|
||||
|
||||
|
||||
class MemAddresses(enum.IntEnum):
|
||||
@@ -271,6 +279,35 @@ def pack_ploc_mpsoc_commands(
|
||||
+ bytearray(cam_cmd, "utf-8")
|
||||
)
|
||||
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
||||
if cmd_str == OpCode.ENABLE_TC_EXECUTION:
|
||||
q.add_log_cmd(Info.ENABLE_TC_EXECUTION)
|
||||
while True:
|
||||
for key, val in CRIT_CMD_APID_DICT.items():
|
||||
print(f"{key}: {val[0]} with APID {val[1]}")
|
||||
key = input("Please specify the command to enable by key: ")
|
||||
if key not in CRIT_CMD_APID_DICT:
|
||||
print("invalid key")
|
||||
continue
|
||||
apid = CRIT_CMD_APID_DICT[key][1]
|
||||
break
|
||||
app_data = struct.pack("!H", apid)
|
||||
q.add_pus_tc(
|
||||
create_action_cmd(PLOC_MPSOC_ID, ActionId.TC_ENABLE_TC_EXECUTION, app_data)
|
||||
)
|
||||
if cmd_str == OpCode.FLASH_MKFS:
|
||||
q.add_log_cmd(Info.FLASH_MKFS)
|
||||
while True:
|
||||
flash_select = int(input("Please select the flash ID (0 or 1): "))
|
||||
if flash_select != 0 and flash_select != 1:
|
||||
_LOGGER.warn("invalid flash select")
|
||||
continue
|
||||
break
|
||||
q.add_pus_tc(
|
||||
create_action_cmd(
|
||||
PLOC_MPSOC_ID, ActionId.TC_FLASH_MKFS, bytes([flash_select])
|
||||
)
|
||||
)
|
||||
|
||||
if cmd_str == "17":
|
||||
q.add_log_cmd("PLOC MPSoC: Set UART TX tristate")
|
||||
data = object_id.as_bytes + struct.pack("!I", ActionId.SET_UART_TX_TRISTATE)
|
||||
|
14
scripts/raw-analysis.py
Executable file
14
scripts/raw-analysis.py
Executable file
@@ -0,0 +1,14 @@
|
||||
#!/usr/bin/env python3
|
||||
from base64 import b64decode
|
||||
from spacepackets.ccsds.time import CdsShortTimestamp
|
||||
from spacepackets.ecss.tm import PusTelemetry
|
||||
from spacepackets.ecss.pus_1_verification import Service1Tm, UnpackParams
|
||||
|
||||
bruh = "CGX6cQAdIAEIOzcAAEBedwUTOzkYZe5WAAEAAAAAAAAAAF4z"
|
||||
data = b64decode(bruh)
|
||||
tm = PusTelemetry.unpack(data, CdsShortTimestamp.empty())
|
||||
srv1_tm = Service1Tm.from_tm(
|
||||
tm, UnpackParams(time_reader=CdsShortTimestamp.empty(), bytes_err_code=2)
|
||||
)
|
||||
print(f"service {tm.service} subservice {tm.subservice}")
|
||||
print(f"error code: {srv1_tm.error_code}")
|
Reference in New Issue
Block a user