320b52972517bb86f537b879b2c70697933f7f28
[platform/upstream/grpc.git] / src / core / ext / transport / cronet / transport / cronet_transport.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <string.h>
22
23 #include <grpc/slice_buffer.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/string_util.h>
27
28 #include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
29 #include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
30 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
31 #include "src/core/ext/transport/cronet/transport/cronet_transport.h"
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/gpr/host_port.h"
34 #include "src/core/lib/gpr/string.h"
35 #include "src/core/lib/gprpp/manual_constructor.h"
36 #include "src/core/lib/iomgr/endpoint.h"
37 #include "src/core/lib/iomgr/exec_ctx.h"
38 #include "src/core/lib/slice/slice_internal.h"
39 #include "src/core/lib/slice/slice_string_helpers.h"
40 #include "src/core/lib/surface/channel.h"
41 #include "src/core/lib/transport/metadata_batch.h"
42 #include "src/core/lib/transport/static_metadata.h"
43 #include "src/core/lib/transport/transport_impl.h"
44 #include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
45
46 #define GRPC_HEADER_SIZE_IN_BYTES 5
47 #define GRPC_FLUSH_READ_SIZE 4096
48
49 grpc_core::TraceFlag grpc_cronet_trace(false, "cronet");
50 #define CRONET_LOG(...)                                    \
51   do {                                                     \
52     if (grpc_cronet_trace.enabled()) gpr_log(__VA_ARGS__); \
53   } while (0)
54
/* Result of one attempt to make progress on a queued stream op (returned by
   execute_stream_op and interpreted by execute_from_storage). */
enum e_op_result {
  /* execute_from_storage stops iterating and waits for a Cronet callback. */
  ACTION_TAKEN_WITH_CALLBACK,
  /* execute_from_storage immediately tries the same op again. */
  ACTION_TAKEN_NO_CALLBACK,
  /* execute_from_storage moves on to the next queued op. */
  NO_ACTION_POSSIBLE
};
60
/* Identifiers of the individual actions and events tracked by the state
   machine. The values are used as indices into the state_op_done[] and
   state_callback_received[] arrays of op_state, so they must stay contiguous
   starting at zero, with OP_NUM_OPS last. */
