IGNITE-27871 Improve deployment lookup to reduce deploy() contention …#12760
IGNITE-27871 Improve deployment lookup to reduce deploy() contention …#12760oleg-vlsk wants to merge 1 commit intoapache:masterfrom
Conversation
…for locally available tasks with peerClassLoadingEnabled=true
| P2PClassLoadingFailureHandlingTest.class, | ||
| P2PClassLoadingIssuesTest.class | ||
| P2PClassLoadingIssuesTest.class, | ||
| GridDeploymentLocalStoreReuseTest.class |
There was a problem hiding this comment.
Add comma to the end of line please (to reduce conflicts on merge)
| CompletableFuture<T2<UUID, Set<UUID>>> fut = client.compute() | ||
| .withTimeout(timeout). | ||
| <T2<UUID, Set<UUID>>, T2<UUID, Set<UUID>>>executeAsync2(TestTask.class.getName(), null) | ||
| .toCompletableFuture(); | ||
|
|
||
| try { | ||
| fut.get(); |
There was a problem hiding this comment.
client.compute().execute(TestTask.class.getName(), null);
| List<IgniteInternalFuture<Void>> futs = new ArrayList<>(CLIENT_CNT); | ||
|
|
||
| for (IgniteClient client : clients) | ||
| futs.add(runAsync(() -> executeTasksOnClient(client, EXEC_CNT, 5_000L))); | ||
|
|
||
| waitForAllFutures(futs.toArray(new IgniteInternalFuture[0])); |
There was a problem hiding this comment.
runMultiThreaded(i -> executeTasksOnClient(clients.get(i), EXEC_CNT), CLIENT_CNT, "worker");
| ClusterNode[] allServerNodes = grid(0).cluster().forServers().nodes().toArray(new ClusterNode[0]); | ||
|
|
||
| for (int i = 0; i < CLIENT_CNT; i++) | ||
| clients.add(startClient(allServerNodes)); |
There was a problem hiding this comment.
You can connect to any server node,it's not necessary to provide all nodes, one is enough, i.e. clients.add(startClient(0));
| /** */ | ||
| private static class DeploymentListeningLogger extends ListeningTestLogger { | ||
| /** */ | ||
| private final ConcurrentLinkedQueue<String> depNotFound = new ConcurrentLinkedQueue<>(); | ||
|
|
||
| /** */ | ||
| public DeploymentListeningLogger(IgniteLogger log) { | ||
| super(log); | ||
| } | ||
|
|
||
| /** {@inheritDoc} */ | ||
| @Override public void debug(String msg) { | ||
| if (msg.contains("Deployment was not found for class with specific class loader")) | ||
| depNotFound.add(msg); | ||
|
|
||
| super.debug(msg); | ||
| } | ||
|
|
||
| /** {@inheritDoc} */ | ||
| @Override public ListeningTestLogger getLogger(Object ctgr) { | ||
| return this; | ||
| } | ||
|
|
||
| /** */ | ||
| public List<String> depNotFound() { | ||
| return depNotFound.stream().collect(Collectors.toUnmodifiableList()); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
It's incorrect usage of listening logger, all you need is register listener like:
LogListener lsnr = LogListener.matches(notFoundMsg).times(CLIENT_CNT).build();
listeningTestLog.registerListener(lsnr);
listeningTestLog should be created on top of standard logger, for example:
setLoggerDebugLevel();
listeningTestLog = new ListeningTestLogger(log);
And passed to ignite configuration. No need for logger for each node.
| meta.alias(rsrcName); | ||
| meta.className(clsName); | ||
| meta.senderNodeId(ctx.localNodeId()); | ||
| meta.classLoader(ldr); |
There was a problem hiding this comment.
Setting classloader disables deployment SPI as far as I understand. See https://github.com/apache/ignite/blob/master/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java#L174
| private final ConcurrentMap<String, Deque<GridDeployment>> cache = new ConcurrentHashMap<>(); | ||
|
|
||
| /** Deployment cache by classloader. */ | ||
| private final ConcurrentMap<ClassLoader, Deque<GridDeployment>> cacheByLdr = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
cacheByLdr always used under the lock mux, no ConcurrentMap overhead required here.
Also maybe it worth to use IdentityHashMap in case someone redefine classloader's equals() in a wrong way.
|
|
||
| dep = d; | ||
| for (GridDeployment d : depsByLdr) { | ||
| if (!d.undeployed() && d.classLoader() == ldr) { |
There was a problem hiding this comment.
If it's undeployed, it's cleaned from cache, how we can find it?
Why do we need to check classloader if we put in cache only items with exactly this classloader?
| dep = candidate; | ||
| } | ||
| } | ||
| else { |
There was a problem hiding this comment.
Do we still need this check? If deployment not found by classloader in classloader cache it can't be found in aliases cache. We preserve both caches synchronized and modify it only under the lock.
| if (d.classLoader() == ldr) { | ||
| // Cache class and alias. | ||
| fireEvt = d.addDeployedClass(cls, alias); | ||
| Deque<GridDeployment> depsByLdr = cacheByLdr.get(ldr); |
There was a problem hiding this comment.
Looks like it's one-to-one relation for deployment and classloader. Did I miss something?
…for locally available tasks with peerClassLoadingEnabled=true
Thank you for submitting the pull request to the Apache Ignite.
In order to streamline the review of the contribution
we ask you to ensure the following steps have been taken:
The Contribution Checklist
The description explains WHAT and WHY was made instead of HOW.
The following pattern must be used:
IGNITE-XXXX Change summarywhereXXXX- number of JIRA issue.(see the Maintainers list)
the
green visaattached to the JIRA ticket (see TC.Bot: Check PR)Notes
If you need any help, please email dev@ignite.apache.org or ask anу advice on http://asf.slack.com #ignite channel.