- add sources.
[platform/framework/web/crosswalk.git] / src / chrome / test / pyautolib / policy_base.py
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Base class for tests that need to update the policies enforced by Chrome.
6
7 Subclasses can call SetUserPolicy (ChromeOS, Linux, Windows) and
8 SetDevicePolicy (ChromeOS only) with a dictionary of the policies to install.
9
10 The current implementation depends on the platform. The implementations might
11 change in the future, but tests relying on the above calls will keep working.
12 """
13
14 # On ChromeOS, a mock DMServer is started and enterprise enrollment faked
15 # against it. The mock DMServer then serves user and device policy to Chrome.
16 #
17 # For this setup to work, the DNS, GAIA and TPM (if present) are mocked as well:
18 #
19 # * The mock DNS resolves all addresses to 127.0.0.1. This allows the mock GAIA
20 #   to handle all login attempts. It also eliminates the impact of flaky network
21 #   connections on tests. Beware though that no cloud services can be accessed
22 #   due to this DNS redirect.
23 #
24 # * The mock GAIA permits login with arbitrary credentials and accepts any OAuth
25 #   tokens sent to it for verification as valid.
26 #
27 # * When running on a real device, its TPM is disabled. If the TPM were enabled,
28 #   enrollment could not be undone without a reboot. Disabling the TPM makes
29 #   cryptohomed behave as if no TPM was present, allowing enrollment to be
30 #   undone by removing the install attributes.
31 #
32 # To disable the TPM, 0 must be written to /sys/class/misc/tpm0/device/enabled.
33 # Since this file is not writeable, a tpmfs is mounted that shadows the file
34 # with a writeable copy.
35
36 import json
37 import logging
38 import os
39 import subprocess
40
41 import pyauto
42
43 if pyauto.PyUITest.IsChromeOS():
44   import sys
45   import warnings
46
47   import pyauto_paths
48
49   # Ignore deprecation warnings, they make our output more cluttered.
50   warnings.filterwarnings('ignore', category=DeprecationWarning)
51
52   # Find the path to the pyproto and add it to sys.path.
53   # Prepend it so that google.protobuf is loaded from here.
54   for path in pyauto_paths.GetBuildDirs():
55     p = os.path.join(path, 'pyproto')
56     if os.path.isdir(p):
57       sys.path = [p, os.path.join(p, 'chrome', 'browser', 'policy',
58                                   'proto')] + sys.path
59       break
60   sys.path.append('/usr/local')  # to import autotest libs.
61
62   import dbus
63   import device_management_backend_pb2 as dm
64   import pyauto_utils
65   import string
66   import tempfile
67   import urllib
68   import urllib2
69   import uuid
70   from autotest.cros import auth_server
71   from autotest.cros import constants
72   from autotest.cros import cros_ui
73   from autotest.cros import dns_server
74 elif pyauto.PyUITest.IsWin():
75   import _winreg as winreg
76 elif pyauto.PyUITest.IsMac():
77   import getpass
78   import plistlib
79
80 # ASN.1 object identifier for PKCS#1/RSA.
81 PKCS1_RSA_OID = '\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01'
82
83 TPM_SYSFS_PATH = '/sys/class/misc/tpm0'
84 TPM_SYSFS_ENABLED_FILE = os.path.join(TPM_SYSFS_PATH, 'device/enabled')
85
86
87 class PolicyTestBase(pyauto.PyUITest):
88   """A base class for tests that need to set up and modify policies.
89
90   Subclasses can use the methods SetUserPolicy (ChromeOS, Linux, Windows) and
91   SetDevicePolicy (ChromeOS only) to set the policies seen by Chrome.
92   """
93
94   if pyauto.PyUITest.IsChromeOS():
95     # TODO(bartfab): Extend the C++ wrapper that starts the mock DMServer so
96     # that an owner can be passed in. Without this, the server will assume that
97     # the owner is user@example.com and for consistency, so must we.
98     owner = 'user@example.com'
99     # Subclasses may override these credentials to fake enrollment into another
100     # mode or use different device and machine IDs.
101     mode = 'enterprise'
102     device_id = string.upper(str(uuid.uuid4()))
103     machine_id = 'CROSTEST'
104
105     _auth_server = None
106     _dns_server = None
107
108   def ShouldAutoLogin(self):
109     return False
110
111   @staticmethod
112   def _Call(command, check=False):
113     """Invokes a subprocess and optionally asserts the return value is zero."""
114     with open(os.devnull, 'w') as devnull:
115       if check:
116         return subprocess.check_call(command.split(' '), stdout=devnull)
117       else:
118         return subprocess.call(command.split(' '), stdout=devnull)
119
120   def _WriteFile(self, path, content):
121     """Writes content to path, creating any intermediary directories."""
122     if not os.path.exists(os.path.dirname(path)):
123       os.makedirs(os.path.dirname(path))
124     f = open(path, 'w')
125     f.write(content)
126     f.close()
127
128   def _GetTestServerPoliciesFilePath(self):
129     """Returns the path of the cloud policy configuration file."""
130     assert self.IsChromeOS()
131     return os.path.join(self._temp_data_dir, 'device_management')
132
133   def _GetHttpURLForDeviceManagement(self):
134     """Returns the URL at which the TestServer is serving user policy."""
135     assert self.IsChromeOS()
136     return self._http_server.GetURL('device_management').spec()
137
138   def _RemoveIfExists(self, filename):
139     """Removes a file if it exists."""
140     if os.path.exists(filename):
141       os.remove(filename)
142
143   def _StartSessionManagerAndChrome(self):
144     """Starts the session manager and Chrome.
145
146     Requires that the session manager be stopped already.
147     """
148     # Ugly hack: session manager will not spawn Chrome if this file exists. That
149     # is usually a good thing (to keep the automation channel open), but in this
150     # case we really want to restart chrome. PyUITest.setUp() will be called
151     # after session manager and chrome have restarted, and will setup the
152     # automation channel.
153     restore_magic_file = False
154     if os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE):
155       logging.debug('DISABLE_BROWSER_RESTART_MAGIC_FILE found. '
156                     'Removing temporarily for the next restart.')
157       restore_magic_file = True
158       os.remove(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE)
159       assert not os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE)
160
161     logging.debug('Starting session manager again')
162     cros_ui.start()
163
164     # cros_ui.start() waits for the login prompt to be visible, so Chrome has
165     # already started once it returns.
166     if restore_magic_file:
167       open(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE, 'w').close()
168       assert os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE)
169
170   def _WritePolicyOnChromeOS(self):
171     """Updates the mock DMServer's input file with current policy."""
172     assert self.IsChromeOS()
173     policy_dict = {
174       'google/chromeos/device': self._device_policy,
175       'google/chromeos/user': {
176         'mandatory': self._user_policy,
177         'recommended': {},
178       },
179       'managed_users': ['*'],
180     }
181     self._WriteFile(self._GetTestServerPoliciesFilePath(),
182                     json.dumps(policy_dict, sort_keys=True, indent=2) + '\n')
183
184   @staticmethod
185   def _IsCryptohomedReadyOnChromeOS():
186     """Checks whether cryptohomed is running and ready to accept DBus calls."""
187     assert pyauto.PyUITest.IsChromeOS()
188     try:
189       bus = dbus.SystemBus()
190       proxy = bus.get_object('org.chromium.Cryptohome',
191                              '/org/chromium/Cryptohome')
192       dbus.Interface(proxy, 'org.chromium.CryptohomeInterface')
193     except dbus.DBusException:
194       return False
195     return True
196
197   def _ClearInstallAttributesOnChromeOS(self):
198     """Resets the install attributes."""
199     assert self.IsChromeOS()
200     self._RemoveIfExists('/home/.shadow/install_attributes.pb')
201     self._Call('restart cryptohomed', check=True)
202     assert self.WaitUntil(self._IsCryptohomedReadyOnChromeOS)
203
204   def _DMPostRequest(self, request_type, request, headers):
205     """Posts a request to the mock DMServer."""
206     assert self.IsChromeOS()
207     url = self._GetHttpURLForDeviceManagement()
208     url += '?' + urllib.urlencode({
209       'deviceid': self.device_id,
210       'oauth_token': 'dummy_oauth_token_that_is_not_checked_anyway',
211       'request': request_type,
212       'devicetype': 2,
213       'apptype': 'Chrome',
214       'agent': 'Chrome',
215     })
216     response = dm.DeviceManagementResponse()
217     response.ParseFromString(urllib2.urlopen(urllib2.Request(
218         url, request.SerializeToString(), headers)).read())
219     return response
220
221   def _DMRegisterDevice(self):
222     """Registers with the mock DMServer and returns the DMToken."""
223     assert self.IsChromeOS()
224     dm_request = dm.DeviceManagementRequest()
225     request = dm_request.register_request
226     request.type = dm.DeviceRegisterRequest.DEVICE
227     request.machine_id = self.machine_id
228     dm_response = self._DMPostRequest('register', dm_request, {})
229     return dm_response.register_response.device_management_token
230
231   def _DMFetchPolicy(self, dm_token):
232     """Fetches device policy from the mock DMServer."""
233     assert self.IsChromeOS()
234     dm_request = dm.DeviceManagementRequest()
235     policy_request = dm_request.policy_request
236     request = policy_request.request.add()
237     request.policy_type = 'google/chromeos/device'
238     request.signature_type = dm.PolicyFetchRequest.SHA1_RSA
239     headers = {'Authorization': 'GoogleDMToken token=' + dm_token}
240     dm_response = self._DMPostRequest('policy', dm_request, headers)
241     response = dm_response.policy_response.response[0]
242     assert response.policy_data
243     assert response.policy_data_signature
244     assert response.new_public_key
245     return response
246
247   def ExtraChromeFlags(self):
248     """Sets up Chrome to use cloud policies on ChromeOS."""
249     flags = pyauto.PyUITest.ExtraChromeFlags(self)
250     if self.IsChromeOS():
251       while '--skip-oauth-login' in flags:
252         flags.remove('--skip-oauth-login')
253       url = self._GetHttpURLForDeviceManagement()
254       flags.append('--device-management-url=' + url)
255       flags.append('--disable-sync')
256     return flags
257
258   def _SetUpWithSessionManagerStopped(self):
259     """Sets up the test environment after stopping the session manager."""
260     assert self.IsChromeOS()
261     logging.debug('Stopping session manager')
262     cros_ui.stop(allow_fail=True)
263
264     # Start mock GAIA server.
265     self._auth_server = auth_server.GoogleAuthServer()
266     self._auth_server.run()
267
268     # Disable TPM if present.
269     if os.path.exists(TPM_SYSFS_PATH):
270       self._Call('mount -t tmpfs -o size=1k tmpfs %s'
271           % os.path.realpath(TPM_SYSFS_PATH), check=True)
272       self._WriteFile(TPM_SYSFS_ENABLED_FILE, '0')
273
274     # Clear install attributes and restart cryptohomed to pick up the change.
275     self._ClearInstallAttributesOnChromeOS()
276
277     # Set install attributes to mock enterprise enrollment.
278     bus = dbus.SystemBus()
279     proxy = bus.get_object('org.chromium.Cryptohome',
280                             '/org/chromium/Cryptohome')
281     install_attributes = {
282       'enterprise.device_id': self.device_id,
283       'enterprise.domain': string.split(self.owner, '@')[-1],
284       'enterprise.mode': self.mode,
285       'enterprise.owned': 'true',
286       'enterprise.user': self.owner
287     }
288     interface = dbus.Interface(proxy, 'org.chromium.CryptohomeInterface')
289     for name, value in install_attributes.iteritems():
290       interface.InstallAttributesSet(name, '%s\0' % value)
291     interface.InstallAttributesFinalize()
292
293     # Start mock DNS server that redirects all traffic to 127.0.0.1.
294     self._dns_server = dns_server.LocalDns()
295     self._dns_server.run()
296
297     # Start mock DMServer.
298     source_dir = os.path.normpath(pyauto_paths.GetSourceDir())
299     self._temp_data_dir = tempfile.mkdtemp(dir=source_dir)
300     logging.debug('TestServer input path: %s' % self._temp_data_dir)
301     relative_temp_data_dir = os.path.basename(self._temp_data_dir)
302     self._http_server = self.StartHTTPServer(relative_temp_data_dir)
303
304     # Initialize the policy served.
305     self._device_policy = {}
306     self._user_policy = {}
307     self._WritePolicyOnChromeOS()
308
309     # Register with mock DMServer and retrieve initial device policy blob.
310     dm_token = self._DMRegisterDevice()
311     policy = self._DMFetchPolicy(dm_token)
312
313     # Write the initial device policy blob.
314     self._WriteFile(constants.OWNER_KEY_FILE, policy.new_public_key)
315     self._WriteFile(constants.SIGNED_POLICY_FILE, policy.SerializeToString())
316
317     # Remove any existing vaults.
318     self.RemoveAllCryptohomeVaultsOnChromeOS()
319
320     # Restart session manager and Chrome.
321     self._StartSessionManagerAndChrome()
322
323   def _tearDownWithSessionManagerStopped(self):
324     """Resets the test environment after stopping the session manager."""
325     assert self.IsChromeOS()
326     logging.debug('Stopping session manager')
327     cros_ui.stop(allow_fail=True)
328
329     # Stop mock GAIA server.
330     if self._auth_server:
331       self._auth_server.stop()
332
333     # Reenable TPM if present.
334     if os.path.exists(TPM_SYSFS_PATH):
335       self._Call('umount %s' % os.path.realpath(TPM_SYSFS_PATH))
336
337     # Clear install attributes and restart cryptohomed to pick up the change.
338     self._ClearInstallAttributesOnChromeOS()
339
340     # Stop mock DNS server.
341     if self._dns_server:
342       self._dns_server.stop()
343
344     # Stop mock DMServer.
345     self.StopHTTPServer(self._http_server)
346
347     # Clear the policy served.
348     pyauto_utils.RemovePath(self._temp_data_dir)
349
350     # Remove the device policy blob.
351     self._RemoveIfExists(constants.OWNER_KEY_FILE)
352     self._RemoveIfExists(constants.SIGNED_POLICY_FILE)
353
354     # Remove any existing vaults.
355     self.RemoveAllCryptohomeVaultsOnChromeOS()
356
357     # Restart session manager and Chrome.
358     self._StartSessionManagerAndChrome()
359
360   def setUp(self):
361     """Sets up the platform for policy testing.
362
363     On ChromeOS, part of the setup involves restarting the session manager to
364     inject an initial device policy blob.
365     """
366     if self.IsChromeOS():
367       # Perform the remainder of the setup with the device manager stopped.
368       try:
369         self.WaitForSessionManagerRestart(
370             self._SetUpWithSessionManagerStopped)
371       except:
372         # Destroy the non re-entrant services.
373         if self._auth_server:
374           self._auth_server.stop()
375         if self._dns_server:
376           self._dns_server.stop()
377         raise
378
379     pyauto.PyUITest.setUp(self)
380     self._branding = self.GetBrowserInfo()['properties']['branding']
381
382   def tearDown(self):
383     """Cleans up the policies and related files created in tests."""
384     if self.IsChromeOS():
385       # Perform the cleanup with the device manager stopped.
386       self.WaitForSessionManagerRestart(self._tearDownWithSessionManagerStopped)
387     else:
388       # On other platforms, there is only user policy to clear.
389       self.SetUserPolicy(refresh=False)
390
391     pyauto.PyUITest.tearDown(self)
392
393   def LoginWithTestAccount(self, account='prod_enterprise_test_user'):
394     """Convenience method for logging in with one of the test accounts."""
395     assert self.IsChromeOS()
396     credentials = self.GetPrivateInfo()[account]
397     self.Login(credentials['username'], credentials['password'])
398     assert self.GetLoginInfo()['is_logged_in']
399
400   def _GetCurrentLoginScreenId(self):
401     return self.ExecuteJavascriptInOOBEWebUI(
402         """window.domAutomationController.send(
403                String(cr.ui.Oobe.getInstance().currentScreen.id));
404         """)
405
406   def _WaitForLoginScreenId(self, id):
407     self.assertTrue(
408         self.WaitUntil(function=self._GetCurrentLoginScreenId,
409                        expect_retval=id),
410                        msg='Expected login screen "%s" to be visible.' % id)
411
412   def _CheckLoginFormLoading(self):
413     return self.ExecuteJavascriptInOOBEWebUI(
414         """window.domAutomationController.send(
415                cr.ui.Oobe.getInstance().currentScreen.loading);
416         """)
417
418   def PrepareToWaitForLoginFormReload(self):
419     self.assertEqual('gaia-signin',
420                      self._GetCurrentLoginScreenId(),
421                      msg='Expected the login form to be visible.')
422     self.assertTrue(
423         self.WaitUntil(function=self._CheckLoginFormLoading,
424                        expect_retval=False),
425                        msg='Expected the login form to finish loading.')
426     # Set up a sentinel variable that is false now and will toggle to true when
427     # the login form starts reloading.
428     self.ExecuteJavascriptInOOBEWebUI(
429         """var screen = cr.ui.Oobe.getInstance().currentScreen;
430            if (!('reload_started' in screen)) {
431              screen.orig_loadAuthExtension_ = screen.loadAuthExtension_;
432              screen.loadAuthExtension_ = function(data) {
433                this.orig_loadAuthExtension_(data);
434                if (this.loading)
435                  this.reload_started = true;
436              }
437            }
438            screen.reload_started = false;
439            window.domAutomationController.send(true);""")
440
441   def _CheckLoginFormReloaded(self):
442     return self.ExecuteJavascriptInOOBEWebUI(
443         """window.domAutomationController.send(
444                cr.ui.Oobe.getInstance().currentScreen.reload_started &&
445                !cr.ui.Oobe.getInstance().currentScreen.loading);
446         """)
447
448   def WaitForLoginFormReload(self):
449     self.assertEqual('gaia-signin',
450                      self._GetCurrentLoginScreenId(),
451                      msg='Expected the login form to be visible.')
452     self.assertTrue(
453         self.WaitUntil(function=self._CheckLoginFormReloaded),
454                        msg='Expected the login form to finish reloading.')
455
456   def _SetUserPolicyChromeOS(self, user_policy=None):
457     """Writes the given user policy to the mock DMServer's input file."""
458     self._user_policy = user_policy or {}
459     self._WritePolicyOnChromeOS()
460
461   def _SetUserPolicyWin(self, user_policy=None):
462     """Writes the given user policy to the Windows registry."""
463     def SetValueEx(key, sub_key, value):
464       if isinstance(value, int):
465         winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value))
466       elif isinstance(value, basestring):
467         winreg.SetValueEx(key, sub_key, 0, winreg.REG_SZ, value.encode('ascii'))
468       elif isinstance(value, list):
469         k = winreg.CreateKey(key, sub_key)
470         for index, v in list(enumerate(value)):
471           SetValueEx(k, str(index + 1), v)
472         winreg.CloseKey(k)
473       else:
474         raise TypeError('Unsupported data type: "%s"' % value)
475
476     assert self.IsWin()
477     if self._branding == 'Google Chrome':
478       reg_base = r'SOFTWARE\Policies\Google\Chrome'
479     else:
480       reg_base = r'SOFTWARE\Policies\Chromium'
481
482     if subprocess.call(
483         r'reg query HKEY_LOCAL_MACHINE\%s' % reg_base) == 0:
484       logging.debug(r'Removing %s' % reg_base)
485       subprocess.call(r'reg delete HKLM\%s /f' % reg_base)
486
487     if user_policy is not None:
488       root_key = winreg.CreateKey(winreg.HKEY_LOCAL_MACHINE, reg_base)
489       for k, v in user_policy.iteritems():
490         SetValueEx(root_key, k, v)
491       winreg.CloseKey(root_key)
492
493   def _SetUserPolicyLinux(self, user_policy=None):
494     """Writes the given user policy to the JSON policy file read by Chrome."""
495     assert self.IsLinux()
496     sudo_cmd_file = os.path.join(os.path.dirname(__file__),
497                                  'policy_posix_util.py')
498
499     if self._branding == 'Google Chrome':
500       policies_location_base = '/etc/opt/chrome'
501     else:
502       policies_location_base = '/etc/chromium'
503
504     if os.path.exists(policies_location_base):
505       logging.debug('Removing directory %s' % policies_location_base)
506       subprocess.call(['suid-python', sudo_cmd_file,
507                        'remove_dir', policies_location_base])
508
509     if user_policy is not None:
510       self._WriteFile('/tmp/chrome.json',
511                       json.dumps(user_policy, sort_keys=True, indent=2) + '\n')
512
513       policies_location = '%s/policies/managed' % policies_location_base
514       subprocess.call(['suid-python', sudo_cmd_file,
515                        'setup_dir', policies_location])
516       subprocess.call(['suid-python', sudo_cmd_file,
517                        'perm_dir', policies_location])
518       # Copy chrome.json file to the managed directory
519       subprocess.call(['suid-python', sudo_cmd_file,
520                        'copy', '/tmp/chrome.json', policies_location])
521       os.remove('/tmp/chrome.json')
522
523   def _SetUserPolicyMac(self, user_policy=None):
524     """Writes the given user policy to the plist policy file read by Chrome."""
525     assert self.IsMac()
526     sudo_cmd_file = os.path.join(os.path.dirname(__file__),
527                                  'policy_posix_util.py')
528
529     if self._branding == 'Google Chrome':
530       policies_file_base = 'com.google.Chrome.plist'
531     else:
532       policies_file_base = 'org.chromium.Chromium.plist'
533
534     policies_location = os.path.join('/Library', 'Managed Preferences',
535                                      getpass.getuser())
536
537     if os.path.exists(policies_location):
538       logging.debug('Removing directory %s' % policies_location)
539       subprocess.call(['suid-python', sudo_cmd_file,
540                        'remove_dir', policies_location])
541
542     if user_policy is not None:
543       policies_tmp_file = os.path.join('/tmp', policies_file_base)
544       plistlib.writePlist(user_policy, policies_tmp_file)
545       subprocess.call(['suid-python', sudo_cmd_file,
546                        'setup_dir', policies_location])
547       # Copy policy file to the managed directory
548       subprocess.call(['suid-python', sudo_cmd_file,
549                        'copy', policies_tmp_file, policies_location])
550       os.remove(policies_tmp_file)
551
552   def SetUserPolicy(self, user_policy=None, refresh=True):
553     """Sets the user policy provided as a dict.
554
555     Args:
556       user_policy: The user policy to set. None clears it.
557       refresh: If True, Chrome will refresh and apply the new policy.
558                Requires Chrome to be alive for it.
559     """
560     if self.IsChromeOS():
561       self._SetUserPolicyChromeOS(user_policy=user_policy)
562     elif self.IsWin():
563       self._SetUserPolicyWin(user_policy=user_policy)
564     elif self.IsLinux():
565       self._SetUserPolicyLinux(user_policy=user_policy)
566     elif self.IsMac():
567       self._SetUserPolicyMac(user_policy=user_policy)
568     else:
569       raise NotImplementedError('Not available on this platform.')
570
571     if refresh:
572       self.RefreshPolicies()
573
574   def SetDevicePolicy(self, device_policy=None, refresh=True):
575     """Sets the device policy provided as a dict.
576
577     Args:
578       device_policy: The device policy to set. None clears it.
579       refresh: If True, Chrome will refresh and apply the new policy.
580                Requires Chrome to be alive for it.
581     """
582     assert self.IsChromeOS()
583     self._device_policy = device_policy or {}
584     self._WritePolicyOnChromeOS()
585     if refresh:
586       self.RefreshPolicies()