Imported Upstream version 1.22.0
[platform/upstream/grpc.git] / src / core / lib / iomgr / cfstream_handle.cc
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/gprpp/memory.h"
22 #include "src/core/lib/iomgr/port.h"
23
24 #ifdef GRPC_CFSTREAM
25 #import <CoreFoundation/CoreFoundation.h>
26 #import "src/core/lib/iomgr/cfstream_handle.h"
27
28 #include <grpc/support/atm.h>
29 #include <grpc/support/sync.h>
30
31 #include "src/core/lib/debug/trace.h"
32 #include "src/core/lib/iomgr/closure.h"
33 #include "src/core/lib/iomgr/error_cfstream.h"
34 #include "src/core/lib/iomgr/exec_ctx.h"
35
36 extern grpc_core::TraceFlag grpc_tcp_trace;
37
38 void* CFStreamHandle::Retain(void* info) {
39   CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
40   CFSTREAM_HANDLE_REF(handle, "retain");
41   return info;
42 }
43
44 void CFStreamHandle::Release(void* info) {
45   CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
46   CFSTREAM_HANDLE_UNREF(handle, "release");
47 }
48
49 CFStreamHandle* CFStreamHandle::CreateStreamHandle(
50     CFReadStreamRef read_stream, CFWriteStreamRef write_stream) {
51   return grpc_core::New<CFStreamHandle>(read_stream, write_stream);
52 }
53
54 void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
55                                   CFStreamEventType type,
56                                   void* client_callback_info) {
57   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
58   grpc_core::ExecCtx exec_ctx;
59   grpc_error* error;
60   CFErrorRef stream_error;
61   CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info);
62   if (grpc_tcp_trace.enabled()) {
63     gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle,
64             stream, type, client_callback_info);
65   }
66   switch (type) {
67     case kCFStreamEventOpenCompleted:
68       handle->open_event_.SetReady();
69       break;
70     case kCFStreamEventHasBytesAvailable:
71     case kCFStreamEventEndEncountered:
72       handle->read_event_.SetReady();
73       break;
74     case kCFStreamEventErrorOccurred:
75       stream_error = CFReadStreamCopyError(stream);
76       error = grpc_error_set_int(
77           GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "read error"),
78           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
79       CFRelease(stream_error);
80       handle->open_event_.SetShutdown(GRPC_ERROR_REF(error));
81       handle->write_event_.SetShutdown(GRPC_ERROR_REF(error));
82       handle->read_event_.SetShutdown(GRPC_ERROR_REF(error));
83       GRPC_ERROR_UNREF(error);
84       break;
85     default:
86       GPR_UNREACHABLE_CODE(return );
87   }
88 }
89 void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
90                                    CFStreamEventType type,
91                                    void* clientCallBackInfo) {
92   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
93   grpc_core::ExecCtx exec_ctx;
94   grpc_error* error;
95   CFErrorRef stream_error;
96   CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo);
97   if (grpc_tcp_trace.enabled()) {
98     gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle,
99             stream, type, clientCallBackInfo);
100   }
101   switch (type) {
102     case kCFStreamEventOpenCompleted:
103       handle->open_event_.SetReady();
104       break;
105     case kCFStreamEventCanAcceptBytes:
106     case kCFStreamEventEndEncountered:
107       handle->write_event_.SetReady();
108       break;
109     case kCFStreamEventErrorOccurred:
110       stream_error = CFWriteStreamCopyError(stream);
111       error = grpc_error_set_int(
112           GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write error"),
113           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
114       CFRelease(stream_error);
115       handle->open_event_.SetShutdown(GRPC_ERROR_REF(error));
116       handle->write_event_.SetShutdown(GRPC_ERROR_REF(error));
117       handle->read_event_.SetShutdown(GRPC_ERROR_REF(error));
118       GRPC_ERROR_UNREF(error);
119       break;
120     default:
121       GPR_UNREACHABLE_CODE(return );
122   }
123 }
124
125 CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
126                                CFWriteStreamRef write_stream) {
127   gpr_ref_init(&refcount_, 1);
128   open_event_.InitEvent();
129   read_event_.InitEvent();
130   write_event_.InitEvent();
131   dispatch_queue_ = dispatch_queue_create(nullptr, DISPATCH_QUEUE_SERIAL);
132   CFStreamClientContext ctx = {0, static_cast<void*>(this),
133                                CFStreamHandle::Retain, CFStreamHandle::Release,
134                                nil};
135   CFReadStreamSetClient(
136       read_stream,
137       kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
138           kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
139       CFStreamHandle::ReadCallback, &ctx);
140   CFWriteStreamSetClient(
141       write_stream,
142       kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
143           kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
144       CFStreamHandle::WriteCallback, &ctx);
145   CFReadStreamSetDispatchQueue(read_stream, dispatch_queue_);
146   CFWriteStreamSetDispatchQueue(write_stream, dispatch_queue_);
147 }
148
149 CFStreamHandle::~CFStreamHandle() {
150   open_event_.DestroyEvent();
151   read_event_.DestroyEvent();
152   write_event_.DestroyEvent();
153 }
154
155 void CFStreamHandle::NotifyOnOpen(grpc_closure* closure) {
156   open_event_.NotifyOn(closure);
157 }
158
159 void CFStreamHandle::NotifyOnRead(grpc_closure* closure) {
160   read_event_.NotifyOn(closure);
161 }
162
163 void CFStreamHandle::NotifyOnWrite(grpc_closure* closure) {
164   write_event_.NotifyOn(closure);
165 }
166
167 void CFStreamHandle::Shutdown(grpc_error* error) {
168   open_event_.SetShutdown(GRPC_ERROR_REF(error));
169   read_event_.SetShutdown(GRPC_ERROR_REF(error));
170   write_event_.SetShutdown(GRPC_ERROR_REF(error));
171   GRPC_ERROR_UNREF(error);
172 }
173
174 void CFStreamHandle::Ref(const char* file, int line, const char* reason) {
175   if (grpc_tcp_trace.enabled()) {
176     gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
177     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
178             "CFStream Handle ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
179             reason, val, val + 1);
180   }
181   gpr_ref(&refcount_);
182 }
183
184 void CFStreamHandle::Unref(const char* file, int line, const char* reason) {
185   if (grpc_tcp_trace.enabled()) {
186     gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
187     gpr_log(GPR_ERROR,
188             "CFStream Handle unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
189             reason, val, val - 1);
190   }
191   if (gpr_unref(&refcount_)) {
192     grpc_core::Delete<CFStreamHandle>(this);
193   }
194 }
195
196 #else
197
/* Placeholder symbol for builds without GRPC_CFSTREAM: it keeps the
 * grpc_cfstream library from being empty. Intentionally does nothing.
 */
void CFStreamDummy() {
  // No-op.
}
202
203 #endif