Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.ignite.internal.managers.discovery;

import org.apache.ignite.internal.codegen.ClusterNodeCollectionMessageSerializer;
import org.apache.ignite.internal.codegen.ClusterNodeMessageSerializer;
import org.apache.ignite.internal.codegen.DiscoveryDataPacketSerializer;
import org.apache.ignite.internal.codegen.IgniteProductVersionMessageSerializer;
import org.apache.ignite.internal.codegen.InetAddressMessageSerializer;
import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer;
import org.apache.ignite.internal.codegen.NodeSpecificDataSerializer;
Expand All @@ -37,6 +40,7 @@
import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryMetricsUpdateMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeAddFinishedMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeAddedMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeFailedMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeFullMetricsMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer;
Expand All @@ -48,6 +52,9 @@
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.messages.ClusterNodeCollectionMessage;
import org.apache.ignite.spi.discovery.tcp.messages.ClusterNodeMessage;
import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage;
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage;
import org.apache.ignite.spi.discovery.tcp.messages.NodeSpecificData;
Expand All @@ -67,6 +74,7 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
Expand All @@ -80,6 +88,9 @@
public class DiscoveryMessageFactory implements MessageFactoryProvider {
/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
factory.register((short)-110, ClusterNodeCollectionMessage::new, new ClusterNodeCollectionMessageSerializer());
factory.register((short)-109, ClusterNodeMessage::new, new ClusterNodeMessageSerializer());
factory.register((short)-108, IgniteProductVersionMessage::new, new IgniteProductVersionMessageSerializer());
factory.register((short)-107, NodeSpecificData::new, new NodeSpecificDataSerializer());
factory.register((short)-106, DiscoveryDataPacket::new, new DiscoveryDataPacketSerializer());
factory.register((short)-105, TcpDiscoveryNodeFullMetricsMessage::new,
Expand Down Expand Up @@ -110,5 +121,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)17, TcpDiscoveryNodeFailedMessage::new, new TcpDiscoveryNodeFailedMessageSerializer());
factory.register((short)18, TcpDiscoveryStatusCheckMessage::new, new TcpDiscoveryStatusCheckMessageSerializer());
factory.register((short)19, TcpDiscoveryNodeAddFinishedMessage::new, new TcpDiscoveryNodeAddFinishedMessageSerializer());
factory.register((short)20, TcpDiscoveryNodeAddedMessage::new, new TcpDiscoveryNodeAddedMessageSerializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.util.GridStringBuilder;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage;

/**
* IO routines for B+Tree meta pages.
Expand Down Expand Up @@ -303,7 +304,7 @@ public IgniteProductVersion createdVersion(long pageAddr) {
PageUtils.getByte(pageAddr, CREATED_VER_OFFSET + 1),
PageUtils.getByte(pageAddr, CREATED_VER_OFFSET + 2),
PageUtils.getLong(pageAddr, CREATED_VER_OFFSET + 3),
PageUtils.getBytes(pageAddr, CREATED_VER_OFFSET + 11, IgniteProductVersion.REV_HASH_SIZE));
PageUtils.getBytes(pageAddr, CREATED_VER_OFFSET + 11, IgniteProductVersionMessage.REV_HASH_SIZE));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage;
import org.apache.ignite.spi.encryption.EncryptionSpi;
import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -850,7 +851,7 @@ WALRecord readPlainRecord(RecordType type, ByteBufferBackedDataInput in,

long flags = in.readLong();

byte[] revHash = new byte[IgniteProductVersion.REV_HASH_SIZE];
byte[] revHash = new byte[IgniteProductVersionMessage.REV_HASH_SIZE];
byte maj = in.readByte();
byte min = in.readByte();
byte maint = in.readByte();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.internal.ClusterMetricsSnapshot;
import org.apache.ignite.internal.processors.cache.CacheMetricsSnapshot;
import org.jetbrains.annotations.Nullable;

/**
*
Expand All @@ -33,13 +34,13 @@ class ClusterNodeMetrics {
private final ClusterMetrics nodeMetrics;

/** */
private final Map<Integer, CacheMetrics> cacheMetrics;
private final @Nullable Map<Integer, CacheMetrics> cacheMetrics;

/**
* @param nodeMetrics Node metrics.
* @param cacheMetrics Cache metrics.
*/
ClusterNodeMetrics(ClusterMetrics nodeMetrics, Map<Integer, CacheMetrics> cacheMetrics) {
ClusterNodeMetrics(ClusterMetrics nodeMetrics, @Nullable Map<Integer, CacheMetrics> cacheMetrics) {
this.nodeMetrics = nodeMetrics;
this.cacheMetrics = cacheMetrics;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteVersionUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage;
import org.jetbrains.annotations.NotNull;

/**
Expand All @@ -41,39 +42,26 @@ public class IgniteProductVersion implements Comparable<IgniteProductVersion>, E
/** */
private static final long serialVersionUID = 0L;

/** Size of the {@link #revHash }*/
public static final int REV_HASH_SIZE = 20;

/** Size in bytes of serialized: 3 bytes (maj, min, maintenance version), 8 bytes - timestamp */
public static final int SIZE_IN_BYTES = 3 + 8 + REV_HASH_SIZE;
public static final int SIZE_IN_BYTES = 3 + 8 + IgniteProductVersionMessage.REV_HASH_SIZE;

/** Regexp parse pattern. */
private static final Pattern VER_PATTERN =
Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)([-.]([^0123456789][^-]+)(-SNAPSHOT)?)?(-(\\d+))?(-([\\da-f]+))?");

/** Major version number. */
private byte major;

/** Minor version number. */
private byte minor;

/** Maintenance version number. */
private byte maintenance;

/** Stage of development. */
private String stage;

/** Revision timestamp. */
private long revTs;

/** Revision hash. */
private byte[] revHash;
/** The values holding message. */
private IgniteProductVersionMessage productVerMsg;

/**
* Empty constructor required by {@link Externalizable}.
*/
public IgniteProductVersion() {
// No-op.
productVerMsg = new IgniteProductVersionMessage();
}

/** @param productVerMsg Product version message. */
public IgniteProductVersion(IgniteProductVersionMessage productVerMsg) {
this.productVerMsg = productVerMsg;
}

/**
Expand All @@ -96,17 +84,17 @@ public IgniteProductVersion(byte major, byte minor, byte maintenance, long revTs
* @param revHash Revision hash.
*/
public IgniteProductVersion(byte major, byte minor, byte maintenance, String stage, long revTs, byte[] revHash) {
if (revHash != null && revHash.length != REV_HASH_SIZE) {
productVerMsg = new IgniteProductVersionMessage(major, minor, maintenance, stage, revTs, revHash);

if (revHash != null && revHash.length != IgniteProductVersionMessage.REV_HASH_SIZE) {
throw new IllegalArgumentException("Invalid length for SHA1 hash (must be "
+ REV_HASH_SIZE + "): " + revHash.length);
+ IgniteProductVersionMessage.REV_HASH_SIZE + "): " + revHash.length);
}
}

this.major = major;
this.minor = minor;
this.maintenance = maintenance;
this.stage = stage;
this.revTs = revTs;
this.revHash = revHash != null ? revHash : new byte[REV_HASH_SIZE];
/** @return {@link IgniteProductVersionMessage}. */
public IgniteProductVersionMessage message() {
return productVerMsg;
}

/**
Expand All @@ -115,7 +103,7 @@ public IgniteProductVersion(byte major, byte minor, byte maintenance, String sta
* @return Major version number.
*/
public byte major() {
return major;
return productVerMsg.major();
}

/**
Expand All @@ -124,7 +112,7 @@ public byte major() {
* @return Minor version number.
*/
public byte minor() {
return minor;
return productVerMsg.minor();
}

/**
Expand All @@ -133,14 +121,14 @@ public byte minor() {
* @return Maintenance version number.
*/
public byte maintenance() {
return maintenance;
return productVerMsg.maintenance();
}

/**
* @return Stage of development.
*/
public String stage() {
return stage;
return productVerMsg.stage();
}

/**
Expand All @@ -149,7 +137,7 @@ public String stage() {
* @return Revision timestamp.
*/
public long revisionTimestamp() {
return revTs;
return productVerMsg.revisionTimestamp();
}

/**
Expand All @@ -158,7 +146,7 @@ public long revisionTimestamp() {
* @return Revision hash.
*/
public byte[] revisionHash() {
return revHash;
return productVerMsg.revisionHash();
}

/**
Expand All @@ -167,7 +155,7 @@ public byte[] revisionHash() {
* @return Release date.
*/
public Date releaseDate() {
return new Date(revTs * 1000);
return new Date(revisionTimestamp() * 1000);
}

/**
Expand All @@ -178,49 +166,49 @@ public Date releaseDate() {
*/
public boolean greaterThanEqual(int major, int minor, int maintenance) {
// NOTE: Unknown version is less than any other version.
if (major == this.major)
return minor == this.minor ? this.maintenance >= maintenance : this.minor > minor;
if (major == major())
return minor == minor() ? maintenance() >= maintenance : minor() > minor;
else
return this.major > major;
return major() > major;
}

/** {@inheritDoc} */
@Override public int compareTo(@NotNull IgniteProductVersion o) {
// NOTE: Unknown version is less than any other version.
int res = Integer.compare(major, o.major);
int res = Integer.compare(major(), o.major());

if (res != 0)
return res;

res = Integer.compare(minor, o.minor);
res = Integer.compare(minor(), o.minor());

if (res != 0)
return res;

res = Integer.compare(maintenance, o.maintenance);
res = Integer.compare(maintenance(), o.maintenance());

if (res != 0)
return res;

return Long.compare(revTs, o.revTs);
return Long.compare(revisionTimestamp(), o.revisionTimestamp());
}

/**
* @param o Other version.
* @return Compare result.
*/
public int compareToIgnoreTimestamp(@NotNull IgniteProductVersion o) {
int res = Integer.compare(major, o.major);
int res = Integer.compare(major(), o.major());

if (res != 0)
return res;

res = Integer.compare(minor, o.minor);
res = Integer.compare(minor(), o.minor());

if (res != 0)
return res;

return Integer.compare(maintenance, o.maintenance);
return Integer.compare(maintenance(), o.maintenance());
}

/** {@inheritDoc} */
Expand All @@ -233,47 +221,50 @@ public int compareToIgnoreTimestamp(@NotNull IgniteProductVersion o) {

IgniteProductVersion that = (IgniteProductVersion)o;

return revTs == that.revTs && maintenance == that.maintenance && minor == that.minor && major == that.major;
return revisionTimestamp() == that.revisionTimestamp() && maintenance() == that.maintenance()
&& minor() == that.minor() && major() == that.major();
}

/** {@inheritDoc} */
@Override public int hashCode() {
int res = major;
int res = major();

res = 31 * res + minor;
res = 31 * res + maintenance;
res = 31 * res + (int)(revTs ^ (revTs >>> 32));
res = 31 * res + minor();
res = 31 * res + maintenance();
res = 31 * res + (int)(revisionTimestamp() ^ (revisionTimestamp() >>> 32));

return res;
}

/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeByte(major);
out.writeByte(minor);
out.writeByte(maintenance);
out.writeLong(revTs);
U.writeByteArray(out, revHash);
out.writeByte(major());
out.writeByte(minor());
out.writeByte(maintenance());
out.writeLong(revisionTimestamp());
U.writeByteArray(out, revisionHash());
}

/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
major = in.readByte();
minor = in.readByte();
maintenance = in.readByte();
revTs = in.readLong();
revHash = U.readByteArray(in);
assert productVerMsg != null;

productVerMsg.major(in.readByte());
productVerMsg.minor(in.readByte());
productVerMsg.maintenance(in.readByte());
productVerMsg.revisionTimestamp(in.readLong());
productVerMsg.revisionHash(U.readByteArray(in));
}

/** {@inheritDoc} */
@Override public String toString() {
String revTsStr = IgniteVersionUtils.formatBuildTimeStamp(revTs * 1000);
String revTsStr = IgniteVersionUtils.formatBuildTimeStamp(revisionTimestamp() * 1000);

String hash = U.byteArray2HexString(revHash).toLowerCase();
String hash = U.byteArray2HexString(revisionHash()).toLowerCase();

hash = hash.length() > 8 ? hash.substring(0, 8) : hash;

return major + "." + minor + "." + maintenance + "#" + revTsStr + "-sha1:" + hash;
return major() + "." + minor() + "." + maintenance() + "#" + revTsStr + "-sha1:" + hash;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2160,8 +2160,12 @@ protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) {
if (msg instanceof TraceableMessage)
tracing.messages().beforeSend((TraceableMessage)msg);

if (msg instanceof TcpDiscoveryNodeAddedMessage)
if (msg instanceof TcpDiscoveryNodeAddedMessage) {
((TcpDiscoveryNodeAddedMessage)msg)
.finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));

processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
}
else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg);
else if (msg instanceof TcpDiscoveryNodeLeftMessage)
Expand Down
Loading