3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/transport/cronet/transport/cronet_transport.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/str_format.h"
29 #include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
31 #include <grpc/slice_buffer.h>
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
35 #include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
36 #include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
37 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
38 #include "src/core/ext/transport/cronet/transport/cronet_status.h"
39 #include "src/core/lib/debug/trace.h"
40 #include "src/core/lib/gpr/string.h"
41 #include "src/core/lib/gprpp/manual_constructor.h"
42 #include "src/core/lib/iomgr/endpoint.h"
43 #include "src/core/lib/iomgr/exec_ctx.h"
44 #include "src/core/lib/slice/slice_internal.h"
45 #include "src/core/lib/slice/slice_string_helpers.h"
46 #include "src/core/lib/surface/channel.h"
47 #include "src/core/lib/surface/validate_metadata.h"
48 #include "src/core/lib/transport/metadata_batch.h"
49 #include "src/core/lib/transport/static_metadata.h"
50 #include "src/core/lib/transport/timeout_encoding.h"
51 #include "src/core/lib/transport/transport_impl.h"
53 #define GRPC_HEADER_SIZE_IN_BYTES 5
54 #define GRPC_FLUSH_READ_SIZE 4096
56 grpc_core::TraceFlag grpc_cronet_trace(false, "cronet");
57 #define CRONET_LOG(...) \
59 if (grpc_cronet_trace.enabled()) gpr_log(__VA_ARGS__); \
/* Outcome of one attempt to advance a stream op batch. */
enum e_op_result {
  /* Work was handed to Cronet; execute_from_storage stops and waits for the
     callback. */
  ACTION_TAKEN_WITH_CALLBACK,
  /* Work was done synchronously; the same op is processed again. */
  ACTION_TAKEN_NO_CALLBACK,
  /* Nothing can be done for this op right now; move on to the next one. */
  NO_ACTION_POSSIBLE
};
/* Identifiers for the individual actions/events tracked by the state machine.
   The values index op_state::state_op_done and
   op_state::state_callback_received, so OP_NUM_OPS must stay last. */
