init project

This commit is contained in:
2025-04-24 12:07:40 +02:00
commit 788d9bd6ea
305 changed files with 61443 additions and 0 deletions

120
embassy-net/src/dns.rs Normal file
View File

@@ -0,0 +1,120 @@
//! DNS client compatible with the `embedded-nal-async` traits.
//!
//! This exists only for compatibility with crates that use `embedded-nal-async`.
//! Prefer using [`Stack::dns_query`](crate::Stack::dns_query) directly if you're
//! not using `embedded-nal-async`.
use heapless::Vec;
pub use smoltcp::socket::dns::{DnsQuery, Socket};
pub(crate) use smoltcp::socket::dns::{GetQueryResultError, StartQueryError};
pub use smoltcp::wire::{DnsQueryType, IpAddress};
use crate::Stack;
/// Errors returned by DnsSocket.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum Error {
/// Invalid name
InvalidName,
/// Name too long
NameTooLong,
/// Name lookup failed
Failed,
}
impl From<GetQueryResultError> for Error {
fn from(_: GetQueryResultError) -> Self {
Self::Failed
}
}
impl From<StartQueryError> for Error {
fn from(e: StartQueryError) -> Self {
match e {
StartQueryError::NoFreeSlot => Self::Failed,
StartQueryError::InvalidName => Self::InvalidName,
StartQueryError::NameTooLong => Self::NameTooLong,
}
}
}
/// DNS client compatible with the `embedded-nal-async` traits.
///
/// This exists only for compatibility with crates that use `embedded-nal-async`.
/// Prefer using [`Stack::dns_query`](crate::Stack::dns_query) directly if you're
/// not using `embedded-nal-async`.
pub struct DnsSocket<'a> {
stack: Stack<'a>,
}
impl<'a> DnsSocket<'a> {
/// Create a new DNS socket using the provided stack.
///
/// NOTE: If using DHCP, make sure it has reconfigured the stack to ensure the DNS servers are updated.
pub fn new(stack: Stack<'a>) -> Self {
Self { stack }
}
/// Make a query for a given name and return the corresponding IP addresses.
pub async fn query(
&self,
name: &str,
qtype: DnsQueryType,
) -> Result<Vec<IpAddress, { smoltcp::config::DNS_MAX_RESULT_COUNT }>, Error> {
self.stack.dns_query(name, qtype).await
}
}
impl<'a> embedded_nal_async::Dns for DnsSocket<'a> {
type Error = Error;
async fn get_host_by_name(
&self,
host: &str,
addr_type: embedded_nal_async::AddrType,
) -> Result<core::net::IpAddr, Self::Error> {
use core::net::IpAddr;
use embedded_nal_async::AddrType;
let (qtype, secondary_qtype) = match addr_type {
AddrType::IPv4 => (DnsQueryType::A, None),
AddrType::IPv6 => (DnsQueryType::Aaaa, None),
AddrType::Either => {
#[cfg(not(feature = "proto-ipv6"))]
let v6_first = false;
#[cfg(feature = "proto-ipv6")]
let v6_first = self.stack.config_v6().is_some();
match v6_first {
true => (DnsQueryType::Aaaa, Some(DnsQueryType::A)),
false => (DnsQueryType::A, Some(DnsQueryType::Aaaa)),
}
}
};
let mut addrs = self.query(host, qtype).await?;
if addrs.is_empty() {
if let Some(qtype) = secondary_qtype {
addrs = self.query(host, qtype).await?
}
}
if let Some(first) = addrs.get(0) {
Ok(match first {
#[cfg(feature = "proto-ipv4")]
IpAddress::Ipv4(addr) => IpAddr::V4(*addr),
#[cfg(feature = "proto-ipv6")]
IpAddress::Ipv6(addr) => IpAddr::V6(*addr),
})
} else {
Err(Error::Failed)
}
}
async fn get_host_by_address(&self, _addr: core::net::IpAddr, _result: &mut [u8]) -> Result<usize, Self::Error> {
todo!()
}
}
fn _assert_covariant<'a, 'b: 'a>(x: DnsSocket<'b>) -> DnsSocket<'a> {
x
}

View File

@@ -0,0 +1,112 @@
use core::task::Context;
use embassy_net_driver::{Capabilities, Checksum, Driver, RxToken, TxToken};
use smoltcp::phy::{self, Medium};
use smoltcp::time::Instant;
pub(crate) struct DriverAdapter<'d, 'c, T>
where
T: Driver,
{
// must be Some when actually using this to rx/tx
pub cx: Option<&'d mut Context<'c>>,
pub inner: &'d mut T,
pub medium: Medium,
}
impl<'d, 'c, T> phy::Device for DriverAdapter<'d, 'c, T>
where
T: Driver,
{
type RxToken<'a>
= RxTokenAdapter<T::RxToken<'a>>
where
Self: 'a;
type TxToken<'a>
= TxTokenAdapter<T::TxToken<'a>>
where
Self: 'a;
fn receive(&mut self, _timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
self.inner
.receive(unwrap!(self.cx.as_deref_mut()))
.map(|(rx, tx)| (RxTokenAdapter(rx), TxTokenAdapter(tx)))
}
/// Construct a transmit token.
fn transmit(&mut self, _timestamp: Instant) -> Option<Self::TxToken<'_>> {
self.inner.transmit(unwrap!(self.cx.as_deref_mut())).map(TxTokenAdapter)
}
/// Get a description of device capabilities.
fn capabilities(&self) -> phy::DeviceCapabilities {
fn convert(c: Checksum) -> phy::Checksum {
match c {
Checksum::Both => phy::Checksum::Both,
Checksum::Tx => phy::Checksum::Tx,
Checksum::Rx => phy::Checksum::Rx,
Checksum::None => phy::Checksum::None,
}
}
let caps: Capabilities = self.inner.capabilities();
let mut smolcaps = phy::DeviceCapabilities::default();
smolcaps.max_transmission_unit = caps.max_transmission_unit;
smolcaps.max_burst_size = caps.max_burst_size;
smolcaps.medium = self.medium;
smolcaps.checksum.ipv4 = convert(caps.checksum.ipv4);
smolcaps.checksum.tcp = convert(caps.checksum.tcp);
smolcaps.checksum.udp = convert(caps.checksum.udp);
#[cfg(feature = "proto-ipv4")]
{
smolcaps.checksum.icmpv4 = convert(caps.checksum.icmpv4);
}
#[cfg(feature = "proto-ipv6")]
{
smolcaps.checksum.icmpv6 = convert(caps.checksum.icmpv6);
}
smolcaps
}
}
pub(crate) struct RxTokenAdapter<T>(T)
where
T: RxToken;
impl<T> phy::RxToken for RxTokenAdapter<T>
where
T: RxToken,
{
fn consume<R, F>(self, f: F) -> R
where
F: FnOnce(&[u8]) -> R,
{
self.0.consume(|buf| {
#[cfg(feature = "packet-trace")]
trace!("embassy device rx: {:02x}", buf);
f(buf)
})
}
}
pub(crate) struct TxTokenAdapter<T>(T)
where
T: TxToken;
impl<T> phy::TxToken for TxTokenAdapter<T>
where
T: TxToken,
{
fn consume<R, F>(self, len: usize, f: F) -> R
where
F: FnOnce(&mut [u8]) -> R,
{
self.0.consume(len, |buf| {
let r = f(buf);
#[cfg(feature = "packet-trace")]
trace!("embassy device tx: {:02x}", buf);
r
})
}
}

270
embassy-net/src/fmt.rs Normal file
View File

