Update To 11.40.268.0
[platform/framework/web/crosswalk.git] / src / components / devtools_bridge / android / java / src / org / chromium / components / devtools_bridge / SocketTunnelBase.java
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package org.chromium.components.devtools_bridge;
6
7 import android.net.LocalSocket;
8 import android.net.LocalSocketAddress;
9
10 import java.io.IOException;
11 import java.nio.ByteBuffer;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.Executors;
14 import java.util.concurrent.atomic.AtomicInteger;
15 import java.util.concurrent.atomic.AtomicReference;
16 import java.util.concurrent.locks.Lock;
17 import java.util.concurrent.locks.ReadWriteLock;
18 import java.util.concurrent.locks.ReentrantReadWriteLock;
19
20 /**
21  * Base class for client and server that tunnels DevToolsServer's UNIX socket
22  * over WebRTC data channel.
23  *
24  * Server runs on a android device with Chromium (or alike). Client runs where socket
25  * needed to be accesses (it could be the same device if socket names are different; this
26  * configuration useful for testing).
27  *
28  * Client listens LocalServerSocket and each time it receives connection it forwards
29  * CLIENT_OPEN packet to the server with newly assigned connection id. On receiving this packet
30  * server tries to connect to DevToolsServer socket. If succeeded it sends back SERVER_OPEN_ACK
31  * with the same connection id. If failed it sends SERVER_CLOSE.
32  *
33  * When input stream on client shuts down it sends CLIENT_CLOSE. The same with SERVER_CLOSE
34  * on the server side (only if SERVER_OPEN_ACK had sent). Between CLIENT_OPEN and CLIENT_CLOSE
35  * any amount of data packets may be transferred (the same for SERVER_OPEN_ACK/SERVER_CLOSE
36  * on the server side).
37  *
38  * Since all communication is reliable and ordered it's safe for client to assume that
39  * if CLIENT_CLOSE has sent and SERVER_CLOSE has received with the same connection ID this
40  * ID is safe to be reused.
41  */
42 public abstract class SocketTunnelBase {
43     // Data channel is threadsafe but access to the reference needs synchronization.
44     private final ReadWriteLock mDataChanneliReferenceLock = new ReentrantReadWriteLock();
45     private volatile AbstractDataChannel mDataChannel;
46
47     // Packet structure encapsulated in buildControlPacket, buildDataPacket and PacketDecoderBase.
48     // Structure of control packet:
49     // 1-st byte: CONTROL_CONNECTION_ID.
50     // 2-d byte: op code.
51     // 3-d byte: connection id.
52     //
53     // Structure of data packet:
54     // 1-st byte: connection id.
55     // 2..n: data.
56
57     private static final int CONTROL_PACKET_SIZE = 3;
58
59     // Client to server control packets.
60     protected static final byte CLIENT_OPEN = (byte) 0;
61     protected static final byte CLIENT_CLOSE = (byte) 1;
62
63     // Server to client control packets.
64     protected static final byte SERVER_OPEN_ACK = (byte) 0;
65     protected static final byte SERVER_CLOSE = (byte) 1;
66
67     // Must not exceed WebRTC limit. Exceeding it closes
68     // data channel automatically. TODO(serya): WebRTC limit supposed to be removed.
69     static final int READING_BUFFER_SIZE = 4 * 1024;
70
71     private static final int CONTROL_CONNECTION_ID = 0;
72
73     // DevTools supports up to ~10 connections at the time. A few extra IDs usefull for
74     // delays in closing acknowledgement.
75     protected static final int MIN_CONNECTION_ID = 1;
76     protected static final int MAX_CONNECTION_ID = 64;
77
78     // Signaling thread isn't accessible via API. Assumes that first caller
79     // checkCalledOnSignalingThread is called on it indeed. It also works well for tests.
80     private final AtomicReference<Thread> mSignalingThread = new AtomicReference<Thread>();
81
82     // For writing in socket without blocking signaling thread.
83     private final ExecutorService mWritingThread = Executors.newSingleThreadExecutor();
84
85     public boolean isBound() {
86         final Lock lock = mDataChanneliReferenceLock.readLock();
87         lock.lock();
88         try {
89             return mDataChannel != null;
90         } finally {
91             lock.unlock();
92         }
93     }
94
95     /**
96      * Binds the tunnel to the data channel. Tunnel starts its activity when data channel
97      * open.
98      */
99     public void bind(AbstractDataChannel dataChannel) {
100         // Observer registrution must not be done in constructor.
101         final Lock lock = mDataChanneliReferenceLock.writeLock();
102         lock.lock();
103         try {
104             mDataChannel = dataChannel;
105         } finally {
106             lock.unlock();
107         }
108         dataChannel.registerObserver(new DataChannelObserver());
109     }
110
111     /**
112      * Stops all tunnel activity and returns the prevously bound data channel.
113      * It's safe to dispose the data channel after it.
114      */
115     public AbstractDataChannel unbind() {
116         final Lock lock = mDataChanneliReferenceLock.writeLock();
117         lock.lock();
118         final AbstractDataChannel dataChannel;
119         try {
120             dataChannel = mDataChannel;
121             mDataChannel = null;
122         } finally {
123             lock.unlock();
124         }
125         dataChannel.unregisterObserver();
126         mSignalingThread.set(null);
127         mWritingThread.shutdownNow();
128         return dataChannel;
129     }
130
131     protected void checkCalledOnSignalingThread() {
132         if (!mSignalingThread.compareAndSet(null, Thread.currentThread())) {
133             if (mSignalingThread.get() != Thread.currentThread()) {
134                 throw new RuntimeException("Must be called on signaling thread");
135             }
136         }
137     }
138
139     protected static void checkConnectionId(int connectionId) throws ProtocolError {
140         if (connectionId < MIN_CONNECTION_ID || connectionId > MAX_CONNECTION_ID) {
141             throw new ProtocolError("Invalid connection id: " + Integer.toString(connectionId));
142         }
143     }
144
145     protected void onProtocolError(ProtocolError e) {
146         checkCalledOnSignalingThread();
147
148         // When integrity of data channel is broken then best way to survive is to close it.
149         final Lock lock = mDataChanneliReferenceLock.readLock();
150         lock.lock();
151         try {
152             mDataChannel.close();
153         } finally {
154             lock.unlock();
155         }
156     }
157
158     protected abstract void onReceivedDataPacket(int connectionId, byte[] data)
159             throws ProtocolError;
160     protected abstract void onReceivedControlPacket(int connectionId, byte opCode)
161             throws ProtocolError;
162     protected void onSocketException(IOException e, int connectionId) {}
163     protected void onDataChannelOpened() {}
164     protected void onDataChannelClosed() {}
165
166     static ByteBuffer buildControlPacket(int connectionId, byte opCode) {
167         ByteBuffer packet = ByteBuffer.allocateDirect(CONTROL_PACKET_SIZE);
168         packet.put((byte) CONTROL_CONNECTION_ID);
169         packet.put(opCode);
170         packet.put((byte) connectionId);
171         return packet;
172     }
173
174     static ByteBuffer buildDataPacket(int connectionId, byte[] buffer, int count) {
175         ByteBuffer packet = ByteBuffer.allocateDirect(count + 1);
176         packet.put((byte) connectionId);
177         packet.put(buffer, 0, count);
178         return packet;
179     }
180
181     protected void sendToDataChannel(ByteBuffer packet) {
182         packet.limit(packet.position());
183         packet.position(0);
184         final Lock lock = mDataChanneliReferenceLock.readLock();
185         lock.lock();
186         try {
187             if (mDataChannel != null) {
188                 mDataChannel.send(packet, AbstractDataChannel.MessageType.BINARY);
189             }
190         } finally {
191             lock.unlock();
192         }
193     }
194
195     /**
196      * Packet decoding exposed for tests.
197      */
198     abstract static class PacketDecoderBase {
199         protected void decodePacket(ByteBuffer packet) throws ProtocolError {
200             if (packet.remaining() == 0) {
201                 throw new ProtocolError("Empty packet");
202             }
203
204             int connectionId = packet.get();
205             if (connectionId != CONTROL_CONNECTION_ID) {
206                 checkConnectionId(connectionId);
207                 byte[] data = new byte[packet.remaining()];
208                 packet.get(data);
209                 onReceivedDataPacket(connectionId, data);
210             } else {
211                 if (packet.remaining() != CONTROL_PACKET_SIZE - 1) {
212                     throw new ProtocolError("Invalid control packet size");
213                 }
214
215                 byte opCode = packet.get();
216                 connectionId = packet.get();
217                 checkConnectionId(connectionId);
218                 onReceivedControlPacket(connectionId, opCode);
219             }
220         }
221
222         protected abstract void onReceivedDataPacket(int connectionId, byte[] data)
223                 throws ProtocolError;
224         protected abstract void onReceivedControlPacket(int connectionId, byte opcode)
225                 throws ProtocolError;
226     }
227
228     private final class DataChannelObserver
229             extends PacketDecoderBase implements AbstractDataChannel.Observer {
230         @Override
231         public void onStateChange(AbstractDataChannel.State state) {
232             checkCalledOnSignalingThread();
233
234             if (state == AbstractDataChannel.State.OPEN) {
235                 onDataChannelOpened();
236             } else {
237                 onDataChannelClosed();
238             }
239         }
240
241         @Override
242         public void onMessage(ByteBuffer message) {
243             checkCalledOnSignalingThread();
244
245             try {
246                 decodePacket(message);
247             } catch (ProtocolError e) {
248                 onProtocolError(e);
249             }
250         }
251
252         @Override
253         protected void onReceivedDataPacket(int connectionId, byte[] data) throws ProtocolError {
254             checkCalledOnSignalingThread();
255
256             SocketTunnelBase.this.onReceivedDataPacket(connectionId, data);
257         }
258
259         @Override
260         protected void onReceivedControlPacket(int connectionId, byte opCode) throws ProtocolError {
261             checkCalledOnSignalingThread();
262
263             SocketTunnelBase.this.onReceivedControlPacket(connectionId, opCode);
264         }
265     }
266
267     /**
268      * Any problem happened while handling incoming message that breaks state integrity.
269      */
270     static class ProtocolError extends Exception {
271         public ProtocolError(String description) {
272             super(description);
273         }
274     }
275
276     /**
277      * Base utility class for client and server connections.
278      */
279     protected abstract class ConnectionBase {
280         protected final int mId;
281         protected final LocalSocket mSocket;
282         private final AtomicInteger mOpenedStreams = new AtomicInteger(2); // input and output.
283         private volatile boolean mConnected;
284         private byte[] mBuffer;
285
286         private ConnectionBase(int id, LocalSocket socket, boolean preconnected) {
287             mId = id;
288             mSocket = socket;
289             mConnected = preconnected;
290         }
291
292         protected ConnectionBase(int id, LocalSocket socket) {
293             this(id, socket, true);
294         }
295
296         protected ConnectionBase(int id) {
297             this(id, new LocalSocket(), false);
298         }
299
300         protected boolean connect(LocalSocketAddress address) {
301             assert !mConnected;
302             try {
303                 mSocket.connect(address);
304                 mConnected = true;
305                 return true;
306             } catch (IOException e) {
307                 onSocketException(e, mId);
308                 return false;
309             }
310         }
311
312         protected void runReadingLoop() {
313             mBuffer = new byte[READING_BUFFER_SIZE];
314             try {
315                 boolean open;
316                 do {
317                     open = pump();
318                 } while (open);
319             } catch (IOException e) {
320                 onSocketException(e, mId);
321             } finally {
322                 mBuffer = null;
323             }
324         }
325
326         private boolean pump() throws IOException {
327             int count = mSocket.getInputStream().read(mBuffer);
328             if (count <= 0)
329                 return false;
330             sendToDataChannel(buildDataPacket(mId, mBuffer, count));
331             return true;
332         }
333
334         protected void writeData(byte[] data) {
335             // Called on writing thread.
336             try {
337                 mSocket.getOutputStream().write(data);
338             } catch (IOException e) {
339                 onSocketException(e, mId);
340             }
341         }
342
343         public void onReceivedDataPacket(final byte[] data) {
344             mWritingThread.execute(new Runnable() {
345                 @Override
346                 public void run() {
347                     writeData(data);
348                 }
349             });
350         }
351
352         public void terminate() {
353             close();
354         }
355
356         protected void shutdownOutput() {
357             // Shutdown output on writing thread to make sure all pending writes finished.
358             mWritingThread.execute(new Runnable() {
359                 @Override
360                 public void run() {
361                     shutdownOutputOnWritingThread();
362                 }
363             });
364         }
365
366         private void shutdownOutputOnWritingThread() {
367             try {
368                 if (mConnected) mSocket.shutdownOutput();
369             } catch (IOException e) {
370                 onSocketException(e, mId);
371             }
372             releaseStream();
373         }
374
375         protected void shutdownInput() {
376             try {
377                 if (mConnected) mSocket.shutdownInput();
378             } catch (IOException e) {
379                 onSocketException(e, mId);
380             }
381             releaseStream();
382         }
383
384         private void releaseStream() {
385             if (mOpenedStreams.decrementAndGet() == 0) close();
386         }
387
388         protected void close() {
389             try {
390                 mSocket.close();
391             } catch (IOException e) {
392                 onSocketException(e, mId);
393             }
394         }
395     }
396 }