001 package com.github.sarxos.webcam;
002
003 import java.util.concurrent.ExecutorService;
004 import java.util.concurrent.Executors;
005 import java.util.concurrent.RejectedExecutionException;
006 import java.util.concurrent.SynchronousQueue;
007 import java.util.concurrent.ThreadFactory;
008 import java.util.concurrent.atomic.AtomicBoolean;
009 import java.util.concurrent.atomic.AtomicInteger;
010
011
012 public class WebcamProcessor {
013
014 /**
015 * Thread factory for processor.
016 *
017 * @author Bartosz Firyn (SarXos)
018 */
019 private static final class ProcessorThreadFactory implements ThreadFactory {
020
021 private static final AtomicInteger N = new AtomicInteger(0);
022
023 @Override
024 public Thread newThread(Runnable r) {
025 Thread t = new Thread(r, String.format("atomic-processor-%d", N.incrementAndGet()));
026 t.setUncaughtExceptionHandler(WebcamExceptionHandler.getInstance());
027 t.setDaemon(true);
028 return t;
029 }
030 }
031
032 /**
033 * Heart of overall processing system. This class process all native calls
034 * wrapped in tasks, by doing this all tasks executions are
035 * super-synchronized.
036 *
037 * @author Bartosz Firyn (SarXos)
038 */
039 private static final class AtomicProcessor implements Runnable {
040
041 private SynchronousQueue<WebcamTask> inbound = new SynchronousQueue<WebcamTask>(true);
042 private SynchronousQueue<WebcamTask> outbound = new SynchronousQueue<WebcamTask>(true);
043
044 /**
045 * Process task.
046 *
047 * @param task the task to be processed
048 * @return Processed task
049 * @throws InterruptedException when thread has been interrupted
050 */
051 public void process(WebcamTask task) throws InterruptedException {
052
053 inbound.put(task);
054
055 Throwable t = outbound.take().getThrowable();
056 if (t != null) {
057 throw new WebcamException("Cannot execute task", t);
058 }
059 }
060
061 @Override
062 public void run() {
063 while (true) {
064 WebcamTask t = null;
065 try {
066 (t = inbound.take()).handle();
067 } catch (InterruptedException e) {
068 break;
069 } catch (Throwable e) {
070 t.setThrowable(e);
071 } finally {
072 if (t != null) {
073 try {
074 outbound.put(t);
075 } catch (InterruptedException e) {
076 break;
077 } catch (Exception e) {
078 throw new RuntimeException("Cannot put task into outbound queue", e);
079 }
080 }
081 }
082 }
083 }
084 }
085
086 /**
087 * Is processor started?
088 */
089 private static final AtomicBoolean started = new AtomicBoolean(false);
090
091 /**
092 * Execution service.
093 */
094 private static final ExecutorService runner = Executors.newSingleThreadExecutor(new ProcessorThreadFactory());
095
096 /**
097 * Static processor.
098 */
099 private static final AtomicProcessor processor = new AtomicProcessor();
100
101 /**
102 * Singleton instance.
103 */
104 private static final WebcamProcessor INSTANCE = new WebcamProcessor();;
105
106 private WebcamProcessor() {
107 }
108
109 /**
110 * Process single webcam task.
111 *
112 * @param task the task to be processed
113 * @throws InterruptedException when thread has been interrupted
114 */
115 public void process(WebcamTask task) throws InterruptedException {
116 if (started.compareAndSet(false, true)) {
117 runner.execute(processor);
118 }
119 if (!runner.isShutdown()) {
120 processor.process(task);
121 } else {
122 throw new RejectedExecutionException("Cannot process because processor runner has been already shut down");
123 }
124 }
125
126 public static synchronized WebcamProcessor getInstance() {
127 return INSTANCE;
128 }
129 }