@@ -0,0 +1,270 @@
#![macro_use]
#![allow(unused)]
use core::fmt::{Debug, Display, LowerHex};
#[cfg(all(feature = "defmt", feature = "log"))]
compile_error!("You may not enable both `defmt` and `log` features.");
#[collapse_debuginfo(yes)]
macro_rules! assert {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::assert!($($x)*);
#[cfg(feature = "defmt")]
::defmt::assert!($($x)*);
}
};
}
#[collapse_debuginfo(yes)]
macro_rules! assert_eq {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::assert_eq!($($x)*);
#[cfg(feature = "defmt")]
::defmt::assert_eq!($($x)*);
}
};
}
#[collapse_debuginfo(yes)]
macro_rules! assert_ne {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::assert_ne!($($x)*);
#[cfg(feature = "defmt")]
::defmt::assert_ne!($($x)*);
}
};
}
#[collapse_debuginfo(yes)]
macro_rules! debug_assert {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::debug_assert!($($x)*);
#[cfg(feature = "defmt")]
::defmt::debug_assert!($($x)*);
}
};
}
#[collapse_debuginfo(yes)]
macro_rules! debug_assert_eq {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::debug_assert_eq!($($x)*);
#[cfg(feature = "defmt")]
::defmt::debug_assert_eq!($($x)*);
}
};
}
#[collapse_debuginfo(yes)]
macro_rules! debug_assert_ne {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::debug_assert_ne!($($x)*);
#[cfg(feature = "defmt")]
::defmt::debug_assert_ne!($($x)*);
}
};
}
#[collapse_debuginfo(yes)]
macro_rules! todo {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::todo!($($x)*);
#[cfg(feature = "defmt")]
::defmt::todo!($($x)*);
}
};
}
#[collapse_debuginfo(yes)]
macro_rules! unreachable {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::unreachable!($($x)*);
#[cfg(feature = "defmt")]
::defmt::unreachable!($($x)*);
}
};
}
#[collapse_debuginfo(yes)]
macro_rules! panic {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::panic!($($x)*);
#[cfg(feature = "defmt")]
::defmt::panic!($($x)*);
}
};
}
#[collapse_debuginfo(yes)]
macro_rules! trace {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::trace!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::trace!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
#[collapse_debuginfo(yes)]
macro_rules! debug {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::debug!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::debug!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
#[collapse_debuginfo(yes)]
macro_rules! info {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::info!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::info!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
#[collapse_debuginfo(yes)]
macro_rules! warn {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::warn!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::warn!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
#[collapse_debuginfo(yes)]
macro_rules! error {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::error!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::error!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
#[cfg(feature = "defmt")]
#[collapse_debuginfo(yes)]
macro_rules! unwrap {
($($x:tt)*) => {
::defmt::unwrap!($($x)*)
};
}
#[cfg(not(feature = "defmt"))]
#[collapse_debuginfo(yes)]
macro_rules! unwrap {
($arg:expr) => {
match $crate::fmt::Try::into_result($arg) {
::core::result::Result::Ok(t) => t,
::core::result::Result::Err(e) => {
::core::panic!("unwrap of `{}` failed: {:?}", ::core::stringify!($arg), e);
}
}
};
($arg:expr, $($msg:expr),+ $(,)? ) => {
match $crate::fmt::Try::into_result($arg) {
::core::result::Result::Ok(t) => t,
::core::result::Result::Err(e) => {
::core::panic!("unwrap of `{}` failed: {}: {:?}", ::core::stringify!($arg), ::core::format_args!($($msg,)*), e);
}
}
}
}
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct NoneError;
pub trait Try {
type Ok;
type Error;
fn into_result(self) -> Result<Self::Ok, Self::Error>;
}
impl<T> Try for Option<T> {
type Ok = T;
type Error = NoneError;
#[inline]
fn into_result(self) -> Result<T, NoneError> {
self.ok_or(NoneError)
}
}
impl<T, E> Try for Result<T, E> {
type Ok = T;
type Error = E;
#[inline]
fn into_result(self) -> Self {
self
}
}
pub(crate) struct Bytes<'a>(pub &'a [u8]);
impl<'a> Debug for Bytes<'a> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{:#02x?}", self.0)
}
}
impl<'a> Display for Bytes<'a> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{:#02x?}", self.0)
}
}
impl<'a> LowerHex for Bytes<'a> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{:#02x?}", self.0)
}
}
#[cfg(feature = "defmt")]
impl<'a> defmt::Format for Bytes<'a> {
fn format(&self, fmt: defmt::Formatter) {
defmt::write!(fmt, "{:02x}", self.0)
}
}

859
embassy-net/src/icmp.rs Normal file
View File

@@ -0,0 +1,859 @@
//! ICMP sockets.
use core::future::{poll_fn, Future};
use core::mem;
use core::task::{Context, Poll};
use smoltcp::iface::{Interface, SocketHandle};
pub use smoltcp::phy::ChecksumCapabilities;
use smoltcp::socket::icmp;
pub use smoltcp::socket::icmp::{Endpoint as IcmpEndpoint, PacketMetadata};
use smoltcp::wire::IpAddress;
#[cfg(feature = "proto-ipv4")]
pub use smoltcp::wire::{Icmpv4Message, Icmpv4Packet, Icmpv4Repr};
#[cfg(feature = "proto-ipv6")]
pub use smoltcp::wire::{Icmpv6Message, Icmpv6Packet, Icmpv6Repr};
use crate::Stack;
/// Error returned by [`IcmpSocket::bind`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum BindError {
/// The socket was already open.
InvalidState,
/// The endpoint isn't specified
InvalidEndpoint,
/// No route to host.
NoRoute,
}
/// Error returned by [`IcmpSocket::send_to`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum SendError {
/// No route to host.
NoRoute,
/// Socket not bound to an outgoing port.
SocketNotBound,
/// There is not enough transmit buffer capacity to ever send this packet.
PacketTooLarge,
}
/// Error returned by [`IcmpSocket::recv_from`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum RecvError {
/// Provided buffer was smaller than the received packet.
Truncated,
}
/// An ICMP socket.
pub struct IcmpSocket<'a> {
stack: Stack<'a>,
handle: SocketHandle,
}
impl<'a> IcmpSocket<'a> {
/// Create a new ICMP socket using the provided stack and buffers.
pub fn new(
stack: Stack<'a>,
rx_meta: &'a mut [PacketMetadata],
rx_buffer: &'a mut [u8],
tx_meta: &'a mut [PacketMetadata],
tx_buffer: &'a mut [u8],
) -> Self {
let handle = stack.with_mut(|i| {
let rx_meta: &'static mut [PacketMetadata] = unsafe { mem::transmute(rx_meta) };
let rx_buffer: &'static mut [u8] = unsafe { mem::transmute(rx_buffer) };
let tx_meta: &'static mut [PacketMetadata] = unsafe { mem::transmute(tx_meta) };
let tx_buffer: &'static mut [u8] = unsafe { mem::transmute(tx_buffer) };
i.sockets.add(icmp::Socket::new(
icmp::PacketBuffer::new(rx_meta, rx_buffer),
icmp::PacketBuffer::new(tx_meta, tx_buffer),
))
});
Self { stack, handle }
}
/// Bind the socket to the given endpoint.
pub fn bind<T>(&mut self, endpoint: T) -> Result<(), BindError>
where
T: Into<IcmpEndpoint>,
{
let endpoint = endpoint.into();
if !endpoint.is_specified() {
return Err(BindError::InvalidEndpoint);
}
match self.with_mut(|s, _| s.bind(endpoint)) {
Ok(()) => Ok(()),
Err(icmp::BindError::InvalidState) => Err(BindError::InvalidState),
Err(icmp::BindError::Unaddressable) => Err(BindError::NoRoute),
}
}
fn with<R>(&self, f: impl FnOnce(&icmp::Socket, &Interface) -> R) -> R {
self.stack.with(|i| {
let socket = i.sockets.get::<icmp::Socket>(self.handle);
f(socket, &i.iface)
})
}
fn with_mut<R>(&self, f: impl FnOnce(&mut icmp::Socket, &mut Interface) -> R) -> R {
self.stack.with_mut(|i| {
let socket = i.sockets.get_mut::<icmp::Socket>(self.handle);
let res = f(socket, &mut i.iface);
i.waker.wake();
res
})
}
/// Wait until the socket becomes readable.
///
/// A socket is readable when a packet has been received, or when there are queued packets in
/// the buffer.
pub fn wait_recv_ready(&self) -> impl Future<Output = ()> + '_ {
poll_fn(move |cx| self.poll_recv_ready(cx))
}
/// Wait until a datagram can be read.
///
/// When no datagram is readable, this method will return `Poll::Pending` and
/// register the current task to be notified when a datagram is received.
///
/// When a datagram is received, this method will return `Poll::Ready`.
pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
self.with_mut(|s, _| {
if s.can_recv() {
Poll::Ready(())
} else {
// socket buffer is empty wait until at least one byte has arrived
s.register_recv_waker(cx.waker());
Poll::Pending
}
})
}
/// Receive a datagram.
///
/// This method will wait until a datagram is received.
///
/// Returns the number of bytes received and the remote endpoint.
pub fn recv_from<'s>(
&'s self,
buf: &'s mut [u8],
) -> impl Future<Output = Result<(usize, IpAddress), RecvError>> + 's {
poll_fn(|cx| self.poll_recv_from(buf, cx))
}
/// Receive a datagram.
///
/// When no datagram is available, this method will return `Poll::Pending` and
/// register the current task to be notified when a datagram is received.
///
/// When a datagram is received, this method will return `Poll::Ready` with the
/// number of bytes received and the remote endpoint.
pub fn poll_recv_from(&self, buf: &mut [u8], cx: &mut Context<'_>) -> Poll<Result<(usize, IpAddress), RecvError>> {
self.with_mut(|s, _| match s.recv_slice(buf) {
Ok((n, meta)) => Poll::Ready(Ok((n, meta))),
// No data ready
Err(icmp::RecvError::Truncated) => Poll::Ready(Err(RecvError::Truncated)),
Err(icmp::RecvError::Exhausted) => {
s.register_recv_waker(cx.waker());
Poll::Pending
}
})
}
/// Dequeue a packet received from a remote endpoint and calls the provided function with the
/// slice of the packet and the remote endpoint address and returns `Poll::Ready` with the
/// function's returned value.
///
/// **Note**: when the size of the provided buffer is smaller than the size of the payload,
/// the packet is dropped and a `RecvError::Truncated` error is returned.
pub async fn recv_from_with<F, R>(&self, f: F) -> Result<R, RecvError>
where
F: FnOnce((&[u8], IpAddress)) -> R,
{
let mut f = Some(f);
poll_fn(move |cx| {
self.with_mut(|s, _| match s.recv() {
Ok(x) => Poll::Ready(Ok(unwrap!(f.take())(x))),
Err(icmp::RecvError::Exhausted) => {
cx.waker().wake_by_ref();
Poll::Pending
}
Err(icmp::RecvError::Truncated) => Poll::Ready(Err(RecvError::Truncated)),
})
})
.await
}
/// Wait until the socket becomes writable.
///
/// A socket becomes writable when there is space in the buffer, from initial memory or after
/// dispatching datagrams on a full buffer.
pub fn wait_send_ready(&self) -> impl Future<Output = ()> + '_ {
poll_fn(|cx| self.poll_send_ready(cx))
}
/// Wait until a datagram can be sent.
///
/// When no datagram can be sent (i.e. the buffer is full), this method will return
/// `Poll::Pending` and register the current task to be notified when
/// space is freed in the buffer after a datagram has been dispatched.
///
/// When a datagram can be sent, this method will return `Poll::Ready`.
pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
self.with_mut(|s, _| {
if s.can_send() {
Poll::Ready(())
} else {
// socket buffer is full wait until a datagram has been dispatched
s.register_send_waker(cx.waker());
Poll::Pending
}
})
}
/// Send a datagram to the specified remote endpoint.
///
/// This method will wait until the datagram has been sent.
///
/// If the socket's send buffer is too small to fit `buf`, this method will return `Err(SendError::PacketTooLarge)`
///
/// When the remote endpoint is not reachable, this method will return `Err(SendError::NoRoute)`
pub async fn send_to<T>(&self, buf: &[u8], remote_endpoint: T) -> Result<(), SendError>
where
T: Into<IpAddress>,
{
let remote_endpoint: IpAddress = remote_endpoint.into();
poll_fn(move |cx| self.poll_send_to(buf, remote_endpoint, cx)).await
}
/// Send a datagram to the specified remote endpoint.
///
/// When the datagram has been sent, this method will return `Poll::Ready(Ok())`.
///
/// When the socket's send buffer is full, this method will return `Poll::Pending`
/// and register the current task to be notified when the buffer has space available.
///
/// If the socket's send buffer is too small to fit `buf`, this method will return `Poll::Ready(Err(SendError::PacketTooLarge))`
///
/// When the remote endpoint is not reachable, this method will return `Poll::Ready(Err(Error::NoRoute))`.
pub fn poll_send_to<T>(&self, buf: &[u8], remote_endpoint: T, cx: &mut Context<'_>) -> Poll<Result<(), SendError>>
where
T: Into<IpAddress>,
{
// Don't need to wake waker in `with_mut` if the buffer will never fit the icmp tx_buffer.
let send_capacity_too_small = self.with(|s, _| s.payload_send_capacity() < buf.len());
if send_capacity_too_small {
return Poll::Ready(Err(SendError::PacketTooLarge));
}
self.with_mut(|s, _| match s.send_slice(buf, remote_endpoint.into()) {
// Entire datagram has been sent
Ok(()) => Poll::Ready(Ok(())),
Err(icmp::SendError::BufferFull) => {
s.register_send_waker(cx.waker());
Poll::Pending
}
Err(icmp::SendError::Unaddressable) => {
// If no sender/outgoing port is specified, there is not really "no route"
if s.is_open() {
Poll::Ready(Err(SendError::NoRoute))
} else {
Poll::Ready(Err(SendError::SocketNotBound))
}
}
})
}
/// Enqueue a packet to be sent to a given remote address with a zero-copy function.
///
/// This method will wait until the buffer can fit the requested size before
/// calling the function to fill its contents.
pub async fn send_to_with<T, F, R>(&mut self, size: usize, remote_endpoint: T, f: F) -> Result<R, SendError>
where
T: Into<IpAddress>,
F: FnOnce(&mut [u8]) -> R,
{
// Don't need to wake waker in `with_mut` if the buffer will never fit the icmp tx_buffer.
let send_capacity_too_small = self.with(|s, _| s.payload_send_capacity() < size);
if send_capacity_too_small {
return Err(SendError::PacketTooLarge);
}
let mut f = Some(f);
let remote_endpoint = remote_endpoint.into();
poll_fn(move |cx| {
self.with_mut(|s, _| match s.send(size, remote_endpoint) {
Ok(buf) => Poll::Ready(Ok({ unwrap!(f.take())(buf) })),
Err(icmp::SendError::BufferFull) => {
s.register_send_waker(cx.waker());
Poll::Pending
}
Err(icmp::SendError::Unaddressable) => Poll::Ready(Err(SendError::NoRoute)),
})
})
.await
}
/// Flush the socket.
///
/// This method will wait until the socket is flushed.
pub fn flush(&mut self) -> impl Future<Output = ()> + '_ {
poll_fn(|cx| {
self.with_mut(|s, _| {
if s.send_queue() == 0 {
Poll::Ready(())
} else {
s.register_send_waker(cx.waker());
Poll::Pending
}
})
})
}
/// Check whether the socket is open.
pub fn is_open(&self) -> bool {
self.with(|s, _| s.is_open())
}
/// Returns whether the socket is ready to send data, i.e. it has enough buffer space to hold a packet.
pub fn may_send(&self) -> bool {
self.with(|s, _| s.can_send())
}
/// Returns whether the socket is ready to receive data, i.e. it has received a packet that's now in the buffer.
pub fn may_recv(&self) -> bool {
self.with(|s, _| s.can_recv())
}
/// Return the maximum number packets the socket can receive.
pub fn packet_recv_capacity(&self) -> usize {
self.with(|s, _| s.packet_recv_capacity())
}
/// Return the maximum number packets the socket can receive.
pub fn packet_send_capacity(&self) -> usize {
self.with(|s, _| s.packet_send_capacity())
}
/// Return the maximum number of bytes inside the recv buffer.
pub fn payload_recv_capacity(&self) -> usize {
self.with(|s, _| s.payload_recv_capacity())
}
/// Return the maximum number of bytes inside the transmit buffer.
pub fn payload_send_capacity(&self) -> usize {
self.with(|s, _| s.payload_send_capacity())
}
/// Return the time-to-live (IPv4) or hop limit (IPv6) value used in outgoing packets.
pub fn hop_limit(&self) -> Option<u8> {
self.with(|s, _| s.hop_limit())
}
/// Set the hop limit field in the IP header of sent packets.
pub fn set_hop_limit(&mut self, hop_limit: Option<u8>) {
self.with_mut(|s, _| s.set_hop_limit(hop_limit))
}
}
impl Drop for IcmpSocket<'_> {
fn drop(&mut self) {
self.stack.with_mut(|i| i.sockets.remove(self.handle));
}
}
pub mod ping {
//! Ping utilities.
//!
//! This module allows for an easy ICMP Echo message interface used to
//! ping devices with an [ICMP Socket](IcmpSocket).
//!
//! ## Usage
//!
//! ```
//! use core::net::Ipv4Addr;
//! use core::str::FromStr;
//!
//! use embassy_net::icmp::ping::{PingManager, PingParams};
//! use embassy_net::icmp::PacketMetadata;
//!
//! let mut rx_buffer = [0; 256];
//! let mut tx_buffer = [0; 256];
//! let mut rx_meta = [PacketMetadata::EMPTY];
//! let mut tx_meta = [PacketMetadata::EMPTY];
//!
//! let mut ping_manager = PingManager::new(stack, &mut rx_meta, &mut rx_buffer, &mut tx_meta, &mut tx_buffer);
//! let addr = "192.168.8.1";
//! let mut ping_params = PingParams::new(Ipv4Addr::from_str(addr).unwrap());
//! ping_params.set_payload(b"Hello, router!");
//! match ping_manager.ping(&ping_params).await {
//! Ok(time) => info!("Ping time of {}: {}ms", addr, time.as_millis()),
//! Err(ping_error) => warn!("{:?}", ping_error),
//! };
//! ```
use core::net::IpAddr;
#[cfg(feature = "proto-ipv6")]
use core::net::Ipv6Addr;
use embassy_time::{Duration, Instant, Timer, WithTimeout};
#[cfg(feature = "proto-ipv6")]
use smoltcp::wire::IpAddress;
#[cfg(feature = "proto-ipv6")]
use smoltcp::wire::Ipv6Address;
use super::*;
/// Error returned by [`ping()`](PingManager::ping).
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum PingError {
/// The target did not respond.
///
/// The packet was sent but the Reply packet has not been recieved
/// in the timeout set by [`set_timeout()`](PingParams::set_timeout).
DestinationHostUnreachable,
/// The target has not been specified.
InvalidTargetAddress,
/// The source has not been specified (Ipv6 only).
#[cfg(feature = "proto-ipv6")]
InvalidSourceAddress,
/// The socket could not queue the packet in the buffer.
SocketSendTimeout,
/// Container error for [`icmp::BindError`].
SocketBindError(BindError),
/// Container error for [`icmp::SendError`].
SocketSendError(SendError),
/// Container error for [`icmp::RecvError`].
SocketRecvError(RecvError),
}
/// Manages ICMP ping operations.
///
/// This struct provides functionality to send ICMP echo requests (pings) to a specified target
/// and measure the round-trip time for the requests. It supports both IPv4 and IPv6, depending
/// on the enabled features.
///
/// # Fields
///
/// * `stack` - The network stack instance used for managing network operations.
/// * `rx_meta` - Metadata buffer for receiving packets.
/// * `rx_buffer` - Buffer for receiving packets.
/// * `tx_meta` - Metadata buffer for transmitting packets.
/// * `tx_buffer` - Buffer for transmitting packets.
/// * `ident` - Identifier for the ICMP echo requests.
///
/// # Methods
///
/// * [`new`](PingManager::new) - Creates a new instance of `PingManager` with the specified stack and buffers.
/// * [`ping`](PingManager::ping) - Sends ICMP echo requests to the specified target and returns the average round-trip time.
pub struct PingManager<'d> {
stack: Stack<'d>,
rx_meta: &'d mut [PacketMetadata],
rx_buffer: &'d mut [u8],
tx_meta: &'d mut [PacketMetadata],
tx_buffer: &'d mut [u8],
ident: u16,
}
impl<'d> PingManager<'d> {
/// Creates a new instance of [`PingManager`] with a [`Stack`] instance
/// and the buffers used for RX and TX.
///
/// **note**: This does not yet creates the ICMP socket.
pub fn new(
stack: Stack<'d>,
rx_meta: &'d mut [PacketMetadata],
rx_buffer: &'d mut [u8],
tx_meta: &'d mut [PacketMetadata],
tx_buffer: &'d mut [u8],
) -> Self {
Self {
stack,
rx_meta,
rx_buffer,
tx_meta,
tx_buffer,
ident: 0,
}
}
/// Sends ICMP echo requests to the specified target and returns the average round-trip time.
///
/// # Arguments
///
/// * `params` - Parameters for configuring the ping operation.
///
/// # Returns
///
/// * `Ok(Duration)` - The average round-trip time for the ping requests.
/// * `Err(PingError)` - An error occurred during the ping operation.
pub async fn ping<'a>(&mut self, params: &PingParams<'a>) -> Result<Duration, PingError> {
// Input validation
if params.target().is_none() {
return Err(PingError::InvalidTargetAddress);
}
#[cfg(feature = "proto-ipv6")]
if params.target().unwrap().is_ipv6() && params.source().is_none() {
return Err(PingError::InvalidSourceAddress);
}
// Increment the ident (wrapping u16) to respect standards
self.ident = self.ident.wrapping_add(1u16);
// Used to calculate the average duration
let mut total_duration = Duration::default();
let mut num_of_durations = 0u16;
// Increment the sequence number as per standards
for seq_no in 0..params.count() {
// Make sure each ping takes at least 1 second to respect standards
let rate_limit_start = Instant::now();
// make a single ping
// - shorts out errors
// - select the ip version
let ping_duration = match params.target.unwrap() {
#[cfg(feature = "proto-ipv4")]
IpAddress::Ipv4(_) => self.single_ping_v4(params, seq_no).await?,
#[cfg(feature = "proto-ipv6")]
IpAddress::Ipv6(_) => self.single_ping_v6(params, seq_no).await?,
};
// safely add up the durations of each ping
if let Some(dur) = total_duration.checked_add(ping_duration) {
total_duration = dur;
num_of_durations += 1;
}
// 1 sec min per ping
let rate_limit_end = rate_limit_start.elapsed();
if rate_limit_end <= params.rate_limit {
Timer::after(params.rate_limit.checked_sub(rate_limit_end).unwrap()).await;
}
}
// calculate and return the average duration
Ok(total_duration.checked_div(num_of_durations as u32).unwrap())
}
#[cfg(feature = "proto-ipv4")]
fn create_repr_ipv4<'b>(&self, params: &PingParams<'b>, seq_no: u16) -> Icmpv4Repr<'b> {
Icmpv4Repr::EchoRequest {
ident: self.ident,
seq_no,
data: params.payload,
}
}
#[cfg(feature = "proto-ipv6")]
fn create_repr_ipv6<'b>(&self, params: &PingParams<'b>, seq_no: u16) -> Icmpv6Repr<'b> {
Icmpv6Repr::EchoRequest {
ident: self.ident,
seq_no,
data: params.payload,
}
}
#[cfg(feature = "proto-ipv4")]
async fn single_ping_v4(&mut self, params: &PingParams<'_>, seq_no: u16) -> Result<Duration, PingError> {
let ping_repr = self.create_repr_ipv4(params, seq_no);
// Create the socket and set hop limit and bind it to the endpoint with the ident
let mut socket = IcmpSocket::new(self.stack, self.rx_meta, self.rx_buffer, self.tx_meta, self.tx_buffer);
socket.set_hop_limit(params.hop_limit);
if let Err(e) = socket.bind(IcmpEndpoint::Ident(self.ident)) {
return Err(PingError::SocketBindError(e));
}
// Helper func to fill the buffer when sending the ICMP packet
fn fill_packet_buffer(buf: &mut [u8], ping_repr: Icmpv4Repr<'_>) -> Instant {
let mut icmp_packet = Icmpv4Packet::new_unchecked(buf);
ping_repr.emit(&mut icmp_packet, &ChecksumCapabilities::default());
Instant::now()
}
// Send with timeout the ICMP packet filling it with the helper function
let send_result = socket
.send_to_with(ping_repr.buffer_len(), params.target.unwrap(), |buf| {
fill_packet_buffer(buf, ping_repr)
})
.with_timeout(Duration::from_millis(100))
.await;
// Filter and translate potential errors from sending the packet
let now = match send_result {
Ok(send_result) => match send_result {
Ok(i) => i,
Err(e) => return Err(PingError::SocketSendError(e)),
},
Err(_) => return Err(PingError::SocketSendTimeout),
};
// Helper function for the recieve helper function to validate the echo reply
fn filter_pong(buf: &[u8], seq_no: u16) -> bool {
let pong_packet = match Icmpv4Packet::new_checked(buf) {
Ok(pak) => pak,
Err(_) => return false,
};
pong_packet.echo_seq_no() == seq_no
}
// Helper function to recieve and return the correct echo reply when it finds it
async fn recv_pong(socket: &IcmpSocket<'_>, seq_no: u16) -> Result<(), PingError> {
while match socket.recv_from_with(|(buf, _)| filter_pong(buf, seq_no)).await {
Ok(b) => !b,
Err(e) => return Err(PingError::SocketRecvError(e)),
} {}
Ok(())
}
// Calls the recieve helper function with a timeout
match recv_pong(&socket, seq_no).with_timeout(params.timeout).await {
Ok(res) => res?,
Err(_) => return Err(PingError::DestinationHostUnreachable),
}
// Return the round trip duration
Ok(now.elapsed())
}
#[cfg(feature = "proto-ipv6")]
async fn single_ping_v6(&mut self, params: &PingParams<'_>, seq_no: u16) -> Result<Duration, PingError> {
let ping_repr = self.create_repr_ipv6(params, seq_no);
// Create the socket and set hop limit and bind it to the endpoint with the ident
let mut socket = IcmpSocket::new(self.stack, self.rx_meta, self.rx_buffer, self.tx_meta, self.tx_buffer);
socket.set_hop_limit(params.hop_limit);
if let Err(e) = socket.bind(IcmpEndpoint::Ident(self.ident)) {
return Err(PingError::SocketBindError(e));
}
// Helper func to fill the buffer when sending the ICMP packet
fn fill_packet_buffer(buf: &mut [u8], ping_repr: Icmpv6Repr<'_>, params: &PingParams<'_>) -> Instant {
let mut icmp_packet = Icmpv6Packet::new_unchecked(buf);
let target = match params.target().unwrap() {
IpAddr::V4(_) => unreachable!(),
IpAddr::V6(addr) => addr,
};
ping_repr.emit(
&params.source().unwrap(),
&target,
&mut icmp_packet,
&ChecksumCapabilities::default(),
);
Instant::now()
}
// Send with timeout the ICMP packet filling it with the helper function
let send_result = socket
.send_to_with(ping_repr.buffer_len(), params.target.unwrap(), |buf| {
fill_packet_buffer(buf, ping_repr, params)
})
.with_timeout(Duration::from_millis(100))
.await;
let now = match send_result {
Ok(send_result) => match send_result {
Ok(i) => i,
Err(e) => return Err(PingError::SocketSendError(e)),
},
Err(_) => return Err(PingError::SocketSendTimeout),
};
// Helper function for the recieve helper function to validate the echo reply
fn filter_pong(buf: &[u8], seq_no: u16) -> bool {
let pong_packet = match Icmpv6Packet::new_checked(buf) {
Ok(pak) => pak,
Err(_) => return false,
};
pong_packet.echo_seq_no() == seq_no
}
// Helper function to recieve and return the correct echo reply when it finds it
async fn recv_pong(socket: &IcmpSocket<'_>, seq_no: u16) -> Result<(), PingError> {
while match socket.recv_from_with(|(buf, _)| filter_pong(buf, seq_no)).await {
Ok(b) => !b,
Err(e) => return Err(PingError::SocketRecvError(e)),
} {}
Ok(())
}
// Calls the recieve helper function with a timeout
match recv_pong(&socket, seq_no).with_timeout(params.timeout).await {
Ok(res) => res?,
Err(_) => return Err(PingError::DestinationHostUnreachable),
}
// Return the round trip duration
Ok(now.elapsed())
}
}
/// Parameters for configuring the ping operation.
///
/// This struct provides various configuration options for performing ICMP ping operations,
/// including the target IP address, payload data, hop limit, number of pings, and timeout duration.
///
/// # Fields
///
/// * `target` - The target IP address for the ping operation.
/// * `source` - The source IP address for the ping operation (IPv6 only).
/// * `payload` - The data to be sent in the payload field of the ping.
/// * `hop_limit` - The hop limit to be used by the socket.
/// * `count` - The number of pings to be sent in one ping operation.
/// * `timeout` - The timeout duration before returning a [`PingError::DestinationHostUnreachable`] error.
/// * `rate_limit` - The minimum time per echo request.
pub struct PingParams<'a> {
target: Option<IpAddress>,
#[cfg(feature = "proto-ipv6")]
source: Option<Ipv6Address>,
payload: &'a [u8],
hop_limit: Option<u8>,
count: u16,
timeout: Duration,
rate_limit: Duration,
}
impl Default for PingParams<'_> {
fn default() -> Self {
Self {
target: None,
#[cfg(feature = "proto-ipv6")]
source: None,
payload: b"embassy-net",
hop_limit: None,
count: 4,
timeout: Duration::from_secs(4),
rate_limit: Duration::from_secs(1),
}
}
}
impl<'a> PingParams<'a> {
/// Creates a new instance of [`PingParams`] with the specified target IP address.
pub fn new<T: Into<IpAddr>>(target: T) -> Self {
Self {
target: Some(PingParams::ip_addr_to_smoltcp(target)),
#[cfg(feature = "proto-ipv6")]
source: None,
payload: b"embassy-net",
hop_limit: None,
count: 4,
timeout: Duration::from_secs(4),
rate_limit: Duration::from_secs(1),
}
}
fn ip_addr_to_smoltcp<T: Into<IpAddr>>(ip_addr: T) -> IpAddress {
match ip_addr.into() {
#[cfg(feature = "proto-ipv4")]
IpAddr::V4(v4) => IpAddress::Ipv4(v4),
#[cfg(not(feature = "proto-ipv4"))]
IpAddr::V4(_) => unreachable!(),
#[cfg(feature = "proto-ipv6")]
IpAddr::V6(v6) => IpAddress::Ipv6(v6),
#[cfg(not(feature = "proto-ipv6"))]
IpAddr::V6(_) => unreachable!(),
}
}
/// Sets the target IP address for the ping.
pub fn set_target<T: Into<IpAddr>>(&mut self, target: T) -> &mut Self {
self.target = Some(PingParams::ip_addr_to_smoltcp(target));
self
}
/// Retrieves the target IP address for the ping.
pub fn target(&self) -> Option<IpAddr> {
self.target.map(|t| t.into())
}
/// Sets the source IP address for the ping (IPv6 only).
#[cfg(feature = "proto-ipv6")]
pub fn set_source<T: Into<Ipv6Address>>(&mut self, source: T) -> &mut Self {
self.source = Some(source.into());
self
}
/// Retrieves the source IP address for the ping (IPv6 only).
#[cfg(feature = "proto-ipv6")]
pub fn source(&self) -> Option<Ipv6Addr> {
self.source
}
/// Sets the data used in the payload field of the ping with the provided slice.
pub fn set_payload(&mut self, payload: &'a [u8]) -> &mut Self {
self.payload = payload;
self
}
/// Gives a reference to the slice of data that's going to be sent in the payload field
/// of the ping.
pub fn payload(&self) -> &'a [u8] {
self.payload
}
/// Sets the hop limit that will be used by the socket with [`set_hop_limit()`](IcmpSocket::set_hop_limit).
///
/// **Note**: A hop limit of [`Some(0)`](Some()) is equivalent to a hop limit of [`None`].
pub fn set_hop_limit(&mut self, hop_limit: Option<u8>) -> &mut Self {
let mut hop_limit = hop_limit;
if hop_limit.is_some_and(|x| x == 0) {
hop_limit = None
}
self.hop_limit = hop_limit;
self
}
/// Retrieves the hop limit that will be used by the socket with [`set_hop_limit()`](IcmpSocket::set_hop_limit).
pub fn hop_limit(&self) -> Option<u8> {
self.hop_limit
}
/// Sets the count used for specifying the number of pings done on one
/// [`ping()`](PingManager::ping) call.
///
/// **Note**: A count of 0 will be set as 1.
pub fn set_count(&mut self, count: u16) -> &mut Self {
let mut count = count;
if count == 0 {
count = 1;
}
self.count = count;
self
}
/// Retrieve the count used for specifying the number of pings done on one
/// [`ping()`](PingManager::ping) call.
pub fn count(&self) -> u16 {
self.count
}
/// Sets the timeout used before returning [`PingError::DestinationHostUnreachable`]
/// when waiting for the Echo Reply icmp packet.
pub fn set_timeout(&mut self, timeout: Duration) -> &mut Self {
self.timeout = timeout;
self
}
/// Retrieve the timeout used before returning [`PingError::DestinationHostUnreachable`]
/// when waiting for the Echo Reply icmp packet.
pub fn timeout(&self) -> Duration {
self.timeout
}
/// Sets the `rate_limit`: minimum time per echo request.
pub fn set_rate_limit(&mut self, rate_limit: Duration) -> &mut Self {
self.rate_limit = rate_limit;
self
}
/// Retrieve the rate_limit.
pub fn rate_limit(&self) -> Duration {
self.rate_limit
}
}
}

