| | |
| | | *
|
| | | * This implementation uses channels and selectors, which are the Java analog of
|
| | | * the Linux epoll mechanism used in the original fanout C code.
|
| | | * |
| | | *
|
| | | * @author James Moger
|
| | | *
|
| | | */
|
| | |
| | |
|
| | | private volatile ServerSocketChannel serviceCh;
|
| | | private volatile Selector selector;
|
| | | |
| | |
|
| | | public static void main(String[] args) throws Exception {
|
| | | FanoutNioService pubsub = new FanoutNioService(null, DEFAULT_PORT);
|
| | | pubsub.setStrictRequestTermination(false);
|
| | |
| | |
|
| | | /**
|
| | | * Create a single-threaded fanout service.
|
| | | * |
| | | *
|
| | | * @param port
|
| | | * the port for running the fanout PubSub service
|
| | |
| | | public FanoutNioService(int port) {
|
| | | this(null, port); // delegates to the (bindInterface, port) constructor with a null bind interface
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Create a single-threaded fanout service.
|
| | | * |
| | | *
|
| | | * @param bindInterface
|
| | | * the ip address to bind for the service, may be null
|
| | | * @param port
|
| | |
| | | public FanoutNioService(String bindInterface, int port) {
|
| | | super(bindInterface, port, "Fanout nio service"); // third argument is presumably the service name shown in log messages -- confirm against the superclass
|
| | | }
|
| | | |
| | |
|
| | | @Override
|
| | | protected boolean isConnected() {
|
| | | return serviceCh != null;
|
| | |
| | | serviceCh.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));
|
| | | selector = Selector.open();
|
| | | serviceCh.register(selector, SelectionKey.OP_ACCEPT);
|
| | | logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", |
| | | logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}",
|
| | | name, host == null ? "0.0.0.0" : host, port));
|
| | | } catch (IOException e) {
|
| | | logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}", |
| | | logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
|
| | | name, name, host == null ? "0.0.0.0" : host, port), e);
|
| | | return false;
|
| | | }
|
| | |
| | | for (Map.Entry<String, SocketChannel> client : clients.entrySet()) {
|
| | | closeClientSocket(client.getKey(), client.getValue());
|
| | | }
|
| | | |
| | |
|
| | | // close service socket channel
|
| | | logger.debug(MessageFormat.format("closing {0} socket channel", name));
|
| | | serviceCh.socket().close();
|
| | | serviceCh.close(); |
| | | serviceCh.close();
|
| | | serviceCh = null;
|
| | | selector.close();
|
| | | selector = null;
|
| | |
| | | Set<SelectionKey> keys = selector.selectedKeys();
|
| | | Iterator<SelectionKey> keyItr = keys.iterator();
|
| | | while (keyItr.hasNext()) {
|
| | | SelectionKey key = (SelectionKey) keyItr.next();
|
| | | SelectionKey key = keyItr.next();
|
| | | if (key.isAcceptable()) {
|
| | | // new fanout client connection
|
| | | ServerSocketChannel sch = (ServerSocketChannel) key.channel();
|
| | |
| | | }
|
| | | }
|
| | | }
|
| | | |
| | |
|
| | | protected void closeClientSocket(String id, SocketChannel ch) {
|
| | | try {
|
| | | ch.close();
|
| | |
| | | logger.error(MessageFormat.format("fanout connection {0}", id), e);
|
| | | }
|
| | | }
|
| | | |
| | |
|
| | | @Override
|
| | | protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {
|
| | | super.broadcast(connections, channel, message);
|
| | | |
| | |
|
| | | // register queued write
|
| | | Map<String, SocketChannel> sockets = getCurrentClientSockets();
|
| | | for (FanoutServiceConnection connection : connections) {
|
| | |
| | | }
|
| | | }
|
| | | }
|
| | | |
| | |
|
| | | protected Map<String, SocketChannel> getCurrentClientSockets() {
|
| | | Map<String, SocketChannel> sockets = new HashMap<String, SocketChannel>();
|
| | | for (SelectionKey key : selector.keys()) {
|
| | |
| | | }
|
| | | return sockets;
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * FanoutNioConnection handles reading/writing messages from a remote fanout
|
| | | * connection.
|
| | | * |
| | | *
|
| | | * @author James Moger
|
| | | *
|
| | | */
|
| | |
| | | replyQueue = new ArrayList<String>();
|
| | | decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();
|
| | | }
|
| | | |
| | |
|
| | | protected void read(SocketChannel ch, boolean strictRequestTermination) throws CharacterCodingException, IOException {
|
| | | long bytesRead = 0;
|
| | | readBuffer.clear();
|
| | |
| | | String [] lines = req.split(strictRequestTermination ? "\n" : "\n|\r");
|
| | | requestQueue.addAll(Arrays.asList(lines));
|
| | | }
|
| | | |
| | |
|
| | | protected void write(SocketChannel ch) throws IOException {
|
| | | Iterator<String> itr = replyQueue.iterator();
|
| | | while (itr.hasNext()) {
|
| | |
| | | writeBuffer.put((byte) 0xa);
|
| | | }
|
| | | writeBuffer.flip();
|
| | | |
| | |
|
| | | // loop until write buffer has been completely sent
|
| | | int written = 0;
|
| | | int toWrite = writeBuffer.remaining();
|
| | |
| | | Thread.sleep(10);
|
| | | } catch (Exception x) {
|
| | | }
|
| | | } |
| | | }
|
| | | itr.remove();
|
| | | }
|
| | | writeBuffer.clear();
|