src/main/java/com/gitblit/manager/ServicesManager.java
@@ -24,13 +24,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import javax.inject.Named; import javax.inject.Singleton; import javax.servlet.http.HttpServletRequest; import org.apache.sshd.server.Command; import org.eclipse.jgit.transport.resolver.ReceivePackFactory; import org.eclipse.jgit.transport.resolver.UploadPackFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,25 +38,14 @@ import com.gitblit.fanout.FanoutService; import com.gitblit.fanout.FanoutSocketService; import com.gitblit.git.GitDaemon; import com.gitblit.git.GitblitReceivePackFactory; import com.gitblit.git.GitblitUploadPackFactory; import com.gitblit.git.RepositoryResolver; import com.gitblit.models.FederationModel; import com.gitblit.models.RepositoryModel; import com.gitblit.models.UserModel; import com.gitblit.service.FederationPullService; import com.gitblit.transport.ssh.SshCommandFactory; import com.gitblit.transport.ssh.SshDaemon; import com.gitblit.transport.ssh.SshSession; import com.gitblit.transport.ssh.commands.CreateRepository; import com.gitblit.transport.ssh.commands.VersionCommand; import com.gitblit.utils.IdGenerator; import com.gitblit.utils.StringUtils; import com.gitblit.utils.TimeUtils; import dagger.Module; import dagger.ObjectGraph; import dagger.Provides; /** * Services manager manages long-running services/processes that either have no @@ -110,6 +94,9 @@ } if (gitDaemon != null) { gitDaemon.stop(); } if (sshDaemon != null) { sshDaemon.stop(); } return this; } @@ -164,7 +151,7 @@ String bindInterface = settings.getString(Keys.git.sshBindInterface, "localhost"); if (port > 0) { try { sshDaemon = ObjectGraph.create(new SshModule()).get(SshDaemon.class); sshDaemon = new SshDaemon(gitblit, new IdGenerator()); sshDaemon.start(); } catch (IOException e) { sshDaemon = null; @@ -261,41 +248,5 @@ registration.name, registration.url, registration.nextPull)); } } @Module(library = true, injects = { IGitblit.class, SshCommandFactory.class, SshDaemon.class, }) public class SshModule { @Provides @Named("create-repository") Command provideCreateRepository() { return new CreateRepository(); } @Provides @Named("version") Command provideVersion() { return new VersionCommand(); } @Provides @Singleton IdGenerator provideIdGenerator() { return new IdGenerator(); } @Provides @Singleton RepositoryResolver<SshSession> provideRepositoryResolver() { return new RepositoryResolver<SshSession>(provideGitblit()); } @Provides @Singleton UploadPackFactory<SshSession> provideUploadPackFactory() { return new GitblitUploadPackFactory<SshSession>(provideGitblit()); } @Provides @Singleton ReceivePackFactory<SshSession> provideReceivePackFactory() { return new GitblitReceivePackFactory<SshSession>(provideGitblit()); } @Provides @Singleton IGitblit provideGitblit() { return ServicesManager.this.gitblit; } } } src/main/java/com/gitblit/transport/ssh/CommandDispatcher.java
File was deleted src/main/java/com/gitblit/transport/ssh/DisabledFilesystemFactory.java
New file @@ -0,0 +1,26 @@ package com.gitblit.transport.ssh; import java.io.IOException; import org.apache.sshd.common.Session; import org.apache.sshd.server.FileSystemFactory; import org.apache.sshd.server.FileSystemView; import org.apache.sshd.server.SshFile; public class DisabledFilesystemFactory implements FileSystemFactory { @Override public FileSystemView createFileSystemView(Session session) throws IOException { return new FileSystemView() { @Override public SshFile getFile(SshFile baseDir, String file) { return null; } @Override public SshFile getFile(String file) { return null; } }; } } src/main/java/com/gitblit/transport/ssh/NonForwardingFilter.java
New file @@ -0,0 +1,28 @@ package com.gitblit.transport.ssh; import java.net.InetSocketAddress; import org.apache.sshd.server.ForwardingFilter; import org.apache.sshd.server.session.ServerSession; public class NonForwardingFilter implements ForwardingFilter { @Override public boolean canForwardAgent(ServerSession session) { return false; } @Override public boolean canForwardX11(ServerSession session) { return false; } @Override public boolean canConnect(InetSocketAddress address, ServerSession session) { return false; } @Override public boolean canListen(InetSocketAddress address, ServerSession session) { return false; } } src/main/java/com/gitblit/transport/ssh/SshCommandFactory.java
@@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.inject.Inject; import org.apache.sshd.server.Command; import org.apache.sshd.server.CommandFactory; import org.apache.sshd.server.Environment; @@ -66,14 +64,13 @@ private ReceivePackFactory<SshSession> receivePackFactory; private final ScheduledExecutorService startExecutor; private CommandDispatcher dispatcher; private DispatchCommand dispatcher; @Inject public SshCommandFactory(RepositoryResolver<SshSession> repositoryResolver, UploadPackFactory<SshSession> uploadPackFactory, ReceivePackFactory<SshSession> receivePackFactory, WorkQueue workQueue, CommandDispatcher d) { DispatchCommand d) { this.repositoryResolver = repositoryResolver; this.uploadPackFactory = uploadPackFactory; this.receivePackFactory = receivePackFactory; @@ -116,25 +113,31 @@ // TODO Auto-generated method stub } @Override public void setInputStream(final InputStream in) { this.in = in; } @Override public void setOutputStream(final OutputStream out) { this.out = out; } @Override public void setErrorStream(final OutputStream err) { this.err = err; } @Override public void setExitCallback(final ExitCallback callback) { this.exit = callback; } @Override public void start(final Environment env) throws IOException { this.env = env; task.set(startExecutor.submit(new Runnable() { @Override public void run() { try { onStart(); @@ -155,7 +158,7 @@ synchronized (this) { //final Context old = sshScope.set(ctx); try { cmd = dispatcher.get(); cmd = dispatcher; cmd.setArguments(argv); cmd.setInputStream(in); cmd.setOutputStream(out); src/main/java/com/gitblit/transport/ssh/SshDaemon.java
@@ -18,70 +18,27 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.security.InvalidKeyException; import java.text.MessageFormat; import java.util.Arrays; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import javax.inject.Inject; import org.apache.mina.core.future.IoFuture; import org.apache.mina.core.future.IoFutureListener; import org.apache.mina.core.session.IoSession; import org.apache.mina.transport.socket.SocketSessionConfig; import org.apache.sshd.SshServer; import org.apache.sshd.common.Channel; import org.apache.sshd.common.Cipher; import org.apache.sshd.common.Compression; import org.apache.sshd.common.KeyExchange; import org.apache.sshd.common.KeyPairProvider; import org.apache.sshd.common.Mac; import org.apache.sshd.common.NamedFactory; import org.apache.sshd.common.Session; import org.apache.sshd.common.Signature; import org.apache.sshd.common.cipher.AES128CBC; import org.apache.sshd.common.cipher.AES192CBC; import org.apache.sshd.common.cipher.AES256CBC; import org.apache.sshd.common.cipher.BlowfishCBC; import org.apache.sshd.common.cipher.TripleDESCBC; import org.apache.sshd.common.compression.CompressionNone; import org.apache.sshd.common.mac.HMACMD5; import org.apache.sshd.common.mac.HMACMD596; import org.apache.sshd.common.mac.HMACSHA1; import org.apache.sshd.common.mac.HMACSHA196; import org.apache.sshd.common.random.BouncyCastleRandom; import org.apache.sshd.common.random.SingletonRandomFactory; import org.apache.sshd.common.signature.SignatureDSA; import org.apache.sshd.common.signature.SignatureRSA; import org.apache.sshd.common.util.SecurityUtils; import org.apache.sshd.server.CommandFactory; import org.apache.sshd.server.FileSystemFactory; import org.apache.sshd.server.FileSystemView; import org.apache.sshd.server.ForwardingFilter; import org.apache.sshd.server.PublickeyAuthenticator; import org.apache.sshd.server.SshFile; import org.apache.sshd.server.UserAuth; import org.apache.sshd.server.auth.UserAuthPublicKey; import org.apache.sshd.server.channel.ChannelDirectTcpip; import org.apache.sshd.server.channel.ChannelSession; import org.apache.sshd.server.kex.DHG1; import org.apache.sshd.server.kex.DHG14; import org.apache.sshd.server.keyprovider.PEMGeneratorHostKeyProvider; import org.apache.sshd.server.session.ServerSession; import org.apache.sshd.server.session.SessionFactory; import org.eclipse.jgit.internal.JGitText; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.gitblit.IStoredSettings; import com.gitblit.Keys; import com.gitblit.git.GitblitReceivePackFactory; import com.gitblit.git.GitblitUploadPackFactory; import com.gitblit.git.RepositoryResolver; import com.gitblit.manager.IGitblit; import com.gitblit.transport.ssh.commands.CreateRepository; import com.gitblit.transport.ssh.commands.DispatchCommand; import com.gitblit.transport.ssh.commands.VersionCommand; import com.gitblit.utils.IdGenerator; import com.gitblit.utils.StringUtils; import com.gitblit.utils.WorkQueue; /** * Manager for the ssh transport. Roughly analogous to the @@ -90,7 +47,7 @@ * @author Eric Myhre * */ public class SshDaemon extends SshServer { public class SshDaemon { private final Logger log = LoggerFactory.getLogger(SshDaemon.class); @@ -104,85 +61,72 @@ private static final String HOST_KEY_STORE = "sshKeyStore.pem"; private InetSocketAddress myAddress; private AtomicBoolean run; private final AtomicBoolean run; @SuppressWarnings("unused") private IGitblit gitblit; private final IGitblit gitblit; private final IdGenerator idGenerator; private final SshServer sshd; /** * Construct the Gitblit SSH daemon. * * @param gitblit */ @Inject SshDaemon(IGitblit gitblit, IdGenerator idGenerator, SshCommandFactory factory) { public SshDaemon(IGitblit gitblit, IdGenerator idGenerator) { this.gitblit = gitblit; this.idGenerator = idGenerator; IStoredSettings settings = gitblit.getSettings(); int port = settings.getInteger(Keys.git.sshPort, 0); String bindInterface = settings.getString(Keys.git.sshBindInterface, "localhost"); InetSocketAddress addr; if (StringUtils.isEmpty(bindInterface)) { myAddress = new InetSocketAddress(port); addr = new InetSocketAddress(port); } else { myAddress = new InetSocketAddress(bindInterface, port); addr = new InetSocketAddress(bindInterface, port); } setPort(myAddress.getPort()); setHost(myAddress.getHostName()); setup(); setKeyPairProvider(new PEMGeneratorHostKeyProvider(new File( sshd = SshServer.setUpDefaultServer(); sshd.setPort(addr.getPort()); sshd.setHost(addr.getHostName()); sshd.setKeyPairProvider(new PEMGeneratorHostKeyProvider(new File( gitblit.getBaseFolder(), HOST_KEY_STORE).getPath())); setPublickeyAuthenticator(new SshKeyAuthenticator(gitblit)); sshd.setPublickeyAuthenticator(new SshKeyAuthenticator(gitblit)); sshd.setPasswordAuthenticator(new SshPasswordAuthenticator(gitblit)); sshd.setSessionFactory(new SshSessionFactory(idGenerator)); sshd.setFileSystemFactory(new DisabledFilesystemFactory()); sshd.setForwardingFilter(new NonForwardingFilter()); DispatchCommand dispatcher = new DispatchCommand(); dispatcher.registerCommand(CreateRepository.class); dispatcher.registerCommand(VersionCommand.class); SshCommandFactory commandFactory = new SshCommandFactory( new RepositoryResolver<SshSession>(gitblit), new GitblitUploadPackFactory<SshSession>(gitblit), new GitblitReceivePackFactory<SshSession>(gitblit), new WorkQueue(idGenerator), dispatcher); sshd.setCommandFactory(commandFactory); run = new AtomicBoolean(false); setCommandFactory(factory); setSessionFactory(newSessionFactory(idGenerator)); } SessionFactory newSessionFactory(final IdGenerator idGenerator) { return new SessionFactory() { @Override protected ServerSession createSession(final IoSession io) throws Exception { log.info("connection accepted on " + io); if (io.getConfig() instanceof SocketSessionConfig) { final SocketSessionConfig c = (SocketSessionConfig) io .getConfig(); c.setKeepAlive(true); } ServerSession s = (ServerSession) super.createSession(io); SocketAddress peer = io.getRemoteAddress(); SshSession session = new SshSession(idGenerator.next(), peer); s.setAttribute(SshSession.KEY, session); io.getCloseFuture().addListener( new IoFutureListener<IoFuture>() { @Override public void operationComplete(IoFuture future) { log.info("connection closed on " + io); } }); return s; } }; } public int getPort() { return myAddress.getPort(); } public String formatUrl(String gituser, String servername, String repository) { if (getPort() == DEFAULT_PORT) { if (sshd.getPort() == DEFAULT_PORT) { // standard port return MessageFormat.format("{0}@{1}/{2}", gituser, servername, repository); } else { // non-standard port return MessageFormat.format("ssh://{0}@{1}:{2,number,0}/{3}", gituser, servername, getPort(), repository); gituser, servername, sshd.getPort(), repository); } } @@ -199,12 +143,12 @@ throw new IllegalStateException(JGitText.get().daemonAlreadyRunning); } super.start(); sshd.start(); run.set(true); log.info(MessageFormat.format( "SSH Daemon is listening on {0}:{1,number,0}", myAddress .getAddress().getHostAddress(), myAddress.getPort())); "SSH Daemon is listening on {0}:{1,number,0}", sshd.getHost(), sshd.getPort())); } /** @return true if this daemon is receiving connections. */ @@ -219,117 +163,10 @@ run.set(false); try { super.stop(); sshd.stop(); } catch (InterruptedException e) { log.error("SSH Daemon stop interrupted", e); } } } /** * Performs most of default configuration (setup random sources, setup * ciphers, etc; also, support for forwarding and filesystem is explicitly * disallowed). * * {@link #setKeyPairProvider(KeyPairProvider)} and * {@link #setPublickeyAuthenticator(PublickeyAuthenticator)} are left for * you. And applying {@link #setCommandFactory(CommandFactory)} is probably * wise if you want something to actually happen when users do successfully * authenticate. */ @SuppressWarnings("unchecked") public void setup() { if (!SecurityUtils.isBouncyCastleRegistered()) throw new RuntimeException("BC crypto not available"); setKeyExchangeFactories(Arrays.<NamedFactory<KeyExchange>> asList( new DHG14.Factory(), new DHG1.Factory())); setRandomFactory(new SingletonRandomFactory( new BouncyCastleRandom.Factory())); setupCiphers(); setCompressionFactories(Arrays .<NamedFactory<Compression>> asList(new CompressionNone.Factory())); setMacFactories(Arrays.<NamedFactory<Mac>> asList( new HMACMD5.Factory(), new HMACSHA1.Factory(), new HMACMD596.Factory(), new HMACSHA196.Factory())); setChannelFactories(Arrays.<NamedFactory<Channel>> asList( new ChannelSession.Factory(), new ChannelDirectTcpip.Factory())); setSignatureFactories(Arrays.<NamedFactory<Signature>> asList( new SignatureDSA.Factory(), new SignatureRSA.Factory())); setFileSystemFactory(new FileSystemFactory() { @Override public FileSystemView createFileSystemView(Session session) throws IOException { return new FileSystemView() { @Override public SshFile getFile(SshFile baseDir, String file) { return null; } @Override public SshFile getFile(String file) { return null; } }; } }); setForwardingFilter(new ForwardingFilter() { @Override public boolean canForwardAgent(ServerSession session) { return false; } @Override public boolean canForwardX11(ServerSession session) { return false; } @Override public boolean canConnect(InetSocketAddress address, ServerSession session) { return false; } @Override public boolean canListen(InetSocketAddress address, ServerSession session) { return false; } }); setUserAuthFactories(Arrays .<NamedFactory<UserAuth>> asList(new UserAuthPublicKey.Factory())); } protected void setupCiphers() { List<NamedFactory<Cipher>> avail = new LinkedList<NamedFactory<Cipher>>(); avail.add(new AES128CBC.Factory()); avail.add(new TripleDESCBC.Factory()); avail.add(new BlowfishCBC.Factory()); avail.add(new AES192CBC.Factory()); avail.add(new AES256CBC.Factory()); for (Iterator<NamedFactory<Cipher>> i = avail.iterator(); i.hasNext();) { final NamedFactory<Cipher> f = i.next(); try { final Cipher c = f.create(); final byte[] key = new byte[c.getBlockSize()]; final byte[] iv = new byte[c.getIVSize()]; c.init(Cipher.Mode.Encrypt, key, iv); } catch (InvalidKeyException e) { i.remove(); } catch (Exception e) { i.remove(); } } setCipherFactories(avail); } } src/main/java/com/gitblit/transport/ssh/SshPasswordAuthenticator.java
New file @@ -0,0 +1,50 @@ /* * Copyright 2014 gitblit.com. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of * the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations under * the License. */ package com.gitblit.transport.ssh; import java.util.Locale; import org.apache.sshd.server.PasswordAuthenticator; import org.apache.sshd.server.session.ServerSession; import com.gitblit.manager.IGitblit; import com.gitblit.models.UserModel; /** * * @author James Moger * */ public class SshPasswordAuthenticator implements PasswordAuthenticator { protected final IGitblit gitblit; public SshPasswordAuthenticator(IGitblit gitblit) { this.gitblit = gitblit; } @Override public boolean authenticate(String username, String password, ServerSession session) { username = username.toLowerCase(Locale.US); UserModel user = gitblit.authenticate(username, password.toCharArray()); if (user != null) { SshSession sd = session.getAttribute(SshSession.KEY); sd.authenticationSuccess(username); return true; } return false; } } src/main/java/com/gitblit/transport/ssh/SshSessionFactory.java
New file @@ -0,0 +1,69 @@ /* * Copyright 2014 gitblit.com. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.gitblit.transport.ssh; import java.net.SocketAddress; import org.apache.mina.core.future.IoFuture; import org.apache.mina.core.future.IoFutureListener; import org.apache.mina.core.session.IoSession; import org.apache.mina.transport.socket.SocketSessionConfig; import org.apache.sshd.server.session.ServerSession; import org.apache.sshd.server.session.SessionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.gitblit.utils.IdGenerator; /** * * @author James Moger * */ public class SshSessionFactory extends SessionFactory { private final Logger log = LoggerFactory.getLogger(getClass()); private final IdGenerator idGenerator; public SshSessionFactory(IdGenerator idGenerator) { this.idGenerator = idGenerator; } @Override protected ServerSession createSession(final IoSession io) throws Exception { log.info("connection accepted on " + io); if (io.getConfig() instanceof SocketSessionConfig) { final SocketSessionConfig c = (SocketSessionConfig) io.getConfig(); c.setKeepAlive(true); } final ServerSession s = (ServerSession) super.createSession(io); SocketAddress peer = io.getRemoteAddress(); SshSession session = new SshSession(idGenerator.next(), peer); s.setAttribute(SshSession.KEY, session); io.getCloseFuture().addListener(new IoFutureListener<IoFuture>() { @Override public void operationComplete(IoFuture future) { log.info("connection closed on " + io); } }); return s; } } src/main/java/com/gitblit/transport/ssh/commands/DispatchCommand.java
@@ -16,12 +16,12 @@ import java.io.IOException; import java.io.StringWriter; import java.text.MessageFormat; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import javax.inject.Provider; import org.apache.sshd.server.Command; import org.apache.sshd.server.Environment; @@ -42,28 +42,26 @@ @Argument(index = 1, multiValued = true, metaVar = "ARG") private List<String> args = new ArrayList<String>(); private Set<Provider<Command>> commands; private Map<String, Provider<Command>> map; private Set<Class<? extends Command>> commands; private Map<String, Class<? extends Command>> map; public DispatchCommand() {} public DispatchCommand(Map<String, Provider<Command>> map) { this.map = map; public DispatchCommand() { commands = new HashSet<Class<? extends Command>>(); } public void setMap(Map<String, Provider<Command>> m) { map = m; public void registerCommand(Class<? extends Command> cmd) { if (!cmd.isAnnotationPresent(CommandMetaData.class)) { throw new RuntimeException(MessageFormat.format("{0} must be annotated with {1}!", cmd.getName(), CommandMetaData.class.getName())); } commands.add(cmd); } public DispatchCommand(Set<Provider<Command>> commands) { this.commands = commands; } private Map<String, Provider<Command>> getMap() { private Map<String, Class<? extends Command>> getMap() { if (map == null) { map = Maps.newHashMapWithExpectedSize(commands.size()); for (Provider<Command> cmd : commands) { CommandMetaData meta = cmd.get().getClass().getAnnotation(CommandMetaData.class); for (Class<? extends Command> cmd : commands) { CommandMetaData meta = cmd.getAnnotation(CommandMetaData.class); map.put(meta.name(), cmd); } } @@ -80,15 +78,20 @@ throw new UnloggedFailure(1, msg.toString()); } final Provider<Command> p = getMap().get(commandName); if (p == null) { final Class<? extends Command> c = getMap().get(commandName); if (c == null) { String msg = (getName().isEmpty() ? "Gitblit" : getName()) + ": " + commandName + ": not found"; throw new UnloggedFailure(1, msg); } final Command cmd = p.get(); Command cmd = null; try { cmd = c.newInstance(); } catch (Exception e) { throw new UnloggedFailure(1, MessageFormat.format("Failed to instantiate {0} command", commandName)); } if (cmd instanceof BaseCommand) { BaseCommand bc = (BaseCommand) cmd; if (getName().isEmpty()) { @@ -116,6 +119,7 @@ } } @Override protected String usage() { final StringBuilder usage = new StringBuilder(); usage.append("Available commands"); @@ -127,15 +131,15 @@ usage.append("\n"); int maxLength = -1; Map<String, Provider<Command>> m = getMap(); Map<String, Class<? extends Command>> m = getMap(); for (String name : m.keySet()) { maxLength = Math.max(maxLength, name.length()); } String format = "%-" + maxLength + "s %s"; for (String name : Sets.newTreeSet(m.keySet())) { final Provider<Command> p = m.get(name); final Class<? extends Command> c = m.get(name); usage.append(" "); CommandMetaData meta = p.get().getClass().getAnnotation(CommandMetaData.class); CommandMetaData meta = c.getAnnotation(CommandMetaData.class); if (meta != null) { usage.append(String.format(format, name, Strings.nullToEmpty(meta.description()))); src/main/java/com/gitblit/utils/WorkQueue.java
@@ -14,11 +14,6 @@ package com.gitblit.utils; import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; import java.util.Collection; @@ -38,7 +33,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.inject.Inject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; /** Delayed execution of tasks using a background thread pool. */ public class WorkQueue { @@ -55,7 +53,6 @@ private final IdGenerator idGenerator; private final CopyOnWriteArrayList<Executor> queues; @Inject public WorkQueue(final IdGenerator idGenerator) { this.idGenerator = idGenerator; this.queues = new CopyOnWriteArrayList<Executor>(); @@ -268,6 +265,7 @@ return startTime; } @Override public boolean cancel(boolean mayInterruptIfRunning) { if (task.cancel(mayInterruptIfRunning)) { // Tiny abuse of running: if the task needs to know it was @@ -289,35 +287,43 @@ } } @Override public int compareTo(Delayed o) { return task.compareTo(o); } @Override public V get() throws InterruptedException, ExecutionException { return task.get(); } @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return task.get(timeout, unit); } @Override public long getDelay(TimeUnit unit) { return task.getDelay(unit); } @Override public boolean isCancelled() { return task.isCancelled(); } @Override public boolean isDone() { return task.isDone(); } @Override public boolean isPeriodic() { return task.isPeriodic(); } @Override public void run() { if (running.compareAndSet(false, true)) { try {