Paul Martin
2016-04-30 a502d96a860456ec5e8c96761db70f7cabb74751
src/main/java/com/gitblit/fanout/FanoutClient.java
@@ -39,7 +39,7 @@
/**
 * Fanout client class.
 *
 *
 * @author James Moger
 *
 */
@@ -57,24 +57,26 @@
   private volatile Selector selector;
   private volatile SocketChannel socketCh;
   private Thread clientThread;
   private final AtomicBoolean isConnected;
   private final AtomicBoolean isRunning;
   private final AtomicBoolean isAutomaticReconnect;
   private final ByteBuffer writeBuffer;
   private final ByteBuffer readBuffer;
   private final CharsetDecoder decoder;
   private final Set<String> subscriptions;
   private boolean resubscribe;
   public interface FanoutListener {
      public void pong(Date timestamp);
      public void announcement(String channel, String message);
   }
   public static class FanoutAdapter implements FanoutListener {
      @Override
      public void pong(Date timestamp) { }
      @Override
      public void announcement(String channel, String message) { }
   }
@@ -86,20 +88,20 @@
         public void pong(Date timestamp) {
            System.out.println("Pong. " + timestamp);
         }
         @Override
         public void announcement(String channel, String message) {
            System.out.println(MessageFormat.format("Here ye, Here ye. {0} says {1}", channel, message));
         }
      });
      client.start();
      Thread.sleep(5000);
      client.ping();
      client.subscribe("james");
      client.announce("james", "12345");
      client.announce("james", "12345");
      client.subscribe("c52f99d16eb5627877ae957df7ce1be102783bd5");
      while (true) {
         Thread.sleep(10000);
         client.ping();
@@ -126,11 +128,11 @@
   public void removeListener(FanoutListener listener) {
      listeners.remove(listener);
   }
   public boolean isAutomaticReconnect() {
      return isAutomaticReconnect.get();
   }
   public void setAutomaticReconnect(boolean value) {
      isAutomaticReconnect.set(value);
   }
@@ -144,21 +146,21 @@
      confirmConnection();
      write("status");
   }
   public void subscribe(String channel) {
      confirmConnection();
      if (subscriptions.add(channel)) {
         write("subscribe " + channel);
      }
   }
   public void unsubscribe(String channel) {
      confirmConnection();
      if (subscriptions.remove(channel)) {
         write("unsubscribe " + channel);
      }
   }
   public void announce(String channel, String message) {
      confirmConnection();
      write("announce " + channel + " " + message);
@@ -169,11 +171,11 @@
         throw new RuntimeException("Fanout client is disconnected!");
      }
   }
   public boolean isConnected() {
      return isRunning.get() && socketCh != null && isConnected.get();
   }
   /**
    * Start client connection and return immediately.
    */
@@ -185,13 +187,13 @@
      clientThread = new Thread(this, "Fanout client");
      clientThread.start();
   }
   /**
    * Start client connection and wait until it has connected.
    */
   public void startSynchronously() {
      start();
      while (!isConnected()) {
      while (!isConnected()) {
         try {
            Thread.sleep(100);
         } catch (Exception e) {
@@ -221,8 +223,8 @@
   @Override
   public void run() {
      resetState();
      isRunning.set(true);
      isRunning.set(true);
      while (isRunning.get()) {
         // (re)connect
         if (socketCh == null) {
@@ -231,7 +233,7 @@
               socketCh = SocketChannel.open(new InetSocketAddress(addr, port));
               socketCh.configureBlocking(false);
               selector = Selector.open();
               id = FanoutConstants.getLocalSocketId(socketCh.socket());
               id = FanoutConstants.getLocalSocketId(socketCh.socket());
               socketCh.register(selector, SelectionKey.OP_READ);
            } catch (Exception e) {
               logger.error(MessageFormat.format("failed to open client connection to {0}:{1,number,0}", host, port), e);
@@ -242,7 +244,7 @@
               continue;
            }
         }
         // read/write
         try {
            selector.select(clientTimeout);
@@ -251,7 +253,7 @@
            while (i.hasNext()) {
               SelectionKey key = i.next();
               i.remove();
               if (key.isReadable()) {
                  // read message
                  String content = read();
@@ -266,7 +268,7 @@
                  // resubscribe
                  if (resubscribe) {
                     resubscribe = false;
                     logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size()));
                     logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size()));
                     for (String subscription : subscriptions) {
                        write("subscribe " + subscription);
                     }
@@ -276,25 +278,25 @@
            }
         } catch (IOException e) {
            logger.error(MessageFormat.format("fanout client {0} error: {1}", id, e.getMessage()));
            closeChannel();
            closeChannel();
            if (!isAutomaticReconnect.get()) {
               isRunning.set(false);
               continue;
            }
         }
      }
      closeChannel();
      resetState();
   }
   protected void resetState() {
      readBuffer.clear();
      writeBuffer.clear();
      isRunning.set(false);
      isConnected.set(false);
   }
   private void closeChannel() {
      try {
         if (socketCh != null) {
@@ -315,7 +317,7 @@
            long time = Long.parseLong(fields[0]);
            Date date = new Date(time);
            firePong(date);
         } catch (Exception e) {
         } catch (Exception e) {
         }
         return true;
      } else if (fields.length == 2) {
@@ -366,7 +368,7 @@
         }
      }
   }
   protected synchronized String read() throws IOException {
      readBuffer.clear();
      long len = socketCh.read(readBuffer);
@@ -382,7 +384,7 @@
         return content;
      }
   }
   protected synchronized boolean write(String message) {
      try {
         logger.info(MessageFormat.format("fanout client {0} > {1}", id, message));