enum e_op_id {
  OP_SEND_INITIAL_METADATA = 0,
  OP_SEND_MESSAGE,
  OP_SEND_TRAILING_METADATA,
  OP_RECV_MESSAGE,
  OP_RECV_INITIAL_METADATA,
  OP_RECV_TRAILING_METADATA,
  OP_CANCEL_ERROR,
  OP_ON_COMPLETE,
  OP_FAILED,
  OP_SUCCEEDED,
  OP_CANCELED,
  OP_RECV_MESSAGE_AND_ON_COMPLETE,
  OP_READ_REQ_MADE,
  /* Number of entries above; used to size the per-op state arrays. */
  OP_NUM_OPS
};
77
78 /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
79
80 static void on_stream_ready(bidirectional_stream*);
81 static void on_response_headers_received(
82     bidirectional_stream*, const bidirectional_stream_header_array*,
83     const char*);
84 static void on_write_completed(bidirectional_stream*, const char*);
85 static void on_read_completed(bidirectional_stream*, char*, int);
86 static void on_response_trailers_received(
87     bidirectional_stream*, const bidirectional_stream_header_array*);
88 static void on_succeeded(bidirectional_stream*);
89 static void on_failed(bidirectional_stream*, int);
90 static void on_canceled(bidirectional_stream*);
91 static bidirectional_stream_callback cronet_callbacks = {
92     on_stream_ready,
93     on_response_headers_received,
94     on_read_completed,
95     on_write_completed,
96     on_response_trailers_received,
97     on_succeeded,
98     on_failed,
99     on_canceled};
100
/* Cronet transport object */
struct grpc_cronet_transport {
  grpc_transport base; /* must be first element in this structure */
  /* Cronet engine handle. NOTE(review): presumably the engine that streams
     of this transport are created on -- confirm against the creation code. */
  stream_engine* engine;
  /* NOTE(review): presumably the host name used to build request URLs --
     confirm against the caller of convert_metadata_to_cronet_headers. */
  char* host;
  /* When true, the callbacks in this file call bidirectional_stream_flush()
     explicitly after queuing data. */
  bool use_packet_coalescing;
};
typedef struct grpc_cronet_transport grpc_cronet_transport;
109
110 /* TODO (makdharma): reorder structure for memory efficiency per
111    http://www.catb.org/esr/structure-packing/#_structure_reordering: */
112 struct read_state {
113   read_state(grpc_core::Arena* arena)
114       : trailing_metadata(arena), initial_metadata(arena) {
115     grpc_slice_buffer_init(&read_slice_buffer);
116   }
117
118   /* vars to store data coming from server */
119   char* read_buffer = nullptr;
120   bool length_field_received = false;
121   int received_bytes = 0;
122   int remaining_bytes = 0;
123   int length_field = 0;
124   bool compressed = 0;
125   char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {};
126   char* payload_field = nullptr;
127   bool read_stream_closed = 0;
128
129   /* vars for holding data destined for the application */
130   grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
131   grpc_slice_buffer read_slice_buffer;
132
133   /* vars for trailing metadata */
134   grpc_chttp2_incoming_metadata_buffer trailing_metadata;
135   bool trailing_metadata_valid = false;
136
137   /* vars for initial metadata */
138   grpc_chttp2_incoming_metadata_buffer initial_metadata;
139 };
140
/* State of the send side of a stream. */
struct write_state {
  /* Buffer holding the frame currently being written. It is released by the
     on_write_completed, on_failed and on_canceled callbacks. */
  char* write_buffer = nullptr;
};
144
145 /* track state of one stream op */
146 struct op_state {
147   op_state(grpc_core::Arena* arena) : rs(arena) {}
148
149   bool state_op_done[OP_NUM_OPS] = {};
150   bool state_callback_received[OP_NUM_OPS] = {};
151   /* A non-zero gRPC status code has been seen */
152   bool fail_state = false;
153   /* Transport is discarding all buffered messages */
154   bool flush_read = false;
155   bool flush_cronet_when_ready = false;
156   bool pending_write_for_trailer = false;
157   bool pending_send_message = false;
158   /* User requested RECV_TRAILING_METADATA */
159   bool pending_recv_trailing_metadata = false;
160   /* Cronet has not issued a callback of a bidirectional read */
161   bool pending_read_from_cronet = false;
162   grpc_error* cancel_error = GRPC_ERROR_NONE;
163   /* data structure for storing data coming from server */
164   struct read_state rs;
165   /* data structure for storing data going to the server */
166   struct write_state ws;
167 };
168
169 struct stream_obj;
170
171 struct op_and_state {
172   op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op);
173
174   grpc_transport_stream_op_batch op;
175   struct op_state state;
176   bool done = false;
177   struct stream_obj* s; /* Pointer back to the stream object */
178   /* next op_and_state in the linked list */
179   struct op_and_state* next = nullptr;
180 };
181
182 struct op_storage {
183   int num_pending_ops = 0;
184   struct op_and_state* head = nullptr;
185 };
186
187 struct stream_obj {
188   stream_obj(grpc_transport* gt, grpc_stream* gs,
189              grpc_stream_refcount* refcount, grpc_core::Arena* arena);
190   ~stream_obj();
191
192   grpc_core::Arena* arena;
193   struct op_and_state* oas = nullptr;
194   grpc_transport_stream_op_batch* curr_op = nullptr;
195   grpc_cronet_transport* curr_ct;
196   grpc_stream* curr_gs;
197   bidirectional_stream* cbs = nullptr;
198   bidirectional_stream_header_array header_array =
199       bidirectional_stream_header_array();  // Zero-initialize the structure.
200
201   /* Stream level state. Some state will be tracked both at stream and stream_op
202    * level */
203   struct op_state state;
204
205   /* OP storage */
206   struct op_storage storage;
207
208   /* Mutex to protect storage */
209   gpr_mu mu;
210
211   /* Refcount object of the stream */
212   grpc_stream_refcount* refcount;
213 };
214
215 #ifndef NDEBUG
216 #define GRPC_CRONET_STREAM_REF(stream, reason) \
217   grpc_cronet_stream_ref((stream), (reason))
218 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
219   grpc_cronet_stream_unref((stream), (reason))
220 void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
221   grpc_stream_ref(s->refcount, reason);
222 }
223 void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
224   grpc_stream_unref(s->refcount, reason);
225 }
226 #else
227 #define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
228 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
229   grpc_cronet_stream_unref((stream))
230 void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
231 void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
232 #endif
233
234 static enum e_op_result execute_stream_op(struct op_and_state* oas);
235
236 /*
237   Utility function to translate enum into string for printing
238 */
239 static const char* op_result_string(enum e_op_result i) {
240   switch (i) {
241     case ACTION_TAKEN_WITH_CALLBACK:
242       return "ACTION_TAKEN_WITH_CALLBACK";
243     case ACTION_TAKEN_NO_CALLBACK:
244       return "ACTION_TAKEN_NO_CALLBACK";
245     case NO_ACTION_POSSIBLE:
246       return "NO_ACTION_POSSIBLE";
247   }
248   GPR_UNREACHABLE_CODE(return "UNKNOWN");
249 }
250
251 static const char* op_id_string(enum e_op_id i) {
252   switch (i) {
253     case OP_SEND_INITIAL_METADATA:
254       return "OP_SEND_INITIAL_METADATA";
255     case OP_SEND_MESSAGE:
256       return "OP_SEND_MESSAGE";
257     case OP_SEND_TRAILING_METADATA:
258       return "OP_SEND_TRAILING_METADATA";
259     case OP_RECV_MESSAGE:
260       return "OP_RECV_MESSAGE";
261     case OP_RECV_INITIAL_METADATA:
262       return "OP_RECV_INITIAL_METADATA";
263     case OP_RECV_TRAILING_METADATA:
264       return "OP_RECV_TRAILING_METADATA";
265     case OP_CANCEL_ERROR:
266       return "OP_CANCEL_ERROR";
267     case OP_ON_COMPLETE:
268       return "OP_ON_COMPLETE";
269     case OP_FAILED:
270       return "OP_FAILED";
271     case OP_SUCCEEDED:
272       return "OP_SUCCEEDED";
273     case OP_CANCELED:
274       return "OP_CANCELED";
275     case OP_RECV_MESSAGE_AND_ON_COMPLETE:
276       return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
277     case OP_READ_REQ_MADE:
278       return "OP_READ_REQ_MADE";
279     case OP_NUM_OPS:
280       return "OP_NUM_OPS";
281   }
282   return "UNKNOWN";
283 }
284
285 static void null_and_maybe_free_read_buffer(stream_obj* s) {
286   if (s->state.rs.read_buffer &&
287       s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
288     gpr_free(s->state.rs.read_buffer);
289   }
290   s->state.rs.read_buffer = nullptr;
291 }
292
293 static void maybe_flush_read(stream_obj* s) {
294   /* To enter flush read state (discarding all the buffered messages in
295    * transport layer), two conditions must be satisfied: 1) non-zero grpc status
296    * has been received, and 2) an op requesting the status code
297    * (RECV_TRAILING_METADATA) is issued by the user. (See
298    * doc/status_ordering.md) */
299   /* Whenever the evaluation of any of the two condition is changed, we check
300    * whether we should enter the flush read state. */
301   if (s->state.pending_recv_trailing_metadata && s->state.fail_state) {
302     if (!s->state.flush_read && !s->state.rs.read_stream_closed) {
303       CRONET_LOG(GPR_DEBUG, "%p: Flush read", s);
304       s->state.flush_read = true;
305       null_and_maybe_free_read_buffer(s);
306       s->state.rs.read_buffer =
307           static_cast<char*>(gpr_malloc(GRPC_FLUSH_READ_SIZE));
308       if (!s->state.pending_read_from_cronet) {
309         CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
310         bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
311                                   GRPC_FLUSH_READ_SIZE);
312         s->state.pending_read_from_cronet = true;
313       }
314     }
315   }
316 }
317
318 static grpc_error* make_error_with_desc(int error_code, const char* desc) {
319   grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
320   error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code);
321   return error;
322 }
323
324 inline op_and_state::op_and_state(stream_obj* s,
325                                   const grpc_transport_stream_op_batch& op)
326     : op(op), state(s->arena), s(s) {}
327
328 /*
329   Add a new stream op to op storage.
330 */
331 static void add_to_storage(struct stream_obj* s,
332                            grpc_transport_stream_op_batch* op) {
333   struct op_storage* storage = &s->storage;
334   /* add new op at the beginning of the linked list. The memory is freed
335   in remove_from_storage */
336   op_and_state* new_op = grpc_core::New<op_and_state>(s, *op);
337   gpr_mu_lock(&s->mu);
338   new_op->next = storage->head;
339   storage->head = new_op;
340   storage->num_pending_ops++;
341   if (op->send_message) {
342     s->state.pending_send_message = true;
343   }
344   if (op->recv_trailing_metadata) {
345     s->state.pending_recv_trailing_metadata = true;
346     maybe_flush_read(s);
347   }
348   CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
349              storage->num_pending_ops);
350   gpr_mu_unlock(&s->mu);
351 }
352
353 /*
354   Traverse the linked list and delete op and free memory
355 */
356 static void remove_from_storage(struct stream_obj* s,
357                                 struct op_and_state* oas) {
358   struct op_and_state* curr;
359   if (s->storage.head == nullptr || oas == nullptr) {
360     return;
361   }
362   if (s->storage.head == oas) {
363     s->storage.head = oas->next;
364     grpc_core::Delete(oas);
365     s->storage.num_pending_ops--;
366     CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
367                s->storage.num_pending_ops);
368   } else {
369     for (curr = s->storage.head; curr != nullptr; curr = curr->next) {
370       if (curr->next == oas) {
371         curr->next = oas->next;
372         s->storage.num_pending_ops--;
373         CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
374                    s->storage.num_pending_ops);
375         grpc_core::Delete(oas);
376         break;
377       } else if (GPR_UNLIKELY(curr->next == nullptr)) {
378         CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
379       }
380     }
381   }
382 }
383
384 /*
385   Cycle through ops and try to take next action. Break when either
386   an action with callback is taken, or no action is possible.
387   This can get executed from the Cronet network thread via cronet callback
388   or on the application supplied thread via the perform_stream_op function.
389 */
390 static void execute_from_storage(stream_obj* s) {
391   gpr_mu_lock(&s->mu);
392   for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
393     CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
394     GPR_ASSERT(!curr->done);
395     enum e_op_result result = execute_stream_op(curr);
396     CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
397                op_result_string(result));
398     /* if this op is done, then remove it and free memory */
399     if (curr->done) {
400       struct op_and_state* next = curr->next;
401       remove_from_storage(s, curr);
402       curr = next;
403     } else if (result == NO_ACTION_POSSIBLE) {
404       curr = curr->next;
405     } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
406       /* wait for the callback */
407       break;
408     } /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
409   }
410   gpr_mu_unlock(&s->mu);
411 }
412
413 static void convert_cronet_array_to_metadata(
414     const bidirectional_stream_header_array* header_array,
415     grpc_chttp2_incoming_metadata_buffer* mds) {
416   for (size_t i = 0; i < header_array->count; i++) {
417     CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s",
418                header_array->headers[i].key, header_array->headers[i].value);
419     grpc_slice key = grpc_slice_intern(
420         grpc_slice_from_static_string(header_array->headers[i].key));
421     grpc_slice value;
422     if (grpc_is_binary_header(key)) {
423       value = grpc_slice_from_static_string(header_array->headers[i].value);
424       value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length(
425           value, grpc_chttp2_base64_infer_length_after_decode(value)));
426     } else {
427       value = grpc_slice_intern(
428           grpc_slice_from_static_string(header_array->headers[i].value));
429     }
430     GRPC_LOG_IF_ERROR("convert_cronet_array_to_metadata",
431                       grpc_chttp2_incoming_metadata_buffer_add(
432                           mds, grpc_mdelem_from_slices(key, value)));
433   }
434 }
435
436 /*
437   Cronet callback
438 */
439 static void on_failed(bidirectional_stream* stream, int net_error) {
440   gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
441   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
442   grpc_core::ExecCtx exec_ctx;
443
444   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
445   gpr_mu_lock(&s->mu);
446   bidirectional_stream_destroy(s->cbs);
447   s->state.state_callback_received[OP_FAILED] = true;
448   s->cbs = nullptr;
449   if (s->header_array.headers) {
450     gpr_free(s->header_array.headers);
451     s->header_array.headers = nullptr;
452   }
453   if (s->state.ws.write_buffer) {
454     gpr_free(s->state.ws.write_buffer);
455     s->state.ws.write_buffer = nullptr;
456   }
457   null_and_maybe_free_read_buffer(s);
458   gpr_mu_unlock(&s->mu);
459   execute_from_storage(s);
460   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
461 }
462
463 /*
464   Cronet callback
465 */
466 static void on_canceled(bidirectional_stream* stream) {
467   CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
468   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
469   grpc_core::ExecCtx exec_ctx;
470
471   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
472   gpr_mu_lock(&s->mu);
473   bidirectional_stream_destroy(s->cbs);
474   s->state.state_callback_received[OP_CANCELED] = true;
475   s->cbs = nullptr;
476   if (s->header_array.headers) {
477     gpr_free(s->header_array.headers);
478     s->header_array.headers = nullptr;
479   }
480   if (s->state.ws.write_buffer) {
481     gpr_free(s->state.ws.write_buffer);
482     s->state.ws.write_buffer = nullptr;
483   }
484   null_and_maybe_free_read_buffer(s);
485   gpr_mu_unlock(&s->mu);
486   execute_from_storage(s);
487   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
488 }
489
490 /*
491   Cronet callback
492 */
493 static void on_succeeded(bidirectional_stream* stream) {
494   CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
495   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
496   grpc_core::ExecCtx exec_ctx;
497
498   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
499   gpr_mu_lock(&s->mu);
500   bidirectional_stream_destroy(s->cbs);
501   s->state.state_callback_received[OP_SUCCEEDED] = true;
502   s->cbs = nullptr;
503   null_and_maybe_free_read_buffer(s);
504   gpr_mu_unlock(&s->mu);
505   execute_from_storage(s);
506   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
507 }
508
509 /*
510   Cronet callback
511 */
512 static void on_stream_ready(bidirectional_stream* stream) {
513   CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
514   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
515   grpc_core::ExecCtx exec_ctx;
516   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
517   grpc_cronet_transport* t = s->curr_ct;
518   gpr_mu_lock(&s->mu);
519   s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
520   s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
521   /* Free the memory allocated for headers */
522   if (s->header_array.headers) {
523     gpr_free(s->header_array.headers);
524     s->header_array.headers = nullptr;
525   }
526   /* Send the initial metadata on wire if there is no SEND_MESSAGE or
527    * SEND_TRAILING_METADATA ops pending */
528   if (t->use_packet_coalescing) {
529     if (s->state.flush_cronet_when_ready) {
530       CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
531       bidirectional_stream_flush(stream);
532     }
533   }
534   gpr_mu_unlock(&s->mu);
535   execute_from_storage(s);
536 }
537
538 /*
539   Cronet callback
540 */
541 static void on_response_headers_received(
542     bidirectional_stream* stream,
543     const bidirectional_stream_header_array* headers,
544     const char* negotiated_protocol) {
545   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
546   grpc_core::ExecCtx exec_ctx;
547   CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
548              headers, negotiated_protocol);
549   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
550
551   /* Identify if this is a header or a trailer (in a trailer-only response case)
552    */
553   for (size_t i = 0; i < headers->count; i++) {
554     if (0 == strcmp("grpc-status", headers->headers[i].key)) {
555       on_response_trailers_received(stream, headers);
556       return;
557     }
558   }
559
560   gpr_mu_lock(&s->mu);
561   convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata);
562   s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
563   if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
564         s->state.state_callback_received[OP_FAILED])) {
565     /* Do an extra read to trigger on_succeeded() callback in case connection
566      is closed */
567     GPR_ASSERT(s->state.rs.length_field_received == false);
568     s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
569     s->state.rs.compressed = false;
570     s->state.rs.received_bytes = 0;
571     s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
572     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
573     bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
574                               s->state.rs.remaining_bytes);
575     s->state.pending_read_from_cronet = true;
576   }
577   gpr_mu_unlock(&s->mu);
578   execute_from_storage(s);
579 }
580
581 /*
582   Cronet callback
583 */
584 static void on_write_completed(bidirectional_stream* stream, const char* data) {
585   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
586   grpc_core::ExecCtx exec_ctx;
587   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
588   CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
589   gpr_mu_lock(&s->mu);
590   if (s->state.ws.write_buffer) {
591     gpr_free(s->state.ws.write_buffer);
592     s->state.ws.write_buffer = nullptr;
593   }
594   s->state.state_callback_received[OP_SEND_MESSAGE] = true;
595   gpr_mu_unlock(&s->mu);
596   execute_from_storage(s);
597 }
598
599 /*
600   Cronet callback
601 */
602 static void on_read_completed(bidirectional_stream* stream, char* data,
603                               int count) {
604   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
605   grpc_core::ExecCtx exec_ctx;
606   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
607   CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
608              count);
609   gpr_mu_lock(&s->mu);
610   s->state.pending_read_from_cronet = false;
611   s->state.state_callback_received[OP_RECV_MESSAGE] = true;
612   if (count > 0 && s->state.flush_read) {
613     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
614     bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
615                               GRPC_FLUSH_READ_SIZE);
616     s->state.pending_read_from_cronet = true;
617     gpr_mu_unlock(&s->mu);
618   } else if (count > 0) {
619     s->state.rs.received_bytes += count;
620     s->state.rs.remaining_bytes -= count;
621     if (s->state.rs.remaining_bytes > 0) {
622       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
623       s->state.state_op_done[OP_READ_REQ_MADE] = true;
624       bidirectional_stream_read(
625           s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
626           s->state.rs.remaining_bytes);
627       s->state.pending_read_from_cronet = true;
628       gpr_mu_unlock(&s->mu);
629     } else {
630       gpr_mu_unlock(&s->mu);
631       execute_from_storage(s);
632     }
633   } else {
634     null_and_maybe_free_read_buffer(s);
635     s->state.rs.read_stream_closed = true;
636     gpr_mu_unlock(&s->mu);
637     execute_from_storage(s);
638   }
639 }
640
641 /*
642   Cronet callback
643 */
644 static void on_response_trailers_received(
645     bidirectional_stream* stream,
646     const bidirectional_stream_header_array* trailers) {
647   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
648   grpc_core::ExecCtx exec_ctx;
649   CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
650              trailers);
651   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
652   grpc_cronet_transport* t = s->curr_ct;
653   gpr_mu_lock(&s->mu);
654   s->state.rs.trailing_metadata_valid = false;
655   convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata);
656   if (trailers->count > 0) {
657     s->state.rs.trailing_metadata_valid = true;
658   }
659   for (size_t i = 0; i < trailers->count; i++) {
660     if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
661         0 != strcmp(trailers->headers[i].value, "0")) {
662       s->state.fail_state = true;
663       maybe_flush_read(s);
664     }
665   }
666   s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
667   /* Send a EOS when server terminates the stream (testServerFinishesRequest) to
668    * trigger on_succeeded */
669   if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
670       !(s->state.state_op_done[OP_CANCEL_ERROR] ||
671         s->state.state_callback_received[OP_FAILED])) {
672     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
673     s->state.state_callback_received[OP_SEND_MESSAGE] = false;
674     bidirectional_stream_write(s->cbs, "", 0, true);
675     if (t->use_packet_coalescing) {
676       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
677       bidirectional_stream_flush(s->cbs);
678     }
679     s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
680
681     gpr_mu_unlock(&s->mu);
682   } else {
683     gpr_mu_unlock(&s->mu);
684     execute_from_storage(s);
685   }
686 }
687
688 /*
689  Utility function that takes the data from s->write_slice_buffer and assembles
690  into a contiguous byte stream with 5 byte gRPC header prepended.
691 */
692 static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
693                               char** pp_write_buffer,
694                               size_t* p_write_buffer_size, uint32_t flags) {
695   grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer);
696   size_t length = GRPC_SLICE_LENGTH(slice);
697   *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
698   /* This is freed in the on_write_completed callback */
699   char* write_buffer =
700       static_cast<char*>(gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES));
701   *pp_write_buffer = write_buffer;
702   uint8_t* p = reinterpret_cast<uint8_t*>(write_buffer);
703   /* Append 5 byte header */
704   /* Compressed flag */
705   *p++ = static_cast<uint8_t>((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0);
706   /* Message length */
707   *p++ = static_cast<uint8_t>(length >> 24);
708   *p++ = static_cast<uint8_t>(length >> 16);
709   *p++ = static_cast<uint8_t>(length >> 8);
710   *p++ = static_cast<uint8_t>(length);
711   /* append actual data */
712   memcpy(p, GRPC_SLICE_START_PTR(slice), length);
713   grpc_slice_unref_internal(slice);
714 }
715
716 /*
717  Convert metadata in a format that Cronet can consume
718 */
719 static void convert_metadata_to_cronet_headers(
720     grpc_linked_mdelem* head, const char* host, char** pp_url,
721     bidirectional_stream_header** pp_headers, size_t* p_num_headers,
722     const char** method) {
723   grpc_linked_mdelem* curr = head;
724   /* Walk the linked list and get number of header fields */
725   size_t num_headers_available = 0;
726   while (curr != nullptr) {
727     curr = curr->next;
728     num_headers_available++;
729   }
730   /* Allocate enough memory. It is freed in the on_stream_ready callback
731    */
732   bidirectional_stream_header* headers =
733       static_cast<bidirectional_stream_header*>(gpr_malloc(
734           sizeof(bidirectional_stream_header) * num_headers_available));
735   *pp_headers = headers;
736
737   /* Walk the linked list again, this time copying the header fields.
738     s->num_headers can be less than num_headers_available, as some headers
739     are not used for cronet.
740     TODO (makdharma): Eliminate need to traverse the LL second time for perf.
741    */
742   curr = head;
743   size_t num_headers = 0;
744   while (num_headers < num_headers_available) {
745     grpc_mdelem mdelem = curr->md;
746     curr = curr->next;
747     char* key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem));
748     char* value;
749     if (grpc_is_binary_header(GRPC_MDKEY(mdelem))) {
750       grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem));
751       value = grpc_slice_to_c_string(wire_value);
752       grpc_slice_unref_internal(wire_value);
753     } else {
754       value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem));
755     }
756     if (grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) ||
757         grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_AUTHORITY)) {
758       /* Cronet populates these fields on its own */
759       gpr_free(key);
760       gpr_free(value);
761       continue;
762     }
763     if (grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_METHOD)) {
764       if (grpc_slice_eq(GRPC_MDVALUE(mdelem), GRPC_MDSTR_PUT)) {
765         *method = "PUT";
766       } else {
767         /* POST method in default*/
768         *method = "POST";
769       }
770       gpr_free(key);
771       gpr_free(value);
772       continue;
773     }
774     if (grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_PATH)) {
775       /* Create URL by appending :path value to the hostname */
776       gpr_asprintf(pp_url, "https://%s%s", host, value);
777       gpr_free(key);
778       gpr_free(value);
779       continue;
780     }
781     CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
782     headers[num_headers].key = key;
783     headers[num_headers].value = value;
784     num_headers++;
785     if (curr == nullptr) {
786       break;
787     }
788   }
789   *p_num_headers = num_headers;
790 }
791
/*
  Decodes the 5 byte gRPC message header found at data.

  data[0] carries the flags; bit 0 set means the payload is compressed.
  data[1..4] carry the payload length, most significant byte first.

  The length is assembled in an unsigned 32 bit value: shifting a byte that has
  been promoted to (signed) int left by 24 bits is undefined when its top bit
  is set. A wire length of 2^31 or more does not fit into *length and is
  converted using the usual unsigned-to-int conversion (negative on common
  platforms); callers should treat a negative length as invalid.
*/
static void parse_grpc_header(const uint8_t* data, int* length,
                              bool* compressed) {
  *compressed = ((data[0] & 0x01) == 0x01);
  uint32_t wire_length = data[1];
  wire_length = (wire_length << 8) | data[2];
  wire_length = (wire_length << 8) | data[3];
  wire_length = (wire_length << 8) | data[4];
  *length = (int)wire_length;
}
803
804 static bool header_has_authority(grpc_linked_mdelem* head) {
805   while (head != nullptr) {
806     if (grpc_slice_eq(GRPC_MDKEY(head->md), GRPC_MDSTR_AUTHORITY)) {
807       return true;
808     }
809     head = head->next;
810   }
811   return false;
812 }
813
/*
  Op Execution: Decide if one of the actions contained in the stream op can be
  executed. This is the heart of the state machine.

  curr_op  - the batch under consideration; only consulted for OP_ON_COMPLETE
             (to see which ops the batch asked for).
  s        - the stream; s->state holds the stream-wide flags and s->curr_ct
             the transport (read for use_packet_coalescing).
  op_state - flags private to this batch; used for OP_SEND_MESSAGE,
             OP_RECV_MESSAGE and OP_ON_COMPLETE.
  op_id    - the action being asked about.

  Returns true if the action may run now, false otherwise. No state flag is
  modified here; the only side effect is debug logging.
*/
static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
                          struct stream_obj* s, struct op_state* op_state,
                          enum e_op_id op_id) {
  struct op_state* stream_state = &s->state;
  grpc_cronet_transport* t = s->curr_ct;
  bool result = true;
  /* When call is canceled, every op can be run, except under following
  conditions
  */
  bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
                               stream_state->state_callback_received[OP_FAILED];
  if (is_canceled_or_failed) {
    /* Send ops and a further cancel are never run once the stream is
       cancelled or failed. */
    if (op_id == OP_SEND_INITIAL_METADATA) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    if (op_id == OP_SEND_MESSAGE) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    if (op_id == OP_SEND_TRAILING_METADATA) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    if (op_id == OP_CANCEL_ERROR) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    /* already executed */
    if (op_id == OP_RECV_INITIAL_METADATA &&
        stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    /* already executed for this batch (per-batch state, not stream state) */
    if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    /* already executed */
    if (op_id == OP_RECV_TRAILING_METADATA &&
        stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    /* ON_COMPLETE can be processed if one of the following conditions is met:
     * 1. the stream failed
     * 2. the stream is cancelled, and the callback is received
     * 3. the stream succeeded before cancel is effective
     * 4. the stream is cancelled, and the stream is never started */
    if (op_id == OP_ON_COMPLETE &&
        !(stream_state->state_callback_received[OP_FAILED] ||
          stream_state->state_callback_received[OP_CANCELED] ||
          stream_state->state_callback_received[OP_SUCCEEDED] ||
          !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
  } else if (op_id == OP_SEND_INITIAL_METADATA) {
    /* already executed */
    if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
  } else if (op_id == OP_RECV_INITIAL_METADATA) {
    /* already executed */
    if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
    /* we haven't sent headers yet. */
    else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
      result = false;
    /* we haven't received headers yet (and trailing metadata has not already
       been processed either). */
    else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
             !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
      result = false;
  } else if (op_id == OP_SEND_MESSAGE) {
    /* already executed (note we're checking op specific state, not stream
     state) */
    if (op_state->state_op_done[OP_SEND_MESSAGE]) result = false;
    /* we haven't sent headers yet. */
    else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
      result = false;
  } else if (op_id == OP_RECV_MESSAGE) {
    /* already executed (op specific state, not stream state) */
    if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false;
    /* we haven't received headers yet (and trailing metadata has not already
       been processed either). */
    else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
             !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
      result = false;
  } else if (op_id == OP_RECV_TRAILING_METADATA) {
    /* already executed */
    if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
    /* we have asked for but haven't received message yet. */
    else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
             !stream_state->state_op_done[OP_RECV_MESSAGE])
      result = false;
    /* we haven't received trailers  yet. */
    else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA])
      result = false;
    /* we haven't received on_succeeded  yet. */
    else if (!stream_state->state_callback_received[OP_SUCCEEDED])
      result = false;
  } else if (op_id == OP_SEND_TRAILING_METADATA) {
    /* already executed */
    if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false;
    /* we haven't sent initial metadata yet */
    else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
      result = false;
    /* we haven't sent message yet */
    else if (stream_state->pending_send_message &&
             !stream_state->state_op_done[OP_SEND_MESSAGE])
      result = false;
    /* we haven't got on_write_completed for the send yet; this wait is
       skipped when packet coalescing is on and the message write was
       deliberately held back for the trailer (pending_write_for_trailer) */
    else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
             !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
             !(t->use_packet_coalescing &&
               stream_state->pending_write_for_trailer))
      result = false;
  } else if (op_id == OP_CANCEL_ERROR) {
    /* already executed */
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
  } else if (op_id == OP_ON_COMPLETE) {
    /* already executed (note we're checking op specific state, not stream
    state) */
    if (op_state->state_op_done[OP_ON_COMPLETE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    /* Check if every op that was asked for is done. */
    /* TODO(muxi): We should not consider the recv ops here, since they
     * have their own callbacks.  We should invoke a batch's on_complete
     * as soon as all of the batch's send ops are complete, even if
     * there are still recv ops pending. */
    /* NOTE: this is a single else-if chain, so only the first matching
       condition is evaluated for a given batch. */
    else if (curr_op->send_initial_metadata &&
             !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->send_message &&
               !op_state->state_op_done[OP_SEND_MESSAGE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->send_message &&
               !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->send_trailing_metadata &&
               !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->recv_initial_metadata &&
               !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->recv_message &&
               !op_state->state_op_done[OP_RECV_MESSAGE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->cancel_stream &&
               !stream_state->state_callback_received[OP_CANCELED]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->recv_trailing_metadata) {
      /* We aren't done with trailing metadata yet */
      if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
        CRONET_LOG(GPR_DEBUG, "Because");
        result = false;
      }
      /* We've asked for actual message in an earlier op, and it hasn't been
        delivered yet. */
      else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
        /* If this op is not the one asking for read, (which means some earlier
          op has asked), and the read hasn't been delivered. */
        if (!curr_op->recv_message &&
            !stream_state->state_callback_received[OP_SUCCEEDED]) {
          CRONET_LOG(GPR_DEBUG, "Because");
          result = false;
        }
      }
    }
    /* We should see at least one on_write_completed for the trailers that we
      sent. Because of the chain above, this is only reached when the batch
      does not also request recv_trailing_metadata. */
    else if (curr_op->send_trailing_metadata &&
             !stream_state->state_callback_received[OP_SEND_MESSAGE])
      result = false;
  }
  CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
             result ? "YES" : "NO");
  return result;
}
1001
/*
  Execute at most one action of the batch stored in |oas|.

  The ops of the batch are considered in a fixed order: send_initial_metadata,
  send_message, send_trailing_metadata, recv_initial_metadata, recv_message,
  recv_trailing_metadata, cancel_stream, on_complete. The first one that the
  batch contains and that op_can_be_run() allows is executed; the matching
  state_op_done flags are updated and the function returns.

  Returns ACTION_TAKEN_WITH_CALLBACK, ACTION_TAKEN_NO_CALLBACK or
  NO_ACTION_POSSIBLE. NOTE(review): WITH_CALLBACK is returned on the paths
  that hand work to cronet (start/write/flush/read/cancel), presumably meaning
  a cronet callback will follow; confirm against the caller.

  TODO (makdharma): Break down this function in smaller chunks for readability.
*/
static enum e_op_result execute_stream_op(struct op_and_state* oas) {
  grpc_transport_stream_op_batch* stream_op = &oas->op;
  struct stream_obj* s = oas->s;
  grpc_cronet_transport* t = s->curr_ct;
  struct op_state* stream_state = &s->state;
  enum e_op_result result = NO_ACTION_POSSIBLE;
  if (stream_op->send_initial_metadata &&
      op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
    /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
     * on_failed */
    GPR_ASSERT(s->cbs == nullptr);
    GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
    s->cbs =
        bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
    CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
    if (t->use_packet_coalescing) {
      /* With coalescing, writes and request headers are held until an
         explicit bidirectional_stream_flush(). */
      bidirectional_stream_disable_auto_flush(s->cbs, true);
      bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
    }
    char* url = nullptr;
    const char* method = "POST";
    s->header_array.headers = nullptr;
    /* Fills in url, the header array, its count and possibly method. */
    convert_metadata_to_cronet_headers(stream_op->payload->send_initial_metadata
                                           .send_initial_metadata->list.head,
                                       t->host, &url, &s->header_array.headers,
                                       &s->header_array.count, &method);
    s->header_array.capacity = s->header_array.count;
    CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url);
    bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false);
    if (url) {
      gpr_free(url);
    }
    /* Release the key/value strings of every header. The headers array
       itself is not freed in this function. */
    unsigned int header_index;
    for (header_index = 0; header_index < s->header_array.count;
         header_index++) {
      gpr_free((void*)s->header_array.headers[header_index].key);
      gpr_free((void*)s->header_array.headers[header_index].value);
    }
    stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
    if (t->use_packet_coalescing) {
      /* Nothing else in this batch will trigger a flush, so remember that one
         is needed. */
      if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
        s->state.flush_cronet_when_ready = true;
      }
    }
    result = ACTION_TAKEN_WITH_CALLBACK;
  } else if (stream_op->send_message &&
             op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_MESSAGE", oas);
    stream_state->pending_send_message = false;
    if (stream_state->state_callback_received[OP_FAILED]) {
      result = NO_ACTION_POSSIBLE;
      CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
    } else {
      grpc_slice_buffer write_slice_buffer;
      grpc_slice slice;
      grpc_slice_buffer_init(&write_slice_buffer);
      /* The whole message is requested in one Next()/Pull() pair; anything
         other than immediate success is treated as a fatal error. */
      if (1 != stream_op->payload->send_message.send_message->Next(
                   stream_op->payload->send_message.send_message->length(),
                   nullptr)) {
        /* Should never reach here */
        GPR_ASSERT(false);
      }
      if (GRPC_ERROR_NONE !=
          stream_op->payload->send_message.send_message->Pull(&slice)) {
        /* Should never reach here */
        GPR_ASSERT(false);
      }
      grpc_slice_buffer_add(&write_slice_buffer, slice);
      if (GPR_UNLIKELY(write_slice_buffer.count != 1)) {
        /* Empty request not handled yet */
        gpr_log(GPR_ERROR, "Empty request is not supported");
        GPR_ASSERT(write_slice_buffer.count == 1);
      }
      if (write_slice_buffer.count > 0) {
        size_t write_buffer_size;
        /* Produces the framed message in ws.write_buffer and its size. */
        create_grpc_frame(
            &write_slice_buffer, &stream_state->ws.write_buffer,
            &write_buffer_size,
            stream_op->payload->send_message.send_message->flags());
        CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
                   stream_state->ws.write_buffer);
        stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
        bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
                                   static_cast<int>(write_buffer_size), false);
        grpc_slice_buffer_destroy_internal(&write_slice_buffer);
        if (t->use_packet_coalescing) {
          if (!stream_op->send_trailing_metadata) {
            CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
            bidirectional_stream_flush(s->cbs);
            result = ACTION_TAKEN_WITH_CALLBACK;
          } else {
            /* The flush is left to the send_trailing_metadata step of the
               same batch. */
            stream_state->pending_write_for_trailer = true;
            result = ACTION_TAKEN_NO_CALLBACK;
          }
        } else {
          result = ACTION_TAKEN_WITH_CALLBACK;
        }
      } else {
        result = NO_ACTION_POSSIBLE;
      }
    }
    /* Marked done on both the stream and the batch, whether or not a write
       was actually issued. */
    stream_state->state_op_done[OP_SEND_MESSAGE] = true;
    oas->state.state_op_done[OP_SEND_MESSAGE] = true;
    stream_op->payload->send_message.send_message.reset();
  } else if (stream_op->send_trailing_metadata &&
             op_can_be_run(stream_op, s, &oas->state,
                           OP_SEND_TRAILING_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_TRAILING_METADATA", oas);
    if (stream_state->state_callback_received[OP_FAILED]) {
      result = NO_ACTION_POSSIBLE;
      CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
    } else {
      CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
      stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
      /* A zero-length write with the final argument set to true. */
      bidirectional_stream_write(s->cbs, "", 0, true);
      if (t->use_packet_coalescing) {
        CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
        bidirectional_stream_flush(s->cbs);
      }
      result = ACTION_TAKEN_WITH_CALLBACK;
    }
    stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
  } else if (stream_op->recv_initial_metadata &&
             op_can_be_run(stream_op, s, &oas->state,
                           OP_RECV_INITIAL_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_INITIAL_METADATA", oas);
    /* In the cancelled / failed / trailers-already-processed cases the ready
       closure is scheduled without publishing any metadata. */
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      GRPC_CLOSURE_SCHED(
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      GRPC_CLOSURE_SCHED(
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
      GRPC_CLOSURE_SCHED(
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    } else {
      grpc_chttp2_incoming_metadata_buffer_publish(
          &oas->s->state.rs.initial_metadata,
          stream_op->payload->recv_initial_metadata.recv_initial_metadata);
      GRPC_CLOSURE_SCHED(
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    }
    stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
    result = ACTION_TAKEN_NO_CALLBACK;
  } else if (stream_op->recv_message &&
             op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_MESSAGE", oas);
    /* The first four branches finish the op without delivering a message:
       recv_message_ready is scheduled and the op is marked done. */
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
      GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
                         GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      CRONET_LOG(GPR_DEBUG, "Stream failed.");
      GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
                         GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->rs.read_stream_closed == true) {
      /* No more data will be received */
      CRONET_LOG(GPR_DEBUG, "read stream closed");
      GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
                         GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->flush_read) {
      CRONET_LOG(GPR_DEBUG, "flush read");
      GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
                         GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->rs.length_field_received == false) {
      /* The length prefix has not been parsed yet. */
      if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
          stream_state->rs.remaining_bytes == 0) {
        /* The full 5-byte header is in read_buffer: parse it. */
        /* Start a read operation for data */
        stream_state->rs.length_field_received = true;
        parse_grpc_header(
            reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
            &stream_state->rs.length_field, &stream_state->rs.compressed);
        CRONET_LOG(GPR_DEBUG, "length field = %d",
                   stream_state->rs.length_field);
        if (stream_state->rs.length_field > 0) {
          /* Allocate a buffer of exactly length_field bytes and ask cronet to
             fill it. */
          stream_state->rs.read_buffer = static_cast<char*>(
              gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
          GPR_ASSERT(stream_state->rs.read_buffer);
          stream_state->rs.remaining_bytes = stream_state->rs.length_field;
          stream_state->rs.received_bytes = 0;
          CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
          stream_state->state_op_done[OP_READ_REQ_MADE] =
              true; /* Indicates that at least one read request has been made */
          bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
                                    stream_state->rs.remaining_bytes);
          stream_state->pending_read_from_cronet = true;
          result = ACTION_TAKEN_WITH_CALLBACK;
        } else {
          /* length_field <= 0: deliver an empty message right away. */
          stream_state->rs.remaining_bytes = 0;
          CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
          /* Clean up read_slice_buffer in case there is unread data. */
          grpc_slice_buffer_destroy_internal(
              &stream_state->rs.read_slice_buffer);
          grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
          uint32_t flags = 0;
          if (stream_state->rs.compressed) {
            flags |= GRPC_WRITE_INTERNAL_COMPRESS;
          }
          stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
          stream_op->payload->recv_message.recv_message->reset(
              stream_state->rs.sbs.get());
          GRPC_CLOSURE_SCHED(
              stream_op->payload->recv_message.recv_message_ready,
              GRPC_ERROR_NONE);
          stream_state->state_op_done[OP_RECV_MESSAGE] = true;
          oas->state.state_op_done[OP_RECV_MESSAGE] = true;

          /* Extra read to trigger on_succeed */
          stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
          stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
          stream_state->rs.received_bytes = 0;
          stream_state->rs.compressed = false;
          stream_state->rs.length_field_received = false;
          CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
          stream_state->state_op_done[OP_READ_REQ_MADE] =
              true; /* Indicates that at least one read request has been made */
          bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
                                    stream_state->rs.remaining_bytes);
          stream_state->pending_read_from_cronet = true;
          result = ACTION_TAKEN_NO_CALLBACK;
        }
      } else if (stream_state->rs.remaining_bytes == 0) {
        /* Start a read operation for first 5 bytes (GRPC header) */
        stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
        stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
        stream_state->rs.received_bytes = 0;
        stream_state->rs.compressed = false;
        CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
        stream_state->state_op_done[OP_READ_REQ_MADE] =
            true; /* Indicates that at least one read request has been made */
        bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
                                  stream_state->rs.remaining_bytes);
        stream_state->pending_read_from_cronet = true;
        result = ACTION_TAKEN_WITH_CALLBACK;
      } else {
        /* Header bytes are still outstanding (remaining_bytes != 0). */
        result = NO_ACTION_POSSIBLE;
      }
    } else if (stream_state->rs.remaining_bytes == 0) {
      /* Header parsed and the whole payload has arrived: copy it into a
         slice and hand it to the upper layer. If payload bytes are still
         outstanding, no branch matches and result stays NO_ACTION_POSSIBLE. */
      CRONET_LOG(GPR_DEBUG, "read operation complete");
      grpc_slice read_data_slice =
          GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
      uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
      memcpy(dst_p, stream_state->rs.read_buffer,
             static_cast<size_t>(stream_state->rs.length_field));
      null_and_maybe_free_read_buffer(s);
      /* Clean up read_slice_buffer in case there is unread data. */
      grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
      grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
      grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
                            read_data_slice);
      uint32_t flags = 0;
      if (stream_state->rs.compressed) {
        flags = GRPC_WRITE_INTERNAL_COMPRESS;
      }
      stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
      stream_op->payload->recv_message.recv_message->reset(
          stream_state->rs.sbs.get());
      GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
                         GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      /* Do an extra read to trigger on_succeeded() callback in case connection
         is closed */
      stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
      stream_state->rs.compressed = false;
      stream_state->rs.received_bytes = 0;
      stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
      stream_state->rs.length_field_received = false;
      CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
      bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
                                stream_state->rs.remaining_bytes);
      stream_state->pending_read_from_cronet = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    }
  } else if (stream_op->recv_trailing_metadata &&
             op_can_be_run(stream_op, s, &oas->state,
                           OP_RECV_TRAILING_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_TRAILING_METADATA", oas);
    /* Cancellation and failure are reported through the error passed to
       recv_trailing_metadata_ready; otherwise any valid trailing metadata is
       published once. */
    grpc_error* error = GRPC_ERROR_NONE;
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      error = GRPC_ERROR_REF(stream_state->cancel_error);
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
    } else if (oas->s->state.rs.trailing_metadata_valid) {
      grpc_chttp2_incoming_metadata_buffer_publish(
          &oas->s->state.rs.trailing_metadata,
          stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
      stream_state->rs.trailing_metadata_valid = false;
    }
    GRPC_CLOSURE_SCHED(
        stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
        error);
    stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
    result = ACTION_TAKEN_NO_CALLBACK;
  } else if (stream_op->cancel_stream &&
             op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_CANCEL_ERROR", oas);
    if (s->cbs) {
      CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
      bidirectional_stream_cancel(s->cbs);
      result = ACTION_TAKEN_WITH_CALLBACK;
    } else {
      /* No cronet stream was ever created, so there is nothing to cancel. */
      result = ACTION_TAKEN_NO_CALLBACK;
    }
    stream_state->state_op_done[OP_CANCEL_ERROR] = true;
    /* Only the first cancel error is kept; a reference is taken on it. */
    if (!stream_state->cancel_error) {
      stream_state->cancel_error =
          GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
    }
  } else if (stream_op->on_complete &&
             op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_ON_COMPLETE", oas);
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      GRPC_CLOSURE_SCHED(stream_op->on_complete,
                         GRPC_ERROR_REF(stream_state->cancel_error));
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      GRPC_CLOSURE_SCHED(
          stream_op->on_complete,
          make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
    } else {
      /* All actions in this stream_op are complete. Call the on_complete
       * callback
       */
      GRPC_CLOSURE_SCHED(stream_op->on_complete, GRPC_ERROR_NONE);
    }
    oas->state.state_op_done[OP_ON_COMPLETE] = true;
    /* Flags the whole batch entry as finished. */
    oas->done = true;
    /* reset any send message state, only if this ON_COMPLETE is about a send.
     */
    if (stream_op->send_message) {
      stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
      stream_state->state_op_done[OP_SEND_MESSAGE] = false;
    }
    result = ACTION_TAKEN_NO_CALLBACK;
    /* If this is the on_complete callback being called for a received message -
      make a note */
    if (stream_op->recv_message)
      stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
  } else {
    result = NO_ACTION_POSSIBLE;
  }
  return result;
}
1365
1366 /*
1367   Functions used by upper layers to access transport functionality.
1368 */
1369
/* Construct a stream object.

   gt       - the owning transport; stored as a grpc_cronet_transport*.
   gs       - the grpc_stream handle, stored in curr_gs.
   refcount - the stream's refcount object, stored as-is.
   arena    - stored, and also passed to the constructor of |state|.

   The body takes a stream ref tagged "cronet transport" and initializes the
   mutex |mu|. */
inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs,
                              grpc_stream_refcount* refcount,
                              grpc_core::Arena* arena)
    : arena(arena),
      curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)),
      curr_gs(gs),
      state(arena),
      refcount(refcount) {
  GRPC_CRONET_STREAM_REF(this, "cronet transport");
  gpr_mu_init(&mu);
}
1381
/* Release what the stream still holds: the read buffer (via
   null_and_maybe_free_read_buffer), any slices left in read_slice_buffer,
   and the reference on cancel_error.
   NOTE(review): |mu| is initialized with gpr_mu_init() in the constructor
   but is not destroyed here; confirm whether a gpr_mu_destroy() is missing. */
inline stream_obj::~stream_obj() {
  null_and_maybe_free_read_buffer(this);
  /* Clean up read_slice_buffer in case there is unread data. */
  grpc_slice_buffer_destroy_internal(&state.rs.read_slice_buffer);
  GRPC_ERROR_UNREF(state.cancel_error);
}
1388
1389 static int init_stream(grpc_transport* gt, grpc_stream* gs,
1390                        grpc_stream_refcount* refcount, const void* server_data,
1391                        grpc_core::Arena* arena) {
1392   new (gs) stream_obj(gt, gs, refcount, arena);
1393   return 0;
1394 }
1395
/* Transport vtable hook for attaching a pollset to a stream. Intentionally a
   no-op: none of the arguments are used. */
