[#622] Improved concurrency behavior of RequestManager

This commit is contained in:
Sebastian Zarnekow 2020-02-08 22:49:56 +01:00 committed by Sebastian Zarnekow
parent 1066397d18
commit 93b20b3794
9 changed files with 710 additions and 129 deletions

View file

@ -8,32 +8,51 @@
*******************************************************************************/
package org.eclipse.xtext.ide.tests.server.concurrent
import com.google.common.util.concurrent.Uninterruptibles
import com.google.inject.Guice
import com.google.inject.Inject
import com.google.inject.Provider
import java.util.concurrent.CancellationException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import org.apache.log4j.Level
import org.eclipse.xtext.ide.server.ServerModule
import org.eclipse.xtext.ide.server.concurrent.AbstractRequest
import org.eclipse.xtext.ide.server.concurrent.ReadRequest
import org.eclipse.xtext.ide.server.concurrent.RequestManager
import org.eclipse.xtext.ide.server.concurrent.WriteRequest
import org.eclipse.xtext.service.OperationCanceledManager
import org.eclipse.xtext.testing.RepeatedTest
import org.eclipse.xtext.testing.logging.LoggingTester
import org.junit.After
import org.junit.Assert
import org.junit.Before
import org.junit.Ignore
import org.junit.Rule
import org.junit.Test
import static org.junit.Assert.*
import org.junit.Assert
import org.eclipse.xtext.ide.server.concurrent.WriteRequest
import org.eclipse.xtext.ide.server.concurrent.ReadRequest
/**
* @author kosyakov - Initial contribution and API
*/
class RequestManagerTest {
@Rule public RepeatedTest.Rule rule = new RepeatedTest.Rule(false)
@Inject
RequestManager requestManager
@Inject
Provider<ExecutorService> executorServiceProvider
@Inject
Provider<OperationCanceledManager> cancelManagerProvider
AtomicInteger sharedState
@ -122,6 +141,71 @@ class RequestManagerTest {
Assert.fail
}
@Test
def void testWriteWaitsForReadToFinish_01() throws Exception {
val marked = new AtomicBoolean(false)
val countDownInRead = new CountDownLatch(1)
val countDownInWrite = new CountDownLatch(1)
val proceedWithWrite = new CountDownLatch(1)
val reader = requestManager.runRead [ cancelIndicator |
countDownInRead.countDown
Uninterruptibles.awaitUninterruptibly(countDownInWrite)
marked.set(true)
proceedWithWrite.countDown
return null
]
Uninterruptibles.awaitUninterruptibly(countDownInRead)
val writer = requestManager.runWrite([], [ cancelIndicator, ignored |
countDownInWrite.countDown
Uninterruptibles.awaitUninterruptibly(proceedWithWrite)
assertTrue(marked.get)
null
])
try {
Uninterruptibles.getUninterruptibly(writer, 100, TimeUnit.MILLISECONDS)
fail("Expected timeout")
} catch(TimeoutException e) {
assertFalse(marked.get()) // this should not be the case
}
countDownInWrite.countDown;
Uninterruptibles.getUninterruptibly(writer, 100, TimeUnit.MILLISECONDS)
assertTrue(reader.isDone)
assertFalse(reader.isCancelled)
}
@Test
def void testWriteWaitsForReadToFinish_02() throws Exception {
val marked = new AtomicBoolean(false)
val countDownInRead = new CountDownLatch(1)
val countDownInWrite = new CountDownLatch(1)
val proceedWithWrite = new CountDownLatch(1)
val reader = requestManager.runRead [ cancelIndicator |
countDownInRead.countDown
Uninterruptibles.awaitUninterruptibly(countDownInWrite)
marked.set(true)
proceedWithWrite.countDown
throw new CancellationException
]
Uninterruptibles.awaitUninterruptibly(countDownInRead)
val writer = requestManager.runWrite([], [ cancelIndicator, ignored |
countDownInWrite.countDown
Uninterruptibles.awaitUninterruptibly(proceedWithWrite)
assertTrue(marked.get)
null
])
try {
Uninterruptibles.getUninterruptibly(writer, 100, TimeUnit.MILLISECONDS)
fail("Expected timeout")
} catch(TimeoutException e) {
assertFalse(marked.get()) // this should not be the case
}
countDownInWrite.countDown;
Uninterruptibles.getUninterruptibly(writer, 100, TimeUnit.MILLISECONDS)
assertTrue(reader.isDone)
assertTrue(reader.isCancelled)
}
@Test(timeout = 1000)
def void testRunRead() {
@ -177,19 +261,47 @@ class RequestManagerTest {
assertEquals(2, sharedState.get)
}
//FIXME https://github.com/eclipse/xtext-core/issues/622
@Test(timeout = 1000)
@Ignore("https://github.com/eclipse/xtext-core/issues/622")
def void testRunWriteAfterRead() {
@RepeatedTest(times=50)
def void testRunWriteAfterReadStarted() {
val readStarted = new CountDownLatch(1)
requestManager.runRead [
readStarted.countDown
sharedState.incrementAndGet
]
Uninterruptibles.awaitUninterruptibly(readStarted)
requestManager.runWrite([], [
assertEquals (1, sharedState.get)
sharedState.incrementAndGet
]).join
assertEquals(2, sharedState.get)
}
@Test(timeout = 1000)
@RepeatedTest(times=50)
def void testRunWriteBeforeReadStarted() {
val writeSubmitted = new CountDownLatch(1)
val firstWriteDone = new AtomicBoolean
requestManager.runWrite([
Uninterruptibles.awaitUninterruptibly(writeSubmitted)
firstWriteDone.set(true)
null
], [
sharedState.incrementAndGet
])
requestManager.runRead [
sharedState.incrementAndGet
]
val joinMe = requestManager.runWrite([], [
assertEquals (0, sharedState.get)
assertTrue(firstWriteDone.get)
sharedState.incrementAndGet
])
writeSubmitted.countDown
joinMe.join
assertTrue(firstWriteDone.get)
assertEquals(1, sharedState.get)
}
@Test(timeout = 1000)
def void testCancelRead() {
@ -213,4 +325,74 @@ class RequestManagerTest {
Thread.sleep(10)
}
}
/*
* The tests assumes an implementation of a Command that has access to the request manager
*
*/
@Test(timeout = 5000)
@RepeatedTest(times=50)
def void testReadCommandSubmitsWriteCommand() {
val mainThread = Thread.currentThread
val submittedFromMain = new CountDownLatch(1)
val addedFromReader = new CountDownLatch(1)
val AtomicReference<Thread> readerThreadRef = new AtomicReference
val myRequestManager = new RequestManager(executorServiceProvider.get, cancelManagerProvider.get) {
override protected addRequest(AbstractRequest<?> request) {
if (request instanceof WriteRequest && Thread.currentThread == readerThreadRef.get) {
super.addRequest(request)
addedFromReader.countDown
Uninterruptibles.awaitUninterruptibly(submittedFromMain, 100, TimeUnit.MILLISECONDS)
} else {
super.addRequest(request)
}
}
override protected submitRequest(AbstractRequest<?> request) {
if (request instanceof WriteRequest && Thread.currentThread == mainThread) {
super.submitRequest(request)
submittedFromMain.countDown
} else {
super.submitRequest(request)
}
}
override protected cancel() {
if (Thread.currentThread == mainThread) {
Uninterruptibles.awaitUninterruptibly(addedFromReader, 100, TimeUnit.MILLISECONDS)
}
super.cancel()
}
}
val threadSet = new CountDownLatch(1)
val readResult = myRequestManager.runRead [
readerThreadRef.set(Thread.currentThread)
threadSet.countDown
return myRequestManager.runWrite([], [])
]
Uninterruptibles.awaitUninterruptibly(threadSet)
assertNotNull(readerThreadRef.get)
val writeResult = myRequestManager.runWrite([], [])
val writeFromReader = readResult.get
try {
writeFromReader.get
try {
writeResult.get
} catch(CancellationException ce) {
// one of both will be cancelled
assertTrue(writeFromReader.isDone)
assertTrue(writeResult.isDone)
assertTrue(writeFromReader.isCancelled != writeResult.isCancelled)
}
} catch(CancellationException ce) {
writeResult.get
// one of both will be cancelled
assertTrue(writeFromReader.isDone)
assertTrue(writeResult.isDone)
assertTrue(writeFromReader.isCancelled != writeResult.isCancelled)
}
}
}

View file

@ -8,17 +8,29 @@
*/
package org.eclipse.xtext.ide.tests.server.concurrent;
import com.google.common.base.Objects;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Provider;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Level;
import org.eclipse.xtext.ide.server.ServerModule;
import org.eclipse.xtext.ide.server.concurrent.AbstractRequest;
import org.eclipse.xtext.ide.server.concurrent.ReadRequest;
import org.eclipse.xtext.ide.server.concurrent.RequestManager;
import org.eclipse.xtext.ide.server.concurrent.WriteRequest;
import org.eclipse.xtext.service.OperationCanceledManager;
import org.eclipse.xtext.testing.RepeatedTest;
import org.eclipse.xtext.testing.logging.LoggingTester;
import org.eclipse.xtext.util.CancelIndicator;
import org.eclipse.xtext.xbase.lib.Exceptions;
@ -28,7 +40,7 @@ import org.eclipse.xtext.xbase.lib.Functions.Function2;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
/**
@ -36,9 +48,18 @@ import org.junit.Test;
*/
@SuppressWarnings("all")
public class RequestManagerTest {
@Rule
public RepeatedTest.Rule rule = new RepeatedTest.Rule(false);
@Inject
private RequestManager requestManager;
@Inject
private Provider<ExecutorService> executorServiceProvider;
@Inject
private Provider<OperationCanceledManager> cancelManagerProvider;
private AtomicInteger sharedState;
@Before
@ -158,6 +179,96 @@ public class RequestManagerTest {
Assert.fail();
}
@Test
public void testWriteWaitsForReadToFinish_01() throws Exception {
final AtomicBoolean marked = new AtomicBoolean(false);
final CountDownLatch countDownInRead = new CountDownLatch(1);
final CountDownLatch countDownInWrite = new CountDownLatch(1);
final CountDownLatch proceedWithWrite = new CountDownLatch(1);
final Function1<CancelIndicator, Object> _function = (CancelIndicator cancelIndicator) -> {
countDownInRead.countDown();
Uninterruptibles.awaitUninterruptibly(countDownInWrite);
marked.set(true);
proceedWithWrite.countDown();
return null;
};
final CompletableFuture<Object> reader = this.requestManager.<Object>runRead(_function);
Uninterruptibles.awaitUninterruptibly(countDownInRead);
final Function0<Object> _function_1 = () -> {
return null;
};
final Function2<CancelIndicator, Object, Object> _function_2 = (CancelIndicator cancelIndicator, Object ignored) -> {
Object _xblockexpression = null;
{
countDownInWrite.countDown();
Uninterruptibles.awaitUninterruptibly(proceedWithWrite);
Assert.assertTrue(marked.get());
_xblockexpression = null;
}
return _xblockexpression;
};
final CompletableFuture<Object> writer = this.requestManager.<Object, Object>runWrite(_function_1, _function_2);
try {
Uninterruptibles.<Object>getUninterruptibly(writer, 100, TimeUnit.MILLISECONDS);
Assert.fail("Expected timeout");
} catch (final Throwable _t) {
if (_t instanceof TimeoutException) {
Assert.assertFalse(marked.get());
} else {
throw Exceptions.sneakyThrow(_t);
}
}
countDownInWrite.countDown();
Uninterruptibles.<Object>getUninterruptibly(writer, 100, TimeUnit.MILLISECONDS);
Assert.assertTrue(reader.isDone());
Assert.assertFalse(reader.isCancelled());
}
@Test
public void testWriteWaitsForReadToFinish_02() throws Exception {
final AtomicBoolean marked = new AtomicBoolean(false);
final CountDownLatch countDownInRead = new CountDownLatch(1);
final CountDownLatch countDownInWrite = new CountDownLatch(1);
final CountDownLatch proceedWithWrite = new CountDownLatch(1);
final Function1<CancelIndicator, Object> _function = (CancelIndicator cancelIndicator) -> {
countDownInRead.countDown();
Uninterruptibles.awaitUninterruptibly(countDownInWrite);
marked.set(true);
proceedWithWrite.countDown();
throw new CancellationException();
};
final CompletableFuture<Object> reader = this.requestManager.<Object>runRead(_function);
Uninterruptibles.awaitUninterruptibly(countDownInRead);
final Function0<Object> _function_1 = () -> {
return null;
};
final Function2<CancelIndicator, Object, Object> _function_2 = (CancelIndicator cancelIndicator, Object ignored) -> {
Object _xblockexpression = null;
{
countDownInWrite.countDown();
Uninterruptibles.awaitUninterruptibly(proceedWithWrite);
Assert.assertTrue(marked.get());
_xblockexpression = null;
}
return _xblockexpression;
};
final CompletableFuture<Object> writer = this.requestManager.<Object, Object>runWrite(_function_1, _function_2);
try {
Uninterruptibles.<Object>getUninterruptibly(writer, 100, TimeUnit.MILLISECONDS);
Assert.fail("Expected timeout");
} catch (final Throwable _t) {
if (_t instanceof TimeoutException) {
Assert.assertFalse(marked.get());
} else {
throw Exceptions.sneakyThrow(_t);
}
}
countDownInWrite.countDown();
Uninterruptibles.<Object>getUninterruptibly(writer, 100, TimeUnit.MILLISECONDS);
Assert.assertTrue(reader.isDone());
Assert.assertTrue(reader.isCancelled());
}
@Test(timeout = 1000)
public void testRunRead() {
try {
@ -255,12 +366,19 @@ public class RequestManagerTest {
}
@Test(timeout = 1000)
@Ignore("https://github.com/eclipse/xtext-core/issues/622")
public void testRunWriteAfterRead() {
@RepeatedTest(times = 50)
public void testRunWriteAfterReadStarted() {
final CountDownLatch readStarted = new CountDownLatch(1);
final Function1<CancelIndicator, Integer> _function = (CancelIndicator it) -> {
return Integer.valueOf(this.sharedState.incrementAndGet());
int _xblockexpression = (int) 0;
{
readStarted.countDown();
_xblockexpression = this.sharedState.incrementAndGet();
}
return Integer.valueOf(_xblockexpression);
};
this.requestManager.<Integer>runRead(_function);
Uninterruptibles.awaitUninterruptibly(readStarted);
final Function0<Object> _function_1 = () -> {
return null;
};
@ -276,6 +394,47 @@ public class RequestManagerTest {
Assert.assertEquals(2, this.sharedState.get());
}
@Test(timeout = 1000)
@RepeatedTest(times = 50)
public void testRunWriteBeforeReadStarted() {
final CountDownLatch writeSubmitted = new CountDownLatch(1);
final AtomicBoolean firstWriteDone = new AtomicBoolean();
final Function0<Object> _function = () -> {
Object _xblockexpression = null;
{
Uninterruptibles.awaitUninterruptibly(writeSubmitted);
firstWriteDone.set(true);
_xblockexpression = null;
}
return _xblockexpression;
};
final Function2<CancelIndicator, Object, Integer> _function_1 = (CancelIndicator $0, Object $1) -> {
return Integer.valueOf(this.sharedState.incrementAndGet());
};
this.requestManager.<Object, Integer>runWrite(_function, _function_1);
final Function1<CancelIndicator, Integer> _function_2 = (CancelIndicator it) -> {
return Integer.valueOf(this.sharedState.incrementAndGet());
};
this.requestManager.<Integer>runRead(_function_2);
final Function0<Object> _function_3 = () -> {
return null;
};
final Function2<CancelIndicator, Object, Integer> _function_4 = (CancelIndicator $0, Object $1) -> {
int _xblockexpression = (int) 0;
{
Assert.assertEquals(0, this.sharedState.get());
Assert.assertTrue(firstWriteDone.get());
_xblockexpression = this.sharedState.incrementAndGet();
}
return Integer.valueOf(_xblockexpression);
};
final CompletableFuture<Integer> joinMe = this.requestManager.<Object, Integer>runWrite(_function_3, _function_4);
writeSubmitted.countDown();
joinMe.join();
Assert.assertTrue(firstWriteDone.get());
Assert.assertEquals(1, this.sharedState.get());
}
@Test(timeout = 1000)
public void testCancelRead() {
try {
@ -307,4 +466,110 @@ public class RequestManagerTest {
throw Exceptions.sneakyThrow(_e);
}
}
/**
* The tests assumes an implementation of a Command that has access to the request manager
*/
@Test(timeout = 5000)
@RepeatedTest(times = 50)
public void testReadCommandSubmitsWriteCommand() {
try {
final Thread mainThread = Thread.currentThread();
final CountDownLatch submittedFromMain = new CountDownLatch(1);
final CountDownLatch addedFromReader = new CountDownLatch(1);
final AtomicReference<Thread> readerThreadRef = new AtomicReference<Thread>();
ExecutorService _get = this.executorServiceProvider.get();
OperationCanceledManager _get_1 = this.cancelManagerProvider.get();
final RequestManager myRequestManager = new RequestManager(_get, _get_1) {
@Override
protected void addRequest(final AbstractRequest<?> request) {
if (((request instanceof WriteRequest) && Objects.equal(Thread.currentThread(), readerThreadRef.get()))) {
super.addRequest(request);
addedFromReader.countDown();
Uninterruptibles.awaitUninterruptibly(submittedFromMain, 100, TimeUnit.MILLISECONDS);
} else {
super.addRequest(request);
}
}
@Override
protected void submitRequest(final AbstractRequest<?> request) {
if (((request instanceof WriteRequest) && Objects.equal(Thread.currentThread(), mainThread))) {
super.submitRequest(request);
submittedFromMain.countDown();
} else {
super.submitRequest(request);
}
}
@Override
protected CompletableFuture<Void> cancel() {
CompletableFuture<Void> _xblockexpression = null;
{
Thread _currentThread = Thread.currentThread();
boolean _equals = Objects.equal(_currentThread, mainThread);
if (_equals) {
Uninterruptibles.awaitUninterruptibly(addedFromReader, 100, TimeUnit.MILLISECONDS);
}
_xblockexpression = super.cancel();
}
return _xblockexpression;
}
};
final CountDownLatch threadSet = new CountDownLatch(1);
final Function1<CancelIndicator, CompletableFuture<Object>> _function = (CancelIndicator it) -> {
readerThreadRef.set(Thread.currentThread());
threadSet.countDown();
final Function0<Object> _function_1 = () -> {
return null;
};
final Function2<CancelIndicator, Object, Object> _function_2 = (CancelIndicator $0, Object $1) -> {
return null;
};
return myRequestManager.<Object, Object>runWrite(_function_1, _function_2);
};
final CompletableFuture<CompletableFuture<Object>> readResult = myRequestManager.<CompletableFuture<Object>>runRead(_function);
Uninterruptibles.awaitUninterruptibly(threadSet);
Assert.assertNotNull(readerThreadRef.get());
final Function0<Object> _function_1 = () -> {
return null;
};
final Function2<CancelIndicator, Object, Object> _function_2 = (CancelIndicator $0, Object $1) -> {
return null;
};
final CompletableFuture<Object> writeResult = myRequestManager.<Object, Object>runWrite(_function_1, _function_2);
final CompletableFuture<Object> writeFromReader = readResult.get();
try {
writeFromReader.get();
try {
writeResult.get();
} catch (final Throwable _t) {
if (_t instanceof CancellationException) {
Assert.assertTrue(writeFromReader.isDone());
Assert.assertTrue(writeResult.isDone());
boolean _isCancelled = writeFromReader.isCancelled();
boolean _isCancelled_1 = writeResult.isCancelled();
boolean _notEquals = (_isCancelled != _isCancelled_1);
Assert.assertTrue(_notEquals);
} else {
throw Exceptions.sneakyThrow(_t);
}
}
} catch (final Throwable _t) {
if (_t instanceof CancellationException) {
writeResult.get();
Assert.assertTrue(writeFromReader.isDone());
Assert.assertTrue(writeResult.isDone());
boolean _isCancelled = writeFromReader.isCancelled();
boolean _isCancelled_1 = writeResult.isCancelled();
boolean _notEquals = (_isCancelled != _isCancelled_1);
Assert.assertTrue(_notEquals);
} else {
throw Exceptions.sneakyThrow(_t);
}
}
} catch (Throwable _e) {
throw Exceptions.sneakyThrow(_e);
}
}
}

View file

@ -10,6 +10,8 @@ package org.eclipse.xtext.ide.server.concurrent;
import java.util.concurrent.CompletableFuture;
import org.apache.log4j.Logger;
/**
* Abstract base type for read and write requests.
*
@ -17,10 +19,23 @@ import java.util.concurrent.CompletableFuture;
* @since 2.11
*/
public abstract class AbstractRequest<V> implements Runnable, Cancellable {
private class ResultFuture extends CompletableFuture<V> {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
AbstractRequest.this.cancel(mayInterruptIfRunning);
return isCancelled();
}
void doCancel(boolean mayInterruptIfRunning) {
super.cancel(mayInterruptIfRunning);
}
}
/**
* The underyling future.
* The underlying future.
*/
protected final CompletableFuture<V> result;
protected final ResultFuture result;
/**
* The current cancel indicator.
@ -34,19 +49,46 @@ public abstract class AbstractRequest<V> implements Runnable, Cancellable {
protected AbstractRequest(RequestManager requestManager) {
this.requestManager = requestManager;
this.result = new CompletableFuture<>();
this.cancelIndicator = new RequestCancelIndicator(this.result);
this.result = new ResultFuture();
this.cancelIndicator = new RequestCancelIndicator(this);
}
protected void cancelResult(boolean mayInterruptIfRunning) {
result.doCancel(mayInterruptIfRunning);
}
protected boolean isDone() {
return result.isDone();
}
protected void complete(V value) {
result.complete(value);
}
protected abstract Logger getLogger();
protected void logAndCompleteExceptionally(Throwable t) {
if (!requestManager.isCancelException(t)) {
getLogger().error("Error during request: ", t);
result.completeExceptionally(t);
} else {
cancelResult(true);
}
}
protected void cancel(boolean mayInterruptIfRunning) {
cancelIndicator.doCancel();
}
@Override
public void cancel() {
cancelIndicator.cancel();
public final void cancel() {
cancel(true);
}
/**
* Return the underlying future.
*/
public CompletableFuture<V> get() {
return result;
return this.result;
}
}

View file

@ -8,6 +8,7 @@
*******************************************************************************/
package org.eclipse.xtext.ide.server.concurrent;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.log4j.Logger;
@ -21,33 +22,58 @@ import org.eclipse.xtext.xbase.lib.Functions.Function1;
public class ReadRequest<V> extends AbstractRequest<V> {
private static final Logger LOG = Logger.getLogger(ReadRequest.class);
private final Function1<? super CancelIndicator, ? extends V> cancellable;
private final Function1<? super CancelIndicator, ? extends V> readOperation;
/**
* The initializer future allows to track the running state of this request, e.g. if it was already started or not.
*/
private final CompletableFuture<Void> initializer;
private final ExecutorService executor;
public ReadRequest(RequestManager requestManager, Function1<? super CancelIndicator, ? extends V> cancellable,
public ReadRequest(RequestManager requestManager, Function1<? super CancelIndicator, ? extends V> readOperation,
ExecutorService executor) {
super(requestManager);
this.cancellable = cancellable;
this.readOperation = readOperation;
this.executor = executor;
this.initializer = new CompletableFuture<>();
this.initializer.thenRun(this::doRun);
}
@Override
protected void cancel(boolean mayInterruptIfRunning) {
super.cancel(mayInterruptIfRunning);
if (initializer.cancel(mayInterruptIfRunning)) {
cancelResult(mayInterruptIfRunning);
}
}
@Override
public void run() {
if (result.isCancelled()) {
initializer.complete(null);
}
private void doRun() {
if (isDone()) {
return;
}
this.executor.submit(() -> {
try {
cancelIndicator.checkCanceled();
result.complete(cancellable.apply(cancelIndicator));
} catch (Throwable t) {
if (!requestManager.isCancelException(t)) {
LOG.error("Error during request: ", t);
if (isDone()) {
return;
}
result.completeExceptionally(t);
cancelIndicator.checkCanceled();
V readResult = readOperation.apply(cancelIndicator);
complete(readResult);
} catch (Throwable t) {
logAndCompleteExceptionally(t);
}
});
}
@Override
protected Logger getLogger() {
return LOG;
}
}

View file

@ -9,7 +9,6 @@
package org.eclipse.xtext.ide.server.concurrent;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import org.eclipse.lsp4j.jsonrpc.CancelChecker;
import org.eclipse.xtext.util.CancelIndicator;
@ -18,31 +17,31 @@ import org.eclipse.xtext.util.CancelIndicator;
* @author kosyakov - Initial contribution and API
* @since 2.11
*/
public class RequestCancelIndicator implements CancelIndicator, CancelChecker, Cancellable {
private final CompletableFuture<?> requestFuture;
class RequestCancelIndicator implements CancelIndicator, CancelChecker, Cancellable {
private volatile boolean cancelled = false;
private final AbstractRequest<?> request;
public RequestCancelIndicator(CompletableFuture<?> requestFuture) {
this.requestFuture = requestFuture;
RequestCancelIndicator(AbstractRequest<?> request) {
this.request = request;
}
@Override
public void cancel() {
this.requestFuture.cancel(true);
request.cancel();
}
protected void doCancel() {
this.cancelled = true;
}
/**
* Not really boolean guard but will throw a {@link CancellationException} instead of returning
* <code>true</code>. Otherwise returns <code>false</code>.
*/
@Override
public boolean isCanceled() {
this.checkCanceled();
return false;
return cancelled;
}
@Override
public void checkCanceled() {
if (this.requestFuture.isCancelled()) {
if (cancelled) {
throw new CancellationException();
}
}

View file

@ -32,12 +32,16 @@ import com.google.inject.Singleton;
@Singleton
public class RequestManager {
@Inject
private ExecutorService parallel;
private final ExecutorService parallel;
private final OperationCanceledManager operationCanceledManager;
@Inject
private OperationCanceledManager operationCanceledManager;
public RequestManager(ExecutorService parallel, OperationCanceledManager operationCanceledManager) {
this.parallel = parallel;
this.operationCanceledManager = operationCanceledManager;
}
private final ExecutorService queue = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RequestManager-Queue-%d").build());
@ -51,18 +55,26 @@ public class RequestManager {
parallel.shutdown();
cancel();
}
protected final OperationCanceledManager getOperationCanceledManager() {
return operationCanceledManager;
}
protected final ExecutorService getParallelExecutorService() {
return parallel;
}
/**
* Run the given cancellable logic as a read request.
*/
public <V> CompletableFuture<V> runRead(Function1<? super CancelIndicator, ? extends V> cancellable) {
public synchronized <V> CompletableFuture<V> runRead(Function1<? super CancelIndicator, ? extends V> cancellable) {
return submit(new ReadRequest<>(this, cancellable, parallel));
}
/**
* Perform the given write and run the cancellable logic afterwards.
*/
public <U, V> CompletableFuture<V> runWrite(
public synchronized <U, V> CompletableFuture<V> runWrite(
Function0<? extends U> nonCancellable,
Function2<? super CancelIndicator, ? super U, ? extends V> cancellable) {
return submit(new WriteRequest<>(this, nonCancellable, cancellable, cancel()));
@ -72,11 +84,21 @@ public class RequestManager {
* Submit the given request.
*/
protected <V> CompletableFuture<V> submit(AbstractRequest<V> request) {
requests.add(request);
queue.submit(request);
addRequest(request);
submitRequest(request);
return request.get();
}
/* @ProtectedForTesting */
protected void submitRequest(AbstractRequest<?> request) {
queue.submit(request);
}
/* @ProtectedForTesting */
protected void addRequest(AbstractRequest<?> request) {
requests.add(request);
}
/**
* Cancel all requests in the queue.
*/

View file

@ -26,34 +26,39 @@ public class WriteRequest<U, V> extends AbstractRequest<V> {
private final Function2<? super CancelIndicator, ? super U, ? extends V> cancellable;
private final CompletableFuture<Void> previous;
private final CompletableFuture<Void> allPreviousRequests;
public WriteRequest(RequestManager requestManager, Function0<? extends U> nonCancellable,
Function2<? super CancelIndicator, ? super U, ? extends V> cancellable,
CompletableFuture<Void> previous) {
CompletableFuture<Void> allPreviousRequests) {
super(requestManager);
this.nonCancellable = nonCancellable;
this.cancellable = cancellable;
this.previous = previous;
this.allPreviousRequests = allPreviousRequests;
}
@Override
public void run() {
try {
previous.join();
} catch (Throwable t) {
// We are not interested in results, only to make sure that all previous requests are finished before running next write request.
}
try {
U intermediateResult = this.nonCancellable.apply();
cancelIndicator.checkCanceled();
result.complete(cancellable.apply(cancelIndicator, intermediateResult));
allPreviousRequests.join();
} catch (Throwable t) {
if (!requestManager.isCancelException(t)) {
LOG.error("Error during request: ", t);
}
result.completeExceptionally(t);
}
try {
U intermediateResult = this.nonCancellable.apply();
cancelIndicator.checkCanceled();
V writeResult = cancellable.apply(cancelIndicator, intermediateResult);
complete(writeResult);
} catch (Throwable t) {
logAndCompleteExceptionally(t);
}
}
@Override
protected Logger getLogger() {
return LOG;
}
}

View file

@ -11,6 +11,7 @@ package org.eclipse.xtext.testing
import com.google.inject.Guice
import com.google.inject.Inject
import com.google.inject.Module
import com.google.inject.Singleton
import java.io.File
import java.io.FileWriter
import java.nio.file.Path
@ -63,6 +64,7 @@ import org.eclipse.lsp4j.TextDocumentItem
import org.eclipse.lsp4j.TextEdit
import org.eclipse.lsp4j.VersionedTextDocumentIdentifier
import org.eclipse.lsp4j.WorkspaceEdit
import org.eclipse.lsp4j.WorkspaceFolder
import org.eclipse.lsp4j.WorkspaceSymbolParams
import org.eclipse.lsp4j.jsonrpc.Endpoint
import org.eclipse.lsp4j.jsonrpc.messages.Either
@ -78,6 +80,7 @@ import org.eclipse.xtext.ide.server.ServerModule
import org.eclipse.xtext.ide.server.UriExtensions
import org.eclipse.xtext.ide.server.concurrent.RequestManager
import org.eclipse.xtext.resource.IResourceServiceProvider
import org.eclipse.xtext.service.OperationCanceledManager
import org.eclipse.xtext.util.CancelIndicator
import org.eclipse.xtext.util.Files
import org.eclipse.xtext.util.Modules2
@ -89,7 +92,6 @@ import org.junit.jupiter.api.BeforeEach
import static extension org.eclipse.lsp4j.util.Ranges.containsRange
import static extension org.eclipse.xtext.util.Strings.*
import org.eclipse.lsp4j.WorkspaceFolder
/**
* @author Sven Efftinge - Initial contribution and API
@ -133,32 +135,52 @@ abstract class AbstractLanguageServerTest implements Endpoint {
protected def Class<? extends LanguageClient> getLanguageClientClass() {
return LanguageClient;
}
/**
* A request manager that will run the given read and write actions in the same thread immediatly, sequentially.
*/
@Singleton
static class DirectRequestManager extends RequestManager {
@Inject
new(OperationCanceledManager operationCanceledManager) {
super(null, operationCanceledManager)
}
override synchronized <V> runRead((CancelIndicator)=>V request) {
val result = new CompletableFuture()
try {
result.complete(request.apply [ false ])
} catch (Throwable t) {
if (isCancelException(t)) {
result.cancel(true)
} else {
result.completeExceptionally(t)
}
}
return result
}
override synchronized <U, V> runWrite(()=>U nonCancellable, (CancelIndicator, U)=>V request) {
val result = new CompletableFuture()
try {
result.complete(request.apply(CancelIndicator.NullImpl, nonCancellable.apply()))
} catch (Throwable t) {
if (isCancelException(t)) {
result.cancel(true)
} else {
result.completeExceptionally(t)
}
}
return result
}
}
protected def Module getServerModule() {
Modules2.mixin(new ServerModule, [
bind(RequestManager).toInstance(new RequestManager() {
override <V> runRead((CancelIndicator)=>V request) {
val result = new CompletableFuture()
try {
result.complete(request.apply [ false ])
} catch (Throwable e) {
result.completeExceptionally(e)
}
return result
}
override <U,V> runWrite(()=>U nonCancellable, (CancelIndicator, U)=>V request) {
val result = new CompletableFuture()
try {
result.complete(request.apply([ false ], nonCancellable.apply()))
} catch (Throwable e) {
result.completeExceptionally(e)
}
return result
}
})
])
return Modules2.mixin(new ServerModule) [
bind(RequestManager).to(DirectRequestManager)
]
}
@Inject

View file

@ -14,7 +14,7 @@ import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.binder.AnnotatedBindingBuilder;
import com.google.inject.Singleton;
import java.io.File;
import java.io.FileWriter;
import java.net.URI;
@ -98,6 +98,7 @@ import org.eclipse.xtext.ide.server.ServerModule;
import org.eclipse.xtext.ide.server.UriExtensions;
import org.eclipse.xtext.ide.server.concurrent.RequestManager;
import org.eclipse.xtext.resource.IResourceServiceProvider;
import org.eclipse.xtext.service.OperationCanceledManager;
import org.eclipse.xtext.testing.DefinitionTestConfiguration;
import org.eclipse.xtext.testing.DocumentHighlightConfiguration;
import org.eclipse.xtext.testing.DocumentSymbolConfiguraiton;
@ -141,6 +142,62 @@ import org.junit.jupiter.api.BeforeEach;
@FinalFieldsConstructor
@SuppressWarnings("all")
public abstract class AbstractLanguageServerTest implements Endpoint {
/**
* A request manager that will run the given read and write actions in the same thread immediatly, sequentially.
*/
@Singleton
public static class DirectRequestManager extends RequestManager {
@Inject
public DirectRequestManager(final OperationCanceledManager operationCanceledManager) {
super(null, operationCanceledManager);
}
@Override
public synchronized <V extends Object> CompletableFuture<V> runRead(final Function1<? super CancelIndicator, ? extends V> request) {
final CompletableFuture<V> result = new CompletableFuture<V>();
try {
final CancelIndicator _function = () -> {
return false;
};
result.complete(request.apply(_function));
} catch (final Throwable _t) {
if (_t instanceof Throwable) {
final Throwable t = (Throwable)_t;
boolean _isCancelException = this.isCancelException(t);
if (_isCancelException) {
result.cancel(true);
} else {
result.completeExceptionally(t);
}
} else {
throw Exceptions.sneakyThrow(_t);
}
}
return result;
}
@Override
public synchronized <U extends Object, V extends Object> CompletableFuture<V> runWrite(final Function0<? extends U> nonCancellable, final Function2<? super CancelIndicator, ? super U, ? extends V> request) {
final CompletableFuture<V> result = new CompletableFuture<V>();
try {
result.complete(request.apply(CancelIndicator.NullImpl, nonCancellable.apply()));
} catch (final Throwable _t) {
if (_t instanceof Throwable) {
final Throwable t = (Throwable)_t;
boolean _isCancelException = this.isCancelException(t);
if (_isCancelException) {
result.cancel(true);
} else {
result.completeExceptionally(t);
}
} else {
throw Exceptions.sneakyThrow(_t);
}
}
return result;
}
}
@Accessors
public static class TestCodeLensConfiguration extends TextDocumentPositionConfiguration {
private String expectedCodeLensItems = "";
@ -235,46 +292,7 @@ public abstract class AbstractLanguageServerTest implements Endpoint {
protected com.google.inject.Module getServerModule() {
ServerModule _serverModule = new ServerModule();
final com.google.inject.Module _function = (Binder it) -> {
AnnotatedBindingBuilder<RequestManager> _bind = it.<RequestManager>bind(RequestManager.class);
_bind.toInstance(new RequestManager() {
@Override
public <V extends Object> CompletableFuture<V> runRead(final Function1<? super CancelIndicator, ? extends V> request) {
final CompletableFuture<V> result = new CompletableFuture<V>();
try {
final CancelIndicator _function = () -> {
return false;
};
result.complete(request.apply(_function));
} catch (final Throwable _t) {
if (_t instanceof Throwable) {
final Throwable e = (Throwable)_t;
result.completeExceptionally(e);
} else {
throw Exceptions.sneakyThrow(_t);
}
}
return result;
}
@Override
public <U extends Object, V extends Object> CompletableFuture<V> runWrite(final Function0<? extends U> nonCancellable, final Function2<? super CancelIndicator, ? super U, ? extends V> request) {
final CompletableFuture<V> result = new CompletableFuture<V>();
try {
final CancelIndicator _function = () -> {
return false;
};
result.complete(request.apply(_function, nonCancellable.apply()));
} catch (final Throwable _t) {
if (_t instanceof Throwable) {
final Throwable e = (Throwable)_t;
result.completeExceptionally(e);
} else {
throw Exceptions.sneakyThrow(_t);
}
}
return result;
}
});
it.<RequestManager>bind(RequestManager.class).to(AbstractLanguageServerTest.DirectRequestManager.class);
};
return Modules2.mixin(_serverModule, _function);
}