Reticulum/RNS/Channel.py

695 lines
25 KiB
Python
Raw Permalink Normal View History

2023-02-27 18:05:25 -07:00
# MIT License
#
# Copyright (c) 2016-2023 Mark Qvist / unsigned.io and contributors.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
2023-02-25 17:23:25 -07:00
from __future__ import annotations
import collections
import enum
import threading
import time
from types import TracebackType
from typing import Type, Callable, TypeVar, Generic, NewType
2023-02-25 17:23:25 -07:00
import abc
import contextlib
import struct
import RNS
from abc import ABC, abstractmethod
# Type variable standing in for the outlet-specific packet type handled by a
# ChannelOutletBase implementation (LinkChannelOutlet uses RNS.Packet).
TPacket = TypeVar("TPacket")
2023-02-25 17:23:25 -07:00
class SystemMessageTypes(enum.IntEnum):
    """
    Message type codes used by system message classes. These lie in
    the range that ``Channel`` refuses to register for non-system
    message classes (``MSGTYPE >= 0xf000``).
    """

    SMT_STREAM_DATA = 0xff00
2023-02-25 17:23:25 -07:00
class ChannelOutletBase(ABC, Generic[TPacket]):
    """
    An abstract transport layer interface used by Channel.

    DEPRECATED: This was created for testing; eventually
    Channel will use Link or a LinkBase interface
    directly.
    """

    @abstractmethod
    def send(self, raw: bytes) -> TPacket:
        """
        Transmit ``raw`` and return the packet object representing it.

        :param raw: packed envelope bytes
        :return: outlet-specific packet object
        """
        raise NotImplementedError()

    @abstractmethod
    def resend(self, packet: TPacket) -> TPacket:
        """
        Retransmit a packet previously returned by ``send``.

        :param packet: packet to retransmit
        :return: the packet
        """
        raise NotImplementedError()

    @property
    @abstractmethod
    def mdu(self):
        """
        Maximum size of a packed envelope the outlet can carry.
        ``Channel`` compares ``len(envelope.raw)`` against this value.
        """
        raise NotImplementedError()

    @property
    @abstractmethod
    def rtt(self):
        """
        Round-trip time of the underlying transport. ``Channel`` uses it
        to size its window and to compute packet timeouts.
        """
        raise NotImplementedError()

    @property
    @abstractmethod
    def is_usable(self):
        """
        Whether the outlet can currently be used for sending.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_packet_state(self, packet: TPacket) -> MessageState:
        """
        Report the delivery state of ``packet``.

        :param packet: packet to inspect
        :return: a ``MessageState`` value
        """
        raise NotImplementedError()

    @abstractmethod
    def timed_out(self):
        """
        Called by ``Channel`` when a packet has exceeded its retry count.
        """
        raise NotImplementedError()

    @abstractmethod
    def __str__(self):
        raise NotImplementedError()

    @abstractmethod
    def set_packet_timeout_callback(self, packet: TPacket, callback: Callable[[TPacket], None] | None,
                                    timeout: float | None = None):
        """
        Set or clear the function called when ``packet`` times out.

        :param packet: packet to watch
        :param callback: function receiving the packet, or ``None`` to clear
        :param timeout: optional timeout value to apply to the packet
        """
        raise NotImplementedError()

    @abstractmethod
    def set_packet_delivered_callback(self, packet: TPacket, callback: Callable[[TPacket], None] | None):
        """
        Set or clear the function called when ``packet`` is delivered.

        :param packet: packet to watch
        :param callback: function receiving the packet, or ``None`` to clear
        """
        raise NotImplementedError()

    @abstractmethod
    def get_packet_id(self, packet: TPacket) -> any:
        """
        Return a value identifying ``packet``. ``Channel`` compares these
        values to match a packet to its envelope.

        :param packet: packet to identify
        :return: comparable identifier
        """
        raise NotImplementedError()
class CEType(enum.IntEnum):
    """
    ChannelException type codes
    """

    # No MSGTYPE set on the message class
    ME_NO_MSG_TYPE = 0
    # Message class rejected at registration
    ME_INVALID_MSG_TYPE = 1
    # No registered class for a received MSGTYPE
    ME_NOT_REGISTERED = 2
    # Channel not ready to send
    ME_LINK_NOT_READY = 3
    ME_ALREADY_SENT = 4
    # Packed message exceeds the outlet MDU
    ME_TOO_BIG = 5
class ChannelException(Exception):
    """
    An exception thrown by Channel, with a type code.
    """

    def __init__(self, ce_type: CEType, *args):
        """
        :param ce_type: type code describing the failure
        :param args: regular exception arguments (typically a message)
        """
        # Forward the arguments individually so that str(exception)
        # yields the message itself rather than the repr of a tuple.
        super().__init__(*args)
        self.type = ce_type
class MessageState(enum.IntEnum):
    """
    Set of possible states for a Message
    """

    MSGSTATE_NEW = 0
    MSGSTATE_SENT = 1
    MSGSTATE_DELIVERED = 2
    MSGSTATE_FAILED = 3
class MessageBase(abc.ABC):
    """
    Base type for any messages sent or received on a Channel.
    Subclasses must define the two abstract methods as well as
    the ``MSGTYPE`` class variable.
    """

    # MSGTYPE must be unique within all classes sent over a
    # channel. Additionally, MSGTYPE >= 0xf000 are reserved.
    MSGTYPE = None
    """
    Defines a unique identifier for a message class.

    * Must be unique within all classes registered with a ``Channel``
    * Must be less than ``0xf000``. Values greater than or equal to ``0xf000`` are reserved.
    """

    @abstractmethod
    def pack(self) -> bytes:
        """
        Create and return the binary representation of the message

        :return: binary representation of message
        """
        raise NotImplementedError()

    @abstractmethod
    def unpack(self, raw: bytes):
        """
        Populate message from binary representation

        :param raw: binary representation
        """
        raise NotImplementedError()
# Signature of a Channel message handler: receives the decoded message and
# returns a bool; a True return stops further handlers from being called.
MessageCallbackType = NewType("MessageCallbackType", Callable[[MessageBase], bool])
2023-02-25 17:23:25 -07:00
class Envelope:
    """
    Internal wrapper used to transport messages over a channel and
    track its state within the channel framework.

    The wire format is a 6-byte big-endian header (message type,
    sequence number, payload length; 16 bits each) followed by the
    packed message.
    """

    def __init__(self, outlet: ChannelOutletBase, message: MessageBase = None, raw: bytes = None, sequence: int = None):
        """
        :param outlet: outlet the envelope travels over
        :param message: message to send (outgoing envelopes)
        :param raw: received bytes (incoming envelopes)
        :param sequence: sequence number assigned by the channel
        """
        self.ts = time.time()
        self.id = id(self)
        self.message = message
        self.raw = raw
        self.packet: TPacket = None
        self.sequence = sequence
        self.outlet = outlet
        self.tries = 0
        self.unpacked = False
        self.packed = False
        self.tracked = False

    def unpack(self, message_factories: dict[int, Type]) -> MessageBase:
        """
        Decode ``self.raw`` into a message instance.

        :param message_factories: mapping of MSGTYPE to message class
        :return: the decoded message
        :raises ChannelException: if the MSGTYPE has no registered class
        """
        header = self.raw[:6]
        msgtype, self.sequence, _length = struct.unpack(">HHH", header)
        payload = self.raw[6:]

        factory = message_factories.get(msgtype, None)
        if factory is None:
            raise ChannelException(CEType.ME_NOT_REGISTERED, f"Unable to find constructor for Channel MSGTYPE {hex(msgtype)}")

        decoded = factory()
        decoded.unpack(payload)
        self.unpacked = True
        self.message = decoded
        return decoded

    def pack(self) -> bytes:
        """
        Encode the wrapped message, header included, into ``self.raw``.

        :return: the packed bytes
        :raises ChannelException: if the message class has no MSGTYPE
        """
        if self.message.__class__.MSGTYPE is None:
            raise ChannelException(CEType.ME_NO_MSG_TYPE, f"{self.message.__class__} lacks MSGTYPE")

        body = self.message.pack()
        header = struct.pack(">HHH", self.message.MSGTYPE, self.sequence, len(body))
        self.raw = header + body
        self.packed = True
        return self.raw
class Channel(contextlib.AbstractContextManager):
    """
    Provides reliable delivery of messages over
    a link.

    ``Channel`` differs from ``Request`` and
    ``Resource`` in some important ways:

     **Continuous**
        Messages can be sent or received as long as
        the ``Link`` is open.
     **Bi-directional**
        Messages can be sent in either direction on
        the ``Link``; neither end is the client or
        server.
     **Size-constrained**
        Messages must be encoded into a single packet.

    ``Channel`` is similar to ``Packet``, except that it
    provides reliable delivery (automatic retries) as well
    as a structure for exchanging several types of
    messages over the ``Link``.

    ``Channel`` is not instantiated directly, but rather
    obtained from a ``Link`` with ``get_channel()``.
    """

    # The initial window size at channel setup
    WINDOW = 2

    # Absolute minimum window size
    WINDOW_MIN = 2
    WINDOW_MIN_LIMIT_SLOW = 2
    WINDOW_MIN_LIMIT_MEDIUM = 5
    WINDOW_MIN_LIMIT_FAST = 16

    # The maximum window size for transfers on slow links
    WINDOW_MAX_SLOW = 5

    # The maximum window size for transfers on mid-speed links
    WINDOW_MAX_MEDIUM = 12

    # The maximum window size for transfers on fast links
    WINDOW_MAX_FAST = 48

    # For calculating maps and guard segments, this
    # must be set to the global maximum window.
    WINDOW_MAX = WINDOW_MAX_FAST

    # If the fast rate is sustained for this many request
    # rounds, the fast link window size will be allowed.
    FAST_RATE_THRESHOLD = 10

    # RTT thresholds. An outlet RTT at or below RTT_FAST counts
    # towards enabling the fast window limits; an RTT above
    # RTT_FAST but at or below RTT_MEDIUM counts towards the
    # medium limits. An RTT above RTT_SLOW at channel setup
    # pins the window to a single message.
    RTT_FAST = 0.18
    RTT_MEDIUM = 0.75
    RTT_SLOW = 1.45

    # The minimum allowed flexibility of the window size.
    # The difference between window_max and window_min
    # will never be smaller than this value.
    WINDOW_FLEXIBILITY = 4

    # Sequence numbers are 16 bit and wrap around to zero.
    SEQ_MAX = 0xFFFF
    SEQ_MODULUS = SEQ_MAX+1

    def __init__(self, outlet: ChannelOutletBase):
        """
        :param outlet: transport used to send and track packets
        """
        self._outlet = outlet
        self._lock = threading.RLock()
        self._tx_ring: collections.deque[Envelope] = collections.deque()
        self._rx_ring: collections.deque[Envelope] = collections.deque()
        self._message_callbacks: list[MessageCallbackType] = []
        self._next_sequence = 0
        self._next_rx_sequence = 0
        self._message_factories: dict[int, Type[MessageBase]] = {}
        self._max_tries = 5
        self.fast_rate_rounds = 0
        self.medium_rate_rounds = 0

        if self._outlet.rtt > Channel.RTT_SLOW:
            # Very slow transport: allow a single outstanding message
            self.window = 1
            self.window_max = 1
            self.window_min = 1
            self.window_flexibility = 1
        else:
            self.window = Channel.WINDOW
            self.window_max = Channel.WINDOW_MAX_SLOW
            self.window_min = Channel.WINDOW_MIN
            self.window_flexibility = Channel.WINDOW_FLEXIBILITY

    def __enter__(self) -> Channel:
        return self

    def __exit__(self, __exc_type: Type[BaseException] | None, __exc_value: BaseException | None,
                 __traceback: TracebackType | None) -> bool | None:
        self._shutdown()
        # Never suppress an exception raised inside the with-block
        return False

    def register_message_type(self, message_class: Type[MessageBase]):
        """
        Register a message class for reception over a ``Channel``.

        Message classes must extend ``MessageBase``.

        :param message_class: Class to register
        """
        self._register_message_type(message_class, is_system_type=False)

    def _register_message_type(self, message_class: Type[MessageBase], *, is_system_type: bool = False):
        """
        Validate ``message_class`` and add it to the factory table.

        :param message_class: Class to register
        :param is_system_type: allow a MSGTYPE in the reserved range (>= 0xf000)
        :raises ChannelException: ``ME_INVALID_MSG_TYPE`` if validation fails
        """
        with self._lock:
            if not issubclass(message_class, MessageBase):
                raise ChannelException(CEType.ME_INVALID_MSG_TYPE,
                                       f"{message_class} is not a subclass of {MessageBase}.")
            if message_class.MSGTYPE is None:
                raise ChannelException(CEType.ME_INVALID_MSG_TYPE,
                                       f"{message_class} has invalid MSGTYPE class attribute.")
            if message_class.MSGTYPE >= 0xf000 and not is_system_type:
                raise ChannelException(CEType.ME_INVALID_MSG_TYPE,
                                       f"{message_class} has system-reserved message type.")
            # Received messages are built with a no-argument constructor,
            # so verify that this works before accepting the class.
            try:
                message_class()
            except Exception as ex:
                raise ChannelException(CEType.ME_INVALID_MSG_TYPE,
                                       f"{message_class} raised an exception when constructed with no arguments: {ex}")

            self._message_factories[message_class.MSGTYPE] = message_class

    def add_message_handler(self, callback: MessageCallbackType):
        """
        Add a handler for incoming messages. A handler
        has the following signature:

        ``(message: MessageBase) -> bool``

        Handlers are processed in the order they are
        added. If any handler returns True, processing
        of the message stops; handlers after the
        returning handler will not be called.

        :param callback: Function to call
        """
        with self._lock:
            if callback not in self._message_callbacks:
                self._message_callbacks.append(callback)

    def remove_message_handler(self, callback: MessageCallbackType):
        """
        Remove a handler added with ``add_message_handler``.

        :param callback: handler to remove
        """
        with self._lock:
            if callback in self._message_callbacks:
                self._message_callbacks.remove(callback)

    def _shutdown(self):
        """
        Drop all message handlers and pending envelopes.
        """
        with self._lock:
            self._message_callbacks.clear()
            self._clear_rings()

    def _clear_rings(self):
        """
        Detach callbacks from in-flight packets and empty both rings.
        """
        with self._lock:
            for envelope in self._tx_ring:
                if envelope.packet is not None:
                    self._outlet.set_packet_timeout_callback(envelope.packet, None)
                    self._outlet.set_packet_delivered_callback(envelope.packet, None)
            self._tx_ring.clear()
            self._rx_ring.clear()

    def _emplace_envelope(self, envelope: Envelope, ring: collections.deque[Envelope]) -> bool:
        """
        Insert ``envelope`` into ``ring``, keeping the ring ordered by sequence.

        :param envelope: envelope to insert
        :param ring: TX or RX ring
        :return: False if an envelope with the same sequence is already
                 in the ring, True once the envelope has been inserted
        """
        with self._lock:
            for index, existing in enumerate(ring):
                if envelope.sequence == existing.sequence:
                    RNS.log("Envelope: Emplacement of duplicate envelope with sequence "+str(envelope.sequence), RNS.LOG_EXTREME)
                    return False

                # Insert before the first larger sequence, unless the envelope
                # is more than half the sequence space behind the next expected
                # RX sequence; such an envelope is appended at the end instead.
                if envelope.sequence < existing.sequence and not (self._next_rx_sequence - envelope.sequence) > (Channel.SEQ_MAX//2):
                    ring.insert(index, envelope)
                    envelope.tracked = True
                    return True

            envelope.tracked = True
            ring.append(envelope)
            return True

    def _run_callbacks(self, message: MessageBase):
        """
        Pass ``message`` to the registered handlers in order, stopping at
        the first one that returns a true value. Handler exceptions are
        logged and do not prevent later handlers from running.
        """
        # Iterate over a copy so handlers may add or remove handlers
        for callback in self._message_callbacks.copy():
            try:
                if callback(message):
                    return
            except Exception as e:
                RNS.log("Channel "+str(self)+" experienced an error while running a message callback. The contained exception was: "+str(e), RNS.LOG_ERROR)

    def _receive(self, raw: bytes):
        """
        Process a received packet payload: decode it, place it in the RX
        ring and deliver every message that is now in sequence.

        Any exception raised while processing is logged, not propagated.

        :param raw: received envelope bytes
        """
        try:
            envelope = Envelope(outlet=self._outlet, raw=raw)
            with self._lock:
                envelope.unpack(self._message_factories)

                if envelope.sequence < self._next_rx_sequence:
                    # A sequence below the next expected one is only valid
                    # when the receive window wraps past SEQ_MAX and the
                    # sequence lies within the wrapped part of the window.
                    window_overflow = (self._next_rx_sequence+Channel.WINDOW_MAX) % Channel.SEQ_MODULUS
                    if window_overflow < self._next_rx_sequence:
                        if envelope.sequence > window_overflow:
                            RNS.log("Invalid packet sequence ("+str(envelope.sequence)+") received on channel "+str(self), RNS.LOG_EXTREME)
                            return
                    else:
                        RNS.log("Invalid packet sequence ("+str(envelope.sequence)+") received on channel "+str(self), RNS.LOG_EXTREME)
                        return

                is_new = self._emplace_envelope(envelope, self._rx_ring)

            if not is_new:
                RNS.log("Duplicate message received on channel "+str(self), RNS.LOG_EXTREME)
                return

            with self._lock:
                contiguous = []
                for queued in self._rx_ring:
                    if queued.sequence == self._next_rx_sequence:
                        contiguous.append(queued)
                        self._next_rx_sequence = (self._next_rx_sequence + 1) % Channel.SEQ_MODULUS
                        if self._next_rx_sequence == 0:
                            # The sequence wrapped: envelopes continuing
                            # from zero sit at the front of the ring, so
                            # scan the ring again from the start.
                            for wrapped in self._rx_ring:
                                if wrapped.sequence == self._next_rx_sequence:
                                    contiguous.append(wrapped)
                                    self._next_rx_sequence = (self._next_rx_sequence + 1) % Channel.SEQ_MODULUS

                for ready in contiguous:
                    if not ready.unpacked:
                        message = ready.unpack(self._message_factories)
                    else:
                        message = ready.message

                    self._rx_ring.remove(ready)
                    self._run_callbacks(message)

        except Exception as e:
            RNS.log("An error ocurred while receiving data on "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)

    def is_ready_to_send(self) -> bool:
        """
        Check if ``Channel`` is ready to send.

        :return: True if ready
        """
        if not self._outlet.is_usable:
            return False

        with self._lock:
            # Count envelopes that are not yet confirmed as delivered
            outstanding = 0
            for envelope in self._tx_ring:
                if envelope.outlet == self._outlet:
                    if not envelope.packet or not self._outlet.get_packet_state(envelope.packet) == MessageState.MSGSTATE_DELIVERED:
                        outstanding += 1

            if outstanding >= self.window:
                return False

        return True

    def _grow_window_on_delivery(self):
        """
        Enlarge the send window after an envelope left the TX ring and
        raise the window limits once the outlet RTT has stayed in the
        medium or fast range for ``FAST_RATE_THRESHOLD`` rounds.
        """
        if self.window < self.window_max:
            self.window += 1

        rtt = self._outlet.rtt
        if rtt == 0:
            return

        if rtt > Channel.RTT_FAST:
            self.fast_rate_rounds = 0

            if rtt > Channel.RTT_MEDIUM:
                self.medium_rate_rounds = 0
            else:
                self.medium_rate_rounds += 1
                if self.window_max < Channel.WINDOW_MAX_MEDIUM and self.medium_rate_rounds == Channel.FAST_RATE_THRESHOLD:
                    self.window_max = Channel.WINDOW_MAX_MEDIUM
                    self.window_min = Channel.WINDOW_MIN_LIMIT_MEDIUM
        else:
            self.fast_rate_rounds += 1
            if self.window_max < Channel.WINDOW_MAX_FAST and self.fast_rate_rounds == Channel.FAST_RATE_THRESHOLD:
                self.window_max = Channel.WINDOW_MAX_FAST
                self.window_min = Channel.WINDOW_MIN_LIMIT_FAST

    def _packet_tx_op(self, packet: TPacket, op: Callable[[Envelope], bool]):
        """
        Find the TX envelope belonging to ``packet`` and run ``op`` on it.
        If ``op`` returns True the envelope is removed from the TX ring
        and the send window is adapted.

        :param packet: packet reported by the outlet
        :param op: operation to run on the matching envelope; returns
                   True when the envelope is finished
        """
        with self._lock:
            envelope = next(filter(lambda e: self._outlet.get_packet_id(e.packet) == self._outlet.get_packet_id(packet),
                                   self._tx_ring), None)

            if envelope and op(envelope):
                envelope.tracked = False
                if envelope in self._tx_ring:
                    self._tx_ring.remove(envelope)
                    self._grow_window_on_delivery()
                else:
                    RNS.log("Envelope not found in TX ring for "+str(self), RNS.LOG_EXTREME)

        if not envelope:
            RNS.log("Spurious message received on "+str(self), RNS.LOG_EXTREME)

    def _packet_delivered(self, packet: TPacket):
        """
        Delivery callback: retire the envelope belonging to ``packet``.
        """
        self._packet_tx_op(packet, lambda env: True)

    def _update_packet_timeouts(self):
        """
        Extend the receipt timeout of every in-flight packet whose
        current timeout is shorter than the freshly computed one.
        """
        for envelope in self._tx_ring:
            updated_timeout = self._get_packet_timeout_time(envelope.tries)
            if envelope.packet and hasattr(envelope.packet, "receipt") and envelope.packet.receipt and envelope.packet.receipt.timeout:
                if updated_timeout > envelope.packet.receipt.timeout:
                    envelope.packet.receipt.set_timeout(updated_timeout)

    def _get_packet_timeout_time(self, tries: int) -> float:
        """
        Compute the timeout for a packet on its ``tries``-th attempt.
        The value grows by a factor of 1.5 per attempt and scales with
        the outlet RTT and the number of envelopes in the TX ring.

        :param tries: number of send attempts made so far
        :return: timeout value
        """
        to = pow(1.5, tries - 1) * max(self._outlet.rtt*2.5, 0.025) * (len(self._tx_ring)+1.5)
        return to

    def _packet_timeout(self, packet: TPacket):
        """
        Timeout callback: resend the packet, or shut the channel down
        once the retry limit has been reached.
        """
        def retry_envelope(envelope: Envelope) -> bool:
            if envelope.tries >= self._max_tries:
                RNS.log("Retry count exceeded on "+str(self)+", tearing down Link.", RNS.LOG_ERROR)
                self._shutdown()  # start on separate thread?
                self._outlet.timed_out()
                return True

            envelope.tries += 1
            self._outlet.resend(envelope.packet)
            self._outlet.set_packet_delivered_callback(envelope.packet, self._packet_delivered)
            self._outlet.set_packet_timeout_callback(envelope.packet, self._packet_timeout, self._get_packet_timeout_time(envelope.tries))
            self._update_packet_timeouts()

            # A timeout shrinks the window, and the window ceiling as
            # long as the configured flexibility above window_min remains.
            if self.window > self.window_min:
                self.window -= 1
                if self.window_max > (self.window_min+self.window_flexibility):
                    self.window_max -= 1

            return False

        if self._outlet.get_packet_state(packet) != MessageState.MSGSTATE_DELIVERED:
            self._packet_tx_op(packet, retry_envelope)

    def send(self, message: MessageBase) -> Envelope:
        """
        Send a message. If a message send is attempted and
        ``Channel`` is not ready, an exception is thrown.

        :param message: an instance of a ``MessageBase`` subclass
        :return: the envelope tracking the sent message
        :raises ChannelException: ``ME_LINK_NOT_READY`` if the channel
                cannot send right now, ``ME_NO_MSG_TYPE`` if the message
                class has no MSGTYPE, ``ME_TOO_BIG`` if the packed
                message exceeds the outlet MDU
        """
        with self._lock:
            if not self.is_ready_to_send():
                raise ChannelException(CEType.ME_LINK_NOT_READY, "Link is not ready")

            envelope = Envelope(self._outlet, message=message, sequence=self._next_sequence)

            # Pack and validate before consuming the sequence number and
            # entering the TX ring. A message that cannot be sent must not
            # leave a never-transmitted envelope occupying the window, nor
            # a gap in the sequence space seen by the receiver.
            envelope.pack()
            if len(envelope.raw) > self._outlet.mdu:
                raise ChannelException(CEType.ME_TOO_BIG, f"Packed message too big for packet: {len(envelope.raw)} > {self._outlet.mdu}")

            self._next_sequence = (self._next_sequence + 1) % Channel.SEQ_MODULUS
            self._emplace_envelope(envelope, self._tx_ring)

        envelope.packet = self._outlet.send(envelope.raw)
        envelope.tries += 1
        self._outlet.set_packet_delivered_callback(envelope.packet, self._packet_delivered)
        self._outlet.set_packet_timeout_callback(envelope.packet, self._packet_timeout, self._get_packet_timeout_time(envelope.tries))
        self._update_packet_timeouts()

        return envelope

    @property
    def MDU(self):
        """
        Maximum Data Unit: the number of bytes available
        for a message to consume in a single send. This
        value is adjusted from the ``Link`` MDU to accommodate
        message header information.

        :return: number of bytes available
        """
        return self._outlet.mdu - 6  # sizeof(msgtype) + sizeof(length) + sizeof(sequence)
2023-02-25 17:23:25 -07:00
class LinkChannelOutlet(ChannelOutletBase):
    """
    An implementation of ChannelOutletBase for RNS.Link.
    Allows Channel to send packets over an RNS Link with
    Packets.

    :param link: RNS Link to wrap
    """

    def __init__(self, link: RNS.Link):
        self.link = link

    def send(self, raw: bytes) -> RNS.Packet:
        """
        Wrap ``raw`` in a channel-context packet and send it if the
        link is active. The packet is returned in either case.
        """
        packet = RNS.Packet(self.link, raw, context=RNS.Packet.CHANNEL)
        if self.link.status == RNS.Link.ACTIVE:
            packet.send()
        return packet

    def resend(self, packet: RNS.Packet) -> RNS.Packet:
        """
        Resend ``packet``; a failed resend is logged, not raised.
        """
        receipt = packet.resend()
        if not receipt:
            RNS.log("Failed to resend packet", RNS.LOG_ERROR)
        return packet

    @property
    def mdu(self):
        return self.link.MDU

    @property
    def rtt(self):
        return self.link.rtt

    @property
    def is_usable(self):
        return True  # had issues looking at Link.status

    def get_packet_state(self, packet: TPacket) -> MessageState:
        """
        Translate the status of the packet's receipt into a ``MessageState``.
        A packet without a receipt is reported as failed.

        :raises Exception: if the receipt reports an unexpected status
        """
        if packet.receipt is None:
            return MessageState.MSGSTATE_FAILED

        status = packet.receipt.get_status()
        if status == RNS.PacketReceipt.SENT:
            return MessageState.MSGSTATE_SENT
        if status == RNS.PacketReceipt.DELIVERED:
            return MessageState.MSGSTATE_DELIVERED
        if status == RNS.PacketReceipt.FAILED:
            return MessageState.MSGSTATE_FAILED
        raise Exception(f"Unexpected receipt state: {status}")

    def timed_out(self):
        """
        Tear down the link after the channel gave up on a packet.
        """
        self.link.teardown()

    def __str__(self):
        return f"{self.__class__.__name__}({self.link})"

    def set_packet_timeout_callback(self, packet: RNS.Packet, callback: Callable[[RNS.Packet], None] | None,
                                    timeout: float | None = None):
        """
        Set or clear the timeout callback on the packet's receipt and,
        if ``timeout`` is given, apply it to the receipt. Does nothing
        when there is no packet or the packet has no receipt.
        """
        # Guard against a missing packet in both steps
        receipt = packet.receipt if packet else None
        if not receipt:
            return

        if timeout:
            receipt.set_timeout(timeout)

        def inner(receipt: RNS.PacketReceipt):
            # The channel expects the packet, not the receipt
            callback(packet)

        receipt.set_timeout_callback(inner if callback else None)

    def set_packet_delivered_callback(self, packet: RNS.Packet, callback: Callable[[RNS.Packet], None] | None):
        """
        Set or clear the delivery callback on the packet's receipt.
        Does nothing when there is no packet or the packet has no receipt.
        """
        def inner(receipt: RNS.PacketReceipt):
            # The channel expects the packet, not the receipt
            callback(packet)

        if packet and packet.receipt:
            packet.receipt.set_delivery_callback(inner if callback else None)

    def get_packet_id(self, packet: RNS.Packet) -> any:
        """
        Return the packet hash, or ``None`` if ``packet`` is missing or
        has no callable ``get_hash``.
        """
        if packet and hasattr(packet, "get_hash") and callable(packet.get_hash):
            return packet.get_hash()
        else:
            return None