use crate::impl_cl_type_trait;
use mesa_rust::pipe::context::*;
-use mesa_rust::pipe::fence::*;
use mesa_rust_util::static_assert;
use rusticl_opencl_gen::*;
struct EventMutState {
status: cl_int,
cbs: [Vec<(EventCB, *mut c_void)>; 3],
- fence: Option<PipeFence>,
work: Option<EventSig>,
}
state: Mutex::new(EventMutState {
status: CL_QUEUED as cl_int,
cbs: [Vec::new(), Vec::new(), Vec::new()],
- fence: None,
work: Some(work),
}),
cv: Condvar::new(),
state: Mutex::new(EventMutState {
status: CL_SUBMITTED as cl_int,
cbs: [Vec::new(), Vec::new(), Vec::new()],
- fence: None,
work: None,
}),
cv: Condvar::new(),
fn set_status(&self, lock: &mut MutexGuard<EventMutState>, new: cl_int) {
lock.status = new;
- self.cv.notify_all();
+
+ // Signal waiters on completion or an error; negative status values are
+ // errors, so `new <= CL_COMPLETE` covers both.
+ if new <= CL_COMPLETE as cl_int {
+ self.cv.notify_all();
+ }
+
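+ // Callbacks are stored per status and the array index matches the status
+ // value (CL_COMPLETE = 0, CL_RUNNING = 1, CL_SUBMITTED = 2); error
+ // (negative) statuses are filtered out by the contains() check below.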
if [CL_COMPLETE, CL_RUNNING, CL_SUBMITTED].contains(&(new as u32)) {
if let Some(cbs) = lock.cbs.get(new as usize) {
cbs.iter()
self.status() < 0
}
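+ // User events are not backed by any work; the application drives their
+ // status directly through clSetUserEventStatus.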
+ pub fn is_user(&self) -> bool {
+ self.cmd_type == CL_COMMAND_USER
+ }
+
pub fn add_cb(&self, state: cl_int, cb: EventCB, data: *mut c_void) {
let mut lock = self.state();
let status = lock.status;
}
}
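+ // Called from the queue thread once the work behind this event finished on
+ // the device. Walking through CL_RUNNING before CL_COMPLETE makes sure the
+ // callbacks for both statuses fire.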
+ pub(super) fn signal(&self) {
+ let mut lock = self.state();
+
+ self.set_status(&mut lock, CL_RUNNING as cl_int);
+ self.set_status(&mut lock, CL_COMPLETE as cl_int);
+ }
+
pub fn wait(&self) -> cl_int {
let mut lock = self.state();
- while lock.status >= CL_SUBMITTED as cl_int {
- if lock.fence.is_some() {
- lock.fence.as_ref().unwrap().wait();
- // so we trigger all cbs
- self.set_status(&mut lock, CL_RUNNING as cl_int);
- self.set_status(&mut lock, CL_COMPLETE as cl_int);
- } else {
- lock = self
- .cv
- .wait_timeout(lock, Duration::from_millis(50))
- .unwrap()
- .0;
- }
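+ // Block until the event signaled completion or an error. The status is
+ // re-checked on every wakeup and the timeout makes sure a missed wakeup
+ // can't hang us forever.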
+ while lock.status >= CL_RUNNING as cl_int {
+ lock = self
+ .cv
+ .wait_timeout(lock, Duration::from_secs(1))
+ .unwrap()
+ .0;
}
lock.status
}
// We always assume that work here simply submits stuff to the hardware even if it's just doing
// sw emulation or nothing at all.
// If anything requests waiting, we will update the status through fencing later.
- pub fn call(&self, ctx: &PipeContext) -> cl_int {
+ pub fn call(&self, ctx: &PipeContext) {
let mut lock = self.state();
let status = lock.status;
if status == CL_QUEUED as cl_int {
CL_SUBMITTED as cl_int,
|e| e,
);
- lock.fence = Some(ctx.flush());
res
},
);
// absolutely sure it happens before the status update.
drop(work);
self.set_status(&mut lock, new);
- new
- } else {
- status
}
}
use crate::core::event::*;
use crate::impl_cl_type_trait;
+use mesa_rust::pipe::context::PipeContext;
use mesa_rust_util::properties::*;
use rusticl_opencl_gen::*;
use std::thread;
use std::thread::JoinHandle;
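+// Events queued since the last flush and the most recently flushed event.
+// The queue thread processes events in order, so waiting on `last` is enough
+// to wait for everything submitted so far.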
+struct QueueState {
+ pending: Vec<Arc<Event>>,
+ last: Option<Arc<Event>>,
+}
+
pub struct Queue {
pub base: CLObjectBase<CL_INVALID_COMMAND_QUEUE>,
pub context: Arc<Context>,
pub device: Arc<Device>,
pub props: cl_command_queue_properties,
pub props_v2: Option<Properties<cl_queue_properties>>,
- pending: Mutex<Vec<Arc<Event>>>,
+ state: Mutex<QueueState>,
_thrd: Option<JoinHandle<()>>,
chan_in: mpsc::Sender<Vec<Arc<Event>>>,
}
impl_cl_type_trait!(cl_command_queue, Queue, CL_INVALID_COMMAND_QUEUE);
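+// Flushes the PipeContext and waits on the returned fence before signaling the
+// events, so an event only reaches CL_COMPLETE once its work actually finished
+// on the device.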
+fn flush_events(evs: &mut Vec<Arc<Event>>, pipe: &PipeContext) {
+ if !evs.is_empty() {
+ pipe.flush().wait();
+ evs.drain(..).for_each(|e| e.signal());
+ }
+}
+
impl Queue {
pub fn new(
context: Arc<Context>,
device,
props,
props_v2,
- pending: Mutex::new(Vec::new()),
+ state: Mutex::new(QueueState {
+ pending: Vec::new(),
+ last: None,
+ }),
_thrd: Some(
thread::Builder::new()
.name("rusticl queue thread".into())
if r.is_err() {
break;
}
+
let new_events = r.unwrap();
- for e in &new_events {
- // all events should be processed, but we might have to wait on user
- // events to happen
- let err = e.deps.iter().map(|e| e.wait()).find(|s| *s < 0);
+ let mut flushed = Vec::new();
+
+ for e in new_events {
+ // If we hit any deps from another queue, flush so we don't risk a
+ // deadlock.
+ if e.deps.iter().any(|ev| ev.queue != e.queue) {
+ flush_events(&mut flushed, &pipe);
+ }
+
+ // We have to wait on user events or events from other queues.
+ let err = e
+ .deps
+ .iter()
+ .filter(|ev| ev.is_user() || ev.queue != e.queue)
+ .map(|e| e.wait())
+ .find(|s| *s < 0);
+
if let Some(err) = err {
- // if a dependency failed, fail this event as well
+ // If a dependency failed, fail this event as well.
e.set_user_status(err);
+ continue;
+ }
+
+ e.call(&pipe);
+
+ if !e.is_user() {
+ flushed.push(e);
} else {
- e.call(&pipe);
+ // On each user event we flush our events as the application might
+ // wait on them before signaling user events.
+ flush_events(&mut flushed, &pipe);
+
+ // Wait on user events as they are synchronization points in the
+ // application's control.
+ e.wait();
}
}
- for e in new_events {
- e.wait();
- }
+
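+ // Flush and signal whatever is left once the whole batch was processed.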
+ flush_events(&mut flushed, &pipe);
})
.unwrap(),
),
}
pub fn queue(&self, e: Arc<Event>) {
- self.pending.lock().unwrap().push(e);
+ self.state.lock().unwrap().pending.push(e);
}
pub fn flush(&self, wait: bool) -> CLResult<()> {
- let mut p = self.pending.lock().unwrap();
- let events = p.clone();
+ let mut state = self.state.lock().unwrap();
+
+ // Update last if and only if we get new events; this prevents breaking application code
+ // doing things like `clFlush(q); clFinish(q);`
+ if let Some(last) = state.pending.last() {
+ state.last = Some(last.clone());
+ }
+
// This should never ever error, but if it does, return an error.
self.chan_in
- .send((*p).drain(0..).collect())
+ .send((state.pending).drain(0..).collect())
.map_err(|_| CL_OUT_OF_HOST_MEMORY)?;
if wait {
- for e in events {
- e.wait();
- }
+ // Waiting on the last event is good enough here as the queue processes events in order.
+ // Also take the value so we can release the event once we are done.
+ state.last.take().map(|e| e.wait());
}
Ok(())
}
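+ // Collects every other queue which still has unflushed work the events
+ // pending on this queue depend on.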
pub fn dependencies_for_pending_events(&self) -> HashSet<Arc<Queue>> {
- let p = self.pending.lock().unwrap();
+ let state = self.state.lock().unwrap();
- let mut queues = Event::deep_unflushed_queues(&p);
+ let mut queues = Event::deep_unflushed_queues(&state.pending);
queues.remove(self);
queues
}