3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/string_util.h>
28 #include "src/core/ext/transport/chttp2/transport/internal.h"
29 #include "src/core/lib/gpr/string.h"
30 #include "src/core/lib/gprpp/memory.h"
31 #include "src/core/lib/slice/slice_internal.h"
32 #include "src/core/lib/slice/slice_string_helpers.h"
33 #include "src/core/lib/transport/transport.h"
// Destructor for the DATA-frame parser.
// If a byte stream is still attached (parsing_frame is non-null) it is
// notified through Finished() with a "Parser destroyed" error, and the error
// value that Finished() returns is unreffed. The parser's own `error` member
// is unreffed afterwards.
35 grpc_chttp2_data_parser::~grpc_chttp2_data_parser() {
36 if (parsing_frame != nullptr) {
// NOTE(review): the meaning of the trailing `false` argument is defined by
// Finished(), which is not part of this file - confirm before changing it.
37 GRPC_ERROR_UNREF(parsing_frame->Finished(
38 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false));
// Drop the reference this parser holds on its stored error.
40 GRPC_ERROR_UNREF(error);
// Called at the start of an incoming DATA frame: validates the frame flags
// and records on stream `s` whether the frame carries END_STREAM.
//
// Any flag bit other than GRPC_CHTTP2_DATA_FLAG_END_STREAM is treated as
// unsupported; an error with the message "unsupported data flags: 0x.." is
// built and tagged with the stream id under GRPC_ERROR_INT_STREAM_ID.
// When END_STREAM is set, s->received_last_frame and s->eos_received are both
// set to true. Returns GRPC_ERROR_NONE when the flags are accepted.
//
// `parser` is not referenced by any statement visible in this listing.
// NOTE(review): the `flags` / `stream_id` parameter declarations, the
// declaration of `msg`, and the statements that free `msg` and return `err`
// are not visible in this listing - verify them against the full file.
43 grpc_error* grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser* parser,
46 grpc_chttp2_stream* s) {
// Reject every flag bit except END_STREAM.
47 if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
49 gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags);
// The message is copied into the error (COPIED_STRING), and the offending
// stream id is attached as an integer property.
50 grpc_error* err = grpc_error_set_int(
51 GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg), GRPC_ERROR_INT_STREAM_ID,
52 static_cast<intptr_t>(stream_id));
// Remember whether this frame ends the stream.
57 if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
58 s->received_last_frame = true;
59 s->eos_received = true;
// Frame without END_STREAM (presumably the `else` arm; the `else` line itself
// is not visible here). Note that eos_received is not cleared on this path.
61 s->received_last_frame = false;
64 return GRPC_ERROR_NONE;
// Emits one DATA frame for stream `id` into `outbuf`.
//
// A 9-byte frame header is written first, followed by the first `write_bytes`
// bytes of `inbuf`, which are moved (not copied) into `outbuf`.
//
//   id          - stream identifier, written as 4 big-endian bytes.
//   inbuf       - source of the payload; loses its first `write_bytes` bytes.
//   write_bytes - payload length; must be < 2^24 (asserted) because the
//                 length field is only 3 bytes wide.
//   is_eof      - non-zero sets GRPC_CHTTP2_DATA_FLAG_END_STREAM in the flags
//                 byte; zero leaves the flags byte at 0.
//   stats       - framing_bytes grows by the header size, data_bytes by
//                 `write_bytes`.
//   outbuf      - receives the header slice and then the payload.
//
// NOTE(review): the declarations of `hdr` and `p` are not visible in this
// listing.
67 void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
68 uint32_t write_bytes, int is_eof,
69 grpc_transport_one_way_stats* stats,
70 grpc_slice_buffer* outbuf) {
// Header layout written below: 3 bytes length, 1 byte type, 1 byte flags,
// 4 bytes stream id.
73 static const size_t header_size = 9;
75 hdr = GRPC_SLICE_MALLOC(header_size);
76 p = GRPC_SLICE_START_PTR(hdr);
// The length must fit in the 24-bit field written by the next three stores.
77 GPR_ASSERT(write_bytes < (1 << 24));
// Payload length, most significant byte first.
78 *p++ = static_cast<uint8_t>(write_bytes >> 16);
79 *p++ = static_cast<uint8_t>(write_bytes >> 8);
80 *p++ = static_cast<uint8_t>(write_bytes);
// Frame type and flags.
81 *p++ = GRPC_CHTTP2_FRAME_DATA;
82 *p++ = is_eof ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0;
// Stream id, most significant byte first.
83 *p++ = static_cast<uint8_t>(id >> 24);
84 *p++ = static_cast<uint8_t>(id >> 16);
85 *p++ = static_cast<uint8_t>(id >> 8);
86 *p++ = static_cast<uint8_t>(id);
// Header goes out first...
87 grpc_slice_buffer_add(outbuf, hdr);
// ...then the payload is transferred from inbuf to outbuf.
89 grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf);
// Account for what was just queued.
91 stats->framing_bytes += header_size;
92 stats->data_bytes += write_bytes;
// Runs bytes buffered in `slices` for stream `s` through the message
// deframing state machine stored in `p`.
//
// States handled below (p->state):
//   GRPC_CHTTP2_DATA_ERROR - a previous parse error is sticky; a new ref to
//                            p->error is returned.
//   GRPC_CHTTP2_DATA_FH_0  - reads the one-byte frame type into
//                            p->frame_type and derives
//                            p->is_frame_compressed; an unrecognised type
//                            produces "Bad GRPC frame type 0x..".
//   GRPC_CHTTP2_DATA_FH_1 .. FH_4
//                          - read the four bytes of p->frame_size, most
//                            significant byte first. After the last byte a
//                            Chttp2IncomingByteStream is created, stored in
//                            p->parsing_frame and handed to the caller via
//                            `stream_out`.
//   GRPC_CHTTP2_DATA_FRAME - pushes payload bytes into p->parsing_frame.
//
// Each header byte is counted in s->stats.incoming.framing_bytes and payload
// bytes in s->stats.incoming.data_bytes. Bytes of the current slice that are
// not consumed are put back at the front of `slices` with
// grpc_slice_buffer_undo_take_first().
//
// Preconditions asserted in the code: `stream_out` is non-null and
// p->parsing_frame is null when the last header byte is read; `slice_out` is
// non-null and p->parsing_frame is non-null while in DATA_FRAME.
//
// Returns GRPC_ERROR_NONE on the success paths visible here, or a new
// reference to p->error when the parser is in (or enters) the error state.
//
// NOTE(review): many control lines of this function (the `switch`, the
// `case` labels of the frame-type switch, `cur` initialisation/increments,
// several `if`/`else`/`continue`/`return` lines and the trailing arguments of
// the Push() calls) are not visible in this listing. Comments that depend on
// them are marked as assumptions.
95 grpc_error* grpc_deframe_unprocessed_incoming_frames(
96 grpc_chttp2_data_parser* p, grpc_chttp2_stream* s,
97 grpc_slice_buffer* slices, grpc_slice* slice_out,
98 grpc_core::OrphanablePtr<grpc_core::ByteStream>* stream_out) {
99 grpc_error* error = GRPC_ERROR_NONE;
100 grpc_chttp2_transport* t = s->t;
// Process the buffer one slice at a time, always taking from the front.
102 while (slices->count > 0) {
103 uint8_t* beg = nullptr;
104 uint8_t* end = nullptr;
105 uint8_t* cur = nullptr;
// We now own a reference to `slice`; every exit path below releases it.
107 grpc_slice slice = grpc_slice_buffer_take_first(slices);
109 beg = GRPC_SLICE_START_PTR(slice);
110 end = GRPC_SLICE_END_PTR(slice);
112 uint32_t message_flags;
// Presumably the empty-slice case (guard not visible here): nothing to parse.
116 grpc_slice_unref_internal(slice);
// Sticky error state: keep reporting the stored error.
121 case GRPC_CHTTP2_DATA_ERROR:
122 p->state = GRPC_CHTTP2_DATA_ERROR;
123 grpc_slice_unref_internal(slice);
124 return GRPC_ERROR_REF(p->error);
// Header byte 0: frame type / compression flag.
125 case GRPC_CHTTP2_DATA_FH_0:
126 s->stats.incoming.framing_bytes++;
127 p->frame_type = *cur;
// The case labels are not visible here; per the gRPC wire format the flag is
// presumably 0 = uncompressed, 1 = compressed - confirm.
128 switch (p->frame_type) {
130 p->is_frame_compressed = false; /* GPR_FALSE */
133 p->is_frame_compressed = true; /* GPR_TRUE */
// Unrecognised frame type (presumably the `default` arm): build a descriptive
// error carrying the stream id, a hex/ASCII dump of the slice and the offset
// of the offending byte, then latch the error state.
136 gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
137 p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
138 p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
139 static_cast<intptr_t>(s->id));
141 msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
142 p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
143 grpc_slice_from_copied_string(msg));
// NOTE(review): the left-hand side of this call is not visible in this
// listing; verify that the result is assigned back to p->error.
146 grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
147 p->state = GRPC_CHTTP2_DATA_ERROR;
148 grpc_slice_unref_internal(slice);
// The caller gets its own reference; p->error stays owned by the parser.
149 return GRPC_ERROR_REF(p->error);
// Remember where to resume, presumably in case this slice ends here (the
// guard around the unref is not visible).
152 p->state = GRPC_CHTTP2_DATA_FH_1;
153 grpc_slice_unref_internal(slice);
// Header byte 1: bits 31..24 of the message length.
157 case GRPC_CHTTP2_DATA_FH_1:
158 s->stats.incoming.framing_bytes++;
159 p->frame_size = (static_cast<uint32_t>(*cur)) << 24;
161 p->state = GRPC_CHTTP2_DATA_FH_2;
162 grpc_slice_unref_internal(slice);
// Header byte 2: bits 23..16 of the message length.
166 case GRPC_CHTTP2_DATA_FH_2:
167 s->stats.incoming.framing_bytes++;
168 p->frame_size |= (static_cast<uint32_t>(*cur)) << 16;
170 p->state = GRPC_CHTTP2_DATA_FH_3;
171 grpc_slice_unref_internal(slice);
// Header byte 3: bits 15..8 of the message length.
175 case GRPC_CHTTP2_DATA_FH_3:
176 s->stats.incoming.framing_bytes++;
177 p->frame_size |= (static_cast<uint32_t>(*cur)) << 8;
179 p->state = GRPC_CHTTP2_DATA_FH_4;
180 grpc_slice_unref_internal(slice);
// Header byte 4: bits 7..0. The header is now complete, so a byte stream for
// the message is created and published.
184 case GRPC_CHTTP2_DATA_FH_4:
185 s->stats.incoming.framing_bytes++;
186 GPR_ASSERT(stream_out != nullptr);
187 GPR_ASSERT(p->parsing_frame == nullptr);
188 p->frame_size |= (static_cast<uint32_t>(*cur));
// Count the message in channelz when a socket node is attached.
189 if (t->channelz_socket != nullptr) {
190 t->channelz_socket->RecordMessageReceived();
192 p->state = GRPC_CHTTP2_DATA_FRAME;
// NOTE(review): the initialisation of message_flags is not visible in this
// listing; it must happen before this |=.
195 if (p->is_frame_compressed) {
196 message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
// The stream is handed to the caller through stream_out; the parser keeps a
// raw pointer in parsing_frame so it can feed payload bytes into it.
198 p->parsing_frame = grpc_core::New<grpc_core::Chttp2IncomingByteStream>(
199 t, s, p->frame_size, message_flags);
200 stream_out->reset(p->parsing_frame);
// Zero-length message: there is no payload to wait for, so finish the stream
// right away and go back to expecting a new header.
201 if (p->parsing_frame->remaining_bytes() == 0) {
202 GRPC_ERROR_UNREF(p->parsing_frame->Finished(GRPC_ERROR_NONE, true));
203 p->parsing_frame = nullptr;
204 p->state = GRPC_CHTTP2_DATA_FH_0;
206 s->pending_byte_stream = true;
// Return the unparsed remainder of this slice to the front of the buffer
// (presumably only when bytes remain; the guard is not visible here).
209 grpc_slice_buffer_undo_take_first(
210 slices, grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
211 static_cast<size_t>(end - beg)));
213 grpc_slice_unref_internal(slice);
214 return GRPC_ERROR_NONE;
// Payload bytes of the current message.
215 case GRPC_CHTTP2_DATA_FRAME: {
216 GPR_ASSERT(p->parsing_frame != nullptr);
217 GPR_ASSERT(slice_out != nullptr);
// Presumably the "slice already exhausted" case (guard not visible here).
219 grpc_slice_unref_internal(slice);
// `remaining` = bytes left in this slice; p->frame_size = bytes still owed
// to the current message.
222 uint32_t remaining = static_cast<uint32_t>(end - cur);
// Case 1: the slice ends exactly where the message ends.
223 if (remaining == p->frame_size) {
224 s->stats.incoming.data_bytes += remaining;
225 if (GRPC_ERROR_NONE !=
226 (error = p->parsing_frame->Push(
227 grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
228 static_cast<size_t>(end - beg)),
230 grpc_slice_unref_internal(slice);
// The message is complete: tell the byte stream.
233 if (GRPC_ERROR_NONE !=
234 (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
235 grpc_slice_unref_internal(slice);
// Ready for the next message header.
238 p->parsing_frame = nullptr;
239 p->state = GRPC_CHTTP2_DATA_FH_0;
240 grpc_slice_unref_internal(slice);
241 return GRPC_ERROR_NONE;
// Case 2: the slice holds only part of the message; push it all and remember
// how much is still outstanding.
242 } else if (remaining < p->frame_size) {
243 s->stats.incoming.data_bytes += remaining;
244 if (GRPC_ERROR_NONE !=
245 (error = p->parsing_frame->Push(
246 grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
247 static_cast<size_t>(end - beg)),
251 p->frame_size -= remaining;
252 grpc_slice_unref_internal(slice);
253 return GRPC_ERROR_NONE;
// Case 3: the slice holds the rest of this message plus further bytes.
255 GPR_ASSERT(remaining > p->frame_size);
256 s->stats.incoming.data_bytes += p->frame_size;
// Push exactly frame_size bytes.
// NOTE(review): unlike the two branches above, the result of Push() is
// compared here without being stored in `error`. Check in the full file that
// a failure on this path is actually propagated to the caller and that the
// returned error object is not dropped.
257 if (GRPC_ERROR_NONE !=
258 p->parsing_frame->Push(
260 slice, static_cast<size_t>(cur - beg),
261 static_cast<size_t>(cur + p->frame_size - beg)),
263 grpc_slice_unref_internal(slice);
266 if (GRPC_ERROR_NONE !=
267 (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
268 grpc_slice_unref_internal(slice);
271 p->parsing_frame = nullptr;
272 p->state = GRPC_CHTTP2_DATA_FH_0;
// Skip past the payload just delivered and give the tail of the slice back
// to the buffer so the next call starts at the next message header.
273 cur += p->frame_size;
274 grpc_slice_buffer_undo_take_first(
275 slices, grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
276 static_cast<size_t>(end - beg)));
277 grpc_slice_unref_internal(slice);
278 return GRPC_ERROR_NONE;
// Buffer drained without hitting any of the returns above.
284 return GRPC_ERROR_NONE;
// Frame-parser callback for a chunk (`slice`) of DATA-frame payload on
// stream `s`. The bytes are not deframed here; a new reference to `slice` is
// queued on the stream in one of three ways:
//   - no byte stream pending: append to s->frame_storage and call
//     grpc_chttp2_maybe_complete_recv_message();
//   - byte stream pending and a reader waiting (s->on_next set): append to
//     s->unprocessed_incoming_frames_buffer, schedule on_next and clear it;
//   - otherwise: append to s->frame_storage.
// If this is the last chunk of the frame and the frame carried END_STREAM
// (s->received_last_frame), the stream is marked closed.
// Always returns GRPC_ERROR_NONE on the paths visible here.
//
// `parser` is not referenced by any statement visible in this listing.
// NOTE(review): the declaration of the `is_last` parameter and the final
// `else` line are not visible in this listing.
287 grpc_error* grpc_chttp2_data_parser_parse(void* parser,
288 grpc_chttp2_transport* t,
289 grpc_chttp2_stream* s,
290 const grpc_slice& slice,
// No message is being streamed to the application right now: buffer the
// bytes and see whether a pending receive can be completed.
292 if (!s->pending_byte_stream) {
// `slice` is borrowed (const&), so take a ref before storing it.
293 grpc_slice_ref_internal(slice);
294 grpc_slice_buffer_add(&s->frame_storage, slice);
295 grpc_chttp2_maybe_complete_recv_message(t, s);
// A reader is waiting for more bytes: hand them over directly and wake it.
296 } else if (s->on_next) {
297 GPR_ASSERT(s->frame_storage.length == 0);
298 grpc_slice_ref_internal(slice);
299 grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
300 GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_NONE);
// on_next is one-shot: clear it once scheduled.
301 s->on_next = nullptr;
302 s->unprocessed_incoming_frames_decompressed = false;
// Byte stream pending but nobody waiting (presumably the `else` arm): park
// the bytes in frame_storage.
304 grpc_slice_ref_internal(slice);
305 grpc_slice_buffer_add(&s->frame_storage, slice);
// End of the final DATA frame.
// NOTE(review): the two boolean arguments are presumably close_reads=true,
// close_writes=false - confirm against grpc_chttp2_mark_stream_closed.
308 if (is_last && s->received_last_frame) {
309 grpc_chttp2_mark_stream_closed(t, s, true, false, GRPC_ERROR_NONE);
312 return GRPC_ERROR_NONE;