Deletion by time range cmd #256
@ -3,6 +3,7 @@ import enum
|
|||||||
import logging
|
import logging
|
||||||
import math
|
import math
|
||||||
import struct
|
import struct
|
||||||
|
from typing import Tuple
|
||||||
|
|
||||||
from eive_tmtc.config.object_ids import (
|
from eive_tmtc.config.object_ids import (
|
||||||
HK_TM_STORE,
|
HK_TM_STORE,
|
||||||
@ -19,20 +20,25 @@ from tmtccmd.tmtc import service_provider
|
|||||||
from tmtccmd.tmtc.decorator import ServiceProviderParams
|
from tmtccmd.tmtc.decorator import ServiceProviderParams
|
||||||
from dateutil.parser import parse
|
from dateutil.parser import parse
|
||||||
|
|
||||||
from spacepackets.ecss import PusService # noqa
|
|
||||||
from spacepackets.ecss.tc import PusTelecommand
|
from spacepackets.ecss.tc import PusTelecommand
|
||||||
from spacepackets.ecss.pus_15_tm_storage import Subservice
|
from spacepackets.ecss.pus_15_tm_storage import Subservice
|
||||||
from tmtccmd.util import ObjectIdU32
|
from tmtccmd.util import ObjectIdU32
|
||||||
|
|
||||||
|
|
||||||
|
class CustomSubservice(enum.IntEnum):
|
||||||
|
DELETE_BY_TIME_RANGE = 128
|
||||||
|
|
||||||
|
|
||||||
class OpCode:
|
class OpCode:
|
||||||
RETRIEVAL_BY_TIME_RANGE = "retrieval_time_range"
|
RETRIEVAL_BY_TIME_RANGE = "retrieval_time_range"
|
||||||
DELETE_UP_TO = "delete_up_to"
|
DELETE_UP_TO = "delete_up_to"
|
||||||
|
DELETE_BY_TIME_RANGE = "delete_time_range"
|
||||||
|
|
||||||
|
|
||||||
class Info:
|
class Info:
|
||||||
RETRIEVAL_BY_TIME_RANGE = "Dump Telemetry Packets by time range"
|
RETRIEVAL_BY_TIME_RANGE = "Dump Telemetry Packets by time range"
|
||||||
DELETE_UP_TO = "Delete Telemetry Packets up to time"
|
DELETE_UP_TO = "Delete Telemetry Packets up to time"
|
||||||
|
DELETE_BY_TIME_RANGE = "Delete Telemetry by time range"
|
||||||
|
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
@ -43,7 +49,7 @@ def pack_tm_store_commands(p: ServiceProviderParams):
|
|||||||
q = p.queue_helper
|
q = p.queue_helper
|
||||||
o = p.op_code
|
o = p.op_code
|
||||||
if o == OpCode.DELETE_UP_TO:
|
if o == OpCode.DELETE_UP_TO:
|
||||||
obj_id, store_string = store_select_prompt()
|
obj_id, _ = store_select_prompt()
|
||||||
app_data = bytearray(obj_id.as_bytes)
|
app_data = bytearray(obj_id.as_bytes)
|
||||||
delete_up_to_time = time_prompt("Determining deletion end time")
|
delete_up_to_time = time_prompt("Determining deletion end time")
|
||||||
end_stamp = int(math.floor(delete_up_to_time.timestamp()))
|
end_stamp = int(math.floor(delete_up_to_time.timestamp()))
|
||||||
@ -58,7 +64,7 @@ def pack_tm_store_commands(p: ServiceProviderParams):
|
|||||||
)
|
)
|
||||||
elif o == OpCode.RETRIEVAL_BY_TIME_RANGE:
|
elif o == OpCode.RETRIEVAL_BY_TIME_RANGE:
|
||||||
q.add_log_cmd(Info.RETRIEVAL_BY_TIME_RANGE)
|
q.add_log_cmd(Info.RETRIEVAL_BY_TIME_RANGE)
|
||||||
obj_id, store_string = store_select_prompt()
|
obj_id, _ = store_select_prompt()
|
||||||
app_data = bytearray(obj_id.as_bytes)
|
app_data = bytearray(obj_id.as_bytes)
|
||||||
start_of_dump_time = time_prompt("Determining retrieval start time")
|
start_of_dump_time = time_prompt("Determining retrieval start time")
|
||||||
start_stamp = int(math.floor(start_of_dump_time.timestamp()))
|
start_stamp = int(math.floor(start_of_dump_time.timestamp()))
|
||||||
@ -78,6 +84,28 @@ def pack_tm_store_commands(p: ServiceProviderParams):
|
|||||||
app_data=app_data,
|
app_data=app_data,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
elif o == OpCode.DELETE_BY_TIME_RANGE:
|
||||||
|
q.add_log_cmd(Info.DELETE_BY_TIME_RANGE)
|
||||||
|
obj_id, _ = store_select_prompt()
|
||||||
|
app_data = bytearray(obj_id.as_bytes)
|
||||||
|
start_of_dump_time = time_prompt("Determining deletion start time")
|
||||||
|
start_stamp = int(math.floor(start_of_dump_time.timestamp()))
|
||||||
|
end_of_dump_time = time_prompt("Determining deletion end time")
|
||||||
|
end_stamp = int(math.floor(end_of_dump_time.timestamp()))
|
||||||
|
app_data.extend(struct.pack("!I", start_stamp))
|
||||||
|
app_data.extend(struct.pack("!I", end_stamp))
|
||||||
|
q.add_log_cmd(Info.DELETE_BY_TIME_RANGE)
|
||||||
|
q.add_log_cmd(f"Selected Store: {obj_id}")
|
||||||
|
q.add_log_cmd(
|
||||||
|
f"Deletion from time {start_of_dump_time} up to time {end_of_dump_time}"
|
||||||
|
)
|
||||||
|
q.add_pus_tc(
|
||||||
|
PusTelecommand(
|
||||||
|
service=15,
|
||||||
|
subservice=CustomSubservice.DELETE_BY_TIME_RANGE,
|
||||||
|
app_data=app_data,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@tmtc_definitions_provider
|
@tmtc_definitions_provider
|
||||||
@ -130,6 +158,8 @@ def time_prompt(info_str: str) -> datetime.datetime:
|
|||||||
return time_prompt_fully_manually()
|
return time_prompt_fully_manually()
|
||||||
elif time_input_key == 2:
|
elif time_input_key == 2:
|
||||||
return time_prompt_offset_from_now()
|
return time_prompt_offset_from_now()
|
||||||
|
else:
|
||||||
|
raise ValueError()
|
||||||
|
|
||||||
|
|
||||||
def time_prompt_fully_manually() -> datetime.datetime:
|
def time_prompt_fully_manually() -> datetime.datetime:
|
||||||
@ -160,7 +190,7 @@ def time_prompt_offset_from_now() -> datetime.datetime:
|
|||||||
return time_now_with_offset
|
return time_now_with_offset
|
||||||
|
|
||||||
|
|
||||||
def store_select_prompt() -> (ObjectIdU32, str):
|
def store_select_prompt() -> Tuple[ObjectIdU32, str]:
|
||||||
obj_id_dict = get_object_ids()
|
obj_id_dict = get_object_ids()
|
||||||
print("Available TM stores:")
|
print("Available TM stores:")
|
||||||
for k, v in STORE_DICT.items():
|
for k, v in STORE_DICT.items():
|
||||||
|
Loading…
Reference in New Issue
Block a user