Imported Upstream version 1.22.0
[platform/upstream/grpc.git] / src / core / ext / transport / chttp2 / transport / frame_data.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
22
23 #include <string.h>
24
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/string_util.h>
28 #include "src/core/ext/transport/chttp2/transport/internal.h"
29 #include "src/core/lib/gpr/string.h"
30 #include "src/core/lib/gprpp/memory.h"
31 #include "src/core/lib/slice/slice_internal.h"
32 #include "src/core/lib/slice/slice_string_helpers.h"
33 #include "src/core/lib/transport/transport.h"
34
35 grpc_chttp2_data_parser::~grpc_chttp2_data_parser() {
36   if (parsing_frame != nullptr) {
37     GRPC_ERROR_UNREF(parsing_frame->Finished(
38         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false));
39   }
40   GRPC_ERROR_UNREF(error);
41 }
42
43 grpc_error* grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser* parser,
44                                                 uint8_t flags,
45                                                 uint32_t stream_id,
46                                                 grpc_chttp2_stream* s) {
47   if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
48     char* msg;
49     gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags);
50     grpc_error* err = grpc_error_set_int(
51         GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg), GRPC_ERROR_INT_STREAM_ID,
52         static_cast<intptr_t>(stream_id));
53     gpr_free(msg);
54     return err;
55   }
56
57   if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
58     s->received_last_frame = true;
59     s->eos_received = true;
60   } else {
61     s->received_last_frame = false;
62   }
63
64   return GRPC_ERROR_NONE;
65 }
66
67 void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
68                              uint32_t write_bytes, int is_eof,
69                              grpc_transport_one_way_stats* stats,
70                              grpc_slice_buffer* outbuf) {
71   grpc_slice hdr;
72   uint8_t* p;
73   static const size_t header_size = 9;
74
75   hdr = GRPC_SLICE_MALLOC(header_size);
76   p = GRPC_SLICE_START_PTR(hdr);
77   GPR_ASSERT(write_bytes < (1 << 24));
78   *p++ = static_cast<uint8_t>(write_bytes >> 16);
79   *p++ = static_cast<uint8_t>(write_bytes >> 8);
80   *p++ = static_cast<uint8_t>(write_bytes);
81   *p++ = GRPC_CHTTP2_FRAME_DATA;
82   *p++ = is_eof ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0;
83   *p++ = static_cast<uint8_t>(id >> 24);
84   *p++ = static_cast<uint8_t>(id >> 16);
85   *p++ = static_cast<uint8_t>(id >> 8);
86   *p++ = static_cast<uint8_t>(id);
87   grpc_slice_buffer_add(outbuf, hdr);
88
89   grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf);
90
91   stats->framing_bytes += header_size;
92   stats->data_bytes += write_bytes;
93 }
94
95 grpc_error* grpc_deframe_unprocessed_incoming_frames(
96     grpc_chttp2_data_parser* p, grpc_chttp2_stream* s,
97     grpc_slice_buffer* slices, grpc_slice* slice_out,
98     grpc_core::OrphanablePtr<grpc_core::ByteStream>* stream_out) {
99   grpc_error* error = GRPC_ERROR_NONE;
100   grpc_chttp2_transport* t = s->t;
101
102   while (slices->count > 0) {
103     uint8_t* beg = nullptr;
104     uint8_t* end = nullptr;
105     uint8_t* cur = nullptr;
106
107     grpc_slice* slice = grpc_slice_buffer_peek_first(slices);
108     beg = GRPC_SLICE_START_PTR(*slice);
109     end = GRPC_SLICE_END_PTR(*slice);
110     cur = beg;
111     uint32_t message_flags;
112
113     if (cur == end) {
114       grpc_slice_buffer_remove_first(slices);
115       continue;
116     }
117
118     switch (p->state) {
119       case GRPC_CHTTP2_DATA_ERROR:
120         p->state = GRPC_CHTTP2_DATA_ERROR;
121         grpc_slice_buffer_remove_first(slices);
122         return GRPC_ERROR_REF(p->error);
123       case GRPC_CHTTP2_DATA_FH_0:
124         s->stats.incoming.framing_bytes++;
125         p->frame_type = *cur;
126         switch (p->frame_type) {
127           case 0:
128             p->is_frame_compressed = false; /* GPR_FALSE */
129             break;
130           case 1:
131             p->is_frame_compressed = true; /* GPR_TRUE */
132             break;
133           default:
134             char* msg;
135             gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
136             p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
137             p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
138                                           static_cast<intptr_t>(s->id));
139             gpr_free(msg);
140             p->error =
141                 grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
142                                    grpc_dump_slice_to_slice(
143                                        *slice, GPR_DUMP_HEX | GPR_DUMP_ASCII));
144             p->error =
145                 grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
146             p->state = GRPC_CHTTP2_DATA_ERROR;
147             grpc_slice_buffer_remove_first(slices);
148             return GRPC_ERROR_REF(p->error);
149         }
150         if (++cur == end) {
151           p->state = GRPC_CHTTP2_DATA_FH_1;
152           grpc_slice_buffer_remove_first(slices);
153           continue;
154         }
155       /* fallthrough */
156       case GRPC_CHTTP2_DATA_FH_1:
157         s->stats.incoming.framing_bytes++;
158         p->frame_size = (static_cast<uint32_t>(*cur)) << 24;
159         if (++cur == end) {
160           p->state = GRPC_CHTTP2_DATA_FH_2;
161           grpc_slice_buffer_remove_first(slices);
162           continue;
163         }
164       /* fallthrough */
165       case GRPC_CHTTP2_DATA_FH_2:
166         s->stats.incoming.framing_bytes++;
167         p->frame_size |= (static_cast<uint32_t>(*cur)) << 16;
168         if (++cur == end) {
169           p->state = GRPC_CHTTP2_DATA_FH_3;
170           grpc_slice_buffer_remove_first(slices);
171           continue;
172         }
173       /* fallthrough */
174       case GRPC_CHTTP2_DATA_FH_3:
175         s->stats.incoming.framing_bytes++;
176         p->frame_size |= (static_cast<uint32_t>(*cur)) << 8;
177         if (++cur == end) {
178           p->state = GRPC_CHTTP2_DATA_FH_4;
179           grpc_slice_buffer_remove_first(slices);
180           continue;
181         }
182       /* fallthrough */
183       case GRPC_CHTTP2_DATA_FH_4:
184         s->stats.incoming.framing_bytes++;
185         GPR_ASSERT(stream_out != nullptr);
186         GPR_ASSERT(p->parsing_frame == nullptr);
187         p->frame_size |= (static_cast<uint32_t>(*cur));
188         if (t->channelz_socket != nullptr) {
189           t->channelz_socket->RecordMessageReceived();
190         }
191         p->state = GRPC_CHTTP2_DATA_FRAME;
192         ++cur;
193         message_flags = 0;
194         if (p->is_frame_compressed) {
195           message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
196         }
197         p->parsing_frame = grpc_core::New<grpc_core::Chttp2IncomingByteStream>(
198             t, s, p->frame_size, message_flags);
199         stream_out->reset(p->parsing_frame);
200         if (p->parsing_frame->remaining_bytes() == 0) {
201           GRPC_ERROR_UNREF(p->parsing_frame->Finished(GRPC_ERROR_NONE, true));
202           p->parsing_frame = nullptr;
203           p->state = GRPC_CHTTP2_DATA_FH_0;
204         }
205         s->pending_byte_stream = true;
206         if (cur != end) {
207           grpc_slice_buffer_sub_first(slices, static_cast<size_t>(cur - beg),
208                                       static_cast<size_t>(end - beg));
209         } else {
210           grpc_slice_buffer_remove_first(slices);
211         }
212         return GRPC_ERROR_NONE;
213       case GRPC_CHTTP2_DATA_FRAME: {
214         GPR_ASSERT(p->parsing_frame != nullptr);
215         GPR_ASSERT(slice_out != nullptr);
216         if (cur == end) {
217           grpc_slice_buffer_remove_first(slices);
218           continue;
219         }
220         uint32_t remaining = static_cast<uint32_t>(end - cur);
221         if (remaining == p->frame_size) {
222           s->stats.incoming.data_bytes += remaining;
223           if (GRPC_ERROR_NONE !=
224               (error = p->parsing_frame->Push(
225                    grpc_slice_sub(*slice, static_cast<size_t>(cur - beg),
226                                   static_cast<size_t>(end - beg)),
227                    slice_out))) {
228             grpc_slice_buffer_remove_first(slices);
229             return error;
230           }
231           if (GRPC_ERROR_NONE !=
232               (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
233             grpc_slice_buffer_remove_first(slices);
234             return error;
235           }
236           p->parsing_frame = nullptr;
237           p->state = GRPC_CHTTP2_DATA_FH_0;
238           grpc_slice_buffer_remove_first(slices);
239           return GRPC_ERROR_NONE;
240         } else if (remaining < p->frame_size) {
241           s->stats.incoming.data_bytes += remaining;
242           if (GRPC_ERROR_NONE !=
243               (error = p->parsing_frame->Push(
244                    grpc_slice_sub(*slice, static_cast<size_t>(cur - beg),
245                                   static_cast<size_t>(end - beg)),
246                    slice_out))) {
247             return error;
248           }
249           p->frame_size -= remaining;
250           grpc_slice_buffer_remove_first(slices);
251           return GRPC_ERROR_NONE;
252         } else {
253           GPR_ASSERT(remaining > p->frame_size);
254           s->stats.incoming.data_bytes += p->frame_size;
255           if (GRPC_ERROR_NONE !=
256               p->parsing_frame->Push(
257                   grpc_slice_sub(
258                       *slice, static_cast<size_t>(cur - beg),
259                       static_cast<size_t>(cur + p->frame_size - beg)),
260                   slice_out)) {
261             grpc_slice_buffer_remove_first(slices);
262             return error;
263           }
264           if (GRPC_ERROR_NONE !=
265               (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
266             grpc_slice_buffer_remove_first(slices);
267             return error;
268           }
269           p->parsing_frame = nullptr;
270           p->state = GRPC_CHTTP2_DATA_FH_0;
271           cur += p->frame_size;
272           grpc_slice_buffer_sub_first(slices, static_cast<size_t>(cur - beg),
273                                       static_cast<size_t>(end - beg));
274           return GRPC_ERROR_NONE;
275         }
276       }
277     }
278   }
279   return GRPC_ERROR_NONE;
280 }
281
282 grpc_error* grpc_chttp2_data_parser_parse(void* parser,
283                                           grpc_chttp2_transport* t,
284                                           grpc_chttp2_stream* s,
285                                           const grpc_slice& slice,
286                                           int is_last) {
287   if (!s->pending_byte_stream) {
288     grpc_slice_ref_internal(slice);
289     grpc_slice_buffer_add(&s->frame_storage, slice);
290     grpc_chttp2_maybe_complete_recv_message(t, s);
291   } else if (s->on_next) {
292     GPR_ASSERT(s->frame_storage.length == 0);
293     grpc_slice_ref_internal(slice);
294     grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
295     GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_NONE);
296     s->on_next = nullptr;
297     s->unprocessed_incoming_frames_decompressed = false;
298   } else {
299     grpc_slice_ref_internal(slice);
300     grpc_slice_buffer_add(&s->frame_storage, slice);
301   }
302
303   if (is_last && s->received_last_frame) {
304     grpc_chttp2_mark_stream_closed(t, s, true, false, GRPC_ERROR_NONE);
305   }
306
307   return GRPC_ERROR_NONE;
308 }