diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 753f00569c5cf..d98a1d292b612 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryCustomEventMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryDiscardMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryDuplicateIdMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer; @@ -44,6 +45,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryServerOnlyCustomEventMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryStatusCheckMessageSerializer; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; @@ -60,6 +62,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDiscardMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; @@ -74,6 +77,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryServerOnlyCustomEventMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage; /** Message factory for discovery messages. */ @@ -110,5 +114,8 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)17, TcpDiscoveryNodeFailedMessage::new, new TcpDiscoveryNodeFailedMessageSerializer()); factory.register((short)18, TcpDiscoveryStatusCheckMessage::new, new TcpDiscoveryStatusCheckMessageSerializer()); factory.register((short)19, TcpDiscoveryNodeAddFinishedMessage::new, new TcpDiscoveryNodeAddFinishedMessageSerializer()); + factory.register((short)20, TcpDiscoveryCustomEventMessage::new, new TcpDiscoveryCustomEventMessageSerializer()); + factory.register((short)21, TcpDiscoveryServerOnlyCustomEventMessage::new, + new TcpDiscoveryServerOnlyCustomEventMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 2e96845ff0332..c0604e7b4e060 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -505,11 +505,9 @@ else if (state == DISCONNECTED) { TcpDiscoveryCustomEventMessage msg; if (((CustomMessageWrapper)evt).delegate() instanceof DiscoveryServerOnlyCustomMessage) - msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt, - U.marshal(spi.marshaller(), evt)); + msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt); else - msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, - U.marshal(spi.marshaller(), evt)); + msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt); Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass())) .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString()) @@ -521,6 +519,8 @@ else if (state == DISCONNECTED) { // This root span will be parent both from local and remote nodes. msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan)); + msg.prepareMarshal(spi.marshaller()); + sockWriter.sendMessage(msg); rootSpan.addLog(() -> "Sent").end(); @@ -2597,8 +2597,9 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { if (node != null && node.visible()) { try { - DiscoverySpiCustomMessage msgObj = msg.message(spi.marshaller(), - U.resolveClassLoader(spi.ignite().configuration())); + msg.finishUnmarhal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); + + DiscoverySpiCustomMessage msgObj = msg.message(); notifyDiscovery( EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj, msg.spanContainer()); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 2f6d431fa9a4b..87be31cf20ccc 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1022,11 +1022,9 @@ private void interruptPing(TcpDiscoveryNode node) { TcpDiscoveryCustomEventMessage msg; if (((CustomMessageWrapper)evt).delegate() instanceof DiscoveryServerOnlyCustomMessage) - msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt, - U.marshal(spi.marshaller(), evt)); + msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt); else - msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, - U.marshal(spi.marshaller(), evt)); + msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt); Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass())) .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString()) @@ -1038,6 +1036,8 @@ private void interruptPing(TcpDiscoveryNode node) { // This root span will be parent both from local and remote nodes. msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan)); + msg.prepareMarshal(spi.marshaller()); + msgWorker.addMessage(msg); rootSpan.addLog(() -> "Sent").end(); @@ -6041,7 +6041,7 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa } } - msg.message(null, msg.messageBytes()); + msg.clearMessage(); } else { addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true)); @@ -6049,7 +6049,9 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa DiscoverySpiCustomMessage msgObj = null; try { - msgObj = msg.message(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); + msg.finishUnmarhal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); + + msgObj = msg.message(); } catch (Throwable e) { U.error(log, "Failed to unmarshal discovery custom message.", e); @@ -6061,10 +6063,12 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa if (nextMsg != null) { try { TcpDiscoveryCustomEventMessage ackMsg = new TcpDiscoveryCustomEventMessage( - getLocalNodeId(), nextMsg, U.marshal(spi.marshaller(), nextMsg)); + getLocalNodeId(), nextMsg); ackMsg.topologyVersion(msg.topologyVersion()); + ackMsg.prepareMarshal(spi.marshaller()); + processCustomMessage(ackMsg, waitForNotification); } catch (IgniteCheckedException e) { @@ -6095,8 +6099,7 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa notifyDiscoveryListener(msg, waitForNotification); } - // Clear msg field to prevent possible memory leak. - msg.message(null, msg.messageBytes()); + msg.clearMessage(); if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); @@ -6237,7 +6240,9 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean DiscoverySpiCustomMessage msgObj; try { - msgObj = msg.message(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); + msg.finishUnmarhal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); + + msgObj = msg.message(); } catch (Throwable t) { throw new IgniteException("Failed to unmarshal discovery custom message: " + msg, t); @@ -6269,7 +6274,7 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean if (msgObj.isMutable()) { try { - msg.message(msgObj, U.marshal(spi.marshaller(), msgObj)); + msg.prepareMarshal(spi.marshaller()); } catch (Throwable t) { throw new IgniteException("Failed to marshal mutable discovery message: " + msgObj, t); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java index cd1b90b348c94..d71a0510b5cf7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java @@ -20,41 +20,48 @@ import java.util.Objects; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; import org.apache.ignite.internal.managers.discovery.IncompleteDeserializationException; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** - * Wrapped for custom message. + * Wrapper for custom message. */ @TcpDiscoveryRedirectToClient @TcpDiscoveryEnsureDelivery -public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractTraceableMessage { +public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { /** */ private static final long serialVersionUID = 0L; /** */ - private transient volatile DiscoverySpiCustomMessage msg; + private volatile DiscoverySpiCustomMessage msg; - /** */ - private byte[] msgBytes; + /** Serialized message bytes. */ + //TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627 + @Order(value = 6, method = "messageBytes") + private volatile @Nullable byte[] msgBytes; + + /** + * Default constructor. + */ + public TcpDiscoveryCustomEventMessage() { + //No-op. + } /** * @param creatorNodeId Creator node id. * @param msg Message. - * @param msgBytes Serialized message. */ - public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, @Nullable DiscoverySpiCustomMessage msg, - @NotNull byte[] msgBytes) { + public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, DiscoverySpiCustomMessage msg) { super(creatorNodeId); this.msg = msg; - this.msgBytes = msgBytes; } /** @@ -76,44 +83,63 @@ public void clearMessage() { } /** - * @return Serialized message. + * @return Serialized message bytes. */ public byte[] messageBytes() { return msgBytes; } /** - * @param msg Message. - * @param msgBytes Serialized message. + * @param msgBytes Serialized message bytes. */ - public void message(@Nullable DiscoverySpiCustomMessage msg, @NotNull byte[] msgBytes) { - this.msg = msg; + public void messageBytes(@Nullable byte[] msgBytes) { this.msgBytes = msgBytes; } /** + * @return Original message. + */ + public DiscoverySpiCustomMessage message() { + return msg; + } + + /** + * Prepare message for serialization. + * * @param marsh Marshaller. - * @param ldr Classloader. - * @return Deserialized message, - * @throws java.lang.Throwable if unmarshal failed. */ - @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr) throws Throwable { + //TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627 + public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + assert msgBytes == null || msg.isMutable() : "Message bytes are not null for immutable message: msg =" + msg; + + msgBytes = U.marshal(marsh, msg); + } + + /** + * Finish deserialization. + * + * @param marsh Marshaller. + * @param ldr Class loader. + */ + //TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627 + public void finishUnmarhal(Marshaller marsh, ClassLoader ldr) throws Throwable { if (msg == null) { try { msg = U.unmarshal(marsh, msgBytes, ldr); } catch (IgniteCheckedException e) { // Try to resurrect a message in a case of deserialization failure - if (e.getCause() instanceof IncompleteDeserializationException) - return new CustomMessageWrapper(((IncompleteDeserializationException)e.getCause()).message()); + if (e.getCause() instanceof IncompleteDeserializationException) { + msg = new CustomMessageWrapper(((IncompleteDeserializationException)e.getCause()).message()); + + return; + } throw e; } - - assert msg != null; } - return msg; + assert msg != null; } /** {@inheritDoc} */ @@ -127,4 +153,9 @@ public void message(@Nullable DiscoverySpiCustomMessage msg, @NotNull byte[] msg @Override public String toString() { return S.toString(TcpDiscoveryCustomEventMessage.class, this, "super", super.toString()); } + + /** {@inheritDoc} */ + @Override public short directType() { + return 20; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java index 97f701ed6e832..d552867718d94 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java @@ -29,13 +29,23 @@ public class TcpDiscoveryServerOnlyCustomEventMessage extends TcpDiscoveryCustom /** */ private static final long serialVersionUID = 0L; + /** + * Default constructor. + */ + public TcpDiscoveryServerOnlyCustomEventMessage() { + // No-op. + } + /** * @param creatorNodeId Creator node id. * @param msg Message. - * @param msgBytes Serialized message. */ - public TcpDiscoveryServerOnlyCustomEventMessage(UUID creatorNodeId, @NotNull DiscoverySpiCustomMessage msg, - @NotNull byte[] msgBytes) { - super(creatorNodeId, msg, msgBytes); + public TcpDiscoveryServerOnlyCustomEventMessage(UUID creatorNodeId, @NotNull DiscoverySpiCustomMessage msg) { + super(creatorNodeId, msg); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 21; } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java index a31feaffb66a3..f5c6d82271f5b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java @@ -99,8 +99,9 @@ static class CustomMessageInterceptingDiscoverySpi extends TcpDiscoverySpi { DiscoveryCustomMessage delegate; try { - DiscoverySpiCustomMessage custMsg = cm.message(marshaller(), - U.resolveClassLoader(ignite().configuration())); + cm.finishUnmarhal(marshaller(), U.resolveClassLoader(ignite().configuration())); + + DiscoverySpiCustomMessage custMsg = cm.message(); assertNotNull(custMsg); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java index c485ac9441739..f520769b1e86e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java @@ -163,8 +163,11 @@ private void doTestMarshallingBinaryMappingsLoadedFromClient(boolean receiveMeta @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { if (msg instanceof TcpDiscoveryCustomEventMessage) { try { - DiscoverySpiCustomMessage custom = - ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), U.gridClassLoader()); + TcpDiscoveryCustomEventMessage evtMsg = (TcpDiscoveryCustomEventMessage)msg; + + evtMsg.finishUnmarhal(marshaller(), U.gridClassLoader()); + + DiscoverySpiCustomMessage custom = evtMsg.message(); if (custom instanceof CustomMessageWrapper) { DiscoveryCustomMessage delegate = ((CustomMessageWrapper)custom).delegate(); @@ -240,8 +243,11 @@ public void testBinaryMetaDelayedForComputeJobResult() throws Exception { @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { if (msg instanceof TcpDiscoveryCustomEventMessage) { try { - DiscoverySpiCustomMessage custom = - ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), U.gridClassLoader()); + TcpDiscoveryCustomEventMessage evtMsg = (TcpDiscoveryCustomEventMessage)msg; + + evtMsg.finishUnmarhal(marshaller(), U.gridClassLoader()); + + DiscoverySpiCustomMessage custom = evtMsg.message(); if (custom instanceof CustomMessageWrapper) { DiscoveryCustomMessage delegate = ((CustomMessageWrapper)custom).delegate(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java index dc4f15bd07b78..dad72bbb0a4d6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java @@ -360,7 +360,9 @@ private DiscoveryCustomMessage extractCustomMessage(TcpDiscoveryCustomEventMessa DiscoverySpiCustomMessage msgObj = null; try { - msgObj = msg.message(marshaller(), U.resolveClassLoader(ignite().configuration())); + msg.finishUnmarhal(marshaller(), U.gridClassLoader()); + + msgObj = msg.message(); } catch (Throwable e) { U.error(log, "Failed to unmarshal discovery custom message.", e); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java index aaae9d6b9e270..4393a9d6abcca 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java @@ -247,8 +247,9 @@ private static class CoordinatorBlockingDiscoverySpi extends TcpDiscoverySpi { TcpDiscoveryCustomEventMessage m = (TcpDiscoveryCustomEventMessage)msg; try { - CustomMessageWrapper m0 = (CustomMessageWrapper)m.message( - marshaller(), U.resolveClassLoader(ignite().configuration())); + m.finishUnmarhal(marshaller(), U.resolveClassLoader(ignite().configuration())); + + CustomMessageWrapper m0 = (CustomMessageWrapper)m.message(); if (m0.delegate() instanceof InitMessage) rcvStartSnpReq.countDown(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java index fce48f3bbbcc7..ef7b52fea3536 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java @@ -50,7 +50,6 @@ import org.apache.ignite.internal.util.lang.ConsumerX; import org.apache.ignite.internal.util.lang.GridTuple3; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; @@ -313,10 +312,11 @@ public static class SchemaFinishListeningTcpDiscoverySpi extends TcpDiscoverySpi @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { if (msg instanceof TcpDiscoveryCustomEventMessage) { try { - DiscoverySpiCustomMessage spiCustomMsg = ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), - U.resolveClassLoader(ignite().configuration())); + TcpDiscoveryCustomEventMessage evtMsg = (TcpDiscoveryCustomEventMessage)msg; - DiscoveryCustomMessage discoCustomMsg = ((CustomMessageWrapper)spiCustomMsg).delegate(); + evtMsg.finishUnmarhal(marshaller(), U.gridClassLoader()); + + DiscoveryCustomMessage discoCustomMsg = ((CustomMessageWrapper)evtMsg.message()).delegate(); if (discoCustomMsg instanceof SchemaFinishDiscoveryMessage) { SchemaFinishDiscoveryMessage finishMsg = (SchemaFinishDiscoveryMessage)discoCustomMsg; diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java index f72eac9999460..0e8ea6c593417 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java @@ -58,7 +58,9 @@ private synchronized void apply(ClusterNode addr, TcpDiscoveryAbstractMessage ms DiscoveryCustomMessage delegate; try { - DiscoverySpiCustomMessage custMsg = cm.message(marshaller(), U.resolveClassLoader(ignite().configuration())); + cm.finishUnmarhal(marshaller(), U.gridClassLoader()); + + DiscoverySpiCustomMessage custMsg = cm.message(); assertNotNull(custMsg); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 999c7c747c35e..6ba101227e470 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -2607,8 +2607,11 @@ private static class TestCustomerEventAckSpi extends TcpDiscoverySpi { if (stopBeforeSndAck) { if (msg instanceof TcpDiscoveryCustomEventMessage) { try { - DiscoveryCustomMessage custMsg = GridTestUtils.getFieldValue( - ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), U.gridClassLoader()), "delegate"); + TcpDiscoveryCustomEventMessage evtMsg = (TcpDiscoveryCustomEventMessage)msg; + + evtMsg.finishUnmarhal(marshaller(), U.gridClassLoader()); + + DiscoveryCustomMessage custMsg = GridTestUtils.getFieldValue(evtMsg, "delegate"); if (custMsg instanceof StartRoutineAckDiscoveryMessage) { log.info("Skip message send and stop node: " + msg);