diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 753f00569c5cf..7a799ad101ab3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -17,7 +17,10 @@ package org.apache.ignite.internal.managers.discovery; +import org.apache.ignite.internal.codegen.ClusterNodeCollectionMessageSerializer; +import org.apache.ignite.internal.codegen.ClusterNodeMessageSerializer; import org.apache.ignite.internal.codegen.DiscoveryDataPacketSerializer; +import org.apache.ignite.internal.codegen.IgniteProductVersionMessageSerializer; import org.apache.ignite.internal.codegen.InetAddressMessageSerializer; import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer; import org.apache.ignite.internal.codegen.NodeSpecificDataSerializer; @@ -37,6 +40,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryMetricsUpdateMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryNodeAddFinishedMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodeAddedMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryNodeFailedMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryNodeFullMetricsMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer; @@ -48,6 +52,9 @@ import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; +import org.apache.ignite.spi.discovery.tcp.messages.ClusterNodeCollectionMessage; +import org.apache.ignite.spi.discovery.tcp.messages.ClusterNodeMessage; +import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage; import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage; import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage; import org.apache.ignite.spi.discovery.tcp.messages.NodeSpecificData; @@ -67,6 +74,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; @@ -80,6 +88,9 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { /** {@inheritDoc} */ @Override public void registerAll(MessageFactory factory) { + factory.register((short)-110, ClusterNodeCollectionMessage::new, new ClusterNodeCollectionMessageSerializer()); + factory.register((short)-109, ClusterNodeMessage::new, new ClusterNodeMessageSerializer()); + factory.register((short)-108, IgniteProductVersionMessage::new, new IgniteProductVersionMessageSerializer()); factory.register((short)-107, NodeSpecificData::new, new NodeSpecificDataSerializer()); factory.register((short)-106, DiscoveryDataPacket::new, new DiscoveryDataPacketSerializer()); factory.register((short)-105, TcpDiscoveryNodeFullMetricsMessage::new, @@ -110,5 +121,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)17, TcpDiscoveryNodeFailedMessage::new, new TcpDiscoveryNodeFailedMessageSerializer()); factory.register((short)18, TcpDiscoveryStatusCheckMessage::new, new TcpDiscoveryStatusCheckMessageSerializer()); factory.register((short)19, TcpDiscoveryNodeAddFinishedMessage::new, new TcpDiscoveryNodeAddFinishedMessageSerializer()); + factory.register((short)20, TcpDiscoveryNodeAddedMessage::new, new TcpDiscoveryNodeAddedMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusMetaIO.java index edcfefc4f8890..7ab88b3809182 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusMetaIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusMetaIO.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.util.GridStringBuilder; import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage; /** * IO routines for B+Tree meta pages. @@ -303,7 +304,7 @@ public IgniteProductVersion createdVersion(long pageAddr) { PageUtils.getByte(pageAddr, CREATED_VER_OFFSET + 1), PageUtils.getByte(pageAddr, CREATED_VER_OFFSET + 2), PageUtils.getLong(pageAddr, CREATED_VER_OFFSET + 3), - PageUtils.getBytes(pageAddr, CREATED_VER_OFFSET + 11, IgniteProductVersion.REV_HASH_SIZE)); + PageUtils.getBytes(pageAddr, CREATED_VER_OFFSET + 11, IgniteProductVersionMessage.REV_HASH_SIZE)); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java index de920fddadf31..5fb2d97a02d13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java @@ -120,6 +120,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage; import org.apache.ignite.spi.encryption.EncryptionSpi; import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi; import org.jetbrains.annotations.Nullable; @@ -850,7 +851,7 @@ WALRecord readPlainRecord(RecordType type, ByteBufferBackedDataInput in, long flags = in.readLong(); - byte[] revHash = new byte[IgniteProductVersion.REV_HASH_SIZE]; + byte[] revHash = new byte[IgniteProductVersionMessage.REV_HASH_SIZE]; byte maj = in.readByte(); byte min = in.readByte(); byte maint = in.readByte(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java index 7bba244fc7fc6..55414ebe7f6c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java @@ -24,6 +24,7 @@ import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.processors.cache.CacheMetricsSnapshot; +import org.jetbrains.annotations.Nullable; /** * @@ -33,13 +34,13 @@ class ClusterNodeMetrics { private final ClusterMetrics nodeMetrics; /** */ - private final Map cacheMetrics; + private final @Nullable Map cacheMetrics; /** * @param nodeMetrics Node metrics. * @param cacheMetrics Cache metrics. */ - ClusterNodeMetrics(ClusterMetrics nodeMetrics, Map cacheMetrics) { + ClusterNodeMetrics(ClusterMetrics nodeMetrics, @Nullable Map cacheMetrics) { this.nodeMetrics = nodeMetrics; this.cacheMetrics = cacheMetrics; } diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java index 1c78694550bd5..3efda7095caa4 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java @@ -27,6 +27,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteVersionUtils; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage; import org.jetbrains.annotations.NotNull; /** @@ -41,39 +42,26 @@ public class IgniteProductVersion implements Comparable, E /** */ private static final long serialVersionUID = 0L; - /** Size of the {@link #revHash }*/ - public static final int REV_HASH_SIZE = 20; - /** Size in bytes of serialized: 3 bytes (maj, min, maintenance version), 8 bytes - timestamp */ - public static final int SIZE_IN_BYTES = 3 + 8 + REV_HASH_SIZE; + public static final int SIZE_IN_BYTES = 3 + 8 + IgniteProductVersionMessage.REV_HASH_SIZE; /** Regexp parse pattern. */ private static final Pattern VER_PATTERN = Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)([-.]([^0123456789][^-]+)(-SNAPSHOT)?)?(-(\\d+))?(-([\\da-f]+))?"); - /** Major version number. */ - private byte major; - - /** Minor version number. */ - private byte minor; - - /** Maintenance version number. */ - private byte maintenance; - - /** Stage of development. */ - private String stage; - - /** Revision timestamp. */ - private long revTs; - - /** Revision hash. */ - private byte[] revHash; + /** The values holding message. */ + private IgniteProductVersionMessage productVerMsg; /** * Empty constructor required by {@link Externalizable}. */ public IgniteProductVersion() { - // No-op. + productVerMsg = new IgniteProductVersionMessage(); + } + + /** @param productVerMsg Product version message. */ + public IgniteProductVersion(IgniteProductVersionMessage productVerMsg) { + this.productVerMsg = productVerMsg; } /** @@ -96,17 +84,17 @@ public IgniteProductVersion(byte major, byte minor, byte maintenance, long revTs * @param revHash Revision hash. */ public IgniteProductVersion(byte major, byte minor, byte maintenance, String stage, long revTs, byte[] revHash) { - if (revHash != null && revHash.length != REV_HASH_SIZE) { + productVerMsg = new IgniteProductVersionMessage(major, minor, maintenance, stage, revTs, revHash); + + if (revHash != null && revHash.length != IgniteProductVersionMessage.REV_HASH_SIZE) { throw new IllegalArgumentException("Invalid length for SHA1 hash (must be " - + REV_HASH_SIZE + "): " + revHash.length); + + IgniteProductVersionMessage.REV_HASH_SIZE + "): " + revHash.length); } + } - this.major = major; - this.minor = minor; - this.maintenance = maintenance; - this.stage = stage; - this.revTs = revTs; - this.revHash = revHash != null ? revHash : new byte[REV_HASH_SIZE]; + /** @return {@link IgniteProductVersionMessage}. */ + public IgniteProductVersionMessage message() { + return productVerMsg; } /** @@ -115,7 +103,7 @@ public IgniteProductVersion(byte major, byte minor, byte maintenance, String sta * @return Major version number. */ public byte major() { - return major; + return productVerMsg.major(); } /** @@ -124,7 +112,7 @@ public byte major() { * @return Minor version number. */ public byte minor() { - return minor; + return productVerMsg.minor(); } /** @@ -133,14 +121,14 @@ public byte minor() { * @return Maintenance version number. */ public byte maintenance() { - return maintenance; + return productVerMsg.maintenance(); } /** * @return Stage of development. */ public String stage() { - return stage; + return productVerMsg.stage(); } /** @@ -149,7 +137,7 @@ public String stage() { * @return Revision timestamp. */ public long revisionTimestamp() { - return revTs; + return productVerMsg.revisionTimestamp(); } /** @@ -158,7 +146,7 @@ public long revisionTimestamp() { * @return Revision hash. */ public byte[] revisionHash() { - return revHash; + return productVerMsg.revisionHash(); } /** @@ -167,7 +155,7 @@ public byte[] revisionHash() { * @return Release date. */ public Date releaseDate() { - return new Date(revTs * 1000); + return new Date(revisionTimestamp() * 1000); } /** @@ -178,31 +166,31 @@ public Date releaseDate() { */ public boolean greaterThanEqual(int major, int minor, int maintenance) { // NOTE: Unknown version is less than any other version. - if (major == this.major) - return minor == this.minor ? this.maintenance >= maintenance : this.minor > minor; + if (major == major()) + return minor == minor() ? maintenance() >= maintenance : minor() > minor; else - return this.major > major; + return major() > major; } /** {@inheritDoc} */ @Override public int compareTo(@NotNull IgniteProductVersion o) { // NOTE: Unknown version is less than any other version. - int res = Integer.compare(major, o.major); + int res = Integer.compare(major(), o.major()); if (res != 0) return res; - res = Integer.compare(minor, o.minor); + res = Integer.compare(minor(), o.minor()); if (res != 0) return res; - res = Integer.compare(maintenance, o.maintenance); + res = Integer.compare(maintenance(), o.maintenance()); if (res != 0) return res; - return Long.compare(revTs, o.revTs); + return Long.compare(revisionTimestamp(), o.revisionTimestamp()); } /** @@ -210,17 +198,17 @@ public boolean greaterThanEqual(int major, int minor, int maintenance) { * @return Compare result. */ public int compareToIgnoreTimestamp(@NotNull IgniteProductVersion o) { - int res = Integer.compare(major, o.major); + int res = Integer.compare(major(), o.major()); if (res != 0) return res; - res = Integer.compare(minor, o.minor); + res = Integer.compare(minor(), o.minor()); if (res != 0) return res; - return Integer.compare(maintenance, o.maintenance); + return Integer.compare(maintenance(), o.maintenance()); } /** {@inheritDoc} */ @@ -233,47 +221,50 @@ public int compareToIgnoreTimestamp(@NotNull IgniteProductVersion o) { IgniteProductVersion that = (IgniteProductVersion)o; - return revTs == that.revTs && maintenance == that.maintenance && minor == that.minor && major == that.major; + return revisionTimestamp() == that.revisionTimestamp() && maintenance() == that.maintenance() + && minor() == that.minor() && major() == that.major(); } /** {@inheritDoc} */ @Override public int hashCode() { - int res = major; + int res = major(); - res = 31 * res + minor; - res = 31 * res + maintenance; - res = 31 * res + (int)(revTs ^ (revTs >>> 32)); + res = 31 * res + minor(); + res = 31 * res + maintenance(); + res = 31 * res + (int)(revisionTimestamp() ^ (revisionTimestamp() >>> 32)); return res; } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeByte(major); - out.writeByte(minor); - out.writeByte(maintenance); - out.writeLong(revTs); - U.writeByteArray(out, revHash); + out.writeByte(major()); + out.writeByte(minor()); + out.writeByte(maintenance()); + out.writeLong(revisionTimestamp()); + U.writeByteArray(out, revisionHash()); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - major = in.readByte(); - minor = in.readByte(); - maintenance = in.readByte(); - revTs = in.readLong(); - revHash = U.readByteArray(in); + assert productVerMsg != null; + + productVerMsg.major(in.readByte()); + productVerMsg.minor(in.readByte()); + productVerMsg.maintenance(in.readByte()); + productVerMsg.revisionTimestamp(in.readLong()); + productVerMsg.revisionHash(U.readByteArray(in)); } /** {@inheritDoc} */ @Override public String toString() { - String revTsStr = IgniteVersionUtils.formatBuildTimeStamp(revTs * 1000); + String revTsStr = IgniteVersionUtils.formatBuildTimeStamp(revisionTimestamp() * 1000); - String hash = U.byteArray2HexString(revHash).toLowerCase(); + String hash = U.byteArray2HexString(revisionHash()).toLowerCase(); hash = hash.length() > 8 ? hash.substring(0, 8) : hash; - return major + "." + minor + "." + maintenance + "#" + revTsStr + "-sha1:" + hash; + return major() + "." + minor() + "." + maintenance() + "#" + revTsStr + "-sha1:" + hash; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 2e96845ff0332..fe702517dbe33 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -2160,8 +2160,12 @@ protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) { if (msg instanceof TraceableMessage) tracing.messages().beforeSend((TraceableMessage)msg); - if (msg instanceof TcpDiscoveryNodeAddedMessage) + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + ((TcpDiscoveryNodeAddedMessage)msg) + .finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); + processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg); + } else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg); else if (msg instanceof TcpDiscoveryNodeLeftMessage) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 2f6d431fa9a4b..fb181008d9fdc 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1877,7 +1877,7 @@ private void prepareNodeAddedMessage( nodeAddedMsg.topology(topToSnd); - Collection msgs0 = null; + List msgs0 = null; if (msgs != null) { msgs0 = new ArrayList<>(msgs.size()); @@ -1897,6 +1897,9 @@ private void prepareNodeAddedMessage( } nodeAddedMsg.topologyHistory(hist); + + // Re-marshall the changed data. + nodeAddedMsg.prepareMarshal(spi.marshaller()); } } } @@ -2482,6 +2485,9 @@ void add(TcpDiscoveryAbstractMessage msg) { // Do not need this data for client reconnect. if (addedMsg.gridDiscoveryData() != null) addedMsg.clearDiscoveryData(); + + // Ensure that the required data is marshalled after the message creation. + addedMsg.prepareMarshal(spi.marshaller()); } else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg; @@ -2644,6 +2650,9 @@ private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUI msg0.topology(addedMsg.clientTopology()); + // Ensure that the chnaged data is remarshalled. + msg0.prepareMarshal(spi.marshaller()); + return msg0; } } @@ -3107,6 +3116,11 @@ protected void runTasks() { if (!locNode.id().equals(msg.senderNodeId()) && ensured) lastRingMsgTimeNanos = System.nanoTime(); + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + ((TcpDiscoveryNodeAddedMessage)msg).finishUnmarshal(spi.marshaller(), + U.resolveClassLoader(spi.ignite().configuration())); + } + if (locNode.internalOrder() == 0) { boolean proc = false; @@ -4863,8 +4877,11 @@ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != null) { // Local node already has node from message in local topology. // Just pass it to coordinator via the ring. - if (sendMessageToRemotes(msg)) + if (sendMessageToRemotes(msg)) { + msg.prepareMarshal(spi.marshaller()); + sendMessageAcrossRing(msg); + } if (log.isDebugEnabled()) { log.debug("Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " + @@ -5087,8 +5104,11 @@ else if (spiState == CONNECTING) processMessageFailedNodes(msg); } - if (sendMessageToRemotes(msg)) + if (sendMessageToRemotes(msg)) { + msg.prepareMarshal(spi.marshaller()); + sendMessageAcrossRing(msg); + } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 89f1f492e9272..ea9bd55c57d3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -94,7 +94,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Ignite /** Node cache metrics. */ @GridToStringExclude - private volatile Map cacheMetrics; + private volatile @Nullable Map cacheMetrics; /** Node order in the topology. */ private volatile long order; @@ -162,35 +162,73 @@ public TcpDiscoveryNode() { * @param ver Version. * @param consistentId Node consistent ID. */ - public TcpDiscoveryNode(UUID id, + public TcpDiscoveryNode( + UUID id, Collection addrs, Collection hostNames, int discPort, DiscoveryMetricsProvider metricsProvider, IgniteProductVersion ver, - Serializable consistentId + @Nullable Serializable consistentId + ) { + this(id, consistentId, 0, false, null, addrs, hostNames, null, ver, metricsProvider.metrics()); + + this.discPort = discPort; + this.metricsProvider = metricsProvider; + cacheMetrics = metricsProvider.cacheMetrics(); + sockAddrs = U.toSocketAddresses(this, discPort); + } + + /** + * Constructor to implement {@link ClusterNode}. + * + * @param id Node id. + * @param consistentId Consistent id. + * @param order Node order. + * @param loc Local node flag. + * @param client Client node flag. + * @param addrs Node addresses. + * @param hostNames Node host names. + * @param attrs Node attributes. + * @param ver Version. + * @param metrics Node metrics. + */ + public TcpDiscoveryNode( + UUID id, + @Nullable Serializable consistentId, + long order, + boolean loc, + @Nullable Boolean client, + Collection addrs, + Collection hostNames, + @Nullable Map attrs, + IgniteProductVersion ver, + ClusterMetrics metrics ) { assert id != null; - assert metricsProvider != null; assert ver != null; + assert metrics != null; this.id = id; + this.order = order; + this.loc = loc; + this.hostNames = hostNames; + this.ver = ver; + this.metrics = metrics; List sortedAddrs = new ArrayList<>(addrs); - Collections.sort(sortedAddrs); this.addrs = sortedAddrs; - this.hostNames = hostNames; - this.discPort = discPort; - this.metricsProvider = metricsProvider; - this.ver = ver; this.consistentId = consistentId != null ? consistentId : U.consistentId(sortedAddrs, discPort); - metrics = metricsProvider.metrics(); - cacheMetrics = metricsProvider.cacheMetrics(); - sockAddrs = U.toSocketAddresses(this, discPort); + if (attrs == null) + attrs = Collections.emptyMap(); + + this.attrs = new HashMap<>(attrs); + + this.attrs.put(IgniteNodeAttributes.ATTR_CLIENT_MODE, client != null && client); } /** @@ -290,7 +328,7 @@ public Map getAttributes() { } /** {@inheritDoc} */ - @Override public Map cacheMetrics() { + @Override public @Nullable Map cacheMetrics() { if (metricsProvider != null) { Map cacheMetrics0 = metricsProvider.cacheMetrics(); @@ -463,7 +501,7 @@ public void visible(boolean visible) { /** {@inheritDoc} */ @Override public boolean isClient() { if (!cacheCliInit) { - Boolean clientModeAttr = ((ClusterNode)this).attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE); + Boolean clientModeAttr = attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE); cacheCli = clientModeAttr != null && clientModeAttr; @@ -601,10 +639,7 @@ public TcpDiscoveryNode clientReconnectNode(Map nodeAttrs) { // Legacy: Cache metrics int size = in.readInt(); - for (int i = 0; i < size; i++) { - in.readInt(); - in.readObject(); - } + assert size == 0; order = in.readLong(); intOrder = in.readLong(); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeCollectionMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeCollectionMessage.java new file mode 100644 index 0000000000000..ce59d5d434301 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeCollectionMessage.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import java.util.Collection; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Container message for a collection of {@link ClusterNodeMessage}. + *
+ * Requires pre- and post- marshalling. + * + * @see #prepareMarshal(Marshaller) + * @see #prepareMarshal(Marshaller) + */ +public class ClusterNodeCollectionMessage implements Message { + /** The collection of wrapped {@link ClusterNodeMessage}. */ + @Order(value = 0, method = "clusterNodeMessages") + private Collection clusterNodeMsgs; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public ClusterNodeCollectionMessage() { + // No-op. + } + + /** @param clusterNodeMsgs Holder messages of {@link ClusterNode}. */ + public ClusterNodeCollectionMessage(Collection clusterNodeMsgs) { + this.clusterNodeMsgs = clusterNodeMsgs; + } + + /** @param marsh Marshalled. */ + public void prepareMarshal(Marshaller marsh) { + clusterNodeMsgs.forEach(msg -> msg.prepareMarshal(marsh)); + } + + /** + * @param marsh Marshalled. + * @param clsLdr Class loader. + */ + public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + clusterNodeMsgs.forEach(msg -> msg.finishUnmarshal(marsh, clsLdr)); + } + + /** @return Holder messages of {@link ClusterNode}. */ + public Collection clusterNodeMessages() { + return clusterNodeMsgs; + } + + /** @param clusterNodeMsgs Holder messages of {@link ClusterNode}. */ + public void clusterNodeMessages(Collection clusterNodeMsgs) { + this.clusterNodeMsgs = clusterNodeMsgs; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -110; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeMessage.java new file mode 100644 index 0000000000000..5fc7f344843ee --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeMessage.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Message for {@link ClusterNode}. Requites pre- and post- serialization with the class loader. + *
+ * Requires pre- and -post marshalling. + * + * @see #prepareMarshal(Marshaller) + * @see #finishUnmarshal(Marshaller, ClassLoader) + */ +public class ClusterNodeMessage implements Message { + /** Node ID. */ + @Order(0) + private UUID id; + + /** Internal discovery addresses as strings. */ + @Order(value = 1, method = "addresses") + private Collection addrs; + + /** Internal discovery host names as strings. */ + @Order(2) + private Collection hostNames; + + /** */ + @Order(value = 3, method = "clusterMetricsMessage") + private TcpDiscoveryNodeMetricsMessage clusterMetricsMsg; + + /** */ + @Order(value = 4) + private long order; + + /** */ + @Order(value = 5, method = "productVersionMessage") + private IgniteProductVersionMessage productVerMsg; + + /** Grid local node flag (transient). */ + @Order(value = 6, method = "local") + private boolean loc; + + /** */ + @Order(7) + private boolean client; + + /** */ + @Order(8) + private String dataCenterId; + + /** Consistent ID. */ + private Serializable consistentId; + + /** */ + private byte[] consistentIdBytes; + + /** Node attributes. */ + private Map attrs; + + /** */ + private byte[] attrsBytes; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public ClusterNodeMessage() { + // No-op. + } + + /** @param clusterNode Cluster node. */ + public ClusterNodeMessage(ClusterNode clusterNode) { + id = clusterNode.id(); + addrs = clusterNode.addresses(); + hostNames = clusterNode.hostNames(); + if (clusterNode.metrics() != null) + clusterMetricsMsg = new TcpDiscoveryNodeMetricsMessage(clusterNode.metrics()); + order = clusterNode.order(); + productVerMsg = new IgniteProductVersionMessage(clusterNode.version()); + loc = clusterNode.isLocal(); + client = clusterNode.isClient(); + dataCenterId = clusterNode.dataCenterId(); + attrs = clusterNode.attributes(); + } + + /** @param marsh Marshalled. */ + public void prepareMarshal(Marshaller marsh) { + if (!F.isEmpty(attrs) && attrsBytes == null) { + try { + attrsBytes = U.marshal(marsh, attrs); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal cluster node attributes.", e); + } + } + + if (consistentId != null && consistentIdBytes == null) { + try { + consistentIdBytes = U.marshal(marsh, consistentId); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal cluster node's consistent id.", e); + } + } + } + + /** + * @param marsh Marshalled. + * @param clsLdr Class loader. + */ + public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + if (attrsBytes != null && F.isEmpty(attrs)) { + try { + attrs = U.unmarshal(marsh, attrsBytes, clsLdr); + + attrsBytes = null; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal cluster node attributes.", e); + } + } + + if (consistentIdBytes != null && consistentId == null) { + try { + consistentId = U.unmarshal(marsh, consistentIdBytes, clsLdr); + + consistentIdBytes = null; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to cluster node's consistent id.", e); + } + } + } + + /** @return Addresses. */ + public Collection addresses() { + return addrs; + } + + /** @param addrs Addresses. */ + public void addresses(Collection addrs) { + this.addrs = addrs; + } + + /** @return Node metrics message. */ + public NodeMetricsMessage clusterMetricsMessage() { + return clusterMetricsMsg; + } + + /** @param clusterMetricsMsg Node metrics message. */ + public void clusterMetricsMessage(TcpDiscoveryNodeMetricsMessage clusterMetricsMsg) { + this.clusterMetricsMsg = clusterMetricsMsg; + } + + /** @return Client flag. */ + public boolean client() { + return client; + } + + /** @param client Client flag. */ + public void client(boolean client) { + this.client = client; + } + + /** @return Datacenter id. */ + public String dataCenterId() { + return dataCenterId; + } + + /** @param dataCenterId Datacenter id. */ + public void dataCenterId(String dataCenterId) { + this.dataCenterId = dataCenterId; + } + + /** @return Host names. */ + public Collection hostNames() { + return hostNames; + } + + /** @param hostNames Host names. */ + public void hostNames(Collection hostNames) { + this.hostNames = hostNames; + } + + /** @return Node id. */ + public UUID id() { + return id; + } + + /** @param id Node id. */ + public void id(UUID id) { + this.id = id; + } + + /** @return Node order. */ + public long order() { + return order; + } + + /** @param order Node order. */ + public void order(long order) { + this.order = order; + } + + /** @return Local node flag. */ + public boolean local() { + return loc; + } + + /** @param loc Local node flag. */ + public void local(boolean loc) { + this.loc = loc; + } + + /** @return Product version. */ + public IgniteProductVersionMessage productVersionMessage() { + return productVerMsg; + } + + /** @param productVerMsg Product version. */ + public void productVersionMessage(IgniteProductVersionMessage productVerMsg) { + this.productVerMsg = productVerMsg; + } + + /** @return Node consistent id. */ + public Serializable consistentId() { + return consistentId; + } + + /** @param consistentId Node consistent id. */ + public void consistentId(Serializable consistentId) { + this.consistentId = consistentId; + } + + /** @return Node's attributes. */ + public Map attributes() { + return attrs; + } + + /** @param attrs Node's attributes. */ + public void attributes(Map attrs) { + this.attrs = attrs; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -109; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/IgniteProductVersionMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/IgniteProductVersionMessage.java new file mode 100644 index 0000000000000..bdf1da1e1b8aa --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/IgniteProductVersionMessage.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** Message for {@link IgniteProductVersion}.*/ +public class IgniteProductVersionMessage implements Message { + /** Size of the {@link #revHash }*/ + public static final int REV_HASH_SIZE = 20; + + /** Major version number. */ + @Order(0) + private byte major; + + /** Minor version number. */ + @Order(1) + private byte minor; + + /** Maintenance version number. */ + @Order(2) + private byte maintenance; + + /** Stage of development. */ + @Order(3) + private String stage; + + /** Revision timestamp. */ + @Order(value = 4, method = "revisionTimestamp") + private long revTs; + + /** Revision hash. */ + @Order(value = 5, method = "revisionHash") + private byte[] revHash; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public IgniteProductVersionMessage() { + // No-op. + } + + /** + * @param major Major version. + * @param minor Minor version. + * @param maintenance Maintenance. + * @param stage Stage. + * @param revTs Revision timestamp. + * @param revHash Revision hash. + */ + public IgniteProductVersionMessage(byte major, byte minor, byte maintenance, String stage, long revTs, byte[] revHash) { + this.major = major; + this.minor = minor; + this.maintenance = maintenance; + this.stage = stage; + this.revTs = revTs; + this.revHash = revHash != null ? revHash : new byte[REV_HASH_SIZE]; + } + + /** @param ver Product version. */ + public IgniteProductVersionMessage(IgniteProductVersion ver) { + this( + ver.major(), + ver.minor(), + ver.maintenance(), + ver.stage(), + ver.revisionTimestamp(), + ver.revisionHash() + ); + } + + /** @return Maintenance. */ + public byte maintenance() { + return maintenance; + } + + /** @param maintenance Maintenance. */ + public void maintenance(byte maintenance) { + this.maintenance = maintenance; + } + + /** @return Major version. */ + public byte major() { + return major; + } + + /** @param major Major version. */ + public void major(byte major) { + this.major = major; + } + + /** @return Minor version. */ + public byte minor() { + return minor; + } + + /** @param minor Minor version. */ + public void minor(byte minor) { + this.minor = minor; + } + + /** @return Revision hash. */ + public byte[] revisionHash() { + return revHash; + } + + /** @param revHash Revision hash. */ + public void revisionHash(byte[] revHash) { + this.revHash = revHash; + } + + /** @return Revision timestamp. */ + public long revisionTimestamp() { + return revTs; + } + + /** @param revTs Revision timestamp. */ + public void revisionTimestamp(long revTs) { + this.revTs = revTs; + } + + /** @return Statge. */ + public String stage() { + return stage; + } + + /** @param stage Stage. */ + public void stage(String stage) { + this.stage = stage; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -108; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index 36540d8b7dfc1..64eb8337f48ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -17,12 +17,26 @@ package org.apache.ignite.spi.discovery.tcp.messages; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.ClusterMetricsSnapshot; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.jetbrains.annotations.Nullable; @@ -31,35 +45,66 @@ * Message telling nodes that new node should be added to topology. * When newly added node receives the message it connects to its next and finishes * join process. + *
+ * Requires pre- and post- marshalling. + * @see #prepareMarshal(Marshaller) + * @see #finishUnmarshal(Marshaller, ClassLoader) */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableMessage { +public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { /** */ private static final long serialVersionUID = 0L; - /** Added node. */ - private final TcpDiscoveryNode node; - /** */ + @Order(value = 6, method = "gridDiscoveryData") private DiscoveryDataPacket dataPacket; - /** Pending messages from previous node. */ - private Collection msgs; + /** Start time of the first grid node. */ + @Order(7) + private long gridStartTime; + + /** Topology snapshots history. */ + @Order(value = 8, method = "topologyHistoryMessages") + private @Nullable Map topHistMsgs; + + /** {@link TcpDiscoveryAbstractMessage} pending messages from previous node which is a {@link Message}. */ + @Order(value = 9, method = "pendingMessages") + private Map pendingMsgs; + + /** Added node. */ + private TcpDiscoveryNode node; + + /** Marshalled {@link #node}. */ + @Order(10) + private byte[] nodeBytes; /** Current topology. Initialized by coordinator. */ @GridToStringInclude - private Collection top; + private @Nullable Collection top; + + /** Marshalled {@link #top}. */ + @Order(value = 11, method = "topologyBytes") + private @Nullable byte[] topBytes; /** */ @GridToStringInclude private transient Collection clientTop; - /** Topology snapshots history. */ - private Map> topHist; + /** + * TODO: Remove after refactoring of discovery messages serialization https://issues.apache.org/jira/browse/IGNITE-25883 + * Java-serializable pending messages from previous node. + */ + private @Nullable Map serializablePendingMsgs; - /** Start time of the first grid node. */ - private final long gridStartTime; + /** Marshalled {@link #serializablePendingMsgs}. */ + @Order(value = 12, method = "serializablePendingMessagesBytes") + private byte[] serializablePendingMsgsBytes; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodeAddedMessage() { + // No-op. + } /** * Constructor. @@ -69,7 +114,8 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableM * @param dataPacket container for collecting discovery data across the cluster. * @param gridStartTime Start time of the first grid node. */ - public TcpDiscoveryNodeAddedMessage(UUID creatorNodeId, + public TcpDiscoveryNodeAddedMessage( + UUID creatorNodeId, TcpDiscoveryNode node, DiscoveryDataPacket dataPacket, long gridStartTime @@ -90,13 +136,106 @@ public TcpDiscoveryNodeAddedMessage(UUID creatorNodeId, public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { super(msg); - this.node = msg.node; - this.msgs = msg.msgs; - this.top = msg.top; - this.clientTop = msg.clientTop; - this.topHist = msg.topHist; - this.dataPacket = msg.dataPacket; - this.gridStartTime = msg.gridStartTime; + node = msg.node; + nodeBytes = msg.nodeBytes; + + pendingMsgs = msg.pendingMsgs; + serializablePendingMsgs = msg.serializablePendingMsgs; + serializablePendingMsgsBytes = msg.serializablePendingMsgsBytes; + + topology(msg.topology()); + + clientTop = msg.clientTop; + topHistMsgs = msg.topHistMsgs; + dataPacket = msg.dataPacket; + gridStartTime = msg.gridStartTime; + } + + /** + * TODO: Revise after refactoring of TcpDiscoveryNode serialization https://issues.apache.org/jira/browse/IGNITE-27899 + * @param marsh marshaller. + */ + public void prepareMarshal(Marshaller marsh) { + if (!F.isEmpty(topHistMsgs)) + topHistMsgs.values().forEach(m -> m.prepareMarshal(marsh)); + + if (node != null && nodeBytes == null) { + try { + nodeBytes = U.marshal(marsh, node); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal cluster node.", e); + } + } + + if (top != null && topBytes == null) { + try { + topBytes = U.marshal(marsh, top); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal topology nodes.", e); + } + } + + if (serializablePendingMsgs != null && serializablePendingMsgsBytes == null) { + try { + serializablePendingMsgsBytes = U.marshal(marsh, serializablePendingMsgs); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal serializable pending messages.", e); + } + } + } + + /** + * TODO: Revise after refactoring of TcpDiscoveryNode serialization https://issues.apache.org/jira/browse/IGNITE-27899 + * @param marsh Marshaller. + * @param clsLdr Class loader. + */ + public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + if (!F.isEmpty(topHistMsgs)) + topHistMsgs.values().forEach(m -> m.finishUnmarshal(marsh, clsLdr)); + + if (nodeBytes != null && node == null) { + try { + node = U.unmarshal(marsh, nodeBytes, clsLdr); + + nodeBytes = null; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal cluster node.", e); + } + } + + if (topBytes != null && top == null) { + try { + top = U.unmarshal(marsh, topBytes, clsLdr); + + topBytes = null; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal topology nodes.", e); + } + } + + if (serializablePendingMsgsBytes != null && serializablePendingMsgs == null) { + try { + serializablePendingMsgs = U.unmarshal(marsh, serializablePendingMsgsBytes, clsLdr); + + serializablePendingMsgsBytes = null; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal serializable pending messages.", e); + } + } + + if (F.isEmpty(pendingMsgs)) + return; + + for (Message msg : pendingMsgs.values()) { + if (msg instanceof TcpDiscoveryNodeAddedMessage) + ((TcpDiscoveryNodeAddedMessage)msg).finishUnmarshal(marsh, clsLdr); + } } /** @@ -108,24 +247,113 @@ public TcpDiscoveryNode node() { return node; } + /** @return Serialized {@link #node}. */ + public byte[] nodeBytes() { + return nodeBytes; + } + + /** @param nodeBytes Serialized {@link #node}. */ + public void nodeBytes(byte[] nodeBytes) { + this.nodeBytes = nodeBytes; + } + + /** + * @return Pending messages. + * @see #messages() + */ + public Map pendingMessages() { + return pendingMsgs; + } + + /** + * @param pendingMsgs Pending messages. + * @see #messages(List) + */ + public void pendingMessages(Map pendingMsgs) { + this.pendingMsgs = pendingMsgs; + } + /** + * TODO: revise after refactoring of discovery messages serialization https://issues.apache.org/jira/browse/IGNITE-25883 * Gets pending messages sent to new node by its previous. * * @return Pending messages from previous node. */ - @Nullable public Collection messages() { - return msgs; + @Nullable public List messages() { + assert serializablePendingMsgs == null; + + if (F.isEmpty(pendingMsgs) && F.isEmpty(serializablePendingMsgs)) + return Collections.emptyList(); + + int totalSz = (F.isEmpty(pendingMsgs) ? 0 : pendingMsgs.size()) + + (F.isEmpty(serializablePendingMsgs) ? 0 : serializablePendingMsgs.size()); + + List res = new ArrayList<>(totalSz); + + for (int i = 0; i < totalSz; ++i) { + Message m = F.isEmpty(pendingMsgs) ? null : pendingMsgs.get(i); + + if (m == null) { + TcpDiscoveryAbstractMessage sm = serializablePendingMsgs.get(i); + assert sm != null; + + res.add(sm); + } + else { + assert serializablePendingMsgs == null || serializablePendingMsgs.get(i) == null; + assert m instanceof TcpDiscoveryAbstractMessage; + + res.add((TcpDiscoveryAbstractMessage)m); + } + } + + return res; } /** + * TODO: revise after refactoring of discovery messages serialization https://issues.apache.org/jira/browse/IGNITE-25883 * Sets pending messages to send to new node. * * @param msgs Pending messages to send to new node. */ - public void messages( - @Nullable Collection msgs - ) { - this.msgs = msgs; + public void messages(@Nullable List msgs) { + serializablePendingMsgsBytes = null; + + if (F.isEmpty(msgs)) { + serializablePendingMsgs = null; + pendingMsgs = null; + + return; + } + + // Keeps the original message order as in a list. + int idx = 0; + + for (TcpDiscoveryAbstractMessage m : msgs) { + if (m instanceof Message) { + if (pendingMsgs == null) + pendingMsgs = U.newHashMap(msgs.size()); + + pendingMsgs.put(idx++, (Message)m); + + continue; + } + + if (serializablePendingMsgs == null) + serializablePendingMsgs = U.newHashMap(msgs.size()); + + serializablePendingMsgs.put(idx++, m); + } + } + + /** @return Bytes of {@link #serializablePendingMsgs}. */ + public @Nullable byte[] serializablePendingMessagesBytes() { + return serializablePendingMsgsBytes; + } + + /** @param serializablePendingMsgsBytes Bytes of {@link #serializablePendingMsgs}. */ + public void serializablePendingMessagesBytes(@Nullable byte[] serializablePendingMsgsBytes) { + this.serializablePendingMsgsBytes = serializablePendingMsgsBytes; } /** @@ -144,6 +372,17 @@ public void messages( */ public void topology(@Nullable Collection top) { this.top = top; + topBytes = null; + } + + /** @return Serialized {@link #top}. */ + public @Nullable byte[] topologyBytes() { + return topBytes; + } + + /** @param topBytes Serialized {@link #top}. */ + public void topologyBytes(@Nullable byte[] topBytes) { + this.topBytes = topBytes; } /** @@ -152,7 +391,7 @@ public void topology(@Nullable Collection top) { public void clientTopology(Collection top) { assert top != null && !top.isEmpty() : top; - this.clientTop = top; + clientTop = top; } /** @@ -162,13 +401,52 @@ public Collection clientTopology() { return clientTop; } + /** @return Topology history messages. */ + public @Nullable Map topologyHistoryMessages() { + return topHistMsgs; + } + + /** @param topHistMsgs Topology history messages. */ + public void topologyHistoryMessages(@Nullable Map topHistMsgs) { + this.topHistMsgs = topHistMsgs; + } + /** * Gets topology snapshots history. * * @return Map with topology snapshots history. */ - public Map> topologyHistory() { - return topHist; + public @Nullable Map> topologyHistory() { + if (topHistMsgs == null) + return null; + + Map> res = U.newHashMap(topHistMsgs.size()); + + topHistMsgs.forEach((nodeId, msgs) -> { + Collection clusterNodeImpls = msgs.clusterNodeMessages().stream() + .map(m -> { + assert m.clusterMetricsMessage() != null; + + TcpDiscoveryNode tcpDiscoNode = new TcpDiscoveryNode( + m.id(), + m.consistentId(), + m.order(), + m.local(), + m.client(), + m.addresses(), + m.hostNames(), + m.attributes(), + new IgniteProductVersion(m.productVersionMessage()), + new ClusterMetricsSnapshot(m.clusterMetricsMessage()) + ); + + return tcpDiscoNode; + }).collect(Collectors.toList()); + + res.put(nodeId, clusterNodeImpls); + }); + + return res; } /** @@ -177,7 +455,20 @@ public Map> topologyHistory() { * @param topHist Map with topology snapshots history. */ public void topologyHistory(@Nullable Map> topHist) { - this.topHist = topHist; + if (F.isEmpty(topHist)) { + topHistMsgs = null; + + return; + } + + topHistMsgs = U.newHashMap(topHist.size()); + + topHist.forEach((nodeId, clusterNodes) -> { + Collection clusterNodeImpls = clusterNodes.stream().map(ClusterNodeMessage::new) + .collect(Collectors.toList()); + + topHistMsgs.put(nodeId, new ClusterNodeCollectionMessage(clusterNodeImpls)); + }); } /** @@ -187,6 +478,11 @@ public DiscoveryDataPacket gridDiscoveryData() { return dataPacket; } + /** @param dataPacket Data packet data. */ + public void gridDiscoveryData(DiscoveryDataPacket dataPacket) { + this.dataPacket = dataPacket; + } + /** * Clears discovery data to minimize message size. */ @@ -210,6 +506,16 @@ public long gridStartTime() { return gridStartTime; } + /** @param gridStartTime First grid node start time. */ + public void gridStartTime(long gridStartTime) { + this.gridStartTime = gridStartTime; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 20; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryNodeAddedMessage.class, this, "super", super.toString());