892
embassy-net/src/lib.rs Normal file
View File

@@ -0,0 +1,892 @@
#![no_std]
#![allow(async_fn_in_trait)]
#![warn(missing_docs)]
#![doc = include_str!("../README.md")]
//! ## Feature flags
#![doc = document_features::document_features!(feature_label = r#"<span class="stab portability"><code>{feature}</code></span>"#)]
#[cfg(not(any(feature = "proto-ipv4", feature = "proto-ipv6")))]
compile_error!("You must enable at least one of the following features: proto-ipv4, proto-ipv6");
// This mod MUST go first, so that the others see its macros.
pub(crate) mod fmt;
#[cfg(feature = "dns")]
pub mod dns;
mod driver_util;
#[cfg(feature = "icmp")]
pub mod icmp;
#[cfg(feature = "raw")]
pub mod raw;
#[cfg(feature = "tcp")]
pub mod tcp;
mod time;
#[cfg(feature = "udp")]
pub mod udp;
use core::cell::RefCell;
use core::future::{poll_fn, Future};
use core::mem::MaybeUninit;
use core::pin::pin;
use core::task::{Context, Poll};
pub use embassy_net_driver as driver;
use embassy_net_driver::{Driver, LinkState};
use embassy_sync::waitqueue::WakerRegistration;
use embassy_time::{Instant, Timer};
use heapless::Vec;
#[cfg(feature = "dns")]
pub use smoltcp::config::DNS_MAX_SERVER_COUNT;
#[cfg(feature = "multicast")]
pub use smoltcp::iface::MulticastError;
#[cfg(any(feature = "dns", feature = "dhcpv4"))]
use smoltcp::iface::SocketHandle;
use smoltcp::iface::{Interface, SocketSet, SocketStorage};
use smoltcp::phy::Medium;
#[cfg(feature = "dhcpv4")]
use smoltcp::socket::dhcpv4::{self, RetryConfig};
#[cfg(feature = "medium-ethernet")]
pub use smoltcp::wire::EthernetAddress;
#[cfg(any(feature = "medium-ethernet", feature = "medium-ieee802154", feature = "medium-ip"))]
pub use smoltcp::wire::HardwareAddress;
#[cfg(any(feature = "udp", feature = "tcp"))]
pub use smoltcp::wire::IpListenEndpoint;
#[cfg(feature = "medium-ieee802154")]
pub use smoltcp::wire::{Ieee802154Address, Ieee802154Frame};
pub use smoltcp::wire::{IpAddress, IpCidr, IpEndpoint};
#[cfg(feature = "proto-ipv4")]
pub use smoltcp::wire::{Ipv4Address, Ipv4Cidr};
#[cfg(feature = "proto-ipv6")]
pub use smoltcp::wire::{Ipv6Address, Ipv6Cidr};
use crate::driver_util::DriverAdapter;
use crate::time::{instant_from_smoltcp, instant_to_smoltcp};
const LOCAL_PORT_MIN: u16 = 1025;
const LOCAL_PORT_MAX: u16 = 65535;
#[cfg(feature = "dns")]
const MAX_QUERIES: usize = 4;
#[cfg(feature = "dhcpv4-hostname")]
const MAX_HOSTNAME_LEN: usize = 32;
/// Memory resources needed for a network stack.
pub struct StackResources<const SOCK: usize> {
sockets: MaybeUninit<[SocketStorage<'static>; SOCK]>,
inner: MaybeUninit<RefCell<Inner>>,
#[cfg(feature = "dns")]
queries: MaybeUninit<[Option<dns::DnsQuery>; MAX_QUERIES]>,
#[cfg(feature = "dhcpv4-hostname")]
hostname: HostnameResources,
}
#[cfg(feature = "dhcpv4-hostname")]
struct HostnameResources {
option: MaybeUninit<smoltcp::wire::DhcpOption<'static>>,
data: MaybeUninit<[u8; MAX_HOSTNAME_LEN]>,
}
impl<const SOCK: usize> StackResources<SOCK> {
/// Create a new set of stack resources.
pub const fn new() -> Self {
Self {
sockets: MaybeUninit::uninit(),
inner: MaybeUninit::uninit(),
#[cfg(feature = "dns")]
queries: MaybeUninit::uninit(),
#[cfg(feature = "dhcpv4-hostname")]
hostname: HostnameResources {
option: MaybeUninit::uninit(),
data: MaybeUninit::uninit(),
},
}
}
}
/// Static IP address configuration.
#[cfg(feature = "proto-ipv4")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StaticConfigV4 {
/// IP address and subnet mask.
pub address: Ipv4Cidr,
/// Default gateway.
pub gateway: Option<Ipv4Address>,
/// DNS servers.
pub dns_servers: Vec<Ipv4Address, 3>,
}
/// Static IPv6 address configuration
#[cfg(feature = "proto-ipv6")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StaticConfigV6 {
/// IP address and subnet mask.
pub address: Ipv6Cidr,
/// Default gateway.
pub gateway: Option<Ipv6Address>,
/// DNS servers.
pub dns_servers: Vec<Ipv6Address, 3>,
}
/// DHCP configuration.
#[cfg(feature = "dhcpv4")]
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct DhcpConfig {
/// Maximum lease duration.
///
/// If not set, the lease duration specified by the server will be used.
/// If set, the lease duration will be capped at this value.
pub max_lease_duration: Option<embassy_time::Duration>,
/// Retry configuration.
pub retry_config: RetryConfig,
/// Ignore NAKs from DHCP servers.
///
/// This is not compliant with the DHCP RFCs, since theoretically we must stop using the assigned IP when receiving a NAK. This can increase reliability on broken networks with buggy routers or rogue DHCP servers, however.
pub ignore_naks: bool,
/// Server port. This is almost always 67. Do not change unless you know what you're doing.
pub server_port: u16,
/// Client port. This is almost always 68. Do not change unless you know what you're doing.
pub client_port: u16,
/// Our hostname. This will be sent to the DHCP server as Option 12.
#[cfg(feature = "dhcpv4-hostname")]
pub hostname: Option<heapless::String<MAX_HOSTNAME_LEN>>,
}
#[cfg(feature = "dhcpv4")]
impl Default for DhcpConfig {
fn default() -> Self {
Self {
max_lease_duration: Default::default(),
retry_config: Default::default(),
ignore_naks: Default::default(),
server_port: smoltcp::wire::DHCP_SERVER_PORT,
client_port: smoltcp::wire::DHCP_CLIENT_PORT,
#[cfg(feature = "dhcpv4-hostname")]
hostname: None,
}
}
}
/// Network stack configuration.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct Config {
/// IPv4 configuration
#[cfg(feature = "proto-ipv4")]
pub ipv4: ConfigV4,
/// IPv6 configuration
#[cfg(feature = "proto-ipv6")]
pub ipv6: ConfigV6,
}
impl Config {
/// IPv4 configuration with static addressing.
#[cfg(feature = "proto-ipv4")]
pub const fn ipv4_static(config: StaticConfigV4) -> Self {
Self {
ipv4: ConfigV4::Static(config),
#[cfg(feature = "proto-ipv6")]
ipv6: ConfigV6::None,
}
}
/// IPv6 configuration with static addressing.
#[cfg(feature = "proto-ipv6")]
pub const fn ipv6_static(config: StaticConfigV6) -> Self {
Self {
#[cfg(feature = "proto-ipv4")]
ipv4: ConfigV4::None,
ipv6: ConfigV6::Static(config),
}
}
/// IPv4 configuration with dynamic addressing.
///
/// # Example
/// ```rust
/// # use embassy_net::Config;
/// let _cfg = Config::dhcpv4(Default::default());
/// ```
#[cfg(feature = "dhcpv4")]
pub const fn dhcpv4(config: DhcpConfig) -> Self {
Self {
ipv4: ConfigV4::Dhcp(config),
#[cfg(feature = "proto-ipv6")]
ipv6: ConfigV6::None,
}
}
}
/// Network stack IPv4 configuration.
#[cfg(feature = "proto-ipv4")]
#[derive(Debug, Clone, Default)]
pub enum ConfigV4 {
/// Do not configure IPv4.
#[default]
None,
/// Use a static IPv4 address configuration.
Static(StaticConfigV4),
/// Use DHCP to obtain an IP address configuration.
#[cfg(feature = "dhcpv4")]
Dhcp(DhcpConfig),
}
/// Network stack IPv6 configuration.
#[cfg(feature = "proto-ipv6")]
#[derive(Debug, Clone, Default)]
pub enum ConfigV6 {
/// Do not configure IPv6.
#[default]
None,
/// Use a static IPv6 address configuration.
Static(StaticConfigV6),
}
/// Network stack runner.
///
/// You must call [`Runner::run()`] in a background task for the network stack to work.
pub struct Runner<'d, D: Driver> {
driver: D,
stack: Stack<'d>,
}
/// Network stack handle
///
/// Use this to create sockets. It's `Copy`, so you can pass
/// it by value instead of by reference.
#[derive(Copy, Clone)]
pub struct Stack<'d> {
inner: &'d RefCell<Inner>,
}
pub(crate) struct Inner {
pub(crate) sockets: SocketSet<'static>, // Lifetime type-erased.
pub(crate) iface: Interface,
/// Waker used for triggering polls.
pub(crate) waker: WakerRegistration,
/// Waker used for waiting for link up or config up.
state_waker: WakerRegistration,
hardware_address: HardwareAddress,
next_local_port: u16,
link_up: bool,
#[cfg(feature = "proto-ipv4")]
static_v4: Option<StaticConfigV4>,
#[cfg(feature = "proto-ipv6")]
static_v6: Option<StaticConfigV6>,
#[cfg(feature = "dhcpv4")]
dhcp_socket: Option<SocketHandle>,
#[cfg(feature = "dns")]
dns_socket: SocketHandle,
#[cfg(feature = "dns")]
dns_waker: WakerRegistration,
#[cfg(feature = "dhcpv4-hostname")]
hostname: *mut HostnameResources,
}
fn _assert_covariant<'a, 'b: 'a>(x: Stack<'b>) -> Stack<'a> {
x
}
/// Create a new network stack.
pub fn new<'d, D: Driver, const SOCK: usize>(
mut driver: D,
config: Config,
resources: &'d mut StackResources<SOCK>,
random_seed: u64,
) -> (Stack<'d>, Runner<'d, D>) {
let (hardware_address, medium) = to_smoltcp_hardware_address(driver.hardware_address());
let mut iface_cfg = smoltcp::iface::Config::new(hardware_address);
iface_cfg.random_seed = random_seed;
let iface = Interface::new(
iface_cfg,
&mut DriverAdapter {
inner: &mut driver,
cx: None,
medium,
},
instant_to_smoltcp(Instant::now()),
);
unsafe fn transmute_slice<T>(x: &mut [T]) -> &'static mut [T] {
core::mem::transmute(x)
}
let sockets = resources.sockets.write([SocketStorage::EMPTY; SOCK]);
#[allow(unused_mut)]
let mut sockets: SocketSet<'static> = SocketSet::new(unsafe { transmute_slice(sockets) });
let next_local_port = (random_seed % (LOCAL_PORT_MAX - LOCAL_PORT_MIN) as u64) as u16 + LOCAL_PORT_MIN;
#[cfg(feature = "dns")]
let dns_socket = sockets.add(dns::Socket::new(
&[],
managed::ManagedSlice::Borrowed(unsafe {
transmute_slice(resources.queries.write([const { None }; MAX_QUERIES]))
}),
));
let mut inner = Inner {
sockets,
iface,
waker: WakerRegistration::new(),
state_waker: WakerRegistration::new(),
next_local_port,
hardware_address,
link_up: false,
#[cfg(feature = "proto-ipv4")]
static_v4: None,
#[cfg(feature = "proto-ipv6")]
static_v6: None,
#[cfg(feature = "dhcpv4")]
dhcp_socket: None,
#[cfg(feature = "dns")]
dns_socket,
#[cfg(feature = "dns")]
dns_waker: WakerRegistration::new(),
#[cfg(feature = "dhcpv4-hostname")]
hostname: &mut resources.hostname,
};
#[cfg(feature = "proto-ipv4")]
inner.set_config_v4(config.ipv4);
#[cfg(feature = "proto-ipv6")]
inner.set_config_v6(config.ipv6);
inner.apply_static_config();
let inner = &*resources.inner.write(RefCell::new(inner));
let stack = Stack { inner };
(stack, Runner { driver, stack })
}
fn to_smoltcp_hardware_address(addr: driver::HardwareAddress) -> (HardwareAddress, Medium) {
match addr {
#[cfg(feature = "medium-ethernet")]
driver::HardwareAddress::Ethernet(eth) => (HardwareAddress::Ethernet(EthernetAddress(eth)), Medium::Ethernet),
#[cfg(feature = "medium-ieee802154")]
driver::HardwareAddress::Ieee802154(ieee) => (
HardwareAddress::Ieee802154(Ieee802154Address::Extended(ieee)),
Medium::Ieee802154,
),
#[cfg(feature = "medium-ip")]
driver::HardwareAddress::Ip => (HardwareAddress::Ip, Medium::Ip),
#[allow(unreachable_patterns)]
_ => panic!(
"Unsupported medium {:?}. Make sure to enable the right medium feature in embassy-net's Cargo features.",
addr
),
}
}
impl<'d> Stack<'d> {
fn with<R>(&self, f: impl FnOnce(&Inner) -> R) -> R {
f(&self.inner.borrow())
}
fn with_mut<R>(&self, f: impl FnOnce(&mut Inner) -> R) -> R {
f(&mut self.inner.borrow_mut())
}
/// Get the hardware address of the network interface.
pub fn hardware_address(&self) -> HardwareAddress {
self.with(|i| i.hardware_address)
}
/// Check whether the link is up.
pub fn is_link_up(&self) -> bool {
self.with(|i| i.link_up)
}
/// Check whether the network stack has a valid IP configuration.
/// This is true if the network stack has a static IP configuration or if DHCP has completed
pub fn is_config_up(&self) -> bool {
let v4_up;
let v6_up;
#[cfg(feature = "proto-ipv4")]
{
v4_up = self.config_v4().is_some();
}
#[cfg(not(feature = "proto-ipv4"))]
{
v4_up = false;
}
#[cfg(feature = "proto-ipv6")]
{
v6_up = self.config_v6().is_some();
}
#[cfg(not(feature = "proto-ipv6"))]
{
v6_up = false;
}
v4_up || v6_up
}
/// Wait for the network device to obtain a link signal.
pub async fn wait_link_up(&self) {
self.wait(|| self.is_link_up()).await
}
/// Wait for the network device to lose link signal.
pub async fn wait_link_down(&self) {
self.wait(|| !self.is_link_up()).await
}
/// Wait for the network stack to obtain a valid IP configuration.
///
/// ## Notes:
/// - Ensure [`Runner::run`] has been started before using this function.
///
/// - This function may never return (e.g. if no configuration is obtained through DHCP).
/// The caller is supposed to handle a timeout for this case.
///
/// ## Example
/// ```ignore
/// let config = embassy_net::Config::dhcpv4(Default::default());
/// // Init network stack
/// // NOTE: DHCP and DNS need one socket slot if enabled. This is why we're
/// // provisioning space for 3 sockets here: one for DHCP, one for DNS, and one for your code (e.g. TCP).
/// // If you use more sockets you must increase this. If you don't enable DHCP or DNS you can decrease it.
/// static RESOURCES: StaticCell<embassy_net::StackResources<3>> = StaticCell::new();
/// let (stack, runner) = embassy_net::new(
/// driver,
/// config,
/// RESOURCES.init(embassy_net::StackResources::new()),
/// seed
/// );
/// // Launch network task that runs `runner.run().await`
/// spawner.spawn(net_task(runner)).unwrap();
/// // Wait for DHCP config
/// stack.wait_config_up().await;
/// // use the network stack
/// // ...
/// ```
pub async fn wait_config_up(&self) {
self.wait(|| self.is_config_up()).await
}
/// Wait for the network stack to lose a valid IP configuration.
pub async fn wait_config_down(&self) {
self.wait(|| !self.is_config_up()).await
}
fn wait<'a>(&'a self, mut predicate: impl FnMut() -> bool + 'a) -> impl Future<Output = ()> + 'a {
poll_fn(move |cx| {
if predicate() {
Poll::Ready(())
} else {
// If the config is not up, we register a waker that is woken up
// when a config is applied (static or DHCP).
trace!("Waiting for config up");
self.with_mut(|i| {
i.state_waker.register(cx.waker());
});
Poll::Pending
}
})
}
/// Get the current IPv4 configuration.
///
/// If using DHCP, this will be None if DHCP hasn't been able to
/// acquire an IP address, or Some if it has.
#[cfg(feature = "proto-ipv4")]
pub fn config_v4(&self) -> Option<StaticConfigV4> {
self.with(|i| i.static_v4.clone())
}
/// Get the current IPv6 configuration.
#[cfg(feature = "proto-ipv6")]
pub fn config_v6(&self) -> Option<StaticConfigV6> {
self.with(|i| i.static_v6.clone())
}
/// Set the IPv4 configuration.
#[cfg(feature = "proto-ipv4")]
pub fn set_config_v4(&self, config: ConfigV4) {
self.with_mut(|i| {
i.set_config_v4(config);
i.apply_static_config();
})
}
/// Set the IPv6 configuration.
#[cfg(feature = "proto-ipv6")]
pub fn set_config_v6(&self, config: ConfigV6) {
self.with_mut(|i| {
i.set_config_v6(config);
i.apply_static_config();
})
}
/// Make a query for a given name and return the corresponding IP addresses.
#[cfg(feature = "dns")]
pub async fn dns_query(
&self,
name: &str,
qtype: dns::DnsQueryType,
) -> Result<Vec<IpAddress, { smoltcp::config::DNS_MAX_RESULT_COUNT }>, dns::Error> {
// For A and AAAA queries we try detect whether `name` is just an IP address
match qtype {
#[cfg(feature = "proto-ipv4")]
dns::DnsQueryType::A => {
if let Ok(ip) = name.parse().map(IpAddress::Ipv4) {
return Ok([ip].into_iter().collect());
}
}
#[cfg(feature = "proto-ipv6")]
dns::DnsQueryType::Aaaa => {
if let Ok(ip) = name.parse().map(IpAddress::Ipv6) {
return Ok([ip].into_iter().collect());
}
}
_ => {}
}
let query = poll_fn(|cx| {
self.with_mut(|i| {
let socket = i.sockets.get_mut::<dns::Socket>(i.dns_socket);
match socket.start_query(i.iface.context(), name, qtype) {
Ok(handle) => {
i.waker.wake();
Poll::Ready(Ok(handle))
}
Err(dns::StartQueryError::NoFreeSlot) => {
i.dns_waker.register(cx.waker());
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
})
})
.await?;
#[must_use = "to delay the drop handler invocation to the end of the scope"]
struct OnDrop<F: FnOnce()> {
f: core::mem::MaybeUninit<F>,
}
impl<F: FnOnce()> OnDrop<F> {
fn new(f: F) -> Self {
Self {
f: core::mem::MaybeUninit::new(f),
}
}
fn defuse(self) {
core::mem::forget(self)
}
}
impl<F: FnOnce()> Drop for OnDrop<F> {
fn drop(&mut self) {
unsafe { self.f.as_ptr().read()() }
}
}
let drop = OnDrop::new(|| {
self.with_mut(|i| {
let socket = i.sockets.get_mut::<dns::Socket>(i.dns_socket);
socket.cancel_query(query);
i.waker.wake();
i.dns_waker.wake();
})
});
let res = poll_fn(|cx| {
self.with_mut(|i| {
let socket = i.sockets.get_mut::<dns::Socket>(i.dns_socket);
match socket.get_query_result(query) {
Ok(addrs) => {
i.dns_waker.wake();
Poll::Ready(Ok(addrs))
}
Err(dns::GetQueryResultError::Pending) => {
socket.register_query_waker(query, cx.waker());
Poll::Pending
}
Err(e) => {
i.dns_waker.wake();
Poll::Ready(Err(e.into()))
}
}
})
})
.await;
drop.defuse();
res
}
}
#[cfg(feature = "multicast")]
impl<'d> Stack<'d> {
/// Join a multicast group.
pub fn join_multicast_group(&self, addr: impl Into<IpAddress>) -> Result<(), MulticastError> {
self.with_mut(|i| i.iface.join_multicast_group(addr))
}
/// Leave a multicast group.
pub fn leave_multicast_group(&self, addr: impl Into<IpAddress>) -> Result<(), MulticastError> {
self.with_mut(|i| i.iface.leave_multicast_group(addr))
}
/// Get whether the network stack has joined the given multicast group.
pub fn has_multicast_group(&self, addr: impl Into<IpAddress>) -> bool {
self.with(|i| i.iface.has_multicast_group(addr))
}
}
impl Inner {
#[allow(clippy::absurd_extreme_comparisons)]
pub fn get_local_port(&mut self) -> u16 {
let res = self.next_local_port;
self.next_local_port = if res >= LOCAL_PORT_MAX { LOCAL_PORT_MIN } else { res + 1 };
res
}
#[cfg(feature = "proto-ipv4")]
pub fn set_config_v4(&mut self, config: ConfigV4) {
// Handle static config.
self.static_v4 = match config.clone() {
ConfigV4::None => None,
#[cfg(feature = "dhcpv4")]
ConfigV4::Dhcp(_) => None,
ConfigV4::Static(c) => Some(c),
};
// Handle DHCP config.
#[cfg(feature = "dhcpv4")]
match config {
ConfigV4::Dhcp(c) => {
// Create the socket if it doesn't exist.
if self.dhcp_socket.is_none() {
let socket = smoltcp::socket::dhcpv4::Socket::new();
let handle = self.sockets.add(socket);
self.dhcp_socket = Some(handle);
}
// Configure it
let socket = self.sockets.get_mut::<dhcpv4::Socket>(unwrap!(self.dhcp_socket));
socket.set_ignore_naks(c.ignore_naks);
socket.set_max_lease_duration(c.max_lease_duration.map(crate::time::duration_to_smoltcp));
socket.set_ports(c.server_port, c.client_port);
socket.set_retry_config(c.retry_config);
socket.set_outgoing_options(&[]);
#[cfg(feature = "dhcpv4-hostname")]
if let Some(h) = c.hostname {
// safety:
// - we just did set_outgoing_options([]) so we know the socket is no longer holding a reference.
// - we know this pointer lives for as long as the stack exists, because `new()` borrows
// the resources for `'d`. Therefore it's OK to pass a reference to this to smoltcp.
let hostname = unsafe { &mut *self.hostname };
// create data
let data = hostname.data.write([0; MAX_HOSTNAME_LEN]);
data[..h.len()].copy_from_slice(h.as_bytes());
let data: &[u8] = &data[..h.len()];
// set the option.
let option = hostname.option.write(smoltcp::wire::DhcpOption { data, kind: 12 });
socket.set_outgoing_options(core::slice::from_ref(option));
}
socket.reset();
}
_ => {
// Remove DHCP socket if any.
if let Some(socket) = self.dhcp_socket {
self.sockets.remove(socket);
self.dhcp_socket = None;
}
}
}
}
#[cfg(feature = "proto-ipv6")]
pub fn set_config_v6(&mut self, config: ConfigV6) {
self.static_v6 = match config {
ConfigV6::None => None,
ConfigV6::Static(c) => Some(c),
};
}
fn apply_static_config(&mut self) {
let mut addrs = Vec::new();
#[cfg(feature = "dns")]
let mut dns_servers: Vec<_, 6> = Vec::new();
#[cfg(feature = "proto-ipv4")]
let mut gateway_v4 = None;
#[cfg(feature = "proto-ipv6")]
let mut gateway_v6 = None;
#[cfg(feature = "proto-ipv4")]
if let Some(config) = &self.static_v4 {
debug!("IPv4: UP");
debug!(" IP address: {:?}", config.address);
debug!(" Default gateway: {:?}", config.gateway);
unwrap!(addrs.push(IpCidr::Ipv4(config.address)).ok());
gateway_v4 = config.gateway;
#[cfg(feature = "dns")]
for s in &config.dns_servers {
debug!(" DNS server: {:?}", s);
unwrap!(dns_servers.push(s.clone().into()).ok());
}
} else {
info!("IPv4: DOWN");
}
#[cfg(feature = "proto-ipv6")]
if let Some(config) = &self.static_v6 {
debug!("IPv6: UP");
debug!(" IP address: {:?}", config.address);
debug!(" Default gateway: {:?}", config.gateway);
unwrap!(addrs.push(IpCidr::Ipv6(config.address)).ok());
gateway_v6 = config.gateway.into();
#[cfg(feature = "dns")]
for s in &config.dns_servers {
debug!(" DNS server: {:?}", s);
unwrap!(dns_servers.push(s.clone().into()).ok());
}
} else {
info!("IPv6: DOWN");
}
// Apply addresses
self.iface.update_ip_addrs(|a| *a = addrs);
// Apply gateways
#[cfg(feature = "proto-ipv4")]
if let Some(gateway) = gateway_v4 {
unwrap!(self.iface.routes_mut().add_default_ipv4_route(gateway));
} else {
self.iface.routes_mut().remove_default_ipv4_route();
}
#[cfg(feature = "proto-ipv6")]
if let Some(gateway) = gateway_v6 {
unwrap!(self.iface.routes_mut().add_default_ipv6_route(gateway));
} else {
self.iface.routes_mut().remove_default_ipv6_route();
}
// Apply DNS servers
#[cfg(feature = "dns")]
if !dns_servers.is_empty() {
let count = if dns_servers.len() > DNS_MAX_SERVER_COUNT {
warn!("Number of DNS servers exceeds DNS_MAX_SERVER_COUNT, truncating list.");
DNS_MAX_SERVER_COUNT
} else {
dns_servers.len()
};
self.sockets
.get_mut::<smoltcp::socket::dns::Socket>(self.dns_socket)
.update_servers(&dns_servers[..count]);
}
self.state_waker.wake();
}
fn poll<D: Driver>(&mut self, cx: &mut Context<'_>, driver: &mut D) {
self.waker.register(cx.waker());
let (_hardware_addr, medium) = to_smoltcp_hardware_address(driver.hardware_address());
#[cfg(any(feature = "medium-ethernet", feature = "medium-ieee802154"))]
{
let do_set = match medium {
#[cfg(feature = "medium-ethernet")]
Medium::Ethernet => true,
#[cfg(feature = "medium-ieee802154")]
Medium::Ieee802154 => true,
#[allow(unreachable_patterns)]
_ => false,
};
if do_set {
self.iface.set_hardware_addr(_hardware_addr);
}
}
let timestamp = instant_to_smoltcp(Instant::now());
let mut smoldev = DriverAdapter {
cx: Some(cx),
inner: driver,
medium,
};
self.iface.poll(timestamp, &mut smoldev, &mut self.sockets);
// Update link up
let old_link_up = self.link_up;
self.link_up = driver.link_state(cx) == LinkState::Up;
// Print when changed
if old_link_up != self.link_up {
info!("link_up = {:?}", self.link_up);
self.state_waker.wake();
}
#[cfg(feature = "dhcpv4")]
if let Some(dhcp_handle) = self.dhcp_socket {
let socket = self.sockets.get_mut::<dhcpv4::Socket>(dhcp_handle);
let configure = if self.link_up {
if old_link_up != self.link_up {
socket.reset();
}
match socket.poll() {
None => false,
Some(dhcpv4::Event::Deconfigured) => {
self.static_v4 = None;
true
}
Some(dhcpv4::Event::Configured(config)) => {
self.static_v4 = Some(StaticConfigV4 {
address: config.address,
gateway: config.router,
dns_servers: config.dns_servers,
});
true
}
}
} else if old_link_up {
socket.reset();
self.static_v4 = None;
true
} else {
false
};
if configure {
self.apply_static_config()
}
}
if let Some(poll_at) = self.iface.poll_at(timestamp, &mut self.sockets) {
let t = pin!(Timer::at(instant_from_smoltcp(poll_at)));
if t.poll(cx).is_ready() {
cx.waker().wake_by_ref();
}
}
}
}
impl<'d, D: Driver> Runner<'d, D> {
/// Run the network stack.
///
/// You must call this in a background task, to process network events.
pub async fn run(&mut self) -> ! {
poll_fn(|cx| {
self.stack.with_mut(|i| i.poll(cx, &mut self.driver));
Poll::<()>::Pending
})
.await;
unreachable!()
}
}

190
embassy-net/src/raw.rs Normal file
View File

@@ -0,0 +1,190 @@
//! Raw sockets.
use core::future::{poll_fn, Future};
use core::mem;
use core::task::{Context, Poll};
use embassy_net_driver::Driver;
use smoltcp::iface::{Interface, SocketHandle};
use smoltcp::socket::raw;
pub use smoltcp::socket::raw::PacketMetadata;
pub use smoltcp::wire::{IpProtocol, IpVersion};
use crate::Stack;
/// Error returned by [`RawSocket::recv`] and [`RawSocket::send`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum RecvError {
/// Provided buffer was smaller than the received packet.
Truncated,
}
/// An Raw socket.
pub struct RawSocket<'a> {
stack: Stack<'a>,
handle: SocketHandle,
}
impl<'a> RawSocket<'a> {
/// Create a new Raw socket using the provided stack and buffers.
pub fn new<D: Driver>(
stack: Stack<'a>,
ip_version: IpVersion,
ip_protocol: IpProtocol,
rx_meta: &'a mut [PacketMetadata],
rx_buffer: &'a mut [u8],
tx_meta: &'a mut [PacketMetadata],
tx_buffer: &'a mut [u8],
) -> Self {
let handle = stack.with_mut(|i| {
let rx_meta: &'static mut [PacketMetadata] = unsafe { mem::transmute(rx_meta) };
let rx_buffer: &'static mut [u8] = unsafe { mem::transmute(rx_buffer) };
let tx_meta: &'static mut [PacketMetadata] = unsafe { mem::transmute(tx_meta) };
let tx_buffer: &'static mut [u8] = unsafe { mem::transmute(tx_buffer) };
i.sockets.add(raw::Socket::new(
ip_version,
ip_protocol,
raw::PacketBuffer::new(rx_meta, rx_buffer),
raw::PacketBuffer::new(tx_meta, tx_buffer),
))
});
Self { stack, handle }
}
fn with_mut<R>(&self, f: impl FnOnce(&mut raw::Socket, &mut Interface) -> R) -> R {
self.stack.with_mut(|i| {
let socket = i.sockets.get_mut::<raw::Socket>(self.handle);
let res = f(socket, &mut i.iface);
i.waker.wake();
res
})
}
/// Wait until the socket becomes readable.
///
/// A socket is readable when a packet has been received, or when there are queued packets in
/// the buffer.
pub fn wait_recv_ready(&self) -> impl Future<Output = ()> + '_ {
poll_fn(move |cx| self.poll_recv_ready(cx))
}
/// Receive a datagram.
///
/// This method will wait until a datagram is received.
pub async fn recv(&self, buf: &mut [u8]) -> Result<usize, RecvError> {
poll_fn(move |cx| self.poll_recv(buf, cx)).await
}
/// Wait until a datagram can be read.
///
/// When no datagram is readable, this method will return `Poll::Pending` and
/// register the current task to be notified when a datagram is received.
///
/// When a datagram is received, this method will return `Poll::Ready`.
pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
self.with_mut(|s, _| {
if s.can_recv() {
Poll::Ready(())
} else {
// socket buffer is empty wait until at least one byte has arrived
s.register_recv_waker(cx.waker());
Poll::Pending
}
})
}
/// Receive a datagram.
///
/// When no datagram is available, this method will return `Poll::Pending` and
/// register the current task to be notified when a datagram is received.
pub fn poll_recv(&self, buf: &mut [u8], cx: &mut Context<'_>) -> Poll<Result<usize, RecvError>> {
self.with_mut(|s, _| match s.recv_slice(buf) {
Ok(n) => Poll::Ready(Ok(n)),
// No data ready
Err(raw::RecvError::Truncated) => Poll::Ready(Err(RecvError::Truncated)),
Err(raw::RecvError::Exhausted) => {
s.register_recv_waker(cx.waker());
Poll::Pending
}
})
}
/// Wait until the socket becomes writable.
///
/// A socket becomes writable when there is space in the buffer, from initial memory or after
/// dispatching datagrams on a full buffer.
pub fn wait_send_ready(&self) -> impl Future<Output = ()> + '_ {
poll_fn(move |cx| self.poll_send_ready(cx))
}
/// Wait until a datagram can be sent.
///
/// When no datagram can be sent (i.e. the buffer is full), this method will return
/// `Poll::Pending` and register the current task to be notified when
/// space is freed in the buffer after a datagram has been dispatched.
///
/// When a datagram can be sent, this method will return `Poll::Ready`.
pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
self.with_mut(|s, _| {
if s.can_send() {
Poll::Ready(())
} else {
// socket buffer is full wait until a datagram has been dispatched
s.register_send_waker(cx.waker());
Poll::Pending
}
})
}
/// Send a datagram.
///
/// This method will wait until the datagram has been sent.`
pub fn send<'s>(&'s self, buf: &'s [u8]) -> impl Future<Output = ()> + 's {
poll_fn(|cx| self.poll_send(buf, cx))
}
/// Send a datagram.
///
/// When the datagram has been sent, this method will return `Poll::Ready(Ok())`.
///
/// When the socket's send buffer is full, this method will return `Poll::Pending`
/// and register the current task to be notified when the buffer has space available.
pub fn poll_send(&self, buf: &[u8], cx: &mut Context<'_>) -> Poll<()> {
self.with_mut(|s, _| match s.send_slice(buf) {
// Entire datagram has been sent
Ok(()) => Poll::Ready(()),
Err(raw::SendError::BufferFull) => {
s.register_send_waker(cx.waker());
Poll::Pending
}
})
}
/// Flush the socket.
///
/// This method will wait until the socket is flushed.
pub fn flush(&mut self) -> impl Future<Output = ()> + '_ {
poll_fn(|cx| {
self.with_mut(|s, _| {
if s.send_queue() == 0 {
Poll::Ready(())
} else {
s.register_send_waker(cx.waker());
Poll::Pending
}
})
})
}
}
impl Drop for RawSocket<'_> {
fn drop(&mut self) {
self.stack.with_mut(|i| i.sockets.remove(self.handle));
}
}
fn _assert_covariant<'a, 'b: 'a>(x: RawSocket<'b>) -> RawSocket<'a> {
x
}

