| | |
| | | * This implementation creates a master acceptor thread which accepts incoming
|
| | | * fanout connections and then spawns a daemon thread for each accepted connection.
|
| | | * If there are 100 concurrent fanout connections, there are 101 threads.
|
| | | *
|
| | | * @author James Moger
|
| | | *
|
| | | */
|
| | |
| | | pubsub.setAllowAllChannelAnnouncements(false);
|
| | | pubsub.start();
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Create a multi-threaded fanout service.
|
| | | *
|
| | | * @param port
|
| | | * the port for running the fanout PubSub service
|
| | | * @throws IOException
|
| | |
| | |
|
/**
 * Create a multi-threaded fanout service.
 *
 * @param bindInterface
 *            the ip address to bind for the service, may be null
 * @param port
 *            the port for running the fanout PubSub service
 */
public FanoutSocketService(String bindInterface, int port) {
    // Everything is delegated to the superclass constructor. The third
    // argument is a human-readable label for this service (presumably the
    // "name" used in the log messages -- confirm in the superclass).
    super(bindInterface, port, "Fanout socket service");
}
|
| | | |
| | |
|
| | | @Override
|
| | | protected boolean isConnected() {
|
| | | return serviceSocket != null;
|
| | | }
|
| | | |
| | |
|
| | | @Override
|
| | | protected boolean connect() {
|
| | | if (serviceSocket == null) {
|
| | |
| | | serviceSocket.setReuseAddress(true);
|
| | | serviceSocket.setSoTimeout(serviceTimeout);
|
| | | serviceSocket.bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));
|
| | | logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", |
| | | logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}",
|
| | | name, host == null ? "0.0.0.0" : host, serviceSocket.getLocalPort()));
|
| | | } catch (IOException e) {
|
| | | logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
|
| | |
| | | // ignore accept timeout exceptions
|
| | | }
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * FanoutSocketConnection handles reading/writing messages from a remote fanout
|
| | | * connection.
|
| | | *
|
| | | * @author James Moger
|
| | | *
|
| | | */
|
| | | class FanoutSocketConnection extends FanoutServiceConnection implements Runnable {
|
| | | Socket socket;
|
| | | |
| | |
|
| | | FanoutSocketConnection(Socket socket) {
|
| | | super(socket);
|
| | | this.socket = socket;
|
| | |
| | |
|
| | | logger.info(MessageFormat.format("thread for fanout connection {0} is finished", id));
|
| | | }
|
| | | |
| | |
|
| | | @Override
|
| | | protected void reply(String content) throws IOException {
|
| | | // synchronously send reply
|
| | |
| | | }
|
| | | os.flush();
|
| | | }
|
| | | |
| | |
|
| | | protected void closeConnection() {
|
| | | // close the connection socket
|
| | | try {
|
| | |
| | | } catch (IOException e) {
|
| | | }
|
| | | socket = null;
|
| | | |
| | |
|
| | | // remove this connection from the service
|
| | | removeConnection(this);
|
| | | }
|