diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java index 06e6d2653bb50..03305c563acfc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java @@ -63,6 +63,9 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { /** Deployment cache by class name. */ private final ConcurrentMap> cache = new ConcurrentHashMap<>(); + /** Deployment cache by classloader. */ + private final ConcurrentMap> cacheByLdr = new ConcurrentHashMap<>(); + /** Mutex. */ private final Object mux = new Object(); @@ -296,23 +299,49 @@ private GridDeployment deploy( try { Deque cachedDeps = null; - // Find existing class loader info. - for (Deque deps : cache.values()) { - for (GridDeployment d : deps) { - if (d.classLoader() == ldr) { - // Cache class and alias. - fireEvt = d.addDeployedClass(cls, alias); + Deque depsByLdr = cacheByLdr.get(ldr); - cachedDeps = deps; + if (depsByLdr != null) { + GridDeployment candidate = null; - dep = d; + for (GridDeployment d : depsByLdr) { + if (!d.undeployed() && d.classLoader() == ldr) { + candidate = d; break; } } - if (cachedDeps != null) - break; + if (candidate != null) { + fireEvt = candidate.addDeployedClass(cls, alias); + + cachedDeps = depsByLdr; + + dep = candidate; + } + } + else { + // Find existing class loader info. + for (Deque deps : cache.values()) { + for (GridDeployment d : deps) { + if (d.classLoader() == ldr) { + // Cache class and alias. + fireEvt = d.addDeployedClass(cls, alias); + + cachedDeps = deps; + + dep = d; + + break; + } + } + + if (cachedDeps != null) { + cacheByLdr.put(ldr, cachedDeps); + + break; + } + } } if (cachedDeps != null) { @@ -353,6 +382,8 @@ private GridDeployment deploy( cache.put(cls.getName(), deps); } + cacheByLdr.put(ldr, deps); + if (log.isDebugEnabled()) log.debug("Created new deployment: " + dep); } @@ -567,6 +598,8 @@ private void undeploy(ClassLoader ldr) { if (deps.isEmpty()) i1.remove(); } + + cacheByLdr.remove(ldr); } for (GridDeployment dep : doomed) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java index 3069f201be9e8..aeeecd79ad533 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java @@ -391,6 +391,8 @@ private GridDeployment checkDeployment(GridDeployment deployment, String store) String clsName = lambdaEnclosingClsName == null ? rsrcName : lambdaEnclosingClsName; + ClassLoader ldr = Thread.currentThread().getContextClassLoader(); + GridDeploymentMetadata meta = new GridDeploymentMetadata(); meta.record(true); @@ -398,6 +400,7 @@ private GridDeployment checkDeployment(GridDeployment deployment, String store) meta.alias(rsrcName); meta.className(clsName); meta.senderNodeId(ctx.localNodeId()); + meta.classLoader(ldr); return locStore.getDeployment(meta); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStoreReuseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStoreReuseTest.java new file mode 100644 index 0000000000000..7bb34d80b27cd --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStoreReuseTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.deployment; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.ClientConnectorConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.ThinClientConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.client.thin.AbstractThinClientTest; +import org.apache.ignite.internal.client.thin.TestTask; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.junit.Test; + +import static org.apache.ignite.testframework.GridTestUtils.runAsync; +import static org.apache.ignite.testframework.GridTestUtils.waitForAllFutures; + +/** */ +public class GridDeploymentLocalStoreReuseTest extends AbstractThinClientTest { + /** */ + private static final int NODE_CNT = 3; + + /** */ + private static final int CLIENT_CNT = 3; + + /** */ + protected static final int EXEC_CNT = 10; + + /** */ + private List logs; + + /** */ + private List clients; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + DeploymentListeningLogger testLog = new DeploymentListeningLogger(log); + logs.add(testLog); + + return super.getConfiguration(igniteInstanceName) + .setClientConnectorConfiguration( + new ClientConnectorConfiguration().setThinClientConfiguration( + new ThinClientConfiguration().setMaxActiveComputeTasksPerConnection(1000))) + .setGridLogger(testLog) + .setPeerClassLoadingEnabled(true); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + logs = new ArrayList<>(NODE_CNT); + + clients = new ArrayList<>(CLIENT_CNT); + + setLoggerDebugLevel(); + + startGrids(NODE_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + clients.clear(); + + super.afterTest(); + } + + /** + * Verifies that multiple task executions do not cause excessive local deployment cache misses. The "deployment not + * found ... clsLdrId=null" message is allowed only once per thin client (initial task execution). + */ + @Test + public void testNoExcessiveLocalDeployment() { + try { + ClusterNode[] allServerNodes = grid(0).cluster().forServers().nodes().toArray(new ClusterNode[0]); + + for (int i = 0; i < CLIENT_CNT; i++) + clients.add(startClient(allServerNodes)); + + List> futs = new ArrayList<>(CLIENT_CNT); + + for (IgniteClient client : clients) + futs.add(runAsync(() -> executeTasksOnClient(client, EXEC_CNT, 5_000L))); + + waitForAllFutures(futs.toArray(new IgniteInternalFuture[0])); + + List allNotFound = new ArrayList<>(); + + for (DeploymentListeningLogger log : logs) + allNotFound.addAll(log.depNotFound()); + + String taskClsName = TestTask.class.getName(); + + String notFoundMsg = String.format( + "Deployment was not found for class with specific class loader [alias=%s, clsLdrId=null]", taskClsName); + + assertEquals(CLIENT_CNT, Collections.frequency(allNotFound, notFoundMsg)); + } + finally { + clients.forEach(IgniteClient::close); + } + } + + /** */ + private static void executeTasksOnClient(IgniteClient client, int cnt, long timeout) { + for (int i = 0; i < cnt; i++) { + CompletableFuture>> fut = client.compute() + .withTimeout(timeout). + >, T2>>executeAsync2(TestTask.class.getName(), null) + .toCompletableFuture(); + + try { + fut.get(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** */ + private static class DeploymentListeningLogger extends ListeningTestLogger { + /** */ + private final ConcurrentLinkedQueue depNotFound = new ConcurrentLinkedQueue<>(); + + /** */ + public DeploymentListeningLogger(IgniteLogger log) { + super(log); + } + + /** {@inheritDoc} */ + @Override public void debug(String msg) { + if (msg.contains("Deployment was not found for class with specific class loader")) + depNotFound.add(msg); + + super.debug(msg); + } + + /** {@inheritDoc} */ + @Override public ListeningTestLogger getLogger(Object ctgr) { + return this; + } + + /** */ + public List depNotFound() { + return depNotFound.stream().collect(Collectors.toUnmodifiableList()); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java index 1bf0b31d861a3..e239324b3bfb2 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import org.apache.ignite.internal.GridP2PAffinitySelfTest; +import org.apache.ignite.internal.managers.deployment.GridDeploymentLocalStoreReuseTest; import org.apache.ignite.internal.managers.deployment.GridDeploymentMessageCountSelfTest; import org.apache.ignite.internal.managers.deployment.GridDifferentLocalDeploymentSelfTest; import org.apache.ignite.internal.managers.deployment.P2PCacheOperationIntoComputeTest; @@ -82,7 +83,8 @@ GridDifferentLocalDeploymentSelfTest.class, P2PUnsupportedClassVersionTest.class, P2PClassLoadingFailureHandlingTest.class, - P2PClassLoadingIssuesTest.class + P2PClassLoadingIssuesTest.class, + GridDeploymentLocalStoreReuseTest.class }) public class IgniteP2PSelfTestSuite { }