Imported Upstream version 1.41.0
[platform/upstream/grpc.git] / src / core / ext / transport / cronet / transport / cronet_transport.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/cronet/transport/cronet_transport.h"
22
23 #include <string.h>
24
25 #include <string>
26
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/str_format.h"
29 #include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
30
31 #include <grpc/slice_buffer.h>
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
34
35 #include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
36 #include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
37 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
38 #include "src/core/ext/transport/cronet/transport/cronet_status.h"
39 #include "src/core/lib/debug/trace.h"
40 #include "src/core/lib/gpr/string.h"
41 #include "src/core/lib/gprpp/manual_constructor.h"
42 #include "src/core/lib/iomgr/endpoint.h"
43 #include "src/core/lib/iomgr/exec_ctx.h"
44 #include "src/core/lib/slice/slice_internal.h"
45 #include "src/core/lib/slice/slice_string_helpers.h"
46 #include "src/core/lib/surface/channel.h"
47 #include "src/core/lib/surface/validate_metadata.h"
48 #include "src/core/lib/transport/metadata_batch.h"
49 #include "src/core/lib/transport/static_metadata.h"
50 #include "src/core/lib/transport/timeout_encoding.h"
51 #include "src/core/lib/transport/transport_impl.h"
52
53 #define GRPC_HEADER_SIZE_IN_BYTES 5
54 #define GRPC_FLUSH_READ_SIZE 4096
55
56 grpc_core::TraceFlag grpc_cronet_trace(false, "cronet");
57 #define CRONET_LOG(...)                                    \
58   do {                                                     \
59     if (grpc_cronet_trace.enabled()) gpr_log(__VA_ARGS__); \
60   } while (0)
61
/* Outcome of one execute_stream_op() attempt, as consumed by
   execute_from_storage(). */
enum e_op_result {
  /* An action was started; processing stops until a callback arrives. */
  ACTION_TAKEN_WITH_CALLBACK = 0,
  /* An action completed inline; the same op is tried again. */
  ACTION_TAKEN_NO_CALLBACK = 1,
  /* Nothing could be done for this op; the next op is tried. */
  NO_ACTION_POSSIBLE = 2
};
67
/* Identifiers for the individual actions and events tracked per op and per
   stream. The values index the state_op_done[] and state_callback_received[]
   arrays of op_state, so they must stay dense and start at zero. */
