216 lines
7.3 KiB
Python
216 lines
7.3 KiB
Python
import datetime
|
|
import enum
|
|
import logging
|
|
import math
|
|
import struct
|
|
from typing import Tuple
|
|
|
|
from dateutil.parser import parse
|
|
from spacepackets.ecss.pus_15_tm_storage import Subservice
|
|
from spacepackets.ecss.tc import PusTelecommand
|
|
from tmtccmd.config import CmdTreeNode, TmtcDefinitionWrapper
|
|
from tmtccmd.config.tmtc import OpCodeEntry
|
|
from tmtccmd.tmtc.queue import DefaultPusQueueHelper
|
|
from tmtccmd.util import ObjectIdU32
|
|
|
|
from eive_tmtc.config.definitions import CustomServiceList
|
|
from eive_tmtc.config.object_ids import (
|
|
CFDP_TM_STORE,
|
|
HK_TM_STORE,
|
|
MISC_TM_STORE,
|
|
NOT_OK_TM_STORE,
|
|
OK_TM_STORE,
|
|
get_object_ids,
|
|
)
|
|
|
|
|
|
class CustomSubservice(enum.IntEnum):
|
|
DELETE_BY_TIME_RANGE = 128
|
|
|
|
|
|
class OpCode:
|
|
RETRIEVAL_BY_TIME_RANGE = "retrieval_time_range"
|
|
DELETE_UP_TO = "delete_up_to"
|
|
DELETE_BY_TIME_RANGE = "delete_time_range"
|
|
|
|
|
|
class Info:
|
|
RETRIEVAL_BY_TIME_RANGE = "Dump Telemetry Packets by time range"
|
|
DELETE_UP_TO = "Delete Telemetry Packets up to time"
|
|
DELETE_BY_TIME_RANGE = "Delete Telemetry by time range"
|
|
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
def pack_tm_store_commands(q: DefaultPusQueueHelper, cmd_path: str):
|
|
if cmd_path == OpCode.DELETE_UP_TO:
|
|
obj_id, store_string = store_select_prompt()
|
|
app_data = bytearray(obj_id.as_bytes)
|
|
delete_up_to_time = time_prompt("Determining deletion end time")
|
|
end_stamp = int(math.floor(delete_up_to_time.timestamp()))
|
|
app_data.extend(struct.pack("!I", end_stamp))
|
|
q.add_log_cmd(Info.DELETE_UP_TO)
|
|
q.add_log_cmd(f"Selected Store: {obj_id}")
|
|
q.add_log_cmd(f"Deletion up to time {delete_up_to_time}")
|
|
q.add_pus_tc(
|
|
PusTelecommand(
|
|
service=15, subservice=Subservice.DELETE_UP_TO, app_data=app_data
|
|
)
|
|
)
|
|
elif cmd_path == OpCode.RETRIEVAL_BY_TIME_RANGE:
|
|
q.add_log_cmd(Info.RETRIEVAL_BY_TIME_RANGE)
|
|
obj_id, _ = store_select_prompt()
|
|
app_data = bytearray(obj_id.as_bytes)
|
|
start_of_dump_time = time_prompt("Determining retrieval start time")
|
|
start_stamp = int(math.floor(start_of_dump_time.timestamp()))
|
|
end_of_dump_time = time_prompt("Determining retrieval end time")
|
|
end_stamp = int(math.floor(end_of_dump_time.timestamp()))
|
|
app_data.extend(struct.pack("!I", start_stamp))
|
|
app_data.extend(struct.pack("!I", end_stamp))
|
|
q.add_log_cmd(Info.RETRIEVAL_BY_TIME_RANGE)
|
|
q.add_log_cmd(f"Selected Store: {obj_id}")
|
|
q.add_log_cmd(
|
|
f"Retrieval from time {start_of_dump_time} up to time {end_of_dump_time}"
|
|
)
|
|
q.add_pus_tc(
|
|
PusTelecommand(
|
|
service=15,
|
|
subservice=Subservice.RETRIEVAL_BY_TIME_RANGE,
|
|
app_data=app_data,
|
|
)
|
|
)
|
|
elif cmd_path == OpCode.DELETE_BY_TIME_RANGE:
|
|
q.add_log_cmd(Info.DELETE_BY_TIME_RANGE)
|
|
obj_id, _ = store_select_prompt()
|
|
app_data = bytearray(obj_id.as_bytes)
|
|
start_of_dump_time = time_prompt("Determining deletion start time")
|
|
start_stamp = int(math.floor(start_of_dump_time.timestamp()))
|
|
end_of_dump_time = time_prompt("Determining deletion end time")
|
|
end_stamp = int(math.floor(end_of_dump_time.timestamp()))
|
|
app_data.extend(struct.pack("!I", start_stamp))
|
|
app_data.extend(struct.pack("!I", end_stamp))
|
|
q.add_log_cmd(Info.DELETE_BY_TIME_RANGE)
|
|
q.add_log_cmd(f"Selected Store: {obj_id}")
|
|
q.add_log_cmd(
|
|
f"Deletion from time {start_of_dump_time} up to time {end_of_dump_time}"
|
|
)
|
|
q.add_pus_tc(
|
|
PusTelecommand(
|
|
service=15,
|
|
subservice=CustomSubservice.DELETE_BY_TIME_RANGE,
|
|
app_data=app_data,
|
|
)
|
|
)
|
|
|
|
|
|
def create_persistent_tm_store_node() -> CmdTreeNode:
|
|
node = CmdTreeNode("tm_store", "Persistent TM Store")
|
|
node.add_child(CmdTreeNode(OpCode.DELETE_UP_TO, Info.DELETE_UP_TO))
|
|
node.add_child(
|
|
CmdTreeNode(OpCode.RETRIEVAL_BY_TIME_RANGE, Info.RETRIEVAL_BY_TIME_RANGE)
|
|
)
|
|
return node
|
|
|
|
|
|
def add_persistent_tm_store_cmd_defs(defs: TmtcDefinitionWrapper):
|
|
oce = OpCodeEntry()
|
|
oce.add(keys=OpCode.DELETE_UP_TO, info=Info.DELETE_UP_TO)
|
|
oce.add(keys=OpCode.RETRIEVAL_BY_TIME_RANGE, info=Info.RETRIEVAL_BY_TIME_RANGE)
|
|
oce.add(keys=OpCode.DELETE_BY_TIME_RANGE, info=Info.DELETE_BY_TIME_RANGE)
|
|
defs.add_service(
|
|
CustomServiceList.TM_STORE, "Persistent TM Store", op_code_entry=oce
|
|
)
|
|
|
|
|
|
class TmStoreSelect(enum.IntEnum):
|
|
OK_TM_STORE = 0
|
|
NOT_OK_TM_STORE = 1
|
|
MISC_TM_STORE = 2
|
|
HK_TM_STORE = 3
|
|
CFDP_TM_STORE = 4
|
|
|
|
|
|
STORE_DICT = {
|
|
TmStoreSelect.OK_TM_STORE: (OK_TM_STORE, "OK Store (Verification)"),
|
|
TmStoreSelect.NOT_OK_TM_STORE: (
|
|
NOT_OK_TM_STORE,
|
|
"NOT OK Store (Events, Verification Failures..)",
|
|
),
|
|
TmStoreSelect.MISC_TM_STORE: (MISC_TM_STORE, "Miscellaneous Store"),
|
|
TmStoreSelect.HK_TM_STORE: (HK_TM_STORE, "HK TM Store"),
|
|
TmStoreSelect.CFDP_TM_STORE: (CFDP_TM_STORE, "CFDP TM Store"),
|
|
}
|
|
|
|
|
|
TIME_INPUT_DICT = {
|
|
1: "Full manual input with dateutil.parser.parse",
|
|
2: "Offset from now in seconds",
|
|
}
|
|
|
|
|
|
def time_prompt(info_str: str) -> datetime.datetime:
|
|
print(f"{info_str}. Available time input types: ")
|
|
for k, v in TIME_INPUT_DICT.items():
|
|
print(f" {k}: {v}")
|
|
while True:
|
|
time_input_key = int(input("Please specify a time input type by key: "))
|
|
if time_input_key not in TIME_INPUT_DICT.keys():
|
|
_LOGGER.warning("Invalid key, try again")
|
|
continue
|
|
break
|
|
if time_input_key == 1:
|
|
return time_prompt_fully_manually()
|
|
elif time_input_key == 2:
|
|
return time_prompt_offset_from_now()
|
|
raise ValueError("can not determine datetime")
|
|
|
|
|
|
def time_prompt_fully_manually() -> datetime.datetime:
|
|
# TODO: Add support for offset from now in seconds
|
|
time = parse(
|
|
input(
|
|
"Please enter the time in any format supported by dateutil.parser.parse\n"
|
|
"Recommended format: UTC ISO format YYYY-MM-DDThh:mm:ssZ: "
|
|
)
|
|
)
|
|
print(f"Parsed timestamp: {time}")
|
|
return time
|
|
|
|
|
|
def time_prompt_offset_from_now() -> datetime.datetime:
|
|
seconds_offset = math.floor(
|
|
float(
|
|
input(
|
|
"Please enter the time as a offset from now in seconds. Negative offset"
|
|
" is allowed: "
|
|
)
|
|
)
|
|
)
|
|
time_now_with_offset = datetime.datetime.now(
|
|
tz=datetime.timezone.utc
|
|
) + datetime.timedelta(seconds=seconds_offset)
|
|
print(f"Absolute resulting time: {time_now_with_offset}")
|
|
return time_now_with_offset
|
|
|
|
|
|
def store_select_prompt() -> Tuple[ObjectIdU32, str]:
|
|
obj_id_dict = get_object_ids()
|
|
print("Available TM stores:")
|
|
for k, v in STORE_DICT.items():
|
|
print(f" {k}: {v[1]}")
|
|
while True:
|
|
target_index = int(
|
|
input("Please enter the target store for the TM store transaction: ")
|
|
)
|
|
desc_and_obj_id = STORE_DICT.get(TmStoreSelect(target_index))
|
|
if desc_and_obj_id is None:
|
|
_LOGGER.warning("Invalid index. Try again")
|
|
continue
|
|
break
|
|
obj_id_raw = desc_and_obj_id[0]
|
|
obj_id = obj_id_dict.get(obj_id_raw)
|
|
assert obj_id is not None
|
|
print(f"Selected store: {obj_id} ({desc_and_obj_id[1]})")
|
|
return obj_id, desc_and_obj_id[1]
|