MPSoC Overhaul #296
@ -308,6 +308,15 @@ def pack_ploc_mpsoc_commands(
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if cmd_str == OpCode.SIMPLEX_STORE_FILE:
|
||||||
|
q.add_log_cmd(Info.SIMPLEX_STORE_FILE)
|
||||||
|
q.add_pus_tc(
|
||||||
|
create_action_cmd(
|
||||||
|
PLOC_MPSOC_ID,
|
||||||
|
ActionId.TC_SIMPLEX_STORE_FILE,
|
||||||
|
user_data=prepare_simplex_store_file_user_data(),
|
||||||
|
)
|
||||||
|
)
|
||||||
if cmd_str == "17":
|
if cmd_str == "17":
|
||||||
q.add_log_cmd("PLOC MPSoC: Set UART TX tristate")
|
q.add_log_cmd("PLOC MPSoC: Set UART TX tristate")
|
||||||
data = object_id.as_bytes + struct.pack("!I", ActionId.SET_UART_TX_TRISTATE)
|
data = object_id.as_bytes + struct.pack("!I", ActionId.SET_UART_TX_TRISTATE)
|
||||||
@ -322,8 +331,12 @@ def pack_ploc_mpsoc_commands(
|
|||||||
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
||||||
if cmd_str == OpCode.SIMPLEX_STREAM_FILE:
|
if cmd_str == OpCode.SIMPLEX_STREAM_FILE:
|
||||||
q.add_log_cmd(Info.SIMPLEX_STREAM_FILE)
|
q.add_log_cmd(Info.SIMPLEX_STREAM_FILE)
|
||||||
data = prepare_simplex_stream_file_cmd(object_id.as_bytes)
|
data = prepare_simplex_stream_file_user_data()
|
||||||
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
q.add_pus_tc(
|
||||||
|
create_action_cmd(
|
||||||
|
PLOC_MPSOC_ID, ActionId.TC_SIMPLEX_STREAM_FILE, user_data=data
|
||||||
|
)
|
||||||
|
)
|
||||||
if cmd_str == OpCode.DOWNLINK_DATA_MODULATE:
|
if cmd_str == OpCode.DOWNLINK_DATA_MODULATE:
|
||||||
q.add_log_cmd("PLOC MPSoC: Downlink data modulate")
|
q.add_log_cmd("PLOC MPSoC: Downlink data modulate")
|
||||||
data = prepare_downlink_data_modulate_cmd(object_id.as_bytes)
|
data = prepare_downlink_data_modulate_cmd(object_id.as_bytes)
|
||||||
@ -481,10 +494,10 @@ def prepare_replay_write_sequence_cmd(object_id: bytes) -> bytearray:
|
|||||||
return bytearray(command)
|
return bytearray(command)
|
||||||
|
|
||||||
|
|
||||||
def prepare_cam_take_pic_cmd(object_id: bytes) -> bytearray:
|
def prepare_cam_take_pic_cmd(object_id: bytes) -> bytes:
|
||||||
selection = input("Use default parameter? (Y/N): ")
|
filename = input("Specify target filename: ")
|
||||||
|
selection = input("Use default parameter? (y/n): ")
|
||||||
if selection.lower() in ["y", "1", "yes"]:
|
if selection.lower() in ["y", "1", "yes"]:
|
||||||
filename = "0:/test"
|
|
||||||
encoder_setting_y = 7
|
encoder_setting_y = 7
|
||||||
quantization_y = 0
|
quantization_y = 0
|
||||||
encoder_setting_cb = 7
|
encoder_setting_cb = 7
|
||||||
@ -493,7 +506,6 @@ def prepare_cam_take_pic_cmd(object_id: bytes) -> bytearray:
|
|||||||
quantization_cr = 0
|
quantization_cr = 0
|
||||||
bypass_compressor = 0
|
bypass_compressor = 0
|
||||||
else:
|
else:
|
||||||
filename = input("Specify filename: ")
|
|
||||||
encoder_setting_y = int(input("Specify encoderSetting_Y: "))
|
encoder_setting_y = int(input("Specify encoderSetting_Y: "))
|
||||||
quantization_y = int(input("Specify quantization_Y: "))
|
quantization_y = int(input("Specify quantization_Y: "))
|
||||||
encoder_setting_cb = int(input("Specify encoderSetting_Cb: "))
|
encoder_setting_cb = int(input("Specify encoderSetting_Cb: "))
|
||||||
@ -504,7 +516,7 @@ def prepare_cam_take_pic_cmd(object_id: bytes) -> bytearray:
|
|||||||
command = (
|
command = (
|
||||||
object_id
|
object_id
|
||||||
+ struct.pack("!I", ActionId.TC_CAM_TAKE_PIC)
|
+ struct.pack("!I", ActionId.TC_CAM_TAKE_PIC)
|
||||||
+ bytearray(filename, "utf-8")
|
+ filename.encode()
|
||||||
+ bytes([0])
|
+ bytes([0])
|
||||||
+ struct.pack("!B", encoder_setting_y)
|
+ struct.pack("!B", encoder_setting_y)
|
||||||
+ struct.pack("!Q", quantization_y)
|
+ struct.pack("!Q", quantization_y)
|
||||||
@ -514,30 +526,21 @@ def prepare_cam_take_pic_cmd(object_id: bytes) -> bytearray:
|
|||||||
+ struct.pack("!Q", quantization_cr)
|
+ struct.pack("!Q", quantization_cr)
|
||||||
+ struct.pack("!B", bypass_compressor)
|
+ struct.pack("!B", bypass_compressor)
|
||||||
)
|
)
|
||||||
return bytearray(command)
|
|
||||||
|
|
||||||
|
|
||||||
def prepare_simplex_stream_file_cmd(object_id: bytes) -> bytes:
|
|
||||||
filename = input("Specify filename: ")
|
|
||||||
command = (
|
|
||||||
object_id
|
|
||||||
+ struct.pack("!I", ActionId.TC_SIMPLEX_STREAM_FILE)
|
|
||||||
+ bytearray(filename, "utf-8")
|
|
||||||
+ bytes([0])
|
|
||||||
)
|
|
||||||
return command
|
return command
|
||||||
|
|
||||||
|
|
||||||
def prepare_simplex_store_file_cmd(object_id: bytes) -> bytes:
|
def prepare_simplex_stream_file_user_data() -> bytes:
|
||||||
|
filename = input("Specify filename: ")
|
||||||
|
command = filename.encode() + bytes([0])
|
||||||
|
return command
|
||||||
|
|
||||||
|
|
||||||
|
def prepare_simplex_store_file_user_data() -> bytes:
|
||||||
num_of_chunks = int(input("Please specify the number of chunks: "))
|
num_of_chunks = int(input("Please specify the number of chunks: "))
|
||||||
assert num_of_chunks >= 0
|
assert num_of_chunks >= 0
|
||||||
filename = input("Specify filename: ")
|
filename = input("Specify filename: ")
|
||||||
command = (
|
command = (
|
||||||
object_id
|
struct.pack("!I", num_of_chunks) + bytearray(filename, "utf-8") + bytes([0])
|
||||||
+ struct.pack("!I", ActionId.TC_SIMPLEX_STORE_FILE)
|
|
||||||
+ struct.pack("!I", num_of_chunks)
|
|
||||||
+ bytearray(filename, "utf-8")
|
|
||||||
+ bytes([0])
|
|
||||||
)
|
)
|
||||||
return command
|
return command
|
||||||
|
|
||||||
@ -742,7 +745,7 @@ def handle_mpsoc_data_reply(action_id: int, pw: PrintWrapper, custom_data: bytea
|
|||||||
"PLOC MPSoC flash directory data shorter than minimum 16 bytes"
|
"PLOC MPSoC flash directory data shorter than minimum 16 bytes"
|
||||||
)
|
)
|
||||||
current_idx = 0
|
current_idx = 0
|
||||||
dir_name_short = custom_data[current_idx : current_idx + 12].decode("utf-8")
|
dir_name_short = custom_data[current_idx : current_idx + 12].decode()
|
||||||
current_idx += 12
|
current_idx += 12
|
||||||
num_elements = struct.unpack("!I", custom_data[current_idx : current_idx + 4])[
|
num_elements = struct.unpack("!I", custom_data[current_idx : current_idx + 4])[
|
||||||
0
|
0
|
||||||
@ -762,9 +765,7 @@ def handle_mpsoc_data_reply(action_id: int, pw: PrintWrapper, custom_data: bytea
|
|||||||
# It is as weird as it looks..
|
# It is as weird as it looks..
|
||||||
for _ in range(num_elements):
|
for _ in range(num_elements):
|
||||||
end_of_str = custom_data[current_idx : current_idx + 12].index(b"\x00")
|
end_of_str = custom_data[current_idx : current_idx + 12].index(b"\x00")
|
||||||
elem_name = custom_data[current_idx : current_idx + end_of_str].decode(
|
elem_name = custom_data[current_idx : current_idx + end_of_str].decode()
|
||||||
"utf-8"
|
|
||||||
)
|
|
||||||
current_idx += 12
|
current_idx += 12
|
||||||
elem_names.append(elem_name)
|
elem_names.append(elem_name)
|
||||||
for _ in range(num_elements):
|
for _ in range(num_elements):
|
||||||
|
@ -29,7 +29,7 @@ classifiers = [
|
|||||||
"Topic :: Scientific/Engineering"
|
"Topic :: Scientific/Engineering"
|
||||||
]
|
]
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"tmtccmd ~= 8.0.0rc1",
|
"tmtccmd == 8.0.0rc1",
|
||||||
"cfdp-py~=0.1.0",
|
"cfdp-py~=0.1.0",
|
||||||
# "tmtccmd @ git+https://github.com/robamu-org/tmtccmd@main",
|
# "tmtccmd @ git+https://github.com/robamu-org/tmtccmd@main",
|
||||||
"python-dateutil ~= 2.8",
|
"python-dateutil ~= 2.8",
|
||||||
|
Loading…
Reference in New Issue
Block a user