提交 e65635d8 编写于 作者: L lei.yul 提交者: yulei

[Wisp] Refactoring WispSocket iostream creation

Summary:
Refactoring WispSocket's InputStream and OutputStream
creating code for readability.

Test Plan: all wisp tests

Reviewed-by: joeyleeeeeee97 shiyuexw

Issue: alibaba/dragonwell8#124
上级 8f74c702
......@@ -171,6 +171,124 @@ public class WispSocketImpl
}
}
private class WispSocketInputStream extends InputStream {
WispSocketInputStream(SocketChannel ch) {
this.ch = ch;
}
protected final SocketChannel ch;
private ByteBuffer bb = null;
// Invoker's previous array
private byte[] bs = null;
private byte[] b1 = null;
private ByteBuffer readAhead = null;
@Override
public int read() throws IOException {
if (b1 == null) {
b1 = new byte[1];
}
int n = this.read(b1);
if (n == 1)
return b1[0] & 0xff;
return -1;
}
@Override
public int read(byte[] bs, int off, int len)
throws IOException {
if (len <= 0 || off < 0 || off + len > bs.length) {
if (len == 0) {
return 0;
}
throw new ArrayIndexOutOfBoundsException();
}
ByteBuffer bb = ((this.bs == bs) ? this.bb : ByteBuffer.wrap(bs));
bb.limit(Math.min(off + len, bb.capacity()));
bb.position(off);
this.bb = bb;
this.bs = bs;
return read(bb);
}
private int read(ByteBuffer bb) throws IOException {
try {
wispSocketLockSupport.beginRead();
return read0(bb);
} finally {
wispSocketLockSupport.endRead();
}
}
private int read0(ByteBuffer bb)
throws IOException {
int n;
try {
if (readAhead != null && readAhead.hasRemaining()) {
if (bb.remaining() >= readAhead.remaining()) {
n = readAhead.remaining();
bb.put(readAhead);
} else {
n = bb.remaining();
for (int i = 0; i < n; i++) {
bb.put(readAhead.get());
}
}
return n;
}
if ((n = ch.read(bb)) != 0) {
return n;
}
if (so.getSoTimeout() > 0) {
WEA.addTimer(System.nanoTime() +
TimeUnit.MILLISECONDS.toNanos(so.getSoTimeout()));
}
do {
WEA.registerEvent(ch, SelectionKey.OP_READ);
WEA.park(-1);
if (so.getSoTimeout() > 0 && WEA.isTimeout()) {
throw new SocketTimeoutException("time out");
}
} while ((n = ch.read(bb)) == 0);
} finally {
if (so.getSoTimeout() > 0) {
WEA.cancelTimer();
}
WEA.unregisterEvent();
}
return n;
}
@Override
public int available() throws IOException {
if (readAhead == null) {
readAhead = ByteBuffer.allocate(4096);
} else if (readAhead.hasRemaining()) {
return readAhead.remaining();
}
readAhead.clear();
ch.read(readAhead);
readAhead.flip();
return readAhead.remaining();
}
@Override
public void close() throws IOException {
WispSocketImpl.this.close();
}
}
public InputStream getInputStream() throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
......@@ -183,119 +301,7 @@ public class WispSocketImpl
socketInputStream = AccessController.doPrivileged(
new PrivilegedExceptionAction<InputStream>() {
public InputStream run() throws IOException {
return new InputStream() {
protected final SocketChannel ch = getChannelImpl();
private ByteBuffer bb = null;
// Invoker's previous array
private byte[] bs = null;
private byte[] b1 = null;
private ByteBuffer readAhead = null;
@Override
public int read() throws IOException {
if (b1 == null) {
b1 = new byte[1];
}
int n = this.read(b1);
if (n == 1)
return b1[0] & 0xff;
return -1;
}
@Override
public int read(byte[] bs, int off, int len)
throws IOException {
if (len <= 0 || off < 0 || off + len > bs.length) {
if (len == 0) {
return 0;
}
throw new ArrayIndexOutOfBoundsException();
}
ByteBuffer bb = ((this.bs == bs) ? this.bb : ByteBuffer.wrap(bs));
bb.limit(Math.min(off + len, bb.capacity()));
bb.position(off);
this.bb = bb;
this.bs = bs;
return read(bb);
}
private int read(ByteBuffer bb) throws IOException {
try {
wispSocketLockSupport.beginRead();
return read0(bb);
} finally {
wispSocketLockSupport.endRead();
}
}
private int read0(ByteBuffer bb)
throws IOException {
int n;
try {
if (readAhead != null && readAhead.hasRemaining()) {
if (bb.remaining() >= readAhead.remaining()) {
n = readAhead.remaining();
bb.put(readAhead);
} else {
n = bb.remaining();
for (int i = 0; i < n; i++) {
bb.put(readAhead.get());
}
}
return n;
}
if ((n = ch.read(bb)) != 0) {
return n;
}
if (so.getSoTimeout() > 0) {
WEA.addTimer(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(so.getSoTimeout()));
}
do {
WEA.registerEvent(ch, SelectionKey.OP_READ);
WEA.park(-1);
if (so.getSoTimeout() > 0 && WEA.isTimeout()) {
throw new SocketTimeoutException("time out");
}
} while ((n = ch.read(bb)) == 0);
} finally {
if (so.getSoTimeout() > 0) {
WEA.cancelTimer();
}
WEA.unregisterEvent();
}
return n;
}
@Override
public int available() throws IOException {
if (readAhead == null) {
readAhead = ByteBuffer.allocate(4096);
} else if (readAhead.hasRemaining()) {
return readAhead.remaining();
}
readAhead.clear();
ch.read(readAhead);
readAhead.flip();
return readAhead.remaining();
}
@Override
public void close() throws IOException {
WispSocketImpl.this.close();
}
};
return new WispSocketInputStream(getChannelImpl());
}
});
} catch (java.security.PrivilegedActionException e) {
......@@ -305,6 +311,93 @@ public class WispSocketImpl
return socketInputStream;
}
private class WispSocketOutputStream extends OutputStream {
WispSocketOutputStream(SocketChannel ch) {
this.ch = ch;
}
protected final SocketChannel ch;
private ByteBuffer bb = null;
// Invoker's previous array
private byte[] bs = null;
private byte[] b1 = null;
@Override
public void write(int b) throws IOException {
if (b1 == null) {
b1 = new byte[1];
}
b1[0] = (byte) b;
this.write(b1);
}
@Override
public void write(byte[] bs, int off, int len)
throws IOException
{
if (len <= 0 || off < 0 || off + len > bs.length) {
if (len == 0) {
return;
}
throw new ArrayIndexOutOfBoundsException();
}
ByteBuffer bb = ((this.bs == bs) ? this.bb : ByteBuffer.wrap(bs));
bb.limit(Math.min(off + len, bb.capacity()));
bb.position(off);
this.bb = bb;
this.bs = bs;
write(bb);
}
private void write(ByteBuffer bb) throws IOException {
try {
wispSocketLockSupport.beginWrite();
write0(bb);
} finally {
wispSocketLockSupport.endWrite();
}
}
private void write0(ByteBuffer bb)
throws IOException {
try {
int writeLen = bb.remaining();
if (ch.write(bb) == writeLen) {
return;
}
if (so.getSoTimeout() > 0) {
WEA.addTimer(System.nanoTime() +
TimeUnit.MILLISECONDS.toNanos(so.getSoTimeout()));
}
do {
WEA.registerEvent(ch, SelectionKey.OP_WRITE);
WEA.park(-1);
if (so.getSoTimeout() > 0 && WEA.isTimeout()) {
throw new SocketTimeoutException("time out");
}
ch.write(bb);
} while (bb.remaining() > 0);
} finally {
if (so.getSoTimeout() > 0) {
WEA.cancelTimer();
}
WEA.unregisterEvent();
}
}
@Override
public void close() throws IOException {
WispSocketImpl.this.close();
}
}
public OutputStream getOutputStream() throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
......@@ -316,86 +409,7 @@ public class WispSocketImpl
return AccessController.doPrivileged(
new PrivilegedExceptionAction<OutputStream>() {
public OutputStream run() throws IOException {
return new OutputStream() {
protected final SocketChannel ch = getChannelImpl();
private ByteBuffer bb = null;
// Invoker's previous array
private byte[] bs = null;
private byte[] b1 = null;
@Override
public void write(int b) throws IOException {
if (b1 == null) {
b1 = new byte[1];
}
b1[0] = (byte) b;
this.write(b1);
}
@Override
public void write(byte[] bs, int off, int len)
throws IOException
{
if (len <= 0 || off < 0 || off + len > bs.length) {
if (len == 0) {
return;
}
throw new ArrayIndexOutOfBoundsException();
}
ByteBuffer bb = ((this.bs == bs) ? this.bb : ByteBuffer.wrap(bs));
bb.limit(Math.min(off + len, bb.capacity()));
bb.position(off);
this.bb = bb;
this.bs = bs;
write(bb);
}
private void write(ByteBuffer bb) throws IOException {
try {
wispSocketLockSupport.beginWrite();
write0(bb);
} finally {
wispSocketLockSupport.endWrite();
}
}
private void write0(ByteBuffer bb)
throws IOException {
try {
int writeLen = bb.remaining();
if (ch.write(bb) == writeLen) {
return;
}
if (so.getSoTimeout() > 0) {
WEA.addTimer(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(so.getSoTimeout()));
}
do {
WEA.registerEvent(ch, SelectionKey.OP_WRITE);
WEA.park(-1);
if (so.getSoTimeout() > 0 && WEA.isTimeout()) {
throw new SocketTimeoutException("time out");
}
ch.write(bb);
} while (bb.remaining() > 0);
} finally {
if (so.getSoTimeout() > 0) {
WEA.cancelTimer();
}
WEA.unregisterEvent();
}
}
@Override
public void close() throws IOException {
WispSocketImpl.this.close();
}
};
return new WispSocketOutputStream(getChannelImpl());
}
});
} catch (java.security.PrivilegedActionException e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册