001package com.github.sarxos.webcam;
002
003import java.util.concurrent.ExecutorService;
004import java.util.concurrent.Executors;
005import java.util.concurrent.RejectedExecutionException;
006import java.util.concurrent.SynchronousQueue;
007import java.util.concurrent.ThreadFactory;
008import java.util.concurrent.TimeUnit;
009import java.util.concurrent.atomic.AtomicBoolean;
010import java.util.concurrent.atomic.AtomicInteger;
011
012import org.slf4j.Logger;
013import org.slf4j.LoggerFactory;
014
015
016public class WebcamProcessor {
017
018        private static final Logger LOG = LoggerFactory.getLogger(WebcamProcessor.class);
019
020        /**
021         * Thread doing supersync processing.
022         *
023         * @author sarxos
024         */
025        public static final class ProcessorThread extends Thread {
026
027                private static final AtomicInteger N = new AtomicInteger(0);
028
029                public ProcessorThread(Runnable r) {
030                        super(r, String.format("atomic-processor-%d", N.incrementAndGet()));
031                }
032        }
033
034        /**
035         * Thread factory for processor.
036         *
037         * @author Bartosz Firyn (SarXos)
038         */
039        private static final class ProcessorThreadFactory implements ThreadFactory {
040
041                @Override
042                public Thread newThread(Runnable r) {
043                        Thread t = new ProcessorThread(r);
044                        t.setUncaughtExceptionHandler(WebcamExceptionHandler.getInstance());
045                        t.setDaemon(true);
046                        return t;
047                }
048        }
049
050        /**
051         * Heart of overall processing system. This class process all native calls wrapped in tasks, by
052         * doing this all tasks executions are super-synchronized.
053         *
054         * @author Bartosz Firyn (SarXos)
055         */
056        private static final class AtomicProcessor implements Runnable {
057
058                private SynchronousQueue<WebcamTask> inbound = new SynchronousQueue<WebcamTask>(true);
059                private SynchronousQueue<WebcamTask> outbound = new SynchronousQueue<WebcamTask>(true);
060
061                /**
062                 * Process task.
063                 *
064                 * @param task the task to be processed
065                 * @throws InterruptedException when thread has been interrupted
066                 */
067                public void process(WebcamTask task) throws InterruptedException {
068                        inbound.put(task);
069
070                        Throwable t = outbound.take().getThrowable();
071                        if (t != null) {
072                                throw new WebcamException("Cannot execute task", t);
073                        }
074                }
075
076                @Override
077                public void run() {
078                        while (true) {
079                                WebcamTask t = null;
080                                try {
081                                        (t = inbound.take()).handle();
082                                } catch (InterruptedException e) {
083                                        break;
084                                } catch (Throwable e) {
085                                        if (t != null) {
086                                                t.setThrowable(e);
087                                        }
088                                } finally {
089                                        if (t != null) {
090                                                try {
091                                                        outbound.put(t);
092                                                } catch (InterruptedException e) {
093                                                        break;
094                                                } catch (Exception e) {
095                                                        throw new RuntimeException("Cannot put task into outbound queue", e);
096                                                }
097                                        }
098                                }
099                        }
100                }
101        }
102
103        /**
104         * Is processor started?
105         */
106        private static final AtomicBoolean started = new AtomicBoolean(false);
107
108        /**
109         * Execution service.
110         */
111        private static ExecutorService runner = null;
112
113        /**
114         * Static processor.
115         */
116        private static final AtomicProcessor processor = new AtomicProcessor();
117
118        /**
119         * Singleton instance.
120         */
121        private static final WebcamProcessor INSTANCE = new WebcamProcessor();;
122
123        private WebcamProcessor() {
124        }
125
126        /**
127         * Process single webcam task.
128         *
129         * @param task the task to be processed
130         * @throws InterruptedException when thread has been interrupted
131         */
132        public void process(WebcamTask task) throws InterruptedException {
133
134                if (started.compareAndSet(false, true)) {
135                        runner = Executors.newSingleThreadExecutor(new ProcessorThreadFactory());
136                        runner.execute(processor);
137                }
138
139                if (!runner.isShutdown()) {
140                        processor.process(task);
141                } else {
142                        throw new RejectedExecutionException("Cannot process because processor runner has been already shut down");
143                }
144        }
145
146        public void shutdown() {
147                if (started.compareAndSet(true, false)) {
148
149                        LOG.debug("Shutting down webcam processor");
150
151                        runner.shutdown();
152
153                        LOG.debug("Awaiting tasks termination");
154
155                        while (runner.isTerminated()) {
156
157                                try {
158                                        runner.awaitTermination(100, TimeUnit.MILLISECONDS);
159                                } catch (InterruptedException e) {
160                                        return;
161                                }
162
163                                runner.shutdownNow();
164                        }
165
166                        LOG.debug("All tasks has been terminated");
167                }
168
169        }
170
171        public static synchronized WebcamProcessor getInstance() {
172                return INSTANCE;
173        }
174}