tpm: selftest: add test covering async mode
authorTadeusz Struk <tadeusz.struk@intel.com>
Thu, 12 Dec 2019 17:48:53 +0000 (09:48 -0800)
committerJarkko Sakkinen <jarkko.sakkinen@linux.intel.com>
Tue, 17 Dec 2019 10:20:12 +0000 (12:20 +0200)
Add a test that sends a tpm cmd in an async mode.
Currently there is a gap in test coverage with regards
to this functionality.

Signed-off-by: Tadeusz Struk <tadeusz.struk@intel.com>
Reviewed-by: Jarkko Sakkinen <jarkko.sakkinen@linux.intel.com>
Signed-off-by: Jarkko Sakkinen <jarkko.sakkinen@linux.intel.com>
tools/testing/selftests/tpm2/test_smoke.sh
tools/testing/selftests/tpm2/tpm2.py
tools/testing/selftests/tpm2/tpm2_tests.py

index 80521d46220ccc0a33edb79f8f4dbd993f39ebd7..cb54ab637ea6578dfb69f9cd0728f5ae6c83c24b 100755 (executable)
@@ -2,3 +2,4 @@
 # SPDX-License-Identifier: (GPL-2.0 OR BSD-3-Clause)
 
 python -m unittest -v tpm2_tests.SmokeTest
+python -m unittest -v tpm2_tests.AsyncTest
index 828c185846248031ff598670d393e34a38fa24df..d0fcb66a88a68a0bf9e57d5da24aadfd695c12f9 100644 (file)
@@ -6,8 +6,8 @@ import socket
 import struct
 import sys
 import unittest
-from fcntl import ioctl
-
+import fcntl
+import select
 
 TPM2_ST_NO_SESSIONS = 0x8001
 TPM2_ST_SESSIONS = 0x8002
@@ -352,6 +352,7 @@ def hex_dump(d):
 class Client:
     FLAG_DEBUG = 0x01
     FLAG_SPACE = 0x02
+    FLAG_NONBLOCK = 0x04
     TPM_IOC_NEW_SPACE = 0xa200
 
     def __init__(self, flags = 0):
@@ -362,13 +363,27 @@ class Client:
         else:
             self.tpm = open('/dev/tpmrm0', 'r+b', buffering=0)
 
+        if (self.flags & Client.FLAG_NONBLOCK):
+            flags = fcntl.fcntl(self.tpm, fcntl.F_GETFL)
+            flags |= os.O_NONBLOCK
+            fcntl.fcntl(self.tpm, fcntl.F_SETFL, flags)
+            self.tpm_poll = select.poll()
+
     def close(self):
         self.tpm.close()
 
     def send_cmd(self, cmd):
         self.tpm.write(cmd)
+
+        if (self.flags & Client.FLAG_NONBLOCK):
+            self.tpm_poll.register(self.tpm, select.POLLIN)
+            self.tpm_poll.poll(10000)
+
         rsp = self.tpm.read()
 
+        if (self.flags & Client.FLAG_NONBLOCK):
+            self.tpm_poll.unregister(self.tpm)
+
         if (self.flags & Client.FLAG_DEBUG) != 0:
             sys.stderr.write('cmd' + os.linesep)
             sys.stderr.write(hex_dump(cmd) + os.linesep)
index d4973be53493226b19dcca5e7140e997ea964e8b..728be7c69b764fe48592bd22e20d01a0fe19d68c 100644 (file)
@@ -288,3 +288,16 @@ class SpaceTest(unittest.TestCase):
 
         self.assertEqual(rc, tpm2.TPM2_RC_COMMAND_CODE |
                          tpm2.TSS2_RESMGR_TPM_RC_LAYER)
+
+class AsyncTest(unittest.TestCase):
+    def setUp(self):
+        logging.basicConfig(filename='AsyncTest.log', level=logging.DEBUG)
+
+    def test_async(self):
+        log = logging.getLogger(__name__)
+        log.debug(sys._getframe().f_code.co_name)
+
+        async_client = tpm2.Client(tpm2.Client.FLAG_NONBLOCK)
+        log.debug("Calling get_cap in a NON_BLOCKING mode")
+        async_client.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_LOADED_SESSION)
+        async_client.close()