fix synchronization error (maybe)

Signed-off-by: mmews <marcus.mews@numberfour.eu>
This commit is contained in:
mmews 2019-12-12 00:19:51 +01:00
parent df3b967105
commit f8fbc38a0b
7 changed files with 65 additions and 41 deletions

View file

@ -23,6 +23,8 @@ import org.junit.Test
import static org.junit.Assert.*
import org.junit.Assert
import org.eclipse.xtext.ide.server.concurrent.WriteRequest
import org.eclipse.xtext.ide.server.concurrent.ReadRequest
/**
* @author kosyakov - Initial contribution and API
@ -48,7 +50,7 @@ class RequestManagerTest {
@Test(timeout = 1000)
def void testRunWriteLogExceptionNonCancellable() {
val logResult = LoggingTester.captureLogging(Level.ALL, RequestManager, [
val logResult = LoggingTester.captureLogging(Level.ALL, WriteRequest, [
val future = requestManager.runWrite([], [
throw new RuntimeException();
])
@ -64,7 +66,7 @@ class RequestManagerTest {
@Test(timeout = 1000)
def void testRunWriteLogExceptionCancellable() {
val logResult = LoggingTester.captureLogging(Level.ALL, RequestManager, [
val logResult = LoggingTester.captureLogging(Level.ALL, WriteRequest, [
val future = requestManager.runWrite([
throw new RuntimeException();
], [])
@ -80,7 +82,7 @@ class RequestManagerTest {
@Test(timeout = 1000, expected = ExecutionException)
def void testRunWriteCatchException() {
LoggingTester.captureLogging(Level.ALL, RequestManager, [
LoggingTester.captureLogging(Level.ALL, WriteRequest, [
val future = requestManager.runWrite([
throw new RuntimeException()
], [])
@ -88,12 +90,12 @@ class RequestManagerTest {
assertEquals('Foo', future.get)
])
Assert.fail
Assert.fail("unreachable")
}
@Test(timeout = 1000)
def void testRunReadLogException() {
val logResult = LoggingTester.captureLogging(Level.ALL, RequestManager, [
val logResult = LoggingTester.captureLogging(Level.ALL, ReadRequest, [
val future = requestManager.runRead([
throw new RuntimeException();
])
@ -109,7 +111,7 @@ class RequestManagerTest {
@Test(timeout = 1000, expected = ExecutionException)
def void testRunReadCatchException() {
LoggingTester.captureLogging(Level.ALL, RequestManager, [
LoggingTester.captureLogging(Level.ALL, ReadRequest, [
val future = requestManager.runRead([
throw new RuntimeException()
])

View file

@ -15,7 +15,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Level;
import org.eclipse.xtext.ide.server.ServerModule;
import org.eclipse.xtext.ide.server.concurrent.ReadRequest;
import org.eclipse.xtext.ide.server.concurrent.RequestManager;
import org.eclipse.xtext.ide.server.concurrent.WriteRequest;
import org.eclipse.xtext.testing.logging.LoggingTester;
import org.eclipse.xtext.util.CancelIndicator;
import org.eclipse.xtext.xbase.lib.Exceptions;
@ -71,7 +73,7 @@ public class RequestManagerTest {
}
}
};
final LoggingTester.LogCapture logResult = LoggingTester.captureLogging(Level.ALL, RequestManager.class, _function);
final LoggingTester.LogCapture logResult = LoggingTester.captureLogging(Level.ALL, WriteRequest.class, _function);
logResult.assertLogEntry("Error during request:");
}
@ -94,7 +96,7 @@ public class RequestManagerTest {
}
}
};
final LoggingTester.LogCapture logResult = LoggingTester.captureLogging(Level.ALL, RequestManager.class, _function);
final LoggingTester.LogCapture logResult = LoggingTester.captureLogging(Level.ALL, WriteRequest.class, _function);
logResult.assertLogEntry("Error during request:");
}
@ -114,8 +116,8 @@ public class RequestManagerTest {
throw Exceptions.sneakyThrow(_e);
}
};
LoggingTester.captureLogging(Level.ALL, RequestManager.class, _function);
Assert.fail();
LoggingTester.captureLogging(Level.ALL, WriteRequest.class, _function);
Assert.fail("unreachable");
}
@Test(timeout = 1000)
@ -134,7 +136,7 @@ public class RequestManagerTest {
}
}
};
final LoggingTester.LogCapture logResult = LoggingTester.captureLogging(Level.ALL, RequestManager.class, _function);
final LoggingTester.LogCapture logResult = LoggingTester.captureLogging(Level.ALL, ReadRequest.class, _function);
logResult.assertLogEntry("Error during request:");
}
@ -151,7 +153,7 @@ public class RequestManagerTest {
throw Exceptions.sneakyThrow(_e);
}
};
LoggingTester.captureLogging(Level.ALL, RequestManager.class, _function);
LoggingTester.captureLogging(Level.ALL, ReadRequest.class, _function);
Assert.fail();
}

View file

@ -24,7 +24,6 @@ import org.eclipse.xtext.util.CancelIndicator
* @since 2.11
*/
class RequestManager {
static final Logger LOG = Logger.getLogger(RequestManager);
@Inject ExecutorService parallel
@ -44,27 +43,21 @@ class RequestManager {
def <V> CompletableFuture<V> runRead((CancelIndicator)=>V cancellable) {
return submit(
new ReadRequest(cancellable, parallel)
new ReadRequest(this, cancellable, parallel)
)
}
def <U, V> CompletableFuture<V> runWrite(()=>U nonCancellable, (CancelIndicator, U)=>V cancellable) {
val cancelFuture = cancel()
return submit(
new WriteRequest(nonCancellable, cancellable, cancelFuture)
new WriteRequest(this, nonCancellable, cancellable, cancelFuture)
)
}
protected def <V> CompletableFuture<V> submit(AbstractRequest<V> request) {
requests += request
queue.submit(request)
val future = request.get;
future.whenComplete[v, thr|
if (thr !== null && !isCancelException(thr)) {
LOG.error("Error during request: ", thr);
}
]
return future
return request.get
}
protected def CompletableFuture<Void> cancel() {
@ -89,6 +82,7 @@ class RequestManager {
@FinalFieldsConstructor
class ReadRequest<V> extends AbstractRequest<V> {
static final Logger LOG = Logger.getLogger(ReadRequest);
val (CancelIndicator)=>V cancellable
val ExecutorService executor
@ -100,6 +94,9 @@ class ReadRequest<V> extends AbstractRequest<V> {
cancelIndicator.checkCanceled
result.complete(cancellable.apply(cancelIndicator))
} catch(Throwable e) {
if (e !== null && !requestManager.isCancelException(e)) {
LOG.error("Error during request: ", e);
}
result.completeExceptionally(e)
}
]
@ -109,6 +106,7 @@ class ReadRequest<V> extends AbstractRequest<V> {
@FinalFieldsConstructor
class WriteRequest<U, V> extends AbstractRequest<V> {
static final Logger LOG = Logger.getLogger(WriteRequest);
val ()=>U nonCancellable
val (CancelIndicator, U)=>V cancellable
@ -126,6 +124,9 @@ class WriteRequest<U, V> extends AbstractRequest<V> {
cancelIndicator.checkCanceled
result.complete(cancellable.apply(cancelIndicator, intermediateResult))
} catch(Throwable e) {
if (e !== null && !requestManager.isCancelException(e)) {
LOG.error("Error during request: ", e);
}
result.completeExceptionally(e)
}
}
@ -136,6 +137,11 @@ abstract class AbstractRequest<V> implements Runnable, Cancellable {
protected val result = new CompletableFuture<V>()
protected val cancelIndicator = new RequestCancelIndicator(result)
protected val RequestManager requestManager;
new (RequestManager requestManager) {
this.requestManager = requestManager;
}
override cancel() {
cancelIndicator.cancel

View file

@ -10,6 +10,7 @@ package org.eclipse.xtext.ide.server.concurrent;
import java.util.concurrent.CompletableFuture;
import org.eclipse.xtext.ide.server.concurrent.Cancellable;
import org.eclipse.xtext.ide.server.concurrent.RequestCancelIndicator;
import org.eclipse.xtext.ide.server.concurrent.RequestManager;
@SuppressWarnings("all")
public abstract class AbstractRequest<V extends Object> implements Runnable, Cancellable {
@ -17,6 +18,12 @@ public abstract class AbstractRequest<V extends Object> implements Runnable, Can
protected final RequestCancelIndicator cancelIndicator = new RequestCancelIndicator(this.result);
protected final RequestManager requestManager;
public AbstractRequest(final RequestManager requestManager) {
this.requestManager = requestManager;
}
@Override
public void cancel() {
this.cancelIndicator.cancel();

View file

@ -9,8 +9,10 @@ package org.eclipse.xtext.ide.server.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import org.apache.log4j.Logger;
import org.eclipse.xtend.lib.annotations.FinalFieldsConstructor;
import org.eclipse.xtext.ide.server.concurrent.AbstractRequest;
import org.eclipse.xtext.ide.server.concurrent.RequestManager;
import org.eclipse.xtext.util.CancelIndicator;
import org.eclipse.xtext.xbase.lib.Exceptions;
import org.eclipse.xtext.xbase.lib.Functions.Function1;
@ -18,6 +20,8 @@ import org.eclipse.xtext.xbase.lib.Functions.Function1;
@FinalFieldsConstructor
@SuppressWarnings("all")
public class ReadRequest<V extends Object> extends AbstractRequest<V> {
private static final Logger LOG = Logger.getLogger(ReadRequest.class);
private final Function1<? super CancelIndicator, ? extends V> cancellable;
private final ExecutorService executor;
@ -40,7 +44,14 @@ public class ReadRequest<V extends Object> extends AbstractRequest<V> {
} catch (final Throwable _t) {
if (_t instanceof Throwable) {
final Throwable e = (Throwable)_t;
_xtrycatchfinallyexpression = this.result.completeExceptionally(e);
boolean _xblockexpression_1 = false;
{
if (((e != null) && (!this.requestManager.isCancelException(e)))) {
ReadRequest.LOG.error("Error during request: ", e);
}
_xblockexpression_1 = this.result.completeExceptionally(e);
}
_xtrycatchfinallyexpression = _xblockexpression_1;
} else {
throw Exceptions.sneakyThrow(_t);
}
@ -50,8 +61,8 @@ public class ReadRequest<V extends Object> extends AbstractRequest<V> {
this.executor.<Boolean>submit(_function);
}
public ReadRequest(final Function1<? super CancelIndicator, ? extends V> cancellable, final ExecutorService executor) {
super();
public ReadRequest(final RequestManager requestManager, final Function1<? super CancelIndicator, ? extends V> cancellable, final ExecutorService executor) {
super(requestManager);
this.cancellable = cancellable;
this.executor = executor;
}

View file

@ -14,8 +14,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import org.apache.log4j.Logger;
import org.eclipse.xtext.ide.server.concurrent.AbstractRequest;
import org.eclipse.xtext.ide.server.concurrent.ReadRequest;
import org.eclipse.xtext.ide.server.concurrent.WriteRequest;
@ -33,8 +31,6 @@ import org.eclipse.xtext.xbase.lib.Functions.Function2;
*/
@SuppressWarnings("all")
public class RequestManager {
private static final Logger LOG = Logger.getLogger(RequestManager.class);
@Inject
private ExecutorService parallel;
@ -53,27 +49,20 @@ public class RequestManager {
}
public <V extends Object> CompletableFuture<V> runRead(final Function1<? super CancelIndicator, ? extends V> cancellable) {
ReadRequest<V> _readRequest = new ReadRequest<V>(cancellable, this.parallel);
ReadRequest<V> _readRequest = new ReadRequest<V>(this, cancellable, this.parallel);
return this.<V>submit(_readRequest);
}
public <U extends Object, V extends Object> CompletableFuture<V> runWrite(final Function0<? extends U> nonCancellable, final Function2<? super CancelIndicator, ? super U, ? extends V> cancellable) {
final CompletableFuture<Void> cancelFuture = this.cancel();
WriteRequest<U, V> _writeRequest = new WriteRequest<U, V>(nonCancellable, cancellable, cancelFuture);
WriteRequest<U, V> _writeRequest = new WriteRequest<U, V>(this, nonCancellable, cancellable, cancelFuture);
return this.<V>submit(_writeRequest);
}
protected <V extends Object> CompletableFuture<V> submit(final AbstractRequest<V> request) {
this.requests.add(request);
this.queue.submit(request);
final CompletableFuture<V> future = request.get();
final BiConsumer<V, Throwable> _function = (V v, Throwable thr) -> {
if (((thr != null) && (!this.isCancelException(thr)))) {
RequestManager.LOG.error("Error during request: ", thr);
}
};
future.whenComplete(_function);
return future;
return request.get();
}
protected CompletableFuture<Void> cancel() {

View file

@ -8,8 +8,10 @@
package org.eclipse.xtext.ide.server.concurrent;
import java.util.concurrent.CompletableFuture;
import org.apache.log4j.Logger;
import org.eclipse.xtend.lib.annotations.FinalFieldsConstructor;
import org.eclipse.xtext.ide.server.concurrent.AbstractRequest;
import org.eclipse.xtext.ide.server.concurrent.RequestManager;
import org.eclipse.xtext.util.CancelIndicator;
import org.eclipse.xtext.xbase.lib.Exceptions;
import org.eclipse.xtext.xbase.lib.Functions.Function0;
@ -18,6 +20,8 @@ import org.eclipse.xtext.xbase.lib.Functions.Function2;
@FinalFieldsConstructor
@SuppressWarnings("all")
public class WriteRequest<U extends Object, V extends Object> extends AbstractRequest<V> {
private static final Logger LOG = Logger.getLogger(WriteRequest.class);
private final Function0<? extends U> nonCancellable;
private final Function2<? super CancelIndicator, ? super U, ? extends V> cancellable;
@ -41,6 +45,9 @@ public class WriteRequest<U extends Object, V extends Object> extends AbstractRe
} catch (final Throwable _t) {
if (_t instanceof Throwable) {
final Throwable e = (Throwable)_t;
if (((e != null) && (!this.requestManager.isCancelException(e)))) {
WriteRequest.LOG.error("Error during request: ", e);
}
this.result.completeExceptionally(e);
} else {
throw Exceptions.sneakyThrow(_t);
@ -48,8 +55,8 @@ public class WriteRequest<U extends Object, V extends Object> extends AbstractRe
}
}
public WriteRequest(final Function0<? extends U> nonCancellable, final Function2<? super CancelIndicator, ? super U, ? extends V> cancellable, final CompletableFuture<Void> previous) {
super();
public WriteRequest(final RequestManager requestManager, final Function0<? extends U> nonCancellable, final Function2<? super CancelIndicator, ? super U, ? extends V> cancellable, final CompletableFuture<Void> previous) {
super(requestManager);
this.nonCancellable = nonCancellable;
this.cancellable = cancellable;
this.previous = previous;