Split incoming/outgoing packet registry, transition protocol states correctly (#841)
Some checks failed
Java CI with Gradle / build (push) Has been cancelled
Deploy / build (push) Has been cancelled

* Initial code changes

* Make it compile

* Small inlining

* Make less detectable by anticheats and fix keepalive during configuration

* Fix keepalive edge case

* Properly switch inbound protocol in server listener

* Add flow control

* Make toggling automatic keepalive work in another way

* Remove ping pong packets again

* Address review

* Handle keepalive in configuration

* Only spawn keepalive after login is acknowledged

* Prevent very unlikely race conditions with keepalive being switched during a task

* Add debug log for packet serialization and state switching

* Add one more debug print

* Update protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java

Co-authored-by: chris <github@onechris.mozmail.com>

* Update protocol/src/main/java/org/geysermc/mcprotocollib/protocol/MinecraftProtocol.java

Co-authored-by: chris <github@onechris.mozmail.com>

* Update protocol/src/main/java/org/geysermc/mcprotocollib/protocol/MinecraftProtocol.java

Co-authored-by: chris <github@onechris.mozmail.com>

* Mark packet as nonnull

* Fix outbound writing race conditions

* Ensure packets are always sent on the event loop

This replicates the same approach Mojang uses in their networking code.

* Reduce log verbosity

* Put errors into debug

* Update protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java

Co-authored-by: chris <github@onechris.mozmail.com>

* Add comment to always running in event loop

* Handle auto read earlier to prevent race conditions

* Make instance dynamic

* Revert "Make instance dynamic"

This reverts commit 7f8affbdc5.

* Make flush packet priority

* Do not hide original line that is the cause of the exception

* Cancel packet using exception rather than return

* Properly iterate through parents

* Set log level to debug for unit tests

* Revert "Properly iterate through parents"

This reverts commit 4e2b64d983.

* Revert "Cancel packet using exception rather than return"

This reverts commit 6507e77bbe.

* Add write length filter

* Reuse bytebuf for fake flush to avoid unnecessary allocations

* Make tests happy

* Remake dropping packets

* Update protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java

Co-authored-by: chris <github@onechris.mozmail.com>

* Fix space

* Rename to flush packet

* Add mojmap reference

* Share keepalive code

* Fix compilation

* Revert a tiny bit closer to vanilla

* Inline lambda

* Inherit annotation

* Inherit annotation 2

* Use checkerframework annotation

* Fixup grammar slightly

* Add reset states method

* Add log marker for packet logging

---------

Co-authored-by: chris <github@onechris.mozmail.com>
This commit is contained in:
Alex 2024-10-08 15:45:26 +02:00 committed by GitHub
parent b2c9268633
commit f8460356db
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 455 additions and 168 deletions

View file

@ -82,7 +82,12 @@ public class TestProtocol extends PacketProtocol {
}
@Override
public PacketRegistry getPacketRegistry() {
public PacketRegistry getInboundPacketRegistry() {
return registry;
}
@Override
public PacketRegistry getOutboundPacketRegistry() {
return registry;
}
}

View file

@ -135,7 +135,7 @@ public class MinecraftProtocolTest {
@Override
public void sessionRemoved(SessionRemovedEvent event) {
MinecraftProtocol protocol = (MinecraftProtocol) event.getSession().getPacketProtocol();
if (protocol.getState() == ProtocolState.GAME) {
if (protocol.getOutboundState() == ProtocolState.GAME) {
log.info("Closing server.");
event.getServer().close(false);
}

View file

@ -1,5 +1,6 @@
package org.geysermc.mcprotocollib.network;
import io.netty.channel.Channel;
import net.kyori.adventure.text.Component;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
@ -10,6 +11,7 @@ import org.geysermc.mcprotocollib.network.event.session.SessionEvent;
import org.geysermc.mcprotocollib.network.event.session.SessionListener;
import org.geysermc.mcprotocollib.network.packet.Packet;
import org.geysermc.mcprotocollib.network.packet.PacketProtocol;
import org.geysermc.mcprotocollib.network.tcp.FlushHandler;
import java.net.SocketAddress;
import java.util.List;
@ -212,7 +214,17 @@ public interface Session {
*
* @param packet Packet to send.
*/
void send(Packet packet);
default void send(@NonNull Packet packet) {
this.send(packet, null);
}
/**
* Sends a packet and runs the specified callback when the packet has been sent.
*
* @param packet Packet to send.
* @param onSent Callback to run when the packet has been sent.
*/
void send(@NonNull Packet packet, @Nullable Runnable onSent);
/**
* Disconnects the session.
@ -255,4 +267,48 @@ public interface Session {
* @param cause Throwable responsible for disconnecting.
*/
void disconnect(@NonNull Component reason, @Nullable Throwable cause);
/**
* Auto read in netty means that the server is automatically reading from the channel.
* Turning it off means that we won't get more packets being decoded until we turn it back on.
* We use this to hold off on reading packets until we are ready to process them.
* For example this is used for switching inbound states with {@link #switchInboundState(Runnable)}.
*
* @param autoRead Whether to enable auto read.
* Default is true.
*/
void setAutoRead(boolean autoRead);
/**
* Returns the underlying netty channel of this session.
*
* @return The netty channel
*/
Channel getChannel();
/**
* Changes the inbound state of the session and then re-enables auto read.
* This is used after a terminal packet was handled and the session is ready to receive more packets in the new state.
*
* @param switcher The runnable that switches the inbound state.
*/
default void switchInboundState(Runnable switcher) {
switcher.run();
// We switched to the new inbound state
// we can start reading again
setAutoRead(true);
}
/**
* Flushes all packets that are due to be sent and changes the outbound state of the session.
* This makes sure no other threads have scheduled packets to be sent.
*
* @param switcher The runnable that switches the outbound state.
*/
default void switchOutboundState(Runnable switcher) {
getChannel().writeAndFlush(FlushHandler.FLUSH_PACKET).syncUninterruptibly();
switcher.run();
}
}

View file

@ -1,6 +1,7 @@
package org.geysermc.mcprotocollib.network.packet;
import io.netty.buffer.ByteBuf;
import org.geysermc.mcprotocollib.network.Session;
/**
* A network packet. Any given packet must have a constructor that takes in a {@link ByteBuf}.
@ -17,4 +18,14 @@ public interface Packet {
default boolean isPriority() {
return false;
}
/**
* Returns whether the packet is terminal. If true, this should be the last packet sent inside a protocol state.
* Subsequently, {@link Session#setAutoRead(boolean)} should be disabled when a terminal packet is received, until the session has switched into a new state and is ready to receive more packets.
*
* @return Whether the packet is terminal.
*/
default boolean isTerminal() {
return false;
}
}

View file

@ -49,9 +49,16 @@ public abstract class PacketProtocol {
public abstract void newServerSession(Server server, Session session);
/**
* Gets the packet registry for this protocol.
* Gets the inbound packet registry for this protocol.
*
* @return The protocol's packet registry.
* @return The protocol's inbound packet registry.
*/
public abstract PacketRegistry getPacketRegistry();
public abstract PacketRegistry getInboundPacketRegistry();
/**
* Gets the outbound packet registry for this protocol.
*
* @return The protocol's outbound packet registry.
*/
public abstract PacketRegistry getOutboundPacketRegistry();
}

View file

@ -0,0 +1,28 @@
package org.geysermc.mcprotocollib.network.tcp;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
/**
* Sending a {@link FlushPacket} will ensure all before were sent.
* This handler makes sure it's dropped before it reaches the encoder.
* This logic is similar to the Minecraft UnconfiguredPipelineHandler.OutboundConfigurationTask.
*/
public class FlushHandler extends ChannelOutboundHandlerAdapter {
public static final FlushPacket FLUSH_PACKET = new FlushPacket();
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg == FLUSH_PACKET) {
promise.setSuccess();
} else {
super.write(ctx, msg, promise);
}
}
public static class FlushPacket {
private FlushPacket() {
}
}
}

View file

@ -27,6 +27,7 @@ import io.netty.handler.timeout.WriteTimeoutHandler;
import io.netty.resolver.dns.DnsNameResolver;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.geysermc.mcprotocollib.network.BuiltinFlags;
import org.geysermc.mcprotocollib.network.ProxyInfo;
import org.geysermc.mcprotocollib.network.codec.PacketCodecHelper;
@ -100,7 +101,7 @@ public class TcpClientSession extends TcpSession {
.localAddress(bindAddress, bindPort)
.handler(new ChannelInitializer<>() {
@Override
public void initChannel(Channel channel) {
public void initChannel(@NonNull Channel channel) {
PacketProtocol protocol = getPacketProtocol();
protocol.newClientSession(TcpClientSession.this, transferring);
@ -117,7 +118,9 @@ public class TcpClientSession extends TcpSession {
pipeline.addLast("sizer", new TcpPacketSizer(protocol.getPacketHeader(), getCodecHelper()));
pipeline.addLast("compression", new TcpPacketCompression(getCodecHelper()));
pipeline.addLast("flow-control", new TcpFlowControlHandler());
pipeline.addLast("codec", new TcpPacketCodec(TcpClientSession.this, true));
pipeline.addLast("flush-handler", new FlushHandler());
pipeline.addLast("manager", TcpClientSession.this);
}
});
@ -246,9 +249,7 @@ public class TcpClientSession extends TcpSession {
HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, proxiedProtocol,
clientAddress.getAddress().getHostAddress(), remoteAddress.getAddress().getHostAddress(),
clientAddress.getPort(), remoteAddress.getPort()
)).addListener(future -> {
channel.pipeline().remove("proxy-protocol-encoder");
});
)).addListener(future -> channel.pipeline().remove("proxy-protocol-encoder"));
}
private static void createTcpEventLoopGroup() {

View file

@ -0,0 +1,20 @@
package org.geysermc.mcprotocollib.network.tcp;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.flow.FlowControlHandler;
/**
* A flow control handler for TCP connections.
* When auto-read is disabled, this will halt decoding of packets until auto-read is re-enabled.
* This is needed because auto-read still allows packets to be decoded, even if the channel is not reading anymore from the network.
* This can happen when the channel already read a packet, but the packet is not yet decoded.
* This will halt all decoding until the channel is ready to process more packets.
*/
public class TcpFlowControlHandler extends FlowControlHandler {
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().config().isAutoRead()) {
super.read(ctx);
}
}
}

