package com.google.ipc.invalidation.ticl;
-import com.google.common.base.Preconditions;
-import com.google.ipc.invalidation.common.CommonInvalidationConstants2;
-import com.google.ipc.invalidation.common.CommonProtoStrings2;
-import com.google.ipc.invalidation.common.CommonProtos2;
-import com.google.ipc.invalidation.common.TiclMessageValidator2;
import com.google.ipc.invalidation.external.client.SystemResources;
import com.google.ipc.invalidation.external.client.SystemResources.Logger;
import com.google.ipc.invalidation.external.client.SystemResources.NetworkChannel;
import com.google.ipc.invalidation.ticl.Statistics.ClientErrorType;
import com.google.ipc.invalidation.ticl.Statistics.ReceivedMessageType;
import com.google.ipc.invalidation.ticl.Statistics.SentMessageType;
+import com.google.ipc.invalidation.ticl.proto.ClientConstants;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ApplicationClientIdP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientConfigP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientHeader;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientToServerMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientVersion;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ConfigChangeMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ErrorMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InfoMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InfoRequestMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InitializeMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InitializeMessage.DigestSerializationType;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InvalidationMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InvalidationP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ObjectIdP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.PropertyRecord;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ProtocolHandlerConfigP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RateLimitP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP.OpType;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationStatusMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSubtree;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSummary;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSyncMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSyncRequestMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ServerHeader;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ServerToClientMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.TokenControlMessage;
+import com.google.ipc.invalidation.ticl.proto.CommonProtos;
+import com.google.ipc.invalidation.ticl.proto.JavaClient.BatcherState;
+import com.google.ipc.invalidation.ticl.proto.JavaClient.ProtocolHandlerState;
+import com.google.ipc.invalidation.util.Bytes;
import com.google.ipc.invalidation.util.InternalBase;
import com.google.ipc.invalidation.util.Marshallable;
+import com.google.ipc.invalidation.util.Preconditions;
+import com.google.ipc.invalidation.util.ProtoWrapper;
+import com.google.ipc.invalidation.util.ProtoWrapper.ValidationException;
import com.google.ipc.invalidation.util.Smearer;
import com.google.ipc.invalidation.util.TextBuilder;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protos.ipc.invalidation.ClientProtocol.ApplicationClientIdP;
-import com.google.protos.ipc.invalidation.ClientProtocol.ClientConfigP;
-import com.google.protos.ipc.invalidation.ClientProtocol.ClientHeader;
-import com.google.protos.ipc.invalidation.ClientProtocol.ClientToServerMessage;
-import com.google.protos.ipc.invalidation.ClientProtocol.ClientVersion;
-import com.google.protos.ipc.invalidation.ClientProtocol.ConfigChangeMessage;
-import com.google.protos.ipc.invalidation.ClientProtocol.ErrorMessage;
-import com.google.protos.ipc.invalidation.ClientProtocol.InfoMessage;
-import com.google.protos.ipc.invalidation.ClientProtocol.InfoRequestMessage;
-import com.google.protos.ipc.invalidation.ClientProtocol.InitializeMessage;
-import com.google.protos.ipc.invalidation.ClientProtocol.InitializeMessage.DigestSerializationType;
-import com.google.protos.ipc.invalidation.ClientProtocol.InvalidationMessage;
-import com.google.protos.ipc.invalidation.ClientProtocol.InvalidationP;
-import com.google.protos.ipc.invalidation.ClientProtocol.ObjectIdP;
-import com.google.protos.ipc.invalidation.ClientProtocol.PropertyRecord;
-import com.google.protos.ipc.invalidation.ClientProtocol.ProtocolHandlerConfigP;
-import com.google.protos.ipc.invalidation.ClientProtocol.RegistrationMessage;
-import com.google.protos.ipc.invalidation.ClientProtocol.RegistrationP;
-import com.google.protos.ipc.invalidation.ClientProtocol.RegistrationP.OpType;
-import com.google.protos.ipc.invalidation.ClientProtocol.RegistrationStatusMessage;
-import com.google.protos.ipc.invalidation.ClientProtocol.RegistrationSubtree;
-import com.google.protos.ipc.invalidation.ClientProtocol.RegistrationSummary;
-import com.google.protos.ipc.invalidation.ClientProtocol.RegistrationSyncMessage;
-import com.google.protos.ipc.invalidation.ClientProtocol.RegistrationSyncRequestMessage;
-import com.google.protos.ipc.invalidation.ClientProtocol.ServerHeader;
-import com.google.protos.ipc.invalidation.ClientProtocol.ServerToClientMessage;
-import com.google.protos.ipc.invalidation.ClientProtocol.TokenControlMessage;
-import com.google.protos.ipc.invalidation.JavaClient.BatcherState;
-import com.google.protos.ipc.invalidation.JavaClient.ProtocolHandlerState;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
private final SystemResources resources;
/** Set of pending registrations stored as a map for overriding later operations. */
- private final Map<ProtoWrapper<ObjectIdP>, RegistrationP.OpType> pendingRegistrations =
- new HashMap<ProtoWrapper<ObjectIdP>, RegistrationP.OpType>();
+ private final Map<ObjectIdP, Integer> pendingRegistrations = new HashMap<ObjectIdP, Integer>();
/** Set of pending invalidation acks. */
- private final Set<ProtoWrapper<InvalidationP>> pendingAckedInvalidations =
- new HashSet<ProtoWrapper<InvalidationP>>();
+ private final Set<InvalidationP> pendingAckedInvalidations = new HashSet<InvalidationP>();
/** Set of pending registration sub trees for registration sync. */
- private final Set<ProtoWrapper<RegistrationSubtree>> pendingRegSubtrees =
- new HashSet<ProtoWrapper<RegistrationSubtree>>();
+ private final Set<RegistrationSubtree> pendingRegSubtrees = new HashSet<RegistrationSubtree>();
/** Pending initialization message to send to the server, if any. */
private InitializeMessage pendingInitializeMessage = null;
/** Creates a batcher from {@code marshalledState}. */
Batcher(SystemResources resources, Statistics statistics, BatcherState marshalledState) {
this(resources, statistics);
- for (ObjectIdP registration : marshalledState.getRegistrationList()) {
- pendingRegistrations.put(ProtoWrapper.of(registration), RegistrationP.OpType.REGISTER);
+ for (ObjectIdP registration : marshalledState.getRegistration()) {
+ pendingRegistrations.put(registration, RegistrationP.OpType.REGISTER);
}
- for (ObjectIdP unregistration : marshalledState.getUnregistrationList()) {
- pendingRegistrations.put(ProtoWrapper.of(unregistration), RegistrationP.OpType.UNREGISTER);
+ for (ObjectIdP unregistration : marshalledState.getUnregistration()) {
+ pendingRegistrations.put(unregistration, RegistrationP.OpType.UNREGISTER);
}
- for (InvalidationP ack : marshalledState.getAcknowledgementList()) {
- pendingAckedInvalidations.add(ProtoWrapper.of(ack));
+ for (InvalidationP ack : marshalledState.getAcknowledgement()) {
+ pendingAckedInvalidations.add(ack);
}
- for (RegistrationSubtree subtree : marshalledState.getRegistrationSubtreeList()) {
- pendingRegSubtrees.add(ProtoWrapper.of(subtree));
- }
- if (marshalledState.hasInitializeMessage()) {
- pendingInitializeMessage = marshalledState.getInitializeMessage();
+ for (RegistrationSubtree subtree : marshalledState.getRegistrationSubtree()) {
+ pendingRegSubtrees.add(subtree);
}
+ pendingInitializeMessage = marshalledState.getNullableInitializeMessage();
if (marshalledState.hasInfoMessage()) {
pendingInfoMessage = marshalledState.getInfoMessage();
}
}
/** Adds a registration on {@code oid} of {@code opType} to the registrations to be sent. */
- void addRegistration(ObjectIdP oid, RegistrationP.OpType opType) {
- pendingRegistrations.put(ProtoWrapper.of(oid), opType);
+ void addRegistration(ObjectIdP oid, Integer opType) {
+ pendingRegistrations.put(oid, opType);
}
/** Adds {@code ack} to the set of acknowledgements to be sent. */
void addAck(InvalidationP ack) {
- pendingAckedInvalidations.add(ProtoWrapper.of(ack));
+ pendingAckedInvalidations.add(ack);
}
/** Adds {@code subtree} to the set of registration subtrees to be sent. */
void addRegSubtree(RegistrationSubtree subtree) {
- pendingRegSubtrees.add(ProtoWrapper.of(subtree));
+ pendingRegSubtrees.add(subtree);
}
/**
* the builder does <b>NOT</b> include the message header.
* @param hasClientToken whether the client currently holds a token
*/
- ClientToServerMessage.Builder toBuilder(boolean hasClientToken) {
- ClientToServerMessage.Builder builder = ClientToServerMessage.newBuilder();
+ ClientToServerMessage toMessage(final ClientHeader header, boolean hasClientToken) {
+ final InitializeMessage initializeMessage;
+ final RegistrationMessage registrationMessage;
+ final RegistrationSyncMessage registrationSyncMessage;
+ final InvalidationMessage invalidationAckMessage;
+ final InfoMessage infoMessage;
+
if (pendingInitializeMessage != null) {
statistics.recordSentMessage(SentMessageType.INITIALIZE);
- builder.setInitializeMessage(pendingInitializeMessage);
+ initializeMessage = pendingInitializeMessage;
pendingInitializeMessage = null;
+ } else {
+ initializeMessage = null;
}
// Note: Even if an initialize message is being sent, we can send additional
// messages such as regisration messages, etc to the server. But if there is no token
// and an initialize message is not being sent, we cannot send any other message.
- if (!hasClientToken && !builder.hasInitializeMessage()) {
+ if (!hasClientToken && (initializeMessage == null)) {
// Cannot send any message
resources.getLogger().warning(
- "Cannot send message since no token and no initialize msg: %s", builder);
+ "Cannot send message since no token and no initialize msg");
statistics.recordError(ClientErrorType.TOKEN_MISSING_FAILURE);
return null;
}
// Add reg, acks, reg subtrees - clear them after adding.
if (!pendingAckedInvalidations.isEmpty()) {
- builder.setInvalidationAckMessage(createInvalidationAckMessage());
+ invalidationAckMessage = createInvalidationAckMessage();
statistics.recordSentMessage(SentMessageType.INVALIDATION_ACK);
+ } else {
+ invalidationAckMessage = null;
}
// Check regs.
if (!pendingRegistrations.isEmpty()) {
- builder.setRegistrationMessage(createRegistrationMessage());
+ registrationMessage = createRegistrationMessage();
statistics.recordSentMessage(SentMessageType.REGISTRATION);
+ } else {
+ registrationMessage = null;
}
// Check reg substrees.
if (!pendingRegSubtrees.isEmpty()) {
- for (ProtoWrapper<RegistrationSubtree> subtree : pendingRegSubtrees) {
- builder.setRegistrationSyncMessage(RegistrationSyncMessage.newBuilder()
- .addSubtree(subtree.getProto()));
- }
+ // If there are multiple pending reg subtrees, only one is sent.
+ ArrayList<RegistrationSubtree> regSubtrees = new ArrayList<RegistrationSubtree>(1);
+ regSubtrees.add(pendingRegSubtrees.iterator().next());
+ registrationSyncMessage = RegistrationSyncMessage.create(regSubtrees);
pendingRegSubtrees.clear();
statistics.recordSentMessage(SentMessageType.REGISTRATION_SYNC);
+ } else {
+ registrationSyncMessage = null;
}
// Check if an info message has to be sent.
if (pendingInfoMessage != null) {
statistics.recordSentMessage(SentMessageType.INFO);
- builder.setInfoMessage(pendingInfoMessage);
+ infoMessage = pendingInfoMessage;
pendingInfoMessage = null;
+ } else {
+ infoMessage = null;
}
- return builder;
+
+ return ClientToServerMessage.create(header, initializeMessage, registrationMessage,
+ registrationSyncMessage, invalidationAckMessage, infoMessage);
}
/**
*/
private RegistrationMessage createRegistrationMessage() {
Preconditions.checkState(!pendingRegistrations.isEmpty());
- RegistrationMessage.Builder regMessage = RegistrationMessage.newBuilder();
// Run through the pendingRegistrations map.
- for (Map.Entry<ProtoWrapper<ObjectIdP>, RegistrationP.OpType> entry :
- pendingRegistrations.entrySet()) {
- RegistrationP reg = CommonProtos2.newRegistrationP(entry.getKey().getProto(),
- entry.getValue() == RegistrationP.OpType.REGISTER);
- regMessage.addRegistration(reg);
+ List<RegistrationP> pendingRegistrations =
+ new ArrayList<RegistrationP>(this.pendingRegistrations.size());
+ for (Map.Entry<ObjectIdP, Integer> entry : this.pendingRegistrations.entrySet()) {
+ pendingRegistrations.add(RegistrationP.create(entry.getKey(), entry.getValue()));
}
- pendingRegistrations.clear();
- return regMessage.build();
+ this.pendingRegistrations.clear();
+ return RegistrationMessage.create(pendingRegistrations);
}
/**
*/
private InvalidationMessage createInvalidationAckMessage() {
Preconditions.checkState(!pendingAckedInvalidations.isEmpty());
- InvalidationMessage.Builder ackMessage = InvalidationMessage.newBuilder();
- for (ProtoWrapper<InvalidationP> wrapper : pendingAckedInvalidations) {
- ackMessage.addInvalidation(wrapper.getProto());
- }
+ InvalidationMessage ackMessage =
+ InvalidationMessage.create(new ArrayList<InvalidationP>(pendingAckedInvalidations));
pendingAckedInvalidations.clear();
- return ackMessage.build();
+ return ackMessage;
}
@Override
public BatcherState marshal() {
- BatcherState.Builder builder = BatcherState.newBuilder();
-
// Marshall (un)registrations.
- for (Map.Entry<ProtoWrapper<ObjectIdP>, RegistrationP.OpType> entry :
- pendingRegistrations.entrySet()) {
- OpType opType = entry.getValue();
- ObjectIdP oid = entry.getKey().getProto();
+ ArrayList<ObjectIdP> registrations = new ArrayList<ObjectIdP>(pendingRegistrations.size());
+ ArrayList<ObjectIdP> unregistrations = new ArrayList<ObjectIdP>(pendingRegistrations.size());
+ for (Map.Entry<ObjectIdP, Integer> entry : pendingRegistrations.entrySet()) {
+ Integer opType = entry.getValue();
+ ObjectIdP oid = entry.getKey();
+ new ArrayList<ObjectIdP>(pendingRegistrations.size());
switch (opType) {
- case REGISTER:
- builder.addRegistration(oid);
+ case OpType.REGISTER:
+ registrations.add(oid);
break;
- case UNREGISTER:
- builder.addUnregistration(oid);
+ case OpType.UNREGISTER:
+ unregistrations.add(oid);
break;
default:
throw new IllegalArgumentException(opType.toString());
}
}
-
- // Marshall acks.
- for (ProtoWrapper<InvalidationP> ack : pendingAckedInvalidations) {
- builder.addAcknowledgement(ack.getProto());
- }
-
- // Marshall registration subtrees.
- for (ProtoWrapper<RegistrationSubtree> subtree : pendingRegSubtrees) {
- builder.addRegistrationSubtree(subtree.getProto());
- }
-
- // Marshall initialize and info messages if present.
- if (pendingInitializeMessage != null) {
- builder.setInitializeMessage(pendingInitializeMessage);
- }
- if (pendingInfoMessage != null) {
- builder.setInfoMessage(pendingInfoMessage);
- }
- return builder.build();
+ return BatcherState.create(registrations, unregistrations, pendingAckedInvalidations,
+ pendingRegSubtrees, pendingInitializeMessage, pendingInfoMessage);
}
}
* @param token server-sent token
* @param registrationSummary summary over server registration state
*/
- ServerMessageHeader(ByteString token, RegistrationSummary registrationSummary) {
+ ServerMessageHeader(Bytes token, RegistrationSummary registrationSummary) {
this.token = token;
this.registrationSummary = registrationSummary;
}
/** Server-sent token. */
- ByteString token;
+ Bytes token;
/** Summary of the client's registration state at the server. */
RegistrationSummary registrationSummary;
@Override
public void toCompactString(TextBuilder builder) {
- builder.appendFormat("Token: %s, Summary: %s", CommonProtoStrings2.toLazyCompactString(token),
- registrationSummary);
+ builder.appendFormat("Token: %s, Summary: %s", token, registrationSummary);
}
}
/**
* Representation of a message receiver for the server. Such a message is guaranteed to be
- * valid (i.e. checked by {@link TiclMessageValidator2}, but the session token is <b>not</b>
- * checked.
+ * valid, but the session token is <b>not</b> checked.
*/
static class ParsedMessage {
/*
* Each of these fields corresponds directly to a field in the ServerToClientMessage protobuf.
- * It is non-null iff the correspondig hasYYY method in the protobuf would return true.
+ * It is non-null iff the corresponding hasYYY method in the protobuf would return true.
*/
final ServerMessageHeader header;
final TokenControlMessage tokenControlMessage;
// For each field, assign it to the corresponding protobuf field if present, else null.
ServerHeader messageHeader = rawMessage.getHeader();
header = new ServerMessageHeader(messageHeader.getClientToken(),
- messageHeader.hasRegistrationSummary() ? messageHeader.getRegistrationSummary() : null);
- tokenControlMessage = rawMessage.hasTokenControlMessage() ?
- rawMessage.getTokenControlMessage() : null;
- invalidationMessage = rawMessage.hasInvalidationMessage() ?
- rawMessage.getInvalidationMessage() : null;
- registrationStatusMessage = rawMessage.hasRegistrationStatusMessage() ?
- rawMessage.getRegistrationStatusMessage() : null;
- registrationSyncRequestMessage = rawMessage.hasRegistrationSyncRequestMessage() ?
- rawMessage.getRegistrationSyncRequestMessage() : null;
- configChangeMessage = rawMessage.hasConfigChangeMessage() ?
- rawMessage.getConfigChangeMessage() : null;
- infoRequestMessage = rawMessage.hasInfoRequestMessage() ?
- rawMessage.getInfoRequestMessage() : null;
- errorMessage = rawMessage.hasErrorMessage() ? rawMessage.getErrorMessage() : null;
+ messageHeader.getNullableRegistrationSummary());
+ tokenControlMessage =
+ rawMessage.hasTokenControlMessage() ? rawMessage.getTokenControlMessage() : null;
+ invalidationMessage = rawMessage.getNullableInvalidationMessage();
+ registrationStatusMessage = rawMessage.getNullableRegistrationStatusMessage();
+ registrationSyncRequestMessage = rawMessage.hasRegistrationSyncRequestMessage()
+ ? rawMessage.getRegistrationSyncRequestMessage() : null;
+ configChangeMessage =
+ rawMessage.hasConfigChangeMessage() ? rawMessage.getConfigChangeMessage() : null;
+ infoRequestMessage = rawMessage.getNullableInfoRequestMessage();
+ errorMessage = rawMessage.getNullableErrorMessage();
}
}
RegistrationSummary getRegistrationSummary();
/** Returns the current server-assigned client token, if any. */
- ByteString getClientToken();
+ Bytes getClientToken();
}
/** Information about the client, e.g., application name, OS, etc. */
/** The protocol listener. */
private final ProtocolListener listener;
- /** Checks that messages (inbound and outbound) conform to basic validity constraints. */
- private final TiclMessageValidator2 msgValidator;
-
/** Batches messages to the server. */
private final Batcher batcher;
*/
ProtocolHandler(ProtocolHandlerConfigP config, final SystemResources resources,
Smearer smearer, Statistics statistics, int clientType, String applicationName,
- ProtocolListener listener, TiclMessageValidator2 msgValidator,
- ProtocolHandlerState marshalledState) {
+ ProtocolListener listener, ProtocolHandlerState marshalledState) {
this.logger = resources.getLogger();
this.statistics = statistics;
this.internalScheduler = resources.getInternalScheduler();
this.network = resources.getNetwork();
this.listener = listener;
- this.msgValidator = msgValidator;
- this.clientVersion = CommonProtos2.newClientVersion(resources.getPlatform(), "Java",
+ this.clientVersion = CommonProtos.newClientVersion(resources.getPlatform(), "Java",
applicationName);
this.clientType = clientType;
if (marshalledState == null) {
}
/** Returns a default config for the protocol handler. */
- static ProtocolHandlerConfigP.Builder createConfig() {
+ static ProtocolHandlerConfigP createConfig() {
// Allow at most 3 messages every 5 seconds.
int windowMs = 5 * 1000;
int numMessagesPerWindow = 3;
- return ProtocolHandlerConfigP.newBuilder()
- .addRateLimit(CommonProtos2.newRateLimitP(windowMs, numMessagesPerWindow));
+ List<RateLimitP> rateLimits = new ArrayList<RateLimitP>();
+ rateLimits.add(RateLimitP.create(windowMs, numMessagesPerWindow));
+ return ProtocolHandlerConfigP.create(null, rateLimits);
}
/** Returns a configuration object with parameters set for unit tests. */
- static ProtocolHandlerConfigP.Builder createConfigForTest() {
+ static ProtocolHandlerConfigP createConfigForTest() {
// No rate limits
int smallBatchDelayForTest = 200;
- return ProtocolHandlerConfigP.newBuilder().setBatchingDelayMs(smallBatchDelayForTest);
+ return ProtocolHandlerConfigP.create(smallBatchDelayForTest, new ArrayList<RateLimitP>(0));
}
/**
ServerToClientMessage message;
try {
message = ServerToClientMessage.parseFrom(incomingMessage);
- } catch (InvalidProtocolBufferException exception) {
- logger.warning("Incoming message is unparseable: %s",
- CommonProtoStrings2.toLazyCompactString(incomingMessage));
- return null;
- }
-
- // Validate the message. If this passes, we can blindly assume valid messages from here on.
- logger.fine("Incoming message: %s", message);
- if (!msgValidator.isValid(message)) {
+ } catch (ValidationException exception) {
statistics.recordError(ClientErrorType.INCOMING_MESSAGE_FAILURE);
- logger.severe("Received invalid message: %s", message);
+ logger.warning("Incoming message is invalid: %s", Bytes.toLazyCompactString(incomingMessage));
return null;
}
// Check the version of the message.
if (message.getHeader().getProtocolVersion().getVersion().getMajorVersion() !=
- CommonInvalidationConstants2.PROTOCOL_MAJOR_VERSION) {
+ ClientConstants.PROTOCOL_MAJOR_VERSION) {
statistics.recordError(ClientErrorType.PROTOCOL_VERSION_FAILURE);
logger.severe("Dropping message with incompatible version: %s", message);
return null;
* @param nonce nonce for the request
* @param debugString information to identify the caller
*/
- void sendInitializeMessage(ApplicationClientIdP applicationClientId, ByteString nonce,
+ void sendInitializeMessage(ApplicationClientIdP applicationClientId, Bytes nonce,
BatchingTask batchingTask, String debugString) {
Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
if (applicationClientId.getClientType() != clientType) {
}
// Simply store the message in pendingInitializeMessage and send it when the batching task runs.
- InitializeMessage initializeMsg = CommonProtos2.newInitializeMessage(clientType,
- applicationClientId, nonce, DigestSerializationType.BYTE_BASED);
+ InitializeMessage initializeMsg = InitializeMessage.create(clientType, nonce,
+ applicationClientId, DigestSerializationType.BYTE_BASED);
batcher.setInitializeMessage(initializeMsg);
logger.info("Batching initialize message for client: %s, %s", debugString, initializeMsg);
batchingTask.ensureScheduled(debugString);
ClientConfigP clientConfig, boolean requestServerRegistrationSummary,
BatchingTask batchingTask) {
Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
- InfoMessage.Builder infoMessage = InfoMessage.newBuilder()
- .setClientVersion(clientVersion);
- // Add configuration parameters.
- if (clientConfig != null) {
- infoMessage.setClientConfig(clientConfig);
+ List<PropertyRecord> performanceCounterRecords =
+ new ArrayList<PropertyRecord>(performanceCounters.size());
+ for (SimplePair<String, Integer> counter : performanceCounters) {
+ performanceCounterRecords.add(PropertyRecord.create(counter.first, counter.second));
}
-
- // Add performance counters.
- for (SimplePair<String, Integer> performanceCounter : performanceCounters) {
- PropertyRecord counter =
- CommonProtos2.newPropertyRecord(performanceCounter.first, performanceCounter.second);
- infoMessage.addPerformanceCounter(counter);
- }
-
- // Indicate whether we want the server's registration summary sent back.
- infoMessage.setServerRegistrationSummaryRequested(requestServerRegistrationSummary);
+ InfoMessage infoMessage = InfoMessage.create(clientVersion, /* configParameter */ null,
+ performanceCounterRecords, requestServerRegistrationSummary, clientConfig);
// Simply store the message in pendingInfoMessage and send it when the batching task runs.
- batcher.setInfoMessage(infoMessage.build());
+ batcher.setInfoMessage(infoMessage);
batchingTask.ensureScheduled("Send-info");
}
* @param objectIds object ids on which to (un)register
* @param regOpType whether to register or unregister
*/
- void sendRegistrations(Collection<ObjectIdP> objectIds, RegistrationP.OpType regOpType,
+ void sendRegistrations(Collection<ObjectIdP> objectIds, Integer regOpType,
BatchingTask batchingTask) {
Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
for (ObjectIdP objectId : objectIds) {
/** Sends an acknowledgement for {@code invalidation} to the server. */
void sendInvalidationAck(InvalidationP invalidation, BatchingTask batchingTask) {
Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
- // We could do squelching - we don't since it is unlikely to be too beneficial here.
+ // We could summarize acks when there are suppressing invalidations - we don't since it is
+ // unlikely to be too beneficial here.
logger.fine("Sending ack for invalidation %s", invalidation);
batcher.addAck(invalidation);
batchingTask.ensureScheduled("Send-Ack");
}
// Create the message from the batcher.
- ClientToServerMessage.Builder msgBuilder =
- batcher.toBuilder(listener.getClientToken() != null);
- if (msgBuilder == null) {
- // Happens when we don't have a token and are not sending an initialize message. Logged
- // in batcher.toBuilder().
- return;
- }
- msgBuilder.setHeader(createClientHeader());
- ++messageId;
-
- // Validate the message and send it.
- ClientToServerMessage message = msgBuilder.build();
- if (!msgValidator.isValid(message)) {
- logger.severe("Tried to send invalid message: %s", message);
+ ClientToServerMessage message;
+ try {
+ message = batcher.toMessage(createClientHeader(), listener.getClientToken() != null);
+ if (message == null) {
+ // Happens when we don't have a token and are not sending an initialize message. Logged
+ // in batcher.toMessage().
+ return;
+ }
+ } catch (ProtoWrapper.ValidationArgumentException exception) {
+ logger.severe("Tried to send invalid message: %s", batcher);
statistics.recordError(ClientErrorType.OUTGOING_MESSAGE_FAILURE);
return;
}
+ ++messageId;
statistics.recordSentMessage(SentMessageType.TOTAL);
- logger.fine("Sending message to server: %s",
- CommonProtoStrings2.toLazyCompactString(message, true));
+ logger.fine("Sending message to server: %s", message);
network.sendMessage(message.toByteArray());
// Record that the message was sent. We're invoking the listener directly, rather than
}
/** Returns the header to include on a message to the server. */
- private ClientHeader.Builder createClientHeader() {
+ private ClientHeader createClientHeader() {
Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
- ClientHeader.Builder builder = ClientHeader.newBuilder()
- .setProtocolVersion(CommonInvalidationConstants2.PROTOCOL_VERSION)
- .setClientTimeMs(internalScheduler.getCurrentTimeMs())
- .setMessageId(Integer.toString(messageId))
- .setMaxKnownServerTimeMs(lastKnownServerTimeMs)
- .setRegistrationSummary(listener.getRegistrationSummary())
- .setClientType(clientType);
- ByteString clientToken = listener.getClientToken();
- if (clientToken != null) {
- logger.fine("Sending token on client->server message: %s",
- CommonProtoStrings2.toLazyCompactString(clientToken));
- builder.setClientToken(clientToken);
- }
- return builder;
+ return ClientHeader.create(ClientConstants.PROTOCOL_VERSION,
+ listener.getClientToken(), listener.getRegistrationSummary(),
+ internalScheduler.getCurrentTimeMs(), lastKnownServerTimeMs, Integer.toString(messageId),
+ clientType);
}
@Override
public ProtocolHandlerState marshal() {
- ProtocolHandlerState.Builder builder = ProtocolHandlerState.newBuilder();
- builder.setLastKnownServerTimeMs(lastKnownServerTimeMs);
- builder.setMessageId(messageId);
- builder.setNextMessageSendTimeMs(nextMessageSendTimeMs);
- builder.setBatcherState(batcher.marshal());
- return builder.build();
+ return ProtocolHandlerState.create(messageId, lastKnownServerTimeMs, nextMessageSendTimeMs,
+ batcher.marshal());
}
}