From 227c3a1567e278bbdb1abdb496b433aa6685d58c Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 19 Feb 2026 00:31:26 +0300 Subject: [PATCH 01/10] raw --- .../discovery/DiscoveryMessageFactory.java | 12 + .../persistence/tree/io/BPlusMetaIO.java | 3 +- .../serializer/RecordDataV1Serializer.java | 3 +- .../ignite/lang/IgniteProductVersion.java | 121 ++++--- .../ignite/spi/discovery/tcp/ClientImpl.java | 6 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 24 +- .../tcp/internal/TcpDiscoveryNode.java | 202 ++++++----- .../ClusterNodeCollectionMessage.java | 77 +++++ .../tcp/messages/ClusterNodeMessage.java | 274 +++++++++++++++ .../messages/IgniteProductVersionMessage.java | 152 +++++++++ .../TcpDiscoveryNodeAddedMessage.java | 314 ++++++++++++++++-- 11 files changed, 985 insertions(+), 203 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeCollectionMessage.java create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeMessage.java create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/IgniteProductVersionMessage.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 753f00569c5cf..7a799ad101ab3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -17,7 +17,10 @@ package org.apache.ignite.internal.managers.discovery; +import org.apache.ignite.internal.codegen.ClusterNodeCollectionMessageSerializer; +import org.apache.ignite.internal.codegen.ClusterNodeMessageSerializer; import org.apache.ignite.internal.codegen.DiscoveryDataPacketSerializer; +import org.apache.ignite.internal.codegen.IgniteProductVersionMessageSerializer; import org.apache.ignite.internal.codegen.InetAddressMessageSerializer; import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer; import org.apache.ignite.internal.codegen.NodeSpecificDataSerializer; @@ -37,6 +40,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryMetricsUpdateMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryNodeAddFinishedMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodeAddedMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryNodeFailedMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryNodeFullMetricsMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer; @@ -48,6 +52,9 @@ import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; +import org.apache.ignite.spi.discovery.tcp.messages.ClusterNodeCollectionMessage; +import org.apache.ignite.spi.discovery.tcp.messages.ClusterNodeMessage; +import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage; import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage; import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage; import org.apache.ignite.spi.discovery.tcp.messages.NodeSpecificData; @@ -67,6 +74,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; @@ -80,6 +88,9 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { /** {@inheritDoc} */ @Override public void registerAll(MessageFactory factory) { + factory.register((short)-110, ClusterNodeCollectionMessage::new, new ClusterNodeCollectionMessageSerializer()); + factory.register((short)-109, ClusterNodeMessage::new, new ClusterNodeMessageSerializer()); + factory.register((short)-108, IgniteProductVersionMessage::new, new IgniteProductVersionMessageSerializer()); factory.register((short)-107, NodeSpecificData::new, new NodeSpecificDataSerializer()); factory.register((short)-106, DiscoveryDataPacket::new, new DiscoveryDataPacketSerializer()); factory.register((short)-105, TcpDiscoveryNodeFullMetricsMessage::new, @@ -110,5 +121,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)17, TcpDiscoveryNodeFailedMessage::new, new TcpDiscoveryNodeFailedMessageSerializer()); factory.register((short)18, TcpDiscoveryStatusCheckMessage::new, new TcpDiscoveryStatusCheckMessageSerializer()); factory.register((short)19, TcpDiscoveryNodeAddFinishedMessage::new, new TcpDiscoveryNodeAddFinishedMessageSerializer()); + factory.register((short)20, TcpDiscoveryNodeAddedMessage::new, new TcpDiscoveryNodeAddedMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusMetaIO.java index edcfefc4f8890..7ab88b3809182 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusMetaIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusMetaIO.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.util.GridStringBuilder; import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage; /** * IO routines for B+Tree meta pages. @@ -303,7 +304,7 @@ public IgniteProductVersion createdVersion(long pageAddr) { PageUtils.getByte(pageAddr, CREATED_VER_OFFSET + 1), PageUtils.getByte(pageAddr, CREATED_VER_OFFSET + 2), PageUtils.getLong(pageAddr, CREATED_VER_OFFSET + 3), - PageUtils.getBytes(pageAddr, CREATED_VER_OFFSET + 11, IgniteProductVersion.REV_HASH_SIZE)); + PageUtils.getBytes(pageAddr, CREATED_VER_OFFSET + 11, IgniteProductVersionMessage.REV_HASH_SIZE)); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java index de920fddadf31..5fb2d97a02d13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java @@ -120,6 +120,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage; import org.apache.ignite.spi.encryption.EncryptionSpi; import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi; import org.jetbrains.annotations.Nullable; @@ -850,7 +851,7 @@ WALRecord readPlainRecord(RecordType type, ByteBufferBackedDataInput in, long flags = in.readLong(); - byte[] revHash = new byte[IgniteProductVersion.REV_HASH_SIZE]; + byte[] revHash = new byte[IgniteProductVersionMessage.REV_HASH_SIZE]; byte maj = in.readByte(); byte min = in.readByte(); byte maint = in.readByte(); diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java index 1c78694550bd5..3efda7095caa4 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java @@ -27,6 +27,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteVersionUtils; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage; import org.jetbrains.annotations.NotNull; /** @@ -41,39 +42,26 @@ public class IgniteProductVersion implements Comparable, E /** */ private static final long serialVersionUID = 0L; - /** Size of the {@link #revHash }*/ - public static final int REV_HASH_SIZE = 20; - /** Size in bytes of serialized: 3 bytes (maj, min, maintenance version), 8 bytes - timestamp */ - public static final int SIZE_IN_BYTES = 3 + 8 + REV_HASH_SIZE; + public static final int SIZE_IN_BYTES = 3 + 8 + IgniteProductVersionMessage.REV_HASH_SIZE; /** Regexp parse pattern. */ private static final Pattern VER_PATTERN = Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)([-.]([^0123456789][^-]+)(-SNAPSHOT)?)?(-(\\d+))?(-([\\da-f]+))?"); - /** Major version number. */ - private byte major; - - /** Minor version number. */ - private byte minor; - - /** Maintenance version number. */ - private byte maintenance; - - /** Stage of development. */ - private String stage; - - /** Revision timestamp. */ - private long revTs; - - /** Revision hash. */ - private byte[] revHash; + /** The values holding message. */ + private IgniteProductVersionMessage productVerMsg; /** * Empty constructor required by {@link Externalizable}. */ public IgniteProductVersion() { - // No-op. + productVerMsg = new IgniteProductVersionMessage(); + } + + /** @param productVerMsg Product version message. */ + public IgniteProductVersion(IgniteProductVersionMessage productVerMsg) { + this.productVerMsg = productVerMsg; } /** @@ -96,17 +84,17 @@ public IgniteProductVersion(byte major, byte minor, byte maintenance, long revTs * @param revHash Revision hash. */ public IgniteProductVersion(byte major, byte minor, byte maintenance, String stage, long revTs, byte[] revHash) { - if (revHash != null && revHash.length != REV_HASH_SIZE) { + productVerMsg = new IgniteProductVersionMessage(major, minor, maintenance, stage, revTs, revHash); + + if (revHash != null && revHash.length != IgniteProductVersionMessage.REV_HASH_SIZE) { throw new IllegalArgumentException("Invalid length for SHA1 hash (must be " - + REV_HASH_SIZE + "): " + revHash.length); + + IgniteProductVersionMessage.REV_HASH_SIZE + "): " + revHash.length); } + } - this.major = major; - this.minor = minor; - this.maintenance = maintenance; - this.stage = stage; - this.revTs = revTs; - this.revHash = revHash != null ? revHash : new byte[REV_HASH_SIZE]; + /** @return {@link IgniteProductVersionMessage}. */ + public IgniteProductVersionMessage message() { + return productVerMsg; } /** @@ -115,7 +103,7 @@ public IgniteProductVersion(byte major, byte minor, byte maintenance, String sta * @return Major version number. */ public byte major() { - return major; + return productVerMsg.major(); } /** @@ -124,7 +112,7 @@ public byte major() { * @return Minor version number. */ public byte minor() { - return minor; + return productVerMsg.minor(); } /** @@ -133,14 +121,14 @@ public byte minor() { * @return Maintenance version number. */ public byte maintenance() { - return maintenance; + return productVerMsg.maintenance(); } /** * @return Stage of development. */ public String stage() { - return stage; + return productVerMsg.stage(); } /** @@ -149,7 +137,7 @@ public String stage() { * @return Revision timestamp. */ public long revisionTimestamp() { - return revTs; + return productVerMsg.revisionTimestamp(); } /** @@ -158,7 +146,7 @@ public long revisionTimestamp() { * @return Revision hash. */ public byte[] revisionHash() { - return revHash; + return productVerMsg.revisionHash(); } /** @@ -167,7 +155,7 @@ public byte[] revisionHash() { * @return Release date. */ public Date releaseDate() { - return new Date(revTs * 1000); + return new Date(revisionTimestamp() * 1000); } /** @@ -178,31 +166,31 @@ public Date releaseDate() { */ public boolean greaterThanEqual(int major, int minor, int maintenance) { // NOTE: Unknown version is less than any other version. - if (major == this.major) - return minor == this.minor ? this.maintenance >= maintenance : this.minor > minor; + if (major == major()) + return minor == minor() ? maintenance() >= maintenance : minor() > minor; else - return this.major > major; + return major() > major; } /** {@inheritDoc} */ @Override public int compareTo(@NotNull IgniteProductVersion o) { // NOTE: Unknown version is less than any other version. - int res = Integer.compare(major, o.major); + int res = Integer.compare(major(), o.major()); if (res != 0) return res; - res = Integer.compare(minor, o.minor); + res = Integer.compare(minor(), o.minor()); if (res != 0) return res; - res = Integer.compare(maintenance, o.maintenance); + res = Integer.compare(maintenance(), o.maintenance()); if (res != 0) return res; - return Long.compare(revTs, o.revTs); + return Long.compare(revisionTimestamp(), o.revisionTimestamp()); } /** @@ -210,17 +198,17 @@ public boolean greaterThanEqual(int major, int minor, int maintenance) { * @return Compare result. */ public int compareToIgnoreTimestamp(@NotNull IgniteProductVersion o) { - int res = Integer.compare(major, o.major); + int res = Integer.compare(major(), o.major()); if (res != 0) return res; - res = Integer.compare(minor, o.minor); + res = Integer.compare(minor(), o.minor()); if (res != 0) return res; - return Integer.compare(maintenance, o.maintenance); + return Integer.compare(maintenance(), o.maintenance()); } /** {@inheritDoc} */ @@ -233,47 +221,50 @@ public int compareToIgnoreTimestamp(@NotNull IgniteProductVersion o) { IgniteProductVersion that = (IgniteProductVersion)o; - return revTs == that.revTs && maintenance == that.maintenance && minor == that.minor && major == that.major; + return revisionTimestamp() == that.revisionTimestamp() && maintenance() == that.maintenance() + && minor() == that.minor() && major() == that.major(); } /** {@inheritDoc} */ @Override public int hashCode() { - int res = major; + int res = major(); - res = 31 * res + minor; - res = 31 * res + maintenance; - res = 31 * res + (int)(revTs ^ (revTs >>> 32)); + res = 31 * res + minor(); + res = 31 * res + maintenance(); + res = 31 * res + (int)(revisionTimestamp() ^ (revisionTimestamp() >>> 32)); return res; } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeByte(major); - out.writeByte(minor); - out.writeByte(maintenance); - out.writeLong(revTs); - U.writeByteArray(out, revHash); + out.writeByte(major()); + out.writeByte(minor()); + out.writeByte(maintenance()); + out.writeLong(revisionTimestamp()); + U.writeByteArray(out, revisionHash()); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - major = in.readByte(); - minor = in.readByte(); - maintenance = in.readByte(); - revTs = in.readLong(); - revHash = U.readByteArray(in); + assert productVerMsg != null; + + productVerMsg.major(in.readByte()); + productVerMsg.minor(in.readByte()); + productVerMsg.maintenance(in.readByte()); + productVerMsg.revisionTimestamp(in.readLong()); + productVerMsg.revisionHash(U.readByteArray(in)); } /** {@inheritDoc} */ @Override public String toString() { - String revTsStr = IgniteVersionUtils.formatBuildTimeStamp(revTs * 1000); + String revTsStr = IgniteVersionUtils.formatBuildTimeStamp(revisionTimestamp() * 1000); - String hash = U.byteArray2HexString(revHash).toLowerCase(); + String hash = U.byteArray2HexString(revisionHash()).toLowerCase(); hash = hash.length() > 8 ? hash.substring(0, 8) : hash; - return major + "." + minor + "." + maintenance + "#" + revTsStr + "-sha1:" + hash; + return major() + "." + minor() + "." + maintenance() + "#" + revTsStr + "-sha1:" + hash; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 2e96845ff0332..fe702517dbe33 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -2160,8 +2160,12 @@ protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) { if (msg instanceof TraceableMessage) tracing.messages().beforeSend((TraceableMessage)msg); - if (msg instanceof TcpDiscoveryNodeAddedMessage) + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + ((TcpDiscoveryNodeAddedMessage)msg) + .finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); + processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg); + } else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg); else if (msg instanceof TcpDiscoveryNodeLeftMessage) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 2f6d431fa9a4b..3b16c46392645 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1877,7 +1877,7 @@ private void prepareNodeAddedMessage( nodeAddedMsg.topology(topToSnd); - Collection msgs0 = null; + List msgs0 = null; if (msgs != null) { msgs0 = new ArrayList<>(msgs.size()); @@ -2482,6 +2482,8 @@ void add(TcpDiscoveryAbstractMessage msg) { // Do not need this data for client reconnect. if (addedMsg.gridDiscoveryData() != null) addedMsg.clearDiscoveryData(); + + addedMsg.prepareMarshal(spi.marshaller()); } else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg; @@ -2644,6 +2646,8 @@ private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUI msg0.topology(addedMsg.clientTopology()); + msg0.prepareMarshal(spi.marshaller()); + return msg0; } } @@ -3141,8 +3145,12 @@ else if (msg instanceof TcpDiscoveryClientReconnectMessage) { sendMessageAcrossRing(msg); } - else if (msg instanceof TcpDiscoveryNodeAddedMessage) + else if (msg instanceof TcpDiscoveryNodeAddedMessage) { + ((TcpDiscoveryNodeAddedMessage)msg).finishUnmarshal(spi.marshaller(), + U.resolveClassLoader(spi.ignite().configuration())); + processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg); + } else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg); @@ -3254,6 +3262,8 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { } } + ((TcpDiscoveryNodeAddedMessage)msg).prepareMarshal(spi.marshaller()); + // TODO Investigate possible optimizations: https://issues.apache.org/jira/browse/IGNITE-27722 clientMsgWorker.addMessage(msg); } @@ -4863,8 +4873,11 @@ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != null) { // Local node already has node from message in local topology. // Just pass it to coordinator via the ring. - if (sendMessageToRemotes(msg)) + if (sendMessageToRemotes(msg)) { + msg.prepareMarshal(spi.marshaller()); + sendMessageAcrossRing(msg); + } if (log.isDebugEnabled()) { log.debug("Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " + @@ -5087,8 +5100,11 @@ else if (spiState == CONNECTING) processMessageFailedNodes(msg); } - if (sendMessageToRemotes(msg)) + if (sendMessageToRemotes(msg)) { + msg.prepareMarshal(spi.marshaller()); + sendMessageAcrossRing(msg); + } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 89f1f492e9272..2b4ffc06b8f2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.managers.discovery.IgniteClusterNode; +import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -46,6 +47,7 @@ import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.messages.ClusterNodeMessage; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTENT_ID; @@ -62,23 +64,8 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Ignite /** */ private static final long serialVersionUID = 0L; - /** Node ID. */ - private volatile UUID id; - - /** Consistent ID. */ - @GridToStringInclude - private Object consistentId; - - /** Node attributes. */ - @GridToStringExclude - private Map attrs; - - /** Internal discovery addresses as strings. */ - @GridToStringInclude - private Collection addrs; - - /** Internal discovery host names as strings. */ - private Collection hostNames; + /** Values holding message of {@link ClusterNode}. */ + private ClusterNodeMessage clusterNodeMsg; /** */ @GridToStringInclude @@ -88,17 +75,10 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Ignite @GridToStringInclude private int discPort; - /** Node metrics. */ - @GridToStringExclude - private volatile ClusterMetrics metrics; - /** Node cache metrics. */ @GridToStringExclude private volatile Map cacheMetrics; - /** Node order in the topology. */ - private volatile long order; - /** Node order in the topology (internal). */ private volatile long intOrder; @@ -118,12 +98,6 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Ignite @GridToStringExclude private boolean visible; - /** Grid local node flag (transient). */ - private boolean loc; - - /** Version. */ - private IgniteProductVersion ver; - /** Alive check time (used by clients). */ @GridToStringExclude private transient volatile long aliveCheckTimeNanos; @@ -148,7 +122,12 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Ignite * Public default no-arg constructor for {@link Externalizable} interface. */ public TcpDiscoveryNode() { - // No-op. + clusterNodeMsg = new ClusterNodeMessage(); + } + + /** */ + public TcpDiscoveryNode(ClusterNodeMessage msg) { + clusterNodeMsg = msg; } /** @@ -162,7 +141,8 @@ public TcpDiscoveryNode() { * @param ver Version. * @param consistentId Node consistent ID. */ - public TcpDiscoveryNode(UUID id, + public TcpDiscoveryNode( + UUID id, Collection addrs, Collection hostNames, int discPort, @@ -174,21 +154,23 @@ public TcpDiscoveryNode(UUID id, assert metricsProvider != null; assert ver != null; - this.id = id; - List sortedAddrs = new ArrayList<>(addrs); - Collections.sort(sortedAddrs); - this.addrs = sortedAddrs; - this.hostNames = hostNames; - this.discPort = discPort; - this.metricsProvider = metricsProvider; - this.ver = ver; + clusterNodeMsg = new ClusterNodeMessage(); + id(id); + clusterNodeMsg.consistentId(consistentId != null ? consistentId : U.consistentId(sortedAddrs, discPort)); + clusterNodeMsg.addresses(sortedAddrs); + clusterNodeMsg.hostNames(hostNames); + version(ver); + + ClusterMetrics metrics = metricsProvider.metrics(); - this.consistentId = consistentId != null ? consistentId : U.consistentId(sortedAddrs, discPort); + if (metrics != null) + setMetrics(metrics); - metrics = metricsProvider.metrics(); + this.metricsProvider = metricsProvider; + this.discPort = discPort; cacheMetrics = metricsProvider.cacheMetrics(); sockAddrs = U.toSocketAddresses(this, discPort); } @@ -208,13 +190,18 @@ public void lastSuccessfulAddress(InetSocketAddress lastSuccessfulAddr) { } /** {@inheritDoc} */ - @Override public UUID id() { - return id; + @Override public synchronized UUID id() { + return clusterNodeMsg.id(); + } + + /** */ + private synchronized void id(UUID id) { + clusterNodeMsg.id(id); } /** {@inheritDoc} */ @Override public Object consistentId() { - return consistentId; + return clusterNodeMsg.consistentId(); } /** @@ -223,13 +210,13 @@ public void lastSuccessfulAddress(InetSocketAddress lastSuccessfulAddr) { * @param consistentId Consistent globally unique node ID. */ @Override public void setConsistentId(Serializable consistentId) { - this.consistentId = consistentId; + clusterNodeMsg.consistentId(consistentId); - final Map map = new HashMap<>(attrs); + Map map = new HashMap<>(clusterNodeMsg.attributes()); map.put(ATTR_NODE_CONSISTENT_ID, consistentId); - attrs = Collections.unmodifiableMap(map); + clusterNodeMsg.attributes(Collections.unmodifiableMap(map)); } /** {@inheritDoc} */ @@ -238,13 +225,13 @@ public void lastSuccessfulAddress(InetSocketAddress lastSuccessfulAddr) { if (IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(name)) return null; - return (T)attrs.get(name); + return (T)clusterNodeMsg.attributes().get(name); } /** {@inheritDoc} */ @Override public Map attributes() { // Even though discovery SPI removes this attribute after authentication, keep this check for safety. - return F.view(attrs, new IgnitePredicate() { + return F.view(clusterNodeMsg.attributes(), new IgnitePredicate<>() { @Override public boolean apply(String s) { return !IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(s); } @@ -257,7 +244,7 @@ public void lastSuccessfulAddress(InetSocketAddress lastSuccessfulAddr) { * @param attrs Node attributes. */ public void setAttributes(Map attrs) { - this.attrs = U.sealMap(attrs); + clusterNodeMsg.attributes(U.sealMap(attrs)); } /** @@ -266,27 +253,29 @@ public void setAttributes(Map attrs) { * @return Node attributes without filtering. */ public Map getAttributes() { - return attrs; + return clusterNodeMsg.attributes(); } /** {@inheritDoc} */ - @Override public ClusterMetrics metrics() { + @Override public synchronized ClusterMetrics metrics() { if (metricsProvider != null) { - ClusterMetrics metrics0 = metricsProvider.metrics(); + ClusterMetrics metrics = metricsProvider.metrics(); - metrics = metrics0; + setMetrics(metrics); - return metrics0; + return metrics; } - return metrics; + assert clusterNodeMsg.clusterMetricsMessage() != null; + + return new ClusterMetricsSnapshot(clusterNodeMsg.clusterMetricsMessage()); } /** {@inheritDoc} */ - @Override public void setMetrics(ClusterMetrics metrics) { + @Override public synchronized void setMetrics(ClusterMetrics metrics) { assert metrics != null; - this.metrics = metrics; + clusterNodeMsg.clusterMetricsMessage(new NodeMetricsMessage(metrics)); } /** {@inheritDoc} */ @@ -326,22 +315,22 @@ public void internalOrder(long intOrder) { /** * @return Order. */ - @Override public long order() { - return order; + @Override public synchronized long order() { + return clusterNodeMsg.order(); } /** * @param order Order of the node. */ - public void order(long order) { + public synchronized void order(long order) { assert order > 0 : "Order is invalid: " + this; - this.order = order; + clusterNodeMsg.order(order); } /** {@inheritDoc} */ @Override public IgniteProductVersion version() { - return ver; + return new IgniteProductVersion(clusterNodeMsg.productVersionMessage()); } /** @@ -350,29 +339,29 @@ public void order(long order) { public void version(IgniteProductVersion ver) { assert ver != null; - this.ver = ver; + clusterNodeMsg.productVersionMessage(ver.message()); } /** {@inheritDoc} */ @Override public Collection addresses() { - return addrs; + return clusterNodeMsg.addresses(); } /** {@inheritDoc} */ @Override public boolean isLocal() { - return loc; + return clusterNodeMsg.local(); } /** * @param loc Grid local node flag. */ public void local(boolean loc) { - this.loc = loc; + clusterNodeMsg.local(loc); } /** {@inheritDoc} */ @Override public Collection hostNames() { - return hostNames; + return clusterNodeMsg.hostNames(); } /** @@ -463,9 +452,9 @@ public void visible(boolean visible) { /** {@inheritDoc} */ @Override public boolean isClient() { if (!cacheCliInit) { - Boolean clientModeAttr = ((ClusterNode)this).attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE); + Boolean clientModeAttr = attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE); - cacheCli = clientModeAttr != null && clientModeAttr; + clusterNodeMsg.client(cacheCli = clientModeAttr != null && clientModeAttr); cacheCliInit = true; } @@ -520,7 +509,7 @@ public void clientRouterNodeId(UUID clientRouterNodeId) { * @param newId New node ID. */ public void onClientDisconnected(UUID newId) { - id = newId; + id(newId); } /** @@ -528,11 +517,10 @@ public void onClientDisconnected(UUID newId) { * @return Copy of local node for client reconnect request. */ public TcpDiscoveryNode clientReconnectNode(Map nodeAttrs) { - TcpDiscoveryNode node = new TcpDiscoveryNode( - id, addrs, hostNames, discPort, metricsProvider, ver, null - ); + TcpDiscoveryNode node = new TcpDiscoveryNode(id(), clusterNodeMsg.addresses(), hostNames(), discPort, + metricsProvider, version(), null); - node.attrs = Collections.unmodifiableMap(new HashMap<>(nodeAttrs)); + node.clusterNodeMsg.attributes(Collections.unmodifiableMap(new HashMap<>(nodeAttrs))); node.clientRouterNodeId = clientRouterNodeId; return node; @@ -556,16 +544,16 @@ public TcpDiscoveryNode clientReconnectNode(Map nodeAttrs) { /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeUuid(out, id); - U.writeMap(out, attrs); - U.writeCollection(out, addrs); - U.writeCollection(out, hostNames); + U.writeUuid(out, id()); + U.writeMap(out, clusterNodeMsg.attributes()); + U.writeCollection(out, clusterNodeMsg.addresses()); + U.writeCollection(out, clusterNodeMsg.hostNames()); out.writeInt(discPort); // Cluster metrics byte[] mtr = null; - ClusterMetrics metrics = this.metrics; + ClusterMetrics metrics = metrics(); if (metrics != null) mtr = ClusterMetricsSnapshot.serialize(metrics); @@ -575,28 +563,30 @@ public TcpDiscoveryNode clientReconnectNode(Map nodeAttrs) { // Legacy: Number of cache metrics out.writeInt(0); - out.writeLong(order); + out.writeLong(order()); out.writeLong(intOrder); - out.writeObject(ver); + out.writeObject(version()); U.writeUuid(out, clientRouterNodeId); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - id = U.readUuid(in); + assert clusterNodeMsg != null; + + id(U.readUuid(in)); - attrs = U.sealMap(U.readMap(in)); - addrs = U.readCollection(in); - hostNames = U.readCollection(in); + clusterNodeMsg.attributes(U.sealMap(U.readMap(in))); + clusterNodeMsg.addresses(U.readCollection(in)); + clusterNodeMsg.hostNames(U.readCollection(in)); discPort = in.readInt(); - Object consistentIdAttr = attrs.get(ATTR_NODE_CONSISTENT_ID); + Object consistentIdAttr = clusterNodeMsg.attributes().get(ATTR_NODE_CONSISTENT_ID); // Cluster metrics byte[] mtr = U.readByteArray(in); if (mtr != null) - metrics = ClusterMetricsSnapshot.deserialize(mtr, 0); + setMetrics(ClusterMetricsSnapshot.deserialize(mtr, 0)); // Legacy: Cache metrics int size = in.readInt(); @@ -606,20 +596,22 @@ public TcpDiscoveryNode clientReconnectNode(Map nodeAttrs) { in.readObject(); } - order = in.readLong(); + order(in.readLong()); intOrder = in.readLong(); - ver = (IgniteProductVersion)in.readObject(); + version((IgniteProductVersion)in.readObject()); clientRouterNodeId = U.readUuid(in); if (clientRouterNodeId() != null) - consistentId = consistentIdAttr != null ? consistentIdAttr : id; - else - consistentId = consistentIdAttr != null ? consistentIdAttr : U.consistentId(addrs, discPort); + clusterNodeMsg.consistentId(consistentIdAttr != null ? consistentIdAttr : id()); + else { + clusterNodeMsg.consistentId(consistentIdAttr != null ? consistentIdAttr + : U.consistentId(clusterNodeMsg.addresses(), discPort)); + } } /** {@inheritDoc} */ @Override public int hashCode() { - return id.hashCode(); + return id().hashCode(); } /** {@inheritDoc} */ @@ -638,14 +630,16 @@ public TcpDiscoveryNode clientReconnectNode(Map nodeAttrs) { * @param node to copy data from */ public TcpDiscoveryNode(ClusterNode node) { - this.id = node.id(); - this.consistentId = node.consistentId(); - this.addrs = node.addresses(); - this.hostNames = node.hostNames(); - this.order = node.order(); - this.ver = node.version(); - this.clientRouterNodeId = node.isClient() ? node.id() : null; - - attrs = Collections.singletonMap(ATTR_NODE_CONSISTENT_ID, consistentId); + clusterNodeMsg = new ClusterNodeMessage(); + + id(node.id()); + clusterNodeMsg.consistentId(node.consistentId()); + clusterNodeMsg.addresses(node.addresses()); + clusterNodeMsg.hostNames(node.hostNames()); + order(node.order()); + version(node.version()); + clusterNodeMsg.attributes(Collections.singletonMap(ATTR_NODE_CONSISTENT_ID, clusterNodeMsg.consistentId())); + + clientRouterNodeId = node.isClient() ? node.id() : null; } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeCollectionMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeCollectionMessage.java new file mode 100644 index 0000000000000..35304fd68bd96 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeCollectionMessage.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import java.util.Collection; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Container message for a collection of {@link ClusterNodeMessage}. + *
+ * Requires pre- and post- marshalling. + * + * @see #prepareMarshal(Marshaller) + * @see #prepareMarshal(Marshaller) + */ +public class ClusterNodeCollectionMessage implements Message { + /** The collection of wrapped {@link ClusterNodeMessage}. */ + @Order(value = 0, method = "clusterNodeMessages") + private Collection clusterNodeMsgs; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public ClusterNodeCollectionMessage() { + // No-op. + } + + /** @param clusterNodeMsgs Holder messages of {@link ClusterNode}. */ + public ClusterNodeCollectionMessage(Collection clusterNodeMsgs) { + this.clusterNodeMsgs = clusterNodeMsgs; + } + + /** @param marsh Marshalled. */ + public void prepareMarshal(Marshaller marsh) { + clusterNodeMsgs.forEach(msg -> msg.prepareMarshal(marsh)); + } + + /** + * @param marsh Marshalled. + * @param clsLdr Class loader. + */ + public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + clusterNodeMsgs.forEach(msg -> msg.finishUnmarshal(marsh, clsLdr)); + } + + /** @return Holder messages of {@link ClusterNode}. */ + public Collection clusterNodeMessages() { + return clusterNodeMsgs; + } + + /** @param clusterNodeMsgs Holder messages of {@link ClusterNode}. */ + public void clusterNodeMessages(Collection clusterNodeMsgs) { + this.clusterNodeMsgs = clusterNodeMsgs; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 115; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeMessage.java new file mode 100644 index 0000000000000..510c7ac0ff33e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeMessage.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Message for {@link ClusterNode}. Requites pre- and post- serialization with the class loader. + *
+ * Requires pre- and -post marshalling. + * + * @see #prepareMarshal(Marshaller) + * @see #finishUnmarshal(Marshaller, ClassLoader) + */ +public class ClusterNodeMessage implements Message { + /** Node ID. */ + @Order(0) + private UUID id; + + /** Internal discovery addresses as strings. */ + @Order(value = 1, method = "addresses") + private Collection addrs; + + /** Internal discovery host names as strings. */ + @Order(2) + private Collection hostNames; + + /** */ + @Order(value = 3, method = "clusterMetricsMessage") + private NodeMetricsMessage clusterMetricsMsg; + + /** */ + @Order(value = 4) + private long order; + + /** */ + @Order(value = 5, method = "productVersionMessage") + private IgniteProductVersionMessage productVerMsg; + + /** Grid local node flag (transient). */ + @Order(value = 6, method = "local") + private boolean loc; + + /** */ + @Order(7) + private boolean client; + + /** */ + @Order(8) + private String dataCenterId; + + /** Consistent ID. */ + private Object consistentId; + + /** */ + private byte[] consistentIdBytes; + + /** Node attributes. */ + private Map attrs; + + /** */ + private byte[] attrsBytes; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public ClusterNodeMessage() { + // No-op. + } + + /** @param clusterNode Cluster node. */ + public ClusterNodeMessage(ClusterNode clusterNode) { + id = clusterNode.id(); + addrs = clusterNode.addresses(); + hostNames = clusterNode.hostNames(); + if (clusterNode.metrics() != null) + clusterMetricsMsg = new NodeMetricsMessage(clusterNode.metrics()); + order = clusterNode.order(); + productVerMsg = new IgniteProductVersionMessage(clusterNode.version()); + loc = clusterNode.isLocal(); + client = clusterNode.isClient(); + dataCenterId = clusterNode.dataCenterId(); + attrs = clusterNode.attributes(); + } + + /** @param marsh Marshalled. */ + public void prepareMarshal(Marshaller marsh) { + if (F.isEmpty(attrs) && attrsBytes == null) { + try { + attrsBytes = U.marshal(marsh, attrs); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal cluster node attributes.", e); + } + } + + if (consistentId != null && consistentIdBytes == null) { + try { + consistentIdBytes = U.marshal(marsh, consistentId); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal cluster node's consistent id.", e); + } + } + } + + /** + * @param marsh Marshalled. + * @param clsLdr Class loader. + */ + public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + if (attrsBytes != null && F.isEmpty(attrs)) { + try { + attrs = U.unmarshal(marsh, attrsBytes, clsLdr); + + attrsBytes = null; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal cluster node attributes.", e); + } + } + + if (consistentIdBytes != null && consistentId == null) { + try { + consistentId = U.unmarshal(marsh, consistentIdBytes, clsLdr); + + consistentIdBytes = null; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to cluster node's consistent id.", e); + } + } + } + + /** @return Addresses. */ + public Collection addresses() { + return addrs; + } + + /** @param addrs Addresses. */ + public void addresses(Collection addrs) { + this.addrs = addrs; + } + + /** @return Node metrics message. */ + public NodeMetricsMessage clusterMetricsMessage() { + return clusterMetricsMsg; + } + + /** @param clusterMetricsMsg Node metrics message. */ + public void clusterMetricsMessage(NodeMetricsMessage clusterMetricsMsg) { + this.clusterMetricsMsg = clusterMetricsMsg; + } + + /** @return Client flag. */ + public boolean client() { + return client; + } + + /** @param client Client flag. */ + public void client(boolean client) { + this.client = client; + } + + /** @return Datacenter id. */ + public String dataCenterId() { + return dataCenterId; + } + + /** @param dataCenterId Datacenter id. */ + public void dataCenterId(String dataCenterId) { + this.dataCenterId = dataCenterId; + } + + /** @return Host names. */ + public Collection hostNames() { + return hostNames; + } + + /** @param hostNames Host names. */ + public void hostNames(Collection hostNames) { + this.hostNames = hostNames; + } + + /** @return Node id. */ + public UUID id() { + return id; + } + + /** @param id Node id. */ + public void id(UUID id) { + this.id = id; + } + + /** @return Node order. */ + public long order() { + return order; + } + + /** @param order Node order. */ + public void order(long order) { + this.order = order; + } + + /** @return Local node flag. */ + public boolean local() { + return loc; + } + + /** @param loc Local node flag. */ + public void local(boolean loc) { + this.loc = loc; + } + + /** @return Product version. */ + public IgniteProductVersionMessage productVersionMessage() { + return productVerMsg; + } + + /** @param productVerMsg Product version. */ + public void productVersionMessage(IgniteProductVersionMessage productVerMsg) { + this.productVerMsg = productVerMsg; + } + + /** @return Node consistent id. */ + public Object consistentId() { + return consistentId; + } + + /** @param consistentId Node consistent id. */ + public void consistentId(Object consistentId) { + this.consistentId = consistentId; + } + + /** @return Node's attributes. */ + public Map attributes() { + return attrs; + } + + /** @param attrs Node's attributes. */ + public void attributes(Map attrs) { + this.attrs = attrs; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -109; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/IgniteProductVersionMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/IgniteProductVersionMessage.java new file mode 100644 index 0000000000000..bdf1da1e1b8aa --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/IgniteProductVersionMessage.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** Message for {@link IgniteProductVersion}.*/ +public class IgniteProductVersionMessage implements Message { + /** Size of the {@link #revHash }*/ + public static final int REV_HASH_SIZE = 20; + + /** Major version number. */ + @Order(0) + private byte major; + + /** Minor version number. */ + @Order(1) + private byte minor; + + /** Maintenance version number. */ + @Order(2) + private byte maintenance; + + /** Stage of development. */ + @Order(3) + private String stage; + + /** Revision timestamp. */ + @Order(value = 4, method = "revisionTimestamp") + private long revTs; + + /** Revision hash. */ + @Order(value = 5, method = "revisionHash") + private byte[] revHash; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public IgniteProductVersionMessage() { + // No-op. + } + + /** + * @param major Major version. + * @param minor Minor version. + * @param maintenance Maintenance. + * @param stage Stage. + * @param revTs Revision timestamp. + * @param revHash Revision hash. + */ + public IgniteProductVersionMessage(byte major, byte minor, byte maintenance, String stage, long revTs, byte[] revHash) { + this.major = major; + this.minor = minor; + this.maintenance = maintenance; + this.stage = stage; + this.revTs = revTs; + this.revHash = revHash != null ? revHash : new byte[REV_HASH_SIZE]; + } + + /** @param ver Product version. */ + public IgniteProductVersionMessage(IgniteProductVersion ver) { + this( + ver.major(), + ver.minor(), + ver.maintenance(), + ver.stage(), + ver.revisionTimestamp(), + ver.revisionHash() + ); + } + + /** @return Maintenance. */ + public byte maintenance() { + return maintenance; + } + + /** @param maintenance Maintenance. */ + public void maintenance(byte maintenance) { + this.maintenance = maintenance; + } + + /** @return Major version. */ + public byte major() { + return major; + } + + /** @param major Major version. */ + public void major(byte major) { + this.major = major; + } + + /** @return Minor version. */ + public byte minor() { + return minor; + } + + /** @param minor Minor version. */ + public void minor(byte minor) { + this.minor = minor; + } + + /** @return Revision hash. */ + public byte[] revisionHash() { + return revHash; + } + + /** @param revHash Revision hash. */ + public void revisionHash(byte[] revHash) { + this.revHash = revHash; + } + + /** @return Revision timestamp. */ + public long revisionTimestamp() { + return revTs; + } + + /** @param revTs Revision timestamp. */ + public void revisionTimestamp(long revTs) { + this.revTs = revTs; + } + + /** @return Statge. */ + public String stage() { + return stage; + } + + /** @param stage Stage. */ + public void stage(String stage) { + this.stage = stage; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -108; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index 36540d8b7dfc1..e445617fd87e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -17,12 +17,24 @@ package org.apache.ignite.spi.discovery.tcp.messages; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.jetbrains.annotations.Nullable; @@ -31,35 +43,66 @@ * Message telling nodes that new node should be added to topology. * When newly added node receives the message it connects to its next and finishes * join process. + *
+ * Requires pre- and post- marshalling. + * @see #prepareMarshal(Marshaller) + * @see #finishUnmarshal(Marshaller, ClassLoader) */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableMessage { +public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { /** */ private static final long serialVersionUID = 0L; - /** Added node. */ - private final TcpDiscoveryNode node; - /** */ + @Order(value = 6, method = "gridDiscoveryData") private DiscoveryDataPacket dataPacket; - /** Pending messages from previous node. */ - private Collection msgs; + /** Start time of the first grid node. */ + @Order(7) + private long gridStartTime; + + /** Topology snapshots history. */ + @Order(value = 8, method = "topologyHistoryMessages") + private @Nullable Map topHistMsgs; + + /** {@link TcpDiscoveryAbstractMessage} pending messages from previous node which is a {@link Message}. */ + @Order(value = 9, method = "pendingMessages") + private Map pendingMsgs; + + /** Added node. */ + private TcpDiscoveryNode node; + + /** Marshalled {@link #node}. */ + private byte[] nodeBytes; /** Current topology. Initialized by coordinator. */ @GridToStringInclude private Collection top; + /** Marshalled {@link #top}. */ + private byte[] topBytes; + /** */ @GridToStringInclude private transient Collection clientTop; - /** Topology snapshots history. */ - private Map> topHist; + /** Marshalled {@link #clientTop}. */ + private byte[] clientTopBytes; - /** Start time of the first grid node. */ - private final long gridStartTime; + /** + * TODO: Remove after refactoring of discovery messages serialization https://issues.apache.org/jira/browse/IGNITE-25883 + * Java-serializable pending messages from previous node. + */ + private Map serializablePendingMsgs; + + /** Marshalled {@link #serializablePendingMsgs}. */ + private byte[] serializablePendingMsgsBytes; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodeAddedMessage() { + // No-op. + } /** * Constructor. @@ -69,7 +112,8 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableM * @param dataPacket container for collecting discovery data across the cluster. * @param gridStartTime Start time of the first grid node. */ - public TcpDiscoveryNodeAddedMessage(UUID creatorNodeId, + public TcpDiscoveryNodeAddedMessage( + UUID creatorNodeId, TcpDiscoveryNode node, DiscoveryDataPacket dataPacket, long gridStartTime @@ -90,13 +134,113 @@ public TcpDiscoveryNodeAddedMessage(UUID creatorNodeId, public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { super(msg); - this.node = msg.node; - this.msgs = msg.msgs; - this.top = msg.top; - this.clientTop = msg.clientTop; - this.topHist = msg.topHist; - this.dataPacket = msg.dataPacket; - this.gridStartTime = msg.gridStartTime; + node = msg.node; + pendingMsgs = msg.pendingMsgs; + serializablePendingMsgs = msg.serializablePendingMsgs; + top = msg.top; + clientTop = msg.clientTop; + topHistMsgs = msg.topHistMsgs; + dataPacket = msg.dataPacket; + gridStartTime = msg.gridStartTime; + } + + /** + * TODO: Revise after refactoring of TcpDiscoveryNode serialization https://issues.apache.org/jira/browse/IGNITE-27899 + * @param marsh marshaller. + */ + public void prepareMarshal(Marshaller marsh) { + if (!F.isEmpty(topHistMsgs)) + topHistMsgs.values().forEach(m -> m.prepareMarshal(marsh)); + + if (node == null && nodeBytes == null) { + try { + nodeBytes = U.marshal(marsh, node); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal cluster node.", e); + } + } + + if (top != null && topBytes == null) { + try { + topBytes = U.marshal(marsh, top); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal topology nodes.", e); + } + } + + if (clientTop != null && clientTopBytes == null) { + try { + clientTopBytes = U.marshal(marsh, clientTop); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal client topology nodes.", e); + } + } + + if (serializablePendingMsgs != null && serializablePendingMsgsBytes == null) { + try { + serializablePendingMsgsBytes = U.marshal(marsh, serializablePendingMsgs); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal serializable pending messages.", e); + } + } + } + + /** + * TODO: Revise after refactoring of TcpDiscoveryNode serialization https://issues.apache.org/jira/browse/IGNITE-27899 + * @param marsh Marshaller. + * @param clsLdr Class loader. + */ + public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + if (!F.isEmpty(topHistMsgs)) + topHistMsgs.values().forEach(m -> m.finishUnmarshal(marsh, clsLdr)); + + if (nodeBytes != null && node == null) { + try { + node = U.unmarshal(marsh, nodeBytes, clsLdr); + + nodeBytes = null; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal cluster node.", e); + } + } + + if (topBytes != null && top == null) { + try { + top = U.unmarshal(marsh, topBytes, clsLdr); + + topBytes = null; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal topology nodes.", e); + } + } + + if (clientTopBytes != null && clientTop == null) { + try { + clientTop = U.unmarshal(marsh, clientTopBytes, clsLdr); + + clientTopBytes = null; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal client topology nodes.", e); + } + } + + if (serializablePendingMsgsBytes != null && serializablePendingMsgs == null) { + try { + serializablePendingMsgs = U.unmarshal(marsh, serializablePendingMsgsBytes, clsLdr); + + serializablePendingMsgsBytes = null; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal serializable pending messages.", e); + } + } } /** @@ -109,23 +253,89 @@ public TcpDiscoveryNode node() { } /** + * @return Pending messages. + * @see #messages() + */ + public Map pendingMessages() { + return pendingMsgs; + } + + /** + * @param pendingMsgs Pending messages. + * @see #messages(List) + */ + public void pendingMessages(Map pendingMsgs) { + this.pendingMsgs = pendingMsgs; + } + + /** + * TODO: revise after refactoring of discovery messages serialization https://issues.apache.org/jira/browse/IGNITE-25883 * Gets pending messages sent to new node by its previous. * * @return Pending messages from previous node. */ - @Nullable public Collection messages() { - return msgs; + @Nullable public List messages() { + assert serializablePendingMsgs == null; + + if (F.isEmpty(pendingMsgs) && F.isEmpty(serializablePendingMsgs)) + return Collections.emptyList(); + + int totalSz = (F.isEmpty(pendingMsgs) ? 0 : pendingMsgs.size()) + + (F.isEmpty(serializablePendingMsgs) ? 0 : serializablePendingMsgs.size()); + + List res = new ArrayList<>(totalSz); + + for (int i = 0; i < totalSz; ++i) { + Message m = pendingMsgs.get(i); + + if (m == null) { + TcpDiscoveryAbstractMessage sm = serializablePendingMsgs.get(i); + assert sm != null; + + res.add(sm); + } + else { + assert serializablePendingMsgs.get(i) == null; + assert m instanceof TcpDiscoveryAbstractMessage; + + res.add((TcpDiscoveryAbstractMessage)m); + } + } + + return res; } /** + * TODO: revise after refactoring of discovery messages serialization https://issues.apache.org/jira/browse/IGNITE-25883 * Sets pending messages to send to new node. * * @param msgs Pending messages to send to new node. */ - public void messages( - @Nullable Collection msgs - ) { - this.msgs = msgs; + public void messages(@Nullable List msgs) { + if (F.isEmpty(msgs)) { + serializablePendingMsgs = null; + pendingMsgs = null; + + return; + } + + int idx = 0; + + for (TcpDiscoveryAbstractMessage m : msgs) { + if (m instanceof Message) { + if (pendingMsgs == null) + pendingMsgs = U.newHashMap(msgs.size()); + + pendingMsgs.put(idx++, (Message)m); + + continue; + } + + if (serializablePendingMsgs == null) + serializablePendingMsgs = U.newHashMap(msgs.size()); + + serializablePendingMsgs.put(idx++, m); + } } /** @@ -162,13 +372,35 @@ public Collection clientTopology() { return clientTop; } + /** @return Topology history messages. */ + public @Nullable Map topologyHistoryMessages() { + return topHistMsgs; + } + + /** @param topHistMsgs Topology history messages. */ + public void topologyHistoryMessages(@Nullable Map topHistMsgs) { + this.topHistMsgs = topHistMsgs; + } + /** * Gets topology snapshots history. * * @return Map with topology snapshots history. */ - public Map> topologyHistory() { - return topHist; + public @Nullable Map> topologyHistory() { + if (topHistMsgs == null) + return null; + + Map> res = U.newHashMap(topHistMsgs.size()); + + topHistMsgs.forEach((nodeId, msgs) -> { + Collection clusterNodeImpls = msgs.clusterNodeMessages().stream().map(TcpDiscoveryNode::new) + .collect(Collectors.toList()); + + res.put(nodeId, clusterNodeImpls); + }); + + return res; } /** @@ -177,7 +409,20 @@ public Map> topologyHistory() { * @param topHist Map with topology snapshots history. */ public void topologyHistory(@Nullable Map> topHist) { - this.topHist = topHist; + if (topHist == null) { + topHistMsgs = null; + + return; + } + + topHistMsgs = U.newHashMap(topHist.size()); + + topHist.forEach((nodeId, clusterNodes) -> { + Collection clusterNodeImpls = clusterNodes.stream().map(ClusterNodeMessage::new) + .collect(Collectors.toList()); + + topHistMsgs.put(nodeId, new ClusterNodeCollectionMessage(clusterNodeImpls)); + }); } /** @@ -187,6 +432,11 @@ public DiscoveryDataPacket gridDiscoveryData() { return dataPacket; } + /** @param dataPacket Data packet data. */ + public void gridDiscoveryData(DiscoveryDataPacket dataPacket) { + this.dataPacket = dataPacket; + } + /** * Clears discovery data to minimize message size. */ @@ -210,6 +460,16 @@ public long gridStartTime() { return gridStartTime; } + /** @param gridStartTime First grid node start time. */ + public void gridStartTime(long gridStartTime) { + this.gridStartTime = gridStartTime; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 20; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryNodeAddedMessage.class, this, "super", super.toString()); From 1e1dd007715dc11946aa6624b98c605e1741d53c Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 19 Feb 2026 01:31:49 +0300 Subject: [PATCH 02/10] fix --- .../ignite/spi/discovery/tcp/ServerImpl.java | 2 +- .../tcp/internal/TcpDiscoveryNode.java | 205 +++++++++--------- .../tcp/messages/ClusterNodeMessage.java | 7 +- .../TcpDiscoveryNodeAddedMessage.java | 56 ++--- 4 files changed, 137 insertions(+), 133 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 3b16c46392645..723ec737a7798 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2491,7 +2491,7 @@ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { if (addFinishMsg.clientDiscoData() != null) { addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg); - addFinishMsg.prepareMarshal(spi.marshaller()); + addFinishMsg.prepareMarshal(spi.marshaller());` msg = addFinishMsg; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 2b4ffc06b8f2b..e12b812f599ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -36,7 +36,6 @@ import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.managers.discovery.IgniteClusterNode; -import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -47,7 +46,6 @@ import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.messages.ClusterNodeMessage; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTENT_ID; @@ -64,8 +62,23 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Ignite /** */ private static final long serialVersionUID = 0L; - /** Values holding message of {@link ClusterNode}. */ - private ClusterNodeMessage clusterNodeMsg; + /** Node ID. */ + private volatile UUID id; + + /** Consistent ID. */ + @GridToStringInclude + private Object consistentId; + + /** Node attributes. */ + @GridToStringExclude + private Map attrs; + + /** Internal discovery addresses as strings. */ + @GridToStringInclude + private Collection addrs; + + /** Internal discovery host names as strings. */ + private Collection hostNames; /** */ @GridToStringInclude @@ -75,10 +88,17 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Ignite @GridToStringInclude private int discPort; + /** Node metrics. */ + @GridToStringExclude + private volatile ClusterMetrics metrics; + /** Node cache metrics. */ @GridToStringExclude private volatile Map cacheMetrics; + /** Node order in the topology. */ + private volatile long order; + /** Node order in the topology (internal). */ private volatile long intOrder; @@ -98,6 +118,12 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Ignite @GridToStringExclude private boolean visible; + /** Grid local node flag (transient). */ + private boolean loc; + + /** Version. */ + private IgniteProductVersion ver; + /** Alive check time (used by clients). */ @GridToStringExclude private transient volatile long aliveCheckTimeNanos; @@ -122,12 +148,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Ignite * Public default no-arg constructor for {@link Externalizable} interface. */ public TcpDiscoveryNode() { - clusterNodeMsg = new ClusterNodeMessage(); - } - - /** */ - public TcpDiscoveryNode(ClusterNodeMessage msg) { - clusterNodeMsg = msg; + // No-op. } /** @@ -141,8 +162,7 @@ public TcpDiscoveryNode(ClusterNodeMessage msg) { * @param ver Version. * @param consistentId Node consistent ID. */ - public TcpDiscoveryNode( - UUID id, + public TcpDiscoveryNode(UUID id, Collection addrs, Collection hostNames, int discPort, @@ -154,23 +174,21 @@ public TcpDiscoveryNode( assert metricsProvider != null; assert ver != null; + this.id = id; + List sortedAddrs = new ArrayList<>(addrs); - Collections.sort(sortedAddrs); - clusterNodeMsg = new ClusterNodeMessage(); - id(id); - clusterNodeMsg.consistentId(consistentId != null ? consistentId : U.consistentId(sortedAddrs, discPort)); - clusterNodeMsg.addresses(sortedAddrs); - clusterNodeMsg.hostNames(hostNames); - version(ver); + Collections.sort(sortedAddrs); - ClusterMetrics metrics = metricsProvider.metrics(); + this.addrs = sortedAddrs; + this.hostNames = hostNames; + this.discPort = discPort; + this.metricsProvider = metricsProvider; + this.ver = ver; - if (metrics != null) - setMetrics(metrics); + this.consistentId = consistentId != null ? consistentId : U.consistentId(sortedAddrs, discPort); - this.metricsProvider = metricsProvider; - this.discPort = discPort; + metrics = metricsProvider.metrics(); cacheMetrics = metricsProvider.cacheMetrics(); sockAddrs = U.toSocketAddresses(this, discPort); } @@ -190,18 +208,13 @@ public void lastSuccessfulAddress(InetSocketAddress lastSuccessfulAddr) { } /** {@inheritDoc} */ - @Override public synchronized UUID id() { - return clusterNodeMsg.id(); - } - - /** */ - private synchronized void id(UUID id) { - clusterNodeMsg.id(id); + @Override public UUID id() { + return id; } /** {@inheritDoc} */ @Override public Object consistentId() { - return clusterNodeMsg.consistentId(); + return consistentId; } /** @@ -210,13 +223,13 @@ private synchronized void id(UUID id) { * @param consistentId Consistent globally unique node ID. */ @Override public void setConsistentId(Serializable consistentId) { - clusterNodeMsg.consistentId(consistentId); + this.consistentId = consistentId; - Map map = new HashMap<>(clusterNodeMsg.attributes()); + final Map map = new HashMap<>(attrs); map.put(ATTR_NODE_CONSISTENT_ID, consistentId); - clusterNodeMsg.attributes(Collections.unmodifiableMap(map)); + attrs = Collections.unmodifiableMap(map); } /** {@inheritDoc} */ @@ -225,13 +238,13 @@ private synchronized void id(UUID id) { if (IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(name)) return null; - return (T)clusterNodeMsg.attributes().get(name); + return (T)attrs.get(name); } /** {@inheritDoc} */ @Override public Map attributes() { // Even though discovery SPI removes this attribute after authentication, keep this check for safety. - return F.view(clusterNodeMsg.attributes(), new IgnitePredicate<>() { + return F.view(attrs, new IgnitePredicate() { @Override public boolean apply(String s) { return !IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(s); } @@ -244,7 +257,7 @@ private synchronized void id(UUID id) { * @param attrs Node attributes. */ public void setAttributes(Map attrs) { - clusterNodeMsg.attributes(U.sealMap(attrs)); + this.attrs = U.sealMap(attrs); } /** @@ -253,29 +266,27 @@ public void setAttributes(Map attrs) { * @return Node attributes without filtering. */ public Map getAttributes() { - return clusterNodeMsg.attributes(); + return attrs; } /** {@inheritDoc} */ - @Override public synchronized ClusterMetrics metrics() { + @Override public ClusterMetrics metrics() { if (metricsProvider != null) { - ClusterMetrics metrics = metricsProvider.metrics(); + ClusterMetrics metrics0 = metricsProvider.metrics(); - setMetrics(metrics); + metrics = metrics0; - return metrics; + return metrics0; } - assert clusterNodeMsg.clusterMetricsMessage() != null; - - return new ClusterMetricsSnapshot(clusterNodeMsg.clusterMetricsMessage()); + return metrics; } /** {@inheritDoc} */ - @Override public synchronized void setMetrics(ClusterMetrics metrics) { + @Override public void setMetrics(ClusterMetrics metrics) { assert metrics != null; - clusterNodeMsg.clusterMetricsMessage(new NodeMetricsMessage(metrics)); + this.metrics = metrics; } /** {@inheritDoc} */ @@ -315,22 +326,22 @@ public void internalOrder(long intOrder) { /** * @return Order. */ - @Override public synchronized long order() { - return clusterNodeMsg.order(); + @Override public long order() { + return order; } /** * @param order Order of the node. */ - public synchronized void order(long order) { + public void order(long order) { assert order > 0 : "Order is invalid: " + this; - clusterNodeMsg.order(order); + this.order = order; } /** {@inheritDoc} */ @Override public IgniteProductVersion version() { - return new IgniteProductVersion(clusterNodeMsg.productVersionMessage()); + return ver; } /** @@ -339,29 +350,29 @@ public synchronized void order(long order) { public void version(IgniteProductVersion ver) { assert ver != null; - clusterNodeMsg.productVersionMessage(ver.message()); + this.ver = ver; } /** {@inheritDoc} */ @Override public Collection addresses() { - return clusterNodeMsg.addresses(); + return addrs; } /** {@inheritDoc} */ @Override public boolean isLocal() { - return clusterNodeMsg.local(); + return loc; } /** * @param loc Grid local node flag. */ public void local(boolean loc) { - clusterNodeMsg.local(loc); + this.loc = loc; } /** {@inheritDoc} */ @Override public Collection hostNames() { - return clusterNodeMsg.hostNames(); + return hostNames; } /** @@ -454,7 +465,7 @@ public void visible(boolean visible) { if (!cacheCliInit) { Boolean clientModeAttr = attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE); - clusterNodeMsg.client(cacheCli = clientModeAttr != null && clientModeAttr); + cacheCli = clientModeAttr != null && clientModeAttr; cacheCliInit = true; } @@ -509,7 +520,7 @@ public void clientRouterNodeId(UUID clientRouterNodeId) { * @param newId New node ID. */ public void onClientDisconnected(UUID newId) { - id(newId); + id = newId; } /** @@ -517,10 +528,11 @@ public void onClientDisconnected(UUID newId) { * @return Copy of local node for client reconnect request. */ public TcpDiscoveryNode clientReconnectNode(Map nodeAttrs) { - TcpDiscoveryNode node = new TcpDiscoveryNode(id(), clusterNodeMsg.addresses(), hostNames(), discPort, - metricsProvider, version(), null); + TcpDiscoveryNode node = new TcpDiscoveryNode( + id, addrs, hostNames, discPort, metricsProvider, ver, null + ); - node.clusterNodeMsg.attributes(Collections.unmodifiableMap(new HashMap<>(nodeAttrs))); + node.attrs = Collections.unmodifiableMap(new HashMap<>(nodeAttrs)); node.clientRouterNodeId = clientRouterNodeId; return node; @@ -544,16 +556,16 @@ public TcpDiscoveryNode clientReconnectNode(Map nodeAttrs) { /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeUuid(out, id()); - U.writeMap(out, clusterNodeMsg.attributes()); - U.writeCollection(out, clusterNodeMsg.addresses()); - U.writeCollection(out, clusterNodeMsg.hostNames()); + U.writeUuid(out, id); + U.writeMap(out, attrs); + U.writeCollection(out, addrs); + U.writeCollection(out, hostNames); out.writeInt(discPort); // Cluster metrics byte[] mtr = null; - ClusterMetrics metrics = metrics(); + ClusterMetrics metrics = this.metrics; if (metrics != null) mtr = ClusterMetricsSnapshot.serialize(metrics); @@ -563,55 +575,48 @@ public TcpDiscoveryNode clientReconnectNode(Map nodeAttrs) { // Legacy: Number of cache metrics out.writeInt(0); - out.writeLong(order()); + out.writeLong(order); out.writeLong(intOrder); - out.writeObject(version()); + out.writeObject(ver); U.writeUuid(out, clientRouterNodeId); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - assert clusterNodeMsg != null; - - id(U.readUuid(in)); + id = U.readUuid(in); - clusterNodeMsg.attributes(U.sealMap(U.readMap(in))); - clusterNodeMsg.addresses(U.readCollection(in)); - clusterNodeMsg.hostNames(U.readCollection(in)); + attrs = U.sealMap(U.readMap(in)); + addrs = U.readCollection(in); + hostNames = U.readCollection(in); discPort = in.readInt(); - Object consistentIdAttr = clusterNodeMsg.attributes().get(ATTR_NODE_CONSISTENT_ID); + Object consistentIdAttr = attrs.get(ATTR_NODE_CONSISTENT_ID); // Cluster metrics byte[] mtr = U.readByteArray(in); if (mtr != null) - setMetrics(ClusterMetricsSnapshot.deserialize(mtr, 0)); + metrics = ClusterMetricsSnapshot.deserialize(mtr, 0); // Legacy: Cache metrics int size = in.readInt(); - for (int i = 0; i < size; i++) { - in.readInt(); - in.readObject(); - } + assert size == 0; - order(in.readLong()); + order = in.readLong(); intOrder = in.readLong(); - version((IgniteProductVersion)in.readObject()); + ver = (IgniteProductVersion)in.readObject(); clientRouterNodeId = U.readUuid(in); if (clientRouterNodeId() != null) - clusterNodeMsg.consistentId(consistentIdAttr != null ? consistentIdAttr : id()); - else { - clusterNodeMsg.consistentId(consistentIdAttr != null ? consistentIdAttr - : U.consistentId(clusterNodeMsg.addresses(), discPort)); - } + consistentId = consistentIdAttr != null ? consistentIdAttr : id; + else + consistentId = consistentIdAttr != null ? consistentIdAttr : U.consistentId(addrs, discPort); } /** {@inheritDoc} */ @Override public int hashCode() { - return id().hashCode(); + return id.hashCode(); } /** {@inheritDoc} */ @@ -630,16 +635,14 @@ public TcpDiscoveryNode clientReconnectNode(Map nodeAttrs) { * @param node to copy data from */ public TcpDiscoveryNode(ClusterNode node) { - clusterNodeMsg = new ClusterNodeMessage(); - - id(node.id()); - clusterNodeMsg.consistentId(node.consistentId()); - clusterNodeMsg.addresses(node.addresses()); - clusterNodeMsg.hostNames(node.hostNames()); - order(node.order()); - version(node.version()); - clusterNodeMsg.attributes(Collections.singletonMap(ATTR_NODE_CONSISTENT_ID, clusterNodeMsg.consistentId())); - - clientRouterNodeId = node.isClient() ? node.id() : null; + this.id = node.id(); + this.consistentId = node.consistentId(); + this.addrs = node.addresses(); + this.hostNames = node.hostNames(); + this.order = node.order(); + this.ver = node.version(); + this.clientRouterNodeId = node.isClient() ? node.id() : null; + + attrs = Collections.singletonMap(ATTR_NODE_CONSISTENT_ID, consistentId); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeMessage.java index 510c7ac0ff33e..ff81142bc5717 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeMessage.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery.tcp.messages; +import java.io.Serializable; import java.util.Collection; import java.util.Map; import java.util.UUID; @@ -77,7 +78,7 @@ public class ClusterNodeMessage implements Message { private String dataCenterId; /** Consistent ID. */ - private Object consistentId; + private Serializable consistentId; /** */ private byte[] consistentIdBytes; @@ -248,12 +249,12 @@ public void productVersionMessage(IgniteProductVersionMessage productVerMsg) { } /** @return Node consistent id. */ - public Object consistentId() { + public Serializable consistentId() { return consistentId; } /** @param consistentId Node consistent id. */ - public void consistentId(Object consistentId) { + public void consistentId(Serializable consistentId) { this.consistentId = consistentId; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index e445617fd87e7..76738ae283a7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -27,12 +27,15 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.ClusterMetricsSnapshot; +import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; @@ -78,7 +81,7 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableM /** Current topology. Initialized by coordinator. */ @GridToStringInclude - private Collection top; + private @Nullable Collection top; /** Marshalled {@link #top}. */ private byte[] topBytes; @@ -87,14 +90,11 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableM @GridToStringInclude private transient Collection clientTop; - /** Marshalled {@link #clientTop}. */ - private byte[] clientTopBytes; - /** * TODO: Remove after refactoring of discovery messages serialization https://issues.apache.org/jira/browse/IGNITE-25883 * Java-serializable pending messages from previous node. */ - private Map serializablePendingMsgs; + private @Nullable Map serializablePendingMsgs; /** Marshalled {@link #serializablePendingMsgs}. */ private byte[] serializablePendingMsgsBytes; @@ -152,7 +152,7 @@ public void prepareMarshal(Marshaller marsh) { if (!F.isEmpty(topHistMsgs)) topHistMsgs.values().forEach(m -> m.prepareMarshal(marsh)); - if (node == null && nodeBytes == null) { + if (node != null && nodeBytes == null) { try { nodeBytes = U.marshal(marsh, node); } @@ -170,15 +170,6 @@ public void prepareMarshal(Marshaller marsh) { } } - if (clientTop != null && clientTopBytes == null) { - try { - clientTopBytes = U.marshal(marsh, clientTop); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to marshal client topology nodes.", e); - } - } - if (serializablePendingMsgs != null && serializablePendingMsgsBytes == null) { try { serializablePendingMsgsBytes = U.marshal(marsh, serializablePendingMsgs); @@ -220,17 +211,6 @@ public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { } } - if (clientTopBytes != null && clientTop == null) { - try { - clientTop = U.unmarshal(marsh, clientTopBytes, clsLdr); - - clientTopBytes = null; - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to unmarshal client topology nodes.", e); - } - } - if (serializablePendingMsgsBytes != null && serializablePendingMsgs == null) { try { serializablePendingMsgs = U.unmarshal(marsh, serializablePendingMsgsBytes, clsLdr); @@ -394,8 +374,28 @@ public void topologyHistoryMessages(@Nullable Map> res = U.newHashMap(topHistMsgs.size()); topHistMsgs.forEach((nodeId, msgs) -> { - Collection clusterNodeImpls = msgs.clusterNodeMessages().stream().map(TcpDiscoveryNode::new) - .collect(Collectors.toList()); + Collection clusterNodeImpls = msgs.clusterNodeMessages().stream() + .map(m -> { + TcpDiscoveryNode tcpDiscoNode = new TcpDiscoveryNode( + m.id(), + m.addresses(), + m.hostNames(), + 0, + null, + new IgniteProductVersion(m.productVersionMessage()), + m.consistentId() + ); + + tcpDiscoNode.order(m.order()); + tcpDiscoNode.local(m.local()); + tcpDiscoNode.setAttributes(m.attributes()); + tcpDiscoNode.getAttributes().put(IgniteNodeAttributes.ATTR_CLIENT_MODE, m.client()); + + if (m.clusterMetricsMessage() != null) + tcpDiscoNode.setMetrics(new ClusterMetricsSnapshot(m.clusterMetricsMessage())); + + return tcpDiscoNode; + }).collect(Collectors.toList()); res.put(nodeId, clusterNodeImpls); }); From 43f42671b1cccede80df5c2fff20293b0d92cb35 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 19 Feb 2026 01:38:28 +0300 Subject: [PATCH 03/10] fix --- .../java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 723ec737a7798..3b16c46392645 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2491,7 +2491,7 @@ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { if (addFinishMsg.clientDiscoData() != null) { addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg); - addFinishMsg.prepareMarshal(spi.marshaller());` + addFinishMsg.prepareMarshal(spi.marshaller()); msg = addFinishMsg; From d68b352158b0b0a3d593a696a888ced11b3047b9 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 19 Feb 2026 12:22:21 +0300 Subject: [PATCH 04/10] fix --- .../cluster/ClusterNodeMetrics.java | 5 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 15 +++-- .../tcp/internal/TcpDiscoveryNode.java | 64 +++++++++++++++---- .../ClusterNodeCollectionMessage.java | 2 +- .../tcp/messages/ClusterNodeMessage.java | 8 +-- .../TcpDiscoveryNodeAddedMessage.java | 51 ++++++++++----- 6 files changed, 103 insertions(+), 42 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java index 7bba244fc7fc6..55414ebe7f6c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java @@ -24,6 +24,7 @@ import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.processors.cache.CacheMetricsSnapshot; +import org.jetbrains.annotations.Nullable; /** * @@ -33,13 +34,13 @@ class ClusterNodeMetrics { private final ClusterMetrics nodeMetrics; /** */ - private final Map cacheMetrics; + private final @Nullable Map cacheMetrics; /** * @param nodeMetrics Node metrics. * @param cacheMetrics Cache metrics. */ - ClusterNodeMetrics(ClusterMetrics nodeMetrics, Map cacheMetrics) { + ClusterNodeMetrics(ClusterMetrics nodeMetrics, @Nullable Map cacheMetrics) { this.nodeMetrics = nodeMetrics; this.cacheMetrics = cacheMetrics; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 3b16c46392645..cce3fb5ea6f57 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1897,6 +1897,8 @@ private void prepareNodeAddedMessage( } nodeAddedMsg.topologyHistory(hist); + + nodeAddedMsg.prepareMarshal(spi.marshaller()); } } } @@ -2646,8 +2648,6 @@ private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUI msg0.topology(addedMsg.clientTopology()); - msg0.prepareMarshal(spi.marshaller()); - return msg0; } } @@ -3111,6 +3111,11 @@ protected void runTasks() { if (!locNode.id().equals(msg.senderNodeId()) && ensured) lastRingMsgTimeNanos = System.nanoTime(); + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + ((TcpDiscoveryNodeAddedMessage)msg).finishUnmarshal(spi.marshaller(), + U.resolveClassLoader(spi.ignite().configuration())); + } + if (locNode.internalOrder() == 0) { boolean proc = false; @@ -3145,12 +3150,8 @@ else if (msg instanceof TcpDiscoveryClientReconnectMessage) { sendMessageAcrossRing(msg); } - else if (msg instanceof TcpDiscoveryNodeAddedMessage) { - ((TcpDiscoveryNodeAddedMessage)msg).finishUnmarshal(spi.marshaller(), - U.resolveClassLoader(spi.ignite().configuration())); - + else if (msg instanceof TcpDiscoveryNodeAddedMessage) processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg); - } else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index e12b812f599ea..ea9bd55c57d3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -94,7 +94,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Ignite /** Node cache metrics. */ @GridToStringExclude - private volatile Map cacheMetrics; + private volatile @Nullable Map cacheMetrics; /** Node order in the topology. */ private volatile long order; @@ -162,35 +162,73 @@ public TcpDiscoveryNode() { * @param ver Version. * @param consistentId Node consistent ID. */ - public TcpDiscoveryNode(UUID id, + public TcpDiscoveryNode( + UUID id, Collection addrs, Collection hostNames, int discPort, DiscoveryMetricsProvider metricsProvider, IgniteProductVersion ver, - Serializable consistentId + @Nullable Serializable consistentId + ) { + this(id, consistentId, 0, false, null, addrs, hostNames, null, ver, metricsProvider.metrics()); + + this.discPort = discPort; + this.metricsProvider = metricsProvider; + cacheMetrics = metricsProvider.cacheMetrics(); + sockAddrs = U.toSocketAddresses(this, discPort); + } + + /** + * Constructor to implement {@link ClusterNode}. + * + * @param id Node id. + * @param consistentId Consistent id. + * @param order Node order. + * @param loc Local node flag. + * @param client Client node flag. + * @param addrs Node addresses. + * @param hostNames Node host names. + * @param attrs Node attributes. + * @param ver Version. + * @param metrics Node metrics. + */ + public TcpDiscoveryNode( + UUID id, + @Nullable Serializable consistentId, + long order, + boolean loc, + @Nullable Boolean client, + Collection addrs, + Collection hostNames, + @Nullable Map attrs, + IgniteProductVersion ver, + ClusterMetrics metrics ) { assert id != null; - assert metricsProvider != null; assert ver != null; + assert metrics != null; this.id = id; + this.order = order; + this.loc = loc; + this.hostNames = hostNames; + this.ver = ver; + this.metrics = metrics; List sortedAddrs = new ArrayList<>(addrs); - Collections.sort(sortedAddrs); this.addrs = sortedAddrs; - this.hostNames = hostNames; - this.discPort = discPort; - this.metricsProvider = metricsProvider; - this.ver = ver; this.consistentId = consistentId != null ? consistentId : U.consistentId(sortedAddrs, discPort); - metrics = metricsProvider.metrics(); - cacheMetrics = metricsProvider.cacheMetrics(); - sockAddrs = U.toSocketAddresses(this, discPort); + if (attrs == null) + attrs = Collections.emptyMap(); + + this.attrs = new HashMap<>(attrs); + + this.attrs.put(IgniteNodeAttributes.ATTR_CLIENT_MODE, client != null && client); } /** @@ -290,7 +328,7 @@ public Map getAttributes() { } /** {@inheritDoc} */ - @Override public Map cacheMetrics() { + @Override public @Nullable Map cacheMetrics() { if (metricsProvider != null) { Map cacheMetrics0 = metricsProvider.cacheMetrics(); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeCollectionMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeCollectionMessage.java index 35304fd68bd96..ce59d5d434301 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeCollectionMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeCollectionMessage.java @@ -72,6 +72,6 @@ public void clusterNodeMessages(Collection clusterNodeMsgs) /** {@inheritDoc} */ @Override public short directType() { - return 115; + return -110; } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeMessage.java index ff81142bc5717..5fc7f344843ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeMessage.java @@ -55,7 +55,7 @@ public class ClusterNodeMessage implements Message { /** */ @Order(value = 3, method = "clusterMetricsMessage") - private NodeMetricsMessage clusterMetricsMsg; + private TcpDiscoveryNodeMetricsMessage clusterMetricsMsg; /** */ @Order(value = 4) @@ -100,7 +100,7 @@ public ClusterNodeMessage(ClusterNode clusterNode) { addrs = clusterNode.addresses(); hostNames = clusterNode.hostNames(); if (clusterNode.metrics() != null) - clusterMetricsMsg = new NodeMetricsMessage(clusterNode.metrics()); + clusterMetricsMsg = new TcpDiscoveryNodeMetricsMessage(clusterNode.metrics()); order = clusterNode.order(); productVerMsg = new IgniteProductVersionMessage(clusterNode.version()); loc = clusterNode.isLocal(); @@ -111,7 +111,7 @@ public ClusterNodeMessage(ClusterNode clusterNode) { /** @param marsh Marshalled. */ public void prepareMarshal(Marshaller marsh) { - if (F.isEmpty(attrs) && attrsBytes == null) { + if (!F.isEmpty(attrs) && attrsBytes == null) { try { attrsBytes = U.marshal(marsh, attrs); } @@ -174,7 +174,7 @@ public NodeMetricsMessage clusterMetricsMessage() { } /** @param clusterMetricsMsg Node metrics message. */ - public void clusterMetricsMessage(NodeMetricsMessage clusterMetricsMsg) { + public void clusterMetricsMessage(TcpDiscoveryNodeMetricsMessage clusterMetricsMsg) { this.clusterMetricsMsg = clusterMetricsMsg; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index 76738ae283a7c..ab81f053e7a49 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -28,7 +28,6 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.ClusterMetricsSnapshot; -import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -77,6 +76,7 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableM private TcpDiscoveryNode node; /** Marshalled {@link #node}. */ + @Order(10) private byte[] nodeBytes; /** Current topology. Initialized by coordinator. */ @@ -84,7 +84,8 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableM private @Nullable Collection top; /** Marshalled {@link #top}. */ - private byte[] topBytes; + @Order(value = 11, method = "topologyBytes") + private @Nullable byte[] topBytes; /** */ @GridToStringInclude @@ -135,9 +136,11 @@ public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { super(msg); node = msg.node; + nodeBytes = msg.nodeBytes; pendingMsgs = msg.pendingMsgs; serializablePendingMsgs = msg.serializablePendingMsgs; top = msg.top; + topBytes = msg.topBytes; clientTop = msg.clientTop; topHistMsgs = msg.topHistMsgs; dataPacket = msg.dataPacket; @@ -232,6 +235,16 @@ public TcpDiscoveryNode node() { return node; } + /** @return Serialized {@link #node}. */ + public byte[] nodeBytes() { + return nodeBytes; + } + + /** @param nodeBytes Serialized {@link #node}. */ + public void nodeBytes(byte[] nodeBytes) { + this.nodeBytes = nodeBytes; + } + /** * @return Pending messages. * @see #messages() @@ -334,6 +347,17 @@ public void messages(@Nullable List msgs) { */ public void topology(@Nullable Collection top) { this.top = top; + topBytes = null; + } + + /** @return Serialized {@link #top}. */ + public @Nullable byte[] topologyBytes() { + return topBytes; + } + + /** @param topBytes Serialized {@link #top}. */ + public void topologyBytes(@Nullable byte[] topBytes) { + this.topBytes = topBytes; } /** @@ -342,7 +366,7 @@ public void topology(@Nullable Collection top) { public void clientTopology(Collection top) { assert top != null && !top.isEmpty() : top; - this.clientTop = top; + clientTop = top; } /** @@ -376,24 +400,21 @@ public void topologyHistoryMessages(@Nullable Map { Collection clusterNodeImpls = msgs.clusterNodeMessages().stream() .map(m -> { + assert m.clusterMetricsMessage() != null; + TcpDiscoveryNode tcpDiscoNode = new TcpDiscoveryNode( m.id(), + m.consistentId(), + m.order(), + m.local(), + m.client(), m.addresses(), m.hostNames(), - 0, - null, + m.attributes(), new IgniteProductVersion(m.productVersionMessage()), - m.consistentId() + new ClusterMetricsSnapshot(m.clusterMetricsMessage()) ); - tcpDiscoNode.order(m.order()); - tcpDiscoNode.local(m.local()); - tcpDiscoNode.setAttributes(m.attributes()); - tcpDiscoNode.getAttributes().put(IgniteNodeAttributes.ATTR_CLIENT_MODE, m.client()); - - if (m.clusterMetricsMessage() != null) - tcpDiscoNode.setMetrics(new ClusterMetricsSnapshot(m.clusterMetricsMessage())); - return tcpDiscoNode; }).collect(Collectors.toList()); @@ -409,7 +430,7 @@ public void topologyHistoryMessages(@Nullable Map> topHist) { - if (topHist == null) { + if (F.isEmpty(topHist)) { topHistMsgs = null; return; From 1064805091d948b62070e0970d3f06e2a31a2815 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 19 Feb 2026 18:31:15 +0300 Subject: [PATCH 05/10] refactoring. + dedicated if --- .../ignite/spi/discovery/tcp/ClientImpl.java | 8 +--- .../ignite/spi/discovery/tcp/ServerImpl.java | 25 +---------- .../discovery/tcp/TcpDiscoveryIoSession.java | 9 ++++ .../TcpDiscoveryMarshallableMessage.java | 41 +++++++++++++++++++ .../TcpDiscoveryNodeAddFinishedMessage.java | 7 ++-- .../TcpDiscoveryNodeAddedMessage.java | 35 +++++++++------- 6 files changed, 76 insertions(+), 49 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index fe702517dbe33..9e1714867d2d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -2160,12 +2160,8 @@ protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) { if (msg instanceof TraceableMessage) tracing.messages().beforeSend((TraceableMessage)msg); - if (msg instanceof TcpDiscoveryNodeAddedMessage) { - ((TcpDiscoveryNodeAddedMessage)msg) - .finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); - + if (msg instanceof TcpDiscoveryNodeAddedMessage) processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg); - } else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg); else if (msg instanceof TcpDiscoveryNodeLeftMessage) @@ -2311,8 +2307,6 @@ private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage ms delayDiscoData.clear(); } - msg.finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); - locNode.setAttributes(msg.clientNodeAttributes()); clearNodeSensitiveData(locNode); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index cce3fb5ea6f57..07fc5755eebb2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1897,8 +1897,6 @@ private void prepareNodeAddedMessage( } nodeAddedMsg.topologyHistory(hist); - - nodeAddedMsg.prepareMarshal(spi.marshaller()); } } } @@ -2484,8 +2482,6 @@ void add(TcpDiscoveryAbstractMessage msg) { // Do not need this data for client reconnect. if (addedMsg.gridDiscoveryData() != null) addedMsg.clearDiscoveryData(); - - addedMsg.prepareMarshal(spi.marshaller()); } else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg; @@ -2493,8 +2489,6 @@ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { if (addFinishMsg.clientDiscoData() != null) { addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg); - addFinishMsg.prepareMarshal(spi.marshaller()); - msg = addFinishMsg; DiscoveryDataPacket discoData = addFinishMsg.clientDiscoData(); @@ -3111,11 +3105,6 @@ protected void runTasks() { if (!locNode.id().equals(msg.senderNodeId()) && ensured) lastRingMsgTimeNanos = System.nanoTime(); - if (msg instanceof TcpDiscoveryNodeAddedMessage) { - ((TcpDiscoveryNodeAddedMessage)msg).finishUnmarshal(spi.marshaller(), - U.resolveClassLoader(spi.ignite().configuration())); - } - if (locNode.internalOrder() == 0) { boolean proc = false; @@ -3263,8 +3252,6 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { } } - ((TcpDiscoveryNodeAddedMessage)msg).prepareMarshal(spi.marshaller()); - // TODO Investigate possible optimizations: https://issues.apache.org/jira/browse/IGNITE-27722 clientMsgWorker.addMessage(msg); } @@ -4846,8 +4833,6 @@ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { addFinishMsg.clientDiscoData(msg.gridDiscoveryData()); addFinishMsg.clientNodeAttributes(node.attributes()); - - addFinishMsg.prepareMarshal(spi.marshaller()); } addFinishMsg = tracing.messages().branch(addFinishMsg, msg); @@ -4874,11 +4859,8 @@ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != null) { // Local node already has node from message in local topology. // Just pass it to coordinator via the ring. - if (sendMessageToRemotes(msg)) { - msg.prepareMarshal(spi.marshaller()); - + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); - } if (log.isDebugEnabled()) { log.debug("Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " + @@ -5101,11 +5083,8 @@ else if (spiState == CONNECTING) processMessageFailedNodes(msg); } - if (sendMessageToRemotes(msg)) { - msg.prepareMarshal(spi.marshaller()); - + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); - } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index fa8d71e40f171..a77b44e413d6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -40,6 +40,7 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMarshallableMessage; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -140,6 +141,9 @@ void writeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException } try { + if (msg instanceof TcpDiscoveryMarshallableMessage) + ((TcpDiscoveryMarshallableMessage)msg).prepareMarshal(spi.marshaller()); + out.write(MESSAGE_SERIALIZATION); serializeMessage((Message)msg, out); @@ -212,6 +216,11 @@ T readMessage() throws IgniteCheckedException, IOException { } while (!finished); + if (msg instanceof TcpDiscoveryMarshallableMessage) { + ((TcpDiscoveryMarshallableMessage)msg).finishUnmarshal(spi.marshaller(), + U.resolveClassLoader(spi.ignite().configuration())); + } + return (T)msg; } catch (Exception e) { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java new file mode 100644 index 0000000000000..0cdd3621eee94 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Base class for TCP Discovery messages which still require external pre- and post- serialization. + *
+ * TODO: Remove/revise after https://issues.apache.org/jira/browse/IGNITE-25883 + */ +public interface TcpDiscoveryMarshallableMessage extends Message { + /** @param marsh Marshaller. */ + default void prepareMarshal(Marshaller marsh) { + throw new UnsupportedOperationException("Marshalling is not required for discovery message " + getClass().getSimpleName()); + } + + /** + * @param marsh Marshaller. + * @param clsLdr Class loader. + */ + default void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + throw new UnsupportedOperationException("Marshalling is not required for discovery message " + getClass().getSimpleName()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java index 7e75a16853d33..68afb1d4c01cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java @@ -27,7 +27,6 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.jetbrains.annotations.Nullable; @@ -36,7 +35,7 @@ */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { +public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractTraceableMessage implements TcpDiscoveryMarshallableMessage { /** */ private static final long serialVersionUID = 0L; @@ -151,7 +150,7 @@ public void clientNodeAttributesBytes(@Nullable byte[] clientNodeAttrsBytes) { /** * @param marsh Marshaller. */ - public void prepareMarshal(Marshaller marsh) { + @Override public void prepareMarshal(Marshaller marsh) { if (clientNodeAttrs != null && clientNodeAttrsBytes == null) { try { clientNodeAttrsBytes = U.marshal(marsh, clientNodeAttrs); @@ -166,7 +165,7 @@ public void prepareMarshal(Marshaller marsh) { * @param marsh Marshaller. * @param clsLdr Class loader. */ - public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { if (F.isEmpty(clientNodeAttrsBytes)) clientNodeAttrs = null; else { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index ab81f053e7a49..5e3e0fc990e19 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -42,6 +42,7 @@ import org.jetbrains.annotations.Nullable; /** + * TODO: Revise serialization of the {@link TcpDiscoveryNode} fields after https://issues.apache.org/jira/browse/IGNITE-27899 * Message telling nodes that new node should be added to topology. * When newly added node receives the message it connects to its next and finishes * join process. @@ -52,7 +53,7 @@ */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { +public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableMessage implements TcpDiscoveryMarshallableMessage { /** */ private static final long serialVersionUID = 0L; @@ -92,12 +93,13 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableM private transient Collection clientTop; /** - * TODO: Remove after refactoring of discovery messages serialization https://issues.apache.org/jira/browse/IGNITE-25883 + * TODO: Remove/revise after https://issues.apache.org/jira/browse/IGNITE-25883 * Java-serializable pending messages from previous node. */ private @Nullable Map serializablePendingMsgs; /** Marshalled {@link #serializablePendingMsgs}. */ + @Order(value = 12, method = "serializablePendingMessagesBytes") private byte[] serializablePendingMsgsBytes; /** Constructor for {@link DiscoveryMessageFactory}. */ @@ -137,21 +139,16 @@ public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { node = msg.node; nodeBytes = msg.nodeBytes; - pendingMsgs = msg.pendingMsgs; - serializablePendingMsgs = msg.serializablePendingMsgs; - top = msg.top; - topBytes = msg.topBytes; + pendingMessages(msg.pendingMsgs); + topology(msg.top); clientTop = msg.clientTop; topHistMsgs = msg.topHistMsgs; dataPacket = msg.dataPacket; gridStartTime = msg.gridStartTime; } - /** - * TODO: Revise after refactoring of TcpDiscoveryNode serialization https://issues.apache.org/jira/browse/IGNITE-27899 - * @param marsh marshaller. - */ - public void prepareMarshal(Marshaller marsh) { + /** @param marsh marshaller. */ + @Override public void prepareMarshal(Marshaller marsh) { if (!F.isEmpty(topHistMsgs)) topHistMsgs.values().forEach(m -> m.prepareMarshal(marsh)); @@ -184,11 +181,10 @@ public void prepareMarshal(Marshaller marsh) { } /** - * TODO: Revise after refactoring of TcpDiscoveryNode serialization https://issues.apache.org/jira/browse/IGNITE-27899 * @param marsh Marshaller. * @param clsLdr Class loader. */ - public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { if (!F.isEmpty(topHistMsgs)) topHistMsgs.values().forEach(m -> m.finishUnmarshal(marsh, clsLdr)); @@ -259,10 +255,20 @@ public Map pendingMessages() { */ public void pendingMessages(Map pendingMsgs) { this.pendingMsgs = pendingMsgs; + this.serializablePendingMsgsBytes = null; + } + + /** @return Bytes of {@link #serializablePendingMsgs}. */ + public @Nullable byte[] serializablePendingMessagesBytes() { + return serializablePendingMsgsBytes; + } + + /** @param serializablePendingMsgsBytes Bytes of {@link #serializablePendingMsgs}. */ + public void serializablePendingMessagesBytes(@Nullable byte[] serializablePendingMsgsBytes) { + this.serializablePendingMsgsBytes = serializablePendingMsgsBytes; } /** - * TODO: revise after refactoring of discovery messages serialization https://issues.apache.org/jira/browse/IGNITE-25883 * Gets pending messages sent to new node by its previous. * * @return Pending messages from previous node. @@ -299,7 +305,6 @@ public void pendingMessages(Map pendingMsgs) { } /** - * TODO: revise after refactoring of discovery messages serialization https://issues.apache.org/jira/browse/IGNITE-25883 * Sets pending messages to send to new node. * * @param msgs Pending messages to send to new node. From 4adf01a488977152bbf24b6569c2bdd6e0469880 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 19 Feb 2026 20:47:01 +0300 Subject: [PATCH 06/10] impl --- .../TcpDiscoveryMarshallableMessage.java | 8 +--- .../TcpDiscoveryNodeAddedMessage.java | 44 +++++++++++++++---- 2 files changed, 37 insertions(+), 15 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java index 0cdd3621eee94..de4ef13808dd4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java @@ -27,15 +27,11 @@ */ public interface TcpDiscoveryMarshallableMessage extends Message { /** @param marsh Marshaller. */ - default void prepareMarshal(Marshaller marsh) { - throw new UnsupportedOperationException("Marshalling is not required for discovery message " + getClass().getSimpleName()); - } + void prepareMarshal(Marshaller marsh); /** * @param marsh Marshaller. * @param clsLdr Class loader. */ - default void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { - throw new UnsupportedOperationException("Marshalling is not required for discovery message " + getClass().getSimpleName()); - } + void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index 5e3e0fc990e19..d25e736bb0fa1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -178,6 +178,17 @@ public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { throw new IgniteException("Failed to marshal serializable pending messages.", e); } } + + if (F.isEmpty(pendingMsgs)) + return; + + for (Message msg : pendingMsgs.values()) { + if (msg instanceof TcpDiscoveryMarshallableMessage) { + TcpDiscoveryMarshallableMessage ms = (TcpDiscoveryMarshallableMessage)msg; + + ms.prepareMarshal(marsh); + } + } } /** @@ -220,6 +231,17 @@ public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { throw new IgniteException("Failed to unmarshal serializable pending messages.", e); } } + + if (F.isEmpty(pendingMsgs)) + return; + + for (Message msg : pendingMsgs.values()) { + if (msg instanceof TcpDiscoveryMarshallableMessage) { + TcpDiscoveryMarshallableMessage ms = (TcpDiscoveryMarshallableMessage)msg; + + ms.finishUnmarshal(marsh, clsLdr); + } + } } /** @@ -285,19 +307,22 @@ public void serializablePendingMessagesBytes(@Nullable byte[] serializablePendin List res = new ArrayList<>(totalSz); for (int i = 0; i < totalSz; ++i) { - Message m = pendingMsgs.get(i); + Message msg = pendingMsgs == null ? null : pendingMsgs.get(i); + + if (msg == null) { + assert serializablePendingMsgs != null; - if (m == null) { TcpDiscoveryAbstractMessage sm = serializablePendingMsgs.get(i); + assert sm != null; res.add(sm); } else { - assert serializablePendingMsgs.get(i) == null; - assert m instanceof TcpDiscoveryAbstractMessage; + assert serializablePendingMsgs == null || serializablePendingMsgs.get(i) == null; + assert msg instanceof TcpDiscoveryAbstractMessage; - res.add((TcpDiscoveryAbstractMessage)m); + res.add((TcpDiscoveryAbstractMessage)msg); } } @@ -312,6 +337,7 @@ public void serializablePendingMessagesBytes(@Nullable byte[] serializablePendin public void messages(@Nullable List msgs) { if (F.isEmpty(msgs)) { serializablePendingMsgs = null; + serializablePendingMsgsBytes = null; pendingMsgs = null; return; @@ -319,12 +345,12 @@ public void messages(@Nullable List msgs) { int idx = 0; - for (TcpDiscoveryAbstractMessage m : msgs) { - if (m instanceof Message) { + for (TcpDiscoveryAbstractMessage msg : msgs) { + if (msg instanceof Message) { if (pendingMsgs == null) pendingMsgs = U.newHashMap(msgs.size()); - pendingMsgs.put(idx++, (Message)m); + pendingMsgs.put(idx++, (Message)msg); continue; } @@ -332,7 +358,7 @@ public void messages(@Nullable List msgs) { if (serializablePendingMsgs == null) serializablePendingMsgs = U.newHashMap(msgs.size()); - serializablePendingMsgs.put(idx++, m); + serializablePendingMsgs.put(idx++, msg); } } From 330d359e5dcf684cfb966c28bb00534ac0f8293b Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 19 Feb 2026 20:48:15 +0300 Subject: [PATCH 07/10] Revert "impl" This reverts commit 4adf01a488977152bbf24b6569c2bdd6e0469880. --- .../TcpDiscoveryMarshallableMessage.java | 8 +++- .../TcpDiscoveryNodeAddedMessage.java | 44 ++++--------------- 2 files changed, 15 insertions(+), 37 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java index de4ef13808dd4..0cdd3621eee94 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java @@ -27,11 +27,15 @@ */ public interface TcpDiscoveryMarshallableMessage extends Message { /** @param marsh Marshaller. */ - void prepareMarshal(Marshaller marsh); + default void prepareMarshal(Marshaller marsh) { + throw new UnsupportedOperationException("Marshalling is not required for discovery message " + getClass().getSimpleName()); + } /** * @param marsh Marshaller. * @param clsLdr Class loader. */ - void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr); + default void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + throw new UnsupportedOperationException("Marshalling is not required for discovery message " + getClass().getSimpleName()); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index d25e736bb0fa1..5e3e0fc990e19 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -178,17 +178,6 @@ public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { throw new IgniteException("Failed to marshal serializable pending messages.", e); } } - - if (F.isEmpty(pendingMsgs)) - return; - - for (Message msg : pendingMsgs.values()) { - if (msg instanceof TcpDiscoveryMarshallableMessage) { - TcpDiscoveryMarshallableMessage ms = (TcpDiscoveryMarshallableMessage)msg; - - ms.prepareMarshal(marsh); - } - } } /** @@ -231,17 +220,6 @@ public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { throw new IgniteException("Failed to unmarshal serializable pending messages.", e); } } - - if (F.isEmpty(pendingMsgs)) - return; - - for (Message msg : pendingMsgs.values()) { - if (msg instanceof TcpDiscoveryMarshallableMessage) { - TcpDiscoveryMarshallableMessage ms = (TcpDiscoveryMarshallableMessage)msg; - - ms.finishUnmarshal(marsh, clsLdr); - } - } } /** @@ -307,22 +285,19 @@ public void serializablePendingMessagesBytes(@Nullable byte[] serializablePendin List res = new ArrayList<>(totalSz); for (int i = 0; i < totalSz; ++i) { - Message msg = pendingMsgs == null ? null : pendingMsgs.get(i); - - if (msg == null) { - assert serializablePendingMsgs != null; + Message m = pendingMsgs.get(i); + if (m == null) { TcpDiscoveryAbstractMessage sm = serializablePendingMsgs.get(i); - assert sm != null; res.add(sm); } else { - assert serializablePendingMsgs == null || serializablePendingMsgs.get(i) == null; - assert msg instanceof TcpDiscoveryAbstractMessage; + assert serializablePendingMsgs.get(i) == null; + assert m instanceof TcpDiscoveryAbstractMessage; - res.add((TcpDiscoveryAbstractMessage)msg); + res.add((TcpDiscoveryAbstractMessage)m); } } @@ -337,7 +312,6 @@ public void serializablePendingMessagesBytes(@Nullable byte[] serializablePendin public void messages(@Nullable List msgs) { if (F.isEmpty(msgs)) { serializablePendingMsgs = null; - serializablePendingMsgsBytes = null; pendingMsgs = null; return; @@ -345,12 +319,12 @@ public void messages(@Nullable List msgs) { int idx = 0; - for (TcpDiscoveryAbstractMessage msg : msgs) { - if (msg instanceof Message) { + for (TcpDiscoveryAbstractMessage m : msgs) { + if (m instanceof Message) { if (pendingMsgs == null) pendingMsgs = U.newHashMap(msgs.size()); - pendingMsgs.put(idx++, (Message)msg); + pendingMsgs.put(idx++, (Message)m); continue; } @@ -358,7 +332,7 @@ public void messages(@Nullable List msgs) { if (serializablePendingMsgs == null) serializablePendingMsgs = U.newHashMap(msgs.size()); - serializablePendingMsgs.put(idx++, msg); + serializablePendingMsgs.put(idx++, m); } } From 1baa2f222a0cd316de238c1d87eec8c8892a551e Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 19 Feb 2026 20:48:23 +0300 Subject: [PATCH 08/10] Revert "refactoring. + dedicated if" This reverts commit 1064805091d948b62070e0970d3f06e2a31a2815. --- .../ignite/spi/discovery/tcp/ClientImpl.java | 8 +++- .../ignite/spi/discovery/tcp/ServerImpl.java | 25 ++++++++++- .../discovery/tcp/TcpDiscoveryIoSession.java | 9 ---- .../TcpDiscoveryMarshallableMessage.java | 41 ------------------- .../TcpDiscoveryNodeAddFinishedMessage.java | 7 ++-- .../TcpDiscoveryNodeAddedMessage.java | 35 +++++++--------- 6 files changed, 49 insertions(+), 76 deletions(-) delete mode 100644 modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 9e1714867d2d2..fe702517dbe33 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -2160,8 +2160,12 @@ protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) { if (msg instanceof TraceableMessage) tracing.messages().beforeSend((TraceableMessage)msg); - if (msg instanceof TcpDiscoveryNodeAddedMessage) + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + ((TcpDiscoveryNodeAddedMessage)msg) + .finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); + processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg); + } else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg); else if (msg instanceof TcpDiscoveryNodeLeftMessage) @@ -2307,6 +2311,8 @@ private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage ms delayDiscoData.clear(); } + msg.finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); + locNode.setAttributes(msg.clientNodeAttributes()); clearNodeSensitiveData(locNode); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 07fc5755eebb2..cce3fb5ea6f57 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1897,6 +1897,8 @@ private void prepareNodeAddedMessage( } nodeAddedMsg.topologyHistory(hist); + + nodeAddedMsg.prepareMarshal(spi.marshaller()); } } } @@ -2482,6 +2484,8 @@ void add(TcpDiscoveryAbstractMessage msg) { // Do not need this data for client reconnect. if (addedMsg.gridDiscoveryData() != null) addedMsg.clearDiscoveryData(); + + addedMsg.prepareMarshal(spi.marshaller()); } else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg; @@ -2489,6 +2493,8 @@ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { if (addFinishMsg.clientDiscoData() != null) { addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg); + addFinishMsg.prepareMarshal(spi.marshaller()); + msg = addFinishMsg; DiscoveryDataPacket discoData = addFinishMsg.clientDiscoData(); @@ -3105,6 +3111,11 @@ protected void runTasks() { if (!locNode.id().equals(msg.senderNodeId()) && ensured) lastRingMsgTimeNanos = System.nanoTime(); + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + ((TcpDiscoveryNodeAddedMessage)msg).finishUnmarshal(spi.marshaller(), + U.resolveClassLoader(spi.ignite().configuration())); + } + if (locNode.internalOrder() == 0) { boolean proc = false; @@ -3252,6 +3263,8 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { } } + ((TcpDiscoveryNodeAddedMessage)msg).prepareMarshal(spi.marshaller()); + // TODO Investigate possible optimizations: https://issues.apache.org/jira/browse/IGNITE-27722 clientMsgWorker.addMessage(msg); } @@ -4833,6 +4846,8 @@ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { addFinishMsg.clientDiscoData(msg.gridDiscoveryData()); addFinishMsg.clientNodeAttributes(node.attributes()); + + addFinishMsg.prepareMarshal(spi.marshaller()); } addFinishMsg = tracing.messages().branch(addFinishMsg, msg); @@ -4859,8 +4874,11 @@ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != null) { // Local node already has node from message in local topology. // Just pass it to coordinator via the ring. - if (sendMessageToRemotes(msg)) + if (sendMessageToRemotes(msg)) { + msg.prepareMarshal(spi.marshaller()); + sendMessageAcrossRing(msg); + } if (log.isDebugEnabled()) { log.debug("Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " + @@ -5083,8 +5101,11 @@ else if (spiState == CONNECTING) processMessageFailedNodes(msg); } - if (sendMessageToRemotes(msg)) + if (sendMessageToRemotes(msg)) { + msg.prepareMarshal(spi.marshaller()); + sendMessageAcrossRing(msg); + } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index a77b44e413d6b..fa8d71e40f171 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -40,7 +40,6 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMarshallableMessage; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -141,9 +140,6 @@ void writeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException } try { - if (msg instanceof TcpDiscoveryMarshallableMessage) - ((TcpDiscoveryMarshallableMessage)msg).prepareMarshal(spi.marshaller()); - out.write(MESSAGE_SERIALIZATION); serializeMessage((Message)msg, out); @@ -216,11 +212,6 @@ T readMessage() throws IgniteCheckedException, IOException { } while (!finished); - if (msg instanceof TcpDiscoveryMarshallableMessage) { - ((TcpDiscoveryMarshallableMessage)msg).finishUnmarshal(spi.marshaller(), - U.resolveClassLoader(spi.ignite().configuration())); - } - return (T)msg; } catch (Exception e) { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java deleted file mode 100644 index 0cdd3621eee94..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.tcp.messages; - -import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.Message; - -/** - * Base class for TCP Discovery messages which still require external pre- and post- serialization. - *
- * TODO: Remove/revise after https://issues.apache.org/jira/browse/IGNITE-25883 - */ -public interface TcpDiscoveryMarshallableMessage extends Message { - /** @param marsh Marshaller. */ - default void prepareMarshal(Marshaller marsh) { - throw new UnsupportedOperationException("Marshalling is not required for discovery message " + getClass().getSimpleName()); - } - - /** - * @param marsh Marshaller. - * @param clsLdr Class loader. - */ - default void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { - throw new UnsupportedOperationException("Marshalling is not required for discovery message " + getClass().getSimpleName()); - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java index 68afb1d4c01cb..7e75a16853d33 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.jetbrains.annotations.Nullable; @@ -35,7 +36,7 @@ */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractTraceableMessage implements TcpDiscoveryMarshallableMessage { +public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { /** */ private static final long serialVersionUID = 0L; @@ -150,7 +151,7 @@ public void clientNodeAttributesBytes(@Nullable byte[] clientNodeAttrsBytes) { /** * @param marsh Marshaller. */ - @Override public void prepareMarshal(Marshaller marsh) { + public void prepareMarshal(Marshaller marsh) { if (clientNodeAttrs != null && clientNodeAttrsBytes == null) { try { clientNodeAttrsBytes = U.marshal(marsh, clientNodeAttrs); @@ -165,7 +166,7 @@ public void clientNodeAttributesBytes(@Nullable byte[] clientNodeAttrsBytes) { * @param marsh Marshaller. * @param clsLdr Class loader. */ - @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { if (F.isEmpty(clientNodeAttrsBytes)) clientNodeAttrs = null; else { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index 5e3e0fc990e19..ab81f053e7a49 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -42,7 +42,6 @@ import org.jetbrains.annotations.Nullable; /** - * TODO: Revise serialization of the {@link TcpDiscoveryNode} fields after https://issues.apache.org/jira/browse/IGNITE-27899 * Message telling nodes that new node should be added to topology. * When newly added node receives the message it connects to its next and finishes * join process. @@ -53,7 +52,7 @@ */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableMessage implements TcpDiscoveryMarshallableMessage { +public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { /** */ private static final long serialVersionUID = 0L; @@ -93,13 +92,12 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableM private transient Collection clientTop; /** - * TODO: Remove/revise after https://issues.apache.org/jira/browse/IGNITE-25883 + * TODO: Remove after refactoring of discovery messages serialization https://issues.apache.org/jira/browse/IGNITE-25883 * Java-serializable pending messages from previous node. */ private @Nullable Map serializablePendingMsgs; /** Marshalled {@link #serializablePendingMsgs}. */ - @Order(value = 12, method = "serializablePendingMessagesBytes") private byte[] serializablePendingMsgsBytes; /** Constructor for {@link DiscoveryMessageFactory}. */ @@ -139,16 +137,21 @@ public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { node = msg.node; nodeBytes = msg.nodeBytes; - pendingMessages(msg.pendingMsgs); - topology(msg.top); + pendingMsgs = msg.pendingMsgs; + serializablePendingMsgs = msg.serializablePendingMsgs; + top = msg.top; + topBytes = msg.topBytes; clientTop = msg.clientTop; topHistMsgs = msg.topHistMsgs; dataPacket = msg.dataPacket; gridStartTime = msg.gridStartTime; } - /** @param marsh marshaller. */ - @Override public void prepareMarshal(Marshaller marsh) { + /** + * TODO: Revise after refactoring of TcpDiscoveryNode serialization https://issues.apache.org/jira/browse/IGNITE-27899 + * @param marsh marshaller. + */ + public void prepareMarshal(Marshaller marsh) { if (!F.isEmpty(topHistMsgs)) topHistMsgs.values().forEach(m -> m.prepareMarshal(marsh)); @@ -181,10 +184,11 @@ public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { } /** + * TODO: Revise after refactoring of TcpDiscoveryNode serialization https://issues.apache.org/jira/browse/IGNITE-27899 * @param marsh Marshaller. * @param clsLdr Class loader. */ - @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { if (!F.isEmpty(topHistMsgs)) topHistMsgs.values().forEach(m -> m.finishUnmarshal(marsh, clsLdr)); @@ -255,20 +259,10 @@ public Map pendingMessages() { */ public void pendingMessages(Map pendingMsgs) { this.pendingMsgs = pendingMsgs; - this.serializablePendingMsgsBytes = null; - } - - /** @return Bytes of {@link #serializablePendingMsgs}. */ - public @Nullable byte[] serializablePendingMessagesBytes() { - return serializablePendingMsgsBytes; - } - - /** @param serializablePendingMsgsBytes Bytes of {@link #serializablePendingMsgs}. */ - public void serializablePendingMessagesBytes(@Nullable byte[] serializablePendingMsgsBytes) { - this.serializablePendingMsgsBytes = serializablePendingMsgsBytes; } /** + * TODO: revise after refactoring of discovery messages serialization https://issues.apache.org/jira/browse/IGNITE-25883 * Gets pending messages sent to new node by its previous. * * @return Pending messages from previous node. @@ -305,6 +299,7 @@ public void serializablePendingMessagesBytes(@Nullable byte[] serializablePendin } /** + * TODO: revise after refactoring of discovery messages serialization https://issues.apache.org/jira/browse/IGNITE-25883 * Sets pending messages to send to new node. * * @param msgs Pending messages to send to new node. From 9067b9b41c607655a788df6ab08af6f00eab5731 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 19 Feb 2026 21:41:20 +0300 Subject: [PATCH 09/10] fix the serialization --- .../ignite/spi/discovery/tcp/ServerImpl.java | 3 +- .../TcpDiscoveryNodeAddedMessage.java | 39 +++++++++++++++++-- 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index cce3fb5ea6f57..47a6f53e4d1e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2485,6 +2485,7 @@ void add(TcpDiscoveryAbstractMessage msg) { if (addedMsg.gridDiscoveryData() != null) addedMsg.clearDiscoveryData(); + // Update the marshallable data. addedMsg.prepareMarshal(spi.marshaller()); } else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { @@ -3263,8 +3264,6 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { } } - ((TcpDiscoveryNodeAddedMessage)msg).prepareMarshal(spi.marshaller()); - // TODO Investigate possible optimizations: https://issues.apache.org/jira/browse/IGNITE-27722 clientMsgWorker.addMessage(msg); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index ab81f053e7a49..bec486ef6d89a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -98,6 +98,7 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableM private @Nullable Map serializablePendingMsgs; /** Marshalled {@link #serializablePendingMsgs}. */ + @Order(value = 12, method = "serializablePendingMessagesBytes") private byte[] serializablePendingMsgsBytes; /** Constructor for {@link DiscoveryMessageFactory}. */ @@ -137,10 +138,13 @@ public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { node = msg.node; nodeBytes = msg.nodeBytes; + pendingMsgs = msg.pendingMsgs; serializablePendingMsgs = msg.serializablePendingMsgs; - top = msg.top; - topBytes = msg.topBytes; + serializablePendingMsgsBytes = msg.serializablePendingMsgsBytes; + + topology(msg.topology()); + clientTop = msg.clientTop; topHistMsgs = msg.topHistMsgs; dataPacket = msg.dataPacket; @@ -181,6 +185,14 @@ public void prepareMarshal(Marshaller marsh) { throw new IgniteException("Failed to marshal serializable pending messages.", e); } } + + if (F.isEmpty(pendingMsgs)) + return; + + for (Message msg : pendingMsgs.values()) { + if (msg instanceof TcpDiscoveryNodeAddedMessage) + ((TcpDiscoveryNodeAddedMessage)msg).prepareMarshal(marsh); + } } /** @@ -224,6 +236,14 @@ public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { throw new IgniteException("Failed to unmarshal serializable pending messages.", e); } } + + if (F.isEmpty(pendingMsgs)) + return; + + for (Message msg : pendingMsgs.values()) { + if (msg instanceof TcpDiscoveryNodeAddedMessage) + ((TcpDiscoveryNodeAddedMessage)msg).finishUnmarshal(marsh, clsLdr); + } } /** @@ -279,7 +299,7 @@ public void pendingMessages(Map pendingMsgs) { List res = new ArrayList<>(totalSz); for (int i = 0; i < totalSz; ++i) { - Message m = pendingMsgs.get(i); + Message m = F.isEmpty(pendingMsgs) ? null : pendingMsgs.get(i); if (m == null) { TcpDiscoveryAbstractMessage sm = serializablePendingMsgs.get(i); @@ -288,7 +308,7 @@ public void pendingMessages(Map pendingMsgs) { res.add(sm); } else { - assert serializablePendingMsgs.get(i) == null; + assert serializablePendingMsgs == null || serializablePendingMsgs.get(i) == null; assert m instanceof TcpDiscoveryAbstractMessage; res.add((TcpDiscoveryAbstractMessage)m); @@ -308,6 +328,7 @@ public void messages(@Nullable List msgs) { if (F.isEmpty(msgs)) { serializablePendingMsgs = null; pendingMsgs = null; + serializablePendingMsgsBytes = null; return; } @@ -331,6 +352,16 @@ public void messages(@Nullable List msgs) { } } + /** @return Bytes of {@link #serializablePendingMsgs}. */ + public @Nullable byte[] serializablePendingMessagesBytes() { + return serializablePendingMsgsBytes; + } + + /** @param serializablePendingMsgsBytes Bytes of {@link #serializablePendingMsgs}. */ + public void serializablePendingMessagesBytes(@Nullable byte[] serializablePendingMsgsBytes) { + this.serializablePendingMsgsBytes = serializablePendingMsgsBytes; + } + /** * Gets topology. * From d4e6b2b3899dadc2001f637ba68cd5a36d8c0894 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 20 Feb 2026 11:27:14 +0300 Subject: [PATCH 10/10] fixes --- .../apache/ignite/spi/discovery/tcp/ServerImpl.java | 6 +++++- .../tcp/messages/TcpDiscoveryNodeAddedMessage.java | 12 +++--------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 47a6f53e4d1e4..fb181008d9fdc 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1898,6 +1898,7 @@ private void prepareNodeAddedMessage( nodeAddedMsg.topologyHistory(hist); + // Re-marshall the changed data. nodeAddedMsg.prepareMarshal(spi.marshaller()); } } @@ -2485,7 +2486,7 @@ void add(TcpDiscoveryAbstractMessage msg) { if (addedMsg.gridDiscoveryData() != null) addedMsg.clearDiscoveryData(); - // Update the marshallable data. + // Ensure that the required data is marshalled after the message creation. addedMsg.prepareMarshal(spi.marshaller()); } else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { @@ -2649,6 +2650,9 @@ private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUI msg0.topology(addedMsg.clientTopology()); + // Ensure that the chnaged data is remarshalled. + msg0.prepareMarshal(spi.marshaller()); + return msg0; } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index bec486ef6d89a..64eb8337f48ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -185,14 +185,6 @@ public void prepareMarshal(Marshaller marsh) { throw new IgniteException("Failed to marshal serializable pending messages.", e); } } - - if (F.isEmpty(pendingMsgs)) - return; - - for (Message msg : pendingMsgs.values()) { - if (msg instanceof TcpDiscoveryNodeAddedMessage) - ((TcpDiscoveryNodeAddedMessage)msg).prepareMarshal(marsh); - } } /** @@ -325,14 +317,16 @@ public void pendingMessages(Map pendingMsgs) { * @param msgs Pending messages to send to new node. */ public void messages(@Nullable List msgs) { + serializablePendingMsgsBytes = null; + if (F.isEmpty(msgs)) { serializablePendingMsgs = null; pendingMsgs = null; - serializablePendingMsgsBytes = null; return; } + // Keeps the original message order as in a list. int idx = 0; for (TcpDiscoveryAbstractMessage m : msgs) {