[lsi] Added tests for RequestManager

Change-Id: I6e5478ae517d86e38a45fbc780f9ea6ce8e4b851
Signed-off-by: akosyakov <anton.kosyakov@typefox.io>
This commit is contained in:
akosyakov 2016-05-30 17:17:52 +02:00
parent 3982fe311b
commit c519572159
3 changed files with 200 additions and 31 deletions

View file

@ -13,12 +13,12 @@ import org.eclipse.xtend.lib.annotations.Accessors
* @author kosyakov - Initial contribution and API
*/
class RequestCancelIndicator implements CancellableIndicator {
@Accessors(PUBLIC_GETTER)
boolean canceled
volatile boolean canceled
override cancel() {
canceled = true
}
}
}

View file

@ -39,50 +39,77 @@ class RequestManager {
val semaphore = new Semaphore(MAX_PERMITS)
def void shutdown() {
readExecutorService.shutdown()
writeExecutorService.shutdown()
}
def CompletableFuture<Void> runWrite((CancelIndicator)=>void writeRequest) {
return runWrite(writeRequest, new RequestCancelIndicator)
}
/**
* <p>
* The given <i>write request</i> will be ran first when <i>all running requests</i> completed.
* </p>
* <p>
* Currently <i>running requests</i> will be cancelled.
* </p>
* <p>
* A provided cancel indicator should implement {@link org.eclipse.xtext.ide.server.concurrent.CancellableIndicator CancellableIndicator}
* to let the given request to be cancelled by a write request.
* </p>
*/
def CompletableFuture<Void> runWrite((CancelIndicator)=>void writeRequest, CancelIndicator cancelIndicator) {
cancelIndicators.forEach[cancel]
if (cancelIndicator instanceof CancellableIndicator)
cancelIndicators += cancelIndicator
return CompletableFuture.runAsync([
writeRequest.withVoidAsReturnType.run(MAX_PERMITS, cancelIndicator)
], writeExecutorService)
semaphore.acquire(MAX_PERMITS)
try {
writeRequest.apply(cancelIndicator)
} finally {
semaphore.release(MAX_PERMITS)
}
], writeExecutorService).whenComplete [
if (cancelIndicator instanceof CancellableIndicator)
cancelIndicators -= cancelIndicator
]
}
def <V> CompletableFuture<V> runRead((CancelIndicator)=>V readRequest) {
return runRead(readRequest, new RequestCancelIndicator)
}
/**
* <p>
* The given <i>read request</i> will be ran:
* <ul>
* <li>concurrent with <i>running read requests</i>;</li>
* <li>first when <i>running write requests</i> completed.</li>
* </ul>
* </p>
* <p>
* A provided cancel indicator should implement {@link org.eclipse.xtext.ide.server.concurrent.CancellableIndicator CancellableIndicator}
* to let the given request to be cancelled by a write request.
* </p>
*/
def <V> CompletableFuture<V> runRead((CancelIndicator)=>V readRequest, CancelIndicator cancelIndicator) {
if (cancelIndicator instanceof CancellableIndicator)
cancelIndicators += cancelIndicator
return CompletableFuture.supplyAsync([
readRequest.run(1, cancelIndicator)
], readExecutorService)
}
protected def <V> V run(
(CancelIndicator)=>V request,
int permits,
CancelIndicator cancelIndicator
) {
semaphore.acquire(permits)
try {
if (cancelIndicator instanceof CancellableIndicator)
cancelIndicators += cancelIndicator
return request.apply(cancelIndicator)
} finally {
semaphore.acquire(1)
try {
return readRequest.apply(cancelIndicator)
} finally {
semaphore.release(1)
}
], readExecutorService).whenComplete [
if (cancelIndicator instanceof CancellableIndicator)
cancelIndicators -= cancelIndicator
semaphore.release(permits)
}
}
protected def (CancelIndicator)=>Void withVoidAsReturnType((CancelIndicator)=>void request) {
return [ cancelIindicator |
request.apply(cancelIindicator)
return null
]
}

View file

@ -0,0 +1,142 @@
/*******************************************************************************
* Copyright (c) 2016 TypeFox GmbH (http://www.typefox.io) and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*******************************************************************************/
package org.eclipse.xtext.ide.tests.server.concurrent
import com.google.inject.Guice
import com.google.inject.Inject
import java.util.concurrent.atomic.AtomicInteger
import org.eclipse.xtext.ide.server.ServerModule
import org.eclipse.xtext.ide.server.concurrent.RequestManager
import org.junit.After
import org.junit.Before
import org.junit.Test
import static org.junit.Assert.*
/**
* @author kosyakov - Initial contribution and API
*/
class RequestManagerTest {
@Inject
RequestManager requestManager
AtomicInteger sharedState
@Before
def void setUp() {
sharedState = new AtomicInteger
Guice.createInjector(new ServerModule).injectMembers(this)
}
@After
def void tearDown() {
requestManager.shutdown
sharedState = null
}
@Test
def void testRunRead() {
val future = requestManager.runRead [
'Foo'
]
assertEquals('Foo', future.get)
}
@Test
def void testRunReadConcurrent() {
val future = requestManager.runRead [
while (sharedState.get == 0) {
}
sharedState.incrementAndGet
]
val future2 = requestManager.runRead [
sharedState.incrementAndGet
]
future2.join
future.join
assertEquals(2, sharedState.get)
}
@Test
def void testRunReadAfterWrite() {
requestManager.runWrite [
while (sharedState.get == 0) {
}
sharedState.incrementAndGet
]
val future = requestManager.runRead [
sharedState.get
]
sharedState.incrementAndGet
assertEquals(2, future.get)
}
@Test
def void testRunWrite() {
requestManager.runWrite [
sharedState.incrementAndGet
].join
assertEquals(1, sharedState.get)
}
@Test
def void testRunWriteAfterWrite() {
val future = requestManager.runWrite [
sharedState.incrementAndGet
]
val future2 = requestManager.runWrite [
if (sharedState.get != 0)
sharedState.incrementAndGet
]
future2.join
future.join
assertEquals(2, sharedState.get)
}
@Test
def void testRunWriteAfterRead() {
requestManager.runRead [
sharedState.incrementAndGet
]
requestManager.runWrite [
while (sharedState.get == 0) {
}
sharedState.incrementAndGet
].join
assertEquals(2, sharedState.get)
}
@Test
def void testCancelWrite() {
requestManager.runWrite [ cancelIndicator |
while (!cancelIndicator.canceled) {
}
sharedState.incrementAndGet
]
requestManager.runWrite [
sharedState.incrementAndGet
].join
assertEquals(2, sharedState.get)
}
@Test
def void testCancelRead() {
sharedState.incrementAndGet
val future = requestManager.runRead [ cancelIndicator |
while (!cancelIndicator.canceled) {
}
sharedState.get
]
requestManager.runWrite [
sharedState.set(0)
].join
assertEquals(1, future.get)
}
}