3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
25 #include "absl/strings/str_cat.h"
26 #include "absl/strings/str_format.h"
28 #include <grpc/slice_buffer.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
32 #include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
33 #include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
34 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
35 #include "src/core/ext/transport/cronet/transport/cronet_status.h"
36 #include "src/core/ext/transport/cronet/transport/cronet_transport.h"
37 #include "src/core/lib/debug/trace.h"
38 #include "src/core/lib/gpr/string.h"
39 #include "src/core/lib/gprpp/manual_constructor.h"
40 #include "src/core/lib/iomgr/endpoint.h"
41 #include "src/core/lib/iomgr/exec_ctx.h"
42 #include "src/core/lib/slice/slice_internal.h"
43 #include "src/core/lib/slice/slice_string_helpers.h"
44 #include "src/core/lib/surface/channel.h"
45 #include "src/core/lib/surface/validate_metadata.h"
46 #include "src/core/lib/transport/metadata_batch.h"
47 #include "src/core/lib/transport/static_metadata.h"
48 #include "src/core/lib/transport/timeout_encoding.h"
49 #include "src/core/lib/transport/transport_impl.h"
51 #include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
53 #define GRPC_HEADER_SIZE_IN_BYTES 5
54 #define GRPC_FLUSH_READ_SIZE 4096
56 grpc_core::TraceFlag grpc_cronet_trace(false, "cronet");
57 #define CRONET_LOG(...) \
59 if (grpc_cronet_trace.enabled()) gpr_log(__VA_ARGS__); \
63 ACTION_TAKEN_WITH_CALLBACK,
64 ACTION_TAKEN_NO_CALLBACK,
69 OP_SEND_INITIAL_METADATA = 0,
71 OP_SEND_TRAILING_METADATA,
73 OP_RECV_INITIAL_METADATA,
74 OP_RECV_TRAILING_METADATA,
80 OP_RECV_MESSAGE_AND_ON_COMPLETE,
85 /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
87 static void on_stream_ready(bidirectional_stream*);
88 static void on_response_headers_received(
89 bidirectional_stream*, const bidirectional_stream_header_array*,
91 static void on_write_completed(bidirectional_stream*, const char*);
92 static void on_read_completed(bidirectional_stream*, char*, int);
93 static void on_response_trailers_received(
94 bidirectional_stream*, const bidirectional_stream_header_array*);
95 static void on_succeeded(bidirectional_stream*);
96 static void on_failed(bidirectional_stream*, int);
97 static void on_canceled(bidirectional_stream*);
98 static bidirectional_stream_callback cronet_callbacks = {
100 on_response_headers_received,
103 on_response_trailers_received,
108 /* Cronet transport object */
109 struct grpc_cronet_transport {
110 grpc_transport base; /* must be first element in this structure */
111 stream_engine* engine;
113 bool use_packet_coalescing;
115 typedef struct grpc_cronet_transport grpc_cronet_transport;
117 /* TODO (makdharma): reorder structure for memory efficiency per
118 http://www.catb.org/esr/structure-packing/#_structure_reordering: */
120 explicit read_state(grpc_core::Arena* arena)
121 : trailing_metadata(arena), initial_metadata(arena) {
122 grpc_slice_buffer_init(&read_slice_buffer);
125 /* vars to store data coming from server */
126 char* read_buffer = nullptr;
127 bool length_field_received = false;
128 int received_bytes = 0;
129 int remaining_bytes = 0;
130 int length_field = 0;
131 bool compressed = false;
132 char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {};
133 char* payload_field = nullptr;
134 bool read_stream_closed = false;
136 /* vars for holding data destined for the application */
137 grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
138 grpc_slice_buffer read_slice_buffer;
140 /* vars for trailing metadata */
141 grpc_chttp2_incoming_metadata_buffer trailing_metadata;
142 bool trailing_metadata_valid = false;
144 /* vars for initial metadata */
145 grpc_chttp2_incoming_metadata_buffer initial_metadata;
149 char* write_buffer = nullptr;
152 /* track state of one stream op */
154 explicit op_state(grpc_core::Arena* arena) : rs(arena) {}
156 bool state_op_done[OP_NUM_OPS] = {};
157 bool state_callback_received[OP_NUM_OPS] = {};
158 /* A non-zero gRPC status code has been seen */
159 bool fail_state = false;
160 /* Transport is discarding all buffered messages */
161 bool flush_read = false;
162 bool flush_cronet_when_ready = false;
163 bool pending_write_for_trailer = false;
164 bool pending_send_message = false;
165 /* User requested RECV_TRAILING_METADATA */
166 bool pending_recv_trailing_metadata = false;
167 cronet_net_error_code net_error = OK;
168 grpc_error* cancel_error = GRPC_ERROR_NONE;
169 /* data structure for storing data coming from server */
170 struct read_state rs;
171 /* data structure for storing data going to the server */
172 struct write_state ws;
177 struct op_and_state {
178 op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op);
180 grpc_transport_stream_op_batch op;
181 struct op_state state;
183 struct stream_obj* s; /* Pointer back to the stream object */
184 /* next op_and_state in the linked list */
185 struct op_and_state* next = nullptr;
189 int num_pending_ops = 0;
190 struct op_and_state* head = nullptr;
194 stream_obj(grpc_transport* gt, grpc_stream* gs,
195 grpc_stream_refcount* refcount, grpc_core::Arena* arena);
198 grpc_core::Arena* arena;
199 struct op_and_state* oas = nullptr;
200 grpc_transport_stream_op_batch* curr_op = nullptr;
201 grpc_cronet_transport* curr_ct;
202 grpc_stream* curr_gs;
203 bidirectional_stream* cbs = nullptr;
204 bidirectional_stream_header_array header_array =
205 bidirectional_stream_header_array(); // Zero-initialize the structure.
207 /* Stream level state. Some state will be tracked both at stream and stream_op
209 struct op_state state;
212 struct op_storage storage;
214 /* Mutex to protect storage */
217 /* Refcount object of the stream */
218 grpc_stream_refcount* refcount;
222 #define GRPC_CRONET_STREAM_REF(stream, reason) \
223 grpc_cronet_stream_ref((stream), (reason))
224 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
225 grpc_cronet_stream_unref((stream), (reason))
226 void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
227 grpc_stream_ref(s->refcount, reason);
229 void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
230 grpc_stream_unref(s->refcount, reason);
233 #define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
234 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
235 grpc_cronet_stream_unref((stream))
236 void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
237 void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
240 static enum e_op_result execute_stream_op(struct op_and_state* oas);
243 Utility function to translate enum into string for printing
245 static const char* op_result_string(enum e_op_result i) {
247 case ACTION_TAKEN_WITH_CALLBACK:
248 return "ACTION_TAKEN_WITH_CALLBACK";
249 case ACTION_TAKEN_NO_CALLBACK:
250 return "ACTION_TAKEN_NO_CALLBACK";
251 case NO_ACTION_POSSIBLE:
252 return "NO_ACTION_POSSIBLE";
254 GPR_UNREACHABLE_CODE(return "UNKNOWN");
257 static const char* op_id_string(enum e_op_id i) {
259 case OP_SEND_INITIAL_METADATA:
260 return "OP_SEND_INITIAL_METADATA";
261 case OP_SEND_MESSAGE:
262 return "OP_SEND_MESSAGE";
263 case OP_SEND_TRAILING_METADATA:
264 return "OP_SEND_TRAILING_METADATA";
265 case OP_RECV_MESSAGE:
266 return "OP_RECV_MESSAGE";
267 case OP_RECV_INITIAL_METADATA:
268 return "OP_RECV_INITIAL_METADATA";
269 case OP_RECV_TRAILING_METADATA:
270 return "OP_RECV_TRAILING_METADATA";
271 case OP_CANCEL_ERROR:
272 return "OP_CANCEL_ERROR";
274 return "OP_ON_COMPLETE";
278 return "OP_SUCCEEDED";
280 return "OP_CANCELED";
281 case OP_RECV_MESSAGE_AND_ON_COMPLETE:
282 return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
283 case OP_READ_REQ_MADE:
284 return "OP_READ_REQ_MADE";
291 static void null_and_maybe_free_read_buffer(stream_obj* s) {
292 if (s->state.rs.read_buffer &&
293 s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
294 gpr_free(s->state.rs.read_buffer);
296 s->state.rs.read_buffer = nullptr;
299 static void read_grpc_header(stream_obj* s) {
300 s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
301 s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
302 s->state.rs.received_bytes = 0;
303 s->state.rs.compressed = false;
304 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
305 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
306 s->state.rs.remaining_bytes);
309 static grpc_error* make_error_with_desc(int error_code,
310 int cronet_internal_error_code,
312 std::string error_message =
313 absl::StrFormat("Cronet error code:%d, Cronet error detail:%s",
314 cronet_internal_error_code, desc);
316 GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_message.c_str());
317 error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code);
321 inline op_and_state::op_and_state(stream_obj* s,
322 const grpc_transport_stream_op_batch& op)
323 : op(op), state(s->arena), s(s) {}
326 Add a new stream op to op storage.
328 static void add_to_storage(struct stream_obj* s,
329 grpc_transport_stream_op_batch* op) {
330 struct op_storage* storage = &s->storage;
331 /* add new op at the beginning of the linked list. The memory is freed
332 in remove_from_storage */
333 op_and_state* new_op = new op_and_state(s, *op);
335 new_op->next = storage->head;
336 storage->head = new_op;
337 storage->num_pending_ops++;
338 if (op->send_message) {
339 s->state.pending_send_message = true;
341 if (op->recv_trailing_metadata) {
342 s->state.pending_recv_trailing_metadata = true;
344 CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
345 storage->num_pending_ops);
346 gpr_mu_unlock(&s->mu);
350 Traverse the linked list and delete op and free memory
352 static void remove_from_storage(struct stream_obj* s,
353 struct op_and_state* oas) {
354 struct op_and_state* curr;
355 if (s->storage.head == nullptr || oas == nullptr) {
358 if (s->storage.head == oas) {
359 s->storage.head = oas->next;
361 s->storage.num_pending_ops--;
362 CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
363 s->storage.num_pending_ops);
365 for (curr = s->storage.head; curr != nullptr; curr = curr->next) {
366 if (curr->next == oas) {
367 curr->next = oas->next;
368 s->storage.num_pending_ops--;
369 CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
370 s->storage.num_pending_ops);
373 } else if (GPR_UNLIKELY(curr->next == nullptr)) {
374 CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
381 Cycle through ops and try to take next action. Break when either
382 an action with callback is taken, or no action is possible.
383 This can get executed from the Cronet network thread via cronet callback
384 or on the application supplied thread via the perform_stream_op function.
386 static void execute_from_storage(stream_obj* s) {
388 for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
389 CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
390 GPR_ASSERT(!curr->done);
391 enum e_op_result result = execute_stream_op(curr);
392 CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
393 op_result_string(result));
394 /* if this op is done, then remove it and free memory */
396 struct op_and_state* next = curr->next;
397 remove_from_storage(s, curr);
399 } else if (result == NO_ACTION_POSSIBLE) {
401 } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
402 /* wait for the callback */
404 } /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
406 gpr_mu_unlock(&s->mu);
409 static void convert_cronet_array_to_metadata(
410 const bidirectional_stream_header_array* header_array,
411 grpc_chttp2_incoming_metadata_buffer* mds) {
412 for (size_t i = 0; i < header_array->count; i++) {
413 CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s",
414 header_array->headers[i].key, header_array->headers[i].value);
415 grpc_slice key = grpc_slice_intern(
416 grpc_slice_from_static_string(header_array->headers[i].key));
418 if (grpc_is_refcounted_slice_binary_header(key)) {
419 value = grpc_slice_from_static_string(header_array->headers[i].value);
420 value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length(
421 value, grpc_chttp2_base64_infer_length_after_decode(value)));
423 value = grpc_slice_intern(
424 grpc_slice_from_static_string(header_array->headers[i].value));
426 GRPC_LOG_IF_ERROR("convert_cronet_array_to_metadata",
427 grpc_chttp2_incoming_metadata_buffer_add(
428 mds, grpc_mdelem_from_slices(key, value)));
435 static void on_failed(bidirectional_stream* stream, int net_error) {
436 gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
437 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
438 grpc_core::ExecCtx exec_ctx;
440 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
442 bidirectional_stream_destroy(s->cbs);
443 s->state.state_callback_received[OP_FAILED] = true;
444 s->state.net_error = static_cast<cronet_net_error_code>(net_error);
446 if (s->header_array.headers) {
447 gpr_free(s->header_array.headers);
448 s->header_array.headers = nullptr;
450 if (s->state.ws.write_buffer) {
451 gpr_free(s->state.ws.write_buffer);
452 s->state.ws.write_buffer = nullptr;
454 null_and_maybe_free_read_buffer(s);
455 gpr_mu_unlock(&s->mu);
456 execute_from_storage(s);
457 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
463 static void on_canceled(bidirectional_stream* stream) {
464 CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
465 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
466 grpc_core::ExecCtx exec_ctx;
468 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
470 bidirectional_stream_destroy(s->cbs);
471 s->state.state_callback_received[OP_CANCELED] = true;
473 if (s->header_array.headers) {
474 gpr_free(s->header_array.headers);
475 s->header_array.headers = nullptr;
477 if (s->state.ws.write_buffer) {
478 gpr_free(s->state.ws.write_buffer);
479 s->state.ws.write_buffer = nullptr;
481 null_and_maybe_free_read_buffer(s);
482 gpr_mu_unlock(&s->mu);
483 execute_from_storage(s);
484 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
490 static void on_succeeded(bidirectional_stream* stream) {
491 CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
492 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
493 grpc_core::ExecCtx exec_ctx;
495 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
497 bidirectional_stream_destroy(s->cbs);
498 s->state.state_callback_received[OP_SUCCEEDED] = true;
500 null_and_maybe_free_read_buffer(s);
501 gpr_mu_unlock(&s->mu);
502 execute_from_storage(s);
503 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
509 static void on_stream_ready(bidirectional_stream* stream) {
510 CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
511 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
512 grpc_core::ExecCtx exec_ctx;
513 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
514 grpc_cronet_transport* t = s->curr_ct;
516 s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
517 s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
518 /* Free the memory allocated for headers */
519 if (s->header_array.headers) {
520 gpr_free(s->header_array.headers);
521 s->header_array.headers = nullptr;
523 /* Send the initial metadata on wire if there is no SEND_MESSAGE or
524 * SEND_TRAILING_METADATA ops pending */
525 if (t->use_packet_coalescing) {
526 if (s->state.flush_cronet_when_ready) {
527 CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
528 bidirectional_stream_flush(stream);
531 gpr_mu_unlock(&s->mu);
532 execute_from_storage(s);
538 static void on_response_headers_received(
539 bidirectional_stream* stream,
540 const bidirectional_stream_header_array* headers,
541 const char* negotiated_protocol) {
542 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
543 grpc_core::ExecCtx exec_ctx;
544 CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
545 headers, negotiated_protocol);
546 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
548 /* Identify if this is a header or a trailer (in a trailer-only response case)
550 for (size_t i = 0; i < headers->count; i++) {
551 if (0 == strcmp("grpc-status", headers->headers[i].key)) {
552 on_response_trailers_received(stream, headers);
554 /* Do an extra read for a trailer-only stream to trigger on_succeeded()
562 convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata);
563 s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
564 if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
565 s->state.state_callback_received[OP_FAILED])) {
566 /* Do an extra read to trigger on_succeeded() callback in case connection
568 GPR_ASSERT(s->state.rs.length_field_received == false);
571 gpr_mu_unlock(&s->mu);
572 execute_from_storage(s);
578 static void on_write_completed(bidirectional_stream* stream, const char* data) {
579 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
580 grpc_core::ExecCtx exec_ctx;
581 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
582 CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
584 if (s->state.ws.write_buffer) {
585 gpr_free(s->state.ws.write_buffer);
586 s->state.ws.write_buffer = nullptr;
588 s->state.state_callback_received[OP_SEND_MESSAGE] = true;
589 gpr_mu_unlock(&s->mu);
590 execute_from_storage(s);
596 static void on_read_completed(bidirectional_stream* stream, char* data,
598 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
599 grpc_core::ExecCtx exec_ctx;
600 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
601 CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
604 s->state.state_callback_received[OP_RECV_MESSAGE] = true;
605 if (count > 0 && s->state.flush_read) {
606 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
607 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
608 GRPC_FLUSH_READ_SIZE);
609 gpr_mu_unlock(&s->mu);
610 } else if (count > 0) {
611 s->state.rs.received_bytes += count;
612 s->state.rs.remaining_bytes -= count;
613 if (s->state.rs.remaining_bytes > 0) {
614 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
615 s->state.state_op_done[OP_READ_REQ_MADE] = true;
616 bidirectional_stream_read(
617 s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
618 s->state.rs.remaining_bytes);
619 gpr_mu_unlock(&s->mu);
621 gpr_mu_unlock(&s->mu);
622 execute_from_storage(s);
625 null_and_maybe_free_read_buffer(s);
626 s->state.rs.read_stream_closed = true;
627 gpr_mu_unlock(&s->mu);
628 execute_from_storage(s);
635 static void on_response_trailers_received(
636 bidirectional_stream* stream,
637 const bidirectional_stream_header_array* trailers) {
638 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
639 grpc_core::ExecCtx exec_ctx;
640 CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
642 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
643 grpc_cronet_transport* t = s->curr_ct;
645 s->state.rs.trailing_metadata_valid = false;
646 convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata);
647 if (trailers->count > 0) {
648 s->state.rs.trailing_metadata_valid = true;
650 s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
651 /* Send a EOS when server terminates the stream (testServerFinishesRequest) to
652 * trigger on_succeeded */
653 if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
654 !(s->state.state_op_done[OP_CANCEL_ERROR] ||
655 s->state.state_callback_received[OP_FAILED])) {
656 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
657 s->state.state_callback_received[OP_SEND_MESSAGE] = false;
658 bidirectional_stream_write(s->cbs, "", 0, true);
659 if (t->use_packet_coalescing) {
660 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
661 bidirectional_stream_flush(s->cbs);
663 s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
665 gpr_mu_unlock(&s->mu);
667 gpr_mu_unlock(&s->mu);
668 execute_from_storage(s);
673 Utility function that takes the data from s->write_slice_buffer and assembles
674 into a contiguous byte stream with 5 byte gRPC header prepended.
676 static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
677 char** pp_write_buffer,
678 size_t* p_write_buffer_size, uint32_t flags) {
679 size_t length = write_slice_buffer->length;
680 *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
681 /* This is freed in the on_write_completed callback */
683 static_cast<char*>(gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES));
684 *pp_write_buffer = write_buffer;
685 uint8_t* p = reinterpret_cast<uint8_t*>(write_buffer);
686 /* Append 5 byte header */
687 /* Compressed flag */
688 *p++ = static_cast<uint8_t>((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0);
690 *p++ = static_cast<uint8_t>(length >> 24);
691 *p++ = static_cast<uint8_t>(length >> 16);
692 *p++ = static_cast<uint8_t>(length >> 8);
693 *p++ = static_cast<uint8_t>(length);
694 /* append actual data */
696 for (size_t i = 0; i < write_slice_buffer->count; ++i) {
697 memcpy(p + offset, GRPC_SLICE_START_PTR(write_slice_buffer->slices[i]),
698 GRPC_SLICE_LENGTH(write_slice_buffer->slices[i]));
699 offset += GRPC_SLICE_LENGTH(write_slice_buffer->slices[i]);
704 Convert metadata in a format that Cronet can consume
706 static void convert_metadata_to_cronet_headers(
707 grpc_metadata_batch* metadata, const char* host, std::string* pp_url,
708 bidirectional_stream_header** pp_headers, size_t* p_num_headers,
709 const char** method) {
710 grpc_linked_mdelem* curr = metadata->list.head;
711 /* Walk the linked list and get number of header fields */
712 size_t num_headers_available = 0;
713 while (curr != nullptr) {
715 num_headers_available++;
717 grpc_millis deadline = metadata->deadline;
718 if (deadline != GRPC_MILLIS_INF_FUTURE) {
719 num_headers_available++;
721 /* Allocate enough memory. It is freed in the on_stream_ready callback
723 bidirectional_stream_header* headers =
724 static_cast<bidirectional_stream_header*>(gpr_malloc(
725 sizeof(bidirectional_stream_header) * num_headers_available));
726 *pp_headers = headers;
728 /* Walk the linked list again, this time copying the header fields.
729 s->num_headers can be less than num_headers_available, as some headers
730 are not used for cronet.
731 TODO (makdharma): Eliminate need to traverse the LL second time for perf.
733 curr = metadata->list.head;
734 size_t num_headers = 0;
735 while (num_headers < num_headers_available) {
736 grpc_mdelem mdelem = curr->md;
738 char* key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem));
740 if (grpc_is_binary_header_internal(GRPC_MDKEY(mdelem))) {
741 grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem));
742 value = grpc_slice_to_c_string(wire_value);
743 grpc_slice_unref_internal(wire_value);
745 value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem));
747 if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) ||
748 grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem),
749 GRPC_MDSTR_AUTHORITY)) {
750 /* Cronet populates these fields on its own */
755 if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_METHOD)) {
756 if (grpc_slice_eq_static_interned(GRPC_MDVALUE(mdelem), GRPC_MDSTR_PUT)) {
759 /* POST method in default*/
766 if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_PATH)) {
767 /* Create URL by appending :path value to the hostname */
768 *pp_url = absl::StrCat("https://", host, value);
773 CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
774 headers[num_headers].key = key;
775 headers[num_headers].value = value;
777 if (curr == nullptr) {
781 if (deadline != GRPC_MILLIS_INF_FUTURE) {
782 char* key = grpc_slice_to_c_string(GRPC_MDSTR_GRPC_TIMEOUT);
784 static_cast<char*>(gpr_malloc(GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE));
785 grpc_http2_encode_timeout(deadline - grpc_core::ExecCtx::Get()->Now(),
787 headers[num_headers].key = key;
788 headers[num_headers].value = value;
793 *p_num_headers = num_headers;
796 static void parse_grpc_header(const uint8_t* data, int* length,
798 const uint8_t c = *data;
799 const uint8_t* p = data + 1;
800 *compressed = ((c & 0x01) == 0x01);
802 *length |= (*p++) << 24;
803 *length |= (*p++) << 16;
804 *length |= (*p++) << 8;
808 static bool header_has_authority(grpc_linked_mdelem* head) {
809 while (head != nullptr) {
810 if (grpc_slice_eq_static_interned(GRPC_MDKEY(head->md),
811 GRPC_MDSTR_AUTHORITY)) {
820 Op Execution: Decide if one of the actions contained in the stream op can be
821 executed. This is the heart of the state machine.
823 static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
824 struct stream_obj* s, struct op_state* op_state,
825 enum e_op_id op_id) {
826 struct op_state* stream_state = &s->state;
827 grpc_cronet_transport* t = s->curr_ct;
829 /* When call is canceled, every op can be run, except under following
832 bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
833 stream_state->state_callback_received[OP_FAILED];
834 if (is_canceled_or_failed) {
835 if (op_id == OP_SEND_INITIAL_METADATA) {
836 CRONET_LOG(GPR_DEBUG, "Because");
839 if (op_id == OP_SEND_MESSAGE) {
840 CRONET_LOG(GPR_DEBUG, "Because");
843 if (op_id == OP_SEND_TRAILING_METADATA) {
844 CRONET_LOG(GPR_DEBUG, "Because");
847 if (op_id == OP_CANCEL_ERROR) {
848 CRONET_LOG(GPR_DEBUG, "Because");
851 /* already executed */
852 if (op_id == OP_RECV_INITIAL_METADATA &&
853 stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
854 CRONET_LOG(GPR_DEBUG, "Because");
857 if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
858 CRONET_LOG(GPR_DEBUG, "Because");
861 if (op_id == OP_RECV_TRAILING_METADATA &&
862 stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
863 CRONET_LOG(GPR_DEBUG, "Because");
866 /* ON_COMPLETE can be processed if one of the following conditions is met:
867 * 1. the stream failed
868 * 2. the stream is cancelled, and the callback is received
869 * 3. the stream succeeded before cancel is effective
870 * 4. the stream is cancelled, and the stream is never started */
871 if (op_id == OP_ON_COMPLETE &&
872 !(stream_state->state_callback_received[OP_FAILED] ||
873 stream_state->state_callback_received[OP_CANCELED] ||
874 stream_state->state_callback_received[OP_SUCCEEDED] ||
875 !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
876 CRONET_LOG(GPR_DEBUG, "Because");
879 } else if (op_id == OP_SEND_INITIAL_METADATA) {
880 /* already executed */
881 if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
882 } else if (op_id == OP_RECV_INITIAL_METADATA) {
883 if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
884 /* already executed */
886 } else if (!stream_state
887 ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
888 /* we haven't sent headers yet. */
890 } else if (!stream_state
891 ->state_callback_received[OP_RECV_INITIAL_METADATA] &&
892 !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
893 /* we haven't received headers yet. */
896 } else if (op_id == OP_SEND_MESSAGE) {
897 if (op_state->state_op_done[OP_SEND_MESSAGE]) {
898 /* already executed (note we're checking op specific state, not stream
901 } else if (!stream_state
902 ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
903 /* we haven't sent headers yet. */
906 } else if (op_id == OP_RECV_MESSAGE) {
907 if (op_state->state_op_done[OP_RECV_MESSAGE]) {
908 /* already executed */
910 } else if (!stream_state
911 ->state_callback_received[OP_RECV_INITIAL_METADATA] &&
912 !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
913 /* we haven't received headers yet. */
916 } else if (op_id == OP_RECV_TRAILING_METADATA) {
917 if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
918 /* already executed */
920 } else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
921 !stream_state->state_op_done[OP_RECV_MESSAGE]) {
922 /* we have asked for but haven't received message yet. */
924 } else if (!stream_state
925 ->state_callback_received[OP_RECV_TRAILING_METADATA]) {
926 /* we haven't received trailers yet. */
928 } else if (!stream_state->state_callback_received[OP_SUCCEEDED]) {
929 /* we haven't received on_succeeded yet. */
932 } else if (op_id == OP_SEND_TRAILING_METADATA) {
933 if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
934 /* already executed */
936 } else if (!stream_state
937 ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
938 /* we haven't sent initial metadata yet */
940 } else if (stream_state->pending_send_message &&
941 !stream_state->state_op_done[OP_SEND_MESSAGE]) {
942 /* we haven't sent message yet */
944 } else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
945 !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
946 !(t->use_packet_coalescing &&
947 stream_state->pending_write_for_trailer)) {
948 /* we haven't got on_write_completed for the send yet */
951 } else if (op_id == OP_CANCEL_ERROR) {
952 /* already executed */
953 if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
954 } else if (op_id == OP_ON_COMPLETE) {
955 if (op_state->state_op_done[OP_ON_COMPLETE]) {
956 /* already executed (note we're checking op specific state, not stream
958 CRONET_LOG(GPR_DEBUG, "Because");
961 /* Check if every op that was asked for is done. */
962 /* TODO(muxi): We should not consider the recv ops here, since they
963 * have their own callbacks. We should invoke a batch's on_complete
964 * as soon as all of the batch's send ops are complete, even if
965 * there are still recv ops pending. */
966 else if (curr_op->send_initial_metadata &&
967 !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
968 CRONET_LOG(GPR_DEBUG, "Because");
970 } else if (curr_op->send_message &&
971 !op_state->state_op_done[OP_SEND_MESSAGE]) {
972 CRONET_LOG(GPR_DEBUG, "Because");
974 } else if (curr_op->send_message &&
975 !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
976 CRONET_LOG(GPR_DEBUG, "Because");
978 } else if (curr_op->send_trailing_metadata &&
979 !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
980 CRONET_LOG(GPR_DEBUG, "Because");
982 } else if (curr_op->recv_initial_metadata &&
983 !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
984 CRONET_LOG(GPR_DEBUG, "Because");
986 } else if (curr_op->recv_message &&
987 !op_state->state_op_done[OP_RECV_MESSAGE]) {
988 CRONET_LOG(GPR_DEBUG, "Because");
990 } else if (curr_op->cancel_stream &&
991 !stream_state->state_callback_received[OP_CANCELED]) {
992 CRONET_LOG(GPR_DEBUG, "Because");
994 } else if (curr_op->recv_trailing_metadata) {
995 /* We aren't done with trailing metadata yet */
996 if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
997 CRONET_LOG(GPR_DEBUG, "Because");
1000 /* We've asked for actual message in an earlier op, and it hasn't been
1002 else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
1003 /* If this op is not the one asking for read, (which means some earlier
1004 op has asked), and the read hasn't been delivered. */
1005 if (!curr_op->recv_message &&
1006 !stream_state->state_callback_received[OP_SUCCEEDED]) {
1007 CRONET_LOG(GPR_DEBUG, "Because");
1012 /* We should see at least one on_write_completed for the trailers that we
1014 else if (curr_op->send_trailing_metadata &&
1015 !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
1019 CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
1020 result ? "YES" : "NO");
1025 TODO (makdharma): Break down this function in smaller chunks for readability.
1027 static enum e_op_result execute_stream_op(struct op_and_state* oas) {
1028 grpc_transport_stream_op_batch* stream_op = &oas->op;
1029 struct stream_obj* s = oas->s;
1030 grpc_cronet_transport* t = s->curr_ct;
1031 struct op_state* stream_state = &s->state;
1032 enum e_op_result result = NO_ACTION_POSSIBLE;
1033 if (stream_op->send_initial_metadata &&
1034 op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
1035 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
1036 /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
1038 GPR_ASSERT(s->cbs == nullptr);
1039 GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
1041 bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
1042 CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
1043 if (t->use_packet_coalescing) {
1044 bidirectional_stream_disable_auto_flush(s->cbs, true);
1045 bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
1048 const char* method = "POST";
1049 s->header_array.headers = nullptr;
1050 convert_metadata_to_cronet_headers(
1051 stream_op->payload->send_initial_metadata.send_initial_metadata,
1052 t->host, &url, &s->header_array.headers, &s->header_array.count,
1054 s->header_array.capacity = s->header_array.count;
1055 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs,
1057 bidirectional_stream_start(s->cbs, url.c_str(), 0, method, &s->header_array,
1059 unsigned int header_index;
1060 for (header_index = 0; header_index < s->header_array.count;
1062 gpr_free(const_cast<char*>(s->header_array.headers[header_index].key));
1063 gpr_free(const_cast<char*>(s->header_array.headers[header_index].value));
1065 stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
1066 if (t->use_packet_coalescing) {
1067 if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
1068 s->state.flush_cronet_when_ready = true;
1071 result = ACTION_TAKEN_WITH_CALLBACK;
1072 } else if (stream_op->send_message &&
1073 op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
1074 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
1075 stream_state->pending_send_message = false;
1076 if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1077 stream_state->state_callback_received[OP_FAILED] ||
1078 stream_state->state_callback_received[OP_SUCCEEDED]) {
1079 result = NO_ACTION_POSSIBLE;
1080 CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1082 grpc_slice_buffer write_slice_buffer;
1084 grpc_slice_buffer_init(&write_slice_buffer);
1085 while (write_slice_buffer.length <
1086 stream_op->payload->send_message.send_message->length()) {
1087 /* TODO(roth): When we add support for incremental sending,this code
1088 * will need to be changed to support asynchronous delivery of the
1089 * send_message payload. */
1090 if (!stream_op->payload->send_message.send_message->Next(
1091 stream_op->payload->send_message.send_message->length(),
1093 /* Should never reach here */
1096 if (GRPC_ERROR_NONE !=
1097 stream_op->payload->send_message.send_message->Pull(&slice)) {
1098 /* Should never reach here */
1101 grpc_slice_buffer_add(&write_slice_buffer, slice);
1103 size_t write_buffer_size;
1104 create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
1106 stream_op->payload->send_message.send_message->flags());
1107 if (write_buffer_size > 0) {
1108 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
1109 stream_state->ws.write_buffer);
1110 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1111 bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
1112 static_cast<int>(write_buffer_size), false);
1113 grpc_slice_buffer_destroy_internal(&write_slice_buffer);
1114 if (t->use_packet_coalescing) {
1115 if (!stream_op->send_trailing_metadata) {
1116 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1117 bidirectional_stream_flush(s->cbs);
1118 result = ACTION_TAKEN_WITH_CALLBACK;
1120 stream_state->pending_write_for_trailer = true;
1121 result = ACTION_TAKEN_NO_CALLBACK;
1124 result = ACTION_TAKEN_WITH_CALLBACK;
1127 /* Should never reach here */
1131 stream_state->state_op_done[OP_SEND_MESSAGE] = true;
1132 oas->state.state_op_done[OP_SEND_MESSAGE] = true;
1133 stream_op->payload->send_message.send_message.reset();
1134 } else if (stream_op->send_trailing_metadata &&
1135 op_can_be_run(stream_op, s, &oas->state,
1136 OP_SEND_TRAILING_METADATA)) {
1137 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
1138 if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1139 stream_state->state_callback_received[OP_FAILED] ||
1140 stream_state->state_callback_received[OP_SUCCEEDED]) {
1141 result = NO_ACTION_POSSIBLE;
1142 CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1144 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
1145 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1146 bidirectional_stream_write(s->cbs, "", 0, true);
1147 if (t->use_packet_coalescing) {
1148 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1149 bidirectional_stream_flush(s->cbs);
1151 result = ACTION_TAKEN_WITH_CALLBACK;
1153 stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
1154 } else if (stream_op->recv_initial_metadata &&
1155 op_can_be_run(stream_op, s, &oas->state,
1156 OP_RECV_INITIAL_METADATA)) {
1157 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
1158 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1159 grpc_core::ExecCtx::Run(
1161 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1163 } else if (stream_state->state_callback_received[OP_FAILED]) {
1164 grpc_core::ExecCtx::Run(
1166 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1168 } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
1169 grpc_core::ExecCtx::Run(
1171 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1174 grpc_chttp2_incoming_metadata_buffer_publish(
1175 &oas->s->state.rs.initial_metadata,
1176 stream_op->payload->recv_initial_metadata.recv_initial_metadata);
1177 grpc_core::ExecCtx::Run(
1179 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1182 stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
1183 result = ACTION_TAKEN_NO_CALLBACK;
1184 } else if (stream_op->recv_message &&
1185 op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
1186 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
1187 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1188 CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
1189 grpc_core::ExecCtx::Run(
1190 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1192 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1193 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1194 result = ACTION_TAKEN_NO_CALLBACK;
1195 } else if (stream_state->state_callback_received[OP_FAILED]) {
1196 CRONET_LOG(GPR_DEBUG, "Stream failed.");
1197 grpc_core::ExecCtx::Run(
1198 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1200 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1201 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1202 result = ACTION_TAKEN_NO_CALLBACK;
1203 } else if (stream_state->rs.read_stream_closed == true) {
1204 /* No more data will be received */
1205 CRONET_LOG(GPR_DEBUG, "read stream closed");
1206 grpc_core::ExecCtx::Run(
1207 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1209 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1210 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1211 result = ACTION_TAKEN_NO_CALLBACK;
1212 } else if (stream_state->flush_read) {
1213 CRONET_LOG(GPR_DEBUG, "flush read");
1214 grpc_core::ExecCtx::Run(
1215 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1217 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1218 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1219 result = ACTION_TAKEN_NO_CALLBACK;
1220 } else if (stream_state->rs.length_field_received == false) {
1221 if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
1222 stream_state->rs.remaining_bytes == 0) {
1223 /* Start a read operation for data */
1224 stream_state->rs.length_field_received = true;
1226 reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
1227 &stream_state->rs.length_field, &stream_state->rs.compressed);
1228 CRONET_LOG(GPR_DEBUG, "length field = %d",
1229 stream_state->rs.length_field);
1230 if (stream_state->rs.length_field > 0) {
1231 stream_state->rs.read_buffer = static_cast<char*>(
1232 gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
1233 GPR_ASSERT(stream_state->rs.read_buffer);
1234 stream_state->rs.remaining_bytes = stream_state->rs.length_field;
1235 stream_state->rs.received_bytes = 0;
1236 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1237 stream_state->state_op_done[OP_READ_REQ_MADE] =
1238 true; /* Indicates that at least one read request has been made */
1239 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1240 stream_state->rs.remaining_bytes);
1241 result = ACTION_TAKEN_WITH_CALLBACK;
1243 stream_state->rs.remaining_bytes = 0;
1244 CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
1245 /* Clean up read_slice_buffer in case there is unread data. */
1246 grpc_slice_buffer_destroy_internal(
1247 &stream_state->rs.read_slice_buffer);
1248 grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1250 if (stream_state->rs.compressed) {
1251 flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1253 stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1254 stream_op->payload->recv_message.recv_message->reset(
1255 stream_state->rs.sbs.get());
1256 grpc_core::ExecCtx::Run(
1258 stream_op->payload->recv_message.recv_message_ready,
1260 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1261 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1263 /* Extra read to trigger on_succeed */
1264 stream_state->rs.length_field_received = false;
1265 stream_state->state_op_done[OP_READ_REQ_MADE] =
1266 true; /* Indicates that at least one read request has been made */
1267 read_grpc_header(s);
1268 result = ACTION_TAKEN_NO_CALLBACK;
1270 } else if (stream_state->rs.remaining_bytes == 0) {
1271 /* Start a read operation for first 5 bytes (GRPC header) */
1272 stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1273 stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1274 stream_state->rs.received_bytes = 0;
1275 stream_state->rs.compressed = false;
1276 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1277 stream_state->state_op_done[OP_READ_REQ_MADE] =
1278 true; /* Indicates that at least one read request has been made */
1279 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1280 stream_state->rs.remaining_bytes);
1281 result = ACTION_TAKEN_WITH_CALLBACK;
1283 result = NO_ACTION_POSSIBLE;
1285 } else if (stream_state->rs.remaining_bytes == 0) {
1286 CRONET_LOG(GPR_DEBUG, "read operation complete");
1287 grpc_slice read_data_slice =
1288 GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
1289 uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
1290 memcpy(dst_p, stream_state->rs.read_buffer,
1291 static_cast<size_t>(stream_state->rs.length_field));
1292 null_and_maybe_free_read_buffer(s);
1293 /* Clean up read_slice_buffer in case there is unread data. */
1294 grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
1295 grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1296 grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
1299 if (stream_state->rs.compressed) {
1300 flags = GRPC_WRITE_INTERNAL_COMPRESS;
1302 stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1303 stream_op->payload->recv_message.recv_message->reset(
1304 stream_state->rs.sbs.get());
1305 grpc_core::ExecCtx::Run(
1306 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1308 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1309 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1310 /* Do an extra read to trigger on_succeeded() callback in case connection
1312 stream_state->rs.length_field_received = false;
1313 read_grpc_header(s);
1314 result = ACTION_TAKEN_NO_CALLBACK;
1316 } else if (stream_op->recv_trailing_metadata &&
1317 op_can_be_run(stream_op, s, &oas->state,
1318 OP_RECV_TRAILING_METADATA)) {
1319 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
1320 grpc_error* error = GRPC_ERROR_NONE;
1321 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1322 error = GRPC_ERROR_REF(stream_state->cancel_error);
1323 } else if (stream_state->state_callback_received[OP_FAILED]) {
1324 const char* desc = cronet_net_error_as_string(stream_state->net_error);
1325 error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE,
1326 stream_state->net_error, desc);
1327 } else if (oas->s->state.rs.trailing_metadata_valid) {
1328 grpc_chttp2_incoming_metadata_buffer_publish(
1329 &oas->s->state.rs.trailing_metadata,
1330 stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
1331 stream_state->rs.trailing_metadata_valid = false;
1333 grpc_core::ExecCtx::Run(
1335 stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1337 stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
1338 result = ACTION_TAKEN_NO_CALLBACK;
1339 } else if (stream_op->cancel_stream &&
1340 op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
1341 CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
1343 CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
1344 bidirectional_stream_cancel(s->cbs);
1345 result = ACTION_TAKEN_WITH_CALLBACK;
1347 result = ACTION_TAKEN_NO_CALLBACK;
1349 stream_state->state_op_done[OP_CANCEL_ERROR] = true;
1350 if (!stream_state->cancel_error) {
1351 stream_state->cancel_error =
1352 GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
1354 } else if (op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
1355 CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
1356 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1357 if (stream_op->on_complete) {
1358 grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1359 GRPC_ERROR_REF(stream_state->cancel_error));
1361 } else if (stream_state->state_callback_received[OP_FAILED]) {
1362 if (stream_op->on_complete) {
1363 const char* error_message =
1364 cronet_net_error_as_string(stream_state->net_error);
1365 grpc_core::ExecCtx::Run(
1366 DEBUG_LOCATION, stream_op->on_complete,
1367 make_error_with_desc(GRPC_STATUS_UNAVAILABLE,
1368 stream_state->net_error, error_message));
1371 /* All actions in this stream_op are complete. Call the on_complete
1374 if (stream_op->on_complete) {
1375 grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1379 oas->state.state_op_done[OP_ON_COMPLETE] = true;
1381 /* reset any send message state, only if this ON_COMPLETE is about a send.
1383 if (stream_op->send_message) {
1384 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1385 stream_state->state_op_done[OP_SEND_MESSAGE] = false;
1387 result = ACTION_TAKEN_NO_CALLBACK;
1388 /* If this is the on_complete callback being called for a received message -
1390 if (stream_op->recv_message) {
1391 stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
1394 result = NO_ACTION_POSSIBLE;
1400 Functions used by upper layers to access transport functionality.
1403 inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs,
1404 grpc_stream_refcount* refcount,
1405 grpc_core::Arena* arena)
1407 curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)),
1410 refcount(refcount) {
1411 GRPC_CRONET_STREAM_REF(this, "cronet transport");
1415 inline stream_obj::~stream_obj() {
1416 null_and_maybe_free_read_buffer(this);
1417 /* Clean up read_slice_buffer in case there is unread data. */
1418 grpc_slice_buffer_destroy_internal(&state.rs.read_slice_buffer);
1419 GRPC_ERROR_UNREF(state.cancel_error);
1422 static int init_stream(grpc_transport* gt, grpc_stream* gs,
1423 grpc_stream_refcount* refcount,
1424 const void* /*server_data*/, grpc_core::Arena* arena) {
1425 new (gs) stream_obj(gt, gs, refcount, arena);
1429 static void set_pollset_do_nothing(grpc_transport* /*gt*/, grpc_stream* /*gs*/,
1430 grpc_pollset* /*pollset*/) {}
1432 static void set_pollset_set_do_nothing(grpc_transport* /*gt*/,
1433 grpc_stream* /*gs*/,
1434 grpc_pollset_set* /*pollset_set*/) {}
1436 static void perform_stream_op(grpc_transport* /*gt*/, grpc_stream* gs,
1437 grpc_transport_stream_op_batch* op) {
1438 CRONET_LOG(GPR_DEBUG, "perform_stream_op");
1439 if (op->send_initial_metadata &&
1440 header_has_authority(op->payload->send_initial_metadata
1441 .send_initial_metadata->list.head)) {
1442 /* Cronet does not support :authority header field. We cancel the call when
1443 this field is present in metadata */
1444 if (op->recv_initial_metadata) {
1445 grpc_core::ExecCtx::Run(
1447 op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1448 GRPC_ERROR_CANCELLED);
1450 if (op->recv_message) {
1451 grpc_core::ExecCtx::Run(DEBUG_LOCATION,
1452 op->payload->recv_message.recv_message_ready,
1453 GRPC_ERROR_CANCELLED);
1455 if (op->recv_trailing_metadata) {
1456 grpc_core::ExecCtx::Run(
1458 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1459 GRPC_ERROR_CANCELLED);
1461 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
1462 GRPC_ERROR_CANCELLED);
1465 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1466 add_to_storage(s, op);
1467 execute_from_storage(s);
1470 static void destroy_stream(grpc_transport* /*gt*/, grpc_stream* gs,
1471 grpc_closure* then_schedule_closure) {
1472 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1474 grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
1478 static void destroy_transport(grpc_transport* /*gt*/) {}
1480 static grpc_endpoint* get_endpoint(grpc_transport* /*gt*/) { return nullptr; }
1482 static void perform_op(grpc_transport* /*gt*/, grpc_transport_op* /*op*/) {}
1484 static const grpc_transport_vtable grpc_cronet_vtable = {
1488 set_pollset_do_nothing,
1489 set_pollset_set_do_nothing,
1496 grpc_transport* grpc_create_cronet_transport(void* engine, const char* target,
1497 const grpc_channel_args* args,
1498 void* /*reserved*/) {
1499 grpc_cronet_transport* ct = static_cast<grpc_cronet_transport*>(
1500 gpr_malloc(sizeof(grpc_cronet_transport)));
1504 ct->base.vtable = &grpc_cronet_vtable;
1505 ct->engine = static_cast<stream_engine*>(engine);
1506 ct->host = static_cast<char*>(gpr_malloc(strlen(target) + 1));
1510 strcpy(ct->host, target);
1512 ct->use_packet_coalescing = true;
1514 for (size_t i = 0; i < args->num_args; i++) {
1516 strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
1517 if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) {
1518 gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
1519 GRPC_ARG_USE_CRONET_PACKET_COALESCING);
1521 ct->use_packet_coalescing = (args->args[i].value.integer != 0);