From 699e71e76b15081baf746c6ce9c9144f7e5f1ff9 Mon Sep 17 00:00:00 2001 From: James Moger <james.moger@gitblit.com> Date: Mon, 30 Sep 2013 10:11:28 -0400 Subject: [PATCH] Trim trailing whitespace and organize imports --- src/main/java/com/gitblit/fanout/FanoutService.java | 142 +++++++++++++++++++++++----------------------- 1 files changed, 71 insertions(+), 71 deletions(-) diff --git a/src/main/java/com/gitblit/fanout/FanoutService.java b/src/main/java/com/gitblit/fanout/FanoutService.java index cbfd8a2..e0e4d64 100644 --- a/src/main/java/com/gitblit/fanout/FanoutService.java +++ b/src/main/java/com/gitblit/fanout/FanoutService.java @@ -37,29 +37,29 @@ /** * Base class for Fanout service implementations. - * + * * Subclass implementations can be used as a Sparkleshare PubSub notification * server. This allows Sparkleshare to be used in conjunction with Gitblit * behind a corporate firewall that restricts or prohibits client internet access * to the default Sparkleshare PubSub server: notifications.sparkleshare.org - * + * * @author James Moger * */ public abstract class FanoutService implements Runnable { private final static Logger logger = LoggerFactory.getLogger(FanoutService.class); - + public final static int DEFAULT_PORT = 17000; - + protected final static int serviceTimeout = 5000; protected final String host; protected final int port; - protected final String name; - + protected final String name; + private Thread serviceThread; - + private final Map<String, FanoutServiceConnection> connections; private final Map<String, Set<FanoutServiceConnection>> subscriptions; @@ -67,7 +67,7 @@ private final AtomicBoolean strictRequestTermination; private final AtomicBoolean allowAllChannelAnnouncements; private final AtomicInteger concurrentConnectionLimit; - + private final Date bootDate; private final AtomicLong rejectedConnectionCount; private final AtomicInteger peakConnectionCount; @@ -82,16 +82,16 @@ this.host = host; this.port = port; this.name = name; - + connections = new ConcurrentHashMap<String, FanoutServiceConnection>(); subscriptions = new ConcurrentHashMap<String, Set<FanoutServiceConnection>>(); subscriptions.put(FanoutConstants.CH_ALL, new ConcurrentSkipListSet<FanoutServiceConnection>()); - + isRunning = new AtomicBoolean(false); strictRequestTermination = new AtomicBoolean(false); allowAllChannelAnnouncements = new AtomicBoolean(false); concurrentConnectionLimit = new AtomicInteger(0); - + bootDate = new Date(); rejectedConnectionCount = new AtomicLong(0); peakConnectionCount = new AtomicInteger(0); @@ -106,18 +106,18 @@ /* * Abstract methods */ - + protected abstract boolean isConnected(); - + protected abstract boolean connect(); - + protected abstract void listen() throws IOException; - + protected abstract void disconnect(); - + /** * Returns true if the service requires \n request termination. - * + * * @return true if request requires \n termination */ public boolean isStrictRequestTermination() { @@ -128,62 +128,62 @@ * Control the termination of fanout requests. If true, fanout requests must * be terminated with \n. If false, fanout requests may be terminated with * \n, \r, \r\n, or \n\r. This is useful for debugging with a telnet client. - * + * * @param isStrictTermination */ public void setStrictRequestTermination(boolean isStrictTermination) { strictRequestTermination.set(isStrictTermination); } - + /** * Returns the maximum allowable concurrent fanout connections. - * + * * @return the maximum allowable concurrent connection count */ public int getConcurrentConnectionLimit() { return concurrentConnectionLimit.get(); } - + /** * Sets the maximum allowable concurrent fanout connection count. - * + * * @param value */ public void setConcurrentConnectionLimit(int value) { concurrentConnectionLimit.set(value); } - + /** * Returns true if connections are allowed to announce on the all channel. - * + * * @return true if connections are allowed to announce on the all channel */ public boolean allowAllChannelAnnouncements() { return allowAllChannelAnnouncements.get(); } - + /** * Allows/prohibits connections from announcing on the ALL channel. - * + * * @param value */ public void setAllowAllChannelAnnouncements(boolean value) { allowAllChannelAnnouncements.set(value); } - + /** * Returns the current connections - * + * * @param channel * @return map of current connections keyed by their id */ public Map<String, FanoutServiceConnection> getCurrentConnections() { return connections; } - + /** * Returns all subscriptions - * + * * @return map of current subscriptions keyed by channel name */ public Map<String, Set<FanoutServiceConnection>> getCurrentSubscriptions() { @@ -192,7 +192,7 @@ /** * Returns the subscriptions for the specified channel - * + * * @param channel * @return set of subscribed connections for the specified channel */ @@ -202,17 +202,17 @@ /** * Returns the runtime statistics object for this service. - * + * * @return stats */ public FanoutStats getStatistics() { FanoutStats stats = new FanoutStats(); - + // settings stats.allowAllChannelAnnouncements = allowAllChannelAnnouncements(); stats.concurrentConnectionLimit = getConcurrentConnectionLimit(); stats.strictRequestTermination = isStrictRequestTermination(); - + // runtime stats stats.bootDate = bootDate; stats.rejectedConnectionCount = rejectedConnectionCount.get(); @@ -222,16 +222,16 @@ stats.totalMessages = totalMessages.get(); stats.totalSubscribes = totalSubscribes.get(); stats.totalUnsubscribes = totalUnsubscribes.get(); - stats.totalPings = totalPings.get(); + stats.totalPings = totalPings.get(); stats.currentConnections = connections.size(); stats.currentChannels = subscriptions.size(); stats.currentSubscriptions = subscriptions.size() * connections.size(); return stats; } - + /** * Returns true if the service is ready. - * + * * @return true, if the service is ready */ public boolean isReady() { @@ -243,7 +243,7 @@ /** * Start the Fanout service thread and immediatel return. - * + * */ public void start() { if (isRunning.get()) { @@ -254,10 +254,10 @@ serviceThread.setName(MessageFormat.format("{0} {1}:{2,number,0}", name, host == null ? "all" : host, port)); serviceThread.start(); } - + /** * Start the Fanout service thread and wait until it is accepting connections. - * + * */ public void startSynchronously() { start(); @@ -268,7 +268,7 @@ } } } - + /** * Stop the Fanout service. This method returns when the service has been * completely shutdown. @@ -290,7 +290,7 @@ } logger.info(MessageFormat.format("stopped {0}", name)); } - + /** * Main execution method of the service */ @@ -314,10 +314,10 @@ } } } - disconnect(); + disconnect(); resetState(); } - + protected void resetState() { // reset state data connections.clear(); @@ -334,23 +334,23 @@ /** * Configure the client connection socket. - * + * * @param socket * @throws SocketException */ protected void configureClientSocket(Socket socket) throws SocketException { - socket.setKeepAlive(true); + socket.setKeepAlive(true); socket.setSoLinger(true, 0); // immediately discard any remaining data } - + /** * Add the connection to the connections map. - * + * * @param connection * @return false if the connection was rejected due to too many concurrent * connections */ - protected boolean addConnection(FanoutServiceConnection connection) { + protected boolean addConnection(FanoutServiceConnection connection) { int limit = getConcurrentConnectionLimit(); if (limit > 0 && connections.size() > limit) { logger.info(MessageFormat.format("hit {0,number,0} connection limit, rejecting fanout connection", concurrentConnectionLimit)); @@ -358,7 +358,7 @@ connection.busy(); return false; } - + // add the connection to our map connections.put(connection.id, connection); @@ -371,10 +371,10 @@ connection.connected(); return true; } - + /** * Remove the connection from the connections list and from subscriptions. - * + * * @param connection */ protected void removeConnection(FanoutServiceConnection connection) { @@ -393,46 +393,46 @@ } logger.info(MessageFormat.format("fanout connection {0} removed", connection.id)); } - + /** * Tests to see if the connection is being monitored by the service. - * + * * @param connection * @return true if the service is monitoring the connection */ protected boolean hasConnection(FanoutServiceConnection connection) { return connections.containsKey(connection.id); } - + /** * Reply to a connection on the specified channel. - * + * * @param connection * @param channel * @param message * @return the reply */ - protected String reply(FanoutServiceConnection connection, String channel, String message) { + protected String reply(FanoutServiceConnection connection, String channel, String message) { if (channel != null && channel.length() > 0) { increment(totalMessages); } return connection.reply(channel, message); } - + /** * Service method to broadcast a message to all connections. - * + * * @param message */ public void broadcastAll(String message) { broadcast(connections.values(), FanoutConstants.CH_ALL, message); increment(totalAnnouncements); } - + /** * Service method to broadcast a message to connections subscribed to the * channel. - * + * * @param message */ public void broadcast(String channel, String message) { @@ -440,10 +440,10 @@ broadcast(connections, channel, message); increment(totalAnnouncements); } - + /** * Broadcast a message to connections subscribed to the specified channel. - * + * * @param connections * @param channel * @param message @@ -453,10 +453,10 @@ reply(connection, channel, message); } } - + /** * Process an incoming Fanout request. - * + * * @param connection * @param req * @return the reply to the request, may be null @@ -476,10 +476,10 @@ } return null; } - + /** * Process the Fanout request. - * + * * @param connection * @param action * @param channel @@ -535,7 +535,7 @@ } return null; } - + private String asHexArray(String req) { StringBuilder sb = new StringBuilder(); for (char c : req.toCharArray()) { @@ -543,10 +543,10 @@ } return "[ " + sb.toString().trim() + " ]"; } - + /** * Increment a long and prevent negative rollover. - * + * * @param counter */ private void increment(AtomicLong counter) { @@ -555,7 +555,7 @@ counter.set(0); } } - + @Override public String toString() { return name; -- Gitblit v1.9.1