enum e_op_id {
  OP_SEND_INITIAL_METADATA = 0,
  OP_SEND_MESSAGE = 1,
  OP_SEND_TRAILING_METADATA = 2,
  OP_RECV_MESSAGE = 3,
  OP_RECV_INITIAL_METADATA = 4,
  OP_RECV_TRAILING_METADATA = 5,
  OP_CANCEL_ERROR = 6,
  OP_ON_COMPLETE = 7,
  OP_FAILED = 8,
  OP_SUCCEEDED = 9,
  OP_CANCELED = 10,
  OP_RECV_MESSAGE_AND_ON_COMPLETE = 11,
  OP_READ_REQ_MADE = 12,
  /* Number of identifiers above; used as the array dimension. */
  OP_NUM_OPS = 13
};
84
85 /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
86
87 static void on_stream_ready(bidirectional_stream*);
88 static void on_response_headers_received(
89     bidirectional_stream*, const bidirectional_stream_header_array*,
90     const char*);
91 static void on_write_completed(bidirectional_stream*, const char*);
92 static void on_read_completed(bidirectional_stream*, char*, int);
93 static void on_response_trailers_received(
94     bidirectional_stream*, const bidirectional_stream_header_array*);
95 static void on_succeeded(bidirectional_stream*);
96 static void on_failed(bidirectional_stream*, int);
97 static void on_canceled(bidirectional_stream*);
98 static bidirectional_stream_callback cronet_callbacks = {
99     on_stream_ready,
100     on_response_headers_received,
101     on_read_completed,
102     on_write_completed,
103     on_response_trailers_received,
104     on_succeeded,
105     on_failed,
106     on_canceled};
107
108 /* Cronet transport object */
109 struct grpc_cronet_transport {
110   grpc_transport base; /* must be first element in this structure */
111   stream_engine* engine;
112   char* host;
113   bool use_packet_coalescing;
114 };
115 typedef struct grpc_cronet_transport grpc_cronet_transport;
116
117 /* TODO (makdharma): reorder structure for memory efficiency per
118    http://www.catb.org/esr/structure-packing/#_structure_reordering: */
119 struct read_state {
120   explicit read_state(grpc_core::Arena* arena)
121       : trailing_metadata(arena), initial_metadata(arena) {
122     grpc_slice_buffer_init(&read_slice_buffer);
123   }
124
125   /* vars to store data coming from server */
126   char* read_buffer = nullptr;
127   bool length_field_received = false;
128   int received_bytes = 0;
129   int remaining_bytes = 0;
130   int length_field = 0;
131   bool compressed = false;
132   char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {};
133   char* payload_field = nullptr;
134   bool read_stream_closed = false;
135
136   /* vars for holding data destined for the application */
137   grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
138   grpc_slice_buffer read_slice_buffer;
139
140   /* vars for trailing metadata */
141   grpc_chttp2_incoming_metadata_buffer trailing_metadata;
142   bool trailing_metadata_valid = false;
143
144   /* vars for initial metadata */
145   grpc_chttp2_incoming_metadata_buffer initial_metadata;
146 };
147
/* Per-stream send-side state. */
struct write_state {
  /* Outgoing frame buffer; released with gpr_free in on_write_completed,
     on_failed and on_canceled. */
  char* write_buffer = nullptr;
};
151
152 /* track state of one stream op */
153 struct op_state {
154   explicit op_state(grpc_core::Arena* arena) : rs(arena) {}
155
156   bool state_op_done[OP_NUM_OPS] = {};
157   bool state_callback_received[OP_NUM_OPS] = {};
158   /* A non-zero gRPC status code has been seen */
159   bool fail_state = false;
160   /* Transport is discarding all buffered messages */
161   bool flush_read = false;
162   bool flush_cronet_when_ready = false;
163   bool pending_write_for_trailer = false;
164   bool pending_send_message = false;
165   /* User requested RECV_TRAILING_METADATA */
166   bool pending_recv_trailing_metadata = false;
167   cronet_net_error_code net_error = OK;
168   grpc_error_handle cancel_error = GRPC_ERROR_NONE;
169   /* data structure for storing data coming from server */
170   struct read_state rs;
171   /* data structure for storing data going to the server */
172   struct write_state ws;
173 };
174
175 struct stream_obj;
176
177 struct op_and_state {
178   op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op);
179
180   grpc_transport_stream_op_batch op;
181   struct op_state state;
182   bool done = false;
183   struct stream_obj* s; /* Pointer back to the stream object */
184   /* next op_and_state in the linked list */
185   struct op_and_state* next = nullptr;
186 };
187
188 struct op_storage {
189   int num_pending_ops = 0;
190   struct op_and_state* head = nullptr;
191 };
192
193 struct stream_obj {
194   stream_obj(grpc_transport* gt, grpc_stream* gs,
195              grpc_stream_refcount* refcount, grpc_core::Arena* arena);
196   ~stream_obj();
197
198   grpc_core::Arena* arena;
199   struct op_and_state* oas = nullptr;
200   grpc_transport_stream_op_batch* curr_op = nullptr;
201   grpc_cronet_transport* curr_ct;
202   grpc_stream* curr_gs;
203   bidirectional_stream* cbs = nullptr;
204   bidirectional_stream_header_array header_array =
205       bidirectional_stream_header_array();  // Zero-initialize the structure.
206
207   /* Stream level state. Some state will be tracked both at stream and stream_op
208    * level */
209   struct op_state state;
210
211   /* OP storage */
212   struct op_storage storage;
213
214   /* Mutex to protect storage */
215   gpr_mu mu;
216
217   /* Refcount object of the stream */
218   grpc_stream_refcount* refcount;
219 };
220
221 #ifndef NDEBUG
222 #define GRPC_CRONET_STREAM_REF(stream, reason) \
223   grpc_cronet_stream_ref((stream), (reason))
224 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
225   grpc_cronet_stream_unref((stream), (reason))
226 void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
227   grpc_stream_ref(s->refcount, reason);
228 }
229 void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
230   grpc_stream_unref(s->refcount, reason);
231 }
232 #else
233 #define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
234 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
235   grpc_cronet_stream_unref((stream))
236 void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
237 void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
238 #endif
239
240 static enum e_op_result execute_stream_op(struct op_and_state* oas);
241
242 /*
243   Utility function to translate enum into string for printing
244 */
245 static const char* op_result_string(enum e_op_result i) {
246   switch (i) {
247     case ACTION_TAKEN_WITH_CALLBACK:
248       return "ACTION_TAKEN_WITH_CALLBACK";
249     case ACTION_TAKEN_NO_CALLBACK:
250       return "ACTION_TAKEN_NO_CALLBACK";
251     case NO_ACTION_POSSIBLE:
252       return "NO_ACTION_POSSIBLE";
253   }
254   GPR_UNREACHABLE_CODE(return "UNKNOWN");
255 }
256
257 static const char* op_id_string(enum e_op_id i) {
258   switch (i) {
259     case OP_SEND_INITIAL_METADATA:
260       return "OP_SEND_INITIAL_METADATA";
261     case OP_SEND_MESSAGE:
262       return "OP_SEND_MESSAGE";
263     case OP_SEND_TRAILING_METADATA:
264       return "OP_SEND_TRAILING_METADATA";
265     case OP_RECV_MESSAGE:
266       return "OP_RECV_MESSAGE";
267     case OP_RECV_INITIAL_METADATA:
268       return "OP_RECV_INITIAL_METADATA";
269     case OP_RECV_TRAILING_METADATA:
270       return "OP_RECV_TRAILING_METADATA";
271     case OP_CANCEL_ERROR:
272       return "OP_CANCEL_ERROR";
273     case OP_ON_COMPLETE:
274       return "OP_ON_COMPLETE";
275     case OP_FAILED:
276       return "OP_FAILED";
277     case OP_SUCCEEDED:
278       return "OP_SUCCEEDED";
279     case OP_CANCELED:
280       return "OP_CANCELED";
281     case OP_RECV_MESSAGE_AND_ON_COMPLETE:
282       return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
283     case OP_READ_REQ_MADE:
284       return "OP_READ_REQ_MADE";
285     case OP_NUM_OPS:
286       return "OP_NUM_OPS";
287   }
288   return "UNKNOWN";
289 }
290
291 static void null_and_maybe_free_read_buffer(stream_obj* s) {
292   if (s->state.rs.read_buffer &&
293       s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
294     gpr_free(s->state.rs.read_buffer);
295   }
296   s->state.rs.read_buffer = nullptr;
297 }
298
299 static void read_grpc_header(stream_obj* s) {
300   s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
301   s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
302   s->state.rs.received_bytes = 0;
303   s->state.rs.compressed = false;
304   CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
305   bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
306                             s->state.rs.remaining_bytes);
307 }
308
309 static grpc_error_handle make_error_with_desc(int error_code,
310                                               int cronet_internal_error_code,
311                                               const char* desc) {
312   return grpc_error_set_int(GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat(
313                                 "Cronet error code:%d, Cronet error detail:%s",
314                                 cronet_internal_error_code, desc)),
315                             GRPC_ERROR_INT_GRPC_STATUS, error_code);
316 }
317
318 inline op_and_state::op_and_state(stream_obj* s,
319                                   const grpc_transport_stream_op_batch& op)
320     : op(op), state(s->arena), s(s) {}
321
322 /*
323   Add a new stream op to op storage.
324 */
325 static void add_to_storage(struct stream_obj* s,
326                            grpc_transport_stream_op_batch* op) {
327   struct op_storage* storage = &s->storage;
328   /* add new op at the beginning of the linked list. The memory is freed
329   in remove_from_storage */
330   op_and_state* new_op = new op_and_state(s, *op);
331   gpr_mu_lock(&s->mu);
332   new_op->next = storage->head;
333   storage->head = new_op;
334   storage->num_pending_ops++;
335   if (op->send_message) {
336     s->state.pending_send_message = true;
337   }
338   if (op->recv_trailing_metadata) {
339     s->state.pending_recv_trailing_metadata = true;
340   }
341   CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
342              storage->num_pending_ops);
343   gpr_mu_unlock(&s->mu);
344 }
345
346 /*
347   Traverse the linked list and delete op and free memory
348 */
349 static void remove_from_storage(struct stream_obj* s,
350                                 struct op_and_state* oas) {
351   struct op_and_state* curr;
352   if (s->storage.head == nullptr || oas == nullptr) {
353     return;
354   }
355   if (s->storage.head == oas) {
356     s->storage.head = oas->next;
357     delete oas;
358     s->storage.num_pending_ops--;
359     CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
360                s->storage.num_pending_ops);
361   } else {
362     for (curr = s->storage.head; curr != nullptr; curr = curr->next) {
363       if (curr->next == oas) {
364         curr->next = oas->next;
365         s->storage.num_pending_ops--;
366         CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
367                    s->storage.num_pending_ops);
368         delete oas;
369         break;
370       } else if (GPR_UNLIKELY(curr->next == nullptr)) {
371         CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
372       }
373     }
374   }
375 }
376
377 /*
378   Cycle through ops and try to take next action. Break when either
379   an action with callback is taken, or no action is possible.
380   This can get executed from the Cronet network thread via cronet callback
381   or on the application supplied thread via the perform_stream_op function.
382 */
383 static void execute_from_storage(stream_obj* s) {
384   gpr_mu_lock(&s->mu);
385   for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
386     CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
387     GPR_ASSERT(!curr->done);
388     enum e_op_result result = execute_stream_op(curr);
389     CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
390                op_result_string(result));
391     /* if this op is done, then remove it and free memory */
392     if (curr->done) {
393       struct op_and_state* next = curr->next;
394       remove_from_storage(s, curr);
395       curr = next;
396     } else if (result == NO_ACTION_POSSIBLE) {
397       curr = curr->next;
398     } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
399       /* wait for the callback */
400       break;
401     } /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
402   }
403   gpr_mu_unlock(&s->mu);
404 }
405
406 static void convert_cronet_array_to_metadata(
407     const bidirectional_stream_header_array* header_array,
408     grpc_chttp2_incoming_metadata_buffer* mds) {
409   for (size_t i = 0; i < header_array->count; i++) {
410     CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s",
411                header_array->headers[i].key, header_array->headers[i].value);
412     grpc_slice key = grpc_slice_intern(
413         grpc_slice_from_static_string(header_array->headers[i].key));
414     grpc_slice value;
415     if (grpc_is_refcounted_slice_binary_header(key)) {
416       value = grpc_slice_from_static_string(header_array->headers[i].value);
417       value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length(
418           value, grpc_chttp2_base64_infer_length_after_decode(value)));
419     } else {
420       value = grpc_slice_intern(
421           grpc_slice_from_static_string(header_array->headers[i].value));
422     }
423     GRPC_LOG_IF_ERROR("convert_cronet_array_to_metadata",
424                       grpc_chttp2_incoming_metadata_buffer_add(
425                           mds, grpc_mdelem_from_slices(key, value)));
426   }
427 }
428
429 /*
430   Cronet callback
431 */
432 static void on_failed(bidirectional_stream* stream, int net_error) {
433   gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
434   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
435   grpc_core::ExecCtx exec_ctx;
436
437   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
438   gpr_mu_lock(&s->mu);
439   bidirectional_stream_destroy(s->cbs);
440   s->state.state_callback_received[OP_FAILED] = true;
441   s->state.net_error = static_cast<cronet_net_error_code>(net_error);
442   s->cbs = nullptr;
443   if (s->header_array.headers) {
444     gpr_free(s->header_array.headers);
445     s->header_array.headers = nullptr;
446   }
447   if (s->state.ws.write_buffer) {
448     gpr_free(s->state.ws.write_buffer);
449     s->state.ws.write_buffer = nullptr;
450   }
451   null_and_maybe_free_read_buffer(s);
452   gpr_mu_unlock(&s->mu);
453   execute_from_storage(s);
454   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
455 }
456
457 /*
458   Cronet callback
459 */
460 static void on_canceled(bidirectional_stream* stream) {
461   CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
462   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
463   grpc_core::ExecCtx exec_ctx;
464
465   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
466   gpr_mu_lock(&s->mu);
467   bidirectional_stream_destroy(s->cbs);
468   s->state.state_callback_received[OP_CANCELED] = true;
469   s->cbs = nullptr;
470   if (s->header_array.headers) {
471     gpr_free(s->header_array.headers);
472     s->header_array.headers = nullptr;
473   }
474   if (s->state.ws.write_buffer) {
475     gpr_free(s->state.ws.write_buffer);
476     s->state.ws.write_buffer = nullptr;
477   }
478   null_and_maybe_free_read_buffer(s);
479   gpr_mu_unlock(&s->mu);
480   execute_from_storage(s);
481   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
482 }
483
484 /*
485   Cronet callback
486 */
487 static void on_succeeded(bidirectional_stream* stream) {
488   CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
489   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
490   grpc_core::ExecCtx exec_ctx;
491
492   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
493   gpr_mu_lock(&s->mu);
494   bidirectional_stream_destroy(s->cbs);
495   s->state.state_callback_received[OP_SUCCEEDED] = true;
496   s->cbs = nullptr;
497   null_and_maybe_free_read_buffer(s);
498   gpr_mu_unlock(&s->mu);
499   execute_from_storage(s);
500   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
501 }
502
503 /*
504   Cronet callback
505 */
506 static void on_stream_ready(bidirectional_stream* stream) {
507   CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
508   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
509   grpc_core::ExecCtx exec_ctx;
510   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
511   grpc_cronet_transport* t = s->curr_ct;
512   gpr_mu_lock(&s->mu);
513   s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
514   s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
515   /* Free the memory allocated for headers */
516   if (s->header_array.headers) {
517     gpr_free(s->header_array.headers);
518     s->header_array.headers = nullptr;
519   }
520   /* Send the initial metadata on wire if there is no SEND_MESSAGE or
521    * SEND_TRAILING_METADATA ops pending */
522   if (t->use_packet_coalescing) {
523     if (s->state.flush_cronet_when_ready) {
524       CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
525       bidirectional_stream_flush(stream);
526     }
527   }
528   gpr_mu_unlock(&s->mu);
529   execute_from_storage(s);
530 }
531
532 /*
533   Cronet callback
534 */
535 static void on_response_headers_received(
536     bidirectional_stream* stream,
537     const bidirectional_stream_header_array* headers,
538     const char* negotiated_protocol) {
539   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
540   grpc_core::ExecCtx exec_ctx;
541   CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
542              headers, negotiated_protocol);
543   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
544
545   /* Identify if this is a header or a trailer (in a trailer-only response case)
546    */
547   for (size_t i = 0; i < headers->count; i++) {
548     if (0 == strcmp("grpc-status", headers->headers[i].key)) {
549       on_response_trailers_received(stream, headers);
550
551       /* Do an extra read for a trailer-only stream to trigger on_succeeded()
552        * callback */
553       read_grpc_header(s);
554       return;
555     }
556   }
557
558   gpr_mu_lock(&s->mu);
559   convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata);
560   s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
561   if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
562         s->state.state_callback_received[OP_FAILED])) {
563     /* Do an extra read to trigger on_succeeded() callback in case connection
564      is closed */
565     GPR_ASSERT(s->state.rs.length_field_received == false);
566     read_grpc_header(s);
567   }
568   gpr_mu_unlock(&s->mu);
569   execute_from_storage(s);
570 }
571
572 /*
573   Cronet callback
574 */
575 static void on_write_completed(bidirectional_stream* stream, const char* data) {
576   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
577   grpc_core::ExecCtx exec_ctx;
578   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
579   CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
580   gpr_mu_lock(&s->mu);
581   if (s->state.ws.write_buffer) {
582     gpr_free(s->state.ws.write_buffer);
583     s->state.ws.write_buffer = nullptr;
584   }
585   s->state.state_callback_received[OP_SEND_MESSAGE] = true;
586   gpr_mu_unlock(&s->mu);
587   execute_from_storage(s);
588 }
589
590 /*
591   Cronet callback
592 */
593 static void on_read_completed(bidirectional_stream* stream, char* data,
594                               int count) {
595   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
596   grpc_core::ExecCtx exec_ctx;
597   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
598   CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
599              count);
600   gpr_mu_lock(&s->mu);
601   s->state.state_callback_received[OP_RECV_MESSAGE] = true;
602   if (count > 0 && s->state.flush_read) {
603     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
604     bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
605                               GRPC_FLUSH_READ_SIZE);
606     gpr_mu_unlock(&s->mu);
607   } else if (count > 0) {
608     s->state.rs.received_bytes += count;
609     s->state.rs.remaining_bytes -= count;
610     if (s->state.rs.remaining_bytes > 0) {
611       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
612       s->state.state_op_done[OP_READ_REQ_MADE] = true;
613       bidirectional_stream_read(
614           s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
615           s->state.rs.remaining_bytes);
616       gpr_mu_unlock(&s->mu);
617     } else {
618       gpr_mu_unlock(&s->mu);
619       execute_from_storage(s);
620     }
621   } else {
622     null_and_maybe_free_read_buffer(s);
623     s->state.rs.read_stream_closed = true;
624     gpr_mu_unlock(&s->mu);
625     execute_from_storage(s);
626   }
627 }
628
629 /*
630   Cronet callback
631 */
632 static void on_response_trailers_received(
633     bidirectional_stream* stream,
634     const bidirectional_stream_header_array* trailers) {
635   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
636   grpc_core::ExecCtx exec_ctx;
637   CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
638              trailers);
639   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
640   grpc_cronet_transport* t = s->curr_ct;
641   gpr_mu_lock(&s->mu);
642   s->state.rs.trailing_metadata_valid = false;
643   convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata);
644   if (trailers->count > 0) {
645     s->state.rs.trailing_metadata_valid = true;
646   }
647   s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
648   /* Send a EOS when server terminates the stream (testServerFinishesRequest) to
649    * trigger on_succeeded */
650   if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
651       !(s->state.state_op_done[OP_CANCEL_ERROR] ||
652         s->state.state_callback_received[OP_FAILED])) {
653     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
654     s->state.state_callback_received[OP_SEND_MESSAGE] = false;
655     bidirectional_stream_write(s->cbs, "", 0, true);
656     if (t->use_packet_coalescing) {
657       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
658       bidirectional_stream_flush(s->cbs);
659     }
660     s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
661
662     gpr_mu_unlock(&s->mu);
663   } else {
664     gpr_mu_unlock(&s->mu);
665     execute_from_storage(s);
666   }
667 }
668
669 /*
670  Utility function that takes the data from s->write_slice_buffer and assembles
671  into a contiguous byte stream with 5 byte gRPC header prepended.
672 */
673 static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
674                               char** pp_write_buffer,
675                               size_t* p_write_buffer_size, uint32_t flags) {
676   size_t length = write_slice_buffer->length;
677   *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
678   /* This is freed in the on_write_completed callback */
679   char* write_buffer =
680       static_cast<char*>(gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES));
681   *pp_write_buffer = write_buffer;
682   uint8_t* p = reinterpret_cast<uint8_t*>(write_buffer);
683   /* Append 5 byte header */
684   /* Compressed flag */
685   *p++ = static_cast<uint8_t>((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0);
686   /* Message length */
687   *p++ = static_cast<uint8_t>(length >> 24);
688   *p++ = static_cast<uint8_t>(length >> 16);
689   *p++ = static_cast<uint8_t>(length >> 8);
690   *p++ = static_cast<uint8_t>(length);
691   /* append actual data */
692   size_t offset = 0;
693   for (size_t i = 0; i < write_slice_buffer->count; ++i) {
694     memcpy(p + offset, GRPC_SLICE_START_PTR(write_slice_buffer->slices[i]),
695            GRPC_SLICE_LENGTH(write_slice_buffer->slices[i]));
696     offset += GRPC_SLICE_LENGTH(write_slice_buffer->slices[i]);
697   }
698 }
699
700 /*
701  Convert metadata in a format that Cronet can consume
702 */
703 static void convert_metadata_to_cronet_headers(
704     grpc_metadata_batch* metadata, const char* host, std::string* pp_url,
705     bidirectional_stream_header** pp_headers, size_t* p_num_headers,
706     const char** method) {
707   grpc_linked_mdelem* curr = metadata->list.head;
708   /* Walk the linked list and get number of header fields */
709   size_t num_headers_available = 0;
710   while (curr != nullptr) {
711     curr = curr->next;
712     num_headers_available++;
713   }
714   grpc_millis deadline = metadata->deadline;
715   if (deadline != GRPC_MILLIS_INF_FUTURE) {
716     num_headers_available++;
717   }
718   /* Allocate enough memory. It is freed in the on_stream_ready callback
719    */
720   bidirectional_stream_header* headers =
721       static_cast<bidirectional_stream_header*>(gpr_malloc(
722           sizeof(bidirectional_stream_header) * num_headers_available));
723   *pp_headers = headers;
724
725   /* Walk the linked list again, this time copying the header fields.
726     s->num_headers can be less than num_headers_available, as some headers
727     are not used for cronet.
728     TODO (makdharma): Eliminate need to traverse the LL second time for perf.
729    */
730   curr = metadata->list.head;
731   size_t num_headers = 0;
732   while (num_headers < num_headers_available) {
733     grpc_mdelem mdelem = curr->md;
734     curr = curr->next;
735     char* key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem));
736     char* value;
737     if (grpc_is_binary_header_internal(GRPC_MDKEY(mdelem))) {
738       grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem));
739       value = grpc_slice_to_c_string(wire_value);
740       grpc_slice_unref_internal(wire_value);
741     } else {
742       value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem));
743     }
744     if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) ||
745         grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem),
746                                       GRPC_MDSTR_AUTHORITY)) {
747       /* Cronet populates these fields on its own */
748       gpr_free(key);
749       gpr_free(value);
750       continue;
751     }
752     if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_METHOD)) {
753       if (grpc_slice_eq_static_interned(GRPC_MDVALUE(mdelem), GRPC_MDSTR_PUT)) {
754         *method = "PUT";
755       } else {
756         /* POST method in default*/
757         *method = "POST";
758       }
759       gpr_free(key);
760       gpr_free(value);
761       continue;
762     }
763     if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_PATH)) {
764       /* Create URL by appending :path value to the hostname */
765       *pp_url = absl::StrCat("https://", host, value);
766       gpr_free(key);
767       gpr_free(value);
768       continue;
769     }
770     CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
771     headers[num_headers].key = key;
772     headers[num_headers].value = value;
773     num_headers++;
774     if (curr == nullptr) {
775       break;
776     }
777   }
778   if (deadline != GRPC_MILLIS_INF_FUTURE) {
779     char* key = grpc_slice_to_c_string(GRPC_MDSTR_GRPC_TIMEOUT);
780     char* value =
781         static_cast<char*>(gpr_malloc(GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE));
782     grpc_http2_encode_timeout(deadline - grpc_core::ExecCtx::Get()->Now(),
783                               value);
784     headers[num_headers].key = key;
785     headers[num_headers].value = value;
786
787     num_headers++;
788   }
789
790   *p_num_headers = num_headers;
791 }
792
/*
  Decode the 5 byte gRPC message prefix.

  data        points at GRPC_HEADER_SIZE_IN_BYTES readable bytes
  length      receives the payload length stored big-endian in bytes 1..4
  compressed  receives whether the low bit of byte 0 is set

  The length is assembled in an unsigned 32-bit value so that a top byte of
  0x80 or more never shifts into the sign bit of a promoted int.
*/
static void parse_grpc_header(const uint8_t* data, int* length,
                              bool* compressed) {
  *compressed = ((data[0] & 0x01) == 0x01);
  uint32_t wire_length = data[1];
  wire_length = (wire_length << 8) | data[2];
  wire_length = (wire_length << 8) | data[3];
  wire_length = (wire_length << 8) | data[4];
  *length = static_cast<int>(wire_length);
}
804
805 static bool header_has_authority(grpc_linked_mdelem* head) {
806   while (head != nullptr) {
807     if (grpc_slice_eq_static_interned(GRPC_MDKEY(head->md),
808                                       GRPC_MDSTR_AUTHORITY)) {
809       return true;
810     }
811     head = head->next;
812   }
813   return false;
814 }
815
816 /*
817   Op Execution: Decide if one of the actions contained in the stream op can be
818   executed. This is the heart of the state machine.

      Parameters:
        curr_op  - the batch; its requested-op flags are consulted only for the
                   OP_ON_COMPLETE decision.
        s        - the stream; its stream-wide state (s->state) and its
                   transport (s->curr_ct) are read.
        op_state - state that belongs to this batch only. It is used for the
                   "already executed" checks of SEND_MESSAGE, RECV_MESSAGE and
                   ON_COMPLETE, which may occur once per batch rather than
                   once per stream.
        op_id    - the action being considered.

      Returns true when the action may be executed now, false otherwise.
      The function only reads state and logs; every assignment in the body
      targets the local `result`.
819 */
820 static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
821                           struct stream_obj* s, struct op_state* op_state,
822                           enum e_op_id op_id) {
823   struct op_state* stream_state = &s->state;
824   grpc_cronet_transport* t = s->curr_ct;
825   bool result = true;
826   /* When call is canceled, every op can be run, except under following
827   conditions
828   */
829   bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
830                                stream_state->state_callback_received[OP_FAILED];
831   if (is_canceled_or_failed) {
        /* Send ops and a repeated cancel are always refused on a cancelled or
           failed stream. */
832     if (op_id == OP_SEND_INITIAL_METADATA) {
833       CRONET_LOG(GPR_DEBUG, "Because");
834       result = false;
835     }
836     if (op_id == OP_SEND_MESSAGE) {
837       CRONET_LOG(GPR_DEBUG, "Because");
838       result = false;
839     }
840     if (op_id == OP_SEND_TRAILING_METADATA) {
841       CRONET_LOG(GPR_DEBUG, "Because");
842       result = false;
843     }
844     if (op_id == OP_CANCEL_ERROR) {
845       CRONET_LOG(GPR_DEBUG, "Because");
846       result = false;
847     }
        /* Recv ops are refused only when they have run before; otherwise they
           stay runnable on a cancelled/failed stream. */
848     /* already executed */
849     if (op_id == OP_RECV_INITIAL_METADATA &&
850         stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
851       CRONET_LOG(GPR_DEBUG, "Because");
852       result = false;
853     }
854     if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
855       CRONET_LOG(GPR_DEBUG, "Because");
856       result = false;
857     }
858     if (op_id == OP_RECV_TRAILING_METADATA &&
859         stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
860       CRONET_LOG(GPR_DEBUG, "Because");
861       result = false;
862     }
863     /* ON_COMPLETE can be processed if one of the following conditions is met:
864      * 1. the stream failed
865      * 2. the stream is cancelled, and the callback is received
866      * 3. the stream succeeded before cancel is effective
867      * 4. the stream is cancelled, and the stream is never started */
868     if (op_id == OP_ON_COMPLETE &&
869         !(stream_state->state_callback_received[OP_FAILED] ||
870           stream_state->state_callback_received[OP_CANCELED] ||
871           stream_state->state_callback_received[OP_SUCCEEDED] ||
872           !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
873       CRONET_LOG(GPR_DEBUG, "Because");
874       result = false;
875     }
876   } else if (op_id == OP_SEND_INITIAL_METADATA) {
877     /* already executed */
878     if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
879   } else if (op_id == OP_RECV_INITIAL_METADATA) {
880     if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
881       /* already executed */
882       result = false;
883     } else if (!stream_state
884                     ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
885       /* we haven't sent headers yet. */
886       result = false;
887     } else if (!stream_state
888                     ->state_callback_received[OP_RECV_INITIAL_METADATA] &&
889                !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
890       /* we haven't received headers yet. */
          /* (Once trailing metadata has been delivered the op is allowed to
             run even without the headers callback.) */
891       result = false;
892     }
893   } else if (op_id == OP_SEND_MESSAGE) {
894     if (op_state->state_op_done[OP_SEND_MESSAGE]) {
895       /* already executed (note we're checking op specific state, not stream
896          state) */
897       result = false;
898     } else if (!stream_state
899                     ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
900       /* we haven't sent headers yet. */
901       result = false;
902     }
903   } else if (op_id == OP_RECV_MESSAGE) {
904     if (op_state->state_op_done[OP_RECV_MESSAGE]) {
905       /* already executed */
906       result = false;
907     } else if (!stream_state
908                     ->state_callback_received[OP_RECV_INITIAL_METADATA] &&
909                !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
910       /* we haven't received headers yet. */
911       result = false;
912     }
913   } else if (op_id == OP_RECV_TRAILING_METADATA) {
914     if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
915       /* already executed */
916       result = false;
917     } else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
918                !stream_state->state_op_done[OP_RECV_MESSAGE]) {
919       /* we have asked for but haven't received message yet. */
920       result = false;
921     } else if (!stream_state
922                     ->state_callback_received[OP_RECV_TRAILING_METADATA]) {
923       /* we haven't received trailers  yet. */
924       result = false;
925     } else if (!stream_state->state_callback_received[OP_SUCCEEDED]) {
926       /* we haven't received on_succeeded  yet. */
927       result = false;
928     }
929   } else if (op_id == OP_SEND_TRAILING_METADATA) {
930     if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
931       /* already executed */
932       result = false;
933     } else if (!stream_state
934                     ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
935       /* we haven't sent initial metadata yet */
936       result = false;
937     } else if (stream_state->pending_send_message &&
938                !stream_state->state_op_done[OP_SEND_MESSAGE]) {
939       /* we haven't sent message yet */
940       result = false;
941     } else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
942                !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
943                !(t->use_packet_coalescing &&
944                  stream_state->pending_write_for_trailer)) {
945       /* we haven't got on_write_completed for the send yet */
          /* The wait is skipped when packet coalescing is on and a write is
             pending for the trailer. NOTE(review): presumably because that
             write is only flushed together with the trailer -- confirm. */
946       result = false;
947     }
948   } else if (op_id == OP_CANCEL_ERROR) {
949     /* already executed */
950     if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
951   } else if (op_id == OP_ON_COMPLETE) {
952     if (op_state->state_op_done[OP_ON_COMPLETE]) {
953       /* already executed (note we're checking op specific state, not stream
954       state) */
955       CRONET_LOG(GPR_DEBUG, "Because");
956       result = false;
957     }
958     /* Check if every op that was asked for is done. */
959     /* TODO(muxi): We should not consider the recv ops here, since they
960      * have their own callbacks.  We should invoke a batch's on_complete
961      * as soon as all of the batch's send ops are complete, even if
962      * there are still recv ops pending. */
963     else if (curr_op->send_initial_metadata &&
964              !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
965       CRONET_LOG(GPR_DEBUG, "Because");
966       result = false;
967     } else if (curr_op->send_message &&
968                !op_state->state_op_done[OP_SEND_MESSAGE]) {
969       CRONET_LOG(GPR_DEBUG, "Because");
970       result = false;
971     } else if (curr_op->send_message &&
972                !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
973       CRONET_LOG(GPR_DEBUG, "Because");
974       result = false;
975     } else if (curr_op->send_trailing_metadata &&
976                !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
977       CRONET_LOG(GPR_DEBUG, "Because");
978       result = false;
979     } else if (curr_op->recv_initial_metadata &&
980                !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
981       CRONET_LOG(GPR_DEBUG, "Because");
982       result = false;
983     } else if (curr_op->recv_message &&
984                !op_state->state_op_done[OP_RECV_MESSAGE]) {
985       CRONET_LOG(GPR_DEBUG, "Because");
986       result = false;
987     } else if (curr_op->cancel_stream &&
988                !stream_state->state_callback_received[OP_CANCELED]) {
989       CRONET_LOG(GPR_DEBUG, "Because");
990       result = false;
991     } else if (curr_op->recv_trailing_metadata) {
992       /* We aren't done with trailing metadata yet */
993       if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
994         CRONET_LOG(GPR_DEBUG, "Because");
995         result = false;
996       }
997       /* We've asked for actual message in an earlier op, and it hasn't been
998         delivered yet. */
999       else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
1000         /* If this op is not the one asking for read, (which means some earlier
1001           op has asked), and the read hasn't been delivered. */
1002         if (!curr_op->recv_message &&
1003             !stream_state->state_callback_received[OP_SUCCEEDED]) {
1004           CRONET_LOG(GPR_DEBUG, "Because");
1005           result = false;
1006         }
1007       }
1008     }
1009     /* We should see at least one on_write_completed for the trailers that we
1010       sent */
         /* Being part of the same else-if chain, this check is only reached
            when the batch does not also request recv_trailing_metadata. */
