Skip to content

Commit

Permalink
jarek/6617: add output widget (#6742)
Browse files Browse the repository at this point in the history
* #6617: beakerx stdout per evaluation

* #6617: add Output widget

* #6617: handle rich material

* #6617: sync messages
  • Loading branch information
jaroslawmalekcodete authored and scottdraves committed Feb 7, 2018
1 parent 1f7840a commit bbf42c3
Show file tree
Hide file tree
Showing 44 changed files with 978 additions and 426 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class DefaultJVMVariables {
public DefaultJVMVariables() {
addImports(
"com.twosigma.beakerx.NamespaceClient",
"com.twosigma.beakerx.widgets.OutputManager",
"com.twosigma.beakerx.chart.Color",
"com.twosigma.beakerx.chart.GradientColor",
"com.twosigma.beakerx.chart.legend.*",
Expand Down
45 changes: 22 additions & 23 deletions kernel/base/src/main/java/com/twosigma/beakerx/NamespaceClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.twosigma.beakerx.jvm.serialization.BasicObjectSerializer;
import com.twosigma.beakerx.jvm.serialization.BeakerObjectConverter;
import com.twosigma.beakerx.table.TableDisplayToJson;

import java.io.IOException;
import java.io.Serializable;
import java.io.StringWriter;
Expand All @@ -35,10 +36,11 @@
import java.util.concurrent.SynchronousQueue;

import static com.fasterxml.jackson.databind.SerializationFeature.WRITE_ENUMS_USING_TO_STRING;
import static com.twosigma.beakerx.kernel.msg.JupyterMessages.COMM_MSG;

public class NamespaceClient {
private static Map<String,NamespaceClient> nsClients = new ConcurrentHashMap<>();

private static Map<String, NamespaceClient> nsClients = new ConcurrentHashMap<>();
private static String currentSession;
private static Map<String, SynchronousQueue<Object>> messagePool = new HashMap<>();
private ObjectMapper objectMapper;
Expand Down Expand Up @@ -68,22 +70,22 @@ public SimpleEvaluationObject getOutputObj() {
public synchronized void setOutputObj(SimpleEvaluationObject input) {
currentCeo = input;
}

public synchronized static NamespaceClient getBeaker() {
if (currentSession!=null){
if (currentSession != null) {
return nsClients.get(currentSession);
}
return null;
}

public synchronized static NamespaceClient getBeaker(String session) {
currentSession = session;
if (!nsClients.containsKey(session)) {
nsClients.put(session, new NamespaceClient());
}
return nsClients.get(currentSession);
}

public synchronized static void delBeaker(String sessionId) {
nsClients.remove(sessionId);
currentSession = null;
Expand All @@ -98,12 +100,11 @@ public synchronized Object set(String name, Object value) throws IOException {
state.put("sync", true);
data.put("state", state);
data.put("buffer_paths", new HashMap<>());
c.setData(data);
c.send();
c.send(COMM_MSG, Comm.Buffer.EMPTY, new Comm.Data(data));
return value;
}
protected String getJson(Object value) throws IOException{

protected String getJson(Object value) throws IOException {
StringWriter sw = new StringWriter();
JsonGenerator jgen = objectMapper.getFactory().createGenerator(sw);
objectSerializer.writeObject(value, jgen, true);
Expand All @@ -114,17 +115,17 @@ protected String getJson(Object value) throws IOException{

//TODO : Not Implemented
public Object setFast(String name, Object value) {
throw new RuntimeException("This option is not implemented now") ;
throw new RuntimeException("This option is not implemented now");
}

//TODO : Not Implemented
public Object unset(String name) {
throw new RuntimeException("This option is not implemented now") ;
throw new RuntimeException("This option is not implemented now");
}

//TODO : Not Implemented
public synchronized Object get(final String name) {
throw new RuntimeException("This option is not implemented now") ;
throw new RuntimeException("This option is not implemented now");
}

public static SynchronousQueue<Object> getMessageQueue(String channel) {
Expand All @@ -137,29 +138,29 @@ public static SynchronousQueue<Object> getMessageQueue(String channel) {
}

protected Comm getAutotranslationComm() {
if(autotranslationComm == null){
if (autotranslationComm == null) {
autotranslationComm = new Comm(TargetNamesEnum.BEAKER_AUTOTRANSLATION);
autotranslationComm.open();
}
return autotranslationComm;
}

protected Comm getCodeCellsComm() {
if(codeCellsComm == null){
if (codeCellsComm == null) {
codeCellsComm = new Comm(TargetNamesEnum.BEAKER_GETCODECELLS);
codeCellsComm.open();
}
return codeCellsComm;
}

protected Comm getTagRunComm() {
if(tagRunComm == null){
if (tagRunComm == null) {
tagRunComm = new Comm(TargetNamesEnum.BEAKER_TAG_RUN);
tagRunComm.open();
}
return tagRunComm;
}


public List<CodeCell> getCodeCells(String tagFilter) throws IOException, InterruptedException {
// first send message to get cells
Expand All @@ -170,11 +171,10 @@ public List<CodeCell> getCodeCells(String tagFilter) throws IOException, Interru
state.put("value", getJson(tagFilter));
data.put("state", state);
data.put("buffer_paths", new HashMap<>());
c.setData(data);
c.send();
c.send(COMM_MSG, Comm.Buffer.EMPTY, new Comm.Data(data));
// block
Object cells = getMessageQueue("CodeCells").take();
return (List<CodeCell>)cells;
return (List<CodeCell>) cells;
}

public synchronized void runByTag(String tag) {
Expand All @@ -184,8 +184,7 @@ public synchronized void runByTag(String tag) {
state.put("runByTag", tag);
data.put("state", state);
data.put("buffer_paths", new HashMap<>());
c.setData(data);
c.send();
c.send(COMM_MSG, Comm.Buffer.EMPTY, new Comm.Data(data));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.twosigma.beakerx.DefaultJVMVariables;
import com.twosigma.beakerx.NamespaceClient;
import com.twosigma.beakerx.TryResult;
import com.twosigma.beakerx.jvm.object.SimpleEvaluationObject;
import com.twosigma.beakerx.jvm.threads.CellExecutor;
import com.twosigma.beakerx.kernel.AddImportStatus;
import com.twosigma.beakerx.kernel.Classpath;
Expand All @@ -38,6 +39,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public abstract class BaseEvaluator implements Evaluator {

Expand All @@ -51,6 +55,7 @@ public abstract class BaseEvaluator implements Evaluator {
private final CellExecutor executor;
private Path tempFolder;
protected Repos repos;
protected ExecutorService executorService;

public BaseEvaluator(String id, String sId, CellExecutor cellExecutor, TempFolderFactory tempFolderFactory, EvaluatorParameters evaluatorParameters) {
shellId = id;
Expand All @@ -61,9 +66,21 @@ public BaseEvaluator(String id, String sId, CellExecutor cellExecutor, TempFolde
classPath = new Classpath();
classPath.add(new PathToJar(outDir));
repos = new Repos();
executorService = Executors.newSingleThreadExecutor();
init(evaluatorParameters);
}

protected TryResult evaluate(SimpleEvaluationObject seo, Callable<TryResult> callable) {
Future<TryResult> submit = executorService.submit(callable);
TryResult either = null;
try {
either = submit.get();
} catch (Exception e) {
either = TryResult.createError(e.getLocalizedMessage());
}
return either;
}

protected abstract void addJarToClassLoader(PathToJar pathToJar);

protected abstract void addImportToClassLoader(ImportPath anImport);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.twosigma.beakerx.kernel.KernelFunctionality;
import com.twosigma.beakerx.message.Message;

import java.util.Collections;

import static com.google.common.base.Preconditions.checkNotNull;

public abstract class KernelHandler<T> implements Handler<T> {
Expand All @@ -33,7 +35,7 @@ public void send(Message message) {
}

public void publish(Message message) {
kernel.publish(message);
kernel.publish(Collections.singletonList(message));
}

public void exit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,46 @@
import java.util.List;
import java.util.Observable;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

import com.twosigma.beakerx.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Abstraction around an evaluation, for communication of the state over REST to the plugin.
*/
public class SimpleEvaluationObject extends Observable {

private final static Logger logger = LoggerFactory.getLogger(SimpleEvaluationObject.class.getName());

private Message jupyterMessage;
private int executionCount;
private EvaluationStatus status;
private final String expression;
private Object payload;
private BeakerOutputHandler stdout;
private BeakerOutputHandler stderr;
private Queue<ConsoleOutput> consoleOutput = new ConcurrentLinkedQueue<ConsoleOutput>();
private Queue<ConsoleOutput> consoleOutput = new ConcurrentLinkedQueue<>();
private ProgressReporting progressReporting;
private boolean showResult = true;

public SimpleEvaluationObject(String e, BeakerOutputHandler stdout, BeakerOutputHandler stderr) {
expression = e;
status = EvaluationStatus.QUEUED;
this.stdout = stdout;
this.stderr = stderr;
}

public SimpleEvaluationObject(String e) {
expression = e;
status = EvaluationStatus.QUEUED;
this.stdout = new SimpleOutputHandler(false);
this.stderr = new SimpleOutputHandler(true);
}

public boolean isShowResult() {
return showResult;
}

public synchronized void started() {
setOutputHandler();
this.status = EvaluationStatus.RUNNING;
setChanged();
notifyObservers();
Expand All @@ -71,7 +76,6 @@ public synchronized void finished(Object r) {
notifyObservers();
}


public synchronized void error(Object r) {
clrOutputHandler();
this.status = EvaluationStatus.ERROR;
Expand Down Expand Up @@ -148,14 +152,10 @@ public void write(byte[] b, int off, int len) {
}

public synchronized BeakerOutputHandler getStdOutputHandler() {
if (stdout == null)
stdout = new SimpleOutputHandler(false);
return stdout;
}

public synchronized BeakerOutputHandler getStdErrorHandler() {
if (stderr == null)
stderr = new SimpleOutputHandler(true);
return stderr;
}

Expand Down
Loading

0 comments on commit bbf42c3

Please sign in to comment.