static void set_pollset_do_nothing(grpc_transport* gt, grpc_stream* gs,
                                   grpc_pollset* pollset) {}
1398
/* Transport vtable hook for attaching a pollset_set to a stream.
   Intentionally a no-op: none of the arguments are used. */
static void set_pollset_set_do_nothing(grpc_transport* gt, grpc_stream* gs,
                                       grpc_pollset_set* pollset_set) {}
1401
1402 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1403                               grpc_transport_stream_op_batch* op) {
1404   CRONET_LOG(GPR_DEBUG, "perform_stream_op");
1405   if (op->send_initial_metadata &&
1406       header_has_authority(op->payload->send_initial_metadata
1407                                .send_initial_metadata->list.head)) {
1408     /* Cronet does not support :authority header field. We cancel the call when
1409      this field is present in metadata */
1410     if (op->recv_initial_metadata) {
1411       GRPC_CLOSURE_SCHED(
1412           op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1413           GRPC_ERROR_CANCELLED);
1414     }
1415     if (op->recv_message) {
1416       GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
1417                          GRPC_ERROR_CANCELLED);
1418     }
1419     if (op->recv_trailing_metadata) {
1420       GRPC_CLOSURE_SCHED(
1421           op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1422           GRPC_ERROR_CANCELLED);
1423     }
1424     GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
1425     return;
1426   }
1427   stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1428   add_to_storage(s, op);
1429   execute_from_storage(s);
1430 }
1431
1432 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
1433                            grpc_closure* then_schedule_closure) {
1434   stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1435   s->~stream_obj();
1436   GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
1437 }
1438
/* Transport vtable hook for destroying the transport. Currently a no-op.
   NOTE(review): grpc_create_cronet_transport() allocates the transport
   struct and its host string with gpr_malloc, and neither is released here;
   confirm whether they are freed elsewhere or leaked. */
