bitbake: prserv/serv: Settle on two threads for optimal performance
authorRichard Purdie <richard.purdie@linuxfoundation.org>
Sat, 31 Aug 2013 22:44:42 +0000 (23:44 +0100)
committerRichard Purdie <richard.purdie@linuxfoundation.org>
Sun, 1 Sep 2013 14:51:11 +0000 (15:51 +0100)
Using the threading mixin class resulted in large amounts of memory
being used by the PR server for no good reason. Using a receiver thread
and a thread to do the actual database operations on a single connection
gives the same performance with a much saner memory overhead so
switch to this.

(Bitbake rev: e08455d5f3b8e96765942b9c3b9767c30650557d)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
bitbake/lib/prserv/serv.py

index 3677f77..a9c7ed1 100644 (file)
@@ -2,6 +2,8 @@ import os,sys,logging
 import signal, time, atexit, threading
 from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
 import xmlrpclib
+import threading
+import Queue
 
 try:
     import sqlite3
@@ -31,14 +33,11 @@ class Handler(SimpleXMLRPCRequestHandler):
 PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
 singleton = None
 
-import SocketServer
-class SimpleThreadedXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer):
-    pass
 
-class PRServer(SimpleThreadedXMLRPCServer):
+class PRServer(SimpleXMLRPCServer):
     def __init__(self, dbfile, logfile, interface, daemon=True):
         ''' constructor '''
-        SimpleThreadedXMLRPCServer.__init__(self, interface,
+        SimpleXMLRPCServer.__init__(self, interface,
                                     logRequests=False, allow_none=True)
         self.dbfile=dbfile
         self.daemon=daemon
@@ -54,19 +53,41 @@ class PRServer(SimpleThreadedXMLRPCServer):
         self.register_function(self.importone, "importone")
         self.register_introspection_functions()
 
+        self.db = prserv.db.PRData(self.dbfile)
+        self.table = self.db["PRMAIN"]
+
+        self.requestqueue = Queue.Queue()
+        self.handlerthread = threading.Thread(target = self.process_request_thread)
+        self.handlerthread.daemon = False
+
+    def process_request_thread(self):
+        """Same as in BaseServer but as a thread.
+
+        In addition, exception handling is done here.
+
+        """
+        while True:
+            (request, client_address) = self.requestqueue.get()
+            try:
+                self.finish_request(request, client_address)
+                self.shutdown_request(request)
+            except:
+                self.handle_error(request, client_address)
+                self.shutdown_request(request)
+
+
+    def process_request(self, request, client_address):
+        self.requestqueue.put((request, client_address))
+
     def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
         try:
-            db = prserv.db.PRData(self.dbfile)
-            table = db["PRMAIN"]
-            return table.export(version, pkgarch, checksum, colinfo)
+            return self.table.export(version, pkgarch, checksum, colinfo)
         except sqlite3.Error as exc:
             logger.error(str(exc))
             return None
 
     def importone(self, version, pkgarch, checksum, value):
-        db = prserv.db.PRData(self.dbfile)
-        table = db["PRMAIN"]
-        return table.importone(version, pkgarch, checksum, value)
+        return self.table.importone(version, pkgarch, checksum, value)
 
     def ping(self):
         return not self.quit
@@ -76,9 +97,7 @@ class PRServer(SimpleThreadedXMLRPCServer):
 
     def getPR(self, version, pkgarch, checksum):
         try:
-            db = prserv.db.PRData(self.dbfile)
-            table = db["PRMAIN"]
-            return table.getValue(version, pkgarch, checksum)
+            return self.table.getValue(version, pkgarch, checksum)
         except prserv.NotFoundError:
             logger.error("can not find value for (%s, %s)",version, checksum)
             return None
@@ -97,6 +116,7 @@ class PRServer(SimpleThreadedXMLRPCServer):
         logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" %
                      (self.dbfile, self.host, self.port, str(os.getpid())))
 
+        self.handlerthread.start()
         while not self.quit:
             self.handle_request()