clean up python code a bit #18
@ -1,7 +1,8 @@
|
||||
import struct
|
||||
from serde import Model, fields
|
||||
|
||||
from common import EXPERIMENT_APID, UniqueId, make_addressable_id
|
||||
from common import EXPERIMENT_APID, UniqueId, make_unique_id
|
||||
|
||||
|
||||
class CameraParameters(Model):
|
||||
R: fields.Int()
|
||||
@ -13,19 +14,4 @@ class CameraParameters(Model):
|
||||
W: fields.Int()
|
||||
|
||||
def serialize_for_uplink(self) -> bytearray:
|
||||
return self.to_json().encode('utf-8')
|
||||
|
||||
# Example serialization
|
||||
data = bytearray(make_addressable_id(EXPERIMENT_APID, UniqueId.CameraHandler))
|
||||
params = CameraParameters(8, 8, 8, 1, True, 200, 1000)
|
||||
serialized = params.to_json().encode('utf-8')
|
||||
byte_string = bytearray(struct.pack('!{}s'.format(len(serialized)), serialized))
|
||||
print(byte_string)
|
||||
print(params.serialize_for_uplink())
|
||||
data.extend(params.serialize_for_uplink())
|
||||
print(data)
|
||||
|
||||
# Example deserialization
|
||||
data = '{"R": 100, "G": 150, "B": 200, "N": 3, "P": true, "E": 10, "W": 20}'
|
||||
deserialized_params = CameraParameters.from_json(data)
|
||||
print(deserialized_params)
|
||||
return self.to_json().encode("utf-8")
|
||||
|
@ -3,14 +3,12 @@ from __future__ import annotations
|
||||
import dataclasses
|
||||
import enum
|
||||
import struct
|
||||
from serde import Model, fields
|
||||
|
||||
EXPERIMENT_ID = 278
|
||||
EXPERIMENT_APID = 1024 + EXPERIMENT_ID
|
||||
|
||||
|
||||
class UniqueId(enum.IntEnum):
|
||||
|
||||
Controller = 0
|
||||
PusEventManagement = 1
|
||||
PusRouting = 2
|
||||
@ -59,14 +57,11 @@ class AcsHkIds(enum.IntEnum):
|
||||
MGM_SET = 1
|
||||
|
||||
|
||||
def make_addressable_id(target_id: int, unique_id: int) -> bytes:
|
||||
byte_string = bytearray(struct.pack("!I", unique_id))
|
||||
# byte_string = bytearray(struct.pack("!I", target_id))
|
||||
# byte_string.extend(struct.pack("!I", unique_id))
|
||||
return byte_string
|
||||
def make_unique_id(unique_id: int) -> bytes:
|
||||
return struct.pack("!I", unique_id)
|
||||
|
||||
|
||||
def make_addressable_id_with_action_id(unique_id: int, action_id: int) -> bytes:
|
||||
def make_action_cmd_header(unique_id: int, action_id: int) -> bytes:
|
||||
byte_string = bytearray(struct.pack("!I", unique_id))
|
||||
byte_string.extend(struct.pack("!I", action_id))
|
||||
return byte_string
|
||||
|
@ -9,10 +9,13 @@ from tmtccmd.pus.tc.s200_fsfw_mode import Mode
|
||||
from tmtccmd.tmtc import DefaultPusQueueHelper
|
||||
from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd
|
||||
from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice
|
||||
from serde import Model, fields
|
||||
|
||||
from camera_params import CameraParameters
|
||||
from common import EXPERIMENT_APID, UniqueId, make_addressable_id, make_addressable_id_with_action_id
|
||||
from common import (
|
||||
EXPERIMENT_APID,
|
||||
UniqueId,
|
||||
make_action_cmd_header,
|
||||
)
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@ -70,33 +73,27 @@ def create_cmd_definition_tree() -> CmdTreeNode:
|
||||
root_node.add_child(scheduler_node)
|
||||
|
||||
action_node = CmdTreeNode("action", "Action Node")
|
||||
cam_node = CmdTreeNode(
|
||||
"take_image", "Take Image with IMS Imager"
|
||||
)
|
||||
cam_node = CmdTreeNode("take_image", "Take Image with IMS Imager")
|
||||
cam_node.add_child(
|
||||
CmdTreeNode("default_single", "Default Single Image Camera Parameters")
|
||||
)
|
||||
cam_node.add_child(
|
||||
CmdTreeNode("balanced_single", "Balanced Single Image Camera Parameters")
|
||||
)
|
||||
cam_node.add_child(
|
||||
CmdTreeNode(
|
||||
"default_single", "Default Single Image Camera Parameters"
|
||||
"default_single_flatsat",
|
||||
"Default Single Image Camera Parameters for use on FlatSat",
|
||||
)
|
||||
)
|
||||
cam_node.add_child(
|
||||
CmdTreeNode(
|
||||
"balanced_single", "Balanced Single Image Camera Parameters"
|
||||
"balanced_single_flatsat",
|
||||
"Balanced Single Image Camera Parameters for use on FlatSat",
|
||||
)
|
||||
)
|
||||
cam_node.add_child(
|
||||
CmdTreeNode(
|
||||
"default_single_flatsat", "Default Single Image Camera Parameters for use on FlatSat"
|
||||
)
|
||||
)
|
||||
cam_node.add_child(
|
||||
CmdTreeNode(
|
||||
"balanced_single_flatsat", "Balanced Single Image Camera Parameters for use on FlatSat"
|
||||
)
|
||||
)
|
||||
cam_node.add_child(
|
||||
CmdTreeNode(
|
||||
"custom_params", "Custom Camera Parameters as specified from file"
|
||||
)
|
||||
CmdTreeNode("custom_params", "Custom Camera Parameters as specified from file")
|
||||
)
|
||||
action_node.add_child(cam_node)
|
||||
root_node.add_child(action_node)
|
||||
@ -134,25 +131,32 @@ def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str):
|
||||
if cmd_path_list[0] == "acs":
|
||||
assert len(cmd_path_list) >= 2
|
||||
if cmd_path_list[0] == "action":
|
||||
assert len(cmd_path_list)>= 2
|
||||
assert len(cmd_path_list) >= 2
|
||||
if cmd_path_list[1] == "take_image":
|
||||
assert len(cmd_path_list)>= 3
|
||||
q.add_log_cmd("Sending PUS take image action request with " + cmd_path_list[2] + " params.")
|
||||
assert len(cmd_path_list) >= 3
|
||||
q.add_log_cmd(
|
||||
"Sending PUS take image action request with "
|
||||
+ cmd_path_list[2]
|
||||
+ " params."
|
||||
)
|
||||
data = bytearray()
|
||||
if cmd_path_list[2] == "default_single":
|
||||
data = make_addressable_id_with_action_id(UniqueId.CameraHandler, 1)
|
||||
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 1))
|
||||
if cmd_path_list[2] == "balanced_single":
|
||||
data = make_addressable_id_with_action_id(UniqueId.CameraHandler, 2)
|
||||
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 2))
|
||||
if cmd_path_list[2] == "default_single_flatsat":
|
||||
data = make_addressable_id_with_action_id(UniqueId.CameraHandler, 3)
|
||||
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 3))
|
||||
if cmd_path_list[2] == "balanced_single_flatsat":
|
||||
data = make_addressable_id_with_action_id(UniqueId.CameraHandler, 4)
|
||||
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 4))
|
||||
if cmd_path_list[2] == "custom":
|
||||
data = make_addressable_id_with_action_id(UniqueId.CameraHandler, 5)
|
||||
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 5))
|
||||
params = CameraParameters(8, 8, 8, 1, True, 200, 1000)
|
||||
bytes = params.serialize_for_uplink()
|
||||
data.extend(bytes)
|
||||
print(data.hex(sep=","))
|
||||
return q.add_pus_tc(PusTelecommand(service=8, subservice=128, apid=EXPERIMENT_APID, app_data=data))
|
||||
data.extend(params.serialize_for_uplink())
|
||||
return q.add_pus_tc(
|
||||
PusTelecommand(
|
||||
service=8, subservice=128, apid=EXPERIMENT_APID, app_data=data
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def handle_set_mode_cmd(
|
||||
|
@ -149,10 +149,10 @@ class PusHandler(GenericApidHandlerBase):
|
||||
_LOGGER.info("Received test event")
|
||||
elif service == 8:
|
||||
if pus_tm.subservice == 130:
|
||||
_LOGGER.info(f"Received Action Data Reply TM[8,130]")
|
||||
_LOGGER.info("Received Action Data Reply TM[8,130]")
|
||||
reply = pus_tm.source_data
|
||||
reply = reply[6:]
|
||||
_LOGGER.info(f"Data Reply Content: " + reply.decode('utf-8'))
|
||||
_LOGGER.info(f"Data Reply Content: {reply.decode()}")
|
||||
elif service == 17:
|
||||
tm_packet = Service17Tm.unpack(
|
||||
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
|
||||
|
@ -1,2 +1,3 @@
|
||||
tmtccmd == 8.0.0rc2
|
||||
serde == 0.9.0
|
||||
# -e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd
|
||||
|
20
pytmtc/test_serde.py
Normal file
20
pytmtc/test_serde.py
Normal file
@ -0,0 +1,20 @@
|
||||
import struct
|
||||
from camera_params import CameraParameters
|
||||
from common import make_unique_id, EXPERIMENT_APID
|
||||
|
||||
|
||||
def test_serde_serialization():
|
||||
# Example serializatn
|
||||
data = bytearray(make_unique_id(EXPERIMENT_APID))
|
||||
params = CameraParameters(8, 8, 8, 1, True, 200, 1000)
|
||||
serialized = params.to_json().encode("utf-8")
|
||||
byte_string = bytearray(struct.pack("!{}s".format(len(serialized)), serialized))
|
||||
print(byte_string)
|
||||
print(params.serialize_for_uplink())
|
||||
data.extend(params.serialize_for_uplink())
|
||||
print(data)
|
||||
|
||||
# Example deserialization
|
||||
data = '{"R": 100, "G": 150, "B": 200, "N": 3, "P": true, "E": 10, "W": 20}'
|
||||
deserialized_params = CameraParameters.from_json(data)
|
||||
print(deserialized_params)
|
Loading…
x
Reference in New Issue
Block a user