import static com.google.ipc.invalidation.external.client.SystemResources.Scheduler.NO_DELAY;
-import com.google.common.base.Preconditions;
-import com.google.ipc.invalidation.common.CommonInvalidationConstants2;
-import com.google.ipc.invalidation.common.CommonProtoStrings2;
-import com.google.ipc.invalidation.common.CommonProtos2;
import com.google.ipc.invalidation.common.DigestFunction;
import com.google.ipc.invalidation.common.ObjectIdDigestUtils;
-import com.google.ipc.invalidation.common.TiclMessageValidator2;
import com.google.ipc.invalidation.external.client.InvalidationListener;
import com.google.ipc.invalidation.external.client.InvalidationListener.RegistrationState;
import com.google.ipc.invalidation.external.client.SystemResources;
import com.google.ipc.invalidation.ticl.Statistics.ClientErrorType;
import com.google.ipc.invalidation.ticl.Statistics.IncomingOperationType;
import com.google.ipc.invalidation.ticl.Statistics.ReceivedMessageType;
+import com.google.ipc.invalidation.ticl.proto.ChannelCommon.NetworkEndpointId;
+import com.google.ipc.invalidation.ticl.proto.Client.AckHandleP;
+import com.google.ipc.invalidation.ticl.proto.Client.ExponentialBackoffState;
+import com.google.ipc.invalidation.ticl.proto.Client.PersistentTiclState;
+import com.google.ipc.invalidation.ticl.proto.Client.RunStateP;
+import com.google.ipc.invalidation.ticl.proto.ClientConstants;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ApplicationClientIdP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientConfigP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ErrorMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InfoRequestMessage.InfoType;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InvalidationP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ObjectIdP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ProtocolHandlerConfigP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationStatus;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSubtree;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSummary;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.Version;
+import com.google.ipc.invalidation.ticl.proto.CommonProtos;
+import com.google.ipc.invalidation.ticl.proto.JavaClient.InvalidationClientState;
+import com.google.ipc.invalidation.ticl.proto.JavaClient.ProtocolHandlerState;
+import com.google.ipc.invalidation.ticl.proto.JavaClient.RecurringTaskState;
+import com.google.ipc.invalidation.ticl.proto.JavaClient.RegistrationManagerStateP;
+import com.google.ipc.invalidation.ticl.proto.JavaClient.StatisticsState;
import com.google.ipc.invalidation.util.Box;
import com.google.ipc.invalidation.util.Bytes;
import com.google.ipc.invalidation.util.InternalBase;
import com.google.ipc.invalidation.util.Marshallable;
+import com.google.ipc.invalidation.util.Preconditions;
+import com.google.ipc.invalidation.util.ProtoWrapper.ValidationException;
import com.google.ipc.invalidation.util.Smearer;
import com.google.ipc.invalidation.util.TextBuilder;
import com.google.ipc.invalidation.util.TypedUtil;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protos.ipc.invalidation.ChannelCommon.NetworkEndpointId;
-import com.google.protos.ipc.invalidation.Client.AckHandleP;
-import com.google.protos.ipc.invalidation.Client.ExponentialBackoffState;
-import com.google.protos.ipc.invalidation.Client.PersistentTiclState;
-import com.google.protos.ipc.invalidation.Client.RunStateP;
-import com.google.protos.ipc.invalidation.ClientProtocol.ApplicationClientIdP;
-import com.google.protos.ipc.invalidation.ClientProtocol.ClientConfigP;
-import com.google.protos.ipc.invalidation.ClientProtocol.ErrorMessage;
-import com.google.protos.ipc.invalidation.ClientProtocol.InfoRequestMessage.InfoType;
-import com.google.protos.ipc.invalidation.ClientProtocol.InvalidationP;
-import com.google.protos.ipc.invalidation.ClientProtocol.ObjectIdP;
-import com.google.protos.ipc.invalidation.ClientProtocol.RegistrationP;
-import com.google.protos.ipc.invalidation.ClientProtocol.RegistrationP.OpType;
-import com.google.protos.ipc.invalidation.ClientProtocol.RegistrationStatus;
-import com.google.protos.ipc.invalidation.ClientProtocol.RegistrationSubtree;
-import com.google.protos.ipc.invalidation.ClientProtocol.RegistrationSummary;
-import com.google.protos.ipc.invalidation.JavaClient.InvalidationClientState;
-import com.google.protos.ipc.invalidation.JavaClient.ProtocolHandlerState;
-import com.google.protos.ipc.invalidation.JavaClient.RecurringTaskState;
-import com.google.protos.ipc.invalidation.JavaClient.RegistrationManagerStateP;
-import com.google.protos.ipc.invalidation.JavaClient.StatisticsState;
import java.util.ArrayList;
import java.util.Collection;
private static final String TASK_NAME = "PersistentWrite";
/** The last client token that was written to to persistent state successfully. */
- private final Box<ProtoWrapper<PersistentTiclState>> lastWrittenState =
- Box.of(ProtoWrapper.of(PersistentTiclState.getDefaultInstance()));
+ private final Box<PersistentTiclState> lastWrittenState =
+ Box.of(PersistentTiclState.DEFAULT_INSTANCE);
PersistentWriteTask() {
super(TASK_NAME, NO_DELAY, config.getWriteRetryDelayMs(), true);
}
// Compute the state that we will write if we decide to go ahead with the write.
- final ProtoWrapper<PersistentTiclState> state =
- ProtoWrapper.of(CommonProtos2.newPersistentTiclState(clientToken, lastMessageSendTimeMs));
- byte[] serializedState = PersistenceUtils.serializeState(state.getProto(), digestFn);
+ final PersistentTiclState state =
+ PersistentTiclState.create(clientToken, lastMessageSendTimeMs);
+ byte[] serializedState = PersistenceUtils.serializeState(state, digestFn);
// Decide whether or not to do the write. The decision varies depending on whether or
// not the channel supports offline delivery. If we decide not to do the write, then
} else {
// If we do not support offline delivery, we avoid writing the state on each message, and
// we avoid checking the last-sent time (we check only the client token).
- if (state.getProto().getClientToken().equals(
- lastWrittenState.get().getProto().getClientToken())) {
+ if (TypedUtil.<Bytes>equals(
+ state.getClientToken(), lastWrittenState.get().getClientToken())) {
return false;
}
}
storage.writeKey(CLIENT_TOKEN_KEY, serializedState, new Callback<Status>() {
@Override
public void accept(Status status) {
- logger.info("Write state completed: %s for %s", status, state.getProto());
+ logger.info("Write state completed: %s for %s", status, state);
Preconditions.checkState(resources.getInternalScheduler().isRunningOnThread());
if (status.isSuccess()) {
// Set lastWrittenToken to be the token that was written (NOT clientToken - which
}
/** The task that is scheduled to send batched messages to the server (when needed). **/
-
static class BatchingTask extends RecurringTask {
/*
* This class is static and extends RecurringTask directly so that it can be instantiated
//
/** The single key used to write all the Ticl state. */
-
public static final String CLIENT_TOKEN_KEY = "ClientToken";
/** Resources for the Ticl. */
/** Object handling low-level wire format interactions. */
private final ProtocolHandler protocolHandler;
- /** Used to validate messages */
- private final TiclMessageValidator2 msgValidator;
-
/** The function for computing the registration and persistence state digests. */
private final DigestFunction digestFn = new ObjectIdDigestUtils.Sha1DigestFunction();
private final Smearer smearer;
/** Current client token known from the server. */
- private ByteString clientToken = null;
+ private Bytes clientToken = null;
// After the client starts, exactly one of nonce and clientToken is non-null.
/** If not {@code null}, nonce for pending identifier request. */
- private ByteString nonce = null;
+ private Bytes nonce = null;
/** Whether we should send registrations to the server or not. */
private boolean shouldSendRegistrations;
/** Task to do the first heartbeat after a persistent restart. */
private InitialPersistentHeartbeatTask initialPersistentHeartbeatTask;
+ /** A cache of already acked invalidations to avoid duplicate delivery. */
+ private final AckCache ackCache = new AckCache();
+
/**
* Constructs a client.
*
this.config = config;
this.ticlState = (ticlRunState == null) ? new RunState() : new RunState(ticlRunState);
this.smearer = new Smearer(random, this.config.getSmearPercent());
- this.applicationClientId =
- CommonProtos2.newApplicationClientIdP(clientType, ByteString.copyFrom(clientName));
+ this.applicationClientId = ApplicationClientIdP.create(clientType, new Bytes(clientName));
this.listener = listener;
- this.msgValidator = new TiclMessageValidator2(resources.getLogger());
- this.statistics = (statisticsState != null) ?
- Statistics.deserializeStatistics(resources.getLogger(), statisticsState.getCounterList()) :
- new Statistics();
+ this.statistics = (statisticsState != null)
+ ? Statistics.deserializeStatistics(resources.getLogger(), statisticsState.getCounter())
+ : new Statistics();
this.registrationManager = new RegistrationManager(logger, statistics, digestFn,
regManagerState);
this.protocolHandler = new ProtocolHandler(config.getProtocolHandlerConfig(), resources,
- smearer, statistics, clientType, applicationName, this, msgValidator, protocolHandlerState);
+ smearer, statistics, clientType, applicationName, this, protocolHandlerState);
}
/**
}
/** Returns a default config builder for the client. */
- public static ClientConfigP.Builder createConfig() {
- return ClientConfigP.newBuilder()
- .setVersion(CommonProtos2.newVersion(CommonInvalidationConstants2.CONFIG_MAJOR_VERSION,
- CommonInvalidationConstants2.CONFIG_MINOR_VERSION))
- .setProtocolHandlerConfig(ProtocolHandler.createConfig());
+ public static ClientConfigP createConfig() {
+ Version version =
+ Version.create(ClientConstants.CONFIG_MAJOR_VERSION, ClientConstants.CONFIG_MINOR_VERSION);
+ ProtocolHandlerConfigP protocolHandlerConfig = ProtocolHandler.createConfig();
+ ClientConfigP.Builder builder = new ClientConfigP.Builder(version, protocolHandlerConfig);
+ return builder.build();
}
/** Returns a configuration builder with parameters set for unit tests. */
- public static ClientConfigP.Builder createConfigForTest() {
- return ClientConfigP.newBuilder()
- .setVersion(CommonProtos2.newVersion(CommonInvalidationConstants2.CONFIG_MAJOR_VERSION,
- CommonInvalidationConstants2.CONFIG_MINOR_VERSION))
- .setProtocolHandlerConfig(ProtocolHandler.createConfigForTest())
- .setNetworkTimeoutDelayMs(2 * 1000)
- .setHeartbeatIntervalMs(5 * 1000)
- .setWriteRetryDelayMs(500);
+ public static ClientConfigP createConfigForTest() {
+ Version version =
+ Version.create(ClientConstants.CONFIG_MAJOR_VERSION, ClientConstants.CONFIG_MINOR_VERSION);
+ ProtocolHandlerConfigP protocolHandlerConfig = ProtocolHandler.createConfigForTest();
+ ClientConfigP.Builder builder = new ClientConfigP.Builder(version, protocolHandlerConfig);
+ builder.networkTimeoutDelayMs = 2 * 1000;
+ builder.heartbeatIntervalMs = 5 * 1000;
+ builder.writeRetryDelayMs = 500;
+ return builder.build();
}
/**
this.batchingTask = new BatchingTask(protocolHandler, resources, smearer,
marshalledState.getBatchingTaskState());
if (marshalledState.hasLastWrittenState()) {
- persistentWriteTask.lastWrittenState.set(
- ProtoWrapper.of(marshalledState.getLastWrittenState()));
+ persistentWriteTask.lastWrittenState.set(marshalledState.getLastWrittenState());
}
}
// The handling of new InitialPersistentHeartbeatTask is a little strange. We create one when
public RegistrationManagerState getRegistrationManagerStateCopyForTest() {
Preconditions.checkState(resources.getInternalScheduler().isRunningOnThread());
- return registrationManager.getRegistrationManagerStateCopyForTest(
- new ObjectIdDigestUtils.Sha1DigestFunction());
+ return registrationManager.getRegistrationManagerStateCopyForTest();
}
@Override
public void changeNetworkTimeoutDelayForTest(int networkTimeoutDelayMs) {
- config = ClientConfigP.newBuilder(config).setNetworkTimeoutDelayMs(networkTimeoutDelayMs)
- .build();
+ ClientConfigP.Builder builder = config.toBuilder();
+ builder.networkTimeoutDelayMs = networkTimeoutDelayMs;
+ config = builder.build();
createSchedulingTasks(null);
}
@Override
public void changeHeartbeatDelayForTest(int heartbeatDelayMs) {
- config = ClientConfigP.newBuilder(config).setHeartbeatIntervalMs(heartbeatDelayMs).build();
+ ClientConfigP.Builder builder = config.toBuilder();
+ builder.heartbeatIntervalMs = heartbeatDelayMs;
+ config = builder.build();
createSchedulingTasks(null);
}
@Override
- public ByteString getClientTokenForTest() {
+ public Bytes getClientTokenForTest() {
return getClientToken();
}
// id from the server.
statistics.recordError(ClientErrorType.PERSISTENT_DESERIALIZATION_FAILURE);
logger.severe("Failed deserializing persistent state: %s",
- CommonProtoStrings2.toLazyCompactString(serializedState));
+ Bytes.toLazyCompactString(serializedState));
}
if (persistentState != null) {
// If we have persistent state, use the previously-stored token and send a heartbeat to
// We'll ask the application for all of its registrations, but to avoid
// making the registrar redo the work of performing registrations that
// probably already exist, we'll suppress sending them to the registrar.
- logger.info("Restarting from persistent state: %s",
- CommonProtoStrings2.toLazyCompactString(persistentState.getClientToken()));
+ logger.info("Restarting from persistent state: %s", persistentState.getClientToken());
setNonce(null);
setClientToken(persistentState.getClientToken());
shouldSendRegistrations = false;
* @param regOpType whether to register or unregister
*/
private void performRegisterOperations(final Collection<ObjectId> objectIds,
- final RegistrationP.OpType regOpType) {
+ final int regOpType) {
Preconditions.checkState(!objectIds.isEmpty(), "Must specify some object id");
- Preconditions.checkNotNull(regOpType, "Must specify (un)registration");
Preconditions.checkState(internalScheduler.isRunningOnThread(),
"Not running on internal thread");
List<ObjectIdP> objectIdProtos = new ArrayList<ObjectIdP>(objectIds.size());
for (ObjectId objectId : objectIds) {
Preconditions.checkNotNull(objectId, "Must specify object id");
- ObjectIdP objectIdProto = ProtoConverter.convertToObjectIdProto(objectId);
+ ObjectIdP objectIdProto = ProtoWrapperConverter.convertToObjectIdProto(objectId);
IncomingOperationType opType = (regOpType == RegistrationP.OpType.REGISTER) ?
IncomingOperationType.REGISTRATION : IncomingOperationType.UNREGISTRATION;
statistics.recordIncomingOperation(opType);
- logger.info("Register %s, %s", CommonProtoStrings2.toLazyCompactString(objectIdProto),
- regOpType);
+ logger.info("Register %s, %s", objectIdProto, regOpType);
objectIdProtos.add(objectIdProto);
}
Preconditions.checkState(internalScheduler.isRunningOnThread(),
"Not running on internal thread");
- // 1. Parse the ack handle first.
+ // Parse and validate the ack handle first.
AckHandleP ackHandle;
try {
ackHandle = AckHandleP.parseFrom(acknowledgeHandle.getHandleData());
- } catch (InvalidProtocolBufferException exception) {
+ } catch (ValidationException exception) {
logger.warning("Bad ack handle : %s",
- CommonProtoStrings2.toLazyCompactString(acknowledgeHandle.getHandleData()));
+ Bytes.toLazyCompactString(acknowledgeHandle.getHandleData()));
statistics.recordError(ClientErrorType.ACKNOWLEDGE_HANDLE_FAILURE);
return;
}
- // 2. Validate ack handle - it should have a valid invalidation.
- if (!ackHandle.hasInvalidation() ||
- !msgValidator.isValid(ackHandle.getInvalidation())) {
- logger.warning("Incorrect ack handle data: %s", acknowledgeHandle);
+ // Currently, only invalidations have non-trivial ack handle.
+ InvalidationP invalidation = ackHandle.getNullableInvalidation();
+ if (invalidation == null) {
+ logger.warning("Ack handle without invalidation : %s",
+ Bytes.toLazyCompactString(acknowledgeHandle.getHandleData()));
statistics.recordError(ClientErrorType.ACKNOWLEDGE_HANDLE_FAILURE);
return;
}
- // Currently, only invalidations have non-trivial ack handle.
- InvalidationP invalidation = ackHandle.getInvalidation();
+ // Don't send the payload back.
if (invalidation.hasPayload()) {
- // Don't send the payload back.
- invalidation = invalidation.toBuilder().clearPayload().build();
+ InvalidationP.Builder builder = invalidation.toBuilder();
+ builder.payload = null;
+ invalidation = builder.build();
}
statistics.recordIncomingOperation(IncomingOperationType.ACKNOWLEDGE);
protocolHandler.sendInvalidationAck(invalidation, batchingTask);
+
+ // Record that the invalidation has been acknowledged to potentially avoid unnecessary delivery
+ // of earlier invalidations for the same object.
+ ackCache.recordAck(invalidation);
}
//
//
@Override
- public ByteString getClientToken() {
+ public Bytes getClientToken() {
Preconditions.checkState((clientToken == null) || (nonce == null));
return clientToken;
}
// Then, handle any work remaining in the message.
if (parsedMessage.invalidationMessage != null) {
statistics.recordReceivedMessage(ReceivedMessageType.INVALIDATION);
- handleInvalidations(parsedMessage.invalidationMessage.getInvalidationList());
+ handleInvalidations(parsedMessage.invalidationMessage.getInvalidation());
}
if (parsedMessage.registrationStatusMessage != null) {
statistics.recordReceivedMessage(ReceivedMessageType.REGISTRATION_STATUS);
- handleRegistrationStatus(parsedMessage.registrationStatusMessage.getRegistrationStatusList());
+ handleRegistrationStatus(parsedMessage.registrationStatusMessage.getRegistrationStatus());
}
if (parsedMessage.registrationSyncRequestMessage != null) {
statistics.recordReceivedMessage(ReceivedMessageType.REGISTRATION_SYNC_REQUEST);
}
if (parsedMessage.infoRequestMessage != null) {
statistics.recordReceivedMessage(ReceivedMessageType.INFO_REQUEST);
- handleInfoMessage(parsedMessage.infoRequestMessage.getInfoTypeList());
+ handleInfoMessage(parsedMessage.infoRequestMessage.getInfoType());
}
if (parsedMessage.errorMessage != null) {
statistics.recordReceivedMessage(ReceivedMessageType.ERROR);
* @param headerToken token in the server message
* @param newToken the new token provided, or {@code null} if this is a destroy message.
*/
- private void handleTokenChanged(ByteString headerToken, final ByteString newToken) {
+ private void handleTokenChanged(Bytes headerToken, final Bytes newToken) {
Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
// The server is either supplying a new token in response to an InitializeMessage, spontaneously
if (newToken != null) {
// Note: headerToken cannot be null, so a null nonce or clientToken will always be non-equal.
- boolean headerTokenMatchesNonce = TypedUtil.<ByteString>equals(headerToken, nonce);
- boolean headerTokenMatchesExistingToken =
- TypedUtil.<ByteString>equals(headerToken, clientToken);
+ boolean headerTokenMatchesNonce = TypedUtil.<Bytes>equals(headerToken, nonce);
+ boolean headerTokenMatchesExistingToken = TypedUtil.<Bytes>equals(headerToken, clientToken);
boolean shouldAcceptToken = headerTokenMatchesNonce || headerTokenMatchesExistingToken;
if (!shouldAcceptToken) {
logger.info("Ignoring new token; %s does not match nonce = %s or existing token = %s",
newToken, nonce, clientToken);
return;
}
- logger.info("New token being assigned at client: %s, Old = %s",
- CommonProtoStrings2.toLazyCompactString(newToken),
- CommonProtoStrings2.toLazyCompactString(clientToken));
+ logger.info("New token being assigned at client: %s, Old = %s", newToken, clientToken);
// Start the regular heartbeats now.
heartbeatTask.ensureScheduled("Heartbeat-after-new-token");
setClientToken(newToken);
persistentWriteTask.ensureScheduled("Write-after-new-token");
} else {
- logger.info("Destroying existing token: %s",
- CommonProtoStrings2.toLazyCompactString(clientToken));
+ logger.info("Destroying existing token: %s", clientToken);
acquireToken("Destroy");
}
}
/** Handles a server {@code header}. */
private void handleIncomingHeader(ServerMessageHeader header) {
Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
- Preconditions.checkState(nonce == null,
- "Cannot process server header with non-null nonce (have %s): %s", nonce, header);
+ if (nonce != null) {
+ throw new IllegalStateException(
+ "Cannot process server header with non-null nonce (have " + nonce + "): " + header);
+ }
if (header.registrationSummary != null) {
- // We've received a summary from the server, so if we were suppressing
- // registrations, we should now allow them to go to the registrar.
+ // We've received a summary from the server, so if we were suppressing registrations, we
+ // should now allow them to go to the registrar.
shouldSendRegistrations = true;
// Pass the registration summary to the registration manager. If we are now in agreement
// with the server and we had any pending operations, we can tell the listener that those
// operations have succeeded.
- Set<ProtoWrapper<RegistrationP>> upcalls =
+ Set<RegistrationP> upcalls =
registrationManager.informServerRegistrationSummary(header.registrationSummary);
- logger.fine("Receivced new server registration summary (%s); will make %s upcalls",
+ logger.fine("Received new server registration summary (%s); will make %s upcalls",
header.registrationSummary, upcalls.size());
- for (ProtoWrapper<RegistrationP> upcall : upcalls) {
- RegistrationP registration = upcall.getProto();
- ObjectId objectId = ProtoConverter.convertFromObjectIdProto(registration.getObjectId());
+ for (RegistrationP registration : upcalls) {
+ ObjectId objectId =
+ ProtoWrapperConverter.convertFromObjectIdProto(registration.getObjectId());
RegistrationState regState = convertOpTypeToRegState(registration.getOpType());
listener.informRegistrationStatus(this, objectId, regState);
}
Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
for (InvalidationP invalidation : invalidations) {
- AckHandle ackHandle = AckHandle.newInstance(
- CommonProtos2.newAckHandleP(invalidation).toByteArray());
- if (CommonProtos2.isAllObjectId(invalidation.getObjectId())) {
+ AckHandle ackHandle = AckHandle.newInstance(AckHandleP.create(invalidation).toByteArray());
+ if (ackCache.isAcked(invalidation)) {
+ // If the ack cache indicates that the client has already acked a restarted invalidation
+ // with an equal or greater version, then the TICL can simply acknowledge it immediately
+ // rather than delivering it to the listener.
+ logger.info("Stale invalidation {0}, not delivering", invalidation);
+ acknowledge(ackHandle);
+ statistics.recordReceivedMessage(ReceivedMessageType.STALE_INVALIDATION);
+ } else if (CommonProtos.isAllObjectId(invalidation.getObjectId())) {
logger.info("Issuing invalidate all");
listener.invalidateAll(InvalidationClientCore.this, ackHandle);
} else {
// Regular object. Could be unknown version or not.
- Invalidation inv = ProtoConverter.convertFromInvalidationProto(invalidation);
+ Invalidation inv = ProtoWrapperConverter.convertFromInvalidationProto(invalidation);
boolean isSuppressed = invalidation.getIsTrickleRestart();
logger.info("Issuing invalidate (known-version = %s, is-trickle-restart = %s): %s",
boolean wasSuccess = localProcessingStatuses.get(i);
logger.fine("Process reg status: %s", regStatus);
- ObjectId objectId = ProtoConverter.convertFromObjectIdProto(
+ ObjectId objectId = ProtoWrapperConverter.convertFromObjectIdProto(
regStatus.getRegistration().getObjectId());
if (wasSuccess) {
// Server operation was both successful and agreed with what the client wanted.
- OpType regOpType = regStatus.getRegistration().getOpType();
+ int regOpType = regStatus.getRegistration().getOpType();
InvalidationListener.RegistrationState regState = convertOpTypeToRegState(regOpType);
listener.informRegistrationStatus(InvalidationClientCore.this, objectId, regState);
} else {
// Server operation either failed or disagreed with client's intent (e.g., successful
// unregister, but the client wanted a registration).
- String description = CommonProtos2.isSuccess(regStatus.getStatus()) ?
- "Registration discrepancy detected" : regStatus.getStatus().getDescription();
- boolean isPermanent = CommonProtos2.isPermanentFailure(regStatus.getStatus());
+ String description = CommonProtos.isSuccess(regStatus.getStatus())
+ ? "Registration discrepancy detected" : regStatus.getStatus().getDescription();
+ boolean isPermanent = CommonProtos.isPermanentFailure(regStatus.getStatus());
listener.informRegistrationFailure(InvalidationClientCore.this, objectId, !isPermanent,
description);
}
}
/** Handles an info message request. */
- private void handleInfoMessage(Collection<InfoType> infoTypes) {
+ private void handleInfoMessage(Collection<Integer> infoTypes) {
Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
boolean mustSendPerformanceCounters = false;
- for (InfoType infoType : infoTypes) {
+ for (int infoType : infoTypes) {
mustSendPerformanceCounters = (infoType == InfoType.GET_PERFORMANCE_COUNTERS);
if (mustSendPerformanceCounters) {
break;
}
/** Handles an error message. */
- private void handleErrorMessage(ServerMessageHeader header,
- ErrorMessage.Code code, String description) {
+ private void handleErrorMessage(ServerMessageHeader header, int code, String description) {
Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
// If it is an auth failure, we shut down the ticl.
// Translate the code to error reason.
int reason;
switch (code) {
- case AUTH_FAILURE:
+ case ErrorMessage.Code.AUTH_FAILURE:
reason = ErrorInfo.ErrorReason.AUTH_FAILURE;
break;
- case UNKNOWN_FAILURE:
- reason = ErrorInfo.ErrorReason.UNKNOWN_FAILURE;
- break;
+ case ErrorMessage.Code.UNKNOWN_FAILURE:
default:
reason = ErrorInfo.ErrorReason.UNKNOWN_FAILURE;
break;
}
// If there are any registrations, remove them and issue registration failure.
- Collection<ProtoWrapper<ObjectIdP>> desiredRegistrations =
- registrationManager.removeRegisteredObjects();
+ Collection<ObjectIdP> desiredRegistrations = registrationManager.removeRegisteredObjects();
logger.warning("Issuing failure for %s objects", desiredRegistrations.size());
- for (ProtoWrapper<ObjectIdP> objectIdWrapper : desiredRegistrations) {
- ObjectIdP objectId = objectIdWrapper.getProto();
+ for (ObjectIdP objectId : desiredRegistrations) {
listener.informRegistrationFailure(this,
- ProtoConverter.convertFromObjectIdProto(objectId), false, "Auth error: " + description);
+ ProtoWrapperConverter.convertFromObjectIdProto(objectId), false,
+ "Auth error: " + description);
}
}
private boolean validateToken(ParsedMessage parsedMessage) {
if (clientToken != null) {
// Client token case.
- if (!TypedUtil.<ByteString>equals(clientToken, parsedMessage.header.token)) {
+ if (!TypedUtil.<Bytes>equals(clientToken, parsedMessage.header.token)) {
logger.info("Incoming message has bad token: server = %s, client = %s",
- CommonProtoStrings2.toLazyCompactString(parsedMessage.header.token),
- CommonProtoStrings2.toLazyCompactString(clientToken));
+ parsedMessage.header.token, clientToken);
statistics.recordError(ClientErrorType.TOKEN_MISMATCH);
return false;
}
return true;
} else if (nonce != null) {
// Nonce case.
- if (!TypedUtil.<ByteString>equals(nonce, parsedMessage.header.token)) {
+ if (!TypedUtil.<Bytes>equals(nonce, parsedMessage.header.token)) {
statistics.recordError(ClientErrorType.NONCE_MISMATCH);
logger.info("Rejecting server message with mismatched nonce: Client = %s, Server = %s",
- CommonProtoStrings2.toLazyCompactString(nonce),
- CommonProtoStrings2.toLazyCompactString(parsedMessage.header.token));
+ nonce, parsedMessage.header.token);
return false;
} else {
- logger.info("Accepting server message with matching nonce: %s",
- CommonProtoStrings2.toLazyCompactString(nonce));
+ logger.info("Accepting server message with matching nonce: %s", nonce);
return true;
}
}
* Converts an operation type {@code regOpType} to a
* {@code InvalidationListener.RegistrationState}.
*/
- private static InvalidationListener.RegistrationState convertOpTypeToRegState(
- RegistrationP.OpType regOpType) {
+ private static InvalidationListener.RegistrationState convertOpTypeToRegState(int regOpType) {
InvalidationListener.RegistrationState regState =
regOpType == RegistrationP.OpType.REGISTER ?
InvalidationListener.RegistrationState.REGISTERED :
* The goal is to ensure that a nonce is never set unless there is no
* client token, unless the nonce is being cleared.
*/
- private void setNonce(ByteString newNonce) {
- Preconditions.checkState((newNonce == null) || (clientToken == null),
- "Tried to set nonce with existing token %s", clientToken);
+ private void setNonce(Bytes newNonce) {
+ if ((newNonce != null) && (clientToken != null)) {
+ throw new IllegalStateException("Tried to set nonce with existing token " + clientToken);
+ }
this.nonce = newNonce;
}
* Returns a randomly generated nonce. Visible for testing only.
*/
- public static ByteString generateNonce(Random random) {
+ static Bytes generateNonce(Random random) {
// Generate 8 random bytes.
byte[] randomBytes = new byte[8];
random.nextBytes(randomBytes);
-
- // Return the bytes as a ByteString.
- return ByteString.copyFrom(randomBytes);
+ return new Bytes(randomBytes);
}
/**
* The goal is to ensure that a token is never set unless there is no
* nonce, unless the token is being cleared.
*/
- private void setClientToken(ByteString newClientToken) {
- Preconditions.checkState((newClientToken == null) || (nonce == null),
- "Tried to set token with existing nonce %s", nonce);
+ private void setClientToken(Bytes newClientToken) {
+ if ((newClientToken != null) && (nonce != null)) {
+ throw new IllegalStateException("Tried to set token with existing nonce " + nonce);
+ }
// If the ticl is in the process of being started and we are getting a new token (either from
// persistence or from the server, start the ticl and inform the application.
@Override
public void toCompactString(TextBuilder builder) {
- builder.appendFormat("Client: %s, %s, %s", applicationClientId,
- CommonProtoStrings2.toLazyCompactString(clientToken), ticlState);
+ builder.append("Client: ").append(applicationClientId).append(", ")
+ .append(clientToken).append(", ").append(ticlState);
}
@Override
public InvalidationClientState marshal() {
Preconditions.checkState(internalScheduler.isRunningOnThread(),
"Not running on internal thread");
- InvalidationClientState.Builder builder = InvalidationClientState.newBuilder();
- if (clientToken != null) {
- builder.setClientToken(clientToken);
- }
- builder.setLastMessageSendTimeMs(lastMessageSendTimeMs);
- if (nonce != null) {
- builder.setNonce(nonce);
- }
- builder.setProtocolHandlerState(protocolHandler.marshal())
- .setRegistrationManagerState(registrationManager.marshal())
- .setShouldSendRegistrations(shouldSendRegistrations)
- .setRunState(ticlState.marshal())
- .setIsOnline(isOnline)
- .setAcquireTokenTaskState(acquireTokenTask.marshal())
- .setPersistentWriteTaskState(persistentWriteTask.marshal())
- .setRegSyncHeartbeatTaskState(regSyncHeartbeatTask.marshal())
- .setHeartbeatTaskState(heartbeatTask.marshal())
- .setBatchingTaskState(batchingTask.marshal())
- .setStatisticsState(statistics.marshal());
- if (clientToken != null) {
- builder.setClientToken(clientToken);
- }
- if (persistentWriteTask.lastWrittenState.get() != null) {
- builder.setLastWrittenState(persistentWriteTask.lastWrittenState.get().getProto());
- }
+ InvalidationClientState.Builder builder = new InvalidationClientState.Builder();
+ builder.runState = ticlState.marshal();
+ builder.clientToken = clientToken;
+ builder.nonce = nonce;
+ builder.shouldSendRegistrations = shouldSendRegistrations;
+ builder.lastMessageSendTimeMs = lastMessageSendTimeMs;
+ builder.isOnline = isOnline;
+ builder.protocolHandlerState = protocolHandler.marshal();
+ builder.registrationManagerState = registrationManager.marshal();
+ builder.acquireTokenTaskState = acquireTokenTask.marshal();
+ builder.regSyncHeartbeatTaskState = regSyncHeartbeatTask.marshal();
+ builder.persistentWriteTaskState = persistentWriteTask.marshal();
+ builder.heartbeatTaskState = heartbeatTask.marshal();
+ builder.batchingTaskState = batchingTask.marshal();
+ builder.lastWrittenState = persistentWriteTask.lastWrittenState.get();
+ builder.statisticsState = statistics.marshal();
return builder.build();
}
}