eive-tmtc/eive_tmtc/tmtc/tm_store.py

153 lines
4.5 KiB
Python
Raw Normal View History

2023-02-17 18:43:20 +01:00
import datetime
2023-02-17 18:35:21 +01:00
import logging
2023-02-17 18:43:20 +01:00
import math
2023-02-17 18:13:20 +01:00
import struct
2023-02-17 18:35:21 +01:00
from eive_tmtc.config.object_ids import (
HK_TM_STORE,
MISC_TM_STORE,
OK_TM_STORE,
NOT_OK_TM_STORE,
CFDP_TM_STORE,
get_object_ids,
)
2023-02-07 15:21:48 +01:00
from eive_tmtc.config.definitions import CustomServiceList
2023-02-17 18:13:20 +01:00
from tmtccmd.config import TmtcDefinitionWrapper
from tmtccmd.config.tmtc import tmtc_definitions_provider, OpCodeEntry
2023-02-07 15:21:48 +01:00
from tmtccmd.tc import service_provider
from tmtccmd.tc.decorator import ServiceProviderParams
2023-02-17 18:13:20 +01:00
from dateutil.parser import parse
2023-02-17 18:43:51 +01:00
from spacepackets.ecss import PusService # noqa
2023-02-17 18:35:21 +01:00
from spacepackets.ecss.tc import PusTelecommand
from spacepackets.ecss.pus_15_tm_storage import Subservice
from tmtccmd.util import ObjectIdU32
2023-02-17 18:13:20 +01:00
class OpCode:
    """Op code strings accepted by the TM store command packer."""

    # Op code for the telemetry dump operation.
    DUMP = "dump"
    # Op code for deleting stored telemetry up to a user-selected time.
    DELETE_UP_TO = "delete_up_to"
2023-02-17 18:13:20 +01:00
class Info:
    """Human-readable descriptions matching the op codes of this module.

    The strings are used both as op code descriptions and as log entries.
    """

    DUMP = "Dump Telemetry Packets"
    DELETE_UP_TO = "Delete Telemetry Packets"
2023-02-07 15:21:48 +01:00
2023-02-17 18:35:21 +01:00
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
2023-02-07 15:21:48 +01:00
@service_provider(CustomServiceList.TM_STORE)
def pack_tm_store_commands(p: ServiceProviderParams):
    """Handle the op codes of the persistent TM store service.

    For ``OpCode.DELETE_UP_TO`` the user is prompted for a target store and an
    end time. The application data consists of the bytes of the selected
    object ID followed by the end time as a big-endian unsigned 32-bit count
    of whole seconds since the Unix epoch.

    For ``OpCode.DUMP`` only the log entry is queued.

    :param p: Service provider parameters; ``p.queue_helper`` and
        ``p.op_code`` are used.
    :raises struct.error: If the end time does not fit into an unsigned
        32-bit integer, for example a time before the Unix epoch.
    """
    q = p.queue_helper
    o = p.op_code
    if o == OpCode.DELETE_UP_TO:
        # Only the object ID is needed here; the store description is unused.
        obj_id, _ = store_select_prompt()
        app_data = bytearray(obj_id.as_bytes)
        delete_up_to_time = time_prompt()
        # Sub-second precision is dropped; the stamp is sent as whole seconds.
        end_stamp = int(math.floor(delete_up_to_time.timestamp()))
        app_data.extend(struct.pack("!I", end_stamp))
        q.add_log_cmd(Info.DELETE_UP_TO)
        q.add_log_cmd(f"Selected Store: {obj_id}")
        # Convert the truncated stamp back so the log shows exactly what is
        # encoded in the application data, expressed in UTC.
        end_time_utc = datetime.datetime.fromtimestamp(
            end_stamp, tz=datetime.timezone.utc
        )
        q.add_log_cmd(f"Deletion up to time {end_time_utc}")
        # NOTE(review): the telecommand is only printed here and is not handed
        # to the queue helper. This looks like leftover debug output; confirm
        # before relying on this op code to send anything.
        print(
            PusTelecommand(
                service=15, subservice=Subservice.DELETE_UP_TO, app_data=app_data
            )
        )
    elif o == OpCode.DUMP:
        q.add_log_cmd(Info.DUMP)
@tmtc_definitions_provider
def add_persistent_tm_store_cmd_defs(defs: TmtcDefinitionWrapper):
    """Register the persistent TM store service together with its op codes.

    :param defs: Definition wrapper the service entry is added to.
    """
    oce = OpCodeEntry()
    oce.add(keys=OpCode.DELETE_UP_TO, info=Info.DELETE_UP_TO)
    oce.add(keys=OpCode.DUMP, info=Info.DUMP)
    defs.add_service(
        CustomServiceList.TM_STORE, "Persistent TM Store", op_code_entry=oce
    )