920
embassy-net/src/tcp.rs Normal file
View File

@@ -0,0 +1,920 @@
//! TCP sockets.
//!
//! # Listening
//!
//! `embassy-net` does not have a `TcpListener`. Instead, individual `TcpSocket`s can be put into
//! listening mode by calling [`TcpSocket::accept`].
//!
//! Incoming connections when no socket is listening are rejected. To accept many incoming
//! connections, create many sockets and put them all into listening mode.
use core::future::{poll_fn, Future};
use core::mem;
use core::task::{Context, Poll};
use embassy_time::Duration;
use smoltcp::iface::{Interface, SocketHandle};
use smoltcp::socket::tcp;
pub use smoltcp::socket::tcp::State;
use smoltcp::wire::{IpEndpoint, IpListenEndpoint};
use crate::time::duration_to_smoltcp;
use crate::Stack;
/// Error returned by TcpSocket read/write functions.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum Error {
/// The connection was reset.
///
/// This can happen on receiving a RST packet, or on timeout.
ConnectionReset,
}
/// Error returned by [`TcpSocket::connect`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum ConnectError {
/// The socket is already connected or listening.
InvalidState,
/// The remote host rejected the connection with a RST packet.
ConnectionReset,
/// Connect timed out.
TimedOut,
/// No route to host.
NoRoute,
}
/// Error returned by [`TcpSocket::accept`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum AcceptError {
/// The socket is already connected or listening.
InvalidState,
/// Invalid listen port
InvalidPort,
/// The remote host rejected the connection with a RST packet.
ConnectionReset,
}
/// A TCP socket.
pub struct TcpSocket<'a> {
io: TcpIo<'a>,
}
/// The reader half of a TCP socket.
pub struct TcpReader<'a> {
io: TcpIo<'a>,
}
/// The writer half of a TCP socket.
pub struct TcpWriter<'a> {
io: TcpIo<'a>,
}
impl<'a> TcpReader<'a> {
/// Wait until the socket becomes readable.
///
/// A socket becomes readable when the receive half of the full-duplex connection is open
/// (see [`may_recv()`](TcpSocket::may_recv)), and there is some pending data in the receive buffer.
///
/// This is the equivalent of [read](#method.read), without buffering any data.
pub fn wait_read_ready(&self) -> impl Future<Output = ()> + '_ {
poll_fn(move |cx| self.io.poll_read_ready(cx))
}
/// Read data from the socket.
///
/// Returns how many bytes were read, or an error. If no data is available, it waits
/// until there is at least one byte available.
///
/// # Note
/// A return value of Ok(0) means that we have read all data and the remote
/// side has closed our receive half of the socket. The remote can no longer
/// send bytes.
///
/// The send half of the socket is still open. If you want to reconnect using
/// the socket you split this reader off the send half needs to be closed using
/// [`abort()`](TcpSocket::abort).
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
self.io.read(buf).await
}
/// Call `f` with the largest contiguous slice of octets in the receive buffer,
/// and dequeue the amount of elements returned by `f`.
///
/// If no data is available, it waits until there is at least one byte available.
pub async fn read_with<F, R>(&mut self, f: F) -> Result<R, Error>
where
F: FnOnce(&mut [u8]) -> (usize, R),
{
self.io.read_with(f).await
}
/// Return the maximum number of bytes inside the transmit buffer.
pub fn recv_capacity(&self) -> usize {
self.io.recv_capacity()
}
/// Return the amount of octets queued in the receive buffer. This value can be larger than
/// the slice read by the next `recv` or `peek` call because it includes all queued octets,
/// and not only the octets that may be returned as a contiguous slice.
pub fn recv_queue(&self) -> usize {
self.io.recv_queue()
}
}
impl<'a> TcpWriter<'a> {
/// Wait until the socket becomes writable.
///
/// A socket becomes writable when the transmit half of the full-duplex connection is open
/// (see [`may_send()`](TcpSocket::may_send)), and the transmit buffer is not full.
///
/// This is the equivalent of [write](#method.write), without sending any data.
pub fn wait_write_ready(&self) -> impl Future<Output = ()> + '_ {
poll_fn(move |cx| self.io.poll_write_ready(cx))
}
/// Write data to the socket.
///
/// Returns how many bytes were written, or an error. If the socket is not ready to
/// accept data, it waits until it is.
pub fn write<'s>(&'s mut self, buf: &'s [u8]) -> impl Future<Output = Result<usize, Error>> + 's {
self.io.write(buf)
}
/// Flushes the written data to the socket.
///
/// This waits until all data has been sent, and ACKed by the remote host. For a connection
/// closed with [`abort()`](TcpSocket::abort) it will wait for the TCP RST packet to be sent.
pub fn flush(&mut self) -> impl Future<Output = Result<(), Error>> + '_ {
self.io.flush()
}
/// Call `f` with the largest contiguous slice of octets in the transmit buffer,
/// and enqueue the amount of elements returned by `f`.
///
/// If the socket is not ready to accept data, it waits until it is.
pub async fn write_with<F, R>(&mut self, f: F) -> Result<R, Error>
where
F: FnOnce(&mut [u8]) -> (usize, R),
{
self.io.write_with(f).await
}
/// Return the maximum number of bytes inside the transmit buffer.
pub fn send_capacity(&self) -> usize {
self.io.send_capacity()
}
/// Return the amount of octets queued in the transmit buffer.
pub fn send_queue(&self) -> usize {
self.io.send_queue()
}
}
impl<'a> TcpSocket<'a> {
/// Create a new TCP socket on the given stack, with the given buffers.
pub fn new(stack: Stack<'a>, rx_buffer: &'a mut [u8], tx_buffer: &'a mut [u8]) -> Self {
let handle = stack.with_mut(|i| {
let rx_buffer: &'static mut [u8] = unsafe { mem::transmute(rx_buffer) };
let tx_buffer: &'static mut [u8] = unsafe { mem::transmute(tx_buffer) };
i.sockets.add(tcp::Socket::new(
tcp::SocketBuffer::new(rx_buffer),
tcp::SocketBuffer::new(tx_buffer),
))
});
Self {
io: TcpIo { stack, handle },
}
}
/// Return the maximum number of bytes inside the recv buffer.
pub fn recv_capacity(&self) -> usize {
self.io.recv_capacity()
}
/// Return the maximum number of bytes inside the transmit buffer.
pub fn send_capacity(&self) -> usize {
self.io.send_capacity()
}
/// Return the amount of octets queued in the transmit buffer.
pub fn send_queue(&self) -> usize {
self.io.send_queue()
}
/// Return the amount of octets queued in the receive buffer. This value can be larger than
/// the slice read by the next `recv` or `peek` call because it includes all queued octets,
/// and not only the octets that may be returned as a contiguous slice.
pub fn recv_queue(&self) -> usize {
self.io.recv_queue()
}
/// Call `f` with the largest contiguous slice of octets in the transmit buffer,
/// and enqueue the amount of elements returned by `f`.
///
/// If the socket is not ready to accept data, it waits until it is.
pub async fn write_with<F, R>(&mut self, f: F) -> Result<R, Error>
where
F: FnOnce(&mut [u8]) -> (usize, R),
{
self.io.write_with(f).await
}
/// Call `f` with the largest contiguous slice of octets in the receive buffer,
/// and dequeue the amount of elements returned by `f`.
///
/// If no data is available, it waits until there is at least one byte available.
pub async fn read_with<F, R>(&mut self, f: F) -> Result<R, Error>
where
F: FnOnce(&mut [u8]) -> (usize, R),
{
self.io.read_with(f).await
}
/// Split the socket into reader and a writer halves.
pub fn split(&mut self) -> (TcpReader<'_>, TcpWriter<'_>) {
(TcpReader { io: self.io }, TcpWriter { io: self.io })
}
/// Connect to a remote host.
pub async fn connect<T>(&mut self, remote_endpoint: T) -> Result<(), ConnectError>
where
T: Into<IpEndpoint>,
{
let local_port = self.io.stack.with_mut(|i| i.get_local_port());
match {
self.io
.with_mut(|s, i| s.connect(i.context(), remote_endpoint, local_port))
} {
Ok(()) => {}
Err(tcp::ConnectError::InvalidState) => return Err(ConnectError::InvalidState),
Err(tcp::ConnectError::Unaddressable) => return Err(ConnectError::NoRoute),
}
poll_fn(|cx| {
self.io.with_mut(|s, _| match s.state() {
tcp::State::Closed | tcp::State::TimeWait => Poll::Ready(Err(ConnectError::ConnectionReset)),
tcp::State::Listen => unreachable!(),
tcp::State::SynSent | tcp::State::SynReceived => {
s.register_send_waker(cx.waker());
Poll::Pending
}
_ => Poll::Ready(Ok(())),
})
})
.await
}
/// Accept a connection from a remote host.
///
/// This function puts the socket in listening mode, and waits until a connection is received.
pub async fn accept<T>(&mut self, local_endpoint: T) -> Result<(), AcceptError>
where
T: Into<IpListenEndpoint>,
{
match self.io.with_mut(|s, _| s.listen(local_endpoint)) {
Ok(()) => {}
Err(tcp::ListenError::InvalidState) => return Err(AcceptError::InvalidState),
Err(tcp::ListenError::Unaddressable) => return Err(AcceptError::InvalidPort),
}
poll_fn(|cx| {
self.io.with_mut(|s, _| match s.state() {
tcp::State::Listen | tcp::State::SynSent | tcp::State::SynReceived => {
s.register_send_waker(cx.waker());
Poll::Pending
}
_ => Poll::Ready(Ok(())),
})
})
.await
}
/// Wait until the socket becomes readable.
///
/// A socket becomes readable when the receive half of the full-duplex connection is open
/// (see [may_recv](#method.may_recv)), and there is some pending data in the receive buffer.
///
/// This is the equivalent of [read](#method.read), without buffering any data.
pub fn wait_read_ready(&self) -> impl Future<Output = ()> + '_ {
poll_fn(move |cx| self.io.poll_read_ready(cx))
}
/// Read data from the socket.
///
/// Returns how many bytes were read, or an error. If no data is available, it waits
/// until there is at least one byte available.
///
/// A return value of Ok(0) means that the socket was closed and is longer
/// able to receive any data.
pub fn read<'s>(&'s mut self, buf: &'s mut [u8]) -> impl Future<Output = Result<usize, Error>> + 's {
self.io.read(buf)
}
/// Wait until the socket becomes writable.
///
/// A socket becomes writable when the transmit half of the full-duplex connection is open
/// (see [may_send](#method.may_send)), and the transmit buffer is not full.
///
/// This is the equivalent of [write](#method.write), without sending any data.
pub fn wait_write_ready(&self) -> impl Future<Output = ()> + '_ {
poll_fn(move |cx| self.io.poll_write_ready(cx))
}
/// Write data to the socket.
///
/// Returns how many bytes were written, or an error. If the socket is not ready to
/// accept data, it waits until it is.
pub fn write<'s>(&'s mut self, buf: &'s [u8]) -> impl Future<Output = Result<usize, Error>> + 's {
self.io.write(buf)
}
/// Flushes the written data to the socket.
///
/// This waits until all data has been sent, and ACKed by the remote host. For a connection
/// closed with [`abort()`](TcpSocket::abort) it will wait for the TCP RST packet to be sent.
pub fn flush(&mut self) -> impl Future<Output = Result<(), Error>> + '_ {
self.io.flush()
}
/// Set the timeout for the socket.
///
/// If the timeout is set, the socket will be closed if no data is received for the
/// specified duration.
///
/// # Note:
/// Set a keep alive interval ([`set_keep_alive`] to prevent timeouts when
/// the remote could still respond.
pub fn set_timeout(&mut self, duration: Option<Duration>) {
self.io
.with_mut(|s, _| s.set_timeout(duration.map(duration_to_smoltcp)))
}
/// Set the keep-alive interval for the socket.
///
/// If the keep-alive interval is set, the socket will send keep-alive packets after
/// the specified duration of inactivity.
///
/// If not set, the socket will not send keep-alive packets.
///
/// By setting a [`timeout`](Self::timeout) larger then the keep alive you
/// can detect a remote endpoint that no longer answers.
pub fn set_keep_alive(&mut self, interval: Option<Duration>) {
self.io
.with_mut(|s, _| s.set_keep_alive(interval.map(duration_to_smoltcp)))
}
/// Set the hop limit field in the IP header of sent packets.
pub fn set_hop_limit(&mut self, hop_limit: Option<u8>) {
self.io.with_mut(|s, _| s.set_hop_limit(hop_limit))
}
/// Get the local endpoint of the socket.
///
/// Returns `None` if the socket is not bound (listening) or not connected.
pub fn local_endpoint(&self) -> Option<IpEndpoint> {
self.io.with(|s, _| s.local_endpoint())
}
/// Get the remote endpoint of the socket.
///
/// Returns `None` if the socket is not connected.
pub fn remote_endpoint(&self) -> Option<IpEndpoint> {
self.io.with(|s, _| s.remote_endpoint())
}
/// Get the state of the socket.
pub fn state(&self) -> State {
self.io.with(|s, _| s.state())
}
/// Close the write half of the socket.
///
/// This closes only the write half of the socket. The read half side remains open, the
/// socket can still receive data.
///
/// Data that has been written to the socket and not yet sent (or not yet ACKed) will still
/// still sent. The last segment of the pending to send data is sent with the FIN flag set.
pub fn close(&mut self) {
self.io.with_mut(|s, _| s.close())
}
/// Forcibly close the socket.
///
/// This instantly closes both the read and write halves of the socket. Any pending data
/// that has not been sent will be lost.
///
/// Note that the TCP RST packet is not sent immediately - if the `TcpSocket` is dropped too soon
/// the remote host may not know the connection has been closed.
/// `abort()` callers should wait for a [`flush()`](TcpSocket::flush) call to complete before
/// dropping or reusing the socket.
pub fn abort(&mut self) {
self.io.with_mut(|s, _| s.abort())
}
/// Return whether the transmit half of the full-duplex connection is open.
///
/// This function returns true if it's possible to send data and have it arrive
/// to the remote endpoint. However, it does not make any guarantees about the state
/// of the transmit buffer, and even if it returns true, [write](#method.write) may
/// not be able to enqueue any octets.
///
/// In terms of the TCP state machine, the socket must be in the `ESTABLISHED` or
/// `CLOSE-WAIT` state.
pub fn may_send(&self) -> bool {
self.io.with(|s, _| s.may_send())
}
/// Check whether the transmit half of the full-duplex connection is open
/// (see [may_send](#method.may_send)), and the transmit buffer is not full.
pub fn can_send(&self) -> bool {
self.io.with(|s, _| s.can_send())
}
/// return whether the receive half of the full-duplex connection is open.
/// This function returns true if its possible to receive data from the remote endpoint.
/// It will return true while there is data in the receive buffer, and if there isnt,
/// as long as the remote endpoint has not closed the connection.
pub fn may_recv(&self) -> bool {
self.io.with(|s, _| s.may_recv())
}
/// Get whether the socket is ready to receive data, i.e. whether there is some pending data in the receive buffer.
pub fn can_recv(&self) -> bool {
self.io.with(|s, _| s.can_recv())
}
}
impl<'a> Drop for TcpSocket<'a> {
fn drop(&mut self) {
self.io.stack.with_mut(|i| i.sockets.remove(self.io.handle));
}
}
fn _assert_covariant<'a, 'b: 'a>(x: TcpSocket<'b>) -> TcpSocket<'a> {
x
}
fn _assert_covariant_reader<'a, 'b: 'a>(x: TcpReader<'b>) -> TcpReader<'a> {
x
}
fn _assert_covariant_writer<'a, 'b: 'a>(x: TcpWriter<'b>) -> TcpWriter<'a> {
x
}
// =======================
#[derive(Copy, Clone)]
struct TcpIo<'a> {
stack: Stack<'a>,
handle: SocketHandle,
}
impl<'d> TcpIo<'d> {
fn with<R>(&self, f: impl FnOnce(&tcp::Socket, &Interface) -> R) -> R {
self.stack.with(|i| {
let socket = i.sockets.get::<tcp::Socket>(self.handle);
f(socket, &i.iface)
})
}
fn with_mut<R>(&self, f: impl FnOnce(&mut tcp::Socket, &mut Interface) -> R) -> R {
self.stack.with_mut(|i| {
let socket = i.sockets.get_mut::<tcp::Socket>(self.handle);
let res = f(socket, &mut i.iface);
i.waker.wake();
res
})
}
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
self.with_mut(|s, _| {
if s.can_recv() {
Poll::Ready(())
} else {
s.register_recv_waker(cx.waker());
Poll::Pending
}
})
}
fn read<'s>(&'s mut self, buf: &'s mut [u8]) -> impl Future<Output = Result<usize, Error>> + 's {
poll_fn(|cx| {
// CAUTION: smoltcp semantics around EOF are different to what you'd expect
// from posix-like IO, so we have to tweak things here.
self.with_mut(|s, _| match s.recv_slice(buf) {
// Reading into empty buffer
Ok(0) if buf.is_empty() => {
// embedded_io_async::Read's contract is to not block if buf is empty. While
// this function is not a direct implementor of the trait method, we still don't
// want our future to never resolve.
Poll::Ready(Ok(0))
}
// No data ready
Ok(0) => {
s.register_recv_waker(cx.waker());
Poll::Pending
}
// Data ready!
Ok(n) => Poll::Ready(Ok(n)),
// EOF
Err(tcp::RecvError::Finished) => Poll::Ready(Ok(0)),
// Connection reset. TODO: this can also be timeouts etc, investigate.
Err(tcp::RecvError::InvalidState) => Poll::Ready(Err(Error::ConnectionReset)),
})
})
}
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
self.with_mut(|s, _| {
if s.can_send() {
Poll::Ready(())
} else {
s.register_send_waker(cx.waker());
Poll::Pending
}
})
}
fn write<'s>(&'s mut self, buf: &'s [u8]) -> impl Future<Output = Result<usize, Error>> + 's {
poll_fn(|cx| {
self.with_mut(|s, _| match s.send_slice(buf) {
// Not ready to send (no space in the tx buffer)
Ok(0) => {
s.register_send_waker(cx.waker());
Poll::Pending
}
// Some data sent
Ok(n) => Poll::Ready(Ok(n)),
// Connection reset. TODO: this can also be timeouts etc, investigate.
Err(tcp::SendError::InvalidState) => Poll::Ready(Err(Error::ConnectionReset)),
})
})
}
async fn write_with<F, R>(&mut self, f: F) -> Result<R, Error>
where
F: FnOnce(&mut [u8]) -> (usize, R),
{
let mut f = Some(f);
poll_fn(move |cx| {
self.with_mut(|s, _| {
if !s.can_send() {
if s.may_send() {
// socket buffer is full wait until it has atleast one byte free
s.register_send_waker(cx.waker());
Poll::Pending
} else {
// if we can't transmit because the transmit half of the duplex connection is closed then return an error
Poll::Ready(Err(Error::ConnectionReset))
}
} else {
Poll::Ready(match s.send(unwrap!(f.take())) {
// Connection reset. TODO: this can also be timeouts etc, investigate.
Err(tcp::SendError::InvalidState) => Err(Error::ConnectionReset),
Ok(r) => Ok(r),
})
}
})
})
.await
}
async fn read_with<F, R>(&mut self, f: F) -> Result<R, Error>
where
F: FnOnce(&mut [u8]) -> (usize, R),
{
let mut f = Some(f);
poll_fn(move |cx| {
self.with_mut(|s, _| {
if !s.can_recv() {
if s.may_recv() {
// socket buffer is empty wait until it has atleast one byte has arrived
s.register_recv_waker(cx.waker());
Poll::Pending
} else {
// if we can't receive because the receive half of the duplex connection is closed then return an error
Poll::Ready(Err(Error::ConnectionReset))
}
} else {
Poll::Ready(match s.recv(unwrap!(f.take())) {
// Connection reset. TODO: this can also be timeouts etc, investigate.
Err(tcp::RecvError::Finished) | Err(tcp::RecvError::InvalidState) => {
Err(Error::ConnectionReset)
}
Ok(r) => Ok(r),
})
}
})
})
.await
}
fn flush(&mut self) -> impl Future<Output = Result<(), Error>> + '_ {
poll_fn(|cx| {
self.with_mut(|s, _| {
let data_pending = (s.send_queue() > 0) && s.state() != tcp::State::Closed;
let fin_pending = matches!(
s.state(),
tcp::State::FinWait1 | tcp::State::Closing | tcp::State::LastAck
);
let rst_pending = s.state() == tcp::State::Closed && s.remote_endpoint().is_some();
// If there are outstanding send operations, register for wake up and wait
// smoltcp issues wake-ups when octets are dequeued from the send buffer
if data_pending || fin_pending || rst_pending {
s.register_send_waker(cx.waker());
Poll::Pending
// No outstanding sends, socket is flushed
} else {
Poll::Ready(Ok(()))
}
})
})
}
fn recv_capacity(&self) -> usize {
self.with(|s, _| s.recv_capacity())
}
fn send_capacity(&self) -> usize {
self.with(|s, _| s.send_capacity())
}
fn send_queue(&self) -> usize {
self.with(|s, _| s.send_queue())
}
fn recv_queue(&self) -> usize {
self.with(|s, _| s.recv_queue())
}
}
mod embedded_io_impls {
use super::*;
impl embedded_io_async::Error for ConnectError {
fn kind(&self) -> embedded_io_async::ErrorKind {
match self {
ConnectError::ConnectionReset => embedded_io_async::ErrorKind::ConnectionReset,
ConnectError::TimedOut => embedded_io_async::ErrorKind::TimedOut,
ConnectError::NoRoute => embedded_io_async::ErrorKind::NotConnected,
ConnectError::InvalidState => embedded_io_async::ErrorKind::Other,
}
}
}
impl embedded_io_async::Error for Error {
fn kind(&self) -> embedded_io_async::ErrorKind {
match self {
Error::ConnectionReset => embedded_io_async::ErrorKind::ConnectionReset,
}
}
}
impl<'d> embedded_io_async::ErrorType for TcpSocket<'d> {
type Error = Error;
}
impl<'d> embedded_io_async::Read for TcpSocket<'d> {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
self.io.read(buf).await
}
}
impl<'d> embedded_io_async::ReadReady for TcpSocket<'d> {
fn read_ready(&mut self) -> Result<bool, Self::Error> {
Ok(self.io.with(|s, _| s.can_recv() || !s.may_recv()))
}
}
impl<'d> embedded_io_async::Write for TcpSocket<'d> {
async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
self.io.write(buf).await
}
async fn flush(&mut self) -> Result<(), Self::Error> {
self.io.flush().await
}
}
impl<'d> embedded_io_async::WriteReady for TcpSocket<'d> {
fn write_ready(&mut self) -> Result<bool, Self::Error> {
Ok(self.io.with(|s, _| s.can_send()))
}
}
impl<'d> embedded_io_async::ErrorType for TcpReader<'d> {
type Error = Error;
}
impl<'d> embedded_io_async::Read for TcpReader<'d> {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
self.io.read(buf).await
}
}
impl<'d> embedded_io_async::ReadReady for TcpReader<'d> {
fn read_ready(&mut self) -> Result<bool, Self::Error> {
Ok(self.io.with(|s, _| s.can_recv() || !s.may_recv()))
}
}
impl<'d> embedded_io_async::ErrorType for TcpWriter<'d> {
type Error = Error;
}
impl<'d> embedded_io_async::Write for TcpWriter<'d> {
async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
self.io.write(buf).await
}
async fn flush(&mut self) -> Result<(), Self::Error> {
self.io.flush().await
}
}
impl<'d> embedded_io_async::WriteReady for TcpWriter<'d> {
fn write_ready(&mut self) -> Result<bool, Self::Error> {
Ok(self.io.with(|s, _| s.can_send()))
}
}
}
/// TCP client compatible with `embedded-nal-async` traits.
pub mod client {
use core::cell::{Cell, UnsafeCell};
use core::mem::MaybeUninit;
use core::net::IpAddr;
use core::ptr::NonNull;
use super::*;
/// TCP client connection pool compatible with `embedded-nal-async` traits.
///
/// The pool is capable of managing up to N concurrent connections with tx and rx buffers according to TX_SZ and RX_SZ.
pub struct TcpClient<'d, const N: usize, const TX_SZ: usize = 1024, const RX_SZ: usize = 1024> {
stack: Stack<'d>,
state: &'d TcpClientState<N, TX_SZ, RX_SZ>,
socket_timeout: Option<Duration>,
}
impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpClient<'d, N, TX_SZ, RX_SZ> {
/// Create a new `TcpClient`.
pub fn new(stack: Stack<'d>, state: &'d TcpClientState<N, TX_SZ, RX_SZ>) -> Self {
Self {
stack,
state,
socket_timeout: None,
}
}
/// Set the timeout for each socket created by this `TcpClient`.
///
/// If the timeout is set, the socket will be closed if no data is received for the
/// specified duration.
pub fn set_timeout(&mut self, timeout: Option<Duration>) {
self.socket_timeout = timeout;
}
}
impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> embedded_nal_async::TcpConnect
for TcpClient<'d, N, TX_SZ, RX_SZ>
{
type Error = Error;
type Connection<'m>
= TcpConnection<'m, N, TX_SZ, RX_SZ>
where
Self: 'm;
async fn connect<'a>(&'a self, remote: core::net::SocketAddr) -> Result<Self::Connection<'a>, Self::Error> {
let addr: crate::IpAddress = match remote.ip() {
#[cfg(feature = "proto-ipv4")]
IpAddr::V4(addr) => crate::IpAddress::Ipv4(addr),
#[cfg(not(feature = "proto-ipv4"))]
IpAddr::V4(_) => panic!("ipv4 support not enabled"),
#[cfg(feature = "proto-ipv6")]
IpAddr::V6(addr) => crate::IpAddress::Ipv6(addr),
#[cfg(not(feature = "proto-ipv6"))]
IpAddr::V6(_) => panic!("ipv6 support not enabled"),
};
let remote_endpoint = (addr, remote.port());
let mut socket = TcpConnection::new(self.stack, self.state)?;
socket.socket.set_timeout(self.socket_timeout);
socket
.socket
.connect(remote_endpoint)
.await
.map_err(|_| Error::ConnectionReset)?;
Ok(socket)
}
}
/// Opened TCP connection in a [`TcpClient`].
pub struct TcpConnection<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> {
socket: TcpSocket<'d>,
state: &'d TcpClientState<N, TX_SZ, RX_SZ>,
bufs: NonNull<([u8; TX_SZ], [u8; RX_SZ])>,
}
impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpConnection<'d, N, TX_SZ, RX_SZ> {
fn new(stack: Stack<'d>, state: &'d TcpClientState<N, TX_SZ, RX_SZ>) -> Result<Self, Error> {
let mut bufs = state.pool.alloc().ok_or(Error::ConnectionReset)?;
Ok(Self {
socket: unsafe { TcpSocket::new(stack, &mut bufs.as_mut().1, &mut bufs.as_mut().0) },
state,
bufs,
})
}
}
impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> Drop for TcpConnection<'d, N, TX_SZ, RX_SZ> {
fn drop(&mut self) {
unsafe {
self.socket.close();
self.state.pool.free(self.bufs);
}
}
}
impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> embedded_io_async::ErrorType
for TcpConnection<'d, N, TX_SZ, RX_SZ>
{
type Error = Error;
}
impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> embedded_io_async::Read
for TcpConnection<'d, N, TX_SZ, RX_SZ>
{
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
self.socket.read(buf).await
}
}
impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> embedded_io_async::Write
for TcpConnection<'d, N, TX_SZ, RX_SZ>
{
async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
self.socket.write(buf).await
}
async fn flush(&mut self) -> Result<(), Self::Error> {
self.socket.flush().await
}
}
/// State for TcpClient
pub struct TcpClientState<const N: usize, const TX_SZ: usize, const RX_SZ: usize> {
pool: Pool<([u8; TX_SZ], [u8; RX_SZ]), N>,
}
impl<const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpClientState<N, TX_SZ, RX_SZ> {
/// Create a new `TcpClientState`.
pub const fn new() -> Self {
Self { pool: Pool::new() }
}
}
struct Pool<T, const N: usize> {
used: [Cell<bool>; N],
data: [UnsafeCell<MaybeUninit<T>>; N],
}
impl<T, const N: usize> Pool<T, N> {
const VALUE: Cell<bool> = Cell::new(false);
const UNINIT: UnsafeCell<MaybeUninit<T>> = UnsafeCell::new(MaybeUninit::uninit());
const fn new() -> Self {
Self {
used: [Self::VALUE; N],
data: [Self::UNINIT; N],
}
}
}
impl<T, const N: usize> Pool<T, N> {
fn alloc(&self) -> Option<NonNull<T>> {
for n in 0..N {
// this can't race because Pool is not Sync.
if !self.used[n].get() {
self.used[n].set(true);
let p = self.data[n].get() as *mut T;
return Some(unsafe { NonNull::new_unchecked(p) });
}
}
None
}
/// safety: p must be a pointer obtained from self.alloc that hasn't been freed yet.
unsafe fn free(&self, p: NonNull<T>) {
let origin = self.data.as_ptr() as *mut T;
let n = p.as_ptr().offset_from(origin);
assert!(n >= 0);
assert!((n as usize) < N);
self.used[n as usize].set(false);
}
}
}

