1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/tools/quic/quic_dispatcher.h"
9 #include "base/logging.h"
10 #include "base/stl_util.h"
11 #include "net/quic/quic_blocked_writer_interface.h"
12 #include "net/quic/quic_utils.h"
13 #include "net/tools/quic/quic_default_packet_writer.h"
14 #include "net/tools/quic/quic_epoll_connection_helper.h"
15 #include "net/tools/quic/quic_packet_writer_wrapper.h"
16 #include "net/tools/quic/quic_socket_utils.h"
22 using base::StringPiece;
25 class DeleteSessionsAlarm : public EpollAlarm {
27 explicit DeleteSessionsAlarm(QuicDispatcher* dispatcher)
28 : dispatcher_(dispatcher) {
31 virtual int64 OnAlarm() OVERRIDE {
32 EpollAlarm::OnAlarm();
33 dispatcher_->DeleteSessions();
38 QuicDispatcher* dispatcher_;
41 class QuicDispatcher::QuicFramerVisitor : public QuicFramerVisitorInterface {
43 explicit QuicFramerVisitor(QuicDispatcher* dispatcher)
44 : dispatcher_(dispatcher) {}
46 // QuicFramerVisitorInterface implementation
47 virtual void OnPacket() OVERRIDE {}
48 virtual bool OnUnauthenticatedPublicHeader(
49 const QuicPacketPublicHeader& header) OVERRIDE {
50 return dispatcher_->OnUnauthenticatedPublicHeader(header);
52 virtual bool OnUnauthenticatedHeader(
53 const QuicPacketHeader& header) OVERRIDE {
54 dispatcher_->OnUnauthenticatedHeader(header);
57 virtual void OnError(QuicFramer* framer) OVERRIDE {
58 DVLOG(1) << QuicUtils::ErrorToString(framer->error());
61 // The following methods should never get called because we always return
62 // false from OnUnauthenticatedHeader(). As a result, we never process the
63 // payload of the packet.
64 virtual bool OnProtocolVersionMismatch(
65 QuicVersion /*received_version*/) OVERRIDE {
69 virtual void OnPublicResetPacket(
70 const QuicPublicResetPacket& /*packet*/) OVERRIDE {
73 virtual void OnVersionNegotiationPacket(
74 const QuicVersionNegotiationPacket& /*packet*/) OVERRIDE {
77 virtual void OnPacketComplete() OVERRIDE {
80 virtual bool OnPacketHeader(const QuicPacketHeader& /*header*/) OVERRIDE {
84 virtual void OnRevivedPacket() OVERRIDE {
87 virtual void OnFecProtectedPayload(StringPiece /*payload*/) OVERRIDE {
90 virtual bool OnStreamFrame(const QuicStreamFrame& /*frame*/) OVERRIDE {
94 virtual bool OnAckFrame(const QuicAckFrame& /*frame*/) OVERRIDE {
98 virtual bool OnCongestionFeedbackFrame(
99 const QuicCongestionFeedbackFrame& /*frame*/) OVERRIDE {
103 virtual bool OnRstStreamFrame(const QuicRstStreamFrame& /*frame*/) OVERRIDE {
107 virtual bool OnConnectionCloseFrame(
108 const QuicConnectionCloseFrame & /*frame*/) OVERRIDE {
112 virtual bool OnGoAwayFrame(const QuicGoAwayFrame& /*frame*/) OVERRIDE {
116 virtual void OnFecData(const QuicFecData& /*fec*/) OVERRIDE {
121 QuicDispatcher* dispatcher_;
124 QuicDispatcher::QuicDispatcher(const QuicConfig& config,
125 const QuicCryptoServerConfig& crypto_config,
126 const QuicVersionVector& supported_versions,
127 EpollServer* epoll_server)
129 crypto_config_(crypto_config),
130 delete_sessions_alarm_(new DeleteSessionsAlarm(this)),
131 epoll_server_(epoll_server),
132 helper_(new QuicEpollConnectionHelper(epoll_server_)),
133 supported_versions_(supported_versions),
134 current_packet_(NULL),
135 framer_(supported_versions, /*unused*/ QuicTime::Zero(), true),
136 framer_visitor_(new QuicFramerVisitor(this)) {
137 framer_.set_visitor(framer_visitor_.get());
140 QuicDispatcher::~QuicDispatcher() {
141 STLDeleteValues(&session_map_);
142 STLDeleteElements(&closed_session_list_);
145 void QuicDispatcher::Initialize(int fd) {
146 DCHECK(writer_ == NULL);
147 writer_.reset(CreateWriterWrapper(CreateWriter(fd)));
148 time_wait_list_manager_.reset(
149 new QuicTimeWaitListManager(writer_.get(), this,
150 epoll_server(), supported_versions()));
153 void QuicDispatcher::ProcessPacket(const IPEndPoint& server_address,
154 const IPEndPoint& client_address,
155 const QuicEncryptedPacket& packet) {
156 current_server_address_ = server_address;
157 current_client_address_ = client_address;
158 current_packet_ = &packet;
159 // ProcessPacket will cause the packet to be dispatched in
160 // OnUnauthenticatedPublicHeader, or sent to the time wait list manager
161 // in OnAuthenticatedHeader.
162 framer_.ProcessPacket(packet);
163 // TODO(rjshade): Return a status describing if/why a packet was dropped,
164 // and log somehow. Maybe expose as a varz.
167 bool QuicDispatcher::OnUnauthenticatedPublicHeader(
168 const QuicPacketPublicHeader& header) {
169 QuicSession* session = NULL;
171 QuicGuid guid = header.guid;
172 SessionMap::iterator it = session_map_.find(guid);
173 if (it == session_map_.end()) {
174 if (header.reset_flag) {
177 if (time_wait_list_manager_->IsGuidInTimeWait(guid)) {
178 return HandlePacketForTimeWait(header);
181 // Ensure the packet has a version negotiation bit set before creating a new
182 // session for it. All initial packets for a new connection are required to
183 // have the flag set. Otherwise it may be a stray packet.
184 if (header.version_flag) {
185 session = CreateQuicSession(guid, current_server_address_,
186 current_client_address_);
189 if (session == NULL) {
190 DVLOG(1) << "Failed to create session for " << guid;
191 // Add this guid fo the time-wait state, to safely reject future packets.
193 if (header.version_flag &&
194 !framer_.IsSupportedVersion(header.versions.front())) {
195 // TODO(ianswett): Produce a no-version version negotiation packet.
199 // Use the version in the packet if possible, otherwise assume the latest.
200 QuicVersion version = header.version_flag ? header.versions.front() :
201 supported_versions_.front();
202 time_wait_list_manager_->AddGuidToTimeWait(guid, version, NULL);
203 DCHECK(time_wait_list_manager_->IsGuidInTimeWait(guid));
204 return HandlePacketForTimeWait(header);
206 DVLOG(1) << "Created new session for " << guid;
207 session_map_.insert(make_pair(guid, session));
209 session = it->second;
212 session->connection()->ProcessUdpPacket(
213 current_server_address_, current_client_address_, *current_packet_);
215 // Do not parse the packet further. The session will process it completely.
219 void QuicDispatcher::OnUnauthenticatedHeader(const QuicPacketHeader& header) {
220 DCHECK(time_wait_list_manager_->IsGuidInTimeWait(header.public_header.guid));
221 time_wait_list_manager_->ProcessPacket(current_server_address_,
222 current_client_address_,
223 header.public_header.guid,
224 header.packet_sequence_number);
227 void QuicDispatcher::CleanUpSession(SessionMap::iterator it) {
228 QuicConnection* connection = it->second->connection();
229 QuicEncryptedPacket* connection_close_packet =
230 connection->ReleaseConnectionClosePacket();
231 write_blocked_list_.erase(connection);
232 time_wait_list_manager_->AddGuidToTimeWait(it->first,
233 connection->version(),
234 connection_close_packet);
235 session_map_.erase(it);
238 void QuicDispatcher::DeleteSessions() {
239 STLDeleteElements(&closed_session_list_);
242 bool QuicDispatcher::OnCanWrite() {
243 // We got an EPOLLOUT: the socket should not be blocked.
244 writer_->SetWritable();
246 // Give each writer one attempt to write.
247 int num_writers = write_blocked_list_.size();
248 for (int i = 0; i < num_writers; ++i) {
249 if (write_blocked_list_.empty()) {
252 QuicBlockedWriterInterface* writer = write_blocked_list_.begin()->first;
253 write_blocked_list_.erase(write_blocked_list_.begin());
254 bool can_write_more = writer->OnCanWrite();
255 if (writer_->IsWriteBlocked()) {
256 // We were unable to write. Wait for the next EPOLLOUT.
257 // In this case, the session would have been added to the blocked list
258 // up in WritePacket.
261 // The socket is not blocked but the writer has ceded work. Add it to the
263 if (can_write_more) {
264 write_blocked_list_.insert(make_pair(writer, true));
268 // We're not write blocked. Return true if there's more work to do.
269 return !write_blocked_list_.empty();
272 void QuicDispatcher::Shutdown() {
273 while (!session_map_.empty()) {
274 QuicSession* session = session_map_.begin()->second;
275 session->connection()->SendConnectionClose(QUIC_PEER_GOING_AWAY);
276 // Validate that the session removes itself from the session map on close.
277 DCHECK(session_map_.empty() || session_map_.begin()->second != session);
282 void QuicDispatcher::OnConnectionClosed(QuicGuid guid, QuicErrorCode error) {
283 SessionMap::iterator it = session_map_.find(guid);
284 if (it == session_map_.end()) {
285 LOG(DFATAL) << "GUID " << guid << " does not exist in the session map. "
286 << "Error: " << QuicUtils::ErrorToString(error);
290 DLOG_IF(INFO, error != QUIC_NO_ERROR) << "Closing connection (" << guid
291 << ") due to error: "
292 << QuicUtils::ErrorToString(error);
294 if (closed_session_list_.empty()) {
295 epoll_server_->RegisterAlarmApproximateDelta(
296 0, delete_sessions_alarm_.get());
298 closed_session_list_.push_back(it->second);
302 void QuicDispatcher::OnWriteBlocked(QuicBlockedWriterInterface* writer) {
303 DCHECK(writer_->IsWriteBlocked());
304 write_blocked_list_.insert(make_pair(writer, true));
307 QuicPacketWriter* QuicDispatcher::CreateWriter(int fd) {
308 return new QuicDefaultPacketWriter(fd);
311 QuicPacketWriterWrapper* QuicDispatcher::CreateWriterWrapper(
312 QuicPacketWriter* writer) {
313 return new QuicPacketWriterWrapper(writer);
316 QuicSession* QuicDispatcher::CreateQuicSession(
318 const IPEndPoint& server_address,
319 const IPEndPoint& client_address) {
320 QuicServerSession* session = new QuicServerSession(
322 CreateQuicConnection(guid, server_address, client_address),
324 session->InitializeSession(crypto_config_);
328 QuicConnection* QuicDispatcher::CreateQuicConnection(
330 const IPEndPoint& server_address,
331 const IPEndPoint& client_address) {
332 return new QuicConnection(guid, client_address, helper_.get(), writer_.get(),
333 true, supported_versions_);
336 void QuicDispatcher::set_writer(QuicPacketWriter* writer) {
337 writer_->set_writer(writer);
340 bool QuicDispatcher::HandlePacketForTimeWait(
341 const QuicPacketPublicHeader& header) {
342 if (header.reset_flag) {
343 // Public reset packets do not have sequence numbers, so ignore the packet.
347 // Switch the framer to the correct version, so that the sequence number can
348 // be parsed correctly.
349 framer_.set_version(time_wait_list_manager_->GetQuicVersionFromGuid(
352 // Continue parsing the packet to extract the sequence number. Then
353 // send it to the time wait manager in OnUnathenticatedHeader.