From 699e71e76b15081baf746c6ce9c9144f7e5f1ff9 Mon Sep 17 00:00:00 2001 From: James Moger <james.moger@gitblit.com> Date: Mon, 30 Sep 2013 10:11:28 -0400 Subject: [PATCH] Trim trailing whitespace and organize imports --- src/main/java/com/gitblit/fanout/FanoutNioService.java | 43 ++++++++++++++++++++++--------------------- 1 files changed, 22 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/gitblit/fanout/FanoutNioService.java b/src/main/java/com/gitblit/fanout/FanoutNioService.java index 65d022a..e7aff34 100644 --- a/src/main/java/com/gitblit/fanout/FanoutNioService.java +++ b/src/main/java/com/gitblit/fanout/FanoutNioService.java @@ -44,7 +44,7 @@ * * This implementation uses channels and selectors, which are the Java analog of * the Linux epoll mechanism used in the original fanout C code. - * + * * @author James Moger * */ @@ -54,7 +54,7 @@ private volatile ServerSocketChannel serviceCh; private volatile Selector selector; - + public static void main(String[] args) throws Exception { FanoutNioService pubsub = new FanoutNioService(null, DEFAULT_PORT); pubsub.setStrictRequestTermination(false); @@ -64,7 +64,7 @@ /** * Create a single-threaded fanout service. - * + * * @param host * @param port * the port for running the fanout PubSub service @@ -73,10 +73,10 @@ public FanoutNioService(int port) { this(null, port); } - + /** * Create a single-threaded fanout service. - * + * * @param bindInterface * the ip address to bind for the service, may be null * @param port @@ -86,7 +86,7 @@ public FanoutNioService(String bindInterface, int port) { super(bindInterface, port, "Fanout nio service"); } - + @Override protected boolean isConnected() { return serviceCh != null; @@ -102,10 +102,10 @@ serviceCh.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port)); selector = Selector.open(); serviceCh.register(selector, SelectionKey.OP_ACCEPT); - logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", + logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", name, host == null ? "0.0.0.0" : host, port)); } catch (IOException e) { - logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}", + logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}", name, name, host == null ? "0.0.0.0" : host, port), e); return false; } @@ -122,11 +122,11 @@ for (Map.Entry<String, SocketChannel> client : clients.entrySet()) { closeClientSocket(client.getKey(), client.getValue()); } - + // close service socket channel logger.debug(MessageFormat.format("closing {0} socket channel", name)); serviceCh.socket().close(); - serviceCh.close(); + serviceCh.close(); serviceCh = null; selector.close(); selector = null; @@ -142,7 +142,7 @@ Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> keyItr = keys.iterator(); while (keyItr.hasNext()) { - SelectionKey key = (SelectionKey) keyItr.next(); + SelectionKey key = keyItr.next(); if (key.isAcceptable()) { // new fanout client connection ServerSocketChannel sch = (ServerSocketChannel) key.channel(); @@ -213,7 +213,7 @@ } } } - + protected void closeClientSocket(String id, SocketChannel ch) { try { ch.close(); @@ -221,10 +221,11 @@ logger.error(MessageFormat.format("fanout connection {0}", id), e); } } - + + @Override protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) { super.broadcast(connections, channel, message); - + // register queued write Map<String, SocketChannel> sockets = getCurrentClientSockets(); for (FanoutServiceConnection connection : connections) { @@ -241,7 +242,7 @@ } } } - + protected Map<String, SocketChannel> getCurrentClientSockets() { Map<String, SocketChannel> sockets = new HashMap<String, SocketChannel>(); for (SelectionKey key : selector.keys()) { @@ -253,11 +254,11 @@ } return sockets; } - + /** * FanoutNioConnection handles reading/writing messages from a remote fanout * connection. - * + * * @author James Moger * */ @@ -276,7 +277,7 @@ replyQueue = new ArrayList<String>(); decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder(); } - + protected void read(SocketChannel ch, boolean strictRequestTermination) throws CharacterCodingException, IOException { long bytesRead = 0; readBuffer.clear(); @@ -293,7 +294,7 @@ String [] lines = req.split(strictRequestTermination ? "\n" : "\n|\r"); requestQueue.addAll(Arrays.asList(lines)); } - + protected void write(SocketChannel ch) throws IOException { Iterator<String> itr = replyQueue.iterator(); while (itr.hasNext()) { @@ -306,7 +307,7 @@ writeBuffer.put((byte) 0xa); } writeBuffer.flip(); - + // loop until write buffer has been completely sent int written = 0; int toWrite = writeBuffer.remaining(); @@ -316,7 +317,7 @@ Thread.sleep(10); } catch (Exception x) { } - } + } itr.remove(); } writeBuffer.clear(); -- Gitblit v1.9.1