rusticl/queue: overhaul of the queue+event handling
authorKarol Herbst <git@karolherbst.de>
Tue, 13 Jun 2023 00:07:02 +0000 (02:07 +0200)
committerMarge Bot <emma+marge@anholt.net>
Wed, 14 Jun 2023 11:14:46 +0000 (11:14 +0000)
This new approach handles things as follows:
1. Fences won't be attached to events anymore, applications only wait on
   the cv attached to the event.
2. Only the queue is allowed to update event status for non user events.
   This will eliminate all remaining status updating races between the
   queue and applications waiting on events.
3. Queue minimized flushing by bundling events
4. Increase cv wait timeout as there is really no point in waking up too
   often.

Reduces amount of emited fences on radeonsi in luxmark 3.1 luxball by 90%

Signed-off-by: Karol Herbst <git@karolherbst.de>
Reviewed by Nora Allen <blackcatgames@protonmail.com>

Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/23612>

src/gallium/frontends/rusticl/core/event.rs
src/gallium/frontends/rusticl/core/queue.rs

index 8c87a2c..879e99a 100644 (file)
@@ -5,7 +5,6 @@ use crate::core::queue::*;
 use crate::impl_cl_type_trait;
 
 use mesa_rust::pipe::context::*;
-use mesa_rust::pipe::fence::*;
 use mesa_rust_util::static_assert;
 use rusticl_opencl_gen::*;
 
@@ -29,7 +28,6 @@ pub type EventSig = Box<dyn Fn(&Arc<Queue>, &PipeContext) -> CLResult<()>>;
 struct EventMutState {
     status: cl_int,
     cbs: [Vec<(EventCB, *mut c_void)>; 3],
-    fence: Option<PipeFence>,
     work: Option<EventSig>,
 }
 