static void destroy_transport(grpc_transport* gt) {}
1440
/* Transport vtable hook for retrieving the endpoint. Always returns nullptr;
   |gt| is not used. */
static grpc_endpoint* get_endpoint(grpc_transport* gt) { return nullptr; }
1442
/* Transport vtable hook for transport-level ops. Intentionally a no-op: the
   op is ignored. */
static void perform_op(grpc_transport* gt, grpc_transport_op* op) {}
1444
/* The cronet transport's vtable, installed on every transport created by
   grpc_create_cronet_transport(). The initializer is positional, so the
   entries must stay in the order in which grpc_transport_vtable declares its
   members (declared outside this file; presumably in transport_impl.h). */
static const grpc_transport_vtable grpc_cronet_vtable = {
    sizeof(stream_obj), /* size of the per-stream object built by init_stream */
    "cronet_http",
    init_stream,
    set_pollset_do_nothing,
    set_pollset_set_do_nothing,
    perform_stream_op,
    perform_op,
    destroy_stream,
    destroy_transport,
    get_endpoint};
1456
1457 grpc_transport* grpc_create_cronet_transport(void* engine, const char* target,
1458                                              const grpc_channel_args* args,
1459                                              void* reserved) {
1460   grpc_cronet_transport* ct = static_cast<grpc_cronet_transport*>(
1461       gpr_malloc(sizeof(grpc_cronet_transport)));
1462   if (!ct) {
1463     goto error;
1464   }
1465   ct->base.vtable = &grpc_cronet_vtable;
1466   ct->engine = static_cast<stream_engine*>(engine);
1467   ct->host = static_cast<char*>(gpr_malloc(strlen(target) + 1));
1468   if (!ct->host) {
1469     goto error;
1470   }
1471   strcpy(ct->host, target);
1472
1473   ct->use_packet_coalescing = true;
1474   if (args) {
1475     for (size_t i = 0; i < args->num_args; i++) {
1476       if (0 ==
1477           strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
1478         if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) {
1479           gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
1480                   GRPC_ARG_USE_CRONET_PACKET_COALESCING);
1481         } else {
1482           ct->use_packet_coalescing = (args->args[i].value.integer != 0);
1483         }
1484       }
1485     }
1486   }
1487
1488   return &ct->base;
1489
1490 error:
1491   if (ct) {
1492     if (ct->host) {
1493       gpr_free(ct->host);
1494     }
1495     gpr_free(ct);
1496   }
1497
1498   return nullptr;
1499 }