20
embassy-net/src/time.rs Normal file
View File

@@ -0,0 +1,20 @@
#![allow(unused)]
use embassy_time::{Duration, Instant};
use smoltcp::time::{Duration as SmolDuration, Instant as SmolInstant};
pub(crate) fn instant_to_smoltcp(instant: Instant) -> SmolInstant {
SmolInstant::from_micros(instant.as_micros() as i64)
}
pub(crate) fn instant_from_smoltcp(instant: SmolInstant) -> Instant {
Instant::from_micros(instant.total_micros() as u64)
}
pub(crate) fn duration_to_smoltcp(duration: Duration) -> SmolDuration {
SmolDuration::from_micros(duration.as_micros())
}
pub(crate) fn duration_from_smoltcp(duration: SmolDuration) -> Duration {
Duration::from_micros(duration.total_micros())
}

396
embassy-net/src/udp.rs Normal file
View File

@@ -0,0 +1,396 @@
//! UDP sockets.
use core::future::{poll_fn, Future};
use core::mem;
use core::task::{Context, Poll};
use smoltcp::iface::{Interface, SocketHandle};
use smoltcp::socket::udp;
pub use smoltcp::socket::udp::{PacketMetadata, UdpMetadata};
use smoltcp::wire::IpListenEndpoint;
use crate::Stack;
/// Error returned by [`UdpSocket::bind`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum BindError {
/// The socket was already open.
InvalidState,
/// No route to host.
NoRoute,
}
/// Error returned by [`UdpSocket::send_to`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum SendError {
/// No route to host.
NoRoute,
/// Socket not bound to an outgoing port.
SocketNotBound,
/// There is not enough transmit buffer capacity to ever send this packet.
PacketTooLarge,
}
/// Error returned by [`UdpSocket::recv_from`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum RecvError {
/// Provided buffer was smaller than the received packet.
Truncated,
}
/// An UDP socket.
pub struct UdpSocket<'a> {
stack: Stack<'a>,
handle: SocketHandle,
}
impl<'a> UdpSocket<'a> {
/// Create a new UDP socket using the provided stack and buffers.
pub fn new(
stack: Stack<'a>,
rx_meta: &'a mut [PacketMetadata],
rx_buffer: &'a mut [u8],
tx_meta: &'a mut [PacketMetadata],
tx_buffer: &'a mut [u8],
) -> Self {
let handle = stack.with_mut(|i| {
let rx_meta: &'static mut [PacketMetadata] = unsafe { mem::transmute(rx_meta) };
let rx_buffer: &'static mut [u8] = unsafe { mem::transmute(rx_buffer) };
let tx_meta: &'static mut [PacketMetadata] = unsafe { mem::transmute(tx_meta) };
let tx_buffer: &'static mut [u8] = unsafe { mem::transmute(tx_buffer) };
i.sockets.add(udp::Socket::new(
udp::PacketBuffer::new(rx_meta, rx_buffer),
udp::PacketBuffer::new(tx_meta, tx_buffer),
))
});
Self { stack, handle }
}
/// Bind the socket to a local endpoint.
pub fn bind<T>(&mut self, endpoint: T) -> Result<(), BindError>
where
T: Into<IpListenEndpoint>,
{
let mut endpoint = endpoint.into();
if endpoint.port == 0 {
// If user didn't specify port allocate a dynamic port.
endpoint.port = self.stack.with_mut(|i| i.get_local_port());
}
match self.with_mut(|s, _| s.bind(endpoint)) {
Ok(()) => Ok(()),
Err(udp::BindError::InvalidState) => Err(BindError::InvalidState),
Err(udp::BindError::Unaddressable) => Err(BindError::NoRoute),
}
}
fn with<R>(&self, f: impl FnOnce(&udp::Socket, &Interface) -> R) -> R {
self.stack.with(|i| {
let socket = i.sockets.get::<udp::Socket>(self.handle);
f(socket, &i.iface)
})
}
fn with_mut<R>(&self, f: impl FnOnce(&mut udp::Socket, &mut Interface) -> R) -> R {
self.stack.with_mut(|i| {
let socket = i.sockets.get_mut::<udp::Socket>(self.handle);
let res = f(socket, &mut i.iface);
i.waker.wake();
res
})
}
/// Wait until the socket becomes readable.
///
/// A socket is readable when a packet has been received, or when there are queued packets in
/// the buffer.
pub fn wait_recv_ready(&self) -> impl Future<Output = ()> + '_ {
poll_fn(move |cx| self.poll_recv_ready(cx))
}
/// Wait until a datagram can be read.
///
/// When no datagram is readable, this method will return `Poll::Pending` and
/// register the current task to be notified when a datagram is received.
///
/// When a datagram is received, this method will return `Poll::Ready`.
pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
self.with_mut(|s, _| {
if s.can_recv() {
Poll::Ready(())
} else {
// socket buffer is empty wait until at least one byte has arrived
s.register_recv_waker(cx.waker());
Poll::Pending
}
})
}
/// Receive a datagram.
///
/// This method will wait until a datagram is received.
///
/// Returns the number of bytes received and the remote endpoint.
pub fn recv_from<'s>(
&'s self,
buf: &'s mut [u8],
) -> impl Future<Output = Result<(usize, UdpMetadata), RecvError>> + 's {
poll_fn(|cx| self.poll_recv_from(buf, cx))
}
/// Receive a datagram.
///
/// When no datagram is available, this method will return `Poll::Pending` and
/// register the current task to be notified when a datagram is received.
///
/// When a datagram is received, this method will return `Poll::Ready` with the
/// number of bytes received and the remote endpoint.
pub fn poll_recv_from(
&self,
buf: &mut [u8],
cx: &mut Context<'_>,
) -> Poll<Result<(usize, UdpMetadata), RecvError>> {
self.with_mut(|s, _| match s.recv_slice(buf) {
Ok((n, meta)) => Poll::Ready(Ok((n, meta))),
// No data ready
Err(udp::RecvError::Truncated) => Poll::Ready(Err(RecvError::Truncated)),
Err(udp::RecvError::Exhausted) => {
s.register_recv_waker(cx.waker());
Poll::Pending
}
})
}
/// Receive a datagram with a zero-copy function.
///
/// When no datagram is available, this method will return `Poll::Pending` and
/// register the current task to be notified when a datagram is received.
///
/// When a datagram is received, this method will call the provided function
/// with the number of bytes received and the remote endpoint and return
/// `Poll::Ready` with the function's returned value.
pub async fn recv_from_with<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&[u8], UdpMetadata) -> R,
{
let mut f = Some(f);
poll_fn(move |cx| {
self.with_mut(|s, _| {
match s.recv() {
Ok((buffer, endpoint)) => Poll::Ready(unwrap!(f.take())(buffer, endpoint)),
Err(udp::RecvError::Truncated) => unreachable!(),
Err(udp::RecvError::Exhausted) => {
// socket buffer is empty wait until at least one byte has arrived
s.register_recv_waker(cx.waker());
Poll::Pending
}
}
})
})
.await
}
/// Wait until the socket becomes writable.
///
/// A socket becomes writable when there is space in the buffer, from initial memory or after
/// dispatching datagrams on a full buffer.
pub fn wait_send_ready(&self) -> impl Future<Output = ()> + '_ {
poll_fn(|cx| self.poll_send_ready(cx))
}
/// Wait until a datagram can be sent.
///
/// When no datagram can be sent (i.e. the buffer is full), this method will return
/// `Poll::Pending` and register the current task to be notified when
/// space is freed in the buffer after a datagram has been dispatched.
///
/// When a datagram can be sent, this method will return `Poll::Ready`.
pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
self.with_mut(|s, _| {
if s.can_send() {
Poll::Ready(())
} else {
// socket buffer is full wait until a datagram has been dispatched
s.register_send_waker(cx.waker());
Poll::Pending
}
})
}
/// Send a datagram to the specified remote endpoint.
///
/// This method will wait until the datagram has been sent.
///
/// If the socket's send buffer is too small to fit `buf`, this method will return `Err(SendError::PacketTooLarge)`
///
/// When the remote endpoint is not reachable, this method will return `Err(SendError::NoRoute)`
pub async fn send_to<T>(&self, buf: &[u8], remote_endpoint: T) -> Result<(), SendError>
where
T: Into<UdpMetadata>,
{
let remote_endpoint: UdpMetadata = remote_endpoint.into();
poll_fn(move |cx| self.poll_send_to(buf, remote_endpoint, cx)).await
}
/// Send a datagram to the specified remote endpoint.
///
/// When the datagram has been sent, this method will return `Poll::Ready(Ok())`.
///
/// When the socket's send buffer is full, this method will return `Poll::Pending`
/// and register the current task to be notified when the buffer has space available.
///
/// If the socket's send buffer is too small to fit `buf`, this method will return `Poll::Ready(Err(SendError::PacketTooLarge))`
///
/// When the remote endpoint is not reachable, this method will return `Poll::Ready(Err(Error::NoRoute))`.
pub fn poll_send_to<T>(&self, buf: &[u8], remote_endpoint: T, cx: &mut Context<'_>) -> Poll<Result<(), SendError>>
where
T: Into<UdpMetadata>,
{
// Don't need to wake waker in `with_mut` if the buffer will never fit the udp tx_buffer.
let send_capacity_too_small = self.with(|s, _| s.payload_send_capacity() < buf.len());
if send_capacity_too_small {
return Poll::Ready(Err(SendError::PacketTooLarge));
}
self.with_mut(|s, _| match s.send_slice(buf, remote_endpoint) {
// Entire datagram has been sent
Ok(()) => Poll::Ready(Ok(())),
Err(udp::SendError::BufferFull) => {
s.register_send_waker(cx.waker());
Poll::Pending
}
Err(udp::SendError::Unaddressable) => {
// If no sender/outgoing port is specified, there is not really "no route"
if s.endpoint().port == 0 {
Poll::Ready(Err(SendError::SocketNotBound))
} else {
Poll::Ready(Err(SendError::NoRoute))
}
}
})
}
/// Send a datagram to the specified remote endpoint with a zero-copy function.
///
/// This method will wait until the buffer can fit the requested size before
/// calling the function to fill its contents.
///
/// If the socket's send buffer is too small to fit `size`, this method will return `Err(SendError::PacketTooLarge)`
///
/// When the remote endpoint is not reachable, this method will return `Err(SendError::NoRoute)`
pub async fn send_to_with<T, F, R>(&mut self, size: usize, remote_endpoint: T, f: F) -> Result<R, SendError>
where
T: Into<UdpMetadata> + Copy,
F: FnOnce(&mut [u8]) -> R,
{
// Don't need to wake waker in `with_mut` if the buffer will never fit the udp tx_buffer.
let send_capacity_too_small = self.with(|s, _| s.payload_send_capacity() < size);
if send_capacity_too_small {
return Err(SendError::PacketTooLarge);
}
let mut f = Some(f);
poll_fn(move |cx| {
self.with_mut(|s, _| {
match s.send(size, remote_endpoint) {
Ok(buffer) => Poll::Ready(Ok(unwrap!(f.take())(buffer))),
Err(udp::SendError::BufferFull) => {
s.register_send_waker(cx.waker());
Poll::Pending
}
Err(udp::SendError::Unaddressable) => {
// If no sender/outgoing port is specified, there is not really "no route"
if s.endpoint().port == 0 {
Poll::Ready(Err(SendError::SocketNotBound))
} else {
Poll::Ready(Err(SendError::NoRoute))
}
}
}
})
})
.await
}
/// Flush the socket.
///
/// This method will wait until the socket is flushed.
pub fn flush(&mut self) -> impl Future<Output = ()> + '_ {
poll_fn(|cx| {
self.with_mut(|s, _| {
if s.send_queue() == 0 {
Poll::Ready(())
} else {
s.register_send_waker(cx.waker());
Poll::Pending
}
})
})
}
/// Returns the local endpoint of the socket.
pub fn endpoint(&self) -> IpListenEndpoint {
self.with(|s, _| s.endpoint())
}
/// Returns whether the socket is open.
pub fn is_open(&self) -> bool {
self.with(|s, _| s.is_open())
}
/// Close the socket.
pub fn close(&mut self) {
self.with_mut(|s, _| s.close())
}
/// Returns whether the socket is ready to send data, i.e. it has enough buffer space to hold a packet.
pub fn may_send(&self) -> bool {
self.with(|s, _| s.can_send())
}
/// Returns whether the socket is ready to receive data, i.e. it has received a packet that's now in the buffer.
pub fn may_recv(&self) -> bool {
self.with(|s, _| s.can_recv())
}
/// Return the maximum number packets the socket can receive.
pub fn packet_recv_capacity(&self) -> usize {
self.with(|s, _| s.packet_recv_capacity())
}
/// Return the maximum number packets the socket can receive.
pub fn packet_send_capacity(&self) -> usize {
self.with(|s, _| s.packet_send_capacity())
}
/// Return the maximum number of bytes inside the recv buffer.
pub fn payload_recv_capacity(&self) -> usize {
self.with(|s, _| s.payload_recv_capacity())
}
/// Return the maximum number of bytes inside the transmit buffer.
pub fn payload_send_capacity(&self) -> usize {
self.with(|s, _| s.payload_send_capacity())
}
/// Set the hop limit field in the IP header of sent packets.
pub fn set_hop_limit(&mut self, hop_limit: Option<u8>) {
self.with_mut(|s, _| s.set_hop_limit(hop_limit))
}
}
impl Drop for UdpSocket<'_> {
fn drop(&mut self) {
self.stack.with_mut(|i| i.sockets.remove(self.handle));
}
}
fn _assert_covariant<'a, 'b: 'a>(x: UdpSocket<'b>) -> UdpSocket<'a> {
x
}