001 package com.github.sarxos.webcam;
002
003 import java.util.concurrent.ExecutorService;
004 import java.util.concurrent.Executors;
005 import java.util.concurrent.SynchronousQueue;
006 import java.util.concurrent.ThreadFactory;
007 import java.util.concurrent.atomic.AtomicBoolean;
008
009
010 public class WebcamProcessor {
011
012 /**
013 * Thread factory for processor.
014 *
015 * @author Bartosz Firyn (SarXos)
016 */
017 private static final class ProcessorThreadFactory implements ThreadFactory {
018
019 @Override
020 public Thread newThread(Runnable r) {
021 Thread t = new Thread(r, "atomic-processor");
022 t.setDaemon(true);
023 return t;
024 }
025 }
026
027 /**
028 * Heart of overall processing system. This class process all native calls
029 * wrapped in tasks, by doing this all tasks executions are
030 * super-synchronized.
031 *
032 * @author Bartosz Firyn (SarXos)
033 */
034 private static final class AtomicProcessor implements Runnable {
035
036 private SynchronousQueue<WebcamTask> inbound = new SynchronousQueue<WebcamTask>(true);
037 private SynchronousQueue<WebcamTask> outbound = new SynchronousQueue<WebcamTask>(true);
038
039 /**
040 * Process task.
041 *
042 * @param task the task to be processed
043 * @return Processed task
044 * @throws InterruptedException when thread has been interrupted
045 */
046 public void process(WebcamTask task) throws InterruptedException {
047
048 inbound.put(task);
049
050 Throwable t = outbound.take().getThrowable();
051 if (t != null) {
052 throw new WebcamException("Cannot execute task", t);
053 }
054 }
055
056 @Override
057 public void run() {
058 while (true) {
059 WebcamTask t = null;
060 try {
061 (t = inbound.take()).handle();
062 } catch (InterruptedException e) {
063 break;
064 } catch (Throwable e) {
065 t.setThrowable(e);
066 } finally {
067 try {
068 if (t != null) {
069 outbound.put(t);
070 }
071 } catch (InterruptedException e) {
072 break;
073 }
074 }
075 }
076 }
077 }
078
079 /**
080 * Is processor started?
081 */
082 private static final AtomicBoolean started = new AtomicBoolean(false);
083
084 /**
085 * Execution service.
086 */
087 private static final ExecutorService runner = Executors.newSingleThreadExecutor(new ProcessorThreadFactory());
088
089 /**
090 * Static processor.
091 */
092 private static final AtomicProcessor processor = new AtomicProcessor();
093
094 /**
095 * Singleton instance.
096 */
097 private static WebcamProcessor instance = null;
098
099 private WebcamProcessor() {
100 }
101
102 /**
103 * Process single webcam task.
104 *
105 * @param task the task to be processed
106 * @throws InterruptedException when thread has been interrupted
107 */
108 public void process(WebcamTask task) throws InterruptedException {
109 if (started.compareAndSet(false, true)) {
110 runner.execute(processor);
111 }
112 processor.process(task);
113 }
114
115 public static synchronized WebcamProcessor getInstance() {
116 if (instance == null) {
117 instance = new WebcamProcessor();
118 }
119 return instance;
120 }
121 }