Paul Martin
2016-04-30 a502d96a860456ec5e8c96761db70f7cabb74751
src/main/java/com/gitblit/fanout/FanoutNioService.java
@@ -44,7 +44,7 @@
 *
 * This implementation uses channels and selectors, which are the Java analog of
 * the Linux epoll mechanism used in the original fanout C code.
 *
 *
 * @author James Moger
 *
 */
@@ -54,7 +54,7 @@
   private volatile ServerSocketChannel serviceCh;
   private volatile Selector selector;
   public static void main(String[] args) throws Exception {
      FanoutNioService pubsub = new FanoutNioService(null, DEFAULT_PORT);
      pubsub.setStrictRequestTermination(false);
@@ -64,7 +64,7 @@
   /**
    * Create a single-threaded fanout service.
    *
    *
    * @param host
    * @param port
    *            the port for running the fanout PubSub service
@@ -73,10 +73,10 @@
   public FanoutNioService(int port) {
      this(null, port);
   }
   /**
    * Create a single-threaded fanout service.
    *
    *
    * @param bindInterface
    *            the ip address to bind for the service, may be null
    * @param port
@@ -86,7 +86,7 @@
   public FanoutNioService(String bindInterface, int port) {
      super(bindInterface, port, "Fanout nio service");
   }
   @Override
   protected boolean isConnected() {
      return serviceCh != null;
@@ -102,10 +102,10 @@
            serviceCh.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));
            selector = Selector.open();
            serviceCh.register(selector, SelectionKey.OP_ACCEPT);
            logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}",
            logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}",
                  name, host == null ? "0.0.0.0" : host, port));
         } catch (IOException e) {
            logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
            logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
                  name, name, host == null ? "0.0.0.0" : host, port), e);
            return false;
         }
@@ -122,11 +122,11 @@
            for (Map.Entry<String, SocketChannel> client : clients.entrySet()) {
               closeClientSocket(client.getKey(), client.getValue());
            }
            // close service socket channel
            logger.debug(MessageFormat.format("closing {0} socket channel", name));
            serviceCh.socket().close();
            serviceCh.close();
            serviceCh.close();
            serviceCh = null;
            selector.close();
            selector = null;
@@ -142,7 +142,7 @@
         Set<SelectionKey> keys = selector.selectedKeys();
         Iterator<SelectionKey> keyItr = keys.iterator();
         while (keyItr.hasNext()) {
            SelectionKey key = (SelectionKey) keyItr.next();
            SelectionKey key = keyItr.next();
            if (key.isAcceptable()) {
               // new fanout client connection
               ServerSocketChannel sch = (ServerSocketChannel) key.channel();
@@ -213,7 +213,7 @@
         }
      }
   }
   protected void closeClientSocket(String id, SocketChannel ch) {
      try {
         ch.close();
@@ -221,10 +221,11 @@
         logger.error(MessageFormat.format("fanout connection {0}", id), e);
      }
   }
   @Override
   protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {
      super.broadcast(connections, channel, message);
      // register queued write
      Map<String, SocketChannel> sockets = getCurrentClientSockets();
      for (FanoutServiceConnection connection : connections) {
@@ -241,7 +242,7 @@
         }
      }
   }
   protected Map<String, SocketChannel> getCurrentClientSockets() {
      Map<String, SocketChannel> sockets = new HashMap<String, SocketChannel>();
      for (SelectionKey key : selector.keys()) {
@@ -253,11 +254,11 @@
      }
      return sockets;
   }
   /**
    * FanoutNioConnection handles reading/writing messages from a remote fanout
    * connection.
    *
    *
    * @author James Moger
    *
    */
@@ -276,7 +277,7 @@
         replyQueue = new ArrayList<String>();
         decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();
      }
      protected void read(SocketChannel ch, boolean strictRequestTermination) throws CharacterCodingException, IOException {
         long bytesRead = 0;
         readBuffer.clear();
@@ -293,7 +294,7 @@
         String [] lines = req.split(strictRequestTermination ? "\n" : "\n|\r");
         requestQueue.addAll(Arrays.asList(lines));
      }
      protected void write(SocketChannel ch) throws IOException {
         Iterator<String> itr = replyQueue.iterator();
         while (itr.hasNext()) {
@@ -306,7 +307,7 @@
               writeBuffer.put((byte) 0xa);
            }
            writeBuffer.flip();
            // loop until write buffer has been completely sent
            int written = 0;
            int toWrite = writeBuffer.remaining();
@@ -316,7 +317,7 @@
                  Thread.sleep(10);
               } catch (Exception x) {
               }
            }
            }
            itr.remove();
         }
         writeBuffer.clear();