1011     else if (curr_op->send_trailing_metadata &&
1012              !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
1013       result = false;
1014     }
1015   }
1016   CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
1017              result ? "YES" : "NO");
1018   return result;
1019 }
1020
1021 /*
1022   TODO (makdharma): Break down this function in smaller chunks for readability.

       Performs at most one action for the batch held in `oas` per call.
       Candidate actions are tried in a fixed order -- send initial metadata,
       send message, send trailing metadata, recv initial metadata, recv
       message, recv trailing metadata, cancel, on_complete -- and the first
       one that the batch requested and that op_can_be_run() permits is
       executed. on_complete is considered for every batch.

       Returns NO_ACTION_POSSIBLE when nothing was done, otherwise
       ACTION_TAKEN_WITH_CALLBACK or ACTION_TAKEN_NO_CALLBACK. Note that the
       two send branches may mark their op as done and still return
       NO_ACTION_POSSIBLE when the stream has already finished.
       NOTE(review): WITH_CALLBACK presumably tells the caller that a cronet
       callback will follow for this action -- confirm against the caller.
1023 */
1024 static enum e_op_result execute_stream_op(struct op_and_state* oas) {
1025   grpc_transport_stream_op_batch* stream_op = &oas->op;
1026   struct stream_obj* s = oas->s;
1027   grpc_cronet_transport* t = s->curr_ct;
1028   struct op_state* stream_state = &s->state;
1029   enum e_op_result result = NO_ACTION_POSSIBLE;
1030   if (stream_op->send_initial_metadata &&
1031       op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
1032     CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
1033     /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
1034      * on_failed */
1035     GPR_ASSERT(s->cbs == nullptr);
1036     GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
1037     s->cbs =
1038         bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
1039     CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
1040     if (t->use_packet_coalescing) {
1041       bidirectional_stream_disable_auto_flush(s->cbs, true);
1042       bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
1043     }
1044     std::string url;
1045     const char* method = "POST";
1046     s->header_array.headers = nullptr;
1047     convert_metadata_to_cronet_headers(
1048         stream_op->payload->send_initial_metadata.send_initial_metadata,
1049         t->host, &url, &s->header_array.headers, &s->header_array.count,
1050         &method);
1051     s->header_array.capacity = s->header_array.count;
1052     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs,
1053                url.c_str());
1054     bidirectional_stream_start(s->cbs, url.c_str(), 0, method, &s->header_array,
1055                                false);
         /* Release the per-header key/value strings now that the stream has
            been started. The headers array itself is not freed here. */
1056     unsigned int header_index;
1057     for (header_index = 0; header_index < s->header_array.count;
1058          header_index++) {
1059       gpr_free(const_cast<char*>(s->header_array.headers[header_index].key));
1060       gpr_free(const_cast<char*>(s->header_array.headers[header_index].value));
1061     }
1062     stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
         /* With coalescing and nothing else to send in this batch, ask for a
            flush later via flush_cronet_when_ready. */
1063     if (t->use_packet_coalescing) {
1064       if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
1065         s->state.flush_cronet_when_ready = true;
1066       }
1067     }
1068     result = ACTION_TAKEN_WITH_CALLBACK;
1069   } else if (stream_op->send_message &&
1070              op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
1071     CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_MESSAGE", oas);
1072     stream_state->pending_send_message = false;
1073     if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1074         stream_state->state_callback_received[OP_FAILED] ||
1075         stream_state->state_callback_received[OP_SUCCEEDED]) {
1076       result = NO_ACTION_POSSIBLE;
1077       CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1078     } else {
1079       grpc_slice_buffer write_slice_buffer;
1080       grpc_slice slice;
1081       grpc_slice_buffer_init(&write_slice_buffer);
           /* Pull the whole outgoing message into write_slice_buffer before
              framing it. */
1082       while (write_slice_buffer.length <
1083              stream_op->payload->send_message.send_message->length()) {
1084         /* TODO(roth): When we add support for incremental sending,this code
1085          * will need to be changed to support asynchronous delivery of the
1086          * send_message payload. */
1087         if (!stream_op->payload->send_message.send_message->Next(
1088                 stream_op->payload->send_message.send_message->length(),
1089                 nullptr)) {
1090           /* Should never reach here */
1091           GPR_ASSERT(false);
1092         }
1093         if (GRPC_ERROR_NONE !=
1094             stream_op->payload->send_message.send_message->Pull(&slice)) {
1095           /* Should never reach here */
1096           GPR_ASSERT(false);
1097         }
1098         grpc_slice_buffer_add(&write_slice_buffer, slice);
1099       }
1100       size_t write_buffer_size;
1101       create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
1102                         &write_buffer_size,
1103                         stream_op->payload->send_message.send_message->flags());
1104       if (write_buffer_size > 0) {
1105         CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
1106                    stream_state->ws.write_buffer);
1107         stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1108         bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
1109                                    static_cast<int>(write_buffer_size), false);
1110         grpc_slice_buffer_destroy_internal(&write_slice_buffer);
1111         if (t->use_packet_coalescing) {
               /* Flush immediately unless trailing metadata is part of this
                  batch; in that case the write is left pending for the
                  trailer and no callback is expected yet. */
1112           if (!stream_op->send_trailing_metadata) {
1113             CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1114             bidirectional_stream_flush(s->cbs);
1115             result = ACTION_TAKEN_WITH_CALLBACK;
1116           } else {
1117             stream_state->pending_write_for_trailer = true;
1118             result = ACTION_TAKEN_NO_CALLBACK;
1119           }
1120         } else {
1121           result = ACTION_TAKEN_WITH_CALLBACK;
1122         }
1123       } else {
1124         /* Should never reach here */
1125         GPR_ASSERT(false);
1126       }
1127     }
         /* Marked done both stream-wide and for this batch, whether or not a
            write was actually issued above. */
