0787968a871a6752b2963dfd912b5a52320e36de
[platform/upstream/grpc.git] / src / core / ext / transport / chttp2 / transport / frame_data.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
22
23 #include <string.h>
24
25 #include "absl/strings/str_format.h"
26
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include "src/core/ext/transport/chttp2/transport/internal.h"
30 #include "src/core/lib/gpr/string.h"
31 #include "src/core/lib/gprpp/memory.h"
32 #include "src/core/lib/slice/slice_internal.h"
33 #include "src/core/lib/slice/slice_string_helpers.h"
34 #include "src/core/lib/transport/transport.h"
35
36 grpc_chttp2_data_parser::~grpc_chttp2_data_parser() {
37   if (parsing_frame != nullptr) {
38     GRPC_ERROR_UNREF(parsing_frame->Finished(
39         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false));
40   }
41   GRPC_ERROR_UNREF(error);
42 }
43
44 grpc_error_handle grpc_chttp2_data_parser_begin_frame(
45     grpc_chttp2_data_parser* /*parser*/, uint8_t flags, uint32_t stream_id,
46     grpc_chttp2_stream* s) {
47   if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
48     return grpc_error_set_int(
49         GRPC_ERROR_CREATE_FROM_COPIED_STRING(
50             absl::StrFormat("unsupported data flags: 0x%02x", flags).c_str()),
51         GRPC_ERROR_INT_STREAM_ID, static_cast<intptr_t>(stream_id));
52   }
53
54   if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
55     s->received_last_frame = true;
56     s->eos_received = true;
57   } else {
58     s->received_last_frame = false;
59   }
60
61   return GRPC_ERROR_NONE;
62 }
63
64 void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
65                              uint32_t write_bytes, int is_eof,
66                              grpc_transport_one_way_stats* stats,
67                              grpc_slice_buffer* outbuf) {
68   grpc_slice hdr;
69   uint8_t* p;
70   static const size_t header_size = 9;
71
72   hdr = GRPC_SLICE_MALLOC(header_size);
73   p = GRPC_SLICE_START_PTR(hdr);
74   GPR_ASSERT(write_bytes < (1 << 24));
75   *p++ = static_cast<uint8_t>(write_bytes >> 16);
76   *p++ = static_cast<uint8_t>(write_bytes >> 8);
77   *p++ = static_cast<uint8_t>(write_bytes);
78   *p++ = GRPC_CHTTP2_FRAME_DATA;
79   *p++ = is_eof ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0;
80   *p++ = static_cast<uint8_t>(id >> 24);
81   *p++ = static_cast<uint8_t>(id >> 16);
82   *p++ = static_cast<uint8_t>(id >> 8);
83   *p++ = static_cast<uint8_t>(id);
84   grpc_slice_buffer_add(outbuf, hdr);
85
86   grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf);
87
88   stats->framing_bytes += header_size;
89   stats->data_bytes += write_bytes;
90 }
91
92 grpc_error_handle grpc_deframe_unprocessed_incoming_frames(
93     grpc_chttp2_data_parser* p, grpc_chttp2_stream* s,
94     grpc_slice_buffer* slices, grpc_slice* slice_out,
95     grpc_core::OrphanablePtr<grpc_core::ByteStream>* stream_out) {
96   grpc_error_handle error = GRPC_ERROR_NONE;
97   grpc_chttp2_transport* t = s->t;
98
99   while (slices->count > 0) {
100     uint8_t* beg = nullptr;
101     uint8_t* end = nullptr;
102     uint8_t* cur = nullptr;
103
104     grpc_slice* slice = grpc_slice_buffer_peek_first(slices);
105     beg = GRPC_SLICE_START_PTR(*slice);
106     end = GRPC_SLICE_END_PTR(*slice);
107     cur = beg;
108     uint32_t message_flags;
109
110     if (cur == end) {
111       grpc_slice_buffer_remove_first(slices);
112       continue;
113     }
114
115     switch (p->state) {
116       case GRPC_CHTTP2_DATA_ERROR:
117         p->state = GRPC_CHTTP2_DATA_ERROR;
118         grpc_slice_buffer_remove_first(slices);
119         return GRPC_ERROR_REF(p->error);
120       case GRPC_CHTTP2_DATA_FH_0:
121         s->stats.incoming.framing_bytes++;
122         p->frame_type = *cur;
123         switch (p->frame_type) {
124           case 0:
125             p->is_frame_compressed = false; /* GPR_FALSE */
126             break;
127           case 1:
128             p->is_frame_compressed = true; /* GPR_TRUE */
129             break;
130           default:
131             p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
132                 absl::StrFormat("Bad GRPC frame type 0x%02x", p->frame_type)
133                     .c_str());
134             p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
135                                           static_cast<intptr_t>(s->id));
136             p->error = grpc_error_set_str(
137                 p->error, GRPC_ERROR_STR_RAW_BYTES,
138                 grpc_slice_from_moved_string(grpc_core::UniquePtr<char>(
139                     grpc_dump_slice(*slice, GPR_DUMP_HEX | GPR_DUMP_ASCII))));
140             p->error =
141                 grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
142             p->state = GRPC_CHTTP2_DATA_ERROR;
143             grpc_slice_buffer_remove_first(slices);
144             return GRPC_ERROR_REF(p->error);
145         }
146         if (++cur == end) {
147           p->state = GRPC_CHTTP2_DATA_FH_1;
148           grpc_slice_buffer_remove_first(slices);
149           continue;
150         }
151         ABSL_FALLTHROUGH_INTENDED;
152       case GRPC_CHTTP2_DATA_FH_1:
153         s->stats.incoming.framing_bytes++;
154         p->frame_size = (static_cast<uint32_t>(*cur)) << 24;
155         if (++cur == end) {
156           p->state = GRPC_CHTTP2_DATA_FH_2;
157           grpc_slice_buffer_remove_first(slices);
158           continue;
159         }
160         ABSL_FALLTHROUGH_INTENDED;
161       case GRPC_CHTTP2_DATA_FH_2:
162         s->stats.incoming.framing_bytes++;
163         p->frame_size |= (static_cast<uint32_t>(*cur)) << 16;
164         if (++cur == end) {
165           p->state = GRPC_CHTTP2_DATA_FH_3;
166           grpc_slice_buffer_remove_first(slices);
167           continue;
168         }
169         ABSL_FALLTHROUGH_INTENDED;
170       case GRPC_CHTTP2_DATA_FH_3:
171         s->stats.incoming.framing_bytes++;
172         p->frame_size |= (static_cast<uint32_t>(*cur)) << 8;
173         if (++cur == end) {
174           p->state = GRPC_CHTTP2_DATA_FH_4;
175           grpc_slice_buffer_remove_first(slices);
176           continue;
177         }
178         ABSL_FALLTHROUGH_INTENDED;
179       case GRPC_CHTTP2_DATA_FH_4:
180         s->stats.incoming.framing_bytes++;
181         GPR_ASSERT(stream_out != nullptr);
182         GPR_ASSERT(p->parsing_frame == nullptr);
183         p->frame_size |= (static_cast<uint32_t>(*cur));
184         if (t->channelz_socket != nullptr) {
185           t->channelz_socket->RecordMessageReceived();
186         }
187         p->state = GRPC_CHTTP2_DATA_FRAME;
188         ++cur;
189         message_flags = 0;
190         if (p->is_frame_compressed) {
191           message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
192         }
193         p->parsing_frame = new grpc_core::Chttp2IncomingByteStream(
194             t, s, p->frame_size, message_flags);
195         stream_out->reset(p->parsing_frame);
196         if (p->parsing_frame->remaining_bytes() == 0) {
197           GRPC_ERROR_UNREF(p->parsing_frame->Finished(GRPC_ERROR_NONE, true));
198           p->parsing_frame = nullptr;
199           p->state = GRPC_CHTTP2_DATA_FH_0;
200         }
201         s->pending_byte_stream = true;
202         if (cur != end) {
203           grpc_slice_buffer_sub_first(slices, static_cast<size_t>(cur - beg),
204                                       static_cast<size_t>(end - beg));
205         } else {
206           grpc_slice_buffer_remove_first(slices);
207         }
208         return GRPC_ERROR_NONE;
209       case GRPC_CHTTP2_DATA_FRAME: {
210         GPR_ASSERT(p->parsing_frame != nullptr);
211         GPR_ASSERT(slice_out != nullptr);
212         if (cur == end) {
213           grpc_slice_buffer_remove_first(slices);
214           continue;
215         }
216         uint32_t remaining = static_cast<uint32_t>(end - cur);
217         if (remaining == p->frame_size) {
218           s->stats.incoming.data_bytes += remaining;
219           if (GRPC_ERROR_NONE !=
220               (error = p->parsing_frame->Push(
221                    grpc_slice_sub(*slice, static_cast<size_t>(cur - beg),
222                                   static_cast<size_t>(end - beg)),
223                    slice_out))) {
224             grpc_slice_buffer_remove_first(slices);
225             return error;
226           }
227           if (GRPC_ERROR_NONE !=
228               (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
229             grpc_slice_buffer_remove_first(slices);
230             return error;
231           }
232           p->parsing_frame = nullptr;
233           p->state = GRPC_CHTTP2_DATA_FH_0;
234           grpc_slice_buffer_remove_first(slices);
235           return GRPC_ERROR_NONE;
236         } else if (remaining < p->frame_size) {
237           s->stats.incoming.data_bytes += remaining;
238           if (GRPC_ERROR_NONE !=
239               (error = p->parsing_frame->Push(
240                    grpc_slice_sub(*slice, static_cast<size_t>(cur - beg),
241                                   static_cast<size_t>(end - beg)),
242                    slice_out))) {
243             return error;
244           }
245           p->frame_size -= remaining;
246           grpc_slice_buffer_remove_first(slices);
247           return GRPC_ERROR_NONE;
248         } else {
249           GPR_ASSERT(remaining > p->frame_size);
250           s->stats.incoming.data_bytes += p->frame_size;
251           if (GRPC_ERROR_NONE !=
252               p->parsing_frame->Push(
253                   grpc_slice_sub(
254                       *slice, static_cast<size_t>(cur - beg),
255                       static_cast<size_t>(cur + p->frame_size - beg)),
256                   slice_out)) {
257             grpc_slice_buffer_remove_first(slices);
258             return error;
259           }
260           if (GRPC_ERROR_NONE !=
261               (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
262             grpc_slice_buffer_remove_first(slices);
263             return error;
264           }
265           p->parsing_frame = nullptr;
266           p->state = GRPC_CHTTP2_DATA_FH_0;
267           cur += p->frame_size;
268           grpc_slice_buffer_sub_first(slices, static_cast<size_t>(cur - beg),
269                                       static_cast<size_t>(end - beg));
270           return GRPC_ERROR_NONE;
271         }
272       }
273     }
274   }
275   return GRPC_ERROR_NONE;
276 }
277
278 grpc_error_handle grpc_chttp2_data_parser_parse(void* /*parser*/,
279                                                 grpc_chttp2_transport* t,
280                                                 grpc_chttp2_stream* s,
281                                                 const grpc_slice& slice,
282                                                 int is_last) {
283   if (!s->pending_byte_stream) {
284     grpc_slice_ref_internal(slice);
285     grpc_slice_buffer_add(&s->frame_storage, slice);
286     grpc_chttp2_maybe_complete_recv_message(t, s);
287   } else if (s->on_next) {
288     GPR_ASSERT(s->frame_storage.length == 0);
289     grpc_slice_ref_internal(slice);
290     grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
291     grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->on_next, GRPC_ERROR_NONE);
292     s->on_next = nullptr;
293     s->unprocessed_incoming_frames_decompressed = false;
294   } else {
295     grpc_slice_ref_internal(slice);
296     grpc_slice_buffer_add(&s->frame_storage, slice);
297   }
298
299   if (is_last && s->received_last_frame) {
300     grpc_chttp2_mark_stream_closed(
301         t, s, true, false,
302         t->is_client ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
303                            "Data frame with END_STREAM flag received")
304                      : GRPC_ERROR_NONE);
305   }
306
307   return GRPC_ERROR_NONE;
308 }