Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryCustomEventMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryDiscardMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryDuplicateIdMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer;
Expand All @@ -44,6 +45,7 @@
import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryServerOnlyCustomEventMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryStatusCheckMessageSerializer;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
Expand All @@ -60,6 +62,7 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDiscardMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
Expand All @@ -74,6 +77,7 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryServerOnlyCustomEventMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;

/** Message factory for discovery messages. */
Expand Down Expand Up @@ -110,5 +114,8 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)17, TcpDiscoveryNodeFailedMessage::new, new TcpDiscoveryNodeFailedMessageSerializer());
factory.register((short)18, TcpDiscoveryStatusCheckMessage::new, new TcpDiscoveryStatusCheckMessageSerializer());
factory.register((short)19, TcpDiscoveryNodeAddFinishedMessage::new, new TcpDiscoveryNodeAddFinishedMessageSerializer());
factory.register((short)20, TcpDiscoveryCustomEventMessage::new, new TcpDiscoveryCustomEventMessageSerializer());
factory.register((short)21, TcpDiscoveryServerOnlyCustomEventMessage::new,
new TcpDiscoveryServerOnlyCustomEventMessageSerializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -505,11 +505,9 @@ else if (state == DISCONNECTED) {
TcpDiscoveryCustomEventMessage msg;

if (((CustomMessageWrapper)evt).delegate() instanceof DiscoveryServerOnlyCustomMessage)
msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt,
U.marshal(spi.marshaller(), evt));
msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
else
msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
U.marshal(spi.marshaller(), evt));
msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt);

Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString())
Expand All @@ -521,6 +519,8 @@ else if (state == DISCONNECTED) {
// This root span will be parent both from local and remote nodes.
msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));

msg.prepareMarshal(spi.marshaller());

sockWriter.sendMessage(msg);

rootSpan.addLog(() -> "Sent").end();
Expand Down Expand Up @@ -2597,8 +2597,9 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {

if (node != null && node.visible()) {
try {
DiscoverySpiCustomMessage msgObj = msg.message(spi.marshaller(),
U.resolveClassLoader(spi.ignite().configuration()));
msg.finishUnmarhal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));

DiscoverySpiCustomMessage msgObj = msg.message();

notifyDiscovery(
EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj, msg.spanContainer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1022,11 +1022,9 @@ private void interruptPing(TcpDiscoveryNode node) {
TcpDiscoveryCustomEventMessage msg;

if (((CustomMessageWrapper)evt).delegate() instanceof DiscoveryServerOnlyCustomMessage)
msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt,
U.marshal(spi.marshaller(), evt));
msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
else
msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
U.marshal(spi.marshaller(), evt));
msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt);

Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString())
Expand All @@ -1038,6 +1036,8 @@ private void interruptPing(TcpDiscoveryNode node) {
// This root span will be parent both from local and remote nodes.
msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));

msg.prepareMarshal(spi.marshaller());

msgWorker.addMessage(msg);

rootSpan.addLog(() -> "Sent").end();
Expand Down Expand Up @@ -6041,15 +6041,17 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa
}
}

msg.message(null, msg.messageBytes());
msg.clearMessage();
}
else {
addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true));

DiscoverySpiCustomMessage msgObj = null;

try {
msgObj = msg.message(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));
msg.finishUnmarhal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));

msgObj = msg.message();
}
catch (Throwable e) {
U.error(log, "Failed to unmarshal discovery custom message.", e);
Expand All @@ -6061,10 +6063,12 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa
if (nextMsg != null) {
try {
TcpDiscoveryCustomEventMessage ackMsg = new TcpDiscoveryCustomEventMessage(
getLocalNodeId(), nextMsg, U.marshal(spi.marshaller(), nextMsg));
getLocalNodeId(), nextMsg);

ackMsg.topologyVersion(msg.topologyVersion());

ackMsg.prepareMarshal(spi.marshaller());

processCustomMessage(ackMsg, waitForNotification);
}
catch (IgniteCheckedException e) {
Expand Down Expand Up @@ -6095,8 +6099,7 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa
notifyDiscoveryListener(msg, waitForNotification);
}

// Clear msg field to prevent possible memory leak.
msg.message(null, msg.messageBytes());
msg.clearMessage();

if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
Expand Down Expand Up @@ -6237,7 +6240,9 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean
DiscoverySpiCustomMessage msgObj;

try {
msgObj = msg.message(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));
msg.finishUnmarhal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));

