1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 package org.chromium.components.devtools_bridge;
7 import android.net.LocalSocket;
8 import android.net.LocalSocketAddress;
10 import java.io.IOException;
11 import java.nio.ByteBuffer;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.Executors;
14 import java.util.concurrent.atomic.AtomicInteger;
15 import java.util.concurrent.atomic.AtomicReference;
16 import java.util.concurrent.locks.Lock;
17 import java.util.concurrent.locks.ReadWriteLock;
18 import java.util.concurrent.locks.ReentrantReadWriteLock;
21 * Base class for client and server that tunnels DevToolsServer's UNIX socket
22 * over WebRTC data channel.
24 * Server runs on an Android device with Chromium (or alike). Client runs where the socket
25 * needs to be accessed (it could be the same device if socket names are different; this
26 * configuration is useful for testing).
28 * Client listens on a LocalServerSocket and each time it receives a connection it forwards a
29 * CLIENT_OPEN packet to the server with a newly assigned connection id. On receiving this packet
30 * the server tries to connect to the DevToolsServer socket. If it succeeds it sends back
31 * SERVER_OPEN_ACK with the same connection id. If it fails it sends SERVER_CLOSE.
33 * When the input stream on the client shuts down, the client sends CLIENT_CLOSE. The same goes
34 * for SERVER_CLOSE on the server side (only if SERVER_OPEN_ACK has been sent). Between
35 * CLIENT_OPEN and CLIENT_CLOSE any number of data packets may be transferred (the same for
36 * SERVER_OPEN_ACK/SERVER_CLOSE on the server side).
38 * Since all communication is reliable and ordered, it is safe for the client to assume that
39 * if CLIENT_CLOSE has been sent and SERVER_CLOSE has been received with the same connection ID,
40 * this ID is safe to reuse.
42 public abstract class SocketTunnelBase {
43 // The data channel itself is thread-safe, but access to the reference needs synchronization.
// Read/write lock for the mDataChannel reference: isBound(), onProtocolError() and
// sendToDataChannel() obtain its read lock; bind() and unbind() obtain its write lock.
44 private final ReadWriteLock mDataChanneliReferenceLock = new ReentrantReadWriteLock();
// Currently bound data channel. It is compared against null by isBound() and
// sendToDataChannel(), i.e. null means "not bound".
45 private volatile AbstractDataChannel mDataChannel;
47 // Packet structure is encapsulated in buildControlPacket, buildDataPacket and PacketDecoderBase.
48 // Structure of a control packet:
49 // 1st byte: CONTROL_CONNECTION_ID.
// 2nd byte: op code (PacketDecoderBase.decodePacket reads it before the connection id).
51 // 3rd byte: connection id.
53 // Structure of a data packet:
54 // 1st byte: connection id.
// Remaining bytes: payload (buildDataPacket appends `count` bytes of the caller's buffer;
// decodePacket sizes the payload array from packet.remaining()).
// Total size in bytes of a control packet. decodePacket rejects a control packet whose
// remaining length after the first byte is not CONTROL_PACKET_SIZE - 1.
57 private static final int CONTROL_PACKET_SIZE = 3;
59 // Client to server control packets.
60 protected static final byte CLIENT_OPEN = (byte) 0;
61 protected static final byte CLIENT_CLOSE = (byte) 1;
63 // Server to client control packets. The numeric values are the same as the
// client-to-server op codes above.
64 protected static final byte SERVER_OPEN_ACK = (byte) 0;
65 protected static final byte SERVER_CLOSE = (byte) 1;
67 // Must not exceed the WebRTC limit. Exceeding it closes the
68 // data channel automatically. TODO(serya): WebRTC limit supposed to be removed.
// Size of the per-connection read buffer allocated in ConnectionBase.runReadingLoop().
69 static final int READING_BUFFER_SIZE = 4 * 1024;
// First-byte value that marks a control packet; decodePacket treats any other first byte
// as the connection id of a data packet.
71 private static final int CONTROL_CONNECTION_ID = 0;
73 // DevTools supports up to ~10 connections at a time. A few extra IDs are useful for
74 // delays in closing acknowledgement.
// Inclusive bounds enforced by checkConnectionId().
75 protected static final int MIN_CONNECTION_ID = 1;
76 protected static final int MAX_CONNECTION_ID = 64;
78 // The signaling thread isn't accessible via API. Assumes that the first caller of
79 // checkCalledOnSignalingThread is indeed running on it. It also works well for tests.
// Holds null until the first checkCalledOnSignalingThread() call; unbind() resets it to null.
80 private final AtomicReference<Thread> mSignalingThread = new AtomicReference<Thread>();
82 // For writing to the socket without blocking the signaling thread.
// Single-thread executor, so submitted tasks run one at a time in submission order.
83 private final ExecutorService mWritingThread = Executors.newSingleThreadExecutor();
// Reports whether a data channel reference is currently stored in mDataChannel.
85 public boolean isBound() {
// Read side of the lock that guards the mDataChannel reference.
86 final Lock lock = mDataChanneliReferenceLock.readLock();
89 return mDataChannel != null;
96 * Binds the tunnel to the data channel. Tunnel starts its activity when data channel
99 public void bind(AbstractDataChannel dataChannel) {
100 // Observer registration must not be done in the constructor.
// Write side of the lock, because the mDataChannel reference is replaced below.
101 final Lock lock = mDataChanneliReferenceLock.writeLock();
104 mDataChannel = dataChannel;
// A fresh DataChannelObserver routes state changes and incoming messages from the
// channel to this tunnel.
108 dataChannel.registerObserver(new DataChannelObserver());
112 * Stops all tunnel activity and returns the previously bound data channel.
113 * It's safe to dispose of the data channel after it.
115 public AbstractDataChannel unbind() {
// Write side of the lock that guards the mDataChannel reference.
116 final Lock lock = mDataChanneliReferenceLock.writeLock();
// Local copy of the bound channel, captured before the observer is removed.
118 final AbstractDataChannel dataChannel;
120 dataChannel = mDataChannel;
125 dataChannel.unregisterObserver();
// Forget the recorded signaling thread so the next caller of
// checkCalledOnSignalingThread() is recorded anew.
126 mSignalingThread.set(null);
// NOTE(review): mWritingThread is a final field created once and is shut down here, so a
// tunnel that is bound again after unbind() would have no usable writing executor —
// confirm that rebinding is not an expected use.
127 mWritingThread.shutdownNow();
// Asserts that the caller runs on the signaling thread. The first caller (while the
// reference is still null) is recorded as the signaling thread; any later call from a
// different thread throws RuntimeException.
131 protected void checkCalledOnSignalingThread() {
// compareAndSet succeeds only when no thread has been recorded yet; in that case the
// current thread becomes the signaling thread and nothing else is checked.
132 if (!mSignalingThread.compareAndSet(null, Thread.currentThread())) {
133 if (mSignalingThread.get() != Thread.currentThread()) {
134 throw new RuntimeException("Must be called on signaling thread");
// Validates that connectionId lies in [MIN_CONNECTION_ID, MAX_CONNECTION_ID] (inclusive).
// Throws ProtocolError carrying the offending id otherwise.
139 protected static void checkConnectionId(int connectionId) throws ProtocolError {
140 if (connectionId < MIN_CONNECTION_ID || connectionId > MAX_CONNECTION_ID) {
141 throw new ProtocolError("Invalid connection id: " + Integer.toString(connectionId));
// Reacts to a protocol violation by closing the bound data channel. Must be called on the
// signaling thread. The ProtocolError argument is not used by the visible code.
145 protected void onProtocolError(ProtocolError e) {
146 checkCalledOnSignalingThread();
148 // When the integrity of the data channel is broken, the best way to survive is to close it.
149 final Lock lock = mDataChanneliReferenceLock.readLock();
// NOTE(review): unlike sendToDataChannel(), mDataChannel is dereferenced here without a
// null check — confirm this cannot run while the tunnel is unbound.
152 mDataChannel.close();
// Called (via DataChannelObserver) for each decoded data packet: the connection id from the
// first byte and the payload bytes.
158 protected abstract void onReceivedDataPacket(int connectionId, byte[] data)
159 throws ProtocolError;
// Called (via DataChannelObserver) for each decoded control packet: the connection id and
// the op code (CLIENT_* or SERVER_* constant, depending on direction).
160 protected abstract void onReceivedControlPacket(int connectionId, byte opCode)
161 throws ProtocolError;
// Hook invoked by ConnectionBase when a socket operation throws IOException; connectionId is
// the id of the affected connection. No-op by default.
162 protected void onSocketException(IOException e, int connectionId) {}
// Hook invoked by DataChannelObserver.onStateChange when the state is OPEN. No-op by default.
163 protected void onDataChannelOpened() {}
// Hook invoked by DataChannelObserver.onStateChange for the non-OPEN case. No-op by default.
164 protected void onDataChannelClosed() {}
// Builds a control packet for the given connection id and op code in a direct buffer of
// CONTROL_PACKET_SIZE bytes.
166 static ByteBuffer buildControlPacket(int connectionId, byte opCode) {
167 ByteBuffer packet = ByteBuffer.allocateDirect(CONTROL_PACKET_SIZE);
// First byte marks the packet as a control packet.
168 packet.put((byte) CONTROL_CONNECTION_ID);
// The connection id is narrowed to a single byte; valid ids (1..64) fit without loss.
170 packet.put((byte) connectionId);
// Builds a data packet: one byte of connection id followed by the first `count` bytes of
// `buffer`, in a direct buffer of exactly count + 1 bytes.
174 static ByteBuffer buildDataPacket(int connectionId, byte[] buffer, int count) {
175 ByteBuffer packet = ByteBuffer.allocateDirect(count + 1);
// The connection id is narrowed to a single byte; valid ids (1..64) fit without loss.
176 packet.put((byte) connectionId);
// Copies buffer[0..count) after the id byte.
177 packet.put(buffer, 0, count);
// Sends a packet produced by buildControlPacket/buildDataPacket over the bound data channel
// as a binary message.
181 protected void sendToDataChannel(ByteBuffer packet) {
// Cap the buffer at the number of bytes written so far.
182 packet.limit(packet.position());
184 final Lock lock = mDataChanneliReferenceLock.readLock();
// The send happens only while a data channel is bound.
187 if (mDataChannel != null) {
188 mDataChannel.send(packet, AbstractDataChannel.MessageType.BINARY);
196 * Packet decoding exposed for tests.
198 abstract static class PacketDecoderBase {
// Decodes one incoming packet and dispatches it to onReceivedDataPacket or
// onReceivedControlPacket. Throws ProtocolError for an empty packet, a control packet of the
// wrong size, or an out-of-range connection id.
199 protected void decodePacket(ByteBuffer packet) throws ProtocolError {
200 if (packet.remaining() == 0) {
201 throw new ProtocolError("Empty packet");
// ByteBuffer.get() returns a signed byte, so a first byte above 127 becomes negative here
// and is rejected by checkConnectionId.
204 int connectionId = packet.get();
// Any first byte other than CONTROL_CONNECTION_ID denotes a data packet for that connection.
205 if (connectionId != CONTROL_CONNECTION_ID) {
206 checkConnectionId(connectionId);
// Payload is everything after the id byte.
207 byte[] data = new byte[packet.remaining()];
209 onReceivedDataPacket(connectionId, data);
// Control packet: exactly two more bytes (op code, connection id) must remain.
211 if (packet.remaining() != CONTROL_PACKET_SIZE - 1) {
212 throw new ProtocolError("Invalid control packet size");
215 byte opCode = packet.get();
216 connectionId = packet.get();
217 checkConnectionId(connectionId);
218 onReceivedControlPacket(connectionId, opCode);
// Receives the connection id and payload of a decoded data packet.
222 protected abstract void onReceivedDataPacket(int connectionId, byte[] data)
223 throws ProtocolError;
// Receives the connection id and op code of a decoded control packet.
224 protected abstract void onReceivedControlPacket(int connectionId, byte opcode)
225 throws ProtocolError;
// Observer registered on the data channel by bind(). Decodes incoming messages with
// PacketDecoderBase and forwards the results to the enclosing SocketTunnelBase. Every
// callback first checks that it runs on the signaling thread.
228 private final class DataChannelObserver
229 extends PacketDecoderBase implements AbstractDataChannel.Observer {
// Translates channel state changes into onDataChannelOpened()/onDataChannelClosed().
231 public void onStateChange(AbstractDataChannel.State state) {
232 checkCalledOnSignalingThread();
234 if (state == AbstractDataChannel.State.OPEN) {
235 onDataChannelOpened();
237 onDataChannelClosed();
// Decodes one incoming message; a ProtocolError raised while decoding is caught here.
242 public void onMessage(ByteBuffer message) {
243 checkCalledOnSignalingThread();
246 decodePacket(message);
247 } catch (ProtocolError e) {
// Forwards a decoded data packet to the enclosing tunnel.
253 protected void onReceivedDataPacket(int connectionId, byte[] data) throws ProtocolError {
254 checkCalledOnSignalingThread();
256 SocketTunnelBase.this.onReceivedDataPacket(connectionId, data);
// Forwards a decoded control packet to the enclosing tunnel.
260 protected void onReceivedControlPacket(int connectionId, byte opCode) throws ProtocolError {
261 checkCalledOnSignalingThread();
263 SocketTunnelBase.this.onReceivedControlPacket(connectionId, opCode);
268 * Any problem that happened while handling an incoming message and breaks state integrity.
// Checked exception thrown by checkConnectionId() and PacketDecoderBase.decodePacket().
270 static class ProtocolError extends Exception {
// description: human-readable explanation of the violation.
271 public ProtocolError(String description) {
277 * Base utility class for client and server connections.
279 protected abstract class ConnectionBase {
// Connection id used to tag data packets and to report socket exceptions.
280 protected final int mId;
// Local socket this connection reads from and writes to.
281 protected final LocalSocket mSocket;
// Counts streams still open; releaseStream() closes the connection when it reaches zero.
282 private final AtomicInteger mOpenedStreams = new AtomicInteger(2); // input and output.
// Gates mSocket.shutdownInput()/shutdownOutput(): they are only called while this is true.
283 private volatile boolean mConnected;
// Read buffer of READING_BUFFER_SIZE bytes, allocated in runReadingLoop().
284 private byte[] mBuffer;
// Common constructor; `preconnected` becomes the initial value of mConnected.
286 private ConnectionBase(int id, LocalSocket socket, boolean preconnected) {
289 mConnected = preconnected;
// Wraps a socket supplied by the caller; it is treated as already connected.
292 protected ConnectionBase(int id, LocalSocket socket) {
293 this(id, socket, true);
// Creates a new, not yet connected LocalSocket for this connection id.
296 protected ConnectionBase(int id) {
297 this(id, new LocalSocket(), false);
// Connects mSocket to the given address. An IOException is reported through
// onSocketException with this connection's id.
// NOTE(review): presumably returns true on success and false on failure, and updates
// mConnected — confirm against the full body.
300 protected boolean connect(LocalSocketAddress address) {
303 mSocket.connect(address);
306 } catch (IOException e) {
307 onSocketException(e, mId);
// Allocates the read buffer and runs the socket reading loop. An IOException is reported
// through onSocketException with this connection's id.
// NOTE(review): presumably calls pump() repeatedly until it returns false — confirm.
312 protected void runReadingLoop() {
313 mBuffer = new byte[READING_BUFFER_SIZE];
319 } catch (IOException e) {
320 onSocketException(e, mId);
// Reads one chunk (at most mBuffer.length bytes) from the socket's input stream and forwards
// the bytes read as a data packet tagged with mId.
// NOTE(review): presumably returns false when read() reports end of stream (count <= 0)
// and true otherwise — confirm.
326 private boolean pump() throws IOException {
327 int count = mSocket.getInputStream().read(mBuffer);
330 sendToDataChannel(buildDataPacket(mId, mBuffer, count));
// Writes the given bytes to the socket's output stream. An IOException is reported through
// onSocketException with this connection's id.
334 protected void writeData(byte[] data) {
335 // Called on the writing thread.
337 mSocket.getOutputStream().write(data);
338 } catch (IOException e) {
339 onSocketException(e, mId);
// Handles payload received for this connection by submitting a task to the writing executor.
// NOTE(review): the task presumably calls writeData(data) — confirm.
343 public void onReceivedDataPacket(final byte[] data) {
344 mWritingThread.execute(new Runnable() {
// NOTE(review): presumably shuts down both directions of the connection — confirm against
// the method body.
352 public void terminate() {
// Schedules shutdown of the socket's output side on the writing executor.
356 protected void shutdownOutput() {
357 // Shut down output on the writing thread to make sure all pending writes have finished.
// The executor is single-threaded, so this task runs after writes submitted before it.
358 mWritingThread.execute(new Runnable() {
361 shutdownOutputOnWritingThread();
// Shuts down the socket's output side if the socket is connected. An IOException is reported
// through onSocketException with this connection's id.
// NOTE(review): presumably also calls releaseStream() afterwards — confirm.
366 private void shutdownOutputOnWritingThread() {
368 if (mConnected) mSocket.shutdownOutput();
369 } catch (IOException e) {
370 onSocketException(e, mId);
// Shuts down the socket's input side if the socket is connected. An IOException is reported
// through onSocketException with this connection's id.
// NOTE(review): presumably also calls releaseStream() afterwards — confirm.
375 protected void shutdownInput() {
377 if (mConnected) mSocket.shutdownInput();
378 } catch (IOException e) {
379 onSocketException(e, mId);
// Marks one of the two streams (input/output) as finished; when the counter reaches zero the
// connection is closed. The atomic decrement ensures only one caller observes zero.
384 private void releaseStream() {
385 if (mOpenedStreams.decrementAndGet() == 0) close();
// Closes the connection. An IOException is reported through onSocketException with this
// connection's id.
// NOTE(review): presumably calls mSocket.close() inside the try block — confirm.
388 protected void close() {
391 } catch (IOException e) {
392 onSocketException(e, mId);