1128     stream_state->state_op_done[OP_SEND_MESSAGE] = true;
1129     oas->state.state_op_done[OP_SEND_MESSAGE] = true;
1130     stream_op->payload->send_message.send_message.reset();
1131   } else if (stream_op->send_trailing_metadata &&
1132              op_can_be_run(stream_op, s, &oas->state,
1133                            OP_SEND_TRAILING_METADATA)) {
1134     CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_TRAILING_METADATA", oas);
1135     if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1136         stream_state->state_callback_received[OP_FAILED] ||
1137         stream_state->state_callback_received[OP_SUCCEEDED]) {
1138       result = NO_ACTION_POSSIBLE;
1139       CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1140     } else {
           /* The trailer is sent as a zero-length write whose last argument
              is true. */
1141       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
1142       stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1143       bidirectional_stream_write(s->cbs, "", 0, true);
1144       if (t->use_packet_coalescing) {
1145         CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1146         bidirectional_stream_flush(s->cbs);
1147       }
1148       result = ACTION_TAKEN_WITH_CALLBACK;
1149     }
1150     stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
1151   } else if (stream_op->recv_initial_metadata &&
1152              op_can_be_run(stream_op, s, &oas->state,
1153                            OP_RECV_INITIAL_METADATA)) {
1154     CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_INITIAL_METADATA", oas);
         /* Every outcome below schedules recv_initial_metadata_ready with
            GRPC_ERROR_NONE; received metadata is published only in the final
            (normal) case. */
1155     if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1156       grpc_core::ExecCtx::Run(
1157           DEBUG_LOCATION,
1158           stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1159           GRPC_ERROR_NONE);
1160     } else if (stream_state->state_callback_received[OP_FAILED]) {
1161       grpc_core::ExecCtx::Run(
1162           DEBUG_LOCATION,
1163           stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1164           GRPC_ERROR_NONE);
1165     } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
1166       grpc_core::ExecCtx::Run(
1167           DEBUG_LOCATION,
1168           stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1169           GRPC_ERROR_NONE);
1170     } else {
1171       grpc_chttp2_incoming_metadata_buffer_publish(
1172           &oas->s->state.rs.initial_metadata,
1173           stream_op->payload->recv_initial_metadata.recv_initial_metadata);
1174       grpc_core::ExecCtx::Run(
1175           DEBUG_LOCATION,
1176           stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1177           GRPC_ERROR_NONE);
1178     }
1179     stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
1180     result = ACTION_TAKEN_NO_CALLBACK;
1181   } else if (stream_op->recv_message &&
1182              op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
1183     CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_MESSAGE", oas);
         /* The first four cases complete the op immediately without reading:
            recv_message_ready is scheduled with GRPC_ERROR_NONE and the op is
            marked done stream-wide and for this batch. */
1184     if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1185       CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
1186       grpc_core::ExecCtx::Run(
1187           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1188           GRPC_ERROR_NONE);
1189       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1190       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1191       result = ACTION_TAKEN_NO_CALLBACK;
1192     } else if (stream_state->state_callback_received[OP_FAILED]) {
1193       CRONET_LOG(GPR_DEBUG, "Stream failed.");
1194       grpc_core::ExecCtx::Run(
1195           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1196           GRPC_ERROR_NONE);
1197       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1198       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1199       result = ACTION_TAKEN_NO_CALLBACK;
1200     } else if (stream_state->rs.read_stream_closed) {
1201       /* No more data will be received */
1202       CRONET_LOG(GPR_DEBUG, "read stream closed");
1203       grpc_core::ExecCtx::Run(
1204           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1205           GRPC_ERROR_NONE);
1206       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1207       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1208       result = ACTION_TAKEN_NO_CALLBACK;
1209     } else if (stream_state->flush_read) {
1210       CRONET_LOG(GPR_DEBUG, "flush read");
1211       grpc_core::ExecCtx::Run(
1212           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1213           GRPC_ERROR_NONE);
1214       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1215       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1216       result = ACTION_TAKEN_NO_CALLBACK;
1217     } else if (!stream_state->rs.length_field_received) {
           /* Phase 1: the message header has not been parsed yet. Either the
              header bytes are complete (parse them and start the payload
              read), or no read is outstanding (start the header read), or a
              read is still in flight (nothing to do). */
1218       if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
1219           stream_state->rs.remaining_bytes == 0) {
1220         /* Start a read operation for data */
1221         stream_state->rs.length_field_received = true;
1222         parse_grpc_header(
1223             reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
1224             &stream_state->rs.length_field, &stream_state->rs.compressed);
1225         CRONET_LOG(GPR_DEBUG, "length field = %d",
1226                    stream_state->rs.length_field);
1227         if (stream_state->rs.length_field > 0) {
1228           stream_state->rs.read_buffer = static_cast<char*>(
1229               gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
1230           GPR_ASSERT(stream_state->rs.read_buffer);
1231           stream_state->rs.remaining_bytes = stream_state->rs.length_field;
1232           stream_state->rs.received_bytes = 0;
1233           CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1234           stream_state->state_op_done[OP_READ_REQ_MADE] =
1235               true; /* Indicates that at least one read request has been made */
1236           bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1237                                     stream_state->rs.remaining_bytes);
1238           result = ACTION_TAKEN_WITH_CALLBACK;
1239         } else {
               /* A length field that is not positive is delivered as an empty
                  message: an empty slice buffer is published right away. */
1240           stream_state->rs.remaining_bytes = 0;
1241           CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
1242           /* Clean up read_slice_buffer in case there is unread data. */
1243           grpc_slice_buffer_destroy_internal(
1244               &stream_state->rs.read_slice_buffer);
1245           grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1246           uint32_t flags = 0;
1247           if (stream_state->rs.compressed) {
1248             flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1249           }
1250           stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1251           stream_op->payload->recv_message.recv_message->reset(
1252               stream_state->rs.sbs.get());
1253           grpc_core::ExecCtx::Run(
1254               DEBUG_LOCATION,
1255               stream_op->payload->recv_message.recv_message_ready,
1256               GRPC_ERROR_NONE);
1257           stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1258           oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1259
1260           /* Extra read to trigger on_succeed */
1261           stream_state->rs.length_field_received = false;
1262           stream_state->state_op_done[OP_READ_REQ_MADE] =
1263               true; /* Indicates that at least one read request has been made */
1264           read_grpc_header(s);
1265           result = ACTION_TAKEN_NO_CALLBACK;
1266         }
1267       } else if (stream_state->rs.remaining_bytes == 0) {
1268         /* Start a read operation for first 5 bytes (GRPC header) */
1269         stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1270         stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1271         stream_state->rs.received_bytes = 0;
1272         stream_state->rs.compressed = false;
1273         CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1274         stream_state->state_op_done[OP_READ_REQ_MADE] =
1275             true; /* Indicates that at least one read request has been made */
1276         bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1277                                   stream_state->rs.remaining_bytes);
1278         result = ACTION_TAKEN_WITH_CALLBACK;
1279       } else {
1280         result = NO_ACTION_POSSIBLE;
1281       }
1282     } else if (stream_state->rs.remaining_bytes == 0) {
           /* Phase 2 complete: every payload byte announced by the header has
              arrived. Copy it into a slice, publish it as recv_message and
              start over with the next header. If bytes are still outstanding
              no branch matches and result stays NO_ACTION_POSSIBLE. */
1283       CRONET_LOG(GPR_DEBUG, "read operation complete");
1284       grpc_slice read_data_slice =
1285           GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
1286       uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
1287       memcpy(dst_p, stream_state->rs.read_buffer,
1288              static_cast<size_t>(stream_state->rs.length_field));
1289       null_and_maybe_free_read_buffer(s);
1290       /* Clean up read_slice_buffer in case there is unread data. */
1291       grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
1292       grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1293       grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
1294                             read_data_slice);
1295       uint32_t flags = 0;
1296       if (stream_state->rs.compressed) {
1297         flags = GRPC_WRITE_INTERNAL_COMPRESS;
1298       }
1299       stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1300       stream_op->payload->recv_message.recv_message->reset(
1301           stream_state->rs.sbs.get());
1302       grpc_core::ExecCtx::Run(
1303           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1304           GRPC_ERROR_NONE);
1305       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1306       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1307       /* Do an extra read to trigger on_succeeded() callback in case connection
1308          is closed */
1309       stream_state->rs.length_field_received = false;
1310       read_grpc_header(s);
1311       result = ACTION_TAKEN_NO_CALLBACK;
1312     }
1313   } else if (stream_op->recv_trailing_metadata &&
1314              op_can_be_run(stream_op, s, &oas->state,
1315                            OP_RECV_TRAILING_METADATA)) {
1316     CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_TRAILING_METADATA", oas);
         /* The error passed to recv_trailing_metadata_ready is the stored
            cancel error, an error built from the cronet net error, or none
            (in which case valid trailing metadata is published). */
1317     grpc_error_handle error = GRPC_ERROR_NONE;
1318     if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1319       error = GRPC_ERROR_REF(stream_state->cancel_error);
1320     } else if (stream_state->state_callback_received[OP_FAILED]) {
1321       grpc_status_code grpc_error_code =
1322           cronet_net_error_to_grpc_error(stream_state->net_error);
1323       const char* desc = cronet_net_error_as_string(stream_state->net_error);
1324       error =
1325           make_error_with_desc(grpc_error_code, stream_state->net_error, desc);
1326     } else if (oas->s->state.rs.trailing_metadata_valid) {
1327       grpc_chttp2_incoming_metadata_buffer_publish(
1328           &oas->s->state.rs.trailing_metadata,
1329           stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
1330       stream_state->rs.trailing_metadata_valid = false;
1331     }
1332     grpc_core::ExecCtx::Run(
1333         DEBUG_LOCATION,
1334         stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1335         error);
1336     stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
1337     result = ACTION_TAKEN_NO_CALLBACK;
1338   } else if (stream_op->cancel_stream &&
1339              op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
1340     CRONET_LOG(GPR_DEBUG, "running: %p  OP_CANCEL_ERROR", oas);
         /* A cronet stream only exists once send initial metadata has run. */
1341     if (s->cbs) {
1342       CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
1343       bidirectional_stream_cancel(s->cbs);
1344       result = ACTION_TAKEN_WITH_CALLBACK;
1345     } else {
1346       result = ACTION_TAKEN_NO_CALLBACK;
1347     }
1348     stream_state->state_op_done[OP_CANCEL_ERROR] = true;
         /* Only the first cancel error is kept. */
1349     if (!stream_state->cancel_error) {
1350       stream_state->cancel_error =
1351           GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
1352     }
1353   } else if (op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
1354     CRONET_LOG(GPR_DEBUG, "running: %p  OP_ON_COMPLETE", oas);
         /* on_complete (when present) receives the cancel error, an error
            built from the cronet net error, or GRPC_ERROR_NONE. */
1355     if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1356       if (stream_op->on_complete) {
1357         grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1358                                 GRPC_ERROR_REF(stream_state->cancel_error));
1359       }
1360     } else if (stream_state->state_callback_received[OP_FAILED]) {
1361       if (stream_op->on_complete) {
1362         const char* error_message =
1363             cronet_net_error_as_string(stream_state->net_error);
1364         grpc_status_code grpc_error_code =
1365             cronet_net_error_to_grpc_error(stream_state->net_error);
1366         grpc_core::ExecCtx::Run(
1367             DEBUG_LOCATION, stream_op->on_complete,
1368             make_error_with_desc(grpc_error_code, stream_state->net_error,
1369                                  error_message));
1370       }
1371     } else {
1372       /* All actions in this stream_op are complete. Call the on_complete
1373        * callback
1374        */
1375       if (stream_op->on_complete) {
1376         grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1377                                 GRPC_ERROR_NONE);
1378       }
1379     }
1380     oas->state.state_op_done[OP_ON_COMPLETE] = true;
1381     oas->done = true;
1382     /* reset any send message state, only if this ON_COMPLETE is about a send.
1383      */
1384     if (stream_op->send_message) {
1385       stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1386       stream_state->state_op_done[OP_SEND_MESSAGE] = false;
1387     }
1388     result = ACTION_TAKEN_NO_CALLBACK;
1389     /* If this is the on_complete callback being called for a received message -
1390       make a note */
1391     if (stream_op->recv_message) {
1392       stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
1393     }
1394   } else {
1395     result = NO_ACTION_POSSIBLE;
1396   }
1397   return result;
1398 }
1399
1400 /*
1401   Functions used by upper layers to access transport functionality.
1402 */
1403
/*
  Constructs the per-stream object.

  gt       - owning transport; stored as curr_ct after a cast to
             grpc_cronet_transport*.
  gs       - the stream handle, stored as curr_gs.
  refcount - stream refcount, stored for later ref/unref.
  arena    - stored, and also passed to the constructor of `state`.

  The body takes a "cronet transport" reference on the stream and initializes
  the mutex `mu`.
*/
1404 inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs,
1405                               grpc_stream_refcount* refcount,
1406                               grpc_core::Arena* arena)
1407     : arena(arena),
1408       curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)),
1409       curr_gs(gs),
1410       state(arena),
1411       refcount(refcount) {
1412   GRPC_CRONET_STREAM_REF(this, "cronet transport");
1413   gpr_mu_init(&mu);
1414 }
1415
1416 inline stream_obj::~stream_obj() {
1417   null_and_maybe_free_read_buffer(this);
1418   /* Clean up read_slice_buffer in case there is unread data. */
1419   grpc_slice_buffer_destroy_internal(&state.rs.read_slice_buffer);
1420   GRPC_ERROR_UNREF(state.cancel_error);
1421 }
1422
1423 static int init_stream(grpc_transport* gt, grpc_stream* gs,
1424                        grpc_stream_refcount* refcount,
1425                        const void* /*server_data*/, grpc_core::Arena* arena) {
1426   new (gs) stream_obj(gt, gs, refcount, arena);
1427   return 0;
1428 }
1429
/* Transport vtable entry for setting a pollset: intentionally empty, all
   arguments are ignored. */
