提交 0f13b58c 编写于 作者: 云逸之's avatar 云逸之 💬

优化接收帧的buffer大小,优化发送帧去掉多次内存复制

上级 b0405679
package org.btik.server;
import org.btik.server.video.device.FrameBuffer;
import org.btik.server.video.device.FrameReceiver;
/**
*视频服务
*/
......@@ -10,7 +13,7 @@ public interface VideoServer {
*
* @param frame 一帧jpeg
*/
void sendFrame(byte[] frame);
void sendFrame(FrameBuffer buffer);
......
......@@ -2,6 +2,7 @@ package org.btik.server.video;
import org.btik.server.VideoServer;
import org.btik.server.util.ByteUtil;
import org.btik.server.video.device.FrameBuffer;
import java.io.IOException;
import java.io.InputStream;
......@@ -131,6 +132,15 @@ public class BioHttpVideoServer extends Thread implements VideoServer, HttpConst
}
void sendChunk(FrameBuffer buffer, OutputStream out) throws IOException {
int length = buffer.frameLen();
out.write(NEW_LINE);
out.write(ByteUtil.toHexString(length));
out.write(NEW_LINE);
buffer.takeFrame(out);
}
void sendChunk(OutputStream out, byte[]... chunk) throws IOException {
int length = 0;
for (byte[] bytes : chunk) {
......@@ -153,19 +163,15 @@ public class BioHttpVideoServer extends Thread implements VideoServer, HttpConst
}
@Override
public void sendFrame(byte[] frame) {
int length = frame.length;
if (frame[length - 1] == 0 && frame[length - 2] == 0) {
System.out.print("\rdrop frame:");
return;
}
public void sendFrame(FrameBuffer buffer) {
int length = buffer.frameLen();
synchronized (clientLock) {
for (Socket client : clients) {
try {
OutputStream outputStream = client.getOutputStream();
sendChunk(_STREAM_BOUNDARY, outputStream);
sendChunk(outputStream, _STREAM_PART, ByteUtil.toString(length), DOUBLE_LINE);
sendChunk(frame, outputStream);
sendChunk(buffer, outputStream);
outputStream.flush();
} catch (IOException e) {
checkState(client, e);
......
......@@ -3,7 +3,7 @@ package org.btik.server.video.device;
import org.btik.server.VideoServer;
import org.btik.server.video.AsyncTaskExecutor;
import org.btik.server.video.HttpConstant;
import java.io.IOException;
import java.net.ServerSocket;
......@@ -16,7 +16,7 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* 发送帧设备接入通道
*/
public class BioDeviceChannel extends Thread implements HttpConstant {
public class BioDeviceChannel extends Thread {
private boolean runFlag = true;
......
package org.btik.server.video.device;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.io.IOException;
import java.io.OutputStream;
/**
* 视频帧缓冲区
......@@ -20,6 +21,8 @@ public class FrameBuffer extends ByteArrayOutputStream {
private static final byte endLast = end[END_TOP_INDEX];
private int checkIndex = END_TOP_INDEX;
private int frameLength;
@Override
public void write(byte[] b, int off, int len) {
synchronized (this) {
......@@ -43,6 +46,7 @@ public class FrameBuffer extends ByteArrayOutputStream {
continue searchEndChar;
}
}
frameLength = checkIndex - END_TOP_INDEX;
return true;
}
}
......@@ -52,13 +56,16 @@ public class FrameBuffer extends ByteArrayOutputStream {
/**
* 此处不加锁,但必须在锁对象为this情况下调用
*/
byte[] takeFrame() {
int newLength = checkIndex - END_TOP_INDEX;
byte[] bytes = Arrays.copyOf(buf, newLength);
public void takeFrame(OutputStream outputStream) throws IOException {
outputStream.write(buf, 0, frameLength);
int nextIndex = checkIndex + 1;
count -= nextIndex;
System.arraycopy(buf, nextIndex, buf, 0, count);
checkIndex = END_TOP_INDEX;
return bytes;
}
public int frameLen(){
return frameLength;
}
}
......@@ -23,32 +23,42 @@ public class FrameReceiver extends Thread {
private final FrameSplit frameSplit = new FrameSplit();
private final FrameBuffer frameBuffer = new FrameBuffer();
/**
* 单个摄像头socket 接收图片缓冲区大小
*/
private static final int RECEIVE_BUFFER_SIZE = 40960;
private byte[] preFrameBuffer = new byte[RECEIVE_BUFFER_SIZE];
public FrameReceiver(VideoServer videoServer, Socket socket) throws IOException {
this.videoServer = videoServer;
this.socket = socket;
this.in = socket.getInputStream();
SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress();
socket.setReceiveBufferSize(RECEIVE_BUFFER_SIZE);
setName("frameReceiver" + remoteSocketAddress);
frameSplit.start();
}
private final FrameBuffer frameBuffer = new FrameBuffer();
@Override
public void run() {
while (runFlag) {
try {
int available = in.available();
if (available > 0) {
byte[] buffer = new byte[available];
int read = in.read(buffer);
if (available > preFrameBuffer.length) {
preFrameBuffer = new byte[available];
}
int read = in.read(preFrameBuffer);
if (read == -1) {
shutDown("eof");
}
frameBuffer.write(buffer, 0, read);
frameBuffer.write(preFrameBuffer, 0, read);
} else {
synchronized (in) {
in.wait(10);
......@@ -70,7 +80,7 @@ public class FrameReceiver extends Thread {
public void shutDown(String msg) {
System.err.println("wait by break");
System.err.println("shutdown on:" + msg);
runFlag = false;
synchronized (frameBuffer) {
frameBuffer.notify();
......@@ -94,7 +104,7 @@ public class FrameReceiver extends Thread {
while (runFlag) {
synchronized (frameBuffer) {
if (frameBuffer.hasFrame()) {
videoServer.sendFrame(frameBuffer.takeFrame());
videoServer.sendFrame(frameBuffer);
} else {
try {
frameBuffer.wait();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册