Merge pull request #2977 from Darkere/splitting

Add packet splitting for some packets
This commit is contained in:
Darkere
2021-10-17 17:21:37 +02:00
committed by GitHub
3 changed files with 304 additions and 13 deletions

View File

@@ -12,18 +12,20 @@ import com.refinedmods.refinedstorage.network.tiledata.TileDataParameterUpdateMe
import net.minecraft.entity.player.ServerPlayerEntity;
import net.minecraft.util.ResourceLocation;
import net.minecraftforge.common.util.FakePlayer;
import net.minecraftforge.fml.network.NetworkDirection;
import net.minecraftforge.fml.network.NetworkRegistry;
import net.minecraftforge.fml.network.PacketDistributor;
import net.minecraftforge.fml.network.simple.SimpleChannel;
public class NetworkHandler {
private final String protocolVersion = Integer.toString(1);
private final ResourceLocation channel = new ResourceLocation(RS.ID, "main_channel");
private final SimpleChannel handler = NetworkRegistry.ChannelBuilder
.named(new ResourceLocation(RS.ID, "main_channel"))
.named(channel)
.clientAcceptedVersions(protocolVersion::equals)
.serverAcceptedVersions(protocolVersion::equals)
.networkProtocolVersion(() -> protocolVersion)
.simpleChannel();
private final PacketSplitter splitter = new PacketSplitter(5, handler, channel);
public void register() {
int id = 0;
@@ -34,8 +36,8 @@ public class NetworkHandler {
handler.registerMessage(id++, FluidFilterSlotUpdateMessage.class, FluidFilterSlotUpdateMessage::encode, FluidFilterSlotUpdateMessage::decode, FluidFilterSlotUpdateMessage::handle);
handler.registerMessage(id++, TileDataParameterMessage.class, TileDataParameterMessage::encode, TileDataParameterMessage::decode, (msg, ctx) -> TileDataParameterMessage.handle(ctx));
handler.registerMessage(id++, TileDataParameterUpdateMessage.class, TileDataParameterUpdateMessage::encode, TileDataParameterUpdateMessage::decode, TileDataParameterUpdateMessage::handle);
handler.registerMessage(id++, GridItemUpdateMessage.class, GridItemUpdateMessage::encode, GridItemUpdateMessage::decode, GridItemUpdateMessage::handle);
handler.registerMessage(id++, GridItemDeltaMessage.class, GridItemDeltaMessage::encode, GridItemDeltaMessage::decode, GridItemDeltaMessage::handle);
splitter.registerMessage(id++, GridItemUpdateMessage.class, GridItemUpdateMessage::encode, GridItemUpdateMessage::decode, GridItemUpdateMessage::handle);
splitter.registerMessage(id++, GridItemDeltaMessage.class, GridItemDeltaMessage::encode, GridItemDeltaMessage::decode, GridItemDeltaMessage::handle);
handler.registerMessage(id++, GridItemPullMessage.class, GridItemPullMessage::encode, GridItemPullMessage::decode, GridItemPullMessage::handle);
handler.registerMessage(id++, GridItemGridScrollMessage.class, GridItemGridScrollMessage::encode, GridItemGridScrollMessage::decode, GridItemGridScrollMessage::handle);
handler.registerMessage(id++, GridItemInventoryScrollMessage.class, GridItemInventoryScrollMessage::encode, GridItemInventoryScrollMessage::decode, GridItemInventoryScrollMessage::handle);
@@ -50,15 +52,15 @@ public class NetworkHandler {
handler.registerMessage(id++, GridFluidInsertHeldMessage.class, (msg, buf) -> {
}, buf -> new GridFluidInsertHeldMessage(), (msg, ctx) -> GridFluidInsertHeldMessage.handle(ctx));
handler.registerMessage(id++, GridFluidPullMessage.class, GridFluidPullMessage::encode, GridFluidPullMessage::decode, GridFluidPullMessage::handle);
handler.registerMessage(id++, GridTransferMessage.class, GridTransferMessage::encode, GridTransferMessage::decode, GridTransferMessage::handle);
splitter.registerMessage(id++, GridTransferMessage.class, GridTransferMessage::encode, GridTransferMessage::decode, GridTransferMessage::handle);
handler.registerMessage(id++, GridProcessingTransferMessage.class, GridProcessingTransferMessage::encode, GridProcessingTransferMessage::decode, GridProcessingTransferMessage::handle);
handler.registerMessage(id++, SecurityManagerUpdateMessage.class, SecurityManagerUpdateMessage::encode, SecurityManagerUpdateMessage::decode, SecurityManagerUpdateMessage::handle);
handler.registerMessage(id++, WirelessGridSettingsUpdateMessage.class, WirelessGridSettingsUpdateMessage::encode, WirelessGridSettingsUpdateMessage::decode, WirelessGridSettingsUpdateMessage::handle);
handler.registerMessage(id++, OpenNetworkItemMessage.class, OpenNetworkItemMessage::encode, OpenNetworkItemMessage::decode, OpenNetworkItemMessage::handle);
handler.registerMessage(id++, WirelessFluidGridSettingsUpdateMessage.class, WirelessFluidGridSettingsUpdateMessage::encode, WirelessFluidGridSettingsUpdateMessage::decode, WirelessFluidGridSettingsUpdateMessage::handle);
handler.registerMessage(id++, PortableGridSettingsUpdateMessage.class, PortableGridSettingsUpdateMessage::encode, PortableGridSettingsUpdateMessage::decode, PortableGridSettingsUpdateMessage::handle);
handler.registerMessage(id++, PortableGridItemUpdateMessage.class, PortableGridItemUpdateMessage::encode, PortableGridItemUpdateMessage::decode, PortableGridItemUpdateMessage::handle);
handler.registerMessage(id++, PortableGridItemDeltaMessage.class, PortableGridItemDeltaMessage::encode, PortableGridItemDeltaMessage::decode, PortableGridItemDeltaMessage::handle);
splitter.registerMessage(id++, PortableGridItemUpdateMessage.class, PortableGridItemUpdateMessage::encode, PortableGridItemUpdateMessage::decode, PortableGridItemUpdateMessage::handle);
splitter.registerMessage(id++, PortableGridItemDeltaMessage.class, PortableGridItemDeltaMessage::encode, PortableGridItemDeltaMessage::decode, PortableGridItemDeltaMessage::handle);
handler.registerMessage(id++, PortableGridFluidUpdateMessage.class, PortableGridFluidUpdateMessage::encode, PortableGridFluidUpdateMessage::decode, PortableGridFluidUpdateMessage::handle);
handler.registerMessage(id++, PortableGridFluidDeltaMessage.class, PortableGridFluidDeltaMessage::encode, PortableGridFluidDeltaMessage::decode, PortableGridFluidDeltaMessage::handle);
handler.registerMessage(id++, GridCraftingPreviewRequestMessage.class, GridCraftingPreviewRequestMessage::encode, GridCraftingPreviewRequestMessage::decode, GridCraftingPreviewRequestMessage::handle);
@@ -66,18 +68,31 @@ public class NetworkHandler {
handler.registerMessage(id++, GridCraftingStartRequestMessage.class, GridCraftingStartRequestMessage::encode, GridCraftingStartRequestMessage::decode, GridCraftingStartRequestMessage::handle);
handler.registerMessage(id++, GridCraftingStartResponseMessage.class, (msg, buf) -> {
}, buf -> new GridCraftingStartResponseMessage(), (msg, ctx) -> GridCraftingStartResponseMessage.handle(ctx));
handler.registerMessage(id++, CraftingMonitorUpdateMessage.class, CraftingMonitorUpdateMessage::encode, CraftingMonitorUpdateMessage::decode, CraftingMonitorUpdateMessage::handle);
splitter.registerMessage(id++, CraftingMonitorUpdateMessage.class, CraftingMonitorUpdateMessage::encode, CraftingMonitorUpdateMessage::decode, CraftingMonitorUpdateMessage::handle);
handler.registerMessage(id++, CraftingMonitorCancelMessage.class, CraftingMonitorCancelMessage::encode, CraftingMonitorCancelMessage::decode, CraftingMonitorCancelMessage::handle);
handler.registerMessage(id++, WirelessCraftingMonitorSettingsUpdateMessage.class, WirelessCraftingMonitorSettingsUpdateMessage::encode, WirelessCraftingMonitorSettingsUpdateMessage::decode, WirelessCraftingMonitorSettingsUpdateMessage::handle);
}
public void sendToServer(Object message) {
handler.sendToServer(message);
handler.registerMessage(id++, SplitPacketMessage.class, SplitPacketMessage::encode, SplitPacketMessage::decode, SplitPacketMessage::handle);
}
public void sendTo(ServerPlayerEntity player, Object message) {
if (!(player instanceof FakePlayer)) {
handler.sendTo(message, player.connection.netManager, NetworkDirection.PLAY_TO_CLIENT);
if (splitter.shouldMessageBeSplit(message.getClass())) {
splitter.sendToPlayer(player, message);
} else {
handler.send(PacketDistributor.PLAYER.with(() -> player), message);
}
}
}
public void sendToServer(Object message) {
if (splitter.shouldMessageBeSplit(message.getClass())) {
splitter.sendToServer(message);
} else {
handler.send(PacketDistributor.SERVER.noArg(), message);
}
}
public void addPackagePart(int communicationId, int packetIndex, byte[] payload) {
splitter.addPackagePart(communicationId, packetIndex, payload);
}
}

View File

@@ -0,0 +1,230 @@
package com.refinedmods.refinedstorage.network;
import com.google.common.primitives.Bytes;
import io.netty.buffer.Unpooled;
import net.minecraft.entity.player.ServerPlayerEntity;
import net.minecraft.network.PacketBuffer;
import net.minecraft.util.ResourceLocation;
import net.minecraftforge.fml.network.NetworkEvent;
import net.minecraftforge.fml.network.PacketDistributor;
import net.minecraftforge.fml.network.simple.SimpleChannel;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
public class PacketSplitter {
private final static int MAX_PACKET_SIZE = 943718;
private final ResourceLocation CHANNEL_ID;
private final SimpleChannel CHANNEL;
private static final Map<Integer, Map<Integer, byte[]>> packageCache = new HashMap<>();
private final Map<Integer, ServerPlayerEntity> messageTargets = new HashMap<>();
private final Map<Integer, Integer> packetMaximums = new HashMap<>();
private final Set<Class<?>> messagesToSplit = new HashSet<>();
private int comId = 0;
private final int maxNumberOfMessages;
private int ID;
public PacketSplitter(int maxNumberOfMessages, SimpleChannel CHANNEL, ResourceLocation CHANNEL_ID) {
this.maxNumberOfMessages = maxNumberOfMessages;
this.CHANNEL = CHANNEL;
this.CHANNEL_ID = CHANNEL_ID;
}
public boolean shouldMessageBeSplit(Class<?> clazz) {
return messagesToSplit.contains(clazz);
}
public void sendToPlayer(ServerPlayerEntity player, Object message) {
if (ID == 0) ID++; // in case we wrapped around, 0 is reserved for server
int id = ID++;
messageTargets.put(id, player);
sendPacket(message, id, PacketDistributor.PLAYER.with(() -> player));
}
public void sendToServer(Object message) {
messageTargets.put(0, null);
sendPacket(message, 0, PacketDistributor.SERVER.noArg());
}
//@Volatile mostly copied from SimpleChannel
private void sendPacket(Object Message, int id, PacketDistributor.PacketTarget target) {
final PacketBuffer bufIn = new PacketBuffer(Unpooled.buffer());
//write the message id to be able to figure out where the packet is supposed to go in the wrapper
bufIn.writeInt(id);
int index = CHANNEL.encodeMessage(Message, bufIn);
target.send(target.getDirection().buildPacket(Pair.of(bufIn, index), CHANNEL_ID).getThis());
}
public <MSG> void registerMessage(int index, Class<MSG> messageType, BiConsumer<MSG, PacketBuffer> encoder, Function<PacketBuffer, MSG> decoder, BiConsumer<MSG, Supplier<NetworkEvent.Context>> messageConsumer) {
registerMessage(index, maxNumberOfMessages, messageType, encoder, decoder, messageConsumer);
}
public <MSG> void registerMessage(int index, int maxNumberOfMessages, Class<MSG> messageType, BiConsumer<MSG, PacketBuffer> encoder, Function<PacketBuffer, MSG> decoder, BiConsumer<MSG, Supplier<NetworkEvent.Context>> messageConsumer) {
packetMaximums.put(index, maxNumberOfMessages);
messagesToSplit.add(messageType);
BiConsumer<MSG, PacketBuffer> wrappedEncoder = (msg, buffer) -> {
int id = buffer.readInt();
buffer.discardReadBytes();
ServerPlayerEntity player = messageTargets.get(id);
messageTargets.remove(id);
//write a zero for the number of packets in case the packet does not need to be split
buffer.writeShort(0);
encoder.accept(msg, buffer);
createSplittingConsumer(player).accept(msg, buffer);
};
CHANNEL.registerMessage(index, messageType, wrappedEncoder, createPacketCombiner().andThen(decoder), messageConsumer);
}
private <MSG> BiConsumer<MSG, PacketBuffer> createSplittingConsumer(ServerPlayerEntity playerEntity) {
return (MSG, buf) -> {
if (buf.writerIndex() < MAX_PACKET_SIZE) {
return;
}
//read packetId for this packet
int packetId = buf.readUnsignedByte();
//this short is written here in case we are not splitting, ignore for split packages
buf.readShort();
//ignore the above as it is not required for the final packet
int currentIndex = buf.readerIndex();
int packetIndex = 0;
final int comId = this.comId++;
//Data for this packet
byte[] packetData = new byte[0];
int maximumPackets = packetMaximums.get(packetId);
int expectedPackets = buf.writerIndex() / MAX_PACKET_SIZE + 1;
boolean failure = false;
//Loop while data is available.
while (currentIndex < buf.writerIndex()) {
int sliceSize = Math.min(MAX_PACKET_SIZE, buf.writerIndex() - currentIndex);
//Extract the sub data array.
byte[] subPacketData = Arrays.copyOfRange(buf.array(), currentIndex, currentIndex + sliceSize);
if (packetIndex == 0) { // Assign Data for first Packet to this packet.
packetData = subPacketData;
packetIndex++;
} else {
//Construct the split packet.
SplitPacketMessage splitPacketMessage = new SplitPacketMessage(comId, packetIndex++, subPacketData);
if (playerEntity == null) {
CHANNEL.send(PacketDistributor.SERVER.noArg(), splitPacketMessage);
} else {
CHANNEL.send(PacketDistributor.PLAYER.with(() -> playerEntity), splitPacketMessage);
}
}
//Move our working index.
currentIndex += sliceSize;
if (packetIndex > maximumPackets) {
LogManager.getLogger().error("Failure Splitting Packets on Channel \"" + CHANNEL_ID + "\"." + " with " + MSG.getClass() + ". " +
" Number of Packets sent " + (packetIndex - 1) + ", expected number of Packets " + expectedPackets + ", maximum number of packets for a message of this type " + packetMaximums.get(packetId));
failure = true;
break;
}
}
//start writing at the beginning
buf.setIndex(0, 0);
//packetId is required for forge to match the packet
buf.writeByte(packetId);
//number of packets the packet was split into
buf.writeShort(failure ? expectedPackets : packetIndex);
buf.writeInt(comId);
buf.writeByteArray(packetData);
//copies the written data into a new buffer discarding the old one
buf.capacity(buf.writerIndex());
};
}
private Function<PacketBuffer, PacketBuffer> createPacketCombiner() {
return (buf) -> {
int size = buf.readShort();
//This packet was not split
if (size < 2) return buf;
int comId = buf.readInt();
Map<Integer, byte[]> partsMap = packageCache.get(comId);
if (partsMap == null || partsMap.size() != size - 1) {
int partSize = partsMap == null ? 0 : partsMap.size();
int id = buf.readUnsignedByte();
int max = packetMaximums.get(id) == null ? 0 : packetMaximums.get(id);
throw new PacketSplittingException(CHANNEL_ID, partSize, size, max, id);
}
//Add data that came from this packet
addPackagePart(comId, 0, buf.readByteArray());
//Combine Cached Data
final byte[] packetData = partsMap.entrySet()
.stream()
.sorted(Map.Entry.comparingByKey())
.map(Map.Entry::getValue)
.reduce(new byte[0], Bytes::concat);
PacketBuffer buffer = new PacketBuffer(Unpooled.wrappedBuffer(packetData));
//remove data from cache
packageCache.remove(comId);
return buffer;
};
}
public void addPackagePart(int communicationId, int packetIndex, byte[] payload) {
//Sync on the message cache since this is still on the Netty thread.
synchronized (PacketSplitter.packageCache) {
PacketSplitter.packageCache.computeIfAbsent(communicationId, (id) -> new ConcurrentHashMap<>());
PacketSplitter.packageCache.get(communicationId).put(packetIndex, payload);
}
}
}
class PacketSplittingException extends RuntimeException {
ResourceLocation channnelId;
int actualSize;
int expectedSize;
int maximumSize;
int packetId;
public PacketSplittingException(ResourceLocation channnelId, int actualSize, int expectedSize, int maximumSize, int packetId) {
this.channnelId = channnelId;
this.actualSize = actualSize;
this.expectedSize = expectedSize;
this.maximumSize = maximumSize;
this.packetId = packetId;
}
@Override
public String getMessage() {
return "Failure Splitting Packets on Channel \"" + channnelId.toString() + "\"." +
" Number of Packets sent " + actualSize + ", Number of Packets expected " + expectedSize + ", maximum number of packets for a message of this type " + maximumSize;
}
}

View File

@@ -0,0 +1,46 @@
package com.refinedmods.refinedstorage.network;
import com.refinedmods.refinedstorage.RS;
import net.minecraft.network.PacketBuffer;
import net.minecraftforge.fml.network.NetworkEvent;
import java.util.function.Supplier;
public class SplitPacketMessage {
/**
* Internal communication id. Used to indicate to what wrapped message this belongs to.
*/
private int communicationId;
/**
* The index of the split message in the wrapped message.
*/
private int packetIndex;
/**
* The payload.
*/
private final byte[] payload;
public SplitPacketMessage(final int communicationId, final int packetIndex, final byte[] payload) {
this.communicationId = communicationId;
this.packetIndex = packetIndex;
this.payload = payload;
}
public static void encode(SplitPacketMessage message, PacketBuffer buf) {
buf.writeVarInt(message.communicationId);
buf.writeVarInt(message.packetIndex);
buf.writeByteArray(message.payload);
}
public static SplitPacketMessage decode(final PacketBuffer buf) {
return new SplitPacketMessage(buf.readVarInt(), buf.readVarInt(), buf.readByteArray());
}
public static boolean handle(SplitPacketMessage data, Supplier<NetworkEvent.Context> ctx) {
RS.NETWORK_HANDLER.addPackagePart(data.communicationId, data.packetIndex, data.payload);
ctx.get().setPacketHandled(true);
return true;
}
}