1430 static void set_pollset_do_nothing(grpc_transport* /*gt*/, grpc_stream* /*gs*/,
1431                                    grpc_pollset* /*pollset*/) {}
1432
/* Transport vtable entry for setting a pollset_set: intentionally empty, all
   arguments are ignored. */
1433 static void set_pollset_set_do_nothing(grpc_transport* /*gt*/,
1434                                        grpc_stream* /*gs*/,
1435                                        grpc_pollset_set* /*pollset_set*/) {}
1436
1437 static void perform_stream_op(grpc_transport* /*gt*/, grpc_stream* gs,
1438                               grpc_transport_stream_op_batch* op) {
1439   CRONET_LOG(GPR_DEBUG, "perform_stream_op");
1440   if (op->send_initial_metadata &&
1441       header_has_authority(op->payload->send_initial_metadata
1442                                .send_initial_metadata->list.head)) {
1443     /* Cronet does not support :authority header field. We cancel the call when
1444      this field is present in metadata */
1445     if (op->recv_initial_metadata) {
1446       grpc_core::ExecCtx::Run(
1447           DEBUG_LOCATION,
1448           op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1449           GRPC_ERROR_CANCELLED);
1450     }
1451     if (op->recv_message) {
1452       grpc_core::ExecCtx::Run(DEBUG_LOCATION,
1453                               op->payload->recv_message.recv_message_ready,
1454                               GRPC_ERROR_CANCELLED);
1455     }
1456     if (op->recv_trailing_metadata) {
1457       grpc_core::ExecCtx::Run(
1458           DEBUG_LOCATION,
1459           op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1460           GRPC_ERROR_CANCELLED);
1461     }
1462     grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
1463                             GRPC_ERROR_CANCELLED);
1464     return;
1465   }
1466   stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1467   add_to_storage(s, op);
1468   execute_from_storage(s);
1469 }
1470
1471 static void destroy_stream(grpc_transport* /*gt*/, grpc_stream* gs,
1472                            grpc_closure* then_schedule_closure) {
1473   stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1474   s->~stream_obj();
1475   grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
1476                           GRPC_ERROR_NONE);
1477 }
1478
/* Transport vtable entry for destroying the transport: currently a no-op.
   NOTE(review): grpc_create_cronet_transport allocates the transport struct
   and its host string with gpr_malloc, and nothing here releases them --
   confirm whether that is intentional. */
