6080a4bd1c408cf67a594962fbdcd39a11e81f6e
[platform/upstream/grpc.git] / src / core / ext / transport / chttp2 / transport / frame_data.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
22
23 #include <string.h>
24
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/string_util.h>
28 #include "src/core/ext/transport/chttp2/transport/internal.h"
29 #include "src/core/lib/gpr/string.h"
30 #include "src/core/lib/gprpp/memory.h"
31 #include "src/core/lib/slice/slice_internal.h"
32 #include "src/core/lib/slice/slice_string_helpers.h"
33 #include "src/core/lib/transport/transport.h"
34
35 grpc_chttp2_data_parser::~grpc_chttp2_data_parser() {
36   if (parsing_frame != nullptr) {
37     GRPC_ERROR_UNREF(parsing_frame->Finished(
38         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false));
39   }
40   GRPC_ERROR_UNREF(error);
41 }
42
43 grpc_error* grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser* parser,
44                                                 uint8_t flags,
45                                                 uint32_t stream_id,
46                                                 grpc_chttp2_stream* s) {
47   if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
48     char* msg;
49     gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags);
50     grpc_error* err = grpc_error_set_int(
51         GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg), GRPC_ERROR_INT_STREAM_ID,
52         static_cast<intptr_t>(stream_id));
53     gpr_free(msg);
54     return err;
55   }
56
57   if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
58     s->received_last_frame = true;
59     s->eos_received = true;
60   } else {
61     s->received_last_frame = false;
62   }
63
64   return GRPC_ERROR_NONE;
65 }
66
67 void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
68                              uint32_t write_bytes, int is_eof,
69                              grpc_transport_one_way_stats* stats,
70                              grpc_slice_buffer* outbuf) {
71   grpc_slice hdr;
72   uint8_t* p;
73   static const size_t header_size = 9;
74
75   hdr = GRPC_SLICE_MALLOC(header_size);
76   p = GRPC_SLICE_START_PTR(hdr);
77   GPR_ASSERT(write_bytes < (1 << 24));
78   *p++ = static_cast<uint8_t>(write_bytes >> 16);
79   *p++ = static_cast<uint8_t>(write_bytes >> 8);
80   *p++ = static_cast<uint8_t>(write_bytes);
81   *p++ = GRPC_CHTTP2_FRAME_DATA;
82   *p++ = is_eof ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0;
83   *p++ = static_cast<uint8_t>(id >> 24);
84   *p++ = static_cast<uint8_t>(id >> 16);
85   *p++ = static_cast<uint8_t>(id >> 8);
86   *p++ = static_cast<uint8_t>(id);
87   grpc_slice_buffer_add(outbuf, hdr);
88
89   grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf);
90
91   stats->framing_bytes += header_size;
92   stats->data_bytes += write_bytes;
93 }
94
95 grpc_error* grpc_deframe_unprocessed_incoming_frames(
96     grpc_chttp2_data_parser* p, grpc_chttp2_stream* s,
97     grpc_slice_buffer* slices, grpc_slice* slice_out,
98     grpc_core::OrphanablePtr<grpc_core::ByteStream>* stream_out) {
99   grpc_error* error = GRPC_ERROR_NONE;
100   grpc_chttp2_transport* t = s->t;
101
102   while (slices->count > 0) {
103     uint8_t* beg = nullptr;
104     uint8_t* end = nullptr;
105     uint8_t* cur = nullptr;
106
107     grpc_slice slice = grpc_slice_buffer_take_first(slices);
108
109     beg = GRPC_SLICE_START_PTR(slice);
110     end = GRPC_SLICE_END_PTR(slice);
111     cur = beg;
112     uint32_t message_flags;
113     char* msg;
114
115     if (cur == end) {
116       grpc_slice_unref_internal(slice);
117       continue;
118     }
119
120     switch (p->state) {
121       case GRPC_CHTTP2_DATA_ERROR:
122         p->state = GRPC_CHTTP2_DATA_ERROR;
123         grpc_slice_unref_internal(slice);
124         return GRPC_ERROR_REF(p->error);
125       case GRPC_CHTTP2_DATA_FH_0:
126         s->stats.incoming.framing_bytes++;
127         p->frame_type = *cur;
128         switch (p->frame_type) {
129           case 0:
130             p->is_frame_compressed = false; /* GPR_FALSE */
131             break;
132           case 1:
133             p->is_frame_compressed = true; /* GPR_TRUE */
134             break;
135           default:
136             gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
137             p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
138             p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
139                                           static_cast<intptr_t>(s->id));
140             gpr_free(msg);
141             msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
142             p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
143                                           grpc_slice_from_copied_string(msg));
144             gpr_free(msg);
145             p->error =
146                 grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
147             p->state = GRPC_CHTTP2_DATA_ERROR;
148             grpc_slice_unref_internal(slice);
149             return GRPC_ERROR_REF(p->error);
150         }
151         if (++cur == end) {
152           p->state = GRPC_CHTTP2_DATA_FH_1;
153           grpc_slice_unref_internal(slice);
154           continue;
155         }
156       /* fallthrough */
157       case GRPC_CHTTP2_DATA_FH_1:
158         s->stats.incoming.framing_bytes++;
159         p->frame_size = (static_cast<uint32_t>(*cur)) << 24;
160         if (++cur == end) {
161           p->state = GRPC_CHTTP2_DATA_FH_2;
162           grpc_slice_unref_internal(slice);
163           continue;
164         }
165       /* fallthrough */
166       case GRPC_CHTTP2_DATA_FH_2:
167         s->stats.incoming.framing_bytes++;
168         p->frame_size |= (static_cast<uint32_t>(*cur)) << 16;
169         if (++cur == end) {
170           p->state = GRPC_CHTTP2_DATA_FH_3;
171           grpc_slice_unref_internal(slice);
172           continue;
173         }
174       /* fallthrough */
175       case GRPC_CHTTP2_DATA_FH_3:
176         s->stats.incoming.framing_bytes++;
177         p->frame_size |= (static_cast<uint32_t>(*cur)) << 8;
178         if (++cur == end) {
179           p->state = GRPC_CHTTP2_DATA_FH_4;
180           grpc_slice_unref_internal(slice);
181           continue;
182         }
183       /* fallthrough */
184       case GRPC_CHTTP2_DATA_FH_4:
185         s->stats.incoming.framing_bytes++;
186         GPR_ASSERT(stream_out != nullptr);
187         GPR_ASSERT(p->parsing_frame == nullptr);
188         p->frame_size |= (static_cast<uint32_t>(*cur));
189         if (t->channelz_socket != nullptr) {
190           t->channelz_socket->RecordMessageReceived();
191         }
192         p->state = GRPC_CHTTP2_DATA_FRAME;
193         ++cur;
194         message_flags = 0;
195         if (p->is_frame_compressed) {
196           message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
197         }
198         p->parsing_frame = grpc_core::New<grpc_core::Chttp2IncomingByteStream>(
199             t, s, p->frame_size, message_flags);
200         stream_out->reset(p->parsing_frame);
201         if (p->parsing_frame->remaining_bytes() == 0) {
202           GRPC_ERROR_UNREF(p->parsing_frame->Finished(GRPC_ERROR_NONE, true));
203           p->parsing_frame = nullptr;
204           p->state = GRPC_CHTTP2_DATA_FH_0;
205         }
206         s->pending_byte_stream = true;
207
208         if (cur != end) {
209           grpc_slice_buffer_undo_take_first(
210               slices, grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
211                                      static_cast<size_t>(end - beg)));
212         }
213         grpc_slice_unref_internal(slice);
214         return GRPC_ERROR_NONE;
215       case GRPC_CHTTP2_DATA_FRAME: {
216         GPR_ASSERT(p->parsing_frame != nullptr);
217         GPR_ASSERT(slice_out != nullptr);
218         if (cur == end) {
219           grpc_slice_unref_internal(slice);
220           continue;
221         }
222         uint32_t remaining = static_cast<uint32_t>(end - cur);
223         if (remaining == p->frame_size) {
224           s->stats.incoming.data_bytes += remaining;
225           if (GRPC_ERROR_NONE !=
226               (error = p->parsing_frame->Push(
227                    grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
228                                   static_cast<size_t>(end - beg)),
229                    slice_out))) {
230             grpc_slice_unref_internal(slice);
231             return error;
232           }
233           if (GRPC_ERROR_NONE !=
234               (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
235             grpc_slice_unref_internal(slice);
236             return error;
237           }
238           p->parsing_frame = nullptr;
239           p->state = GRPC_CHTTP2_DATA_FH_0;
240           grpc_slice_unref_internal(slice);
241           return GRPC_ERROR_NONE;
242         } else if (remaining < p->frame_size) {
243           s->stats.incoming.data_bytes += remaining;
244           if (GRPC_ERROR_NONE !=
245               (error = p->parsing_frame->Push(
246                    grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
247                                   static_cast<size_t>(end - beg)),
248                    slice_out))) {
249             return error;
250           }
251           p->frame_size -= remaining;
252           grpc_slice_unref_internal(slice);
253           return GRPC_ERROR_NONE;
254         } else {
255           GPR_ASSERT(remaining > p->frame_size);
256           s->stats.incoming.data_bytes += p->frame_size;
257           if (GRPC_ERROR_NONE !=
258               p->parsing_frame->Push(
259                   grpc_slice_sub(
260                       slice, static_cast<size_t>(cur - beg),
261                       static_cast<size_t>(cur + p->frame_size - beg)),
262                   slice_out)) {
263             grpc_slice_unref_internal(slice);
264             return error;
265           }
266           if (GRPC_ERROR_NONE !=
267               (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
268             grpc_slice_unref_internal(slice);
269             return error;
270           }
271           p->parsing_frame = nullptr;
272           p->state = GRPC_CHTTP2_DATA_FH_0;
273           cur += p->frame_size;
274           grpc_slice_buffer_undo_take_first(
275               slices, grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
276                                      static_cast<size_t>(end - beg)));
277           grpc_slice_unref_internal(slice);
278           return GRPC_ERROR_NONE;
279         }
280       }
281     }
282   }
283
284   return GRPC_ERROR_NONE;
285 }
286
287 grpc_error* grpc_chttp2_data_parser_parse(void* parser,
288                                           grpc_chttp2_transport* t,
289                                           grpc_chttp2_stream* s,
290                                           const grpc_slice& slice,
291                                           int is_last) {
292   if (!s->pending_byte_stream) {
293     grpc_slice_ref_internal(slice);
294     grpc_slice_buffer_add(&s->frame_storage, slice);
295     grpc_chttp2_maybe_complete_recv_message(t, s);
296   } else if (s->on_next) {
297     GPR_ASSERT(s->frame_storage.length == 0);
298     grpc_slice_ref_internal(slice);
299     grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
300     GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_NONE);
301     s->on_next = nullptr;
302     s->unprocessed_incoming_frames_decompressed = false;
303   } else {
304     grpc_slice_ref_internal(slice);
305     grpc_slice_buffer_add(&s->frame_storage, slice);
306   }
307
308   if (is_last && s->received_last_frame) {
309     grpc_chttp2_mark_stream_closed(t, s, true, false, GRPC_ERROR_NONE);
310   }
311
312   return GRPC_ERROR_NONE;
313 }