From 436bd3f0ecdee282c503a9eb0f7a240b7a68ff49 Mon Sep 17 00:00:00 2001
From: James Moger <james.moger@gitblit.com>
Date: Fri, 11 Apr 2014 14:51:50 -0400
Subject: [PATCH] Merged #6 "Support serving repositories over the SSH transport"

---
 src/main/java/com/gitblit/utils/WorkQueue.java |  346 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 346 insertions(+), 0 deletions(-)

diff --git a/src/main/java/com/gitblit/utils/WorkQueue.java b/src/main/java/com/gitblit/utils/WorkQueue.java
new file mode 100644
index 0000000..ba49a4c
--- /dev/null
+++ b/src/main/java/com/gitblit/utils/WorkQueue.java
@@ -0,0 +1,346 @@
+// Copyright (C) 2009 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gitblit.utils;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/** Delayed execution of tasks using a background thread pool. */
+public class WorkQueue {
+  private static final Logger log = LoggerFactory.getLogger(WorkQueue.class);
+  private static final UncaughtExceptionHandler LOG_UNCAUGHT_EXCEPTION =
+      new UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+          log.error("WorkQueue thread " + t.getName() + " threw exception", e);
+        }
+      };
+
+  private Executor defaultQueue;
+  private final IdGenerator idGenerator;
+  private final CopyOnWriteArrayList<Executor> queues;
+
+  public WorkQueue(final IdGenerator idGenerator) {
+    this.idGenerator = idGenerator;
+    this.queues = new CopyOnWriteArrayList<Executor>();
+  }
+
+  /** Get the default work queue, for miscellaneous tasks. */
+  public synchronized Executor getDefaultQueue() {
+    if (defaultQueue == null) {
+      defaultQueue = createQueue(1, "WorkQueue");
+    }
+    return defaultQueue;
+  }
+
+  /** Create a new executor queue with one thread. */
+  public Executor createQueue(final int poolsize, final String prefix) {
+    final Executor r = new Executor(poolsize, prefix);
+    r.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+    r.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+    queues.add(r);
+    return r;
+  }
+
+  /** Get all of the tasks currently scheduled in any work queue. */
+  public List<Task<?>> getTasks() {
+    final List<Task<?>> r = new ArrayList<Task<?>>();
+    for (final Executor e : queues) {
+      e.addAllTo(r);
+    }
+    return r;
+  }
+
+  public <T> List<T> getTaskInfos(TaskInfoFactory<T> factory) {
+    List<T> taskInfos = Lists.newArrayList();
+    for (Executor exe : queues) {
+      for (Task<?> task : exe.getTasks()) {
+        taskInfos.add(factory.getTaskInfo(task));
+      }
+    }
+    return taskInfos;
+  }
+
+  /** Locate a task by its unique id, null if no task matches. */
+  public Task<?> getTask(final int id) {
+    Task<?> result = null;
+    for (final Executor e : queues) {
+      final Task<?> t = e.getTask(id);
+      if (t != null) {
+        if (result != null) {
+          // Don't return the task if we have a duplicate. Lie instead.
+          return null;
+        } else {
+          result = t;
+        }
+      }
+    }
+    return result;
+  }
+
+  public void stop() {
+    for (final Executor p : queues) {
+      p.shutdown();
+      boolean isTerminated;
+      do {
+        try {
+          isTerminated = p.awaitTermination(10, TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+          isTerminated = false;
+        }
+      } while (!isTerminated);
+    }
+    queues.clear();
+  }
+
+  /** An isolated queue. */
+  public class Executor extends ScheduledThreadPoolExecutor {
+    private final ConcurrentHashMap<Integer, Task<?>> all;
+
+    Executor(final int corePoolSize, final String prefix) {
+      super(corePoolSize, new ThreadFactory() {
+        private final ThreadFactory parent = Executors.defaultThreadFactory();
+        private final AtomicInteger tid = new AtomicInteger(1);
+
+        @Override
+        public Thread newThread(final Runnable task) {
+          final Thread t = parent.newThread(task);
+          t.setName(prefix + "-" + tid.getAndIncrement());
+          t.setUncaughtExceptionHandler(LOG_UNCAUGHT_EXCEPTION);
+          return t;
+        }
+      });
+
+      all = new ConcurrentHashMap<Integer, Task<?>>( //
+          corePoolSize << 1, // table size
+          0.75f, // load factor
+          corePoolSize + 4 // concurrency level
+          );
+    }
+
+    public void unregisterWorkQueue() {
+      queues.remove(this);
+    }
+
+    @Override
+    protected <V> RunnableScheduledFuture<V> decorateTask(
+        final Runnable runnable, RunnableScheduledFuture<V> r) {
+      r = super.decorateTask(runnable, r);
+      for (;;) {
+        final int id = idGenerator.next();
+
+        Task<V> task;
+        task = new Task<V>(runnable, r, this, id);
+
+        if (all.putIfAbsent(task.getTaskId(), task) == null) {
+          return task;
+        }
+      }
+    }
+
+    @Override
+    protected <V> RunnableScheduledFuture<V> decorateTask(
+        final Callable<V> callable, final RunnableScheduledFuture<V> task) {
+      throw new UnsupportedOperationException("Callable not implemented");
+    }
+
+    void remove(final Task<?> task) {
+      all.remove(task.getTaskId(), task);
+    }
+
+    Task<?> getTask(final int id) {
+      return all.get(id);
+    }
+
+    void addAllTo(final List<Task<?>> list) {
+      list.addAll(all.values()); // iterator is thread safe
+    }
+
+    Collection<Task<?>> getTasks() {
+      return all.values();
+    }
+  }
+
+  /** Runnable needing to know it was canceled. */
+  public interface CancelableRunnable extends Runnable {
+    /** Notifies the runnable it was canceled. */
+    public void cancel();
+  }
+
+  /** A wrapper around a scheduled Runnable, as maintained in the queue. */
+  public static class Task<V> implements RunnableScheduledFuture<V> {
+    /**
+     * Summarized status of a single task.
+     * <p>
+     * Tasks have the following state flow:
+     * <ol>
+     * <li>{@link #SLEEPING}: if scheduled with a non-zero delay.</li>
+     * <li>{@link #READY}: waiting for an available worker thread.</li>
+     * <li>{@link #RUNNING}: actively executing on a worker thread.</li>
+     * <li>{@link #DONE}: finished executing, if not periodic.</li>
+     * </ol>
+     */
+    public static enum State {
+      // Ordered like this so ordinal matches the order we would
+      // prefer to see tasks sorted in: done before running,
+      // running before ready, ready before sleeping.
+      //
+      DONE, CANCELLED, RUNNING, READY, SLEEPING, OTHER
+    }
+
+    private final Runnable runnable;
+    private final RunnableScheduledFuture<V> task;
+    private final Executor executor;
+    private final int taskId;
+    private final AtomicBoolean running;
+    private final Date startTime;
+
+    Task(Runnable runnable, RunnableScheduledFuture<V> task, Executor executor,
+        int taskId) {
+      this.runnable = runnable;
+      this.task = task;
+      this.executor = executor;
+      this.taskId = taskId;
+      this.running = new AtomicBoolean();
+      this.startTime = new Date();
+    }
+
+    public int getTaskId() {
+      return taskId;
+    }
+
+    public State getState() {
+      if (isCancelled()) {
+        return State.CANCELLED;
+      } else if (isDone() && !isPeriodic()) {
+        return State.DONE;
+      } else if (running.get()) {
+        return State.RUNNING;
+      }
+
+      final long delay = getDelay(TimeUnit.MILLISECONDS);
+      if (delay <= 0) {
+        return State.READY;
+      } else if (0 < delay) {
+        return State.SLEEPING;
+      }
+
+      return State.OTHER;
+    }
+
+    public Date getStartTime() {
+      return startTime;
+    }
+
+    @Override
+	public boolean cancel(boolean mayInterruptIfRunning) {
+      if (task.cancel(mayInterruptIfRunning)) {
+        // Tiny abuse of running: if the task needs to know it was
+        // canceled (to clean up resources) and it hasn't started
+        // yet the task's run method won't execute. So we tag it
+        // as running and allow it to clean up. This ensures we do
+        // not invoke cancel twice.
+        //
+        if (runnable instanceof CancelableRunnable
+            && running.compareAndSet(false, true)) {
+          ((CancelableRunnable) runnable).cancel();
+        }
+        executor.remove(this);
+        executor.purge();
+        return true;
+
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+	public int compareTo(Delayed o) {
+      return task.compareTo(o);
+    }
+
+    @Override
+	public V get() throws InterruptedException, ExecutionException {
+      return task.get();
+    }
+
+    @Override
+	public V get(long timeout, TimeUnit unit) throws InterruptedException,
+        ExecutionException, TimeoutException {
+      return task.get(timeout, unit);
+    }
+
+    @Override
+	public long getDelay(TimeUnit unit) {
+      return task.getDelay(unit);
+    }
+
+    @Override
+	public boolean isCancelled() {
+      return task.isCancelled();
+    }
+
+    @Override
+	public boolean isDone() {
+      return task.isDone();
+    }
+
+    @Override
+	public boolean isPeriodic() {
+      return task.isPeriodic();
+    }
+
+    @Override
+	public void run() {
+      if (running.compareAndSet(false, true)) {
+        try {
+          task.run();
+        } finally {
+          if (isPeriodic()) {
+            running.set(false);
+          } else {
+            executor.remove(this);
+          }
+        }
+      }
+    }
+
+    @Override
+    public String toString() {
+      return runnable.toString();
+    }
+  }
+}

--
Gitblit v1.9.1