Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jarek/6617: add output widget #6742

Merged
merged 5 commits into from
Feb 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class DefaultJVMVariables {
public DefaultJVMVariables() {
addImports(
"com.twosigma.beakerx.NamespaceClient",
"com.twosigma.beakerx.widgets.OutputManager",
"com.twosigma.beakerx.chart.Color",
"com.twosigma.beakerx.chart.GradientColor",
"com.twosigma.beakerx.chart.legend.*",
Expand Down
45 changes: 22 additions & 23 deletions kernel/base/src/main/java/com/twosigma/beakerx/NamespaceClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.twosigma.beakerx.jvm.serialization.BasicObjectSerializer;
import com.twosigma.beakerx.jvm.serialization.BeakerObjectConverter;
import com.twosigma.beakerx.table.TableDisplayToJson;

import java.io.IOException;
import java.io.Serializable;
import java.io.StringWriter;
Expand All @@ -35,10 +36,11 @@
import java.util.concurrent.SynchronousQueue;

import static com.fasterxml.jackson.databind.SerializationFeature.WRITE_ENUMS_USING_TO_STRING;
import static com.twosigma.beakerx.kernel.msg.JupyterMessages.COMM_MSG;

public class NamespaceClient {
private static Map<String,NamespaceClient> nsClients = new ConcurrentHashMap<>();

private static Map<String, NamespaceClient> nsClients = new ConcurrentHashMap<>();
private static String currentSession;
private static Map<String, SynchronousQueue<Object>> messagePool = new HashMap<>();
private ObjectMapper objectMapper;
Expand Down Expand Up @@ -68,22 +70,22 @@ public SimpleEvaluationObject getOutputObj() {
public synchronized void setOutputObj(SimpleEvaluationObject input) {
currentCeo = input;
}

public synchronized static NamespaceClient getBeaker() {
if (currentSession!=null){
if (currentSession != null) {
return nsClients.get(currentSession);
}
return null;
}

public synchronized static NamespaceClient getBeaker(String session) {
currentSession = session;
if (!nsClients.containsKey(session)) {
nsClients.put(session, new NamespaceClient());
}
return nsClients.get(currentSession);
}

public synchronized static void delBeaker(String sessionId) {
nsClients.remove(sessionId);
currentSession = null;
Expand All @@ -98,12 +100,11 @@ public synchronized Object set(String name, Object value) throws IOException {
state.put("sync", true);
data.put("state", state);
data.put("buffer_paths", new HashMap<>());
c.setData(data);
c.send();
c.send(COMM_MSG, Comm.Buffer.EMPTY, new Comm.Data(data));
return value;
}
protected String getJson(Object value) throws IOException{

protected String getJson(Object value) throws IOException {
StringWriter sw = new StringWriter();
JsonGenerator jgen = objectMapper.getFactory().createGenerator(sw);
objectSerializer.writeObject(value, jgen, true);
Expand All @@ -114,17 +115,17 @@ protected String getJson(Object value) throws IOException{

//TODO : Not Implemented
public Object setFast(String name, Object value) {
throw new RuntimeException("This option is not implemented now") ;
throw new RuntimeException("This option is not implemented now");
}

//TODO : Not Implemented
public Object unset(String name) {
throw new RuntimeException("This option is not implemented now") ;
throw new RuntimeException("This option is not implemented now");
}

//TODO : Not Implemented
public synchronized Object get(final String name) {
throw new RuntimeException("This option is not implemented now") ;
throw new RuntimeException("This option is not implemented now");
}

public static SynchronousQueue<Object> getMessageQueue(String channel) {
Expand All @@ -137,29 +138,29 @@ public static SynchronousQueue<Object> getMessageQueue(String channel) {
}

protected Comm getAutotranslationComm() {
if(autotranslationComm == null){
if (autotranslationComm == null) {
autotranslationComm = new Comm(TargetNamesEnum.BEAKER_AUTOTRANSLATION);
autotranslationComm.open();
}
return autotranslationComm;
}

protected Comm getCodeCellsComm() {
if(codeCellsComm == null){
if (codeCellsComm == null) {
codeCellsComm = new Comm(TargetNamesEnum.BEAKER_GETCODECELLS);
codeCellsComm.open();
}
return codeCellsComm;
}

protected Comm getTagRunComm() {
if(tagRunComm == null){
if (tagRunComm == null) {
tagRunComm = new Comm(TargetNamesEnum.BEAKER_TAG_RUN);
tagRunComm.open();
}
return tagRunComm;
}


public List<CodeCell> getCodeCells(String tagFilter) throws IOException, InterruptedException {
// first send message to get cells
Expand All @@ -170,11 +171,10 @@ public List<CodeCell> getCodeCells(String tagFilter) throws IOException, Interru
state.put("value", getJson(tagFilter));
data.put("state", state);
data.put("buffer_paths", new HashMap<>());
c.setData(data);
c.send();
c.send(COMM_MSG, Comm.Buffer.EMPTY, new Comm.Data(data));
// block
Object cells = getMessageQueue("CodeCells").take();
return (List<CodeCell>)cells;
return (List<CodeCell>) cells;
}

public synchronized void runByTag(String tag) {
Expand All @@ -184,8 +184,7 @@ public synchronized void runByTag(String tag) {
state.put("runByTag", tag);
data.put("state", state);
data.put("buffer_paths", new HashMap<>());
c.setData(data);
c.send();
c.send(COMM_MSG, Comm.Buffer.EMPTY, new Comm.Data(data));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.twosigma.beakerx.DefaultJVMVariables;
import com.twosigma.beakerx.NamespaceClient;
import com.twosigma.beakerx.TryResult;
import com.twosigma.beakerx.jvm.object.SimpleEvaluationObject;
import com.twosigma.beakerx.jvm.threads.CellExecutor;
import com.twosigma.beakerx.kernel.AddImportStatus;
import com.twosigma.beakerx.kernel.Classpath;
Expand All @@ -38,6 +39,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public abstract class BaseEvaluator implements Evaluator {

Expand All @@ -51,6 +55,7 @@ public abstract class BaseEvaluator implements Evaluator {
private final CellExecutor executor;
private Path tempFolder;
protected Repos repos;
protected ExecutorService executorService;

public BaseEvaluator(String id, String sId, CellExecutor cellExecutor, TempFolderFactory tempFolderFactory, EvaluatorParameters evaluatorParameters) {
shellId = id;
Expand All @@ -61,9 +66,21 @@ public BaseEvaluator(String id, String sId, CellExecutor cellExecutor, TempFolde
classPath = new Classpath();
classPath.add(new PathToJar(outDir));
repos = new Repos();
executorService = Executors.newSingleThreadExecutor();
init(evaluatorParameters);
}

protected TryResult evaluate(SimpleEvaluationObject seo, Callable<TryResult> callable) {
Future<TryResult> submit = executorService.submit(callable);
TryResult either = null;
try {
either = submit.get();
} catch (Exception e) {
either = TryResult.createError(e.getLocalizedMessage());
}
return either;
}

protected abstract void addJarToClassLoader(PathToJar pathToJar);

protected abstract void addImportToClassLoader(ImportPath anImport);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.twosigma.beakerx.kernel.KernelFunctionality;
import com.twosigma.beakerx.message.Message;

import java.util.Collections;

import static com.google.common.base.Preconditions.checkNotNull;

public abstract class KernelHandler<T> implements Handler<T> {
Expand All @@ -33,7 +35,7 @@ public void send(Message message) {
}

public void publish(Message message) {
kernel.publish(message);
kernel.publish(Collections.singletonList(message));
}

public void exit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,46 @@
import java.util.List;
import java.util.Observable;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

import com.twosigma.beakerx.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Abstraction around an evaluation, for communication of the state over REST to the plugin.
*/
public class SimpleEvaluationObject extends Observable {

private final static Logger logger = LoggerFactory.getLogger(SimpleEvaluationObject.class.getName());

private Message jupyterMessage;
private int executionCount;
private EvaluationStatus status;
private final String expression;
private Object payload;
private BeakerOutputHandler stdout;
private BeakerOutputHandler stderr;
private Queue<ConsoleOutput> consoleOutput = new ConcurrentLinkedQueue<ConsoleOutput>();
private Queue<ConsoleOutput> consoleOutput = new ConcurrentLinkedQueue<>();
private ProgressReporting progressReporting;
private boolean showResult = true;

public SimpleEvaluationObject(String e, BeakerOutputHandler stdout, BeakerOutputHandler stderr) {
expression = e;
status = EvaluationStatus.QUEUED;
this.stdout = stdout;
this.stderr = stderr;
}

public SimpleEvaluationObject(String e) {
expression = e;
status = EvaluationStatus.QUEUED;
this.stdout = new SimpleOutputHandler(false);
this.stderr = new SimpleOutputHandler(true);
}

public boolean isShowResult() {
return showResult;
}

public synchronized void started() {
setOutputHandler();
this.status = EvaluationStatus.RUNNING;
setChanged();
notifyObservers();
Expand All @@ -71,7 +76,6 @@ public synchronized void finished(Object r) {
notifyObservers();
}


public synchronized void error(Object r) {
clrOutputHandler();
this.status = EvaluationStatus.ERROR;
Expand Down Expand Up @@ -148,14 +152,10 @@ public void write(byte[] b, int off, int len) {
}

public synchronized BeakerOutputHandler getStdOutputHandler() {
if (stdout == null)
stdout = new SimpleOutputHandler(false);
return stdout;
}

public synchronized BeakerOutputHandler getStdErrorHandler() {
if (stderr == null)
stderr = new SimpleOutputHandler(true);
return stderr;
}

Expand Down
Loading