@@ -65,7 +63,6 @@ impl Event {
             state: Mutex::new(EventMutState {
                 status: CL_QUEUED as cl_int,
                 cbs: [Vec::new(), Vec::new(), Vec::new()],
-                fence: None,
                 work: Some(work),
             }),
             cv: Condvar::new(),
@@ -82,7 +79,6 @@ impl Event {
             state: Mutex::new(EventMutState {
                 status: CL_SUBMITTED as cl_int,
                 cbs: [Vec::new(), Vec::new(), Vec::new()],
-                fence: None,
                 work: None,
             }),
             cv: Condvar::new(),
@@ -104,7 +100,12 @@ impl Event {
 
     fn set_status(&self, lock: &mut MutexGuard<EventMutState>, new: cl_int) {
         lock.status = new;
-        self.cv.notify_all();
+
+        // signal on completion or an error
+        if new <= CL_COMPLETE as cl_int {
+            self.cv.notify_all();
+        }
+
         if [CL_COMPLETE, CL_RUNNING, CL_SUBMITTED].contains(&(new as u32)) {
             if let Some(cbs) = lock.cbs.get(new as usize) {
                 cbs.iter()
@@ -122,6 +123,10 @@ impl Event {
         self.status() < 0
     }
 
+    pub fn is_user(&self) -> bool {
+        self.cmd_type == CL_COMMAND_USER
+    }
+
     pub fn add_cb(&self, state: cl_int, cb: EventCB, data: *mut c_void) {
         let mut lock = self.state();
         let status = lock.status;
@@ -135,21 +140,21 @@ impl Event {
         }
     }
 
+    pub(super) fn signal(&self) {
+        let mut lock = self.state();
+
+        self.set_status(&mut lock, CL_RUNNING as cl_int);
+        self.set_status(&mut lock, CL_COMPLETE as cl_int);
+    }
+
     pub fn wait(&self) -> cl_int {
         let mut lock = self.state();
-        while lock.status >= CL_SUBMITTED as cl_int {
-            if lock.fence.is_some() {
-                lock.fence.as_ref().unwrap().wait();
-                // so we trigger all cbs
-                self.set_status(&mut lock, CL_RUNNING as cl_int);
-                self.set_status(&mut lock, CL_COMPLETE as cl_int);
-            } else {
-                lock = self
-                    .cv
-                    .wait_timeout(lock, Duration::from_millis(50))
-                    .unwrap()
-                    .0;
-            }
+        while lock.status >= CL_RUNNING as cl_int {
+            lock = self
+                .cv
+                .wait_timeout(lock, Duration::from_secs(1))
+                .unwrap()
+                .0;
         }
         lock.status
     }
@@ -157,7 +162,7 @@ impl Event {
     // We always assume that work here simply submits stuff to the hardware even if it's just doing
     // sw emulation or nothing at all.
     // If anything requets waiting, we will update the status through fencing later.
-    pub fn call(&self, ctx: &PipeContext) -> cl_int {
+    pub fn call(&self, ctx: &PipeContext) {
         let mut lock = self.state();
         let status = lock.status;
         if status == CL_QUEUED as cl_int {
@@ -171,7 +176,6 @@ impl Event {
                         CL_SUBMITTED as cl_int,
                         |e| e,
                     );
-                    lock.fence = Some(ctx.flush());
                     res
                 },
             );
@@ -180,9 +184,6 @@ impl Event {
             // absolutely sure it happens before the status update.
             drop(work);
             self.set_status(&mut lock, new);
-            new
-        } else {
-            status
         }
     }
 
index a8e5209..1777cde 100644 (file)
@@ -4,6 +4,7 @@ use crate::core::device::*;
 use crate::core::event::*;
 use crate::impl_cl_type_trait;
 
+use mesa_rust::pipe::context::PipeContext;
 use mesa_rust_util::properties::*;
 use rusticl_opencl_gen::*;
 
@@ -14,19 +15,31 @@ use std::sync::Mutex;
 use std::thread;
 use std::thread::JoinHandle;
 
+struct QueueState {
+    pending: Vec<Arc<Event>>,
+    last: Option<Arc<Event>>,
+}
+
 pub struct Queue {
     pub base: CLObjectBase<CL_INVALID_COMMAND_QUEUE>,
     pub context: Arc<Context>,
     pub device: Arc<Device>,
     pub props: cl_command_queue_properties,
     pub props_v2: Option<Properties<cl_queue_properties>>,
-    pending: Mutex<Vec<Arc<Event>>>,
+    state: Mutex<QueueState>,
     _thrd: Option<JoinHandle<()>>,
     chan_in: mpsc::Sender<Vec<Arc<Event>>>,
 }
 
 impl_cl_type_trait!(cl_command_queue, Queue, CL_INVALID_COMMAND_QUEUE);
 
+fn flush_events(evs: &mut Vec<Arc<Event>>, pipe: &PipeContext) {
+    if !evs.is_empty() {
+        pipe.flush().wait();
+        evs.drain(..).for_each(|e| e.signal());
+    }
+}
+
 impl Queue {
     pub fn new(
         context: Arc<Context>,
@@ -44,7 +57,10 @@ impl Queue {
             device: device,
             props: props,
             props_v2: props_v2,
-            pending: Mutex::new(Vec::new()),
+            state: Mutex::new(QueueState {
+                pending: Vec::new(),
+                last: None,
+            }),
             _thrd: Some(
                 thread::Builder::new()
                     .name("rusticl queue thread".into())
@@ -53,21 +69,47 @@ impl Queue {
                         if r.is_err() {
                             break;
                         }
+
                         let new_events = r.unwrap();
-                        for e in &new_events {
-                            // all events should be processed, but we might have to wait on user
-                            // events to happen
-                            let err = e.deps.iter().map(|e| e.wait()).find(|s| *s < 0);
+                        let mut flushed = Vec::new();
+
+                        for e in new_events {
+                            // If we hit any deps from another queue, flush so we don't risk a dead
+                            // lock.
+                            if e.deps.iter().any(|ev| ev.queue != e.queue) {
+                                flush_events(&mut flushed, &pipe);
+                            }
+
+                            // We have to wait on user events or events from other queues.
+                            let err = e
+                                .deps
+                                .iter()
+                                .filter(|ev| ev.is_user() || ev.queue != e.queue)
+                                .map(|e| e.wait())
+                                .find(|s| *s < 0);
+
                             if let Some(err) = err {
-                                // if a dependency failed, fail this event as well
+                                // If a dependency failed, fail this event as well.
                                 e.set_user_status(err);
+                                continue;
+                            }
+
+                            e.call(&pipe);
+
+                            if !e.is_user() {
+                                flushed.push(e);
                             } else {
-                                e.call(&pipe);
+                                // On each user event we flush our events as application might
+                                // wait on them before signaling user events.
+                                flush_events(&mut flushed, &pipe);
+
+                                // Wait on user events as they are synchronization points in the
+                                // application's control.
+                                e.wait();
                             }
                         }
-                        for e in new_events {
-                            e.wait();
-                        }
+
+                        flush_events(&mut flushed, &pipe);
                     })
                     .unwrap(),
             ),
@@ -76,28 +118,34 @@ impl Queue {
     }
 
     pub fn queue(&self, e: Arc<Event>) {
-        self.pending.lock().unwrap().push(e);
+        self.state.lock().unwrap().pending.push(e);
     }
 
     pub fn flush(&self, wait: bool) -> CLResult<()> {
-        let mut p = self.pending.lock().unwrap();
-        let events = p.clone();
+        let mut state = self.state.lock().unwrap();
+
+        // Update last if and only if we get new events, this prevents breaking application code
+        // doing things like `clFlush(q); clFinish(q);`
+        if let Some(last) = state.pending.last() {
+            state.last = Some(last.clone());
+        }
+
         // This should never ever error, but if it does return an error
         self.chan_in
-            .send((*p).drain(0..).collect())
+            .send((state.pending).drain(0..).collect())
             .map_err(|_| CL_OUT_OF_HOST_MEMORY)?;
         if wait {
-            for e in events {
-                e.wait();
-            }
+            // Waiting on the last event is good enough here as the queue will process it in order,
+            // also take the value so we can release the event once we are done
+            state.last.take().map(|e| e.wait());
         }
         Ok(())
     }
 
     pub fn dependencies_for_pending_events(&self) -> HashSet<Arc<Queue>> {
-        let p = self.pending.lock().unwrap();
+        let state = self.state.lock().unwrap();
 
-        let mut queues = Event::deep_unflushed_queues(&p);
+        let mut queues = Event::deep_unflushed_queues(&state.pending);
         queues.remove(self);
         queues
     }