import base64
import BaseHTTPServer
import cgi
+import glob
import google.protobuf.text_format
import hashlib
import logging
# device_management_backend.proto.
if (self.GetUniqueParam('devicetype') != '2' or
self.GetUniqueParam('apptype') != 'Chrome' or
- (request_type != 'ping' and
- len(self.GetUniqueParam('deviceid')) >= 64) or
+ len(self.GetUniqueParam('deviceid')) >= 64 or
len(self.GetUniqueParam('agent')) >= 64):
return (400, 'Invalid request parameter')
if request_type == 'register':
response = self.ProcessApiAuthorization(rmsg.service_api_access_request)
elif request_type == 'unregister':
response = self.ProcessUnregister(rmsg.unregister_request)
- elif request_type == 'policy' or request_type == 'ping':
+ elif request_type == 'policy':
response = self.ProcessPolicy(rmsg, request_type)
elif request_type == 'enterprise_check':
response = self.ProcessAutoEnrollment(rmsg.auto_enrollment_request)
Otherwise, the protobuf will be empty.
Args:
- policy_key: the policy type and settings entity id, joined by '/'.
+ policy_key: The policy type and settings entity id, joined by '/'.
Returns:
A serialized ExternalPolicyData.
response = dm.DeviceManagementResponse()
for request in msg.policy_request.request:
- fetch_response = response.policy_response.response.add()
if (request.policy_type in
('google/android/user',
- 'google/chrome/extension',
'google/chromeos/device',
'google/chromeos/publicaccount',
'google/chromeos/user',
'google/chrome/user',
'google/ios/user')):
- if request_type != 'policy':
- fetch_response.error_code = 400
- fetch_response.error_message = 'Invalid request type'
- else:
- self.ProcessCloudPolicy(request, token_info, fetch_response)
+ fetch_response = response.policy_response.response.add()
+ self.ProcessCloudPolicy(request, token_info, fetch_response)
+ elif request.policy_type == 'google/chrome/extension':
+ self.ProcessCloudPolicyForExtensions(
+ request, response.policy_response, token_info)
else:
fetch_response.error_code = 400
fetch_response.error_message = 'Invalid policy_type'
return (200, response)
def SetProtobufMessageField(self, group_message, field, field_value):
- '''Sets a field in a protobuf message.
+ """Sets a field in a protobuf message.
Args:
group_message: The protobuf message.
field: The field of the message to set, it should be a member of
group_message.DESCRIPTOR.fields.
field_value: The value to set.
- '''
+ """
if field.label == field.LABEL_REPEATED:
assert type(field_value) == list
entries = group_message.__getattribute__(field.name)
group_message.__setattr__(field.name, field_value)
def GatherDevicePolicySettings(self, settings, policies):
- '''Copies all the policies from a dictionary into a protobuf of type
+ """Copies all the policies from a dictionary into a protobuf of type
CloudDeviceSettingsProto.
Args:
settings: The destination ChromeDeviceSettingsProto protobuf.
policies: The source dictionary containing policies in JSON format.
- '''
+ """
for group in settings.DESCRIPTOR.fields:
# Create protobuf message for group.
group_message = eval('dp.' + group.message_type.name + '()')
settings.__getattribute__(group.name).CopyFrom(group_message)
def GatherUserPolicySettings(self, settings, policies):
- '''Copies all the policies from a dictionary into a protobuf of type
+ """Copies all the policies from a dictionary into a protobuf of type
CloudPolicySettings.
Args:
settings: The destination: a CloudPolicySettings protobuf.
policies: The source: a dictionary containing policies under keys
'recommended' and 'mandatory'.
- '''
+ """
for field in settings.DESCRIPTOR.fields:
# |field| is the entry for a specific policy in the top-level
# CloudPolicySettings proto.
self.SetProtobufMessageField(policy_message, field_descriptor, value)
settings.__getattribute__(field.name).CopyFrom(policy_message)
+ def ProcessCloudPolicyForExtensions(self, request, response, token_info):
+ """Handles a request for policy for extensions.
+
+ A request for policy for extensions is slightly different from the other
+ cloud policy requests, because it can trigger 0, one or many
+ PolicyFetchResponse messages in the response.
+
+ Args:
+ request: The PolicyFetchRequest that triggered this handler.
+ response: The DevicePolicyResponse message for the response. Multiple
+ PolicyFetchResponses will be appended to this message.
+ token_info: The token extracted from the request.
+ """
+ # Send one PolicyFetchResponse for each extension that has
+ # configuration data at the server.
+ ids = self.server.ListMatchingComponents('google/chrome/extension')
+ for settings_entity_id in ids:
+ # Reuse the extension policy request, to trigger the same signature
+ # type in the response.
+ request.settings_entity_id = settings_entity_id
+ fetch_response = response.response.add()
+ self.ProcessCloudPolicy(request, token_info, fetch_response)
+ # Don't do key rotations for these messages.
+ fetch_response.ClearField('new_public_key')
+ fetch_response.ClearField('new_public_key_signature')
+ fetch_response.ClearField('new_public_key_verification_signature')
+
def ProcessCloudPolicy(self, msg, token_info, response):
"""Handles a cloud policy request. (New protocol for policy requests.)
Args:
msg: The CloudPolicyRequest message received from the client.
- token_info: the token extracted from the request.
+ token_info: The token extracted from the request.
response: A PolicyFetchResponse message that should be filled with the
response data.
"""
response.policy_data = signed_data
if signing_key:
response.policy_data_signature = (
- signing_key['private_key'].hashAndSign(signed_data).tostring())
+ bytes(signing_key['private_key'].hashAndSign(signed_data)))
if msg.public_key_version != current_key_index + 1:
response.new_public_key = signing_key['public_key']
if req_key:
response.new_public_key_signature = (
- req_key.hashAndSign(response.new_public_key).tostring())
+ bytes(req_key.hashAndSign(response.new_public_key)))
return (200, response.SerializeToString())
try:
key = tlslite.api.parsePEMKey(key_str, private=True)
except SyntaxError:
- key = tlslite.utils.Python_RSAKey.Python_RSAKey._parsePKCS8(
- tlslite.utils.cryptomath.stringToBytes(key_str))
+ key = tlslite.utils.python_rsakey.Python_RSAKey._parsePKCS8(
+ bytearray(key_str))
assert key is not None
key_info = { 'private_key' : key }
# Use the canned private keys if none were passed from the command line.
for signing_key in SIGNING_KEYS:
decoded_key = base64.b64decode(signing_key['key']);
- key = tlslite.utils.Python_RSAKey.Python_RSAKey._parsePKCS8(
- tlslite.utils.cryptomath.stringToBytes(decoded_key))
+ key = tlslite.utils.python_rsakey.Python_RSAKey._parsePKCS8(
+ bytearray(decoded_key))
assert key is not None
# Grab the signature dictionary for this key and decode all of the
# signatures.
"""Returns the base filename for the given policy_selector.
Args:
- policy_selector: the policy type and settings entity id, joined by '/'.
+ policy_selector: The policy type and settings entity id, joined by '/'.
Returns:
The filename corresponding to the policy_selector, without a file
return os.path.join(self.data_dir or '',
'policy_%s' % sanitized_policy_selector)
+ def ListMatchingComponents(self, policy_type):
+ """Returns a list of settings entity IDs that have a configuration file.
+
+ Args:
+ policy_type: The policy type to look for. Only settings entity IDs for
+ file selectors That match this policy_type will be returned.
+
+ Returns:
+ A list of settings entity IDs for the given |policy_type| that have a
+ configuration file in this server (either as a .bin, .txt or .data file).
+ """
+ base_name = self.GetBaseFilename(policy_type)
+ files = glob.glob('%s_*.*' % base_name)
+ len_base_name = len(base_name) + 1
+ return [ file[len_base_name:file.rfind('.')] for file in files ]
+
def ReadPolicyFromDataDir(self, policy_selector, proto_message):
"""Tries to read policy payload from a file in the data directory.