use alloc::sync::Arc;
use core::ops::{Deref, DerefMut};
use maitake::sync::WaitCell;
use mulch::bipqueue::BipQueue;
/// Shared state behind one [`QueueProducer`] / [`QueueConsumer`] pair.
///
/// [`SpscQueue::new`] wraps this in an `Arc` and returns the two handles that
/// share it; the fields are private to this module.
pub struct SpscQueue {
/// Backing `BipQueue<512>`; `grant` and `read` requests are forwarded to it.
storage: BipQueue<512>,
/// Woken by `WriteGrant::commit`; `QueueConsumer::read` waits on it.
commit_wait: WaitCell,
/// Woken by `ReadGrant::release`; `QueueProducer::grant` waits on it.
release_wait: WaitCell,
}
impl SpscQueue {
    /// Builds a fresh queue and returns its two halves.
    ///
    /// The shared state is allocated once inside an `Arc`; the producer gets a
    /// clone of that `Arc` and the consumer takes the original.
    pub fn new() -> (QueueProducer, QueueConsumer) {
        let shared = Arc::new(Self {
            storage: BipQueue::new(),
            commit_wait: WaitCell::new(),
            release_wait: WaitCell::new(),
        });

        (
            QueueProducer { queue: Arc::clone(&shared) },
            QueueConsumer { queue: shared },
        )
    }
}
/// Writing half of an [`SpscQueue`], obtained from [`SpscQueue::new`].
pub struct QueueProducer {
/// Shared queue state, also referenced by the matching `QueueConsumer`.
queue: Arc<SpscQueue>,
}
impl QueueProducer {
pub fn grant_sync(&self, length: usize) -> Result<WriteGrant<'_>, ()> {
match self.queue.storage.grant(length) {
Ok(grant) => Ok(WriteGrant { inner: grant, queue: self.queue.clone() }),
Err(_) => Err(()),
}
}
pub async fn grant(&self, length: usize) -> WriteGrant<'_> {
loop {
let wait = self.queue.release_wait.subscribe().await;
match self.queue.storage.grant(length) {
Ok(grant) => return WriteGrant { inner: grant, queue: self.queue.clone() },
Err(_) => {
wait.await.unwrap();
}
}
}
}
}
/// A writable region handed out by [`QueueProducer::grant`] or
/// [`QueueProducer::grant_sync`].
///
/// Dereferences (mutably) to the granted `[u8]`. Call `commit` to pass the
/// written length to the underlying grant and wake a waiting reader.
pub struct WriteGrant<'a> {
/// Underlying bip-queue grant; its `512` matches the `BipQueue<512>` storage.
inner: mulch::bipqueue::WriteGrant<'a, 512>,
/// Kept so `commit` can reach the queue's `commit_wait` cell.
queue: Arc<SpscQueue>,
}
impl WriteGrant<'_> {
    /// Consumes the grant, forwarding `written` to the underlying grant's
    /// `commit`, then wakes `commit_wait` so a pending `QueueConsumer::read`
    /// can retry.
    pub fn commit(self, written: usize) {
        let Self { inner, queue } = self;
        // Commit first, wake second: the woken reader must see the new data.
        inner.commit(written);
        queue.commit_wait.wake();
    }
}
/// Read access to the granted bytes, delegated to the underlying grant.
impl Deref for WriteGrant<'_> {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        // Deref coercion turns `&inner` into `&[u8]`.
        &self.inner
    }
}
/// Write access to the granted bytes, delegated to the underlying grant.
impl DerefMut for WriteGrant<'_> {
    fn deref_mut(&mut self) -> &mut [u8] {
        // Deref coercion turns `&mut inner` into `&mut [u8]`.
        &mut self.inner
    }
}
/// Reading half of an [`SpscQueue`], obtained from [`SpscQueue::new`].
pub struct QueueConsumer {
/// Shared queue state, also referenced by the matching `QueueProducer`.
queue: Arc<SpscQueue>,
}
impl QueueConsumer {
pub fn read_sync(&self) -> Result<ReadGrant<'_>, ()> {
match self.queue.storage.read() {
Ok(grant) => Ok(ReadGrant { inner: grant, queue: self.queue.clone() }),
Err(_) => Err(()),
}
}
pub async fn read(&self) -> ReadGrant<'_> {
loop {
let wait = self.queue.commit_wait.subscribe().await;
match self.queue.storage.read() {
Ok(grant) => return ReadGrant { inner: grant, queue: self.queue.clone() },
Err(_) => {
wait.await.unwrap();
}
}
}
}
}
/// A readable region handed out by [`QueueConsumer::read`] or
/// [`QueueConsumer::read_sync`].
///
/// Dereferences to the granted `[u8]`. Call `release` to pass the consumed
/// length to the underlying grant and wake a waiting writer.
pub struct ReadGrant<'a> {
/// Underlying bip-queue grant; its `512` matches the `BipQueue<512>` storage.
inner: mulch::bipqueue::ReadGrant<'a, 512>,
/// Kept so `release` can reach the queue's `release_wait` cell.
queue: Arc<SpscQueue>,
}
impl ReadGrant<'_> {
    /// Consumes the grant, forwarding `read` to the underlying grant's
    /// `release`, then wakes `release_wait` so a pending
    /// `QueueProducer::grant` can retry.
    pub fn release(self, read: usize) {
        let Self { inner, queue } = self;
        // Release first, wake second: the woken writer must see the freed space.
        inner.release(read);
        queue.release_wait.wake();
    }
}
/// Read access to the granted bytes, delegated to the underlying grant.
impl Deref for ReadGrant<'_> {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        // Deref coercion turns `&inner` into `&[u8]`.
        &self.inner
    }
}