diff --git a/plugins/org.eclipse.xtext.ide/src/org/eclipse/xtext/ide/server/concurrent/RequestCancelIndicator.xtend b/plugins/org.eclipse.xtext.ide/src/org/eclipse/xtext/ide/server/concurrent/RequestCancelIndicator.xtend index 5e54645db..8f6474eba 100644 --- a/plugins/org.eclipse.xtext.ide/src/org/eclipse/xtext/ide/server/concurrent/RequestCancelIndicator.xtend +++ b/plugins/org.eclipse.xtext.ide/src/org/eclipse/xtext/ide/server/concurrent/RequestCancelIndicator.xtend @@ -13,12 +13,12 @@ import org.eclipse.xtend.lib.annotations.Accessors * @author kosyakov - Initial contribution and API */ class RequestCancelIndicator implements CancellableIndicator { - + @Accessors(PUBLIC_GETTER) - boolean canceled - + volatile boolean canceled + override cancel() { canceled = true } -} \ No newline at end of file +} diff --git a/plugins/org.eclipse.xtext.ide/src/org/eclipse/xtext/ide/server/concurrent/RequestManager.xtend b/plugins/org.eclipse.xtext.ide/src/org/eclipse/xtext/ide/server/concurrent/RequestManager.xtend index d3c170175..e71d7c271 100644 --- a/plugins/org.eclipse.xtext.ide/src/org/eclipse/xtext/ide/server/concurrent/RequestManager.xtend +++ b/plugins/org.eclipse.xtext.ide/src/org/eclipse/xtext/ide/server/concurrent/RequestManager.xtend @@ -39,50 +39,77 @@ class RequestManager { val semaphore = new Semaphore(MAX_PERMITS) + def void shutdown() { + readExecutorService.shutdown() + writeExecutorService.shutdown() + } + def CompletableFuture runWrite((CancelIndicator)=>void writeRequest) { return runWrite(writeRequest, new RequestCancelIndicator) } + /** + *

+ * The given write request will be ran first when all running requests completed. + *

+ *

+ * Currently running requests will be cancelled. + *

+ *

+ * A provided cancel indicator should implement {@link org.eclipse.xtext.ide.server.concurrent.CancellableIndicator CancellableIndicator} + * to let the given request to be cancelled by a write request. + *

+ */ def CompletableFuture runWrite((CancelIndicator)=>void writeRequest, CancelIndicator cancelIndicator) { cancelIndicators.forEach[cancel] + + if (cancelIndicator instanceof CancellableIndicator) + cancelIndicators += cancelIndicator + return CompletableFuture.runAsync([ - writeRequest.withVoidAsReturnType.run(MAX_PERMITS, cancelIndicator) - ], writeExecutorService) + semaphore.acquire(MAX_PERMITS) + try { + writeRequest.apply(cancelIndicator) + } finally { + semaphore.release(MAX_PERMITS) + } + ], writeExecutorService).whenComplete [ + if (cancelIndicator instanceof CancellableIndicator) + cancelIndicators -= cancelIndicator + ] } def CompletableFuture runRead((CancelIndicator)=>V readRequest) { return runRead(readRequest, new RequestCancelIndicator) } + /** + *

+ * The given read request will be ran: + *

    + *
  • concurrent with running read requests;
  • + *
  • first when running write requests completed.
  • + *
+ *

+ *

+ * A provided cancel indicator should implement {@link org.eclipse.xtext.ide.server.concurrent.CancellableIndicator CancellableIndicator} + * to let the given request to be cancelled by a write request. + *

+ */ def CompletableFuture runRead((CancelIndicator)=>V readRequest, CancelIndicator cancelIndicator) { + if (cancelIndicator instanceof CancellableIndicator) + cancelIndicators += cancelIndicator + return CompletableFuture.supplyAsync([ - readRequest.run(1, cancelIndicator) - ], readExecutorService) - } - - protected def V run( - (CancelIndicator)=>V request, - int permits, - CancelIndicator cancelIndicator - ) { - semaphore.acquire(permits) - try { - if (cancelIndicator instanceof CancellableIndicator) - cancelIndicators += cancelIndicator - - return request.apply(cancelIndicator) - } finally { + semaphore.acquire(1) + try { + return readRequest.apply(cancelIndicator) + } finally { + semaphore.release(1) + } + ], readExecutorService).whenComplete [ if (cancelIndicator instanceof CancellableIndicator) cancelIndicators -= cancelIndicator - - semaphore.release(permits) - } - } - - protected def (CancelIndicator)=>Void withVoidAsReturnType((CancelIndicator)=>void request) { - return [ cancelIindicator | - request.apply(cancelIindicator) - return null ] } diff --git a/tests/org.eclipse.xtext.ide.tests/src/org/eclipse/xtext/ide/tests/server/concurrent/RequestManagerTest.xtend b/tests/org.eclipse.xtext.ide.tests/src/org/eclipse/xtext/ide/tests/server/concurrent/RequestManagerTest.xtend new file mode 100644 index 000000000..7c7d6fbfd --- /dev/null +++ b/tests/org.eclipse.xtext.ide.tests/src/org/eclipse/xtext/ide/tests/server/concurrent/RequestManagerTest.xtend @@ -0,0 +1,142 @@ +/******************************************************************************* + * Copyright (c) 2016 TypeFox GmbH (http://www.typefox.io) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + *******************************************************************************/ +package org.eclipse.xtext.ide.tests.server.concurrent + +import com.google.inject.Guice +import com.google.inject.Inject +import java.util.concurrent.atomic.AtomicInteger +import org.eclipse.xtext.ide.server.ServerModule +import org.eclipse.xtext.ide.server.concurrent.RequestManager +import org.junit.After +import org.junit.Before +import org.junit.Test + +import static org.junit.Assert.* + +/** + * @author kosyakov - Initial contribution and API + */ +class RequestManagerTest { + + @Inject + RequestManager requestManager + + AtomicInteger sharedState + + @Before + def void setUp() { + sharedState = new AtomicInteger + Guice.createInjector(new ServerModule).injectMembers(this) + } + + @After + def void tearDown() { + requestManager.shutdown + sharedState = null + } + + @Test + def void testRunRead() { + val future = requestManager.runRead [ + 'Foo' + ] + assertEquals('Foo', future.get) + } + + @Test + def void testRunReadConcurrent() { + val future = requestManager.runRead [ + while (sharedState.get == 0) { + } + sharedState.incrementAndGet + ] + val future2 = requestManager.runRead [ + sharedState.incrementAndGet + ] + future2.join + future.join + assertEquals(2, sharedState.get) + } + + @Test + def void testRunReadAfterWrite() { + requestManager.runWrite [ + while (sharedState.get == 0) { + } + sharedState.incrementAndGet + ] + val future = requestManager.runRead [ + sharedState.get + ] + sharedState.incrementAndGet + assertEquals(2, future.get) + } + + @Test + def void testRunWrite() { + requestManager.runWrite [ + sharedState.incrementAndGet + ].join + assertEquals(1, sharedState.get) + } + + @Test + def void testRunWriteAfterWrite() { + val future = requestManager.runWrite [ + sharedState.incrementAndGet + ] + val future2 = requestManager.runWrite [ + if (sharedState.get != 0) + sharedState.incrementAndGet + ] + future2.join + future.join + assertEquals(2, sharedState.get) + } + + @Test + def void testRunWriteAfterRead() { + requestManager.runRead [ + sharedState.incrementAndGet + ] + requestManager.runWrite [ + while (sharedState.get == 0) { + } + sharedState.incrementAndGet + ].join + assertEquals(2, sharedState.get) + } + + @Test + def void testCancelWrite() { + requestManager.runWrite [ cancelIndicator | + while (!cancelIndicator.canceled) { + } + sharedState.incrementAndGet + ] + requestManager.runWrite [ + sharedState.incrementAndGet + ].join + assertEquals(2, sharedState.get) + } + + @Test + def void testCancelRead() { + sharedState.incrementAndGet + val future = requestManager.runRead [ cancelIndicator | + while (!cancelIndicator.canceled) { + } + sharedState.get + ] + requestManager.runWrite [ + sharedState.set(0) + ].join + assertEquals(1, future.get) + } + +}