2023-02-17 18:35:21 +01:00
# Selectable TM stores: maps each store's object ID constant to the
# description printed by the store selection prompt.
STORE_DICT = {
OK_TM_STORE: "OK Store (Verification)",
NOT_OK_TM_STORE: "NOT OK Store (Events, Verification Failures..)",
MISC_TM_STORE: "Miscellaneous Store",
HK_TM_STORE: "HK TM Store",
CFDP_TM_STORE: "CFDP TM Store",
}
# Time input modes offered by the time prompt, keyed by the number the user
# enters to select the mode.
TIME_INPUT_DICT = {
1: "Full manual input with dateutil.parser.parse",
2: "Offset from now in seconds",
}
2023-02-17 18:43:20 +01:00
def time_prompt() -> datetime.datetime:
    """Ask the user how to enter a time, then prompt for it in that mode.

    The available modes are listed from ``TIME_INPUT_DICT``. The selection
    prompt repeats until one of the listed keys is entered.

    :return: The time returned by the prompt function of the selected mode.
    :raises NotImplementedError: If the selected key is listed in
        ``TIME_INPUT_DICT`` but has no prompt function assigned here.
    """
    print("Available time input types: ")
    for k, v in TIME_INPUT_DICT.items():
        print(f" {k}: {v}")
    while True:
        raw_key = input("Please specify a time input type by key: ")
        try:
            time_input_key = int(raw_key)
        except ValueError:
            # Non-numeric input is handled like an unknown key so the user is
            # asked again instead of the prompt aborting with a traceback.
            time_input_key = None
        if time_input_key in TIME_INPUT_DICT:
            break
        _LOGGER.warning("Invalid key, try again")
    if time_input_key == 1:
        return time_prompt_fully_manually()
    if time_input_key == 2:
        return time_prompt_offset_from_now()
    # Only reachable when TIME_INPUT_DICT gains a key without a branch above;
    # fail loudly rather than implicitly returning None.
    raise NotImplementedError(f"No handler for time input type {time_input_key}")
def time_prompt_fully_manually() -> datetime.datetime:
    """Prompt for a time string and parse it with ``dateutil.parser.parse``.

    The prompt repeats until the entered text can be parsed.

    :return: The parsed time exactly as returned by the parser.
    """
    while True:
        raw_time = input(
            "Please enter the time in any format supported by dateutil.parser.parse\n"
            "Recommended format: UTC ISO format YYYY-MM-DDThh:mm:ssZ: "
        )
        try:
            parsed_time = parse(raw_time)
        except (ValueError, OverflowError):
            # The parser reports unparseable text with ValueError (or a
            # subclass of it) and out-of-range dates with OverflowError.
            _LOGGER.warning("Could not parse time, try again")
            continue
        break
    # NOTE(review): input without a UTC offset presumably yields a naive
    # datetime, which .timestamp() interprets as local time - confirm that
    # this is the intended behaviour for callers.
    print(f"Parsed timestamp: {parsed_time}")
    return parsed_time
def time_prompt_offset_from_now() -> datetime.datetime:
    """Prompt for an offset in seconds and apply it to the current UTC time.

    Fractional input is rounded down to whole seconds. The prompt repeats
    until a usable number is entered.

    :return: Timezone-aware UTC datetime of the current time plus the offset.
    """
    while True:
        raw_offset = input(
            "Please enter the time as a offset from now in seconds. Negative offset is allowed: "
        )
        try:
            seconds_offset = math.floor(float(raw_offset))
            time_now_with_offset = datetime.datetime.now(
                tz=datetime.timezone.utc
            ) + datetime.timedelta(seconds=seconds_offset)
        except (ValueError, OverflowError):
            # ValueError: text is not a number, or is NaN.
            # OverflowError: infinite offset, or a result outside the range
            # a datetime can represent.
            _LOGGER.warning("Invalid offset, try again")
            continue
        break
    print(f"Absolute resulting time: {time_now_with_offset}")
    return time_now_with_offset
2023-02-17 18:43:20 +01:00
# The return annotation is quoted so that it is never evaluated at definition
# time, which keeps it valid on interpreters without builtin generic support.
def store_select_prompt() -> "tuple[ObjectIdU32, str]":
    """Let the user pick one of the TM stores listed in ``STORE_DICT``.

    The stores are listed with a zero-based index. The prompt repeats until
    one of the listed indices is entered.

    :return: Tuple of the entry found in the object ID dictionary for the
        selected store and the description of that store.
    """
    obj_id_dict = get_object_ids()
    print("Available TM stores:")
    # Maps the displayed index to the (object ID key, description) pair.
    idx_to_obj_id = dict(enumerate(STORE_DICT.items()))
    for idx, (_, store_str) in idx_to_obj_id.items():
        print(f" {idx}: {store_str}")
    while True:
        raw_index = input(
            "Please enter the target store for the TM store transaction: "
        )
        try:
            target_index = int(raw_index)
        except ValueError:
            # Non-numeric input is handled like an unknown index so the user
            # is asked again instead of the prompt aborting with a traceback.
            target_index = None
        obj_id_and_store_str = idx_to_obj_id.get(target_index)
        if obj_id_and_store_str is not None:
            break
        _LOGGER.warning("Invalid index. Try again")
    obj_id_raw, store_str = obj_id_and_store_str
    # NOTE(review): .get() yields None when the key is missing from the object
    # ID dictionary - presumably all stores are registered; verify.
    return obj_id_dict.get(obj_id_raw), store_str