View file

@ -2,17 +2,27 @@ package org.geysermc.mcprotocollib.network.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToMessageCodec;
import org.geysermc.mcprotocollib.network.Session;
import org.geysermc.mcprotocollib.network.codec.PacketCodecHelper;
import org.geysermc.mcprotocollib.network.codec.PacketDefinition;
import org.geysermc.mcprotocollib.network.event.session.PacketErrorEvent;
import org.geysermc.mcprotocollib.network.packet.Packet;
import org.geysermc.mcprotocollib.network.packet.PacketProtocol;
import org.geysermc.mcprotocollib.network.packet.PacketRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import java.util.List;
public class TcpPacketCodec extends ByteToMessageCodec<Packet> {
public class TcpPacketCodec extends MessageToMessageCodec<ByteBuf, Packet> {
private static final Marker marker = MarkerFactory.getMarker("packet_logging");
private static final Logger log = LoggerFactory.getLogger(TcpPacketCodec.class);
private final Session session;
private final boolean client;
@ -23,35 +33,51 @@ public class TcpPacketCodec extends ByteToMessageCodec<Packet> {
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf buf) {
int initial = buf.writerIndex();
public void encode(ChannelHandlerContext ctx, Packet packet, List<Object> out) {
if (log.isTraceEnabled()) {
log.trace(marker, "Encoding packet: {}", packet.getClass().getSimpleName());
}
PacketProtocol packetProtocol = this.session.getPacketProtocol();
PacketRegistry packetRegistry = packetProtocol.getOutboundPacketRegistry();
PacketCodecHelper codecHelper = this.session.getCodecHelper();
try {
int packetId = this.client ? packetProtocol.getPacketRegistry().getServerboundId(packet) : packetProtocol.getPacketRegistry().getClientboundId(packet);
PacketDefinition definition = this.client ? packetProtocol.getPacketRegistry().getServerboundDefinition(packetId) : packetProtocol.getPacketRegistry().getClientboundDefinition(packetId);
int packetId = this.client ? packetRegistry.getServerboundId(packet) : packetRegistry.getClientboundId(packet);
PacketDefinition definition = this.client ? packetRegistry.getServerboundDefinition(packetId) : packetRegistry.getClientboundDefinition(packetId);
ByteBuf buf = ctx.alloc().buffer();
packetProtocol.getPacketHeader().writePacketId(buf, codecHelper, packetId);
definition.getSerializer().serialize(buf, codecHelper, packet);
out.add(buf);
if (log.isDebugEnabled()) {
log.debug(marker, "Encoded packet {} ({})", packet.getClass().getSimpleName(), packetId);
}
} catch (Throwable t) {
// Reset writer index to make sure incomplete data is not written out.
buf.writerIndex(initial);
log.debug(marker, "Error encoding packet", t);
PacketErrorEvent e = new PacketErrorEvent(this.session, t);
this.session.callEvent(e);
if (!e.shouldSuppress()) {
throw t;
throw new EncoderException(t);
}
}
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
// Vanilla also checks for 0 length
if (buf.readableBytes() == 0) {
return;
}
int initial = buf.readerIndex();
PacketProtocol packetProtocol = this.session.getPacketProtocol();
PacketRegistry packetRegistry = packetProtocol.getInboundPacketRegistry();
PacketCodecHelper codecHelper = this.session.getCodecHelper();
Packet packet = null;
try {
int id = packetProtocol.getPacketHeader().readPacketId(buf, codecHelper);
if (id == -1) {
@ -59,21 +85,35 @@ public class TcpPacketCodec extends ByteToMessageCodec<Packet> {
return;
}
Packet packet = this.client ? packetProtocol.getPacketRegistry().createClientboundPacket(id, buf, codecHelper) : packetProtocol.getPacketRegistry().createServerboundPacket(id, buf, codecHelper);
log.trace(marker, "Decoding packet with id: {}", id);
packet = this.client ? packetRegistry.createClientboundPacket(id, buf, codecHelper) : packetRegistry.createServerboundPacket(id, buf, codecHelper);
if (buf.readableBytes() > 0) {
throw new IllegalStateException("Packet \"" + packet.getClass().getSimpleName() + "\" not fully read.");
}
out.add(packet);
if (log.isDebugEnabled()) {
log.debug(marker, "Decoded packet {} ({})", packet.getClass().getSimpleName(), id);
}
} catch (Throwable t) {
log.debug(marker, "Error decoding packet", t);
// Advance buffer to end to make sure remaining data in this packet is skipped.
buf.readerIndex(buf.readerIndex() + buf.readableBytes());
PacketErrorEvent e = new PacketErrorEvent(this.session, t);
this.session.callEvent(e);
if (!e.shouldSuppress()) {
throw t;
throw new DecoderException(t);
}
} finally {
if (packet != null && packet.isTerminal()) {
// Next packets are in a different protocol state, so we must
// disable auto-read to prevent reading wrong packets.
session.setAutoRead(false);
}
}
}

View file

@ -9,6 +9,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.geysermc.mcprotocollib.network.AbstractServer;
import org.geysermc.mcprotocollib.network.BuiltinFlags;
import org.geysermc.mcprotocollib.network.helper.TransportHelper;
@ -52,7 +53,7 @@ public class TcpServer extends AbstractServer {
.localAddress(this.getHost(), this.getPort())
.childHandler(new ChannelInitializer<>() {
@Override
public void initChannel(Channel channel) {
public void initChannel(@NonNull Channel channel) {
InetSocketAddress address = (InetSocketAddress) channel.remoteAddress();
PacketProtocol protocol = createPacketProtocol();
@ -68,7 +69,9 @@ public class TcpServer extends AbstractServer {
pipeline.addLast("sizer", new TcpPacketSizer(protocol.getPacketHeader(), session.getCodecHelper()));
pipeline.addLast("compression", new TcpPacketCompression(session.getCodecHelper()));
pipeline.addLast("flow-control", new TcpFlowControlHandler());
pipeline.addLast("codec", new TcpPacketCodec(session, false));
pipeline.addLast("flush-handler", new FlushHandler());
pipeline.addLast("manager", session);
}
});

View file

@ -216,11 +216,17 @@ public abstract class TcpSession extends SimpleChannelInboundHandler<Packet> imp
}
@Override
public void send(Packet packet) {
public void send(@NonNull Packet packet, @Nullable Runnable onSent) {
if (this.channel == null) {
return;
}
// Same behaviour as vanilla, always offload packet sending to the event loop
if (!this.channel.eventLoop().inEventLoop()) {
this.channel.eventLoop().execute(() -> this.send(packet, onSent));
return;
}
PacketSendingEvent sendingEvent = new PacketSendingEvent(this, packet);
this.callEvent(sendingEvent);
@ -228,6 +234,10 @@ public abstract class TcpSession extends SimpleChannelInboundHandler<Packet> imp
final Packet toSend = sendingEvent.getPacket();
this.channel.writeAndFlush(toSend).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
if (onSent != null) {
onSent.run();
}
callPacketSent(toSend);
} else {
exceptionCaught(null, future.cause());
@ -252,6 +262,13 @@ public abstract class TcpSession extends SimpleChannelInboundHandler<Packet> imp
}
}
@Override
public void setAutoRead(boolean autoRead) {
if (this.channel != null) {
this.channel.config().setAutoRead(autoRead);
}
}
private @Nullable EventLoop createEventLoop() {
if (!USE_EVENT_LOOP_FOR_PACKETS) {
return null;
@ -267,6 +284,7 @@ public abstract class TcpSession extends SimpleChannelInboundHandler<Packet> imp
return PACKET_EVENT_LOOP.next();
}
@Override
public Channel getChannel() {
return this.channel;
}

View file

@ -61,7 +61,7 @@ public class ClientListener extends SessionAdapter {
@Override
public void packetReceived(Session session, Packet packet) {
MinecraftProtocol protocol = (MinecraftProtocol) session.getPacketProtocol();
if (protocol.getState() == ProtocolState.LOGIN) {
if (protocol.getInboundState() == ProtocolState.LOGIN) {
if (packet instanceof ClientboundHelloPacket helloPacket) {
GameProfile profile = session.getFlag(MinecraftConstants.PROFILE_KEY);
String accessToken = session.getFlag(MinecraftConstants.ACCESS_TOKEN_KEY);
@ -92,18 +92,21 @@ public class ClientListener extends SessionAdapter {
return;
}
session.send(new ServerboundKeyPacket(helloPacket.getPublicKey(), key, helloPacket.getChallenge()));
session.setEncryption(protocol.createEncryption(key));
session.send(new ServerboundKeyPacket(helloPacket.getPublicKey(), key, helloPacket.getChallenge()),
() -> session.setEncryption(protocol.createEncryption(key)));
} else if (packet instanceof ClientboundGameProfilePacket) {
session.switchInboundState(() -> protocol.setInboundState(ProtocolState.CONFIGURATION));
session.send(new ServerboundLoginAcknowledgedPacket());
session.switchOutboundState(() -> protocol.setOutboundState(ProtocolState.CONFIGURATION));
} else if (packet instanceof ClientboundLoginDisconnectPacket loginDisconnectPacket) {
session.disconnect(loginDisconnectPacket.getReason());
} else if (packet instanceof ClientboundLoginCompressionPacket loginCompressionPacket) {
int threshold = loginCompressionPacket.getThreshold();
session.setCompression(threshold >= 0 ?
new CompressionConfig(threshold, new ZlibCompression(), false) : null);
if (threshold >= 0) {
session.setCompression(new CompressionConfig(threshold, new ZlibCompression(), false));
}
} else if (protocol.getState() == ProtocolState.STATUS) {
}
} else if (protocol.getInboundState() == ProtocolState.STATUS) {
if (packet instanceof ClientboundStatusResponsePacket statusResponsePacket) {
ServerStatusInfo info = statusResponsePacket.parseInfo();
ServerInfoHandler handler = session.getFlag(MinecraftConstants.SERVER_INFO_HANDLER_KEY);
@ -121,13 +124,15 @@ public class ClientListener extends SessionAdapter {
session.disconnect(Component.translatable("multiplayer.status.finished"));
}
} else if (protocol.getState() == ProtocolState.GAME) {
} else if (protocol.getInboundState() == ProtocolState.GAME) {
if (packet instanceof ClientboundKeepAlivePacket keepAlivePacket && session.getFlag(MinecraftConstants.AUTOMATIC_KEEP_ALIVE_MANAGEMENT, true)) {
session.send(new ServerboundKeepAlivePacket(keepAlivePacket.getPingId()));
} else if (packet instanceof ClientboundDisconnectPacket disconnectPacket) {
session.disconnect(disconnectPacket.getReason());
} else if (packet instanceof ClientboundStartConfigurationPacket) {
session.switchInboundState(() -> protocol.setInboundState(ProtocolState.CONFIGURATION));
session.send(new ServerboundConfigurationAcknowledgedPacket());
session.switchOutboundState(() -> protocol.setOutboundState(ProtocolState.CONFIGURATION));
} else if (packet instanceof ClientboundTransferPacket transferPacket) {
if (session.getFlag(MinecraftConstants.FOLLOW_TRANSFERS, true)) {
TcpClientSession newSession = new TcpClientSession(transferPacket.getHost(), transferPacket.getPort(), session.getPacketProtocol());
@ -136,9 +141,13 @@ public class ClientListener extends SessionAdapter {
newSession.connect(true, true);
}
}
} else if (protocol.getState() == ProtocolState.CONFIGURATION) {
if (packet instanceof ClientboundFinishConfigurationPacket) {
} else if (protocol.getInboundState() == ProtocolState.CONFIGURATION) {
if (packet instanceof ClientboundKeepAlivePacket keepAlivePacket && session.getFlag(MinecraftConstants.AUTOMATIC_KEEP_ALIVE_MANAGEMENT, true)) {
session.send(new ServerboundKeepAlivePacket(keepAlivePacket.getPingId()));
} else if (packet instanceof ClientboundFinishConfigurationPacket) {
session.switchInboundState(() -> protocol.setInboundState(ProtocolState.GAME));
session.send(new ServerboundFinishConfigurationPacket());
session.switchOutboundState(() -> protocol.setOutboundState(ProtocolState.GAME));
} else if (packet instanceof ClientboundSelectKnownPacks) {
if (session.getFlag(MinecraftConstants.SEND_BLANK_KNOWN_PACKS_RESPONSE, true)) {
session.send(new ServerboundSelectKnownPacks(Collections.emptyList()));
@ -155,38 +164,25 @@ public class ClientListener extends SessionAdapter {
}
@Override
public void packetSent(Session session, Packet packet) {
public void connected(ConnectedEvent event) {
Session session = event.getSession();
MinecraftProtocol protocol = (MinecraftProtocol) session.getPacketProtocol();
if (packet instanceof ClientIntentionPacket) {
// Once the HandshakePacket has been sent, switch to the next protocol mode.
protocol.setState(this.targetState);
ClientIntentionPacket intention = new ClientIntentionPacket(protocol.getCodec().getProtocolVersion(), session.getHost(), session.getPort(), switch (targetState) {
case LOGIN -> transferring ? HandshakeIntent.TRANSFER : HandshakeIntent.LOGIN;
case STATUS -> HandshakeIntent.STATUS;
default -> throw new IllegalStateException("Unexpected value: " + targetState);
});
if (this.targetState == ProtocolState.LOGIN) {
session.switchInboundState(() -> protocol.setInboundState(this.targetState));
session.send(intention);
session.switchOutboundState(() -> protocol.setOutboundState(this.targetState));
switch (this.targetState) {
case LOGIN -> {
GameProfile profile = session.getFlag(MinecraftConstants.PROFILE_KEY);
session.send(new ServerboundHelloPacket(profile.getName(), profile.getId()));
} else {
session.send(new ServerboundStatusRequestPacket());
}
} else if (packet instanceof ServerboundLoginAcknowledgedPacket) {
protocol.setState(ProtocolState.CONFIGURATION); // LOGIN -> CONFIGURATION
} else if (packet instanceof ServerboundFinishConfigurationPacket) {
protocol.setState(ProtocolState.GAME); // CONFIGURATION -> GAME
} else if (packet instanceof ServerboundConfigurationAcknowledgedPacket) {
protocol.setState(ProtocolState.CONFIGURATION); // GAME -> CONFIGURATION
}
}
@Override
public void connected(ConnectedEvent event) {
MinecraftProtocol protocol = (MinecraftProtocol) event.getSession().getPacketProtocol();
if (this.targetState == ProtocolState.LOGIN) {
if (this.transferring) {
event.getSession().send(new ClientIntentionPacket(protocol.getCodec().getProtocolVersion(), event.getSession().getHost(), event.getSession().getPort(), HandshakeIntent.TRANSFER));
} else {
event.getSession().send(new ClientIntentionPacket(protocol.getCodec().getProtocolVersion(), event.getSession().getHost(), event.getSession().getPort(), HandshakeIntent.LOGIN));
}
} else if (this.targetState == ProtocolState.STATUS) {
event.getSession().send(new ClientIntentionPacket(protocol.getCodec().getProtocolVersion(), event.getSession().getHost(), event.getSession().getPort(), HandshakeIntent.STATUS));
case STATUS -> session.send(new ServerboundStatusRequestPacket());
default -> throw new IllegalStateException("Unexpected value: " + targetState);
}
}
}

View file

@ -18,6 +18,8 @@ import org.geysermc.mcprotocollib.protocol.codec.MinecraftCodec;
import org.geysermc.mcprotocollib.protocol.codec.MinecraftCodecHelper;
import org.geysermc.mcprotocollib.protocol.codec.PacketCodec;
import org.geysermc.mcprotocollib.protocol.data.ProtocolState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.security.GeneralSecurityException;
@ -29,6 +31,7 @@ import java.util.UUID;
* Implements the Minecraft protocol.
*/
public class MinecraftProtocol extends PacketProtocol {
private static final Logger log = LoggerFactory.getLogger(MinecraftProtocol.class);
/**
* The network codec sent from the server to the client during {@link ProtocolState#CONFIGURATION}.
@ -44,8 +47,11 @@ public class MinecraftProtocol extends PacketProtocol {
@Getter
private final PacketCodec codec;
private ProtocolState state;
private PacketRegistry stateRegistry;
private ProtocolState inboundState;
private PacketRegistry inboundStateRegistry;
private ProtocolState outboundState;
private PacketRegistry outboundStateRegistry;
private final ProtocolState targetState;
@ -84,7 +90,7 @@ public class MinecraftProtocol extends PacketProtocol {
this.codec = codec;
this.targetState = ProtocolState.STATUS;
this.setState(ProtocolState.HANDSHAKE);
resetStates();
}
/**
@ -129,7 +135,7 @@ public class MinecraftProtocol extends PacketProtocol {
this.profile = profile;
this.accessToken = accessToken;
this.setState(ProtocolState.HANDSHAKE);
resetStates();
}
@Override
@ -152,7 +158,7 @@ public class MinecraftProtocol extends PacketProtocol {
session.setFlag(MinecraftConstants.PROFILE_KEY, this.profile);
session.setFlag(MinecraftConstants.ACCESS_TOKEN_KEY, this.accessToken);
this.setState(ProtocolState.HANDSHAKE);
resetStates();
if (this.useDefaultListeners) {
session.addListener(new ClientListener(this.targetState, transferring));
@ -161,7 +167,7 @@ public class MinecraftProtocol extends PacketProtocol {
@Override
public void newServerSession(Server server, Session session) {
this.setState(ProtocolState.HANDSHAKE);
resetStates();
if (this.useDefaultListeners) {
if (DEFAULT_NETWORK_CODEC == null) {
@ -173,8 +179,13 @@ public class MinecraftProtocol extends PacketProtocol {
}
@Override
public PacketRegistry getPacketRegistry() {
return this.stateRegistry;
public PacketRegistry getInboundPacketRegistry() {
return this.inboundStateRegistry;
}
@Override
public PacketRegistry getOutboundPacketRegistry() {
return this.outboundStateRegistry;
}
protected EncryptionConfig createEncryption(Key key) {
@ -186,17 +197,43 @@ public class MinecraftProtocol extends PacketProtocol {
}
/**
* Gets the current {@link ProtocolState} the client is in.
*
* @return The current {@link ProtocolState}.
* Resets the protocol states to {@link ProtocolState#HANDSHAKE}.
*/
public ProtocolState getState() {
return this.state;
public void resetStates() {
this.setInboundState(ProtocolState.HANDSHAKE);
this.setOutboundState(ProtocolState.HANDSHAKE);
}
public void setState(ProtocolState state) {
this.state = state;
this.stateRegistry = this.codec.getCodec(state);
/**
* Gets the current inbound {@link ProtocolState} we're in.
*
* @return The current inbound {@link ProtocolState}.
*/
public ProtocolState getInboundState() {
return this.inboundState;
}
/**
* Gets the current outbound {@link ProtocolState} we're in.
*
* @return The current outbound {@link ProtocolState}.
*/
public ProtocolState getOutboundState() {
return this.outboundState;
}
public void setInboundState(ProtocolState state) {
log.debug("Setting inbound protocol state to: {}", state);
this.inboundState = state;
this.inboundStateRegistry = this.codec.getCodec(state);
}
public void setOutboundState(ProtocolState state) {
log.debug("Setting outbound protocol state to: {}", state);
this.outboundState = state;
this.outboundStateRegistry = this.codec.getCodec(state);
}
public static NbtMap loadNetworkCodec() {

View file

@ -1,6 +1,6 @@
package org.geysermc.mcprotocollib.protocol;
import lombok.RequiredArgsConstructor;
import lombok.Getter;
import net.kyori.adventure.key.Key;
import net.kyori.adventure.text.Component;
import org.cloudburstmc.nbt.NbtMap;
@ -77,9 +77,10 @@ public class ServerListener extends SessionAdapter {
private final byte[] challenge = new byte[4];
private String username = "";
private KeepAliveState keepAliveState;
private long lastPingTime = 0;
private int lastPingId = 0;
@Getter
private boolean isTransfer = false;
public ServerListener(NbtMap networkCodec) {
this.networkCodec = networkCodec;
@ -88,40 +89,33 @@ public class ServerListener extends SessionAdapter {
@Override
public void connected(ConnectedEvent event) {
event.getSession().setFlag(MinecraftConstants.PING_KEY, 0L);
Session session = event.getSession();
session.setFlag(MinecraftConstants.PING_KEY, 0L);
}
@Override
public void packetReceived(Session session, Packet packet) {
MinecraftProtocol protocol = (MinecraftProtocol) session.getPacketProtocol();
if (protocol.getState() == ProtocolState.HANDSHAKE) {
if (protocol.getInboundState() == ProtocolState.HANDSHAKE) {
if (packet instanceof ClientIntentionPacket intentionPacket) {
switch (intentionPacket.getIntent()) {
case STATUS -> protocol.setState(ProtocolState.STATUS);
case TRANSFER -> {
if (!session.getFlag(MinecraftConstants.ACCEPT_TRANSFERS_KEY, false)) {
session.disconnect(Component.translatable("multiplayer.disconnect.transfers_disabled"));
}
}
case LOGIN -> {
protocol.setState(ProtocolState.LOGIN);
if (intentionPacket.getProtocolVersion() > protocol.getCodec().getProtocolVersion()) {
session.disconnect(Component.translatable("multiplayer.disconnect.incompatible", Component.text(protocol.getCodec().getMinecraftVersion())));
} else if (intentionPacket.getProtocolVersion() < protocol.getCodec().getProtocolVersion()) {
session.disconnect(Component.translatable("multiplayer.disconnect.outdated_client", Component.text(protocol.getCodec().getMinecraftVersion())));
}
case STATUS -> {
protocol.setOutboundState(ProtocolState.STATUS);
session.switchInboundState(() -> protocol.setInboundState(ProtocolState.STATUS));
}
case TRANSFER -> beginLogin(session, protocol, intentionPacket, true);
case LOGIN -> beginLogin(session, protocol, intentionPacket, false);
default -> throw new UnsupportedOperationException("Invalid client intent: " + intentionPacket.getIntent());
}
}
} else if (protocol.getState() == ProtocolState.LOGIN) {
} else if (protocol.getInboundState() == ProtocolState.LOGIN) {
if (packet instanceof ServerboundHelloPacket helloPacket) {
this.username = helloPacket.getUsername();
if (session.getFlag(MinecraftConstants.ENCRYPT_CONNECTION, true)) {
session.send(new ClientboundHelloPacket(SERVER_ID, KEY_PAIR.getPublic(), this.challenge, session.getFlag(MinecraftConstants.SHOULD_AUTHENTICATE, true)));
} else {
new Thread(new UserAuthTask(session, false, null)).start();
new Thread(() -> authenticate(session, false, null)).start();
}
} else if (packet instanceof ServerboundKeyPacket keyPacket) {
PrivateKey privateKey = KEY_PAIR.getPrivate();
@ -132,9 +126,15 @@ public class ServerListener extends SessionAdapter {
SecretKey key = keyPacket.getSecretKey(privateKey);
session.setEncryption(protocol.createEncryption(key));
new Thread(new UserAuthTask(session, session.getFlag(MinecraftConstants.SHOULD_AUTHENTICATE, true), key)).start();
new Thread(() -> authenticate(session, session.getFlag(MinecraftConstants.SHOULD_AUTHENTICATE, true), key)).start();
} else if (packet instanceof ServerboundLoginAcknowledgedPacket) {
protocol.setState(ProtocolState.CONFIGURATION);
protocol.setOutboundState(ProtocolState.CONFIGURATION);
session.switchInboundState(() -> protocol.setInboundState(ProtocolState.CONFIGURATION));
keepAliveState = new KeepAliveState();
if (session.getFlag(MinecraftConstants.AUTOMATIC_KEEP_ALIVE_MANAGEMENT, true)) {
// If keepalive state is null, lets assume there is no keepalive thread yet
new Thread(() -> keepAlive(session)).start();
}
// Credit ViaVersion: https://github.com/ViaVersion/ViaVersion/blob/dev/common/src/main/java/com/viaversion/viaversion/protocols/protocol1_20_5to1_20_3/rewriter/EntityPacketRewriter1_20_5.java
for (Map.Entry<String, Object> entry : networkCodec.entrySet()) {
@ -156,7 +156,7 @@ public class ServerListener extends SessionAdapter {
session.send(new ClientboundFinishConfigurationPacket());
}
} else if (protocol.getState() == ProtocolState.STATUS) {
} else if (protocol.getInboundState() == ProtocolState.STATUS) {
if (packet instanceof ServerboundStatusRequestPacket) {
ServerInfoBuilder builder = session.getFlag(MinecraftConstants.SERVER_INFO_BUILDER_KEY);
if (builder == null) {
@ -174,66 +174,77 @@ public class ServerListener extends SessionAdapter {
} else if (packet instanceof ServerboundPingRequestPacket pingRequestPacket) {
session.send(new ClientboundPongResponsePacket(pingRequestPacket.getPingTime()));
}
} else if (protocol.getState() == ProtocolState.GAME) {
} else if (protocol.getInboundState() == ProtocolState.GAME) {
if (packet instanceof ServerboundKeepAlivePacket keepAlivePacket) {
if (keepAlivePacket.getPingId() == this.lastPingId) {
long time = System.currentTimeMillis() - this.lastPingTime;
session.setFlag(MinecraftConstants.PING_KEY, time);
}
handleKeepAlive(session, keepAlivePacket);
} else if (packet instanceof ServerboundConfigurationAcknowledgedPacket) {
protocol.setState(ProtocolState.CONFIGURATION);
// The developer who sends ClientboundStartConfigurationPacket needs to setOutboundState to CONFIGURATION
// after sending the packet. We can't do it in this class because it needs to be a method call right after it was sent.
// Using nettys event loop to change outgoing state may cause differences to vanilla.
session.switchInboundState(() -> protocol.setInboundState(ProtocolState.CONFIGURATION));
keepAliveState = new KeepAliveState();
} else if (packet instanceof ServerboundPingRequestPacket pingRequestPacket) {
session.send(new ClientboundPongResponsePacket(pingRequestPacket.getPingTime()));
session.disconnect(Component.translatable("multiplayer.status.request_handled"));
}
} else if (protocol.getState() == ProtocolState.CONFIGURATION) {
if (packet instanceof ServerboundFinishConfigurationPacket) {
protocol.setState(ProtocolState.GAME);
} else if (protocol.getInboundState() == ProtocolState.CONFIGURATION) {
if (packet instanceof ServerboundKeepAlivePacket keepAlivePacket) {
handleKeepAlive(session, keepAlivePacket);
} else if (packet instanceof ServerboundFinishConfigurationPacket) {
protocol.setOutboundState(ProtocolState.GAME);
session.switchInboundState(() -> protocol.setInboundState(ProtocolState.GAME));
keepAliveState = new KeepAliveState();
ServerLoginHandler handler = session.getFlag(MinecraftConstants.SERVER_LOGIN_HANDLER_KEY);
if (handler != null) {
handler.loggedIn(session);
}
if (session.getFlag(MinecraftConstants.AUTOMATIC_KEEP_ALIVE_MANAGEMENT, true)) {
new Thread(new KeepAliveTask(session)).start();
}
}
}
}
@Override
public void packetSent(Session session, Packet packet) {
if (packet instanceof ClientboundLoginCompressionPacket loginCompressionPacket) {
int threshold = loginCompressionPacket.getThreshold();
session.setCompression(threshold >= 0 ?
new CompressionConfig(threshold, new ZlibCompression(), true) : null);
session.send(new ClientboundGameProfilePacket(session.getFlag(MinecraftConstants.PROFILE_KEY), true));
private void handleKeepAlive(Session session, ServerboundKeepAlivePacket keepAlivePacket) {
KeepAliveState currentKeepAliveState = this.keepAliveState;
if (currentKeepAliveState != null) {
if (currentKeepAliveState.keepAlivePending && keepAlivePacket.getPingId() == currentKeepAliveState.keepAliveChallenge) {
currentKeepAliveState.keepAlivePending = false;
session.setFlag(MinecraftConstants.PING_KEY, System.currentTimeMillis() - currentKeepAliveState.keepAliveTime);
} else {
session.disconnect(Component.translatable("disconnect.timeout"));
}
}
}
private void beginLogin(Session session, MinecraftProtocol protocol, ClientIntentionPacket packet, boolean transferred) {
isTransfer = transferred;
protocol.setOutboundState(ProtocolState.LOGIN);
if (transferred && !session.getFlag(MinecraftConstants.ACCEPT_TRANSFERS_KEY)) {
session.disconnect(Component.translatable("multiplayer.disconnect.transfers_disabled"));
} else if (packet.getProtocolVersion() > protocol.getCodec().getProtocolVersion()) {
session.disconnect(Component.translatable("multiplayer.disconnect.incompatible", Component.text(protocol.getCodec().getMinecraftVersion())));
} else if (packet.getProtocolVersion() < protocol.getCodec().getProtocolVersion()) {
session.disconnect(Component.translatable("multiplayer.disconnect.outdated_client", Component.text(protocol.getCodec().getMinecraftVersion())));
} else {
session.switchInboundState(() -> protocol.setInboundState(ProtocolState.LOGIN));
}
}
@Override
public void disconnecting(DisconnectingEvent event) {
MinecraftProtocol protocol = (MinecraftProtocol) event.getSession().getPacketProtocol();
if (protocol.getState() == ProtocolState.LOGIN) {
event.getSession().send(new ClientboundLoginDisconnectPacket(event.getReason()));
} else if (protocol.getState() == ProtocolState.GAME) {
event.getSession().send(new ClientboundDisconnectPacket(event.getReason()));
Session session = event.getSession();
MinecraftProtocol protocol = (MinecraftProtocol) session.getPacketProtocol();
if (protocol.getOutboundState() == ProtocolState.LOGIN) {
session.send(new ClientboundLoginDisconnectPacket(event.getReason()));
} else if (protocol.getOutboundState() == ProtocolState.GAME) {
session.send(new ClientboundDisconnectPacket(event.getReason()));
}
}
@RequiredArgsConstructor
private class UserAuthTask implements Runnable {
private final Session session;
private final boolean shouldAuthenticate;
private final SecretKey key;
@Override
public void run() {
private void authenticate(Session session, boolean shouldAuthenticate, SecretKey key) {
GameProfile profile;
if (this.shouldAuthenticate && this.key != null) {
SessionService sessionService = this.session.getFlag(MinecraftConstants.SESSION_SERVICE_KEY, new SessionService());
if (shouldAuthenticate && key != null) {
SessionService sessionService = session.getFlag(MinecraftConstants.SESSION_SERVICE_KEY, new SessionService());
try {
profile = sessionService.getProfileByServer(username, SessionService.getServerId(SERVER_ID, KEY_PAIR.getPublic(), this.key));
profile = sessionService.getProfileByServer(username, SessionService.getServerId(SERVER_ID, KEY_PAIR.getPublic(), key));
} catch (IOException e) {
session.disconnect(Component.translatable("multiplayer.disconnect.authservers_down"), e);
return;
@ -247,30 +258,48 @@ public class ServerListener extends SessionAdapter {
profile = new GameProfile(UUID.nameUUIDFromBytes(("OfflinePlayer:" + username).getBytes()), username);
}
this.session.setFlag(MinecraftConstants.PROFILE_KEY, profile);
session.setFlag(MinecraftConstants.PROFILE_KEY, profile);
int threshold = session.getFlag(MinecraftConstants.SERVER_COMPRESSION_THRESHOLD, DEFAULT_COMPRESSION_THRESHOLD);
this.session.send(new ClientboundLoginCompressionPacket(threshold));
if (threshold >= 0) {
session.send(new ClientboundLoginCompressionPacket(threshold), () ->
session.setCompression(new CompressionConfig(threshold, new ZlibCompression(), true)));
}
session.send(new ClientboundGameProfilePacket(profile, true));
}
private void keepAlive(Session session) {
while (session.isConnected()) {
KeepAliveState currentKeepAliveState = this.keepAliveState;
if (currentKeepAliveState != null) {
if (System.currentTimeMillis() - currentKeepAliveState.keepAliveTime >= 15000L) {
if (currentKeepAliveState.keepAlivePending) {
session.disconnect(Component.translatable("disconnect.timeout"));
break;
}
long time = System.currentTimeMillis();
currentKeepAliveState.keepAlivePending = true;
currentKeepAliveState.keepAliveChallenge = time;
currentKeepAliveState.keepAliveTime = time;
session.send(new ClientboundKeepAlivePacket(currentKeepAliveState.keepAliveChallenge));
}
}
@RequiredArgsConstructor
private class KeepAliveTask implements Runnable {
private final Session session;
@Override
public void run() {
while (this.session.isConnected()) {
lastPingTime = System.currentTimeMillis();
lastPingId = (int) lastPingTime;
this.session.send(new ClientboundKeepAlivePacket(lastPingId));
// TODO: Implement proper tick loop rather than sleeping
try {
Thread.sleep(2000);
Thread.sleep(50);
} catch (InterruptedException e) {
break;
}
}
}
private static class KeepAliveState {
private boolean keepAlivePending;
private long keepAliveChallenge;
private long keepAliveTime = System.currentTimeMillis();
}
}

View file

@ -15,4 +15,9 @@ public class ClientboundFinishConfigurationPacket implements MinecraftPacket {
@Override
public void serialize(ByteBuf out, MinecraftCodecHelper helper) {
}
@Override
public boolean isTerminal() {
return true;
}
}

View file

@ -15,4 +15,9 @@ public class ServerboundFinishConfigurationPacket implements MinecraftPacket {
public void serialize(ByteBuf buf, MinecraftCodecHelper helper) {
}
@Override
public boolean isTerminal() {
return true;
}
}

View file

@ -37,4 +37,9 @@ public class ClientIntentionPacket implements MinecraftPacket {
public boolean isPriority() {
return true;
}
@Override
public boolean isTerminal() {
return true;
}
}

View file

@ -15,4 +15,9 @@ public class ClientboundStartConfigurationPacket implements MinecraftPacket {
public void serialize(ByteBuf out, MinecraftCodecHelper helper) {
}
@Override
public boolean isTerminal() {
return true;
}
}

View file

@ -15,4 +15,9 @@ public class ServerboundConfigurationAcknowledgedPacket implements MinecraftPack
public void serialize(ByteBuf out, MinecraftCodecHelper helper) {
}
@Override
public boolean isTerminal() {
return true;
}
}

View file

@ -35,4 +35,9 @@ public class ClientboundGameProfilePacket implements MinecraftPacket {
public boolean isPriority() {
return true;
}
@Override
public boolean isTerminal() {
return true;
}
}

View file

@ -15,4 +15,9 @@ public class ServerboundLoginAcknowledgedPacket implements MinecraftPacket {
@Override
public void serialize(ByteBuf out, MinecraftCodecHelper helper) {
}
@Override
public boolean isTerminal() {
return true;
}
}

View file

@ -0,0 +1 @@
org.slf4j.simpleLogger.defaultLogLevel=debug