diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 1dc8af2d28d26..aa4026904c485 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer; import org.apache.ignite.internal.codegen.ClusterMetricsUpdateMessageSerializer; import org.apache.ignite.internal.codegen.ContinuousRoutineStartResultMessageSerializer; +import org.apache.ignite.internal.codegen.DataStreamerRequestSerializer; import org.apache.ignite.internal.codegen.DataStreamerResponseSerializer; import org.apache.ignite.internal.codegen.ErrorMessageSerializer; import org.apache.ignite.internal.codegen.ExchangeInfoSerializer; @@ -422,7 +423,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider { factory.register((short)58, GridCacheQueryRequest::new, new GridCacheQueryRequestSerializer()); factory.register((short)59, GridCacheQueryResponse::new, new GridCacheQueryResponseSerializer()); factory.register((short)61, GridContinuousMessage::new); - factory.register((short)62, DataStreamerRequest::new); + factory.register((short)62, DataStreamerRequest::new, new DataStreamerRequestSerializer()); factory.register((short)63, DataStreamerResponse::new, new DataStreamerResponseSerializer()); factory.register((short)76, GridTaskResultRequest::new, new GridTaskResultRequestSerializer()); factory.register((short)77, GridTaskResultResponse::new, new GridTaskResultResponseSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java index 577dc470cafca..2e7c048ba80a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java @@ -17,79 +17,89 @@ package org.apache.ignite.internal.processors.datastreamer; -import java.nio.ByteBuffer; import java.util.Collection; import java.util.Map; import java.util.UUID; import org.apache.ignite.configuration.DeploymentMode; -import org.apache.ignite.internal.GridDirectCollection; -import org.apache.ignite.internal.GridDirectMap; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType.IGNITE_UUID; - /** * */ public class DataStreamerRequest implements Message { /** */ + @Order(value = 0, method = "requestId") private long reqId; /** */ + // TODO: byte[] field - consider refactoring to use typed serialization + @Order(value = 1, method = "responseTopicBytes") private byte[] resTopicBytes; /** Cache name. */ + @Order(2) private String cacheName; /** */ + // TODO: byte[] field - consider refactoring to use typed serialization + @Order(3) private byte[] updaterBytes; /** Entries to update. */ - @GridDirectCollection(DataStreamerEntry.class) + @Order(4) private Collection entries; /** {@code True} to ignore deployment ownership. */ + @Order(value = 5, method = "ignoreDeploymentOwnership") private boolean ignoreDepOwnership; /** */ + @Order(6) private boolean skipStore; /** Keep binary flag. */ + @Order(7) private boolean keepBinary; /** */ + // TODO: DeploymentMode enum is serialized as byte ordinal - consider refactoring to use enum serialization + @Order(value = 8, method = "deploymentMode") private DeploymentMode depMode; /** */ + @Order(value = 9, method = "sampleClassName") private String sampleClsName; /** */ + @Order(value = 10, method = "userVersion") private String userVer; /** Node class loader participants. */ @GridToStringInclude - @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class) + @Order(value = 11, method = "participants") private Map ldrParticipants; /** */ + @Order(value = 12, method = "classLoaderId") private IgniteUuid clsLdrId; /** */ + @Order(value = 13, method = "forceLocalDeployment") private boolean forceLocDep; /** Topology version. */ + @Order(value = 14, method = "topologyVersion") private AffinityTopologyVersion topVer; /** */ + @Order(value = 15, method = "partition") private int partId; /** @@ -162,6 +172,13 @@ public long requestId() { return reqId; } + /** + * @param reqId Request ID. + */ + public void requestId(long reqId) { + this.reqId = reqId; + } + /** * @return Response topic. */ @@ -169,6 +186,13 @@ public byte[] responseTopicBytes() { return resTopicBytes; } + /** + * @param resTopicBytes Response topic bytes. + */ + public void responseTopicBytes(byte[] resTopicBytes) { + this.resTopicBytes = resTopicBytes; + } + /** * @return Cache name. */ @@ -176,6 +200,13 @@ public String cacheName() { return cacheName; } + /** + * @param cacheName Cache name. + */ + public void cacheName(String cacheName) { + this.cacheName = cacheName; + } + /** * @return Updater. */ @@ -183,6 +214,13 @@ public byte[] updaterBytes() { return updaterBytes; } + /** + * @param updaterBytes Updater bytes. + */ + public void updaterBytes(byte[] updaterBytes) { + this.updaterBytes = updaterBytes; + } + /** * @return Entries to update. */ @@ -190,6 +228,13 @@ public Collection entries() { return entries; } + /** + * @param entries Entries to update. + */ + public void entries(Collection entries) { + this.entries = entries; + } + /** * @return {@code True} to ignore ownership. */ @@ -197,6 +242,13 @@ public boolean ignoreDeploymentOwnership() { return ignoreDepOwnership; } + /** + * @param ignoreDepOwnership Ignore deployment ownership flag. + */ + public void ignoreDeploymentOwnership(boolean ignoreDepOwnership) { + this.ignoreDepOwnership = ignoreDepOwnership; + } + /** * @return Skip store flag. */ @@ -204,6 +256,13 @@ public boolean skipStore() { return skipStore; } + /** + * @param skipStore Skip store flag. + */ + public void skipStore(boolean skipStore) { + this.skipStore = skipStore; + } + /** * @return Keep binary flag. */ @@ -211,6 +270,13 @@ public boolean keepBinary() { return keepBinary; } + /** + * @param keepBinary Keep binary flag. + */ + public void keepBinary(boolean keepBinary) { + this.keepBinary = keepBinary; + } + /** * @return Deployment mode. */ @@ -218,6 +284,13 @@ public DeploymentMode deploymentMode() { return depMode; } + /** + * @param depMode Deployment mode. + */ + public void deploymentMode(DeploymentMode depMode) { + this.depMode = depMode; + } + /** * @return Sample class name. */ @@ -225,6 +298,13 @@ public String sampleClassName() { return sampleClsName; } + /** + * @param sampleClsName Sample class name. + */ + public void sampleClassName(String sampleClsName) { + this.sampleClsName = sampleClsName; + } + /** * @return User version. */ @@ -232,6 +312,13 @@ public String userVersion() { return userVer; } + /** + * @param userVer User version. + */ + public void userVersion(String userVer) { + this.userVer = userVer; + } + /** * @return Participants. */ @@ -239,6 +326,13 @@ public Map participants() { return ldrParticipants; } + /** + * @param ldrParticipants Loader participants. + */ + public void participants(Map ldrParticipants) { + this.ldrParticipants = ldrParticipants; + } + /** * @return Class loader ID. */ @@ -246,6 +340,13 @@ public IgniteUuid classLoaderId() { return clsLdrId; } + /** + * @param clsLdrId Class loader ID. + */ + public void classLoaderId(IgniteUuid clsLdrId) { + this.clsLdrId = clsLdrId; + } + /** * @return {@code True} to force local deployment. */ @@ -253,6 +354,13 @@ public boolean forceLocalDeployment() { return forceLocDep; } + /** + * @param forceLocDep Force local deployment flag. + */ + public void forceLocalDeployment(boolean forceLocDep) { + this.forceLocDep = forceLocDep; + } + /** * @return Topology version. */ @@ -260,6 +368,13 @@ public AffinityTopologyVersion topologyVersion() { return topVer; } + /** + * @param topVer Topology version. + */ + public void topologyVersion(AffinityTopologyVersion topVer) { + this.topVer = topVer; + } + /** * @return Partition ID. */ @@ -267,268 +382,20 @@ public int partition() { return partId; } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(DataStreamerRequest.class, this); - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeString(cacheName)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeIgniteUuid(clsLdrId)) - return false; - - writer.incrementState(); - - case 2: - if (!writer.writeByte(depMode != null ? (byte)depMode.ordinal() : -1)) - return false; - - writer.incrementState(); - - case 3: - if (!writer.writeCollection(entries, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 4: - if (!writer.writeBoolean(forceLocDep)) - return false; - - writer.incrementState(); - - case 5: - if (!writer.writeBoolean(ignoreDepOwnership)) - return false; - - writer.incrementState(); - - case 6: - if (!writer.writeBoolean(keepBinary)) - return false; - - writer.incrementState(); - - case 7: - if (!writer.writeMap(ldrParticipants, MessageCollectionItemType.UUID, IGNITE_UUID)) - return false; - - writer.incrementState(); - - case 8: - if (!writer.writeInt(partId)) - return false; - - writer.incrementState(); - - case 9: - if (!writer.writeLong(reqId)) - return false; - - writer.incrementState(); - - case 10: - if (!writer.writeByteArray(resTopicBytes)) - return false; - - writer.incrementState(); - - case 11: - if (!writer.writeString(sampleClsName)) - return false; - - writer.incrementState(); - - case 12: - if (!writer.writeBoolean(skipStore)) - return false; - - writer.incrementState(); - - case 13: - if (!writer.writeAffinityTopologyVersion(topVer)) - return false; - - writer.incrementState(); - - case 14: - if (!writer.writeByteArray(updaterBytes)) - return false; - - writer.incrementState(); - - case 15: - if (!writer.writeString(userVer)) - return false; - - writer.incrementState(); - - } - - return true; + /** + * @param partId Partition ID. + */ + public void partition(int partId) { + this.partId = partId; } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - switch (reader.state()) { - case 0: - cacheName = reader.readString(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - clsLdrId = reader.readIgniteUuid(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 2: - byte depModeOrd; - - depModeOrd = reader.readByte(); - - if (!reader.isLastRead()) - return false; - - depMode = DeploymentMode.fromOrdinal(depModeOrd); - - reader.incrementState(); - - case 3: - entries = reader.readCollection(MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 4: - forceLocDep = reader.readBoolean(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: - ignoreDepOwnership = reader.readBoolean(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 6: - keepBinary = reader.readBoolean(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 7: - ldrParticipants = reader.readMap(MessageCollectionItemType.UUID, IGNITE_UUID, false); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 8: - partId = reader.readInt(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 9: - reqId = reader.readLong(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 10: - resTopicBytes = reader.readByteArray(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 11: - sampleClsName = reader.readString(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 12: - skipStore = reader.readBoolean(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 13: - topVer = reader.readAffinityTopologyVersion(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 14: - updaterBytes = reader.readByteArray(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 15: - userVer = reader.readString(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; + @Override public short directType() { + return 62; } /** {@inheritDoc} */ - @Override public short directType() { - return 62; + @Override public String toString() { + return S.toString(DataStreamerRequest.class, this); } }