msgObj = msg.message();
}
catch (Throwable t) {
throw new IgniteException("Failed to unmarshal discovery custom message: " + msg, t);
Expand Down Expand Up @@ -6269,7 +6274,7 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean

if (msgObj.isMutable()) {
try {
msg.message(msgObj, U.marshal(spi.marshaller(), msgObj));
msg.prepareMarshal(spi.marshaller());
}
catch (Throwable t) {
throw new IgniteException("Failed to marshal mutable discovery message: " + msgObj, t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,41 +20,48 @@
import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.IncompleteDeserializationException;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
* Wrapped for custom message.
* Wrapper for custom message.
*/
@TcpDiscoveryRedirectToClient
@TcpDiscoveryEnsureDelivery
public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractTraceableMessage {
public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractTraceableMessage implements Message {
/** */
private static final long serialVersionUID = 0L;

/** */
private transient volatile DiscoverySpiCustomMessage msg;
private volatile DiscoverySpiCustomMessage msg;

/** */
private byte[] msgBytes;
/** Serialized message bytes. */
//TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627
@Order(value = 6, method = "messageBytes")
private volatile @Nullable byte[] msgBytes;

/**
* Default constructor.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's say smth. like 'for {@link DiscoveryMessageFactory}' as usual.

*/
public TcpDiscoveryCustomEventMessage() {
//No-op.
}

/**
* @param creatorNodeId Creator node id.
* @param msg Message.
* @param msgBytes Serialized message.
*/
public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, @Nullable DiscoverySpiCustomMessage msg,
@NotNull byte[] msgBytes) {
public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, DiscoverySpiCustomMessage msg) {
super(creatorNodeId);

this.msg = msg;
this.msgBytes = msgBytes;
}

/**
Expand All @@ -76,44 +83,63 @@ public void clearMessage() {
}

/**
* @return Serialized message.
* @return Serialized message bytes.
*/
public byte[] messageBytes() {
return msgBytes;
}

/**
* @param msg Message.
* @param msgBytes Serialized message.
* @param msgBytes Serialized message bytes.
*/
public void message(@Nullable DiscoverySpiCustomMessage msg, @NotNull byte[] msgBytes) {
this.msg = msg;
public void messageBytes(@Nullable byte[] msgBytes) {
this.msgBytes = msgBytes;
}

/**
* @return Original message.
*/
public DiscoverySpiCustomMessage message() {
return msg;
}

/**
* Prepare message for serialization.
*
* @param marsh Marshaller.
* @param ldr Classloader.
* @return Deserialized message,
* @throws java.lang.Throwable if unmarshal failed.
*/
@Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr) throws Throwable {
//TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627
public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
assert msgBytes == null || msg.isMutable() : "Message bytes are not null for immutable message: msg =" + msg;

msgBytes = U.marshal(marsh, msg);
}

/**
* Finish deserialization.
*
* @param marsh Marshaller.
* @param ldr Class loader.
*/
//TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627
public void finishUnmarhal(Marshaller marsh, ClassLoader ldr) throws Throwable {
if (msg == null) {
try {
msg = U.unmarshal(marsh, msgBytes, ldr);
}
catch (IgniteCheckedException e) {
// Try to resurrect a message in a case of deserialization failure
if (e.getCause() instanceof IncompleteDeserializationException)
return new CustomMessageWrapper(((IncompleteDeserializationException)e.getCause()).message());
if (e.getCause() instanceof IncompleteDeserializationException) {
msg = new CustomMessageWrapper(((IncompleteDeserializationException)e.getCause()).message());

return;
}

throw e;
}

assert msg != null;
}

return msg;
assert msg != null;
}

/** {@inheritDoc} */
Expand All @@ -127,4 +153,9 @@ public void message(@Nullable DiscoverySpiCustomMessage msg, @NotNull byte[] msg
@Override public String toString() {
return S.toString(TcpDiscoveryCustomEventMessage.class, this, "super", super.toString());
}

/** {@inheritDoc} */
@Override public short directType() {
return 20;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,23 @@ public class TcpDiscoveryServerOnlyCustomEventMessage extends TcpDiscoveryCustom
/** */
private static final long serialVersionUID = 0L;

/**
* Default constructor.
*/
public TcpDiscoveryServerOnlyCustomEventMessage() {
// No-op.
}

/**
* @param creatorNodeId Creator node id.
* @param msg Message.
* @param msgBytes Serialized message.
*/
public TcpDiscoveryServerOnlyCustomEventMessage(UUID creatorNodeId, @NotNull DiscoverySpiCustomMessage msg,
@NotNull byte[] msgBytes) {
super(creatorNodeId, msg, msgBytes);
public TcpDiscoveryServerOnlyCustomEventMessage(UUID creatorNodeId, @NotNull DiscoverySpiCustomMessage msg) {
super(creatorNodeId, msg);
}

/** {@inheritDoc} */
@Override public short directType() {
return 21;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ static class CustomMessageInterceptingDiscoverySpi extends TcpDiscoverySpi {
DiscoveryCustomMessage delegate;

try {
DiscoverySpiCustomMessage custMsg = cm.message(marshaller(),
U.resolveClassLoader(ignite().configuration()));
cm.finishUnmarhal(marshaller(), U.resolveClassLoader(ignite().configuration()));

DiscoverySpiCustomMessage custMsg = cm.message();

assertNotNull(custMsg);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,11 @@ private void doTestMarshallingBinaryMappingsLoadedFromClient(boolean receiveMeta
@Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
if (msg instanceof TcpDiscoveryCustomEventMessage) {
try {
DiscoverySpiCustomMessage custom =
((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), U.gridClassLoader());
TcpDiscoveryCustomEventMessage evtMsg = (TcpDiscoveryCustomEventMessage)msg;

evtMsg.finishUnmarhal(marshaller(), U.gridClassLoader());

DiscoverySpiCustomMessage custom = evtMsg.message();

if (custom instanceof CustomMessageWrapper) {
DiscoveryCustomMessage delegate = ((CustomMessageWrapper)custom).delegate();
Expand Down Expand Up @@ -240,8 +243,11 @@ public void testBinaryMetaDelayedForComputeJobResult() throws Exception {
@Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
if (msg instanceof TcpDiscoveryCustomEventMessage) {
try {
DiscoverySpiCustomMessage custom =
((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), U.gridClassLoader());
TcpDiscoveryCustomEventMessage evtMsg = (TcpDiscoveryCustomEventMessage)msg;

evtMsg.finishUnmarhal(marshaller(), U.gridClassLoader());

DiscoverySpiCustomMessage custom = evtMsg.message();

if (custom instanceof CustomMessageWrapper) {
DiscoveryCustomMessage delegate = ((CustomMessageWrapper)custom).delegate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,9 @@ private DiscoveryCustomMessage extractCustomMessage(TcpDiscoveryCustomEventMessa
DiscoverySpiCustomMessage msgObj = null;

try {
msgObj = msg.message(marshaller(), U.resolveClassLoader(ignite().configuration()));
msg.finishUnmarhal(marshaller(), U.gridClassLoader());

msgObj = msg.message();
}
catch (Throwable e) {
U.error(log, "Failed to unmarshal discovery custom message.", e);
Expand Down
Loading
Loading