1479 static void destroy_transport(grpc_transport* /*gt*/) {}
1480
/* Transport vtable entry: this transport exposes no grpc_endpoint, so the
   result is always nullptr. */
1481 static grpc_endpoint* get_endpoint(grpc_transport* /*gt*/) { return nullptr; }
1482
/* Transport vtable entry for transport-level ops: intentionally empty, the op
   is ignored. */
1483 static void perform_op(grpc_transport* /*gt*/, grpc_transport_op* /*op*/) {}
1484
/* Vtable installed as ct->base.vtable by grpc_create_cronet_transport.
   The initializer is positional: sizeof(stream_obj), the name string
   "cronet_http", then the callbacks defined above in this order. */
1485 static const grpc_transport_vtable grpc_cronet_vtable = {
1486     sizeof(stream_obj),
1487     "cronet_http",
1488     init_stream,
1489     set_pollset_do_nothing,
1490     set_pollset_set_do_nothing,
1491     perform_stream_op,
1492     perform_op,
1493     destroy_stream,
1494     destroy_transport,
1495     get_endpoint};
1496
1497 grpc_transport* grpc_create_cronet_transport(void* engine, const char* target,
1498                                              const grpc_channel_args* args,
1499                                              void* /*reserved*/) {
1500   grpc_cronet_transport* ct = static_cast<grpc_cronet_transport*>(
1501       gpr_malloc(sizeof(grpc_cronet_transport)));
1502   if (!ct) {
1503     goto error;
1504   }
1505   ct->base.vtable = &grpc_cronet_vtable;
1506   ct->engine = static_cast<stream_engine*>(engine);
1507   ct->host = static_cast<char*>(gpr_malloc(strlen(target) + 1));
1508   if (!ct->host) {
1509     goto error;
1510   }
1511   strcpy(ct->host, target);
1512
1513   ct->use_packet_coalescing = true;
1514   if (args) {
1515     for (size_t i = 0; i < args->num_args; i++) {
1516       if (0 ==
1517           strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
1518         if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) {
1519           gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
1520                   GRPC_ARG_USE_CRONET_PACKET_COALESCING);
1521         } else {
1522           ct->use_packet_coalescing = (args->args[i].value.integer != 0);
1523         }
1524       }
1525     }
1526   }
1527
1528   return &ct->base;
1529
1530 error:
1531   if (ct) {
1532     if (ct->host) {
1533       gpr_free(ct->host);
1534     }
1535     gpr_free(ct);
1536   }
1537
1538   return nullptr;
1539 }