Reticulum/RNS/Buffer.py

315 lines
11 KiB
Python
Raw Normal View History

from __future__ import annotations
2023-05-09 14:13:57 -06:00
import bz2
import sys
2023-05-18 17:57:20 -06:00
import time
import threading
from threading import RLock
import struct
from RNS.Channel import Channel, MessageBase, SystemMessageTypes
import RNS
from io import RawIOBase, BufferedRWPair, BufferedReader, BufferedWriter
from typing import Callable
from contextlib import AbstractContextManager
class StreamDataMessage(MessageBase):
MSGTYPE = SystemMessageTypes.SMT_STREAM_DATA
"""
Message type for ``Channel``. ``StreamDataMessage``
uses a system-reserved message type.
"""
2023-05-09 14:13:57 -06:00
STREAM_ID_MAX = 0x3fff # 16383
"""
2023-05-09 14:13:57 -06:00
The stream id is limited to 2 bytes - 2 bit
"""
MAX_DATA_LEN = RNS.Link.MDU - 2 - 6 # 2 for stream data message header, 6 for channel envelope
"""
When the Buffer package is imported, this value is
calculcated based on the value of OVERHEAD
"""
def __init__(self, stream_id: int = None, data: bytes = None, eof: bool = False):
"""
This class is used to encapsulate binary stream
data to be sent over a ``Channel``.
:param stream_id: id of stream relative to receiver
:param data: binary data
:param eof: set to True if signalling End of File
"""
super().__init__()
if stream_id is not None and stream_id > self.STREAM_ID_MAX:
2023-05-09 14:13:57 -06:00
raise ValueError("stream_id must be 0-16383")
self.stream_id = stream_id
2023-05-09 14:13:57 -06:00
self.compressed = False
self.data = data or bytes()
self.eof = eof
def pack(self) -> bytes:
if self.stream_id is None:
raise ValueError("stream_id")
2023-05-09 14:13:57 -06:00
compressed_data = bz2.compress(self.data)
saved = len(self.data)-len(compressed_data)
if saved > 0:
self.data = compressed_data
self.compressed = True
header_val = (0x3fff & self.stream_id) | (0x8000 if self.eof else 0x0000) | (0x4000 if self.compressed > 0 else 0x0000)
return bytes(struct.pack(">H", header_val) + (self.data if self.data else bytes()))
def unpack(self, raw):
self.stream_id = struct.unpack(">H", raw[:2])[0]
self.eof = (0x8000 & self.stream_id) > 0
2023-05-09 14:13:57 -06:00
self.compressed = (0x4000 & self.stream_id) > 0
self.stream_id = self.stream_id & 0x3fff
self.data = raw[2:]
2023-05-18 17:57:20 -06:00
2023-05-09 14:13:57 -06:00
if self.compressed:
self.data = bz2.decompress(self.data)
class RawChannelReader(RawIOBase, AbstractContextManager):
"""
An implementation of RawIOBase that receives
binary stream data sent over a ``Channel``.
This class generally need not be instantiated directly.
Use :func:`RNS.Buffer.create_reader`,
:func:`RNS.Buffer.create_writer`, and
:func:`RNS.Buffer.create_bidirectional_buffer` functions
to create buffered streams with optional callbacks.
For additional information on the API of this
object, see the Python documentation for
``RawIOBase``.
"""
def __init__(self, stream_id: int, channel: Channel):
"""
Create a raw channel reader.
:param stream_id: local stream id to receive at
:param channel: ``Channel`` object to receive from
"""
self._stream_id = stream_id
self._channel = channel
self._lock = RLock()
self._buffer = bytearray()
self._eof = False
self._channel._register_message_type(StreamDataMessage, is_system_type=True)
self._channel.add_message_handler(self._handle_message)
self._listeners: [Callable[[int], None]] = []
def add_ready_callback(self, cb: Callable[[int], None]):
"""
Add a function to be called when new data is available.
The function should have the signature ``(ready_bytes: int) -> None``
:param cb: function to call
"""
with self._lock:
self._listeners.append(cb)
def remove_ready_callback(self, cb: Callable[[int], None]):
"""
Remove a function added with :func:`RNS.RawChannelReader.add_ready_callback()`
:param cb: function to remove
"""
with self._lock:
self._listeners.remove(cb)
def _handle_message(self, message: MessageBase):
if isinstance(message, StreamDataMessage):
if message.stream_id == self._stream_id:
with self._lock:
if message.data is not None:
self._buffer.extend(message.data)
if message.eof:
self._eof = True
for listener in self._listeners:
try:
2023-05-18 15:35:28 -06:00
threading.Thread(target=listener, name="Message Callback", args=[len(self._buffer)], daemon=True).start()
except Exception as ex:
RNS.log("Error calling RawChannelReader(" + str(self._stream_id) + ") callback: " + str(ex))
return True
return False
def _read(self, __size: int) -> bytes | None:
with self._lock:
result = self._buffer[:__size]
self._buffer = self._buffer[__size:]
return result if len(result) > 0 or self._eof else None
def readinto(self, __buffer: bytearray) -> int | None:
ready = self._read(len(__buffer))
if ready is not None:
__buffer[:len(ready)] = ready
return len(ready) if ready is not None else None
def writable(self) -> bool:
return False
def seekable(self) -> bool:
return False
def readable(self) -> bool:
return True
def close(self):
with self._lock:
self._channel.remove_message_handler(self._handle_message)
self._listeners.clear()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False
class RawChannelWriter(RawIOBase, AbstractContextManager):
"""
An implementation of RawIOBase that receives
binary stream data sent over a channel.
This class generally need not be instantiated directly.
Use :func:`RNS.Buffer.create_reader`,
:func:`RNS.Buffer.create_writer`, and
:func:`RNS.Buffer.create_bidirectional_buffer` functions
to create buffered streams with optional callbacks.
For additional information on the API of this
object, see the Python documentation for
``RawIOBase``.
"""
def __init__(self, stream_id: int, channel: Channel):
"""
Create a raw channel writer.
:param stream_id: remote stream id to sent do
:param channel: ``Channel`` object to send on
"""
self._stream_id = stream_id
self._channel = channel
self._eof = False
def write(self, __b: bytes) -> int | None:
try:
chunk = bytes(__b[:StreamDataMessage.MAX_DATA_LEN])
message = StreamDataMessage(self._stream_id, chunk, self._eof)
self._channel.send(message)
return len(chunk)
except RNS.Channel.ChannelException as cex:
if cex.type != RNS.Channel.CEType.ME_LINK_NOT_READY:
raise
return 0
def close(self):
2023-05-18 17:57:20 -06:00
try:
link_rtt = self._channel._outlet.link.rtt
timeout = time.time() + (link_rtt * len(self._channel._tx_ring) * 1)
except Exception as e:
timeout = time.time() + 15
while time.time() < timeout and not self._channel.is_ready_to_send():
time.sleep(0.05)
self._eof = True
self.write(bytes())
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False
def seekable(self) -> bool:
return False
def readable(self) -> bool:
return False
def writable(self) -> bool:
return True
class Buffer:
"""
Static functions for creating buffered streams that send
and receive over a ``Channel``.
These functions use ``BufferedReader``, ``BufferedWriter``,
and ``BufferedRWPair`` to add buffering to
``RawChannelReader`` and ``RawChannelWriter``.
"""
@staticmethod
def create_reader(stream_id: int, channel: Channel,
ready_callback: Callable[[int], None] | None = None) -> BufferedReader:
"""
Create a buffered reader that reads binary data sent
over a ``Channel``, with an optional callback when
new data is available.
Callback signature: ``(ready_bytes: int) -> None``
For more information on the reader-specific functions
of this object, see the Python documentation for
``BufferedReader``
:param stream_id: the local stream id to receive from
:param channel: the channel to receive on
:param ready_callback: function to call when new data is available
:return: a BufferedReader object
"""
reader = RawChannelReader(stream_id, channel)
if ready_callback:
reader.add_ready_callback(ready_callback)
return BufferedReader(reader)
@staticmethod
def create_writer(stream_id: int, channel: Channel) -> BufferedWriter:
"""
Create a buffered writer that writes binary data over
a ``Channel``.
For more information on the writer-specific functions
of this object, see the Python documentation for
``BufferedWriter``
:param stream_id: the remote stream id to send to
:param channel: the channel to send on
:return: a BufferedWriter object
"""
writer = RawChannelWriter(stream_id, channel)
return BufferedWriter(writer)
@staticmethod
def create_bidirectional_buffer(receive_stream_id: int, send_stream_id: int, channel: Channel,
ready_callback: Callable[[int], None] | None = None) -> BufferedRWPair:
"""
Create a buffered reader/writer pair that reads and
writes binary data over a ``Channel``, with an
optional callback when new data is available.
Callback signature: ``(ready_bytes: int) -> None``
For more information on the reader-specific functions
of this object, see the Python documentation for
``BufferedRWPair``
:param receive_stream_id: the local stream id to receive at
:param send_stream_id: the remote stream id to send to
:param channel: the channel to send and receive on
:param ready_callback: function to call when new data is available
:return: a BufferedRWPair object
"""
reader = RawChannelReader(receive_stream_id, channel)
if ready_callback:
reader.add_ready_callback(ready_callback)
writer = RawChannelWriter(send_stream_id, channel)
return BufferedRWPair(reader, writer)