3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
23 #include <grpc/slice_buffer.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/string_util.h>
28 #include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
29 #include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
30 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
31 #include "src/core/ext/transport/cronet/transport/cronet_transport.h"
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/gpr/host_port.h"
34 #include "src/core/lib/gpr/string.h"
35 #include "src/core/lib/gprpp/manual_constructor.h"
36 #include "src/core/lib/iomgr/endpoint.h"
37 #include "src/core/lib/iomgr/exec_ctx.h"
38 #include "src/core/lib/slice/slice_internal.h"
39 #include "src/core/lib/slice/slice_string_helpers.h"
40 #include "src/core/lib/surface/channel.h"
41 #include "src/core/lib/transport/metadata_batch.h"
42 #include "src/core/lib/transport/static_metadata.h"
43 #include "src/core/lib/transport/transport_impl.h"
44 #include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
46 #define GRPC_HEADER_SIZE_IN_BYTES 5
47 #define GRPC_FLUSH_READ_SIZE 4096
49 grpc_core::TraceFlag grpc_cronet_trace(false, "cronet");
50 #define CRONET_LOG(...) \
52 if (grpc_cronet_trace.enabled()) gpr_log(__VA_ARGS__); \
56 ACTION_TAKEN_WITH_CALLBACK,
57 ACTION_TAKEN_NO_CALLBACK,
62 OP_SEND_INITIAL_METADATA = 0,
64 OP_SEND_TRAILING_METADATA,
66 OP_RECV_INITIAL_METADATA,
67 OP_RECV_TRAILING_METADATA,
73 OP_RECV_MESSAGE_AND_ON_COMPLETE,
78 /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
80 static void on_stream_ready(bidirectional_stream*);
81 static void on_response_headers_received(
82 bidirectional_stream*, const bidirectional_stream_header_array*,
84 static void on_write_completed(bidirectional_stream*, const char*);
85 static void on_read_completed(bidirectional_stream*, char*, int);
86 static void on_response_trailers_received(
87 bidirectional_stream*, const bidirectional_stream_header_array*);
88 static void on_succeeded(bidirectional_stream*);
89 static void on_failed(bidirectional_stream*, int);
90 static void on_canceled(bidirectional_stream*);
91 static bidirectional_stream_callback cronet_callbacks = {
93 on_response_headers_received,
96 on_response_trailers_received,
101 /* Cronet transport object */
102 struct grpc_cronet_transport {
103 grpc_transport base; /* must be first element in this structure */
104 stream_engine* engine;
106 bool use_packet_coalescing;
108 typedef struct grpc_cronet_transport grpc_cronet_transport;
110 /* TODO (makdharma): reorder structure for memory efficiency per
111 http://www.catb.org/esr/structure-packing/#_structure_reordering: */
113 read_state(grpc_core::Arena* arena)
114 : trailing_metadata(arena), initial_metadata(arena) {
115 grpc_slice_buffer_init(&read_slice_buffer);
118 /* vars to store data coming from server */
119 char* read_buffer = nullptr;
120 bool length_field_received = false;
121 int received_bytes = 0;
122 int remaining_bytes = 0;
123 int length_field = 0;
125 char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {};
126 char* payload_field = nullptr;
127 bool read_stream_closed = 0;
129 /* vars for holding data destined for the application */
130 grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
131 grpc_slice_buffer read_slice_buffer;
133 /* vars for trailing metadata */
134 grpc_chttp2_incoming_metadata_buffer trailing_metadata;
135 bool trailing_metadata_valid = false;
137 /* vars for initial metadata */
138 grpc_chttp2_incoming_metadata_buffer initial_metadata;
142 char* write_buffer = nullptr;
145 /* track state of one stream op */
147 op_state(grpc_core::Arena* arena) : rs(arena) {}
149 bool state_op_done[OP_NUM_OPS] = {};
150 bool state_callback_received[OP_NUM_OPS] = {};
151 /* A non-zero gRPC status code has been seen */
152 bool fail_state = false;
153 /* Transport is discarding all buffered messages */
154 bool flush_read = false;
155 bool flush_cronet_when_ready = false;
156 bool pending_write_for_trailer = false;
157 bool pending_send_message = false;
158 /* User requested RECV_TRAILING_METADATA */
159 bool pending_recv_trailing_metadata = false;
160 /* Cronet has not issued a callback of a bidirectional read */
161 bool pending_read_from_cronet = false;
162 grpc_error* cancel_error = GRPC_ERROR_NONE;
163 /* data structure for storing data coming from server */
164 struct read_state rs;
165 /* data structure for storing data going to the server */
166 struct write_state ws;
171 struct op_and_state {
172 op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op);
174 grpc_transport_stream_op_batch op;
175 struct op_state state;
177 struct stream_obj* s; /* Pointer back to the stream object */
178 /* next op_and_state in the linked list */
179 struct op_and_state* next = nullptr;
183 int num_pending_ops = 0;
184 struct op_and_state* head = nullptr;
188 stream_obj(grpc_transport* gt, grpc_stream* gs,
189 grpc_stream_refcount* refcount, grpc_core::Arena* arena);
192 grpc_core::Arena* arena;
193 struct op_and_state* oas = nullptr;
194 grpc_transport_stream_op_batch* curr_op = nullptr;
195 grpc_cronet_transport* curr_ct;
196 grpc_stream* curr_gs;
197 bidirectional_stream* cbs = nullptr;
198 bidirectional_stream_header_array header_array =
199 bidirectional_stream_header_array(); // Zero-initialize the structure.
201 /* Stream level state. Some state will be tracked both at stream and stream_op
203 struct op_state state;
206 struct op_storage storage;
208 /* Mutex to protect storage */
211 /* Refcount object of the stream */
212 grpc_stream_refcount* refcount;
216 #define GRPC_CRONET_STREAM_REF(stream, reason) \
217 grpc_cronet_stream_ref((stream), (reason))
218 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
219 grpc_cronet_stream_unref((stream), (reason))
220 void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
221 grpc_stream_ref(s->refcount, reason);
223 void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
224 grpc_stream_unref(s->refcount, reason);
227 #define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
228 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
229 grpc_cronet_stream_unref((stream))
230 void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
231 void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
234 static enum e_op_result execute_stream_op(struct op_and_state* oas);
237 Utility function to translate enum into string for printing
239 static const char* op_result_string(enum e_op_result i) {
241 case ACTION_TAKEN_WITH_CALLBACK:
242 return "ACTION_TAKEN_WITH_CALLBACK";
243 case ACTION_TAKEN_NO_CALLBACK:
244 return "ACTION_TAKEN_NO_CALLBACK";
245 case NO_ACTION_POSSIBLE:
246 return "NO_ACTION_POSSIBLE";
248 GPR_UNREACHABLE_CODE(return "UNKNOWN");
251 static const char* op_id_string(enum e_op_id i) {
253 case OP_SEND_INITIAL_METADATA:
254 return "OP_SEND_INITIAL_METADATA";
255 case OP_SEND_MESSAGE:
256 return "OP_SEND_MESSAGE";
257 case OP_SEND_TRAILING_METADATA:
258 return "OP_SEND_TRAILING_METADATA";
259 case OP_RECV_MESSAGE:
260 return "OP_RECV_MESSAGE";
261 case OP_RECV_INITIAL_METADATA:
262 return "OP_RECV_INITIAL_METADATA";
263 case OP_RECV_TRAILING_METADATA:
264 return "OP_RECV_TRAILING_METADATA";
265 case OP_CANCEL_ERROR:
266 return "OP_CANCEL_ERROR";
268 return "OP_ON_COMPLETE";
272 return "OP_SUCCEEDED";
274 return "OP_CANCELED";
275 case OP_RECV_MESSAGE_AND_ON_COMPLETE:
276 return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
277 case OP_READ_REQ_MADE:
278 return "OP_READ_REQ_MADE";
285 static void null_and_maybe_free_read_buffer(stream_obj* s) {
286 if (s->state.rs.read_buffer &&
287 s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
288 gpr_free(s->state.rs.read_buffer);
290 s->state.rs.read_buffer = nullptr;
293 static void maybe_flush_read(stream_obj* s) {
294 /* To enter flush read state (discarding all the buffered messages in
295 * transport layer), two conditions must be satisfied: 1) non-zero grpc status
296 * has been received, and 2) an op requesting the status code
297 * (RECV_TRAILING_METADATA) is issued by the user. (See
298 * doc/status_ordering.md) */
299 /* Whenever the evaluation of any of the two condition is changed, we check
300 * whether we should enter the flush read state. */
301 if (s->state.pending_recv_trailing_metadata && s->state.fail_state) {
302 if (!s->state.flush_read && !s->state.rs.read_stream_closed) {
303 CRONET_LOG(GPR_DEBUG, "%p: Flush read", s);
304 s->state.flush_read = true;
305 null_and_maybe_free_read_buffer(s);
306 s->state.rs.read_buffer =
307 static_cast<char*>(gpr_malloc(GRPC_FLUSH_READ_SIZE));
308 if (!s->state.pending_read_from_cronet) {
309 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
310 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
311 GRPC_FLUSH_READ_SIZE);
312 s->state.pending_read_from_cronet = true;
318 static grpc_error* make_error_with_desc(int error_code, const char* desc) {
319 grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
320 error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code);
324 inline op_and_state::op_and_state(stream_obj* s,
325 const grpc_transport_stream_op_batch& op)
326 : op(op), state(s->arena), s(s) {}
329 Add a new stream op to op storage.
331 static void add_to_storage(struct stream_obj* s,
332 grpc_transport_stream_op_batch* op) {
333 struct op_storage* storage = &s->storage;
334 /* add new op at the beginning of the linked list. The memory is freed
335 in remove_from_storage */
336 op_and_state* new_op = grpc_core::New<op_and_state>(s, *op);
338 new_op->next = storage->head;
339 storage->head = new_op;
340 storage->num_pending_ops++;
341 if (op->send_message) {
342 s->state.pending_send_message = true;
344 if (op->recv_trailing_metadata) {
345 s->state.pending_recv_trailing_metadata = true;
348 CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
349 storage->num_pending_ops);
350 gpr_mu_unlock(&s->mu);
354 Traverse the linked list and delete op and free memory
356 static void remove_from_storage(struct stream_obj* s,
357 struct op_and_state* oas) {
358 struct op_and_state* curr;
359 if (s->storage.head == nullptr || oas == nullptr) {
362 if (s->storage.head == oas) {
363 s->storage.head = oas->next;
364 grpc_core::Delete(oas);
365 s->storage.num_pending_ops--;
366 CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
367 s->storage.num_pending_ops);
369 for (curr = s->storage.head; curr != nullptr; curr = curr->next) {
370 if (curr->next == oas) {
371 curr->next = oas->next;
372 s->storage.num_pending_ops--;
373 CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
374 s->storage.num_pending_ops);
375 grpc_core::Delete(oas);
377 } else if (GPR_UNLIKELY(curr->next == nullptr)) {
378 CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
385 Cycle through ops and try to take next action. Break when either
386 an action with callback is taken, or no action is possible.
387 This can get executed from the Cronet network thread via cronet callback
388 or on the application supplied thread via the perform_stream_op function.
390 static void execute_from_storage(stream_obj* s) {
392 for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
393 CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
394 GPR_ASSERT(!curr->done);
395 enum e_op_result result = execute_stream_op(curr);
396 CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
397 op_result_string(result));
398 /* if this op is done, then remove it and free memory */
400 struct op_and_state* next = curr->next;
401 remove_from_storage(s, curr);
403 } else if (result == NO_ACTION_POSSIBLE) {
405 } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
406 /* wait for the callback */
408 } /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
410 gpr_mu_unlock(&s->mu);
413 static void convert_cronet_array_to_metadata(
414 const bidirectional_stream_header_array* header_array,
415 grpc_chttp2_incoming_metadata_buffer* mds) {
416 for (size_t i = 0; i < header_array->count; i++) {
417 CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s",
418 header_array->headers[i].key, header_array->headers[i].value);
419 grpc_slice key = grpc_slice_intern(
420 grpc_slice_from_static_string(header_array->headers[i].key));
422 if (grpc_is_binary_header(key)) {
423 value = grpc_slice_from_static_string(header_array->headers[i].value);
424 value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length(
425 value, grpc_chttp2_base64_infer_length_after_decode(value)));
427 value = grpc_slice_intern(
428 grpc_slice_from_static_string(header_array->headers[i].value));
430 GRPC_LOG_IF_ERROR("convert_cronet_array_to_metadata",
431 grpc_chttp2_incoming_metadata_buffer_add(
432 mds, grpc_mdelem_from_slices(key, value)));
439 static void on_failed(bidirectional_stream* stream, int net_error) {
440 gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
441 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
442 grpc_core::ExecCtx exec_ctx;
444 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
446 bidirectional_stream_destroy(s->cbs);
447 s->state.state_callback_received[OP_FAILED] = true;
449 if (s->header_array.headers) {
450 gpr_free(s->header_array.headers);
451 s->header_array.headers = nullptr;
453 if (s->state.ws.write_buffer) {
454 gpr_free(s->state.ws.write_buffer);
455 s->state.ws.write_buffer = nullptr;
457 null_and_maybe_free_read_buffer(s);
458 gpr_mu_unlock(&s->mu);
459 execute_from_storage(s);
460 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
466 static void on_canceled(bidirectional_stream* stream) {
467 CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
468 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
469 grpc_core::ExecCtx exec_ctx;
471 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
473 bidirectional_stream_destroy(s->cbs);
474 s->state.state_callback_received[OP_CANCELED] = true;
476 if (s->header_array.headers) {
477 gpr_free(s->header_array.headers);
478 s->header_array.headers = nullptr;
480 if (s->state.ws.write_buffer) {
481 gpr_free(s->state.ws.write_buffer);
482 s->state.ws.write_buffer = nullptr;
484 null_and_maybe_free_read_buffer(s);
485 gpr_mu_unlock(&s->mu);
486 execute_from_storage(s);
487 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
493 static void on_succeeded(bidirectional_stream* stream) {
494 CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
495 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
496 grpc_core::ExecCtx exec_ctx;
498 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
500 bidirectional_stream_destroy(s->cbs);
501 s->state.state_callback_received[OP_SUCCEEDED] = true;
503 null_and_maybe_free_read_buffer(s);
504 gpr_mu_unlock(&s->mu);
505 execute_from_storage(s);
506 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
512 static void on_stream_ready(bidirectional_stream* stream) {
513 CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
514 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
515 grpc_core::ExecCtx exec_ctx;
516 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
517 grpc_cronet_transport* t = s->curr_ct;
519 s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
520 s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
521 /* Free the memory allocated for headers */
522 if (s->header_array.headers) {
523 gpr_free(s->header_array.headers);
524 s->header_array.headers = nullptr;
526 /* Send the initial metadata on wire if there is no SEND_MESSAGE or
527 * SEND_TRAILING_METADATA ops pending */
528 if (t->use_packet_coalescing) {
529 if (s->state.flush_cronet_when_ready) {
530 CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
531 bidirectional_stream_flush(stream);
534 gpr_mu_unlock(&s->mu);
535 execute_from_storage(s);
541 static void on_response_headers_received(
542 bidirectional_stream* stream,
543 const bidirectional_stream_header_array* headers,
544 const char* negotiated_protocol) {
545 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
546 grpc_core::ExecCtx exec_ctx;
547 CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
548 headers, negotiated_protocol);
549 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
551 /* Identify if this is a header or a trailer (in a trailer-only response case)
553 for (size_t i = 0; i < headers->count; i++) {
554 if (0 == strcmp("grpc-status", headers->headers[i].key)) {
555 on_response_trailers_received(stream, headers);
561 convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata);
562 s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
563 if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
564 s->state.state_callback_received[OP_FAILED])) {
565 /* Do an extra read to trigger on_succeeded() callback in case connection
567 GPR_ASSERT(s->state.rs.length_field_received == false);
568 s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
569 s->state.rs.compressed = false;
570 s->state.rs.received_bytes = 0;
571 s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
572 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
573 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
574 s->state.rs.remaining_bytes);
575 s->state.pending_read_from_cronet = true;
577 gpr_mu_unlock(&s->mu);
578 execute_from_storage(s);
584 static void on_write_completed(bidirectional_stream* stream, const char* data) {
585 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
586 grpc_core::ExecCtx exec_ctx;
587 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
588 CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
590 if (s->state.ws.write_buffer) {
591 gpr_free(s->state.ws.write_buffer);
592 s->state.ws.write_buffer = nullptr;
594 s->state.state_callback_received[OP_SEND_MESSAGE] = true;
595 gpr_mu_unlock(&s->mu);
596 execute_from_storage(s);
602 static void on_read_completed(bidirectional_stream* stream, char* data,
604 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
605 grpc_core::ExecCtx exec_ctx;
606 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
607 CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
610 s->state.pending_read_from_cronet = false;
611 s->state.state_callback_received[OP_RECV_MESSAGE] = true;
612 if (count > 0 && s->state.flush_read) {
613 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
614 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
615 GRPC_FLUSH_READ_SIZE);
616 s->state.pending_read_from_cronet = true;
617 gpr_mu_unlock(&s->mu);
618 } else if (count > 0) {
619 s->state.rs.received_bytes += count;
620 s->state.rs.remaining_bytes -= count;
621 if (s->state.rs.remaining_bytes > 0) {
622 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
623 s->state.state_op_done[OP_READ_REQ_MADE] = true;
624 bidirectional_stream_read(
625 s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
626 s->state.rs.remaining_bytes);
627 s->state.pending_read_from_cronet = true;
628 gpr_mu_unlock(&s->mu);
630 gpr_mu_unlock(&s->mu);
631 execute_from_storage(s);
634 null_and_maybe_free_read_buffer(s);
635 s->state.rs.read_stream_closed = true;
636 gpr_mu_unlock(&s->mu);
637 execute_from_storage(s);
644 static void on_response_trailers_received(
645 bidirectional_stream* stream,
646 const bidirectional_stream_header_array* trailers) {
647 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
648 grpc_core::ExecCtx exec_ctx;
649 CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
651 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
652 grpc_cronet_transport* t = s->curr_ct;
654 s->state.rs.trailing_metadata_valid = false;
655 convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata);
656 if (trailers->count > 0) {
657 s->state.rs.trailing_metadata_valid = true;
659 for (size_t i = 0; i < trailers->count; i++) {
660 if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
661 0 != strcmp(trailers->headers[i].value, "0")) {
662 s->state.fail_state = true;
666 s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
667 /* Send a EOS when server terminates the stream (testServerFinishesRequest) to
668 * trigger on_succeeded */
669 if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
670 !(s->state.state_op_done[OP_CANCEL_ERROR] ||
671 s->state.state_callback_received[OP_FAILED])) {
672 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
673 s->state.state_callback_received[OP_SEND_MESSAGE] = false;
674 bidirectional_stream_write(s->cbs, "", 0, true);
675 if (t->use_packet_coalescing) {
676 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
677 bidirectional_stream_flush(s->cbs);
679 s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
681 gpr_mu_unlock(&s->mu);
683 gpr_mu_unlock(&s->mu);
684 execute_from_storage(s);
689 Utility function that takes the data from s->write_slice_buffer and assembles
690 into a contiguous byte stream with 5 byte gRPC header prepended.
692 static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
693 char** pp_write_buffer,
694 size_t* p_write_buffer_size, uint32_t flags) {
695 grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer);
696 size_t length = GRPC_SLICE_LENGTH(slice);
697 *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
698 /* This is freed in the on_write_completed callback */
700 static_cast<char*>(gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES));
701 *pp_write_buffer = write_buffer;
702 uint8_t* p = reinterpret_cast<uint8_t*>(write_buffer);
703 /* Append 5 byte header */
704 /* Compressed flag */
705 *p++ = static_cast<uint8_t>((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0);
707 *p++ = static_cast<uint8_t>(length >> 24);
708 *p++ = static_cast<uint8_t>(length >> 16);
709 *p++ = static_cast<uint8_t>(length >> 8);
710 *p++ = static_cast<uint8_t>(length);
711 /* append actual data */
712 memcpy(p, GRPC_SLICE_START_PTR(slice), length);
713 grpc_slice_unref_internal(slice);
717 Convert metadata in a format that Cronet can consume
719 static void convert_metadata_to_cronet_headers(
720 grpc_linked_mdelem* head, const char* host, char** pp_url,
721 bidirectional_stream_header** pp_headers, size_t* p_num_headers,
722 const char** method) {
723 grpc_linked_mdelem* curr = head;
724 /* Walk the linked list and get number of header fields */
725 size_t num_headers_available = 0;
726 while (curr != nullptr) {
728 num_headers_available++;
730 /* Allocate enough memory. It is freed in the on_stream_ready callback
732 bidirectional_stream_header* headers =
733 static_cast<bidirectional_stream_header*>(gpr_malloc(
734 sizeof(bidirectional_stream_header) * num_headers_available));
735 *pp_headers = headers;
737 /* Walk the linked list again, this time copying the header fields.
738 s->num_headers can be less than num_headers_available, as some headers
739 are not used for cronet.
740 TODO (makdharma): Eliminate need to traverse the LL second time for perf.
743 size_t num_headers = 0;
744 while (num_headers < num_headers_available) {
745 grpc_mdelem mdelem = curr->md;
747 char* key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem));
749 if (grpc_is_binary_header(GRPC_MDKEY(mdelem))) {
750 grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem));
751 value = grpc_slice_to_c_string(wire_value);
752 grpc_slice_unref_internal(wire_value);
754 value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem));
756 if (grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) ||
757 grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_AUTHORITY)) {
758 /* Cronet populates these fields on its own */
763 if (grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_METHOD)) {
764 if (grpc_slice_eq(GRPC_MDVALUE(mdelem), GRPC_MDSTR_PUT)) {
767 /* POST method in default*/
774 if (grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_PATH)) {
775 /* Create URL by appending :path value to the hostname */
776 gpr_asprintf(pp_url, "https://%s%s", host, value);
781 CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
782 headers[num_headers].key = key;
783 headers[num_headers].value = value;
785 if (curr == nullptr) {
789 *p_num_headers = num_headers;
792 static void parse_grpc_header(const uint8_t* data, int* length,
794 const uint8_t c = *data;
795 const uint8_t* p = data + 1;
796 *compressed = ((c & 0x01) == 0x01);
798 *length |= (*p++) << 24;
799 *length |= (*p++) << 16;
800 *length |= (*p++) << 8;
804 static bool header_has_authority(grpc_linked_mdelem* head) {
805 while (head != nullptr) {
806 if (grpc_slice_eq(GRPC_MDKEY(head->md), GRPC_MDSTR_AUTHORITY)) {
815 Op Execution: Decide if one of the actions contained in the stream op can be
816 executed. This is the heart of the state machine.
818 static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
819 struct stream_obj* s, struct op_state* op_state,
820 enum e_op_id op_id) {
821 struct op_state* stream_state = &s->state;
822 grpc_cronet_transport* t = s->curr_ct;
824 /* When call is canceled, every op can be run, except under following
827 bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
828 stream_state->state_callback_received[OP_FAILED];
829 if (is_canceled_or_failed) {
830 if (op_id == OP_SEND_INITIAL_METADATA) {
831 CRONET_LOG(GPR_DEBUG, "Because");
834 if (op_id == OP_SEND_MESSAGE) {
835 CRONET_LOG(GPR_DEBUG, "Because");
838 if (op_id == OP_SEND_TRAILING_METADATA) {
839 CRONET_LOG(GPR_DEBUG, "Because");
842 if (op_id == OP_CANCEL_ERROR) {
843 CRONET_LOG(GPR_DEBUG, "Because");
846 /* already executed */
847 if (op_id == OP_RECV_INITIAL_METADATA &&
848 stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
849 CRONET_LOG(GPR_DEBUG, "Because");
852 if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
853 CRONET_LOG(GPR_DEBUG, "Because");
856 if (op_id == OP_RECV_TRAILING_METADATA &&
857 stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
858 CRONET_LOG(GPR_DEBUG, "Because");
861 /* ON_COMPLETE can be processed if one of the following conditions is met:
862 * 1. the stream failed
863 * 2. the stream is cancelled, and the callback is received
864 * 3. the stream succeeded before cancel is effective
865 * 4. the stream is cancelled, and the stream is never started */
866 if (op_id == OP_ON_COMPLETE &&
867 !(stream_state->state_callback_received[OP_FAILED] ||
868 stream_state->state_callback_received[OP_CANCELED] ||
869 stream_state->state_callback_received[OP_SUCCEEDED] ||
870 !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
871 CRONET_LOG(GPR_DEBUG, "Because");
874 } else if (op_id == OP_SEND_INITIAL_METADATA) {
875 /* already executed */
876 if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
877 } else if (op_id == OP_RECV_INITIAL_METADATA) {
878 /* already executed */
879 if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
880 /* we haven't sent headers yet. */
881 else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
883 /* we haven't received headers yet. */
884 else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
885 !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
887 } else if (op_id == OP_SEND_MESSAGE) {
888 /* already executed (note we're checking op specific state, not stream
890 if (op_state->state_op_done[OP_SEND_MESSAGE]) result = false;
891 /* we haven't sent headers yet. */
892 else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
894 } else if (op_id == OP_RECV_MESSAGE) {
895 /* already executed */
896 if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false;
897 /* we haven't received headers yet. */
898 else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
899 !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
901 } else if (op_id == OP_RECV_TRAILING_METADATA) {
902 /* already executed */
903 if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
904 /* we have asked for but haven't received message yet. */
905 else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
906 !stream_state->state_op_done[OP_RECV_MESSAGE])
908 /* we haven't received trailers yet. */
909 else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA])
911 /* we haven't received on_succeeded yet. */
912 else if (!stream_state->state_callback_received[OP_SUCCEEDED])
914 } else if (op_id == OP_SEND_TRAILING_METADATA) {
915 /* already executed */
916 if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false;
917 /* we haven't sent initial metadata yet */
918 else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
920 /* we haven't sent message yet */
921 else if (stream_state->pending_send_message &&
922 !stream_state->state_op_done[OP_SEND_MESSAGE])
924 /* we haven't got on_write_completed for the send yet */
925 else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
926 !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
927 !(t->use_packet_coalescing &&
928 stream_state->pending_write_for_trailer))
930 } else if (op_id == OP_CANCEL_ERROR) {
931 /* already executed */
932 if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
933 } else if (op_id == OP_ON_COMPLETE) {
934 /* already executed (note we're checking op specific state, not stream
936 if (op_state->state_op_done[OP_ON_COMPLETE]) {
937 CRONET_LOG(GPR_DEBUG, "Because");
940 /* Check if every op that was asked for is done. */
941 /* TODO(muxi): We should not consider the recv ops here, since they
942 * have their own callbacks. We should invoke a batch's on_complete
943 * as soon as all of the batch's send ops are complete, even if
944 * there are still recv ops pending. */
945 else if (curr_op->send_initial_metadata &&
946 !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
947 CRONET_LOG(GPR_DEBUG, "Because");
949 } else if (curr_op->send_message &&
950 !op_state->state_op_done[OP_SEND_MESSAGE]) {
951 CRONET_LOG(GPR_DEBUG, "Because");
953 } else if (curr_op->send_message &&
954 !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
955 CRONET_LOG(GPR_DEBUG, "Because");
957 } else if (curr_op->send_trailing_metadata &&
958 !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
959 CRONET_LOG(GPR_DEBUG, "Because");
961 } else if (curr_op->recv_initial_metadata &&
962 !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
963 CRONET_LOG(GPR_DEBUG, "Because");
965 } else if (curr_op->recv_message &&
966 !op_state->state_op_done[OP_RECV_MESSAGE]) {
967 CRONET_LOG(GPR_DEBUG, "Because");
969 } else if (curr_op->cancel_stream &&
970 !stream_state->state_callback_received[OP_CANCELED]) {
971 CRONET_LOG(GPR_DEBUG, "Because");
973 } else if (curr_op->recv_trailing_metadata) {
974 /* We aren't done with trailing metadata yet */
975 if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
976 CRONET_LOG(GPR_DEBUG, "Because");
979 /* We've asked for actual message in an earlier op, and it hasn't been
981 else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
982 /* If this op is not the one asking for read, (which means some earlier
983 op has asked), and the read hasn't been delivered. */
984 if (!curr_op->recv_message &&
985 !stream_state->state_callback_received[OP_SUCCEEDED]) {
986 CRONET_LOG(GPR_DEBUG, "Because");
991 /* We should see at least one on_write_completed for the trailers that we
993 else if (curr_op->send_trailing_metadata &&
994 !stream_state->state_callback_received[OP_SEND_MESSAGE])
997 CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
998 result ? "YES" : "NO");
1003 TODO (makdharma): Break down this function in smaller chunks for readability.
1005 static enum e_op_result execute_stream_op(struct op_and_state* oas) {
1006 grpc_transport_stream_op_batch* stream_op = &oas->op;
1007 struct stream_obj* s = oas->s;
1008 grpc_cronet_transport* t = s->curr_ct;
1009 struct op_state* stream_state = &s->state;
1010 enum e_op_result result = NO_ACTION_POSSIBLE;
1011 if (stream_op->send_initial_metadata &&
1012 op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
1013 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
1014 /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
1016 GPR_ASSERT(s->cbs == nullptr);
1017 GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
1019 bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
1020 CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
1021 if (t->use_packet_coalescing) {
1022 bidirectional_stream_disable_auto_flush(s->cbs, true);
1023 bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
1025 char* url = nullptr;
1026 const char* method = "POST";
1027 s->header_array.headers = nullptr;
1028 convert_metadata_to_cronet_headers(stream_op->payload->send_initial_metadata
1029 .send_initial_metadata->list.head,
1030 t->host, &url, &s->header_array.headers,
1031 &s->header_array.count, &method);
1032 s->header_array.capacity = s->header_array.count;
1033 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url);
1034 bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false);
1038 unsigned int header_index;
1039 for (header_index = 0; header_index < s->header_array.count;
1041 gpr_free((void*)s->header_array.headers[header_index].key);
1042 gpr_free((void*)s->header_array.headers[header_index].value);
1044 stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
1045 if (t->use_packet_coalescing) {
1046 if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
1047 s->state.flush_cronet_when_ready = true;
1050 result = ACTION_TAKEN_WITH_CALLBACK;
1051 } else if (stream_op->send_message &&
1052 op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
1053 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
1054 stream_state->pending_send_message = false;
1055 if (stream_state->state_callback_received[OP_FAILED]) {
1056 result = NO_ACTION_POSSIBLE;
1057 CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
1059 grpc_slice_buffer write_slice_buffer;
1061 grpc_slice_buffer_init(&write_slice_buffer);
1062 if (1 != stream_op->payload->send_message.send_message->Next(
1063 stream_op->payload->send_message.send_message->length(),
1065 /* Should never reach here */
1068 if (GRPC_ERROR_NONE !=
1069 stream_op->payload->send_message.send_message->Pull(&slice)) {
1070 /* Should never reach here */
1073 grpc_slice_buffer_add(&write_slice_buffer, slice);
1074 if (GPR_UNLIKELY(write_slice_buffer.count != 1)) {
1075 /* Empty request not handled yet */
1076 gpr_log(GPR_ERROR, "Empty request is not supported");
1077 GPR_ASSERT(write_slice_buffer.count == 1);
1079 if (write_slice_buffer.count > 0) {
1080 size_t write_buffer_size;
1082 &write_slice_buffer, &stream_state->ws.write_buffer,
1084 stream_op->payload->send_message.send_message->flags());
1085 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
1086 stream_state->ws.write_buffer);
1087 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1088 bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
1089 static_cast<int>(write_buffer_size), false);
1090 grpc_slice_buffer_destroy_internal(&write_slice_buffer);
1091 if (t->use_packet_coalescing) {
1092 if (!stream_op->send_trailing_metadata) {
1093 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1094 bidirectional_stream_flush(s->cbs);
1095 result = ACTION_TAKEN_WITH_CALLBACK;
1097 stream_state->pending_write_for_trailer = true;
1098 result = ACTION_TAKEN_NO_CALLBACK;
1101 result = ACTION_TAKEN_WITH_CALLBACK;
1104 result = NO_ACTION_POSSIBLE;
1107 stream_state->state_op_done[OP_SEND_MESSAGE] = true;
1108 oas->state.state_op_done[OP_SEND_MESSAGE] = true;
1109 stream_op->payload->send_message.send_message.reset();
1110 } else if (stream_op->send_trailing_metadata &&
1111 op_can_be_run(stream_op, s, &oas->state,
1112 OP_SEND_TRAILING_METADATA)) {
1113 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
1114 if (stream_state->state_callback_received[OP_FAILED]) {
1115 result = NO_ACTION_POSSIBLE;
1116 CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
1118 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
1119 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1120 bidirectional_stream_write(s->cbs, "", 0, true);
1121 if (t->use_packet_coalescing) {
1122 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1123 bidirectional_stream_flush(s->cbs);
1125 result = ACTION_TAKEN_WITH_CALLBACK;
1127 stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
1128 } else if (stream_op->recv_initial_metadata &&
1129 op_can_be_run(stream_op, s, &oas->state,
1130 OP_RECV_INITIAL_METADATA)) {
1131 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
1132 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1134 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1136 } else if (stream_state->state_callback_received[OP_FAILED]) {
1138 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1140 } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
1142 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1145 grpc_chttp2_incoming_metadata_buffer_publish(
1146 &oas->s->state.rs.initial_metadata,
1147 stream_op->payload->recv_initial_metadata.recv_initial_metadata);
1149 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1152 stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
1153 result = ACTION_TAKEN_NO_CALLBACK;
1154 } else if (stream_op->recv_message &&
1155 op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
1156 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
1157 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1158 CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
1159 GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
1161 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1162 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1163 result = ACTION_TAKEN_NO_CALLBACK;
1164 } else if (stream_state->state_callback_received[OP_FAILED]) {
1165 CRONET_LOG(GPR_DEBUG, "Stream failed.");
1166 GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
1168 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1169 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1170 result = ACTION_TAKEN_NO_CALLBACK;
1171 } else if (stream_state->rs.read_stream_closed == true) {
1172 /* No more data will be received */
1173 CRONET_LOG(GPR_DEBUG, "read stream closed");
1174 GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
1176 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1177 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1178 result = ACTION_TAKEN_NO_CALLBACK;
1179 } else if (stream_state->flush_read) {
1180 CRONET_LOG(GPR_DEBUG, "flush read");
1181 GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
1183 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1184 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1185 result = ACTION_TAKEN_NO_CALLBACK;
1186 } else if (stream_state->rs.length_field_received == false) {
1187 if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
1188 stream_state->rs.remaining_bytes == 0) {
1189 /* Start a read operation for data */
1190 stream_state->rs.length_field_received = true;
1192 reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
1193 &stream_state->rs.length_field, &stream_state->rs.compressed);
1194 CRONET_LOG(GPR_DEBUG, "length field = %d",
1195 stream_state->rs.length_field);
1196 if (stream_state->rs.length_field > 0) {
1197 stream_state->rs.read_buffer = static_cast<char*>(
1198 gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
1199 GPR_ASSERT(stream_state->rs.read_buffer);
1200 stream_state->rs.remaining_bytes = stream_state->rs.length_field;
1201 stream_state->rs.received_bytes = 0;
1202 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1203 stream_state->state_op_done[OP_READ_REQ_MADE] =
1204 true; /* Indicates that at least one read request has been made */
1205 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1206 stream_state->rs.remaining_bytes);
1207 stream_state->pending_read_from_cronet = true;
1208 result = ACTION_TAKEN_WITH_CALLBACK;
1210 stream_state->rs.remaining_bytes = 0;
1211 CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
1212 /* Clean up read_slice_buffer in case there is unread data. */
1213 grpc_slice_buffer_destroy_internal(
1214 &stream_state->rs.read_slice_buffer);
1215 grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1217 if (stream_state->rs.compressed) {
1218 flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1220 stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1221 stream_op->payload->recv_message.recv_message->reset(
1222 stream_state->rs.sbs.get());
1224 stream_op->payload->recv_message.recv_message_ready,
1226 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1227 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1229 /* Extra read to trigger on_succeed */
1230 stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1231 stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1232 stream_state->rs.received_bytes = 0;
1233 stream_state->rs.compressed = false;
1234 stream_state->rs.length_field_received = false;
1235 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1236 stream_state->state_op_done[OP_READ_REQ_MADE] =
1237 true; /* Indicates that at least one read request has been made */
1238 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1239 stream_state->rs.remaining_bytes);
1240 stream_state->pending_read_from_cronet = true;
1241 result = ACTION_TAKEN_NO_CALLBACK;
1243 } else if (stream_state->rs.remaining_bytes == 0) {
1244 /* Start a read operation for first 5 bytes (GRPC header) */
1245 stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1246 stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1247 stream_state->rs.received_bytes = 0;
1248 stream_state->rs.compressed = false;
1249 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1250 stream_state->state_op_done[OP_READ_REQ_MADE] =
1251 true; /* Indicates that at least one read request has been made */
1252 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1253 stream_state->rs.remaining_bytes);
1254 stream_state->pending_read_from_cronet = true;
1255 result = ACTION_TAKEN_WITH_CALLBACK;
1257 result = NO_ACTION_POSSIBLE;
1259 } else if (stream_state->rs.remaining_bytes == 0) {
1260 CRONET_LOG(GPR_DEBUG, "read operation complete");
1261 grpc_slice read_data_slice =
1262 GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
1263 uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
1264 memcpy(dst_p, stream_state->rs.read_buffer,
1265 static_cast<size_t>(stream_state->rs.length_field));
1266 null_and_maybe_free_read_buffer(s);
1267 /* Clean up read_slice_buffer in case there is unread data. */
1268 grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
1269 grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1270 grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
1273 if (stream_state->rs.compressed) {
1274 flags = GRPC_WRITE_INTERNAL_COMPRESS;
1276 stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1277 stream_op->payload->recv_message.recv_message->reset(
1278 stream_state->rs.sbs.get());
1279 GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
1281 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1282 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1283 /* Do an extra read to trigger on_succeeded() callback in case connection
1285 stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1286 stream_state->rs.compressed = false;
1287 stream_state->rs.received_bytes = 0;
1288 stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1289 stream_state->rs.length_field_received = false;
1290 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1291 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1292 stream_state->rs.remaining_bytes);
1293 stream_state->pending_read_from_cronet = true;
1294 result = ACTION_TAKEN_NO_CALLBACK;
1296 } else if (stream_op->recv_trailing_metadata &&
1297 op_can_be_run(stream_op, s, &oas->state,
1298 OP_RECV_TRAILING_METADATA)) {
1299 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
1300 grpc_error* error = GRPC_ERROR_NONE;
1301 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1302 error = GRPC_ERROR_REF(stream_state->cancel_error);
1303 } else if (stream_state->state_callback_received[OP_FAILED]) {
1304 error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
1305 } else if (oas->s->state.rs.trailing_metadata_valid) {
1306 grpc_chttp2_incoming_metadata_buffer_publish(
1307 &oas->s->state.rs.trailing_metadata,
1308 stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
1309 stream_state->rs.trailing_metadata_valid = false;
1312 stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1314 stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
1315 result = ACTION_TAKEN_NO_CALLBACK;
1316 } else if (stream_op->cancel_stream &&
1317 op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
1318 CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
1320 CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
1321 bidirectional_stream_cancel(s->cbs);
1322 result = ACTION_TAKEN_WITH_CALLBACK;
1324 result = ACTION_TAKEN_NO_CALLBACK;
1326 stream_state->state_op_done[OP_CANCEL_ERROR] = true;
1327 if (!stream_state->cancel_error) {
1328 stream_state->cancel_error =
1329 GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
1331 } else if (stream_op->on_complete &&
1332 op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
1333 CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
1334 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1335 GRPC_CLOSURE_SCHED(stream_op->on_complete,
1336 GRPC_ERROR_REF(stream_state->cancel_error));
1337 } else if (stream_state->state_callback_received[OP_FAILED]) {
1339 stream_op->on_complete,
1340 make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
1342 /* All actions in this stream_op are complete. Call the on_complete
1345 GRPC_CLOSURE_SCHED(stream_op->on_complete, GRPC_ERROR_NONE);
1347 oas->state.state_op_done[OP_ON_COMPLETE] = true;
1349 /* reset any send message state, only if this ON_COMPLETE is about a send.
1351 if (stream_op->send_message) {
1352 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1353 stream_state->state_op_done[OP_SEND_MESSAGE] = false;
1355 result = ACTION_TAKEN_NO_CALLBACK;
1356 /* If this is the on_complete callback being called for a received message -
1358 if (stream_op->recv_message)
1359 stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
1361 result = NO_ACTION_POSSIBLE;
1367 Functions used by upper layers to access transport functionality.
1370 inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs,
1371 grpc_stream_refcount* refcount,
1372 grpc_core::Arena* arena)
1374 curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)),
1377 refcount(refcount) {
1378 GRPC_CRONET_STREAM_REF(this, "cronet transport");
1382 inline stream_obj::~stream_obj() {
1383 null_and_maybe_free_read_buffer(this);
1384 /* Clean up read_slice_buffer in case there is unread data. */
1385 grpc_slice_buffer_destroy_internal(&state.rs.read_slice_buffer);
1386 GRPC_ERROR_UNREF(state.cancel_error);
1389 static int init_stream(grpc_transport* gt, grpc_stream* gs,
1390 grpc_stream_refcount* refcount, const void* server_data,
1391 grpc_core::Arena* arena) {
1392 new (gs) stream_obj(gt, gs, refcount, arena);
1396 static void set_pollset_do_nothing(grpc_transport* gt, grpc_stream* gs,
1397 grpc_pollset* pollset) {}
1399 static void set_pollset_set_do_nothing(grpc_transport* gt, grpc_stream* gs,
1400 grpc_pollset_set* pollset_set) {}
1402 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1403 grpc_transport_stream_op_batch* op) {
1404 CRONET_LOG(GPR_DEBUG, "perform_stream_op");
1405 if (op->send_initial_metadata &&
1406 header_has_authority(op->payload->send_initial_metadata
1407 .send_initial_metadata->list.head)) {
1408 /* Cronet does not support :authority header field. We cancel the call when
1409 this field is present in metadata */
1410 if (op->recv_initial_metadata) {
1412 op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1413 GRPC_ERROR_CANCELLED);
1415 if (op->recv_message) {
1416 GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
1417 GRPC_ERROR_CANCELLED);
1419 if (op->recv_trailing_metadata) {
1421 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1422 GRPC_ERROR_CANCELLED);
1424 GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
1427 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1428 add_to_storage(s, op);
1429 execute_from_storage(s);
1432 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
1433 grpc_closure* then_schedule_closure) {
1434 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1436 GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
1439 static void destroy_transport(grpc_transport* gt) {}
1441 static grpc_endpoint* get_endpoint(grpc_transport* gt) { return nullptr; }
1443 static void perform_op(grpc_transport* gt, grpc_transport_op* op) {}
1445 static const grpc_transport_vtable grpc_cronet_vtable = {
1449 set_pollset_do_nothing,
1450 set_pollset_set_do_nothing,
1457 grpc_transport* grpc_create_cronet_transport(void* engine, const char* target,
1458 const grpc_channel_args* args,
1460 grpc_cronet_transport* ct = static_cast<grpc_cronet_transport*>(
1461 gpr_malloc(sizeof(grpc_cronet_transport)));
1465 ct->base.vtable = &grpc_cronet_vtable;
1466 ct->engine = static_cast<stream_engine*>(engine);
1467 ct->host = static_cast<char*>(gpr_malloc(strlen(target) + 1));
1471 strcpy(ct->host, target);
1473 ct->use_packet_coalescing = true;
1475 for (size_t i = 0; i < args->num_args; i++) {
1477 strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
1478 if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) {
1479 gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
1480 GRPC_ARG_USE_CRONET_PACKET_COALESCING);
1482 ct->use_packet_coalescing = (args->args[i].value.integer != 0);