From 8dc30ce466e8fb471c350bea05ca7b8e507753e9 Mon Sep 17 00:00:00 2001 From: Darkere Date: Fri, 14 May 2021 17:05:46 +0200 Subject: [PATCH 1/2] Add packet splitting for some packets --- .../network/NetworkHandler.java | 41 ++- .../network/PacketSplitter.java | 233 ++++++++++++++++++ .../network/SplitPacketMessage.java | 46 ++++ 3 files changed, 307 insertions(+), 13 deletions(-) create mode 100644 src/main/java/com/refinedmods/refinedstorage/network/PacketSplitter.java create mode 100644 src/main/java/com/refinedmods/refinedstorage/network/SplitPacketMessage.java diff --git a/src/main/java/com/refinedmods/refinedstorage/network/NetworkHandler.java b/src/main/java/com/refinedmods/refinedstorage/network/NetworkHandler.java index e9061cada..ca9b0398c 100644 --- a/src/main/java/com/refinedmods/refinedstorage/network/NetworkHandler.java +++ b/src/main/java/com/refinedmods/refinedstorage/network/NetworkHandler.java @@ -12,18 +12,20 @@ import com.refinedmods.refinedstorage.network.tiledata.TileDataParameterUpdateMe import net.minecraft.entity.player.ServerPlayerEntity; import net.minecraft.util.ResourceLocation; import net.minecraftforge.common.util.FakePlayer; -import net.minecraftforge.fml.network.NetworkDirection; import net.minecraftforge.fml.network.NetworkRegistry; +import net.minecraftforge.fml.network.PacketDistributor; import net.minecraftforge.fml.network.simple.SimpleChannel; public class NetworkHandler { private final String protocolVersion = Integer.toString(1); + private final ResourceLocation channel = new ResourceLocation(RS.ID, "main_channel"); private final SimpleChannel handler = NetworkRegistry.ChannelBuilder - .named(new ResourceLocation(RS.ID, "main_channel")) + .named(channel) .clientAcceptedVersions(protocolVersion::equals) .serverAcceptedVersions(protocolVersion::equals) .networkProtocolVersion(() -> protocolVersion) .simpleChannel(); + private final PacketSplitter splitter = new PacketSplitter(5, handler, channel); public void register() { int id = 0; @@ -34,8 +36,8 @@ public class NetworkHandler { handler.registerMessage(id++, FluidFilterSlotUpdateMessage.class, FluidFilterSlotUpdateMessage::encode, FluidFilterSlotUpdateMessage::decode, FluidFilterSlotUpdateMessage::handle); handler.registerMessage(id++, TileDataParameterMessage.class, TileDataParameterMessage::encode, TileDataParameterMessage::decode, (msg, ctx) -> TileDataParameterMessage.handle(ctx)); handler.registerMessage(id++, TileDataParameterUpdateMessage.class, TileDataParameterUpdateMessage::encode, TileDataParameterUpdateMessage::decode, TileDataParameterUpdateMessage::handle); - handler.registerMessage(id++, GridItemUpdateMessage.class, GridItemUpdateMessage::encode, GridItemUpdateMessage::decode, GridItemUpdateMessage::handle); - handler.registerMessage(id++, GridItemDeltaMessage.class, GridItemDeltaMessage::encode, GridItemDeltaMessage::decode, GridItemDeltaMessage::handle); + splitter.registerMessage(id++, GridItemUpdateMessage.class, GridItemUpdateMessage::encode, GridItemUpdateMessage::decode, GridItemUpdateMessage::handle); + splitter.registerMessage(id++, GridItemDeltaMessage.class, GridItemDeltaMessage::encode, GridItemDeltaMessage::decode, GridItemDeltaMessage::handle); handler.registerMessage(id++, GridItemPullMessage.class, GridItemPullMessage::encode, GridItemPullMessage::decode, GridItemPullMessage::handle); handler.registerMessage(id++, GridItemGridScrollMessage.class, GridItemGridScrollMessage::encode, GridItemGridScrollMessage::decode, GridItemGridScrollMessage::handle); handler.registerMessage(id++, GridItemInventoryScrollMessage.class, GridItemInventoryScrollMessage::encode, GridItemInventoryScrollMessage::decode, GridItemInventoryScrollMessage::handle); @@ -50,15 +52,15 @@ public class NetworkHandler { handler.registerMessage(id++, GridFluidInsertHeldMessage.class, (msg, buf) -> { }, buf -> new GridFluidInsertHeldMessage(), (msg, ctx) -> GridFluidInsertHeldMessage.handle(ctx)); handler.registerMessage(id++, GridFluidPullMessage.class, GridFluidPullMessage::encode, GridFluidPullMessage::decode, GridFluidPullMessage::handle); - handler.registerMessage(id++, GridTransferMessage.class, GridTransferMessage::encode, GridTransferMessage::decode, GridTransferMessage::handle); + splitter.registerMessage(id++, GridTransferMessage.class, GridTransferMessage::encode, GridTransferMessage::decode, GridTransferMessage::handle); handler.registerMessage(id++, GridProcessingTransferMessage.class, GridProcessingTransferMessage::encode, GridProcessingTransferMessage::decode, GridProcessingTransferMessage::handle); handler.registerMessage(id++, SecurityManagerUpdateMessage.class, SecurityManagerUpdateMessage::encode, SecurityManagerUpdateMessage::decode, SecurityManagerUpdateMessage::handle); handler.registerMessage(id++, WirelessGridSettingsUpdateMessage.class, WirelessGridSettingsUpdateMessage::encode, WirelessGridSettingsUpdateMessage::decode, WirelessGridSettingsUpdateMessage::handle); handler.registerMessage(id++, OpenNetworkItemMessage.class, OpenNetworkItemMessage::encode, OpenNetworkItemMessage::decode, OpenNetworkItemMessage::handle); handler.registerMessage(id++, WirelessFluidGridSettingsUpdateMessage.class, WirelessFluidGridSettingsUpdateMessage::encode, WirelessFluidGridSettingsUpdateMessage::decode, WirelessFluidGridSettingsUpdateMessage::handle); handler.registerMessage(id++, PortableGridSettingsUpdateMessage.class, PortableGridSettingsUpdateMessage::encode, PortableGridSettingsUpdateMessage::decode, PortableGridSettingsUpdateMessage::handle); - handler.registerMessage(id++, PortableGridItemUpdateMessage.class, PortableGridItemUpdateMessage::encode, PortableGridItemUpdateMessage::decode, PortableGridItemUpdateMessage::handle); - handler.registerMessage(id++, PortableGridItemDeltaMessage.class, PortableGridItemDeltaMessage::encode, PortableGridItemDeltaMessage::decode, PortableGridItemDeltaMessage::handle); + splitter.registerMessage(id++, PortableGridItemUpdateMessage.class, PortableGridItemUpdateMessage::encode, PortableGridItemUpdateMessage::decode, PortableGridItemUpdateMessage::handle); + splitter.registerMessage(id++, PortableGridItemDeltaMessage.class, PortableGridItemDeltaMessage::encode, PortableGridItemDeltaMessage::decode, PortableGridItemDeltaMessage::handle); handler.registerMessage(id++, PortableGridFluidUpdateMessage.class, PortableGridFluidUpdateMessage::encode, PortableGridFluidUpdateMessage::decode, PortableGridFluidUpdateMessage::handle); handler.registerMessage(id++, PortableGridFluidDeltaMessage.class, PortableGridFluidDeltaMessage::encode, PortableGridFluidDeltaMessage::decode, PortableGridFluidDeltaMessage::handle); handler.registerMessage(id++, GridCraftingPreviewRequestMessage.class, GridCraftingPreviewRequestMessage::encode, GridCraftingPreviewRequestMessage::decode, GridCraftingPreviewRequestMessage::handle); @@ -66,18 +68,31 @@ public class NetworkHandler { handler.registerMessage(id++, GridCraftingStartRequestMessage.class, GridCraftingStartRequestMessage::encode, GridCraftingStartRequestMessage::decode, GridCraftingStartRequestMessage::handle); handler.registerMessage(id++, GridCraftingStartResponseMessage.class, (msg, buf) -> { }, buf -> new GridCraftingStartResponseMessage(), (msg, ctx) -> GridCraftingStartResponseMessage.handle(ctx)); - handler.registerMessage(id++, CraftingMonitorUpdateMessage.class, CraftingMonitorUpdateMessage::encode, CraftingMonitorUpdateMessage::decode, CraftingMonitorUpdateMessage::handle); + splitter.registerMessage(id++, CraftingMonitorUpdateMessage.class, CraftingMonitorUpdateMessage::encode, CraftingMonitorUpdateMessage::decode, CraftingMonitorUpdateMessage::handle); handler.registerMessage(id++, CraftingMonitorCancelMessage.class, CraftingMonitorCancelMessage::encode, CraftingMonitorCancelMessage::decode, CraftingMonitorCancelMessage::handle); handler.registerMessage(id++, WirelessCraftingMonitorSettingsUpdateMessage.class, WirelessCraftingMonitorSettingsUpdateMessage::encode, WirelessCraftingMonitorSettingsUpdateMessage::decode, WirelessCraftingMonitorSettingsUpdateMessage::handle); - } - - public void sendToServer(Object message) { - handler.sendToServer(message); + handler.registerMessage(id++, SplitPacketMessage.class, SplitPacketMessage::encode, SplitPacketMessage::decode, SplitPacketMessage::handle); } public void sendTo(ServerPlayerEntity player, Object message) { if (!(player instanceof FakePlayer)) { - handler.sendTo(message, player.connection.netManager, NetworkDirection.PLAY_TO_CLIENT); + if (splitter.shouldMessageBeSplit(message.getClass())) { + splitter.sendToPlayer(player, message); + } else { + handler.send(PacketDistributor.PLAYER.with(() -> player), message); + } } } + + public void sendToServer(Object message) { + if (splitter.shouldMessageBeSplit(message.getClass())) { + splitter.sendToServer(message); + } else { + handler.send(PacketDistributor.SERVER.noArg(), message); + } + } + + public void addPackagePart(int communicationId, int packetIndex, byte[] payload) { + splitter.addPackagePart(communicationId, packetIndex, payload); + } } diff --git a/src/main/java/com/refinedmods/refinedstorage/network/PacketSplitter.java b/src/main/java/com/refinedmods/refinedstorage/network/PacketSplitter.java new file mode 100644 index 000000000..a70920d28 --- /dev/null +++ b/src/main/java/com/refinedmods/refinedstorage/network/PacketSplitter.java @@ -0,0 +1,233 @@ +package com.refinedmods.refinedstorage.network; + +import com.google.common.primitives.Bytes; +import io.netty.buffer.Unpooled; +import net.minecraft.entity.player.ServerPlayerEntity; +import net.minecraft.network.PacketBuffer; +import net.minecraft.util.ResourceLocation; +import net.minecraftforge.fml.network.NetworkEvent; +import net.minecraftforge.fml.network.PacketDistributor; +import net.minecraftforge.fml.network.simple.SimpleChannel; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.logging.log4j.LogManager; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.Supplier; + +public class PacketSplitter { + private final static int MAX_PACKET_SIZE = 943718; + + private final ResourceLocation CHANNEL_ID; + private final SimpleChannel CHANNEL; + + private static final Map> packageCache = new HashMap<>(); + private final Map messageTargets = new HashMap<>(); + private final Map packetMaximums = new HashMap<>(); + private final Set> messagesToSplit = new HashSet<>(); + + private int comId = 0; + private final int maxNumberOfMessages; + private int ID; + + public PacketSplitter(int maxNumberOfMessages, SimpleChannel CHANNEL, ResourceLocation CHANNEL_ID) { + this.maxNumberOfMessages = maxNumberOfMessages; + this.CHANNEL = CHANNEL; + this.CHANNEL_ID = CHANNEL_ID; + } + + public boolean shouldMessageBeSplit(Class clazz) { + return messagesToSplit.contains(clazz); + } + + public void sendToPlayer(ServerPlayerEntity player, Object message) { + if (ID == 0) ID++; // in case we wrapped around, 0 is reserved for server + int id = ID++; + messageTargets.put(id, player); + sendPacket(message, id, PacketDistributor.PLAYER.with(() -> player)); + } + + public void sendToServer(Object message) { + messageTargets.put(0, null); + sendPacket(message, 0, PacketDistributor.SERVER.noArg()); + } + + //@Volatile mostly copied from SimpleChannel + private void sendPacket(Object Message, int id, PacketDistributor.PacketTarget target) { + final PacketBuffer bufIn = new PacketBuffer(Unpooled.buffer()); + + //write the message id to be able to figure out where the packet is supposed to go in the wrapper + bufIn.writeInt(id); + + int index = CHANNEL.encodeMessage(Message, bufIn); + target.send(target.getDirection().buildPacket(Pair.of(bufIn, index), CHANNEL_ID).getThis()); + } + + public void registerMessage(int index, Class messageType, BiConsumer encoder, Function decoder, BiConsumer> messageConsumer) { + registerMessage(index, maxNumberOfMessages, messageType, encoder, decoder, messageConsumer); + } + + public void registerMessage(int index, int maxNumberOfMessages, Class messageType, BiConsumer encoder, Function decoder, BiConsumer> messageConsumer) { + packetMaximums.put(index, maxNumberOfMessages); + messagesToSplit.add(messageType); + + BiConsumer wrappedEncoder = (msg, buffer) -> { + int id = buffer.readInt(); + buffer.discardReadBytes(); + ServerPlayerEntity player = messageTargets.get(id); + messageTargets.remove(id); + + //write a zero for the number of packets in case the packet does not need to be split + buffer.writeShort(0); + encoder.accept(msg, buffer); + createSplittingConsumer(player).accept(msg, buffer); + }; + + CHANNEL.registerMessage(index, messageType, wrappedEncoder, createPacketCombiner().andThen(decoder), messageConsumer); + } + + private BiConsumer createSplittingConsumer(ServerPlayerEntity playerEntity) { + return (MSG, buf) -> { + + if (buf.writerIndex() < MAX_PACKET_SIZE) { + return; + } + + //read packetId for this packet + int packetId = buf.readUnsignedByte(); + + //this short is written here in case we are not splitting, ignore for split packages + buf.readShort(); + + //ignore the above as it is not required for the final packet + int currentIndex = buf.readerIndex(); + int packetIndex = 0; + final int comId = this.comId++; + + //Data for this packet + byte[] packetData = new byte[0]; + + int maximumPackets = packetMaximums.get(packetId); + int expectedPackets = buf.writerIndex() / MAX_PACKET_SIZE + 1; + boolean failure = false; + + //Loop while data is available. + while (currentIndex < buf.writerIndex()) { + + int sliceSize = Math.min(MAX_PACKET_SIZE, buf.writerIndex() - currentIndex); + + //Extract the sub data array. + byte[] subPacketData = Arrays.copyOfRange(buf.array(), currentIndex, currentIndex + sliceSize); + + if (packetIndex == 0) { // Assign Data for first Packet to this packet. + packetData = subPacketData; + packetIndex++; + } else { + //Construct the split packet. + SplitPacketMessage splitPacketMessage = new SplitPacketMessage(comId, packetIndex++, subPacketData); + + if (playerEntity == null) { + CHANNEL.send(PacketDistributor.SERVER.noArg(), splitPacketMessage); + } else { + CHANNEL.send(PacketDistributor.PLAYER.with(() -> playerEntity), splitPacketMessage); + } + } + + //Move our working index. + currentIndex += sliceSize; + + if (packetIndex > maximumPackets) { + LogManager.getLogger().error("Failure Splitting Packets on Channel \"" + CHANNEL_ID + "\"." + " with " + MSG.getClass() + ". " + + " Number of Packets sent " + (packetIndex - 1) + ", expected number of Packets " + expectedPackets + ", maximum number of packets for a message of this type " + packetMaximums.get(packetId)); + failure = true; + break; + } + } + + //start writing at the beginning + buf.setIndex(0, 0); + + //packetId is required for forge to match the packet + buf.writeByte(packetId); + + //number of packets the packet was split into + buf.writeShort(failure ? expectedPackets : packetIndex); + buf.writeInt(comId); + buf.writeByteArray(packetData); + + //copies the written data into a new buffer discarding the old one + buf.capacity(buf.writerIndex()); + }; + } + + private Function createPacketCombiner() { + return (buf) -> { + int size = buf.readShort(); + + //This packet was not split + if (size < 2) return buf; + + int comId = buf.readInt(); + + Map partsMap = packageCache.get(comId); + if (partsMap == null || partsMap.size() != size - 1) { + int partSize = partsMap == null ? 0 : partsMap.size(); + int id = buf.readUnsignedByte(); + int max = packetMaximums.get(id) == null ? 0 : packetMaximums.get(id); + throw new PacketSplittingException(CHANNEL_ID, partSize, size, max, id); + } + + //Add data that came from this packet + addPackagePart(comId, 0, buf.readByteArray()); + + //Combine Cached Data + final byte[] packetData = partsMap.entrySet() + .stream() + .sorted(Map.Entry.comparingByKey()) + .map(Map.Entry::getValue) + .reduce(new byte[0], Bytes::concat); + + PacketBuffer buffer = new PacketBuffer(Unpooled.wrappedBuffer(packetData)); + + //Packets come in with with writer Index at 1 skipping the packet index + // buffer.readerIndex(1); + + //remove data from cache + packageCache.remove(comId); + return buffer; + }; + } + + public void addPackagePart(int communicationId, int packetIndex, byte[] payload) { + //Sync on the message cache since this is still on the Netty thread. + synchronized (PacketSplitter.packageCache) { + PacketSplitter.packageCache.computeIfAbsent(communicationId, (id) -> new ConcurrentHashMap<>()); + PacketSplitter.packageCache.get(communicationId).put(packetIndex, payload); + } + } +} + +class PacketSplittingException extends RuntimeException { + ResourceLocation channnelId; + int actualSize; + int expectedSize; + int maximumSize; + int packetId; + + public PacketSplittingException(ResourceLocation channnelId, int actualSize, int expectedSize, int maximumSize, int packetId) { + this.channnelId = channnelId; + this.actualSize = actualSize; + this.expectedSize = expectedSize; + this.maximumSize = maximumSize; + this.packetId = packetId; + } + + @Override + public String getMessage() { + return "Failure Splitting Packets on Channel \"" + channnelId.toString() + "\"." + + " Number of Packets sent " + actualSize + ", Number of Packets expected " + expectedSize + ", maximum number of packets for a message of this type " + maximumSize; + } + +} diff --git a/src/main/java/com/refinedmods/refinedstorage/network/SplitPacketMessage.java b/src/main/java/com/refinedmods/refinedstorage/network/SplitPacketMessage.java new file mode 100644 index 000000000..78a80eca7 --- /dev/null +++ b/src/main/java/com/refinedmods/refinedstorage/network/SplitPacketMessage.java @@ -0,0 +1,46 @@ +package com.refinedmods.refinedstorage.network; + +import com.refinedmods.refinedstorage.RS; +import net.minecraft.network.PacketBuffer; +import net.minecraftforge.fml.network.NetworkEvent; + +import java.util.function.Supplier; + +public class SplitPacketMessage { + /** + * Internal communication id. Used to indicate to what wrapped message this belongs to. + */ + private int communicationId; + + /** + * The index of the split message in the wrapped message. + */ + private int packetIndex; + + /** + * The payload. + */ + private final byte[] payload; + + public SplitPacketMessage(final int communicationId, final int packetIndex, final byte[] payload) { + this.communicationId = communicationId; + this.packetIndex = packetIndex; + this.payload = payload; + } + + public static void encode(SplitPacketMessage message, PacketBuffer buf) { + buf.writeVarInt(message.communicationId); + buf.writeVarInt(message.packetIndex); + buf.writeByteArray(message.payload); + } + + public static SplitPacketMessage decode(final PacketBuffer buf) { + return new SplitPacketMessage(buf.readVarInt(), buf.readVarInt(), buf.readByteArray()); + } + + public static boolean handle(SplitPacketMessage data, Supplier ctx) { + RS.NETWORK_HANDLER.addPackagePart(data.communicationId, data.packetIndex, data.payload); + ctx.get().setPacketHandled(true); + return true; + } +} From eb785016290172804c353ac5e6c13dbd688336ee Mon Sep 17 00:00:00 2001 From: Darkere Date: Fri, 14 May 2021 17:09:57 +0200 Subject: [PATCH 2/2] remove commented out code --- .../com/refinedmods/refinedstorage/network/PacketSplitter.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/java/com/refinedmods/refinedstorage/network/PacketSplitter.java b/src/main/java/com/refinedmods/refinedstorage/network/PacketSplitter.java index a70920d28..26c6c32b7 100644 --- a/src/main/java/com/refinedmods/refinedstorage/network/PacketSplitter.java +++ b/src/main/java/com/refinedmods/refinedstorage/network/PacketSplitter.java @@ -191,9 +191,6 @@ public class PacketSplitter { PacketBuffer buffer = new PacketBuffer(Unpooled.wrappedBuffer(packetData)); - //Packets come in with with writer Index at 1 skipping the packet index - // buffer.readerIndex(1); - //remove data from cache packageCache.remove(comId); return buffer;