3 * Copyright (c) 2020-2021 Project CHIP Authors
4 * Copyright (c) 2013-2017 Nest Labs, Inc.
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
22 * This file implements the CHIP Transport object that maintains TCP connections
23 * to peers. Handles both establishing new connections and accepting peer connection
26 #include <transport/raw/TCP.h>
28 #include <core/CHIPEncoding.h>
29 #include <support/CodeUtils.h>
30 #include <support/logging/CHIPLogging.h>
31 #include <transport/raw/MessageHeader.h>
// Pulls the chip::Encoding names into scope (presumably where the
// LittleEndian helpers used below come from -- TODO confirm).
39 using namespace chip::Encoding;
// Width, in bytes, of the length prefix that precedes every framed message.
41 // Packets start with a 16-bit size
42 constexpr size_t kPacketSizeBytes = 2;
// Receive-side bound on a single message: packet buffer capacity without
// reserve, minus the length prefix. ProcessReceivedBuffer rejects any
// advertised size that is >= this value.
44 // TODO: Actual limit may be lower (spec issue #2119)
45 constexpr uint16_t kMaxMessageSize = static_cast<uint16_t>(System::PacketBuffer::kMaxSizeWithoutReserve - kPacketSizeBytes);
// Backlog argument handed to Listen() on the listening endpoint in Init().
47 constexpr int kListenBacklogSize = 2;
// Teardown sequence (NOTE(review): the enclosing function header is not part
// of this span -- presumably the TCPBase destructor; confirm).
// 1) release the listening endpoint, if any
53 if (mListenSocket != nullptr)
55 // endpoint is only non null if it is initialized and listening
56 mListenSocket->Free();
57 mListenSocket = nullptr;
// 2) free every in-use connection slot
60 CloseActiveConnections();
// 3) drop any buffers still queued for connections that never completed
62 for (size_t i = 0; i < mPendingPacketsSize; i++)
64 mPendingPackets[i].packetBuffer = nullptr;
// Frees every connection slot that currently reports InUse().
// NOTE(review): unlike Disconnect(), the visible statements do not adjust
// mUsedEndPointCount -- confirm callers do not rely on the counter afterwards.
68 void TCPBase::CloseActiveConnections()
// Visit all mActiveConnectionsSize slots.
70 for (size_t i = 0; i < mActiveConnectionsSize; i++)
// Idle slots are left untouched.
72 if (mActiveConnections[i].InUse())
74 mActiveConnections[i].Free();
// Prepares the transport for use: creates the listening endpoint through the
// Inet layer supplied by `params`, binds it to the configured address
// type/port/interface on the "any" address, starts listening, and installs
// this class's static callbacks on it.
//
// @param params  listen configuration (Inet layer, address type, port, interface)
// @return the value left in `err`: CHIP_ERROR_INCORRECT_STATE when mState is
//         not kNotReady, otherwise whatever the endpoint calls produced
//         (presumably CHIP_NO_ERROR on success -- the return statement itself
//         is outside the visible lines).
80 CHIP_ERROR TCPBase::Init(TcpListenParameters & params)
82 CHIP_ERROR err = CHIP_NO_ERROR;
// Init is only valid once, from the not-ready state.
84 VerifyOrExit(mState == State::kNotReady, err = CHIP_ERROR_INCORRECT_STATE);
// Endpoint creation is compiled in only when TCP endpoints are enabled;
// the NOT_SUPPORTED assignment below presumably belongs to the alternate
// preprocessor branch -- TODO confirm.
86 #if INET_CONFIG_ENABLE_TCP_ENDPOINT
87 err = params.GetInetLayer()->NewTCPEndPoint(&mListenSocket);
89 err = CHIP_SYSTEM_ERROR_NOT_SUPPORTED;
// Bind to Inet::IPAddress::Any so connections are accepted on the configured port.
93 err = mListenSocket->Bind(params.GetAddressType(), Inet::IPAddress::Any, params.GetListenPort(), params.GetInterfaceId());
96 err = mListenSocket->Listen(kListenBacklogSize);
// AppState lets the static callbacks recover the owning TCPBase instance.
99 mListenSocket->AppState = reinterpret_cast<void *>(this);
100 mListenSocket->OnDataReceived = OnTcpReceive;
101 mListenSocket->OnConnectComplete = OnConnectionComplete;
102 mListenSocket->OnConnectionClosed = OnConnectionClosed;
103 mListenSocket->OnConnectionReceived = OnConnectionReceived;
104 mListenSocket->OnAcceptError = OnAcceptError;
105 mEndpointType = params.GetAddressType();
// From here on SendMessage() is allowed (it requires kInitialized).
107 mState = State::kInitialized;
// Failure path: report the error and release the listening endpoint.
110 if (err != CHIP_NO_ERROR)
112 ChipLogError(Inet, "Failed to initialize TCP transport: %s", ErrorStr(err));
115 mListenSocket->Free();
116 mListenSocket = nullptr;
// Looks up the in-use connection whose endpoint reports the same peer IP
// address and port as `address`.
//
// @param address  peer to look for; only TCP addresses are considered
// @return pointer to the matching slot (the no-match result is presumably
//         nullptr, as SendMessage() tests for it).
123 TCPBase::ActiveConnectionState * TCPBase::FindActiveConnection(const PeerAddress & address)
// Non-TCP addresses can never match a TCP connection.
125 if (address.GetTransportType() != Type::kTcp)
130 for (size_t i = 0; i < mActiveConnectionsSize; i++)
// Idle slots have no endpoint to query.
132 if (!mActiveConnections[i].InUse())
136 Inet::IPAddress addr;
// Ask the endpoint who it is connected to.
138 mActiveConnections[i].mEndPoint->GetPeerInfo(&addr, &port);
// Match on IP + port only; the interface is not compared here.
140 if ((addr == address.GetIPAddress()) && (port == address.GetPort()))
142 return &mActiveConnections[i];
// Looks up the connection slot that owns `endPoint`, comparing by pointer.
//
// @param endPoint  endpoint to search for
// @return pointer to the slot whose mEndPoint equals `endPoint`
//         (ProcessReceivedBuffer() treats a null result as "not found").
149 TCPBase::ActiveConnectionState * TCPBase::FindActiveConnection(const Inet::TCPEndPoint * endPoint)
151 for (size_t i = 0; i < mActiveConnectionsSize; i++)
// Pointer identity; InUse() is not consulted in this overload.
153 if (mActiveConnections[i].mEndPoint == endPoint)
155 return &mActiveConnections[i];
// Frames `msgBuf` for the wire (16-bit length prefix + encoded packet header
// placed in front of the payload) and sends it to `address`, reusing an
// existing connection when one matches or starting a new one otherwise.
//
// @param header   packet header to encode in front of the payload
// @param address  destination; must be a TCP peer address
// @param msgBuf   payload; the prefix is written into its reserved headroom
// @return CHIP_ERROR_INVALID_ARGUMENT for a non-TCP address or an oversized
//         frame, CHIP_ERROR_INCORRECT_STATE when not initialized,
//         CHIP_ERROR_NO_MEMORY when headroom cannot be reserved,
//         CHIP_ERROR_INTERNAL on a header-size mismatch, otherwise the result
//         of Send() / SendAfterConnect().
161 CHIP_ERROR TCPBase::SendMessage(const PacketHeader & header, const Transport::PeerAddress & address,
162 System::PacketBufferHandle msgBuf)
164 // Sent buffer data format is:
165 // - packet size as a uint16_t (includes size of header and actual data)
// Bytes to prepend: encoded header plus the length field itself.
168 const size_t prefixSize = header.EncodeSizeBytes() + kPacketSizeBytes;
170 VerifyOrReturnError(address.GetTransportType() == Type::kTcp, CHIP_ERROR_INVALID_ARGUMENT);
171 VerifyOrReturnError(mState == State::kInitialized, CHIP_ERROR_INCORRECT_STATE);
// The whole frame must be describable by the 16-bit length field.
172 VerifyOrReturnError(prefixSize + msgBuf->DataLength() <= std::numeric_limits<uint16_t>::max(), CHIP_ERROR_INVALID_ARGUMENT);
174 // The check above about prefixSize + msgBuf->DataLength() means prefixSize
175 // definitely fits in uint16_t.
176 VerifyOrReturnError(msgBuf->EnsureReservedSize(static_cast<uint16_t>(prefixSize)), CHIP_ERROR_NO_MEMORY);
// Move the start of data back so the prefix becomes part of the buffer.
178 msgBuf->SetStart(msgBuf->Start() - prefixSize);
180 // Length is actual data, without considering the length bytes themselves
181 VerifyOrReturnError(msgBuf->DataLength() >= kPacketSizeBytes, CHIP_ERROR_INTERNAL);
183 uint8_t * output = msgBuf->Start();
// NOTE(review): Write16 presumably advances `output` past the two length
// bytes so the header is encoded right after them -- confirm.
184 LittleEndian::Write16(output, static_cast<uint16_t>(msgBuf->DataLength() - kPacketSizeBytes));
186 uint16_t actualEncodedHeaderSize;
187 ReturnErrorOnFailure(header.Encode(output, msgBuf->DataLength(), &actualEncodedHeaderSize));
189 // header encoding has to match space that we allocated
190 VerifyOrReturnError(prefixSize == actualEncodedHeaderSize + kPacketSizeBytes, CHIP_ERROR_INTERNAL);
192 // Reuse existing connection if one exists, otherwise a new one
193 // will be established
194 ActiveConnectionState * connection = FindActiveConnection(address);
196 if (connection != nullptr)
198 return connection->mEndPoint->Send(std::move(msgBuf));
// No live connection: queue the frame and connect first.
202 return SendAfterConnect(address, std::move(msgBuf));
// Queues `msg` for `addr` in the pending-packet table and, unless a
// connection attempt to that address is already outstanding, creates a new
// endpoint and starts connecting. Queued buffers are sent from
// OnConnectionComplete().
//
// @param addr  destination peer
// @param msg   fully framed buffer to send once connected
// @return CHIP_ERROR_NO_MEMORY when no pending slot or no connection capacity
//         is available; otherwise the value left in `err` (the final return
//         is outside the visible lines).
206 CHIP_ERROR TCPBase::SendAfterConnect(const PeerAddress & addr, System::PacketBufferHandle msg)
208 // This will initiate a connection to the specified peer
209 CHIP_ERROR err = CHIP_NO_ERROR;
// Slot that will receive `msg`; updated while scanning (see below).
210 PendingPacket * packet = nullptr;
211 bool alreadyConnecting = false;
212 Inet::TCPEndPoint * endPoint = nullptr;
214 // Iterate through the ENTIRE array. If a pending packet for
215 // the address already exists, this means a connection is pending and
216 // does NOT need to be re-established.
217 for (size_t i = 0; i < mPendingPacketsSize; i++)
// Empty slot: remember the first one as the candidate home for `msg`.
219 if (mPendingPackets[i].packetBuffer.IsNull())
221 if (packet == nullptr)
223 // found a slot to store the packet into
224 packet = mPendingPackets + i;
227 else if (mPendingPackets[i].peerAddress == addr)
229 // same destination exists.
230 alreadyConnecting = true;
// An earlier free slot was chosen but an older packet for the same peer
// sits later in the array: shift the older packet forward into the free
// slot and take over its position, so array order equals send order.
232 // ensure packets are ORDERED
233 if (packet != nullptr)
235 packet->peerAddress = addr;
236 packet->packetBuffer = std::move(mPendingPackets[i].packetBuffer);
237 packet = mPendingPackets + i;
// No free pending slot at all.
242 VerifyOrExit(packet != nullptr, err = CHIP_ERROR_NO_MEMORY);
// NOTE(review): this exits with CHIP_NO_ERROR, but the visible statements
// that store `msg` into `packet` come after this point -- confirm the
// buffer is actually enqueued on this path.
244 // If already connecting, buffer was just enqueued for more sending
245 VerifyOrExit(!alreadyConnecting, err = CHIP_NO_ERROR);
247 // Ensures sufficient active connections size exist
248 VerifyOrExit(mUsedEndPointCount < mActiveConnectionsSize, err = CHIP_ERROR_NO_MEMORY);
// The new endpoint comes from the same Inet layer as the listening socket.
250 #if INET_CONFIG_ENABLE_TCP_ENDPOINT
251 err = mListenSocket->Layer().NewTCPEndPoint(&endPoint);
253 err = CHIP_SYSTEM_ERROR_NOT_SUPPORTED;
// Route the endpoint's events back to this transport instance.
257 endPoint->AppState = reinterpret_cast<void *>(this);
258 endPoint->OnDataReceived = OnTcpReceive;
259 endPoint->OnConnectComplete = OnConnectionComplete;
260 endPoint->OnConnectionClosed = OnConnectionClosed;
261 endPoint->OnConnectionReceived = OnConnectionReceived;
262 endPoint->OnAcceptError = OnAcceptError;
263 endPoint->OnPeerClose = OnPeerClosed;
265 err = endPoint->Connect(addr.GetIPAddress(), addr.GetPort(), addr.GetInterface());
268 // enqueue the packet once the connection succeeds
269 packet->peerAddress = addr;
270 packet->packetBuffer = std::move(msg);
// The connecting endpoint counts against capacity until it fails or closes.
271 mUsedEndPointCount++;
// Failure path: clean up an endpoint that was created but not handed over.
274 if (err != CHIP_NO_ERROR)
276 if (endPoint != nullptr)
// Appends newly received bytes to the connection's reassembly chain and
// dispatches every complete length-prefixed message found in it.
//
// @param endPoint     endpoint the data arrived on; used to find its slot
// @param peerAddress  peer identity forwarded with each dispatched message
// @param buffer       newly received data (ownership is taken)
// @return CHIP_NO_ERROR when all complete messages were handled or more data
//         is needed; CHIP_ERROR_INTERNAL when the endpoint has no slot;
//         CHIP_ERROR_MESSAGE_TOO_LONG for an oversized length prefix; or an
//         error from ProcessSingleMessage().
284 CHIP_ERROR TCPBase::ProcessReceivedBuffer(Inet::TCPEndPoint * endPoint, const PeerAddress & peerAddress,
285 System::PacketBufferHandle buffer)
287 ActiveConnectionState * state = FindActiveConnection(endPoint);
288 VerifyOrReturnError(state != nullptr, CHIP_ERROR_INTERNAL);
// Accumulate: a message may span several receive callbacks.
289 state->mReceived.AddToEnd(std::move(buffer));
// Keep extracting messages while buffered data remains.
291 while (!state->mReceived.IsNull())
293 uint8_t messageSizeBuf[kPacketSizeBytes];
// Copy out the length prefix; nothing is consumed from the chain yet.
294 CHIP_ERROR err = state->mReceived->Read(messageSizeBuf);
295 if (err == CHIP_ERROR_BUFFER_TOO_SMALL)
297 // We don't have enough data to read the message size. Wait until there's more.
298 return CHIP_NO_ERROR;
300 else if (err != CHIP_NO_ERROR)
304 uint16_t messageSize = LittleEndian::Get16(messageSizeBuf);
// Note the >=: a length equal to kMaxMessageSize is rejected as well.
305 if (messageSize >= kMaxMessageSize)
307 // This message is too long for upper layers.
308 return CHIP_ERROR_MESSAGE_TOO_LONG;
310 // The subtraction will not underflow because we successfully read kPacketSizeBytes.
311 if (messageSize > (state->mReceived->TotalLength() - kPacketSizeBytes))
313 // We have not yet received the complete message.
314 return CHIP_NO_ERROR;
// Whole message present: drop the prefix, then hand off the body.
316 state->mReceived.Consume(kPacketSizeBytes);
317 ReturnErrorOnFailure(ProcessSingleMessage(peerAddress, state, messageSize));
320 return CHIP_NO_ERROR;
// Extracts one message of `messageSize` bytes from the front of
// `state->mReceived`, decodes its packet header and passes it upstream via
// HandleMessageReceived().
//
// @param peerAddress  peer the message came from
// @param state        connection whose reassembly chain holds the message
// @param messageSize  message length in bytes, excluding the length prefix
// @return CHIP_ERROR_NO_MEMORY when a copy buffer cannot be allocated, an
//         error from Read() or header decoding, otherwise CHIP_NO_ERROR.
323 CHIP_ERROR TCPBase::ProcessSingleMessage(const PeerAddress & peerAddress, ActiveConnectionState * state, uint16_t messageSize)
325 // We enter with `state->mReceived` containing at least one full message, perhaps in a chain.
326 // `state->mReceived->Start()` currently points to the message data.
327 // On exit, `state->mReceived` will have had `messageSize` bytes consumed, no matter what.
328 System::PacketBufferHandle message;
// Fast path: the head buffer is exactly one message, so no copy is needed.
329 if (state->mReceived->DataLength() == messageSize)
331 // In this case, the head packet buffer contains exactly the message.
332 // This is common because typical messages fit in a network packet, and are delivered as such.
333 // Peel off the head to pass upstream, which effectively consumes it from `state->mReceived`.
334 message = state->mReceived.PopHead();
338 // The message is either longer or shorter than the head buffer.
339 // In either case, copy the message to a fresh linear buffer to pass upstream. We always copy, rather than provide
340 // a shared reference to the current buffer, in case upper layers manipulate the buffer in ways that would affect
341 // our use, e.g. chaining it elsewhere or reusing space beyond the current message.
342 message = System::PacketBufferHandle::New(messageSize, 0);
// NOTE(review): on allocation failure nothing is consumed from the chain,
// which differs from the "no matter what" statement above -- confirm intent.
343 if (message.IsNull())
345 return CHIP_ERROR_NO_MEMORY;
347 CHIP_ERROR err = state->mReceived->Read(message->Start(), messageSize);
// Consume before checking `err` so the chain advances even if Read failed.
348 state->mReceived.Consume(messageSize);
349 ReturnErrorOnFailure(err);
350 message->SetDataLength(messageSize);
// Strip the packet header off the front; the remainder is the payload.
354 ReturnErrorOnFailure(header.DecodeAndConsume(message));
356 HandleMessageReceived(header, peerAddress, std::move(message));
357 return CHIP_NO_ERROR;
// Endpoint data callback: resolves the peer address of `endPoint` and feeds
// the received buffer to the owning transport's ProcessReceivedBuffer().
//
// @param endPoint  endpoint the data arrived on; AppState holds the TCPBase
// @param buffer    received data (ownership is passed on)
// @return INET_NO_ERROR on success, INET_ERROR_UNEXPECTED_EVENT when
//         processing failed.
360 INET_ERROR TCPBase::OnTcpReceive(Inet::TCPEndPoint * endPoint, System::PacketBufferHandle buffer)
362 Inet::IPAddress ipAddress;
365 endPoint->GetPeerInfo(&ipAddress, &port);
366 PeerAddress peerAddress = PeerAddress::TCP(ipAddress, port);
// Recover the transport instance stored in AppState when callbacks were set.
368 TCPBase * tcp = reinterpret_cast<TCPBase *>(endPoint->AppState);
369 CHIP_ERROR err = tcp->ProcessReceivedBuffer(endPoint, peerAddress, std::move(buffer));
371 if (err != CHIP_NO_ERROR)
373 // Connection could need to be closed at this point
374 ChipLogError(Inet, "Failed to accept received TCP message: %s", ErrorStr(err));
375 return INET_ERROR_UNEXPECTED_EVENT;
377 return INET_NO_ERROR;
// Endpoint callback for the outcome of an outgoing connect. Flushes every
// pending packet queued for the peer, then either records the endpoint as an
// active connection or, on any error, gives back the capacity it reserved.
//
// @param endPoint  endpoint whose connect attempt finished
// @param inetErr   result of the connect attempt
380 void TCPBase::OnConnectionComplete(Inet::TCPEndPoint * endPoint, INET_ERROR inetErr)
382 CHIP_ERROR err = CHIP_NO_ERROR;
383 bool foundPendingPacket = false;
384 TCPBase * tcp = reinterpret_cast<TCPBase *>(endPoint->AppState);
385 Inet::IPAddress ipAddress;
// Pending packets are keyed by peer address, so rebuild it from the endpoint.
388 endPoint->GetPeerInfo(&ipAddress, &port);
389 PeerAddress addr = PeerAddress::TCP(ipAddress, port);
391 // Send any pending packets
392 for (size_t i = 0; i < tcp->mPendingPacketsSize; i++)
// Skip slots that are empty or belong to a different peer.
394 if ((tcp->mPendingPackets[i].peerAddress != addr) || (tcp->mPendingPackets[i].packetBuffer.IsNull()))
398 foundPendingPacket = true;
// The slot is released whether or not the send below happens.
400 System::PacketBufferHandle buffer = std::move(tcp->mPendingPackets[i].packetBuffer);
401 tcp->mPendingPackets[i].peerAddress = PeerAddress::Uninitialized();
// Stop sending after the first failure; later buffers are just dropped.
403 if ((inetErr == CHIP_NO_ERROR) && (err == CHIP_NO_ERROR))
405 err = endPoint->Send(std::move(buffer));
409 if (err == CHIP_NO_ERROR)
414 if (!foundPendingPacket && (err == CHIP_NO_ERROR))
416 // Force a close: new connections are only expected when a
417 // new buffer is being sent.
418 ChipLogError(Inet, "Connection accepted without pending buffers");
419 err = CHIP_ERROR_CONNECTION_CLOSED_UNEXPECTEDLY;
422 // cleanup packets or mark as free
423 if (err != CHIP_NO_ERROR)
425 ChipLogError(Inet, "Connection complete encountered an error: %s", ErrorStr(err));
// Undo the reservation made in SendAfterConnect().
427 tcp->mUsedEndPointCount--;
// Success path: park the endpoint in the first free connection slot.
431 bool connectionStored = false;
432 for (size_t i = 0; i < tcp->mActiveConnectionsSize; i++)
434 if (!tcp->mActiveConnections[i].InUse())
436 tcp->mActiveConnections[i].Init(endPoint);
437 connectionStored = true;
442 // since we track end point counts, we always expect to store the connection
444 if (!connectionStored)
447 ChipLogError(Inet, "Internal logic error: insufficient space to store active connection");
// Endpoint callback for a closed connection: frees the slot that owns
// `endPoint` and returns its capacity.
//
// @param endPoint  endpoint that was closed
// @param err       close reason (not used by the visible statements)
452 void TCPBase::OnConnectionClosed(Inet::TCPEndPoint * endPoint, INET_ERROR err)
454 TCPBase * tcp = reinterpret_cast<TCPBase *>(endPoint->AppState);
456 ChipLogProgress(Inet, "Connection closed.");
458 for (size_t i = 0; i < tcp->mActiveConnectionsSize; i++)
// Slots are matched by endpoint pointer.
460 if (tcp->mActiveConnections[i].mEndPoint == endPoint)
462 ChipLogProgress(Inet, "Freeing closed connection.");
463 tcp->mActiveConnections[i].Free();
464 tcp->mUsedEndPointCount--;
469 void TCPBase::OnConnectionReceived(Inet::TCPEndPoint * listenEndPoint, Inet::TCPEndPoint * endPoint,
470 const Inet::IPAddress & peerAddress, uint16_t peerPort)
472 TCPBase * tcp = reinterpret_cast<TCPBase *>(listenEndPoint->AppState);
474 if (tcp->mUsedEndPointCount < tcp->mActiveConnectionsSize)
476 // have space to use one more (even if considering pending connections)
477 for (size_t i = 0; i < tcp->mActiveConnectionsSize; i++)
479 if (!tcp->mActiveConnections[i].InUse())
481 tcp->mActiveConnections[i].Init(endPoint);
486 endPoint->AppState = listenEndPoint->AppState;
487 endPoint->OnDataReceived = OnTcpReceive;
488 endPoint->OnConnectComplete = OnConnectionComplete;
489 endPoint->OnConnectionClosed = OnConnectionClosed;
490 endPoint->OnConnectionReceived = OnConnectionReceived;
491 endPoint->OnAcceptError = OnAcceptError;
492 endPoint->OnPeerClose = OnPeerClosed;
496 ChipLogError(Inet, "Insufficient connection space to accept new connections");
// Listening-endpoint callback for a failed accept; the error is only logged.
//
// @param endPoint  endpoint that reported the failure (unused)
// @param err       error to report
501 void TCPBase::OnAcceptError(Inet::TCPEndPoint * endPoint, INET_ERROR err)
503 ChipLogError(Inet, "Accept error: %s", ErrorStr(err));
// Closes the active connection(s) whose peer IP address and port equal
// `address`, freeing the slot and returning its capacity.
//
// @param address  peer to disconnect from
506 void TCPBase::Disconnect(const PeerAddress & address)
508 // Closes an existing connection
509 for (size_t i = 0; i < mActiveConnectionsSize; i++)
// Only in-use slots have an endpoint to query.
511 if (mActiveConnections[i].InUse())
513 Inet::IPAddress ipAddress;
516 mActiveConnections[i].mEndPoint->GetPeerInfo(&ipAddress, &port);
// Compare against a TCP peer address rebuilt from the endpoint's peer info.
517 if (address == PeerAddress::TCP(ipAddress, port))
519 // NOTE: this leaves the socket in TIME_WAIT.
520 // Calling Abort() would clean it since SO_LINGER would be set to 0,
521 // however this seems not to be useful.
522 mActiveConnections[i].Free();
523 mUsedEndPointCount--;
// Endpoint callback for a peer-initiated close: frees the slot that owns
// `endPoint` and returns its capacity.
//
// @param endPoint  endpoint whose peer closed the connection
529 void TCPBase::OnPeerClosed(Inet::TCPEndPoint * endPoint)
531 TCPBase * tcp = reinterpret_cast<TCPBase *>(endPoint->AppState);
533 for (size_t i = 0; i < tcp->mActiveConnectionsSize; i++)
// Slots are matched by endpoint pointer.
535 if (tcp->mActiveConnections[i].mEndPoint == endPoint)
537 ChipLogProgress(Inet, "Freeing connection: connection closed by peer");
538 tcp->mActiveConnections[i].Free();
539 tcp->mUsedEndPointCount--;
// Reports whether any connection slot is currently in use.
// NOTE(review): presumably returns true on the first slot reporting InUse()
// and false otherwise -- confirm against the return statements.
544 bool TCPBase::HasActiveConnections() const
546 for (size_t i = 0; i < mActiveConnectionsSize; i++)
548 if (mActiveConnections[i].InUse())
557 } // namespace Transport