diff --git a/src/main/java/org/spacehq/packetlib/ConnectionListener.java b/src/main/java/org/spacehq/packetlib/ConnectionListener.java index 8de8fb60..25027a20 100644 --- a/src/main/java/org/spacehq/packetlib/ConnectionListener.java +++ b/src/main/java/org/spacehq/packetlib/ConnectionListener.java @@ -56,4 +56,12 @@ public interface ConnectionListener { * @param wait Whether to wait for the listener to finish closing. */ public void close(boolean wait); + + /** + * Closes the listener. + * + * @param wait Whether to wait for the listener to finish closing. + * @param callback Callback to call when the listener has finished closing. + */ + public void close(boolean wait, Runnable callback); } diff --git a/src/main/java/org/spacehq/packetlib/Server.java b/src/main/java/org/spacehq/packetlib/Server.java index cad2d2d9..cde9f5b8 100644 --- a/src/main/java/org/spacehq/packetlib/Server.java +++ b/src/main/java/org/spacehq/packetlib/Server.java @@ -248,7 +248,7 @@ public class Server { * Closes the server. */ public void close() { - this.close(false); + this.close(true); } /** @@ -264,7 +264,11 @@ public class Server { } } - this.listener.close(wait); - this.callEvent(new ServerClosedEvent(this)); + this.listener.close(wait, new Runnable() { + @Override + public void run() { + callEvent(new ServerClosedEvent(Server.this)); + } + }); } } diff --git a/src/main/java/org/spacehq/packetlib/tcp/TcpClientSession.java b/src/main/java/org/spacehq/packetlib/tcp/TcpClientSession.java index 6737b7df..cd73dcd8 100644 --- a/src/main/java/org/spacehq/packetlib/tcp/TcpClientSession.java +++ b/src/main/java/org/spacehq/packetlib/tcp/TcpClientSession.java @@ -93,7 +93,7 @@ public class TcpClientSession extends TcpSession { future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - if(!future.isSuccess() && future.cause() != null) { + if(!future.isSuccess()) { exceptionCaught(null, future.cause()); } } diff --git a/src/main/java/org/spacehq/packetlib/tcp/TcpConnectionListener.java b/src/main/java/org/spacehq/packetlib/tcp/TcpConnectionListener.java index c9e363a9..32c9fafe 100644 --- a/src/main/java/org/spacehq/packetlib/tcp/TcpConnectionListener.java +++ b/src/main/java/org/spacehq/packetlib/tcp/TcpConnectionListener.java @@ -11,6 +11,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import org.spacehq.packetlib.ConnectionListener; import org.spacehq.packetlib.Server; import org.spacehq.packetlib.packet.PacketProtocol; @@ -57,7 +58,7 @@ public class TcpConnectionListener implements ConnectionListener { } @Override - public void bind(final boolean wait, final Runnable callback) { + public void bind(boolean wait, final Runnable callback) { if(this.group != null || this.channel != null) { return; } @@ -96,17 +97,23 @@ public class TcpConnectionListener implements ConnectionListener { } channel = future.channel(); - callback.run(); + if(callback != null) { + callback.run(); + } } else { future.addListener(new ChannelFutureListener() { @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - if(channelFuture.isSuccess()) { - channel = channelFuture.channel(); - callback.run(); - } else if(channelFuture.cause() != null && !wait) { + public void operationComplete(ChannelFuture future) throws Exception { + if(future.isSuccess()) { + channel = future.channel(); + if(callback != null) { + callback.run(); + } + } else { System.err.println("[ERROR] Failed to asynchronously bind connection listener."); - channelFuture.cause().printStackTrace(); + if(future.cause() != null) { + future.cause().printStackTrace(); + } } } }); @@ -120,14 +127,39 @@ public class TcpConnectionListener implements ConnectionListener { @Override public void close(boolean wait) { + this.close(wait, null); + } + + @Override + public void close(boolean wait, final Runnable callback) { if(this.channel != null) { if(this.channel.isOpen()) { ChannelFuture future = this.channel.close(); if(wait) { try { - future.await(); + future.sync(); } catch(InterruptedException e) { } + + if(callback != null) { + callback.run(); + } + } else { + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if(future.isSuccess()) { + if(callback != null) { + callback.run(); + } + } else { + System.err.println("[ERROR] Failed to asynchronously close connection listener."); + if(future.cause() != null) { + future.cause().printStackTrace(); + } + } + } + }); } } @@ -138,9 +170,21 @@ public class TcpConnectionListener implements ConnectionListener { Future future = this.group.shutdownGracefully(); if(wait) { try { - future.await(); + future.sync(); } catch(InterruptedException e) { } + } else { + future.addListener(new GenericFutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if(!future.isSuccess()) { + System.err.println("[ERROR] Failed to asynchronously close connection listener."); + if(future.cause() != null) { + future.cause().printStackTrace(); + } + } + } + }); } this.group = null; diff --git a/src/main/java/org/spacehq/packetlib/tcp/TcpSession.java b/src/main/java/org/spacehq/packetlib/tcp/TcpSession.java index c3a15521..cc74912f 100644 --- a/src/main/java/org/spacehq/packetlib/tcp/TcpSession.java +++ b/src/main/java/org/spacehq/packetlib/tcp/TcpSession.java @@ -200,10 +200,10 @@ public abstract class TcpSession extends SimpleChannelInboundHandler imp ChannelFuture future = this.channel.writeAndFlush(packet).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - if(!future.isSuccess()) { - exceptionCaught(null, future.cause()); - } else { + if(future.isSuccess()) { callEvent(new PacketSentEvent(TcpSession.this, packet)); + } else { + exceptionCaught(null, future.cause()); } } }); @@ -248,7 +248,7 @@ public abstract class TcpSession extends SimpleChannelInboundHandler imp this.callEvent(new DisconnectingEvent(this, reason, cause)); ChannelFuture future = this.channel.flush().close().addListener(new ChannelFutureListener() { @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { + public void operationComplete(ChannelFuture future) throws Exception { callEvent(new DisconnectedEvent(TcpSession.this, reason != null ? reason : "Connection closed.", cause)); } });