001package com.github.sarxos.webcam; 002 003import java.util.concurrent.ExecutorService; 004import java.util.concurrent.Executors; 005import java.util.concurrent.RejectedExecutionException; 006import java.util.concurrent.SynchronousQueue; 007import java.util.concurrent.ThreadFactory; 008import java.util.concurrent.TimeUnit; 009import java.util.concurrent.atomic.AtomicBoolean; 010import java.util.concurrent.atomic.AtomicInteger; 011 012import org.slf4j.Logger; 013import org.slf4j.LoggerFactory; 014 015 016public class WebcamProcessor { 017 018 private static final Logger LOG = LoggerFactory.getLogger(WebcamProcessor.class); 019 020 /** 021 * Thread doing supersync processing. 022 * 023 * @author sarxos 024 */ 025 public static final class ProcessorThread extends Thread { 026 027 private static final AtomicInteger N = new AtomicInteger(0); 028 029 public ProcessorThread(Runnable r) { 030 super(r, String.format("atomic-processor-%d", N.incrementAndGet())); 031 } 032 } 033 034 /** 035 * Thread factory for processor. 036 * 037 * @author Bartosz Firyn (SarXos) 038 */ 039 private static final class ProcessorThreadFactory implements ThreadFactory { 040 041 @Override 042 public Thread newThread(Runnable r) { 043 Thread t = new ProcessorThread(r); 044 t.setUncaughtExceptionHandler(WebcamExceptionHandler.getInstance()); 045 t.setDaemon(true); 046 return t; 047 } 048 } 049 050 /** 051 * Heart of overall processing system. This class process all native calls wrapped in tasks, by 052 * doing this all tasks executions are super-synchronized. 053 * 054 * @author Bartosz Firyn (SarXos) 055 */ 056 private static final class AtomicProcessor implements Runnable { 057 058 private SynchronousQueue<WebcamTask> inbound = new SynchronousQueue<WebcamTask>(true); 059 private SynchronousQueue<WebcamTask> outbound = new SynchronousQueue<WebcamTask>(true); 060 061 /** 062 * Process task. 063 * 064 * @param task the task to be processed 065 * @throws InterruptedException when thread has been interrupted 066 */ 067 public void process(WebcamTask task) throws InterruptedException { 068 inbound.put(task); 069 070 Throwable t = outbound.take().getThrowable(); 071 if (t != null) { 072 throw new WebcamException("Cannot execute task", t); 073 } 074 } 075 076 @Override 077 public void run() { 078 while (true) { 079 WebcamTask t = null; 080 try { 081 (t = inbound.take()).handle(); 082 } catch (InterruptedException e) { 083 break; 084 } catch (Throwable e) { 085 if (t != null) { 086 t.setThrowable(e); 087 } 088 } finally { 089 if (t != null) { 090 try { 091 outbound.put(t); 092 } catch (InterruptedException e) { 093 break; 094 } catch (Exception e) { 095 throw new RuntimeException("Cannot put task into outbound queue", e); 096 } 097 } 098 } 099 } 100 } 101 } 102 103 /** 104 * Is processor started? 105 */ 106 private static final AtomicBoolean started = new AtomicBoolean(false); 107 108 /** 109 * Execution service. 110 */ 111 private static ExecutorService runner = null; 112 113 /** 114 * Static processor. 115 */ 116 private static final AtomicProcessor processor = new AtomicProcessor(); 117 118 /** 119 * Singleton instance. 120 */ 121 private static final WebcamProcessor INSTANCE = new WebcamProcessor();; 122 123 private WebcamProcessor() { 124 } 125 126 /** 127 * Process single webcam task. 128 * 129 * @param task the task to be processed 130 * @throws InterruptedException when thread has been interrupted 131 */ 132 public void process(WebcamTask task) throws InterruptedException { 133 134 if (started.compareAndSet(false, true)) { 135 runner = Executors.newSingleThreadExecutor(new ProcessorThreadFactory()); 136 runner.execute(processor); 137 } 138 139 if (!runner.isShutdown()) { 140 processor.process(task); 141 } else { 142 throw new RejectedExecutionException("Cannot process because processor runner has been already shut down"); 143 } 144 } 145 146 public void shutdown() { 147 if (started.compareAndSet(true, false)) { 148 149 LOG.debug("Shutting down webcam processor"); 150 151 runner.shutdown(); 152 153 LOG.debug("Awaiting tasks termination"); 154 155 while (runner.isTerminated()) { 156 157 try { 158 runner.awaitTermination(100, TimeUnit.MILLISECONDS); 159 } catch (InterruptedException e) { 160 return; 161 } 162 163 runner.shutdownNow(); 164 } 165 166 LOG.debug("All tasks has been terminated"); 167 } 168 169 } 170 171 public static synchronized WebcamProcessor getInstance() { 172 return INSTANCE; 173 } 174}