| | |
| | |
|
| | | /**
|
| | | * Fanout client class.
|
| | | * |
| | | *
|
| | | * @author James Moger
|
| | | *
|
| | | */
|
| | |
| | | private volatile Selector selector;
|
| | | private volatile SocketChannel socketCh;
|
| | | private Thread clientThread;
|
| | | |
| | |
|
| | | private final AtomicBoolean isConnected;
|
| | | private final AtomicBoolean isRunning;
|
| | | private final AtomicBoolean isAutomaticReconnect;
|
| | | private final ByteBuffer writeBuffer;
|
| | | private final ByteBuffer readBuffer;
|
| | | private final CharsetDecoder decoder;
|
| | | |
| | |
|
| | | private final Set<String> subscriptions;
|
| | | private boolean resubscribe;
|
| | | |
| | |
|
| | | public interface FanoutListener {
|
| | | public void pong(Date timestamp);
|
| | | public void announcement(String channel, String message);
|
| | | }
|
| | | |
| | |
|
| | | public static class FanoutAdapter implements FanoutListener {
|
| | | @Override
|
| | | public void pong(Date timestamp) { }
|
| | | @Override
|
| | | public void announcement(String channel, String message) { }
|
| | | }
|
| | |
|
| | |
| | | public void pong(Date timestamp) {
|
| | | System.out.println("Pong. " + timestamp);
|
| | | }
|
| | | |
| | |
|
| | | @Override
|
| | | public void announcement(String channel, String message) {
|
| | | System.out.println(MessageFormat.format("Here ye, Here ye. {0} says {1}", channel, message));
|
| | | }
|
| | | });
|
| | | client.start();
|
| | | |
| | |
|
| | | Thread.sleep(5000);
|
| | | client.ping();
|
| | | client.subscribe("james");
|
| | | client.announce("james", "12345"); |
| | | client.announce("james", "12345");
|
| | | client.subscribe("c52f99d16eb5627877ae957df7ce1be102783bd5");
|
| | | |
| | |
|
| | | while (true) {
|
| | | Thread.sleep(10000);
|
| | | client.ping();
|
| | |
| | | public void removeListener(FanoutListener listener) {
|
| | | listeners.remove(listener);
|
| | | }
|
| | | |
| | |
|
| | | public boolean isAutomaticReconnect() {
|
| | | return isAutomaticReconnect.get();
|
| | | }
|
| | | |
| | |
|
| | | public void setAutomaticReconnect(boolean value) {
|
| | | isAutomaticReconnect.set(value);
|
| | | }
|
| | |
| | | confirmConnection();
|
| | | write("status");
|
| | | }
|
| | | |
| | |
|
| | | public void subscribe(String channel) {
|
| | | confirmConnection();
|
| | | if (subscriptions.add(channel)) {
|
| | | write("subscribe " + channel);
|
| | | }
|
| | | }
|
| | | |
| | |
|
| | | public void unsubscribe(String channel) {
|
| | | confirmConnection();
|
| | | if (subscriptions.remove(channel)) {
|
| | | write("unsubscribe " + channel);
|
| | | }
|
| | | }
|
| | | |
| | |
|
| | | public void announce(String channel, String message) {
|
| | | confirmConnection();
|
| | | write("announce " + channel + " " + message);
|
| | |
| | | throw new RuntimeException("Fanout client is disconnected!");
|
| | | }
|
| | | }
|
| | | |
| | |
|
| | | public boolean isConnected() {
|
| | | return isRunning.get() && socketCh != null && isConnected.get();
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Start client connection and return immediately.
|
| | | */
|
| | |
| | | clientThread = new Thread(this, "Fanout client");
|
| | | clientThread.start();
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Start client connection and wait until it has connected.
|
| | | */
|
| | | public void startSynchronously() {
|
| | | start();
|
| | | while (!isConnected()) { |
| | | while (!isConnected()) {
|
| | | try {
|
| | | Thread.sleep(100);
|
| | | } catch (Exception e) {
|
| | |
| | | @Override
|
| | | public void run() {
|
| | | resetState();
|
| | | |
| | | isRunning.set(true); |
| | |
|
| | | isRunning.set(true);
|
| | | while (isRunning.get()) {
|
| | | // (re)connect
|
| | | if (socketCh == null) {
|
| | |
| | | socketCh = SocketChannel.open(new InetSocketAddress(addr, port));
|
| | | socketCh.configureBlocking(false);
|
| | | selector = Selector.open();
|
| | | id = FanoutConstants.getLocalSocketId(socketCh.socket()); |
| | | id = FanoutConstants.getLocalSocketId(socketCh.socket());
|
| | | socketCh.register(selector, SelectionKey.OP_READ);
|
| | | } catch (Exception e) {
|
| | | logger.error(MessageFormat.format("failed to open client connection to {0}:{1,number,0}", host, port), e);
|
| | |
| | | continue;
|
| | | }
|
| | | }
|
| | | |
| | |
|
| | | // read/write
|
| | | try {
|
| | | selector.select(clientTimeout);
|
| | |
| | | while (i.hasNext()) {
|
| | | SelectionKey key = i.next();
|
| | | i.remove();
|
| | | |
| | |
|
| | | if (key.isReadable()) {
|
| | | // read message
|
| | | String content = read();
|
| | |
| | | // resubscribe
|
| | | if (resubscribe) {
|
| | | resubscribe = false;
|
| | | logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size())); |
| | | logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size()));
|
| | | for (String subscription : subscriptions) {
|
| | | write("subscribe " + subscription);
|
| | | }
|
| | |
| | | }
|
| | | } catch (IOException e) {
|
| | | logger.error(MessageFormat.format("fanout client {0} error: {1}", id, e.getMessage()));
|
| | | closeChannel(); |
| | | closeChannel();
|
| | | if (!isAutomaticReconnect.get()) {
|
| | | isRunning.set(false);
|
| | | continue;
|
| | | }
|
| | | }
|
| | | }
|
| | | |
| | |
|
| | | closeChannel();
|
| | | resetState();
|
| | | }
|
| | | |
| | |
|
| | | protected void resetState() {
|
| | | readBuffer.clear();
|
| | | writeBuffer.clear();
|
| | | isRunning.set(false);
|
| | | isConnected.set(false);
|
| | | }
|
| | | |
| | |
|
| | | private void closeChannel() {
|
| | | try {
|
| | | if (socketCh != null) {
|
| | |
| | | long time = Long.parseLong(fields[0]);
|
| | | Date date = new Date(time);
|
| | | firePong(date);
|
| | | } catch (Exception e) { |
| | | } catch (Exception e) {
|
| | | }
|
| | | return true;
|
| | | } else if (fields.length == 2) {
|
| | |
| | | }
|
| | | }
|
| | | }
|
| | | |
| | |
|
| | | protected synchronized String read() throws IOException {
|
| | | readBuffer.clear();
|
| | | long len = socketCh.read(readBuffer);
|
| | |
| | | return content;
|
| | | }
|
| | | }
|
| | | |
| | |
|
| | | protected synchronized boolean write(String message) {
|
| | | try {
|
| | | logger.info(MessageFormat.format("fanout client {0} > {1}", id, message));
|