enum e_op_id {
  OP_SEND_INITIAL_METADATA = 0,
  OP_SEND_MESSAGE,
  OP_SEND_TRAILING_METADATA,
  OP_RECV_MESSAGE,
  OP_RECV_INITIAL_METADATA,
  OP_RECV_TRAILING_METADATA,
  OP_CANCEL_ERROR,
  OP_ON_COMPLETE,
  OP_FAILED,
  OP_SUCCEEDED,
  OP_CANCELED,
  OP_RECV_MESSAGE_AND_ON_COMPLETE,
  OP_READ_REQ_MADE,
  OP_NUM_OPS
};
85 /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
87 static void on_stream_ready(bidirectional_stream*);
88 static void on_response_headers_received(
89 bidirectional_stream*, const bidirectional_stream_header_array*,
91 static void on_write_completed(bidirectional_stream*, const char*);
92 static void on_read_completed(bidirectional_stream*, char*, int);
93 static void on_response_trailers_received(
94 bidirectional_stream*, const bidirectional_stream_header_array*);
95 static void on_succeeded(bidirectional_stream*);
96 static void on_failed(bidirectional_stream*, int);
97 static void on_canceled(bidirectional_stream*);
98 static bidirectional_stream_callback cronet_callbacks = {
100 on_response_headers_received,
103 on_response_trailers_received,
108 /* Cronet transport object */
109 struct grpc_cronet_transport {
110 grpc_transport base; /* must be first element in this structure */
111 stream_engine* engine;
113 bool use_packet_coalescing;
115 typedef struct grpc_cronet_transport grpc_cronet_transport;
117 /* TODO (makdharma): reorder structure for memory efficiency per
118 http://www.catb.org/esr/structure-packing/#_structure_reordering: */
120 explicit read_state(grpc_core::Arena* arena)
121 : trailing_metadata(arena), initial_metadata(arena) {
122 grpc_slice_buffer_init(&read_slice_buffer);
125 /* vars to store data coming from server */
126 char* read_buffer = nullptr;
127 bool length_field_received = false;
128 int received_bytes = 0;
129 int remaining_bytes = 0;
130 int length_field = 0;
131 bool compressed = false;
132 char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {};
133 char* payload_field = nullptr;
134 bool read_stream_closed = false;
136 /* vars for holding data destined for the application */
137 grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
138 grpc_slice_buffer read_slice_buffer;
140 /* vars for trailing metadata */
141 grpc_chttp2_incoming_metadata_buffer trailing_metadata;
142 bool trailing_metadata_valid = false;
144 /* vars for initial metadata */
145 grpc_chttp2_incoming_metadata_buffer initial_metadata;
/* Per-stream bookkeeping for data going to the server. */
struct write_state {
  /* Frame currently being written; released with gpr_free() once the write
     completes or the stream fails/is canceled. */
  char* write_buffer = nullptr;
};
152 /* track state of one stream op */
154 explicit op_state(grpc_core::Arena* arena) : rs(arena) {}
156 bool state_op_done[OP_NUM_OPS] = {};
157 bool state_callback_received[OP_NUM_OPS] = {};
158 /* A non-zero gRPC status code has been seen */
159 bool fail_state = false;
160 /* Transport is discarding all buffered messages */
161 bool flush_read = false;
162 bool flush_cronet_when_ready = false;
163 bool pending_write_for_trailer = false;
164 bool pending_send_message = false;
165 /* User requested RECV_TRAILING_METADATA */
166 bool pending_recv_trailing_metadata = false;
167 cronet_net_error_code net_error = OK;
168 grpc_error_handle cancel_error = GRPC_ERROR_NONE;
169 /* data structure for storing data coming from server */
170 struct read_state rs;
171 /* data structure for storing data going to the server */
172 struct write_state ws;
177 struct op_and_state {
178 op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op);
180 grpc_transport_stream_op_batch op;
181 struct op_state state;
183 struct stream_obj* s; /* Pointer back to the stream object */
184 /* next op_and_state in the linked list */
185 struct op_and_state* next = nullptr;
/* Queue of pending op batches for one stream (newest entry first). */
struct op_storage {
  /* Number of nodes currently linked from head. */
  int num_pending_ops = 0;
  struct op_and_state* head = nullptr;
};
194 stream_obj(grpc_transport* gt, grpc_stream* gs,
195 grpc_stream_refcount* refcount, grpc_core::Arena* arena);
198 grpc_core::Arena* arena;
199 struct op_and_state* oas = nullptr;
200 grpc_transport_stream_op_batch* curr_op = nullptr;
201 grpc_cronet_transport* curr_ct;
202 grpc_stream* curr_gs;
203 bidirectional_stream* cbs = nullptr;
204 bidirectional_stream_header_array header_array =
205 bidirectional_stream_header_array(); // Zero-initialize the structure.
207 /* Stream level state. Some state will be tracked both at stream and stream_op
209 struct op_state state;
212 struct op_storage storage;
214 /* Mutex to protect storage */
217 /* Refcount object of the stream */
218 grpc_stream_refcount* refcount;
222 #define GRPC_CRONET_STREAM_REF(stream, reason) \
223 grpc_cronet_stream_ref((stream), (reason))
224 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
225 grpc_cronet_stream_unref((stream), (reason))
226 void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
227 grpc_stream_ref(s->refcount, reason);
229 void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
230 grpc_stream_unref(s->refcount, reason);
233 #define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
234 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
235 grpc_cronet_stream_unref((stream))
236 void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
237 void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
240 static enum e_op_result execute_stream_op(struct op_and_state* oas);
243 Utility function to translate enum into string for printing
245 static const char* op_result_string(enum e_op_result i) {
247 case ACTION_TAKEN_WITH_CALLBACK:
248 return "ACTION_TAKEN_WITH_CALLBACK";
249 case ACTION_TAKEN_NO_CALLBACK:
250 return "ACTION_TAKEN_NO_CALLBACK";
251 case NO_ACTION_POSSIBLE:
252 return "NO_ACTION_POSSIBLE";
254 GPR_UNREACHABLE_CODE(return "UNKNOWN");
257 static const char* op_id_string(enum e_op_id i) {
259 case OP_SEND_INITIAL_METADATA:
260 return "OP_SEND_INITIAL_METADATA";
261 case OP_SEND_MESSAGE:
262 return "OP_SEND_MESSAGE";
263 case OP_SEND_TRAILING_METADATA:
264 return "OP_SEND_TRAILING_METADATA";
265 case OP_RECV_MESSAGE:
266 return "OP_RECV_MESSAGE";
267 case OP_RECV_INITIAL_METADATA:
268 return "OP_RECV_INITIAL_METADATA";
269 case OP_RECV_TRAILING_METADATA:
270 return "OP_RECV_TRAILING_METADATA";
271 case OP_CANCEL_ERROR:
272 return "OP_CANCEL_ERROR";
274 return "OP_ON_COMPLETE";
278 return "OP_SUCCEEDED";
280 return "OP_CANCELED";
281 case OP_RECV_MESSAGE_AND_ON_COMPLETE:
282 return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
283 case OP_READ_REQ_MADE:
284 return "OP_READ_REQ_MADE";
291 static void null_and_maybe_free_read_buffer(stream_obj* s) {
292 if (s->state.rs.read_buffer &&
293 s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
294 gpr_free(s->state.rs.read_buffer);
296 s->state.rs.read_buffer = nullptr;
299 static void read_grpc_header(stream_obj* s) {
300 s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
301 s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
302 s->state.rs.received_bytes = 0;
303 s->state.rs.compressed = false;
304 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
305 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
306 s->state.rs.remaining_bytes);
309 static grpc_error_handle make_error_with_desc(int error_code,
310 int cronet_internal_error_code,
312 return grpc_error_set_int(GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat(
313 "Cronet error code:%d, Cronet error detail:%s",
314 cronet_internal_error_code, desc)),
315 GRPC_ERROR_INT_GRPC_STATUS, error_code);
318 inline op_and_state::op_and_state(stream_obj* s,
319 const grpc_transport_stream_op_batch& op)
320 : op(op), state(s->arena), s(s) {}
323 Add a new stream op to op storage.
325 static void add_to_storage(struct stream_obj* s,
326 grpc_transport_stream_op_batch* op) {
327 struct op_storage* storage = &s->storage;
328 /* add new op at the beginning of the linked list. The memory is freed
329 in remove_from_storage */
330 op_and_state* new_op = new op_and_state(s, *op);
332 new_op->next = storage->head;
333 storage->head = new_op;
334 storage->num_pending_ops++;
335 if (op->send_message) {
336 s->state.pending_send_message = true;
338 if (op->recv_trailing_metadata) {
339 s->state.pending_recv_trailing_metadata = true;
341 CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
342 storage->num_pending_ops);
343 gpr_mu_unlock(&s->mu);
347 Traverse the linked list and delete op and free memory
349 static void remove_from_storage(struct stream_obj* s,
350 struct op_and_state* oas) {
351 struct op_and_state* curr;
352 if (s->storage.head == nullptr || oas == nullptr) {
355 if (s->storage.head == oas) {
356 s->storage.head = oas->next;
358 s->storage.num_pending_ops--;
359 CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
360 s->storage.num_pending_ops);
362 for (curr = s->storage.head; curr != nullptr; curr = curr->next) {
363 if (curr->next == oas) {
364 curr->next = oas->next;
365 s->storage.num_pending_ops--;
366 CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
367 s->storage.num_pending_ops);
370 } else if (GPR_UNLIKELY(curr->next == nullptr)) {
371 CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
378 Cycle through ops and try to take next action. Break when either
379 an action with callback is taken, or no action is possible.
380 This can get executed from the Cronet network thread via cronet callback
381 or on the application supplied thread via the perform_stream_op function.
383 static void execute_from_storage(stream_obj* s) {
385 for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
386 CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
387 GPR_ASSERT(!curr->done);
388 enum e_op_result result = execute_stream_op(curr);
389 CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
390 op_result_string(result));
391 /* if this op is done, then remove it and free memory */
393 struct op_and_state* next = curr->next;
394 remove_from_storage(s, curr);
396 } else if (result == NO_ACTION_POSSIBLE) {
398 } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
399 /* wait for the callback */
401 } /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
403 gpr_mu_unlock(&s->mu);
406 static void convert_cronet_array_to_metadata(
407 const bidirectional_stream_header_array* header_array,
408 grpc_chttp2_incoming_metadata_buffer* mds) {
409 for (size_t i = 0; i < header_array->count; i++) {
410 CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s",
411 header_array->headers[i].key, header_array->headers[i].value);
412 grpc_slice key = grpc_slice_intern(
413 grpc_slice_from_static_string(header_array->headers[i].key));
415 if (grpc_is_refcounted_slice_binary_header(key)) {
416 value = grpc_slice_from_static_string(header_array->headers[i].value);
417 value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length(
418 value, grpc_chttp2_base64_infer_length_after_decode(value)));
420 value = grpc_slice_intern(
421 grpc_slice_from_static_string(header_array->headers[i].value));
423 GRPC_LOG_IF_ERROR("convert_cronet_array_to_metadata",
424 grpc_chttp2_incoming_metadata_buffer_add(
425 mds, grpc_mdelem_from_slices(key, value)));
432 static void on_failed(bidirectional_stream* stream, int net_error) {
433 gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
434 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
435 grpc_core::ExecCtx exec_ctx;
437 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
439 bidirectional_stream_destroy(s->cbs);
440 s->state.state_callback_received[OP_FAILED] = true;
441 s->state.net_error = static_cast<cronet_net_error_code>(net_error);
443 if (s->header_array.headers) {
444 gpr_free(s->header_array.headers);
445 s->header_array.headers = nullptr;
447 if (s->state.ws.write_buffer) {
448 gpr_free(s->state.ws.write_buffer);
449 s->state.ws.write_buffer = nullptr;
451 null_and_maybe_free_read_buffer(s);
452 gpr_mu_unlock(&s->mu);
453 execute_from_storage(s);
454 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
460 static void on_canceled(bidirectional_stream* stream) {
461 CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
462 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
463 grpc_core::ExecCtx exec_ctx;
465 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
467 bidirectional_stream_destroy(s->cbs);
468 s->state.state_callback_received[OP_CANCELED] = true;
470 if (s->header_array.headers) {
471 gpr_free(s->header_array.headers);
472 s->header_array.headers = nullptr;
474 if (s->state.ws.write_buffer) {
475 gpr_free(s->state.ws.write_buffer);
476 s->state.ws.write_buffer = nullptr;
478 null_and_maybe_free_read_buffer(s);
479 gpr_mu_unlock(&s->mu);
480 execute_from_storage(s);
481 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
487 static void on_succeeded(bidirectional_stream* stream) {
488 CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
489 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
490 grpc_core::ExecCtx exec_ctx;
492 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
494 bidirectional_stream_destroy(s->cbs);
495 s->state.state_callback_received[OP_SUCCEEDED] = true;
497 null_and_maybe_free_read_buffer(s);
498 gpr_mu_unlock(&s->mu);
499 execute_from_storage(s);
500 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
506 static void on_stream_ready(bidirectional_stream* stream) {
507 CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
508 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
509 grpc_core::ExecCtx exec_ctx;
510 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
511 grpc_cronet_transport* t = s->curr_ct;
513 s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
514 s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
515 /* Free the memory allocated for headers */
516 if (s->header_array.headers) {
517 gpr_free(s->header_array.headers);
518 s->header_array.headers = nullptr;
520 /* Send the initial metadata on wire if there is no SEND_MESSAGE or
521 * SEND_TRAILING_METADATA ops pending */
522 if (t->use_packet_coalescing) {
523 if (s->state.flush_cronet_when_ready) {
524 CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
525 bidirectional_stream_flush(stream);
528 gpr_mu_unlock(&s->mu);
529 execute_from_storage(s);
535 static void on_response_headers_received(
536 bidirectional_stream* stream,
537 const bidirectional_stream_header_array* headers,
538 const char* negotiated_protocol) {
539 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
540 grpc_core::ExecCtx exec_ctx;
541 CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
542 headers, negotiated_protocol);
543 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
545 /* Identify if this is a header or a trailer (in a trailer-only response case)
547 for (size_t i = 0; i < headers->count; i++) {
548 if (0 == strcmp("grpc-status", headers->headers[i].key)) {
549 on_response_trailers_received(stream, headers);
551 /* Do an extra read for a trailer-only stream to trigger on_succeeded()
559 convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata);
560 s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
561 if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
562 s->state.state_callback_received[OP_FAILED])) {
563 /* Do an extra read to trigger on_succeeded() callback in case connection
565 GPR_ASSERT(s->state.rs.length_field_received == false);
568 gpr_mu_unlock(&s->mu);
569 execute_from_storage(s);
575 static void on_write_completed(bidirectional_stream* stream, const char* data) {
576 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
577 grpc_core::ExecCtx exec_ctx;
578 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
579 CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
581 if (s->state.ws.write_buffer) {
582 gpr_free(s->state.ws.write_buffer);
583 s->state.ws.write_buffer = nullptr;
585 s->state.state_callback_received[OP_SEND_MESSAGE] = true;
586 gpr_mu_unlock(&s->mu);
587 execute_from_storage(s);
593 static void on_read_completed(bidirectional_stream* stream, char* data,
595 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
596 grpc_core::ExecCtx exec_ctx;
597 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
598 CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
601 s->state.state_callback_received[OP_RECV_MESSAGE] = true;
602 if (count > 0 && s->state.flush_read) {
603 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
604 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
605 GRPC_FLUSH_READ_SIZE);
606 gpr_mu_unlock(&s->mu);
607 } else if (count > 0) {
608 s->state.rs.received_bytes += count;
609 s->state.rs.remaining_bytes -= count;
610 if (s->state.rs.remaining_bytes > 0) {
611 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
612 s->state.state_op_done[OP_READ_REQ_MADE] = true;
613 bidirectional_stream_read(
614 s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
615 s->state.rs.remaining_bytes);
616 gpr_mu_unlock(&s->mu);
618 gpr_mu_unlock(&s->mu);
619 execute_from_storage(s);
622 null_and_maybe_free_read_buffer(s);
623 s->state.rs.read_stream_closed = true;
624 gpr_mu_unlock(&s->mu);
625 execute_from_storage(s);
632 static void on_response_trailers_received(
633 bidirectional_stream* stream,
634 const bidirectional_stream_header_array* trailers) {
635 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
636 grpc_core::ExecCtx exec_ctx;
637 CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
639 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
640 grpc_cronet_transport* t = s->curr_ct;
642 s->state.rs.trailing_metadata_valid = false;
643 convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata);
644 if (trailers->count > 0) {
645 s->state.rs.trailing_metadata_valid = true;
647 s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
648 /* Send a EOS when server terminates the stream (testServerFinishesRequest) to
649 * trigger on_succeeded */
650 if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
651 !(s->state.state_op_done[OP_CANCEL_ERROR] ||
652 s->state.state_callback_received[OP_FAILED])) {
653 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
654 s->state.state_callback_received[OP_SEND_MESSAGE] = false;
655 bidirectional_stream_write(s->cbs, "", 0, true);
656 if (t->use_packet_coalescing) {
657 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
658 bidirectional_stream_flush(s->cbs);
660 s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
662 gpr_mu_unlock(&s->mu);
664 gpr_mu_unlock(&s->mu);
665 execute_from_storage(s);
670 Utility function that takes the data from s->write_slice_buffer and assembles
671 into a contiguous byte stream with 5 byte gRPC header prepended.
673 static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
674 char** pp_write_buffer,
675 size_t* p_write_buffer_size, uint32_t flags) {
676 size_t length = write_slice_buffer->length;
677 *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
678 /* This is freed in the on_write_completed callback */
680 static_cast<char*>(gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES));
681 *pp_write_buffer = write_buffer;
682 uint8_t* p = reinterpret_cast<uint8_t*>(write_buffer);
683 /* Append 5 byte header */
684 /* Compressed flag */
685 *p++ = static_cast<uint8_t>((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0);
687 *p++ = static_cast<uint8_t>(length >> 24);
688 *p++ = static_cast<uint8_t>(length >> 16);
689 *p++ = static_cast<uint8_t>(length >> 8);
690 *p++ = static_cast<uint8_t>(length);
691 /* append actual data */
693 for (size_t i = 0; i < write_slice_buffer->count; ++i) {
694 memcpy(p + offset, GRPC_SLICE_START_PTR(write_slice_buffer->slices[i]),
695 GRPC_SLICE_LENGTH(write_slice_buffer->slices[i]));
696 offset += GRPC_SLICE_LENGTH(write_slice_buffer->slices[i]);
701 Convert metadata in a format that Cronet can consume
703 static void convert_metadata_to_cronet_headers(
704 grpc_metadata_batch* metadata, const char* host, std::string* pp_url,
705 bidirectional_stream_header** pp_headers, size_t* p_num_headers,
706 const char** method) {
707 grpc_linked_mdelem* curr = metadata->list.head;
708 /* Walk the linked list and get number of header fields */
709 size_t num_headers_available = 0;
710 while (curr != nullptr) {
712 num_headers_available++;
714 grpc_millis deadline = metadata->deadline;
715 if (deadline != GRPC_MILLIS_INF_FUTURE) {
716 num_headers_available++;
718 /* Allocate enough memory. It is freed in the on_stream_ready callback
720 bidirectional_stream_header* headers =
721 static_cast<bidirectional_stream_header*>(gpr_malloc(
722 sizeof(bidirectional_stream_header) * num_headers_available));
723 *pp_headers = headers;
725 /* Walk the linked list again, this time copying the header fields.
726 s->num_headers can be less than num_headers_available, as some headers
727 are not used for cronet.
728 TODO (makdharma): Eliminate need to traverse the LL second time for perf.
730 curr = metadata->list.head;
731 size_t num_headers = 0;
732 while (num_headers < num_headers_available) {
733 grpc_mdelem mdelem = curr->md;
735 char* key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem));
737 if (grpc_is_binary_header_internal(GRPC_MDKEY(mdelem))) {
738 grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem));
739 value = grpc_slice_to_c_string(wire_value);
740 grpc_slice_unref_internal(wire_value);
742 value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem));
744 if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) ||
745 grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem),
746 GRPC_MDSTR_AUTHORITY)) {
747 /* Cronet populates these fields on its own */
752 if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_METHOD)) {
753 if (grpc_slice_eq_static_interned(GRPC_MDVALUE(mdelem), GRPC_MDSTR_PUT)) {
756 /* POST method in default*/
763 if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_PATH)) {
764 /* Create URL by appending :path value to the hostname */
765 *pp_url = absl::StrCat("https://", host, value);
770 CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
771 headers[num_headers].key = key;
772 headers[num_headers].value = value;
774 if (curr == nullptr) {
778 if (deadline != GRPC_MILLIS_INF_FUTURE) {
779 char* key = grpc_slice_to_c_string(GRPC_MDSTR_GRPC_TIMEOUT);
781 static_cast<char*>(gpr_malloc(GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE));
782 grpc_http2_encode_timeout(deadline - grpc_core::ExecCtx::Get()->Now(),
784 headers[num_headers].key = key;
785 headers[num_headers].value = value;
790 *p_num_headers = num_headers;
/*
  Decodes the 5-byte gRPC message prefix at `data`.

    data       - at least 5 readable bytes: flag byte + 4-byte big-endian
                 payload length
    length     - receives the payload length (always overwritten)
    compressed - receives whether bit 0 of the flag byte is set

  The length is assembled in an unsigned 32-bit value so that a top byte
  >= 0x80 is never shifted into the sign bit of an int; it is converted to
  int only once at the end.
*/
static void parse_grpc_header(const uint8_t* data, int* length,
                              bool* compressed) {
  *compressed = ((data[0] & 0x01) == 0x01);
  const uint32_t message_length = (static_cast<uint32_t>(data[1]) << 24) |
                                  (static_cast<uint32_t>(data[2]) << 16) |
                                  (static_cast<uint32_t>(data[3]) << 8) |
                                  static_cast<uint32_t>(data[4]);
  *length = static_cast<int>(message_length);
}
805 static bool header_has_authority(grpc_linked_mdelem* head) {
806 while (head != nullptr) {
807 if (grpc_slice_eq_static_interned(GRPC_MDKEY(head->md),
808 GRPC_MDSTR_AUTHORITY)) {
817 Op Execution: Decide if one of the actions contained in the stream op can be
818 executed. This is the heart of the state machine.
820 static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
821 struct stream_obj* s, struct op_state* op_state,
822 enum e_op_id op_id) {
823 struct op_state* stream_state = &s->state;
824 grpc_cronet_transport* t = s->curr_ct;
826 /* When call is canceled, every op can be run, except under following
829 bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
830 stream_state->state_callback_received[OP_FAILED];
831 if (is_canceled_or_failed) {
832 if (op_id == OP_SEND_INITIAL_METADATA) {
833 CRONET_LOG(GPR_DEBUG, "Because");
836 if (op_id == OP_SEND_MESSAGE) {
837 CRONET_LOG(GPR_DEBUG, "Because");
840 if (op_id == OP_SEND_TRAILING_METADATA) {
841 CRONET_LOG(GPR_DEBUG, "Because");
844 if (op_id == OP_CANCEL_ERROR) {
845 CRONET_LOG(GPR_DEBUG, "Because");
848 /* already executed */
849 if (op_id == OP_RECV_INITIAL_METADATA &&
850 stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
851 CRONET_LOG(GPR_DEBUG, "Because");
854 if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
855 CRONET_LOG(GPR_DEBUG, "Because");
858 if (op_id == OP_RECV_TRAILING_METADATA &&
859 stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
860 CRONET_LOG(GPR_DEBUG, "Because");
863 /* ON_COMPLETE can be processed if one of the following conditions is met:
864 * 1. the stream failed
865 * 2. the stream is cancelled, and the callback is received
866 * 3. the stream succeeded before cancel is effective
867 * 4. the stream is cancelled, and the stream is never started */
868 if (op_id == OP_ON_COMPLETE &&
869 !(stream_state->state_callback_received[OP_FAILED] ||
870 stream_state->state_callback_received[OP_CANCELED] ||
871 stream_state->state_callback_received[OP_SUCCEEDED] ||
872 !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
873 CRONET_LOG(GPR_DEBUG, "Because");
876 } else if (op_id == OP_SEND_INITIAL_METADATA) {
877 /* already executed */
878 if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
879 } else if (op_id == OP_RECV_INITIAL_METADATA) {
880 if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
881 /* already executed */
883 } else if (!stream_state
884 ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
885 /* we haven't sent headers yet. */
887 } else if (!stream_state
888 ->state_callback_received[OP_RECV_INITIAL_METADATA] &&
889 !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
890 /* we haven't received headers yet. */
893 } else if (op_id == OP_SEND_MESSAGE) {
894 if (op_state->state_op_done[OP_SEND_MESSAGE]) {
895 /* already executed (note we're checking op specific state, not stream
898 } else if (!stream_state
899 ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
900 /* we haven't sent headers yet. */
903 } else if (op_id == OP_RECV_MESSAGE) {
904 if (op_state->state_op_done[OP_RECV_MESSAGE]) {
905 /* already executed */
907 } else if (!stream_state
908 ->state_callback_received[OP_RECV_INITIAL_METADATA] &&
909 !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
910 /* we haven't received headers yet. */
913 } else if (op_id == OP_RECV_TRAILING_METADATA) {
914 if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
915 /* already executed */
917 } else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
918 !stream_state->state_op_done[OP_RECV_MESSAGE]) {
919 /* we have asked for but haven't received message yet. */
921 } else if (!stream_state
922 ->state_callback_received[OP_RECV_TRAILING_METADATA]) {
923 /* we haven't received trailers yet. */
925 } else if (!stream_state->state_callback_received[OP_SUCCEEDED]) {
926 /* we haven't received on_succeeded yet. */
929 } else if (op_id == OP_SEND_TRAILING_METADATA) {
930 if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
931 /* already executed */
933 } else if (!stream_state
934 ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
935 /* we haven't sent initial metadata yet */
937 } else if (stream_state->pending_send_message &&
938 !stream_state->state_op_done[OP_SEND_MESSAGE]) {
939 /* we haven't sent message yet */
941 } else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
942 !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
943 !(t->use_packet_coalescing &&
944 stream_state->pending_write_for_trailer)) {
945 /* we haven't got on_write_completed for the send yet */
948 } else if (op_id == OP_CANCEL_ERROR) {
949 /* already executed */
950 if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
951 } else if (op_id == OP_ON_COMPLETE) {
952 if (op_state->state_op_done[OP_ON_COMPLETE]) {
953 /* already executed (note we're checking op specific state, not stream
955 CRONET_LOG(GPR_DEBUG, "Because");
958 /* Check if every op that was asked for is done. */
959 /* TODO(muxi): We should not consider the recv ops here, since they
960 * have their own callbacks. We should invoke a batch's on_complete
961 * as soon as all of the batch's send ops are complete, even if
962 * there are still recv ops pending. */
963 else if (curr_op->send_initial_metadata &&
964 !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
965 CRONET_LOG(GPR_DEBUG, "Because");
967 } else if (curr_op->send_message &&
968 !op_state->state_op_done[OP_SEND_MESSAGE]) {
969 CRONET_LOG(GPR_DEBUG, "Because");
971 } else if (curr_op->send_message &&
972 !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
973 CRONET_LOG(GPR_DEBUG, "Because");
975 } else if (curr_op->send_trailing_metadata &&
976 !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
977 CRONET_LOG(GPR_DEBUG, "Because");
979 } else if (curr_op->recv_initial_metadata &&
980 !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
981 CRONET_LOG(GPR_DEBUG, "Because");
983 } else if (curr_op->recv_message &&
984 !op_state->state_op_done[OP_RECV_MESSAGE]) {
985 CRONET_LOG(GPR_DEBUG, "Because");
987 } else if (curr_op->cancel_stream &&
988 !stream_state->state_callback_received[OP_CANCELED]) {
989 CRONET_LOG(GPR_DEBUG, "Because");
991 } else if (curr_op->recv_trailing_metadata) {
992 /* We aren't done with trailing metadata yet */
993 if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
994 CRONET_LOG(GPR_DEBUG, "Because");
997 /* We've asked for actual message in an earlier op, and it hasn't been
999 else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
1000 /* If this op is not the one asking for read, (which means some earlier
1001 op has asked), and the read hasn't been delivered. */
1002 if (!curr_op->recv_message &&
1003 !stream_state->state_callback_received[OP_SUCCEEDED]) {
1004 CRONET_LOG(GPR_DEBUG, "Because");
1009 /* We should see at least one on_write_completed for the trailers that we
1011 else if (curr_op->send_trailing_metadata &&
1012 !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
1016 CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
1017 result ? "YES" : "NO");
1022 TODO (makdharma): Break this function down into smaller chunks for readability.
// Attempts one action for the batch held in oas. The branches below are tried
// in order (send_initial_metadata, send_message, send_trailing_metadata,
// recv_initial_metadata, recv_message, recv_trailing_metadata, cancel_stream,
// on_complete); a branch is entered only when its flag is set on the batch and
// op_can_be_run() accepts it.
// Each branch stores ACTION_TAKEN_WITH_CALLBACK, ACTION_TAKEN_NO_CALLBACK or
// NO_ACTION_POSSIBLE in result.
// NOTE(review): the return statement is not visible in this listing;
// presumably result is returned to the caller -- confirm.
1024 static enum e_op_result execute_stream_op(struct op_and_state* oas) {
1025 grpc_transport_stream_op_batch* stream_op = &oas->op;
1026 struct stream_obj* s = oas->s;
1027 grpc_cronet_transport* t = s->curr_ct;
1028 struct op_state* stream_state = &s->state;
1029 enum e_op_result result = NO_ACTION_POSSIBLE;
// --- send_initial_metadata: create and start the cronet stream ---
1030 if (stream_op->send_initial_metadata &&
1031 op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
1032 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
1033 /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
1035 GPR_ASSERT(s->cbs == nullptr);
1036 GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
1038 bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
1039 CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
// With packet coalescing, auto flush is disabled and the request headers are
// held back until an explicit flush.
1040 if (t->use_packet_coalescing) {
1041 bidirectional_stream_disable_auto_flush(s->cbs, true);
1042 bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
1045 const char* method = "POST";
1046 s->header_array.headers = nullptr;
// Fills url and the header array (headers pointer and count) from the initial
// metadata and t->host.
1047 convert_metadata_to_cronet_headers(
1048 stream_op->payload->send_initial_metadata.send_initial_metadata,
1049 t->host, &url, &s->header_array.headers, &s->header_array.count,
1051 s->header_array.capacity = s->header_array.count;
1052 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs,
1054 bidirectional_stream_start(s->cbs, url.c_str(), 0, method, &s->header_array,
// The key and value strings of every header are released once the stream has
// been started.
1056 unsigned int header_index;
1057 for (header_index = 0; header_index < s->header_array.count;
1059 gpr_free(const_cast<char*>(s->header_array.headers[header_index].key));
1060 gpr_free(const_cast<char*>(s->header_array.headers[header_index].value));
1062 stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
// When coalescing and this batch carries neither a message nor trailing
// metadata, flush_cronet_when_ready is set on the stream state.
1063 if (t->use_packet_coalescing) {
1064 if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
1065 s->state.flush_cronet_when_ready = true;
1068 result = ACTION_TAKEN_WITH_CALLBACK;
// --- send_message: frame the outgoing message and write it to cronet ---
1069 } else if (stream_op->send_message &&
1070 op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
1071 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
1072 stream_state->pending_send_message = false;
// Nothing is written if the stream was cancelled, failed or already succeeded.
1073 if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1074 stream_state->state_callback_received[OP_FAILED] ||
1075 stream_state->state_callback_received[OP_SUCCEEDED]) {
1076 result = NO_ACTION_POSSIBLE;
1077 CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1079 grpc_slice_buffer write_slice_buffer;
1081 grpc_slice_buffer_init(&write_slice_buffer);
// Pull slices from the send_message stream until write_slice_buffer holds the
// full message length.
1082 while (write_slice_buffer.length <
1083 stream_op->payload->send_message.send_message->length()) {
1084 /* TODO(roth): When we add support for incremental sending,this code
1085 * will need to be changed to support asynchronous delivery of the
1086 * send_message payload. */
1087 if (!stream_op->payload->send_message.send_message->Next(
1088 stream_op->payload->send_message.send_message->length(),
1090 /* Should never reach here */
1093 if (GRPC_ERROR_NONE !=
1094 stream_op->payload->send_message.send_message->Pull(&slice)) {
1095 /* Should never reach here */
1098 grpc_slice_buffer_add(&write_slice_buffer, slice);
// create_grpc_frame receives the collected slices, the write_buffer slot of
// the stream write state and the message flags.
1100 size_t write_buffer_size;
1101 create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
1103 stream_op->payload->send_message.send_message->flags());
1104 if (write_buffer_size > 0) {
1105 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
1106 stream_state->ws.write_buffer);
// The write callback flag is cleared before the write is issued.
1107 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1108 bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
1109 static_cast<int>(write_buffer_size), false);
1110 grpc_slice_buffer_destroy_internal(&write_slice_buffer);
// Coalescing: flush now unless trailing metadata is part of the same batch;
// in that case pending_write_for_trailer is set and no callback is awaited.
1111 if (t->use_packet_coalescing) {
1112 if (!stream_op->send_trailing_metadata) {
1113 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1114 bidirectional_stream_flush(s->cbs);
1115 result = ACTION_TAKEN_WITH_CALLBACK;
1117 stream_state->pending_write_for_trailer = true;
1118 result = ACTION_TAKEN_NO_CALLBACK;
1121 result = ACTION_TAKEN_WITH_CALLBACK;
1124 /* Should never reach here */
// OP_SEND_MESSAGE is marked done on both the stream state and the per-batch
// state, and the send_message stream is released.
1128 stream_state->state_op_done[OP_SEND_MESSAGE] = true;
1129 oas->state.state_op_done[OP_SEND_MESSAGE] = true;
1130 stream_op->payload->send_message.send_message.reset();
// --- send_trailing_metadata: zero-length write with the last argument true ---
1131 } else if (stream_op->send_trailing_metadata &&
1132 op_can_be_run(stream_op, s, &oas->state,
1133 OP_SEND_TRAILING_METADATA)) {
1134 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
1135 if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1136 stream_state->state_callback_received[OP_FAILED] ||
1137 stream_state->state_callback_received[OP_SUCCEEDED]) {
1138 result = NO_ACTION_POSSIBLE;
1139 CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1141 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
1142 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1143 bidirectional_stream_write(s->cbs, "", 0, true);
1144 if (t->use_packet_coalescing) {
1145 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1146 bidirectional_stream_flush(s->cbs);
1148 result = ACTION_TAKEN_WITH_CALLBACK;
1150 stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
// --- recv_initial_metadata: schedule recv_initial_metadata_ready ---
// The closure is scheduled in all four cases below; only the last case first
// publishes the buffered initial metadata into the batch payload.
1151 } else if (stream_op->recv_initial_metadata &&
1152 op_can_be_run(stream_op, s, &oas->state,
1153 OP_RECV_INITIAL_METADATA)) {
1154 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
1155 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1156 grpc_core::ExecCtx::Run(
1158 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1160 } else if (stream_state->state_callback_received[OP_FAILED]) {
1161 grpc_core::ExecCtx::Run(
1163 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1165 } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
1166 grpc_core::ExecCtx::Run(
1168 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1171 grpc_chttp2_incoming_metadata_buffer_publish(
1172 &oas->s->state.rs.initial_metadata,
1173 stream_op->payload->recv_initial_metadata.recv_initial_metadata);
1174 grpc_core::ExecCtx::Run(
1176 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1179 stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
1180 result = ACTION_TAKEN_NO_CALLBACK;
// --- recv_message: read the 5-byte header, then the payload, then deliver ---
1181 } else if (stream_op->recv_message &&
1182 op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
1183 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
// Cancelled, failed, read-closed and flush_read all finish the op right away
// by scheduling recv_message_ready without starting a read.
1184 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1185 CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
1186 grpc_core::ExecCtx::Run(
1187 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1189 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1190 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1191 result = ACTION_TAKEN_NO_CALLBACK;
1192 } else if (stream_state->state_callback_received[OP_FAILED]) {
1193 CRONET_LOG(GPR_DEBUG, "Stream failed.");
1194 grpc_core::ExecCtx::Run(
1195 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1197 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1198 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1199 result = ACTION_TAKEN_NO_CALLBACK;
1200 } else if (stream_state->rs.read_stream_closed) {
1201 /* No more data will be received */
1202 CRONET_LOG(GPR_DEBUG, "read stream closed");
1203 grpc_core::ExecCtx::Run(
1204 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1206 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1207 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1208 result = ACTION_TAKEN_NO_CALLBACK;
1209 } else if (stream_state->flush_read) {
1210 CRONET_LOG(GPR_DEBUG, "flush read");
1211 grpc_core::ExecCtx::Run(
1212 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1214 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1215 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1216 result = ACTION_TAKEN_NO_CALLBACK;
// Length field not decoded yet: either the header bytes have fully arrived,
// or a header read still has to be started, or one is already in flight.
1217 } else if (!stream_state->rs.length_field_received) {
1218 if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
1219 stream_state->rs.remaining_bytes == 0) {
1220 /* Start a read operation for data */
1221 stream_state->rs.length_field_received = true;
// The header bytes in read_buffer are decoded into length_field and compressed.
// NOTE(review): the name of the decoding call is not visible in this listing.
1223 reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
1224 &stream_state->rs.length_field, &stream_state->rs.compressed);
1225 CRONET_LOG(GPR_DEBUG, "length field = %d",
1226 stream_state->rs.length_field);
// Non-empty payload: allocate a buffer of length_field bytes and ask cronet
// to fill it; completion is reported through a callback.
1227 if (stream_state->rs.length_field > 0) {
1228 stream_state->rs.read_buffer = static_cast<char*>(
1229 gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
1230 GPR_ASSERT(stream_state->rs.read_buffer);
1231 stream_state->rs.remaining_bytes = stream_state->rs.length_field;
1232 stream_state->rs.received_bytes = 0;
1233 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1234 stream_state->state_op_done[OP_READ_REQ_MADE] =
1235 true; /* Indicates that at least one read request has been made */
1236 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1237 stream_state->rs.remaining_bytes);
1238 result = ACTION_TAKEN_WITH_CALLBACK;
// Empty payload: hand an empty slice buffer stream to the caller immediately.
1240 stream_state->rs.remaining_bytes = 0;
1241 CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
1242 /* Clean up read_slice_buffer in case there is unread data. */
1243 grpc_slice_buffer_destroy_internal(
1244 &stream_state->rs.read_slice_buffer);
1245 grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1247 if (stream_state->rs.compressed) {
1248 flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1250 stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1251 stream_op->payload->recv_message.recv_message->reset(
1252 stream_state->rs.sbs.get());
1253 grpc_core::ExecCtx::Run(
1255 stream_op->payload->recv_message.recv_message_ready,
1257 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1258 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1260 /* Extra read to trigger on_succeed */
1261 stream_state->rs.length_field_received = false;
1262 stream_state->state_op_done[OP_READ_REQ_MADE] =
1263 true; /* Indicates that at least one read request has been made */
1264 read_grpc_header(s);
1265 result = ACTION_TAKEN_NO_CALLBACK;
1267 } else if (stream_state->rs.remaining_bytes == 0) {
1268 /* Start a read operation for first 5 bytes (GRPC header) */
1269 stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1270 stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1271 stream_state->rs.received_bytes = 0;
1272 stream_state->rs.compressed = false;
1273 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1274 stream_state->state_op_done[OP_READ_REQ_MADE] =
1275 true; /* Indicates that at least one read request has been made */
1276 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1277 stream_state->rs.remaining_bytes);
1278 result = ACTION_TAKEN_WITH_CALLBACK;
1280 result = NO_ACTION_POSSIBLE;
// Length field already decoded and no bytes remaining: copy the payload into
// a fresh slice, release the read buffer and deliver the message.
1282 } else if (stream_state->rs.remaining_bytes == 0) {
1283 CRONET_LOG(GPR_DEBUG, "read operation complete");
1284 grpc_slice read_data_slice =
1285 GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
1286 uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
1287 memcpy(dst_p, stream_state->rs.read_buffer,
1288 static_cast<size_t>(stream_state->rs.length_field));
1289 null_and_maybe_free_read_buffer(s);
1290 /* Clean up read_slice_buffer in case there is unread data. */
1291 grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
1292 grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1293 grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
1296 if (stream_state->rs.compressed) {
1297 flags = GRPC_WRITE_INTERNAL_COMPRESS;
1299 stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1300 stream_op->payload->recv_message.recv_message->reset(
1301 stream_state->rs.sbs.get());
1302 grpc_core::ExecCtx::Run(
1303 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1305 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1306 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1307 /* Do an extra read to trigger on_succeeded() callback in case connection
1309 stream_state->rs.length_field_received = false;
1310 read_grpc_header(s);
1311 result = ACTION_TAKEN_NO_CALLBACK;
// --- recv_trailing_metadata: pick the error, publish trailers if valid ---
1313 } else if (stream_op->recv_trailing_metadata &&
1314 op_can_be_run(stream_op, s, &oas->state,
1315 OP_RECV_TRAILING_METADATA)) {
1316 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
1317 grpc_error_handle error = GRPC_ERROR_NONE;
// Cancellation takes a new ref on the stored cancel_error; a failed stream
// maps net_error to a grpc status code and description instead.
1318 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1319 error = GRPC_ERROR_REF(stream_state->cancel_error);
1320 } else if (stream_state->state_callback_received[OP_FAILED]) {
1321 grpc_status_code grpc_error_code =
1322 cronet_net_error_to_grpc_error(stream_state->net_error);
1323 const char* desc = cronet_net_error_as_string(stream_state->net_error);
1325 make_error_with_desc(grpc_error_code, stream_state->net_error, desc);
1326 } else if (oas->s->state.rs.trailing_metadata_valid) {
1327 grpc_chttp2_incoming_metadata_buffer_publish(
1328 &oas->s->state.rs.trailing_metadata,
1329 stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
1330 stream_state->rs.trailing_metadata_valid = false;
1332 grpc_core::ExecCtx::Run(
1334 stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1336 stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
1337 result = ACTION_TAKEN_NO_CALLBACK;
// --- cancel_stream: cancel the cronet stream and remember the cancel error ---
1338 } else if (stream_op->cancel_stream &&
1339 op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
1340 CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
// NOTE(review): the condition choosing between the two result values below is
// not visible in this listing -- presumably it checks that s->cbs exists.
1342 CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
1343 bidirectional_stream_cancel(s->cbs);
1344 result = ACTION_TAKEN_WITH_CALLBACK;
1346 result = ACTION_TAKEN_NO_CALLBACK;
1348 stream_state->state_op_done[OP_CANCEL_ERROR] = true;
// Only the first cancel error is kept; a ref is taken on it.
1349 if (!stream_state->cancel_error) {
1350 stream_state->cancel_error =
1351 GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
// --- on_complete: schedule the batch on_complete closure, if there is one ---
1353 } else if (op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
1354 CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
1355 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1356 if (stream_op->on_complete) {
1357 grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1358 GRPC_ERROR_REF(stream_state->cancel_error));
1360 } else if (stream_state->state_callback_received[OP_FAILED]) {
1361 if (stream_op->on_complete) {
1362 const char* error_message =
1363 cronet_net_error_as_string(stream_state->net_error);
1364 grpc_status_code grpc_error_code =
1365 cronet_net_error_to_grpc_error(stream_state->net_error);
1366 grpc_core::ExecCtx::Run(
1367 DEBUG_LOCATION, stream_op->on_complete,
1368 make_error_with_desc(grpc_error_code, stream_state->net_error,
1372 /* All actions in this stream_op are complete. Call the on_complete
1375 if (stream_op->on_complete) {
1376 grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1380 oas->state.state_op_done[OP_ON_COMPLETE] = true;
1382 /* reset any send message state, only if this ON_COMPLETE is about a send.
1384 if (stream_op->send_message) {
1385 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1386 stream_state->state_op_done[OP_SEND_MESSAGE] = false;
1388 result = ACTION_TAKEN_NO_CALLBACK;
1389 /* If this is the on_complete callback being called for a received message -
1391 if (stream_op->recv_message) {
1392 stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
// No branch above applied.
1395 result = NO_ACTION_POSSIBLE;
1401 Functions used by upper layers to access transport functionality.
// Constructor for the per-stream object. The visible initializers store gt,
// reinterpreted as a grpc_cronet_transport pointer, in curr_ct and keep the
// refcount argument; the body then takes a stream ref through
// GRPC_CRONET_STREAM_REF.
// NOTE(review): the remaining member initializers are not visible in this
// listing -- confirm how gs and arena are stored.
1404 inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs,
1405 grpc_stream_refcount* refcount,
1406 grpc_core::Arena* arena)
1408 curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)),
1411 refcount(refcount) {
1412 GRPC_CRONET_STREAM_REF(this, "cronet transport");
// Destructor: hands the read buffer to null_and_maybe_free_read_buffer,
// destroys the read slice buffer and drops the ref held on cancel_error.
1416 inline stream_obj::~stream_obj() {
1417 null_and_maybe_free_read_buffer(this);
1418 /* Clean up read_slice_buffer in case there is unread data. */
1419 grpc_slice_buffer_destroy_internal(&state.rs.read_slice_buffer);
1420 GRPC_ERROR_UNREF(state.cancel_error);
// Constructs a stream_obj with placement new in the storage that gs points
// to, forwarding gt, gs, refcount and arena to the constructor. The
// server_data argument is unused.
// NOTE(review): the return statement is not visible in this listing.
1423 static int init_stream(grpc_transport* gt, grpc_stream* gs,
1424 grpc_stream_refcount* refcount,
1425 const void* /*server_data*/, grpc_core::Arena* arena) {
1426 new (gs) stream_obj(gt, gs, refcount, arena);
1430 static void set_pollset_do_nothing(grpc_transport* /*gt*/, grpc_stream* /*gs*/,
1431 grpc_pollset* /*pollset*/) {}
1433 static void set_pollset_set_do_nothing(grpc_transport* /*gt*/,
1434 grpc_stream* /*gs*/,
1435 grpc_pollset_set* /*pollset_set*/) {}
// Accepts one stream op batch for the stream stored at gs.
// If the batch sends initial metadata and header_has_authority() reports true
// for its metadata list, the recv closures present in the batch and
// on_complete are scheduled with GRPC_ERROR_CANCELLED.
// The final lines reinterpret gs as a stream_obj, add the batch with
// add_to_storage and run execute_from_storage.
// NOTE(review): the statement that separates the cancel path from the storage
// path is not visible in this listing; presumably the cancel path returns
// early -- confirm.
1437 static void perform_stream_op(grpc_transport* /*gt*/, grpc_stream* gs,
1438 grpc_transport_stream_op_batch* op) {
1439 CRONET_LOG(GPR_DEBUG, "perform_stream_op");
1440 if (op->send_initial_metadata &&
1441 header_has_authority(op->payload->send_initial_metadata
1442 .send_initial_metadata->list.head)) {
1443 /* Cronet does not support :authority header field. We cancel the call when
1444 this field is present in metadata */
1445 if (op->recv_initial_metadata) {
1446 grpc_core::ExecCtx::Run(
1448 op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1449 GRPC_ERROR_CANCELLED);
1451 if (op->recv_message) {
1452 grpc_core::ExecCtx::Run(DEBUG_LOCATION,
1453 op->payload->recv_message.recv_message_ready,
1454 GRPC_ERROR_CANCELLED);
1456 if (op->recv_trailing_metadata) {
1457 grpc_core::ExecCtx::Run(
1459 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1460 GRPC_ERROR_CANCELLED);
// on_complete is scheduled without a null check here.
1462 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
1463 GRPC_ERROR_CANCELLED);
1466 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1467 add_to_storage(s, op);
1468 execute_from_storage(s);
// Reinterprets gs as the stream_obj stored there and schedules
// then_schedule_closure on the exec ctx. The transport argument is unused.
// NOTE(review): one statement between the cast and the scheduling call is not
// visible in this listing; presumably it destroys the stream_obj that
// init_stream constructed in place -- confirm.
1471 static void destroy_stream(grpc_transport* /*gt*/, grpc_stream* gs,
1472 grpc_closure* then_schedule_closure) {
1473 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1475 grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
1479 static void destroy_transport(grpc_transport* /*gt*/) {}
1481 static grpc_endpoint* get_endpoint(grpc_transport* /*gt*/) { return nullptr; }
1483 static void perform_op(grpc_transport* /*gt*/, grpc_transport_op* /*op*/) {}
// Transport vtable; grpc_create_cronet_transport stores its address in
// ct->base.vtable. The two pollset entries visible here are the do-nothing
// hooks.
// NOTE(review): the other initializer entries are not visible in this listing.
1485 static const grpc_transport_vtable grpc_cronet_vtable = {
1489 set_pollset_do_nothing,
1490 set_pollset_set_do_nothing,
1497 grpc_transport* grpc_create_cronet_transport(void* engine, const char* target,
1498 const grpc_channel_args* args,
1499 void* /*reserved*/) {
1500 grpc_cronet_transport* ct = static_cast<grpc_cronet_transport*>(
1501 gpr_malloc(sizeof(grpc_cronet_transport)));
1505 ct->base.vtable = &grpc_cronet_vtable;
1506 ct->engine = static_cast<stream_engine*>(engine);
1507 ct->host = static_cast<char*>(gpr_malloc(strlen(target) + 1));
1511 strcpy(ct->host, target);
1513 ct->use_packet_coalescing = true;
1515 for (size_t i = 0; i < args->num_args; i++) {
1517 strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
1518 if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) {
1519 gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
1520 GRPC_ARG_USE_CRONET_PACKET_COALESCING);
1522 ct->use_packet_coalescing = (args->args[i].value.integer != 0);