/*
|
* Copyright 2013 gitblit.com.
|
*
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
* you may not use this file except in compliance with the License.
|
* You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* See the License for the specific language governing permissions and
|
* limitations under the License.
|
*/
|
package com.gitblit.fanout;
|
|
import java.io.IOException;
|
import java.net.InetAddress;
|
import java.net.InetSocketAddress;
|
import java.nio.ByteBuffer;
|
import java.nio.channels.SelectionKey;
|
import java.nio.channels.Selector;
|
import java.nio.channels.SocketChannel;
|
import java.nio.charset.Charset;
|
import java.nio.charset.CharsetDecoder;
|
import java.text.MessageFormat;
|
import java.util.ArrayList;
|
import java.util.Collections;
|
import java.util.Date;
|
import java.util.Iterator;
|
import java.util.LinkedHashSet;
|
import java.util.List;
|
import java.util.Set;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
/**
|
* Fanout client class.
|
*
|
* @author James Moger
|
*
|
*/
|
public class FanoutClient implements Runnable {
|
|
private final static Logger logger = LoggerFactory.getLogger(FanoutClient.class);
|
|
private final int clientTimeout = 500;
|
private final int reconnectTimeout = 2000;
|
private final String host;
|
private final int port;
|
private final List<FanoutListener> listeners;
|
|
private String id;
|
private volatile Selector selector;
|
private volatile SocketChannel socketCh;
|
private Thread clientThread;
|
|
private final AtomicBoolean isConnected;
|
private final AtomicBoolean isRunning;
|
private final AtomicBoolean isAutomaticReconnect;
|
private final ByteBuffer writeBuffer;
|
private final ByteBuffer readBuffer;
|
private final CharsetDecoder decoder;
|
|
private final Set<String> subscriptions;
|
private boolean resubscribe;
|
|
public interface FanoutListener {
|
public void pong(Date timestamp);
|
public void announcement(String channel, String message);
|
}
|
|
public static class FanoutAdapter implements FanoutListener {
|
public void pong(Date timestamp) { }
|
public void announcement(String channel, String message) { }
|
}
|
|
public static void main(String args[]) throws Exception {
|
FanoutClient client = new FanoutClient("localhost", 2000);
|
client.addListener(new FanoutAdapter() {
|
|
@Override
|
public void pong(Date timestamp) {
|
System.out.println("Pong. " + timestamp);
|
}
|
|
@Override
|
public void announcement(String channel, String message) {
|
System.out.println(MessageFormat.format("Here ye, Here ye. {0} says {1}", channel, message));
|
}
|
});
|
client.start();
|
|
Thread.sleep(5000);
|
client.ping();
|
client.subscribe("james");
|
client.announce("james", "12345");
|
client.subscribe("c52f99d16eb5627877ae957df7ce1be102783bd5");
|
|
while (true) {
|
Thread.sleep(10000);
|
client.ping();
|
}
|
}
|
|
public FanoutClient(String host, int port) {
|
this.host = host;
|
this.port = port;
|
readBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH);
|
writeBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH);
|
decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();
|
listeners = Collections.synchronizedList(new ArrayList<FanoutListener>());
|
subscriptions = new LinkedHashSet<String>();
|
isRunning = new AtomicBoolean(false);
|
isConnected = new AtomicBoolean(false);
|
isAutomaticReconnect = new AtomicBoolean(true);
|
}
|
|
public void addListener(FanoutListener listener) {
|
listeners.add(listener);
|
}
|
|
public void removeListener(FanoutListener listener) {
|
listeners.remove(listener);
|
}
|
|
public boolean isAutomaticReconnect() {
|
return isAutomaticReconnect.get();
|
}
|
|
public void setAutomaticReconnect(boolean value) {
|
isAutomaticReconnect.set(value);
|
}
|
|
public void ping() {
|
confirmConnection();
|
write("ping");
|
}
|
|
public void status() {
|
confirmConnection();
|
write("status");
|
}
|
|
public void subscribe(String channel) {
|
confirmConnection();
|
if (subscriptions.add(channel)) {
|
write("subscribe " + channel);
|
}
|
}
|
|
public void unsubscribe(String channel) {
|
confirmConnection();
|
if (subscriptions.remove(channel)) {
|
write("unsubscribe " + channel);
|
}
|
}
|
|
public void announce(String channel, String message) {
|
confirmConnection();
|
write("announce " + channel + " " + message);
|
}
|
|
private void confirmConnection() {
|
if (!isConnected()) {
|
throw new RuntimeException("Fanout client is disconnected!");
|
}
|
}
|
|
public boolean isConnected() {
|
return isRunning.get() && socketCh != null && isConnected.get();
|
}
|
|
/**
|
* Start client connection and return immediately.
|
*/
|
public void start() {
|
if (isRunning.get()) {
|
logger.warn("Fanout client is already running");
|
return;
|
}
|
clientThread = new Thread(this, "Fanout client");
|
clientThread.start();
|
}
|
|
/**
|
* Start client connection and wait until it has connected.
|
*/
|
public void startSynchronously() {
|
start();
|
while (!isConnected()) {
|
try {
|
Thread.sleep(100);
|
} catch (Exception e) {
|
}
|
}
|
}
|
|
/**
|
* Stops client connection. This method returns when the connection has
|
* been completely shutdown.
|
*/
|
public void stop() {
|
if (!isRunning.get()) {
|
logger.warn("Fanout client is not running");
|
return;
|
}
|
isRunning.set(false);
|
try {
|
if (clientThread != null) {
|
clientThread.join();
|
clientThread = null;
|
}
|
} catch (InterruptedException e1) {
|
}
|
}
|
|
@Override
|
public void run() {
|
resetState();
|
|
isRunning.set(true);
|
while (isRunning.get()) {
|
// (re)connect
|
if (socketCh == null) {
|
try {
|
InetAddress addr = InetAddress.getByName(host);
|
socketCh = SocketChannel.open(new InetSocketAddress(addr, port));
|
socketCh.configureBlocking(false);
|
selector = Selector.open();
|
id = FanoutConstants.getLocalSocketId(socketCh.socket());
|
socketCh.register(selector, SelectionKey.OP_READ);
|
} catch (Exception e) {
|
logger.error(MessageFormat.format("failed to open client connection to {0}:{1,number,0}", host, port), e);
|
try {
|
Thread.sleep(reconnectTimeout);
|
} catch (InterruptedException x) {
|
}
|
continue;
|
}
|
}
|
|
// read/write
|
try {
|
selector.select(clientTimeout);
|
|
Iterator<SelectionKey> i = selector.selectedKeys().iterator();
|
while (i.hasNext()) {
|
SelectionKey key = i.next();
|
i.remove();
|
|
if (key.isReadable()) {
|
// read message
|
String content = read();
|
String[] lines = content.split("\n");
|
for (String reply : lines) {
|
logger.trace(MessageFormat.format("fanout client {0} received: {1}", id, reply));
|
if (!processReply(reply)) {
|
logger.error(MessageFormat.format("fanout client {0} received unknown message", id));
|
}
|
}
|
} else if (key.isWritable()) {
|
// resubscribe
|
if (resubscribe) {
|
resubscribe = false;
|
logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size()));
|
for (String subscription : subscriptions) {
|
write("subscribe " + subscription);
|
}
|
}
|
socketCh.register(selector, SelectionKey.OP_READ);
|
}
|
}
|
} catch (IOException e) {
|
logger.error(MessageFormat.format("fanout client {0} error: {1}", id, e.getMessage()));
|
closeChannel();
|
if (!isAutomaticReconnect.get()) {
|
isRunning.set(false);
|
continue;
|
}
|
}
|
}
|
|
closeChannel();
|
resetState();
|
}
|
|
protected void resetState() {
|
readBuffer.clear();
|
writeBuffer.clear();
|
isRunning.set(false);
|
isConnected.set(false);
|
}
|
|
private void closeChannel() {
|
try {
|
if (socketCh != null) {
|
socketCh.close();
|
socketCh = null;
|
selector.close();
|
selector = null;
|
isConnected.set(false);
|
}
|
} catch (IOException x) {
|
}
|
}
|
|
protected boolean processReply(String reply) {
|
String[] fields = reply.split("!", 2);
|
if (fields.length == 1) {
|
try {
|
long time = Long.parseLong(fields[0]);
|
Date date = new Date(time);
|
firePong(date);
|
} catch (Exception e) {
|
}
|
return true;
|
} else if (fields.length == 2) {
|
String channel = fields[0];
|
String message = fields[1];
|
if (FanoutConstants.CH_DEBUG.equals(channel)) {
|
// debug messages are for internal use
|
if (FanoutConstants.MSG_CONNECTED.equals(message)) {
|
isConnected.set(true);
|
resubscribe = subscriptions.size() > 0;
|
if (resubscribe) {
|
try {
|
// register for async resubscribe
|
socketCh.register(selector, SelectionKey.OP_WRITE);
|
} catch (Exception e) {
|
logger.error("an error occurred", e);
|
}
|
}
|
}
|
logger.debug(MessageFormat.format("fanout client {0} < {1}", id, reply));
|
} else {
|
fireAnnouncement(channel, message);
|
}
|
return true;
|
} else {
|
// unknown message
|
return false;
|
}
|
}
|
|
protected void firePong(Date timestamp) {
|
logger.info(MessageFormat.format("fanout client {0} < pong {1,date,yyyy-MM-dd HH:mm:ss}", id, timestamp));
|
for (FanoutListener listener : listeners) {
|
try {
|
listener.pong(timestamp);
|
} catch (Throwable t) {
|
logger.error("FanoutListener threw an exception!", t);
|
}
|
}
|
}
|
protected void fireAnnouncement(String channel, String message) {
|
logger.info(MessageFormat.format("fanout client {0} < announcement {1} {2}", id, channel, message));
|
for (FanoutListener listener : listeners) {
|
try {
|
listener.announcement(channel, message);
|
} catch (Throwable t) {
|
logger.error("FanoutListener threw an exception!", t);
|
}
|
}
|
}
|
|
protected synchronized String read() throws IOException {
|
readBuffer.clear();
|
long len = socketCh.read(readBuffer);
|
|
if (len == -1) {
|
logger.error(MessageFormat.format("fanout client {0} lost connection to {1}:{2,number,0}, end of stream", id, host, port));
|
socketCh.close();
|
return null;
|
} else {
|
readBuffer.flip();
|
String content = decoder.decode(readBuffer).toString();
|
readBuffer.clear();
|
return content;
|
}
|
}
|
|
protected synchronized boolean write(String message) {
|
try {
|
logger.info(MessageFormat.format("fanout client {0} > {1}", id, message));
|
byte [] bytes = message.getBytes(FanoutConstants.CHARSET);
|
writeBuffer.clear();
|
writeBuffer.put(bytes);
|
if (bytes[bytes.length - 1] != 0xa) {
|
writeBuffer.put((byte) 0xa);
|
}
|
writeBuffer.flip();
|
|
// loop until write buffer has been completely sent
|
long written = 0;
|
long toWrite = writeBuffer.remaining();
|
while (written != toWrite) {
|
written += socketCh.write(writeBuffer);
|
try {
|
Thread.sleep(10);
|
} catch (Exception x) {
|
}
|
}
|
return true;
|
} catch (IOException e) {
|
logger.error("fanout client {0} error: {1}", id, e.getMessage());
|
}
|
return false;
|
}
|
}
|