From c5195721593b7c0209e3d5ef62e92d6719a81e02 Mon Sep 17 00:00:00 2001
From: akosyakov
Date: Mon, 30 May 2016 17:17:52 +0200
Subject: [PATCH] [lsi] Added tests for RequestManager
Change-Id: I6e5478ae517d86e38a45fbc780f9ea6ce8e4b851
Signed-off-by: akosyakov
.../concurrent/RequestCancelIndicator.xtend | 8 +-
.../server/concurrent/RequestManager.xtend | 81 ++++++----
.../concurrent/RequestManagerTest.xtend | 142 ++++++++++++++++++
3 files changed, 200 insertions(+), 31 deletions(-)
create mode 100644 tests/org.eclipse.xtext.ide.tests/src/org/eclipse/xtext/ide/tests/server/concurrent/RequestManagerTest.xtend
diff --git a/plugins/org.eclipse.xtext.ide/src/org/eclipse/xtext/ide/server/concurrent/RequestCancelIndicator.xtend b/plugins/org.eclipse.xtext.ide/src/org/eclipse/xtext/ide/server/concurrent/RequestCancelIndicator.xtend
index 5e54645db..8f6474eba 100644
--- a/plugins/org.eclipse.xtext.ide/src/org/eclipse/xtext/ide/server/concurrent/RequestCancelIndicator.xtend
+++ b/plugins/org.eclipse.xtext.ide/src/org/eclipse/xtext/ide/server/concurrent/RequestCancelIndicator.xtend
@@ -13,12 +13,12 @@ import org.eclipse.xtend.lib.annotations.Accessors
* @author kosyakov - Initial contribution and API
class RequestCancelIndicator implements CancellableIndicator {
- boolean canceled
+ volatile boolean canceled
override cancel() {
canceled = true
\ No newline at end of file
diff --git a/plugins/org.eclipse.xtext.ide/src/org/eclipse/xtext/ide/server/concurrent/RequestManager.xtend b/plugins/org.eclipse.xtext.ide/src/org/eclipse/xtext/ide/server/concurrent/RequestManager.xtend
index d3c170175..e71d7c271 100644
--- a/plugins/org.eclipse.xtext.ide/src/org/eclipse/xtext/ide/server/concurrent/RequestManager.xtend
+++ b/plugins/org.eclipse.xtext.ide/src/org/eclipse/xtext/ide/server/concurrent/RequestManager.xtend
@@ -39,50 +39,77 @@ class RequestManager {
val semaphore = new Semaphore(MAX_PERMITS)
+ def void shutdown() {
+ readExecutorService.shutdown()
+ writeExecutorService.shutdown()
+ }
def CompletableFuture runWrite((CancelIndicator)=>void writeRequest) {
return runWrite(writeRequest, new RequestCancelIndicator)
+ /**
+ *
+ * The given write request will be ran first when all running requests completed.
+ *
+ *
+ * Currently running requests will be cancelled.
+ *
+ *
+ * A provided cancel indicator should implement {@link org.eclipse.xtext.ide.server.concurrent.CancellableIndicator CancellableIndicator}
+ * to let the given request to be cancelled by a write request.
+ *
+ */
def CompletableFuture runWrite((CancelIndicator)=>void writeRequest, CancelIndicator cancelIndicator) {
+ if (cancelIndicator instanceof CancellableIndicator)
+ cancelIndicators += cancelIndicator
return CompletableFuture.runAsync([
-, cancelIndicator)
- ], writeExecutorService)
+ semaphore.acquire(MAX_PERMITS)
+ try {
+ writeRequest.apply(cancelIndicator)
+ } finally {
+ semaphore.release(MAX_PERMITS)
+ }
+ ], writeExecutorService).whenComplete [
+ if (cancelIndicator instanceof CancellableIndicator)
+ cancelIndicators -= cancelIndicator
+ ]
def CompletableFuture runRead((CancelIndicator)=>V readRequest) {
return runRead(readRequest, new RequestCancelIndicator)
+ /**
+ *
+ * The given read request will be ran:
+ *
+ * - concurrent with running read requests;
+ * - first when running write requests completed.
+ *
+ *
+ *
+ * A provided cancel indicator should implement {@link org.eclipse.xtext.ide.server.concurrent.CancellableIndicator CancellableIndicator}
+ * to let the given request to be cancelled by a write request.
+ *
+ */
def CompletableFuture runRead((CancelIndicator)=>V readRequest, CancelIndicator cancelIndicator) {
+ if (cancelIndicator instanceof CancellableIndicator)
+ cancelIndicators += cancelIndicator
return CompletableFuture.supplyAsync([
-, cancelIndicator)
- ], readExecutorService)
- }
- protected def V run(
- (CancelIndicator)=>V request,
- int permits,
- CancelIndicator cancelIndicator
- ) {
- semaphore.acquire(permits)
- try {
- if (cancelIndicator instanceof CancellableIndicator)
- cancelIndicators += cancelIndicator
- return request.apply(cancelIndicator)
- } finally {
+ semaphore.acquire(1)
+ try {
+ return readRequest.apply(cancelIndicator)
+ } finally {
+ semaphore.release(1)
+ }
+ ], readExecutorService).whenComplete [
if (cancelIndicator instanceof CancellableIndicator)
cancelIndicators -= cancelIndicator
- semaphore.release(permits)
- }
- }
- protected def (CancelIndicator)=>Void withVoidAsReturnType((CancelIndicator)=>void request) {
- return [ cancelIindicator |
- request.apply(cancelIindicator)
- return null
diff --git a/tests/org.eclipse.xtext.ide.tests/src/org/eclipse/xtext/ide/tests/server/concurrent/RequestManagerTest.xtend b/tests/org.eclipse.xtext.ide.tests/src/org/eclipse/xtext/ide/tests/server/concurrent/RequestManagerTest.xtend
new file mode 100644
index 000000000..7c7d6fbfd
--- /dev/null
+++ b/tests/org.eclipse.xtext.ide.tests/src/org/eclipse/xtext/ide/tests/server/concurrent/RequestManagerTest.xtend
@@ -0,0 +1,142 @@
+ * Copyright (c) 2016 TypeFox GmbH ( and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ *
+ *******************************************************************************/
+package org.eclipse.xtext.ide.tests.server.concurrent
+import java.util.concurrent.atomic.AtomicInteger
+import org.eclipse.xtext.ide.server.ServerModule
+import org.eclipse.xtext.ide.server.concurrent.RequestManager
+import org.junit.After
+import org.junit.Before
+import org.junit.Test
+import static org.junit.Assert.*
+ * @author kosyakov - Initial contribution and API
+ */
+class RequestManagerTest {
+ @Inject
+ RequestManager requestManager
+ AtomicInteger sharedState
+ @Before
+ def void setUp() {
+ sharedState = new AtomicInteger
+ Guice.createInjector(new ServerModule).injectMembers(this)
+ }
+ @After
+ def void tearDown() {
+ requestManager.shutdown
+ sharedState = null
+ }
+ @Test
+ def void testRunRead() {
+ val future = requestManager.runRead [
+ 'Foo'
+ ]
+ assertEquals('Foo', future.get)
+ }
+ @Test
+ def void testRunReadConcurrent() {
+ val future = requestManager.runRead [
+ while (sharedState.get == 0) {
+ }
+ sharedState.incrementAndGet
+ ]
+ val future2 = requestManager.runRead [
+ sharedState.incrementAndGet
+ ]
+ future2.join
+ future.join
+ assertEquals(2, sharedState.get)
+ }
+ @Test
+ def void testRunReadAfterWrite() {
+ requestManager.runWrite [
+ while (sharedState.get == 0) {
+ }
+ sharedState.incrementAndGet
+ ]
+ val future = requestManager.runRead [
+ sharedState.get
+ ]
+ sharedState.incrementAndGet
+ assertEquals(2, future.get)
+ }
+ @Test
+ def void testRunWrite() {
+ requestManager.runWrite [
+ sharedState.incrementAndGet
+ ].join
+ assertEquals(1, sharedState.get)
+ }
+ @Test
+ def void testRunWriteAfterWrite() {
+ val future = requestManager.runWrite [
+ sharedState.incrementAndGet
+ ]
+ val future2 = requestManager.runWrite [
+ if (sharedState.get != 0)
+ sharedState.incrementAndGet
+ ]
+ future2.join
+ future.join
+ assertEquals(2, sharedState.get)
+ }
+ @Test
+ def void testRunWriteAfterRead() {
+ requestManager.runRead [
+ sharedState.incrementAndGet
+ ]
+ requestManager.runWrite [
+ while (sharedState.get == 0) {
+ }
+ sharedState.incrementAndGet
+ ].join
+ assertEquals(2, sharedState.get)
+ }
+ @Test
+ def void testCancelWrite() {
+ requestManager.runWrite [ cancelIndicator |
+ while (!cancelIndicator.canceled) {
+ }
+ sharedState.incrementAndGet
+ ]
+ requestManager.runWrite [
+ sharedState.incrementAndGet
+ ].join
+ assertEquals(2, sharedState.get)
+ }
+ @Test
+ def void testCancelRead() {
+ sharedState.incrementAndGet
+ val future = requestManager.runRead [ cancelIndicator |
+ while (!cancelIndicator.canceled) {
+ }
+ sharedState.get
+ ]
+ requestManager.runWrite [
+ sharedState.set(0)
+ ].join
+ assertEquals(1, future.get)
+ }