001    package com.github.sarxos.webcam;
002    
003    import java.awt.image.BufferedImage;
004    import java.io.BufferedOutputStream;
005    import java.io.BufferedReader;
006    import java.io.ByteArrayOutputStream;
007    import java.io.Closeable;
008    import java.io.IOException;
009    import java.io.InputStreamReader;
010    import java.net.ServerSocket;
011    import java.net.Socket;
012    import java.net.SocketException;
013    import java.util.concurrent.ExecutorService;
014    import java.util.concurrent.Executors;
015    import java.util.concurrent.ThreadFactory;
016    import java.util.concurrent.atomic.AtomicBoolean;
017    
018    import javax.imageio.ImageIO;
019    
020    import org.slf4j.Logger;
021    import org.slf4j.LoggerFactory;
022    
023    
024    public class WebcamStreamer implements ThreadFactory, WebcamListener {
025    
026            private static final Logger LOG = LoggerFactory.getLogger(WebcamStreamer.class);
027    
028            private static final String BOUNDARY = "mjpegframe";
029    
030            private static final String CRLF = "\r\n";
031    
032            private class Acceptor implements Runnable {
033    
034                    @Override
035                    public void run() {
036                            try {
037                                    ServerSocket server = new ServerSocket(port);
038                                    while (started.get()) {
039                                            Socket socket = server.accept();
040                                            LOG.info("New connection from {}", socket.getRemoteSocketAddress());
041                                            executor.execute(new Connection(socket));
042                                    }
043                            } catch (Exception e) {
044                                    LOG.error("Cannot accept socket connection", e);
045                            }
046                    }
047            }
048    
049            private class Connection implements Runnable {
050    
051                    private Socket socket = null;
052    
053                    public Connection(Socket socket) {
054                            this.socket = socket;
055                    }
056    
057                    @Override
058                    public void run() {
059    
060                            BufferedReader br = null;
061                            BufferedOutputStream bos = null;
062                            ByteArrayOutputStream baos = new ByteArrayOutputStream();
063    
064                            try {
065                                    br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
066                                    bos = new BufferedOutputStream(socket.getOutputStream());
067                            } catch (IOException e) {
068                                    LOG.error("Fatal I/O exception when creating socket streams", e);
069                                    try {
070                                            socket.close();
071                                    } catch (IOException e1) {
072                                            LOG.error("Canot close socket connection from " + socket.getRemoteSocketAddress(), e1);
073                                    }
074                                    return;
075                            }
076    
077                            // consume whole input
078    
079                            try {
080                                    while (br.ready()) {
081                                            br.readLine();
082                                    }
083                            } catch (IOException e) {
084                                    LOG.error("Error when reading input", e);
085                                    return;
086                            }
087    
088                            // stream
089    
090                            try {
091    
092                                    socket.setSoTimeout(0);
093                                    socket.setKeepAlive(false);
094                                    socket.setTcpNoDelay(true);
095    
096                                    while (started.get()) {
097    
098                                            StringBuilder sb = new StringBuilder();
099                                            sb.append("HTTP/1.0 200 OK").append(CRLF);
100                                            sb.append("Connection: close").append(CRLF);
101                                            sb.append("Cache-Control: no-cache").append(CRLF);
102                                            sb.append("Cache-Control: private").append(CRLF);
103                                            sb.append("Pragma: no-cache").append(CRLF);
104                                            sb.append("Content-type: multipart/x-mixed-replace; boundary=--").append(BOUNDARY).append(CRLF);
105                                            sb.append(CRLF);
106    
107                                            bos.write(sb.toString().getBytes());
108    
109                                            do {
110    
111                                                    if (!webcam.isOpen() || socket.isInputShutdown() || socket.isClosed()) {
112                                                            br.close();
113                                                            bos.close();
114                                                            return;
115                                                    }
116    
117                                                    baos.reset();
118    
119                                                    long now = System.currentTimeMillis();
120                                                    if (now > last + delay) {
121                                                            image = webcam.getImage();
122                                                    }
123    
124                                                    ImageIO.write(image, "JPG", baos);
125    
126                                                    sb.delete(0, sb.length());
127                                                    sb.append("--").append(BOUNDARY).append(CRLF);
128                                                    sb.append("Content-type: image/jpeg").append(CRLF);
129                                                    sb.append("Content-Length: ").append(baos.size()).append(CRLF);
130                                                    sb.append(CRLF);
131    
132                                                    try {
133                                                            bos.write(sb.toString().getBytes());
134                                                            bos.write(baos.toByteArray());
135                                                            bos.write(CRLF.getBytes());
136                                                            bos.flush();
137                                                    } catch (SocketException e) {
138                                                            LOG.error("Socket exception from " + socket.getRemoteSocketAddress(), e);
139                                                            br.close();
140                                                            bos.close();
141                                                            return;
142                                                    }
143    
144                                                    Thread.sleep(delay);
145    
146                                            } while (started.get());
147                                    }
148                            } catch (Exception e) {
149    
150                                    String message = e.getMessage();
151    
152                                    if (message != null) {
153                                            if (message.startsWith("Software caused connection abort")) {
154                                                    LOG.info("User closed stream");
155                                                    return;
156                                            }
157                                            if (message.startsWith("Broken pipe")) {
158                                                    LOG.info("User connection broken");
159                                                    return;
160                                            }
161                                    }
162    
163                                    LOG.error("Error", e);
164    
165                                    try {
166                                            bos.write("HTTP/1.0 501 Internal Server Error\r\n\r\n\r\n".getBytes());
167                                    } catch (IOException e1) {
168                                            LOG.error("Not ablte to write to output stream", e);
169                                    }
170    
171                            } finally {
172                                    for (Closeable closeable : new Closeable[] { br, bos, baos }) {
173                                            try {
174                                                    closeable.close();
175                                            } catch (IOException e) {
176                                                    LOG.error("Cannot close socket", e);
177                                            }
178                                    }
179                                    try {
180                                            socket.close();
181                                    } catch (IOException e) {
182                                            LOG.error("Cannot close socket", e);
183                                    }
184                            }
185                    }
186            }
187    
188            private Webcam webcam = null;
189            private double fps = 0;
190            private int number = 0;
191            private int port = 0;
192            private long last = -1;
193            private long delay = -1;
194            private BufferedImage image = null;
195            private ExecutorService executor = Executors.newCachedThreadPool(this);
196            private AtomicBoolean started = new AtomicBoolean(false);
197    
198            public WebcamStreamer(int port, Webcam webcam, double fps, boolean start) {
199    
200                    if (webcam == null) {
201                            throw new IllegalArgumentException("Webcam for streaming cannot be null");
202                    }
203    
204                    this.port = port;
205                    this.webcam = webcam;
206                    this.fps = fps;
207                    this.delay = (long) (1000 / fps);
208    
209                    if (start) {
210                            start();
211                    }
212            }
213    
214            @Override
215            public Thread newThread(Runnable r) {
216                    Thread thread = new Thread(r, String.format("streamer-thread-%s", number++));
217                    thread.setUncaughtExceptionHandler(WebcamExceptionHandler.getInstance());
218                    thread.setDaemon(true);
219                    return thread;
220            }
221    
222            public void start() {
223                    if (started.compareAndSet(false, true)) {
224                            webcam.addWebcamListener(this);
225                            webcam.open();
226                            executor.execute(new Acceptor());
227                    }
228            }
229    
230            public void stop() {
231                    if (started.compareAndSet(true, false)) {
232                            executor.shutdown();
233                            webcam.removeWebcamListener(this);
234                            webcam.close();
235                    }
236            }
237    
238            @Override
239            public void webcamOpen(WebcamEvent we) {
240                    start();
241            }
242    
243            @Override
244            public void webcamClosed(WebcamEvent we) {
245                    stop();
246            }
247    
248            @Override
249            public void webcamDisposed(WebcamEvent we) {
250            }
251    
252            @Override
253            public void webcamImageObtained(WebcamEvent we) {
254            }
255    
256            public double getFPS() {
257                    return fps;
258            }
259    
260            public boolean isInitialized() {
261                    return started.get();
262            }
263    
264            public int getPort() {
265                    return port;
266            }
267    
268    }