Merge pull request 'Deletion by time range cmd' (#256) from add-deletion-by-time-range-cmd into main
All checks were successful
EIVE/-/pipeline/head This commit looks good

Reviewed-on: #256
Reviewed-by: Marius Eggert <eggertm@irs.uni-stuttgart.de>
This commit is contained in:
Robin Müller 2023-11-29 14:09:30 +01:00
commit 0a417a89e9

View File

@ -3,6 +3,7 @@ import enum
import logging import logging
import math import math
import struct import struct
from typing import Tuple
from eive_tmtc.config.object_ids import ( from eive_tmtc.config.object_ids import (
HK_TM_STORE, HK_TM_STORE,
@ -19,20 +20,25 @@ from tmtccmd.tmtc import service_provider
from tmtccmd.tmtc.decorator import ServiceProviderParams from tmtccmd.tmtc.decorator import ServiceProviderParams
from dateutil.parser import parse from dateutil.parser import parse
from spacepackets.ecss import PusService # noqa
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from spacepackets.ecss.pus_15_tm_storage import Subservice from spacepackets.ecss.pus_15_tm_storage import Subservice
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
class CustomSubservice(enum.IntEnum):
DELETE_BY_TIME_RANGE = 128
class OpCode: class OpCode:
RETRIEVAL_BY_TIME_RANGE = "retrieval_time_range" RETRIEVAL_BY_TIME_RANGE = "retrieval_time_range"
DELETE_UP_TO = "delete_up_to" DELETE_UP_TO = "delete_up_to"
DELETE_BY_TIME_RANGE = "delete_time_range"
class Info: class Info:
RETRIEVAL_BY_TIME_RANGE = "Dump Telemetry Packets by time range" RETRIEVAL_BY_TIME_RANGE = "Dump Telemetry Packets by time range"
DELETE_UP_TO = "Delete Telemetry Packets up to time" DELETE_UP_TO = "Delete Telemetry Packets up to time"
DELETE_BY_TIME_RANGE = "Delete Telemetry by time range"
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -43,7 +49,7 @@ def pack_tm_store_commands(p: ServiceProviderParams):
q = p.queue_helper q = p.queue_helper
o = p.op_code o = p.op_code
if o == OpCode.DELETE_UP_TO: if o == OpCode.DELETE_UP_TO:
obj_id, store_string = store_select_prompt() obj_id, _ = store_select_prompt()
app_data = bytearray(obj_id.as_bytes) app_data = bytearray(obj_id.as_bytes)
delete_up_to_time = time_prompt("Determining deletion end time") delete_up_to_time = time_prompt("Determining deletion end time")
end_stamp = int(math.floor(delete_up_to_time.timestamp())) end_stamp = int(math.floor(delete_up_to_time.timestamp()))
@ -58,7 +64,7 @@ def pack_tm_store_commands(p: ServiceProviderParams):
) )
elif o == OpCode.RETRIEVAL_BY_TIME_RANGE: elif o == OpCode.RETRIEVAL_BY_TIME_RANGE:
q.add_log_cmd(Info.RETRIEVAL_BY_TIME_RANGE) q.add_log_cmd(Info.RETRIEVAL_BY_TIME_RANGE)
obj_id, store_string = store_select_prompt() obj_id, _ = store_select_prompt()
app_data = bytearray(obj_id.as_bytes) app_data = bytearray(obj_id.as_bytes)
start_of_dump_time = time_prompt("Determining retrieval start time") start_of_dump_time = time_prompt("Determining retrieval start time")
start_stamp = int(math.floor(start_of_dump_time.timestamp())) start_stamp = int(math.floor(start_of_dump_time.timestamp()))
@ -78,6 +84,28 @@ def pack_tm_store_commands(p: ServiceProviderParams):
app_data=app_data, app_data=app_data,
) )
) )
elif o == OpCode.DELETE_BY_TIME_RANGE:
q.add_log_cmd(Info.DELETE_BY_TIME_RANGE)
obj_id, _ = store_select_prompt()
app_data = bytearray(obj_id.as_bytes)
start_of_dump_time = time_prompt("Determining deletion start time")
start_stamp = int(math.floor(start_of_dump_time.timestamp()))
end_of_dump_time = time_prompt("Determining deletion end time")
end_stamp = int(math.floor(end_of_dump_time.timestamp()))
app_data.extend(struct.pack("!I", start_stamp))
app_data.extend(struct.pack("!I", end_stamp))
q.add_log_cmd(Info.DELETE_BY_TIME_RANGE)
q.add_log_cmd(f"Selected Store: {obj_id}")
q.add_log_cmd(
f"Deletion from time {start_of_dump_time} up to time {end_of_dump_time}"
)
q.add_pus_tc(
PusTelecommand(
service=15,
subservice=CustomSubservice.DELETE_BY_TIME_RANGE,
app_data=app_data,
)
)
@tmtc_definitions_provider @tmtc_definitions_provider
@ -85,6 +113,7 @@ def add_persistent_tm_store_cmd_defs(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry() oce = OpCodeEntry()
oce.add(keys=OpCode.DELETE_UP_TO, info=Info.DELETE_UP_TO) oce.add(keys=OpCode.DELETE_UP_TO, info=Info.DELETE_UP_TO)
oce.add(keys=OpCode.RETRIEVAL_BY_TIME_RANGE, info=Info.RETRIEVAL_BY_TIME_RANGE) oce.add(keys=OpCode.RETRIEVAL_BY_TIME_RANGE, info=Info.RETRIEVAL_BY_TIME_RANGE)
oce.add(keys=OpCode.DELETE_BY_TIME_RANGE, info=Info.DELETE_BY_TIME_RANGE)
defs.add_service( defs.add_service(
CustomServiceList.TM_STORE, "Persistent TM Store", op_code_entry=oce CustomServiceList.TM_STORE, "Persistent TM Store", op_code_entry=oce
) )
@ -130,6 +159,8 @@ def time_prompt(info_str: str) -> datetime.datetime:
return time_prompt_fully_manually() return time_prompt_fully_manually()
elif time_input_key == 2: elif time_input_key == 2:
return time_prompt_offset_from_now() return time_prompt_offset_from_now()
else:
raise ValueError()
def time_prompt_fully_manually() -> datetime.datetime: def time_prompt_fully_manually() -> datetime.datetime:
@ -160,7 +191,7 @@ def time_prompt_offset_from_now() -> datetime.datetime:
return time_now_with_offset return time_now_with_offset
def store_select_prompt() -> (ObjectIdU32, str): def store_select_prompt() -> Tuple[ObjectIdU32, str]:
obj_id_dict = get_object_ids() obj_id_dict = get_object_ids()
print("Available TM stores:") print("Available TM stores:")
for k, v in STORE_DICT.items(): for k, v in STORE_DICT.items():