diff --git a/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/event/scope/LexicalScope.java b/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/event/scope/LexicalScope.java index 153098a01e..d68256874c 100644 --- a/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/event/scope/LexicalScope.java +++ b/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/event/scope/LexicalScope.java @@ -13,8 +13,8 @@ package org.eclipse.tracecompass.ctf.core.event.scope; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; @@ -28,7 +28,7 @@ public class LexicalScope implements ILexicalScope { private int hash = 0; private final @NonNull String fName; private final @NonNull String fPath; - private final Map fChildren = new ConcurrentHashMap<>(); + private final Map fChildren = new HashMap<>(); /** * Hidden constructor for the root node only @@ -40,6 +40,22 @@ protected LexicalScope() { fName = ""; //$NON-NLS-1$ } + /** + * Create a scope + * @param parent + * The parent node, can be null, but shouldn't + * @param name + * the name of the field + * @return the scope + */ + public static @NonNull ILexicalScope create(ILexicalScope parent, @NonNull String name) { + ILexicalScope child = parent.getChild(name); + if( child == null) { + child = new LexicalScope(parent, name); + } + return child; + } + /** * The scope constructor * diff --git a/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/event/types/Declaration.java b/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/event/types/Declaration.java index 76b5b275b5..96ede37145 100644 --- a/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/event/types/Declaration.java +++ b/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/event/types/Declaration.java @@ -43,18 +43,10 @@ public ILexicalScope getPath(IDefinitionScope definitionScope, @NonNull String f if (definitionScope != null) { final ILexicalScope parentPath = definitionScope.getScopePath(); if (parentPath != null) { - ILexicalScope myScope = parentPath.getChild(fieldName); - if (myScope == null) { - myScope = new LexicalScope(parentPath, fieldName); - } - return myScope; + return LexicalScope.create(parentPath, fieldName); } } - ILexicalScope child = ILexicalScope.ROOT.getChild(fieldName); - if (child != null) { - return child; - } - return new LexicalScope(ILexicalScope.ROOT, fieldName); + return LexicalScope.create(ILexicalScope.ROOT, fieldName); } /** diff --git a/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/event/types/EnumDeclaration.java b/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/event/types/EnumDeclaration.java index 28a9504cd1..8cd3e858f2 100644 --- a/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/event/types/EnumDeclaration.java +++ b/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/event/types/EnumDeclaration.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -46,6 +47,9 @@ */ public final class EnumDeclaration extends Declaration implements ISimpleDatatypeDeclaration { + private static final int CACHE_SIZE = 256; + private Map fCache = new HashMap<>(); + /** * A pair of longs class * @@ -141,7 +145,7 @@ public EnumDeclaration(IntegerDeclaration containerType) { * Existing enum declaration table * @since 2.3 */ - public EnumDeclaration(IntegerDeclaration containerType, Map enumTree){ + public EnumDeclaration(IntegerDeclaration containerType, Map enumTree) { fContainerType = containerType; enumTree.entrySet().forEach(entry -> fEnumMap.put(entry.getKey(), entry.getValue())); } @@ -241,37 +245,50 @@ public boolean add(@Nullable String label) { * @return the label of that value, can be null */ public @Nullable String query(long value) { - List strValues = new ArrayList<>(); - fEnumMap.forEach((k, v) -> { - if (value >= k.getFirst() && value <= k.getSecond()) { - strValues.add(v); + String retVal = fCache.get(value); + if (retVal == null) { + List strValues = new ArrayList<>(); + fEnumMap.forEach((k, v) -> { + if (value >= k.getFirst() && value <= k.getSecond()) { + strValues.add(v); + } + }); + if (!strValues.isEmpty()) { + retVal = strValues.size() == 1 ? strValues.get(0) : strValues.toString(); + fCache.put(value, retVal); + if (fCache.size() > CACHE_SIZE) { + fCache.remove(fCache.keySet().toArray()[0]); + } + return retVal; } - }); - if (!strValues.isEmpty()) { - return strValues.size() == 1 ? strValues.get(0) : strValues.toString(); - } - /* - * Divide the positive value in bits and see if there is a value for all - * those bits - */ - List flagsSet = new ArrayList<>(); - for (int i = 0; i < Long.SIZE; i++) { - Long bitValue = 1L << i; - if ((bitValue & value) != 0) { - /* - * See if there is a value for this bit where lower == upper, no - * range accepted here - */ - Pair bitPair = new Pair(bitValue, bitValue); - Collection flagValues = fEnumMap.get(bitPair); - if (flagValues.isEmpty()) { - // No value for this bit, not an enum flag - return null; + /* + * Divide the positive value in bits and see if there is a value for + * all those bits + */ + List flagsSet = new ArrayList<>(); + for (int i = 0; i < Long.SIZE; i++) { + Long bitValue = 1L << i; + if ((bitValue & value) != 0) { + /* + * See if there is a value for this bit where lower == + * upper, no range accepted here + */ + Pair bitPair = new Pair(bitValue, bitValue); + Collection flagValues = fEnumMap.get(bitPair); + if (flagValues.isEmpty()) { + // No value for this bit, not an enum flag + return null; + } + flagsSet.add(flagValues.size() == 1 ? flagValues.iterator().next() : flagValues.toString()); } - flagsSet.add(flagValues.size() == 1 ? flagValues.iterator().next() : flagValues.toString()); + } + retVal = flagsSet.isEmpty() ? null : String.join(" | ", flagsSet); //$NON-NLS-1$ + fCache.put(value, retVal); + if (fCache.size() > CACHE_SIZE) { + fCache.remove(fCache.keySet().toArray()[0]); } } - return flagsSet.isEmpty() ? null : String.join(" | ", flagsSet); //$NON-NLS-1$ + return retVal; } /** @@ -332,8 +349,8 @@ public boolean equals(@Nullable Object obj) { return false; } /* - * Must iterate through the entry sets as the comparator used in the enum tree - * does not respect the contract + * Must iterate through the entry sets as the comparator used in the + * enum tree does not respect the contract */ return Iterables.elementsEqual(fEnumMap.entries(), other.fEnumMap.entries()); } @@ -354,8 +371,8 @@ public boolean isBinaryEquivalent(@Nullable IDeclaration obj) { return false; } /* - * Must iterate through the entry sets as the comparator used in the enum tree - * does not respect the contract + * Must iterate through the entry sets as the comparator used in the + * enum tree does not respect the contract */ return Iterables.elementsEqual(fEnumMap.entries(), other.fEnumMap.entries()); } diff --git a/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/event/types/EnumDefinition.java b/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/event/types/EnumDefinition.java index 18c2bdd830..bc7f67708a 100644 --- a/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/event/types/EnumDefinition.java +++ b/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/event/types/EnumDefinition.java @@ -36,9 +36,11 @@ public final class EnumDefinition extends SimpleDatatypeDefinition { private static final String UNKNOWN_ENUM = " (%s)"; //$NON-NLS-1$ + private static final String UNINITIALIZED = "UNINITIALIZED"; //$NON-NLS-1$ + private final IntegerDefinition fInteger; - private final @Nullable String fValue; + private @Nullable String fValue; // ------------------------------------------------------------------------ // Constructors @@ -59,9 +61,9 @@ public final class EnumDefinition extends SimpleDatatypeDefinition { public EnumDefinition(@NonNull EnumDeclaration declaration, IDefinitionScope definitionScope, @NonNull String fieldName, IntegerDefinition intValue) { super(declaration, definitionScope, fieldName); - fInteger = intValue; - fValue = declaration.query(fInteger.getValue()); + fValue = UNINITIALIZED; + } // ------------------------------------------------------------------------ @@ -75,6 +77,9 @@ public EnumDefinition(@NonNull EnumDeclaration declaration, * @return the value of the enum. */ public String getValue() { + if (fValue == UNINITIALIZED) { + fValue = getDeclaration().query(fInteger.getValue()); + } return fValue != null ? fValue : String.format(UNKNOWN_ENUM, getIntegerValue()); } diff --git a/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/trace/CTFStreamInput.java b/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/trace/CTFStreamInput.java index 1506bf50af..78efd05ae9 100644 --- a/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/trace/CTFStreamInput.java +++ b/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/trace/CTFStreamInput.java @@ -299,7 +299,7 @@ private BitBuffer createBitBufferForPacketHeader(FileChannel fc, long dataOffset * create a packet bit buffer to read the packet header */ int maximumSize = fStreamPacketContextDecl.getMaximumSize() + fTracePacketHeaderDecl.getMaximumSize(); - BitBuffer bitBuffer = new BitBuffer(createPacketBitBuffer(fc, dataOffsetbits/Byte.SIZE, maximumSize)); + BitBuffer bitBuffer = new BitBuffer(createPacketBitBuffer(fc, dataOffsetbits / Byte.SIZE, maximumSize)); bitBuffer.setByteOrder(getStream().getTrace().getByteOrder()); return bitBuffer; } @@ -372,7 +372,7 @@ private StructDefinition parseTracePacketHeader( } if (!Objects.equals(getStream().getTrace().getUUID(), uuid) && !fUUIDMismatchWarning) { fUUIDMismatchWarning = true; - CtfCoreLoggerUtil.logWarning("Reading CTF trace: UUID mismatch for trace " + getStream().getTrace()); //$NON-NLS-1$ + CtfCoreLoggerUtil.logWarning("Reading CTF trace: UUID mismatch for trace " + getStream().getTrace() + " is not uuid from metadata (" + String.valueOf(uuid) + ")"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ } if (streamIDDef != null) { long streamID = streamIDDef.getValue(); diff --git a/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/trace/CTFStreamInputReader.java b/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/trace/CTFStreamInputReader.java index 8e2362b0b5..2d2a140df7 100644 --- a/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/trace/CTFStreamInputReader.java +++ b/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/ctf/core/trace/CTFStreamInputReader.java @@ -21,7 +21,12 @@ import java.nio.channels.FileChannel.MapMode; import java.nio.file.StandardOpenOption; import java.util.List; +import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; import org.eclipse.tracecompass.ctf.core.CTFException; @@ -32,7 +37,7 @@ import org.eclipse.tracecompass.ctf.core.event.types.StructDeclaration; import org.eclipse.tracecompass.internal.ctf.core.CtfCoreLoggerUtil; import org.eclipse.tracecompass.internal.ctf.core.SafeMappedByteBuffer; -import org.eclipse.tracecompass.internal.ctf.core.trace.CTFPacketReader; +import org.eclipse.tracecompass.internal.ctf.core.trace.CTFThreadedPacketReader; import org.eclipse.tracecompass.internal.ctf.core.trace.NullPacketReader; /** @@ -51,6 +56,56 @@ public class CTFStreamInputReader implements AutoCloseable { // Attributes // ------------------------------------------------------------------------ + private final class ExecutorClass implements Executor { + BlockingQueue<@NonNull Runnable> fQueue = new ArrayBlockingQueue<>(16); + Thread fThread = new Thread() { + @Override + public void run() { + try { + Runnable runnable = fQueue.take(); + while (runnable != POISON_PILL) { + runnable.run(); + runnable = fQueue.take(); + } + } catch (InterruptedException e) { + // don't consume it + } + } + }; + + private ExecutorClass(String fileName) { + fThread.setName("StreamReader: " + fileName); //$NON-NLS-1$ + fThread.start(); + Thread.currentThread().setName("Stream enqueuer"); //$NON-NLS-1$ + } + + @Override + public void execute(@Nullable Runnable command) { + if (command != null) { + try { + fQueue.put(command); + } catch (InterruptedException e) { + + } + } + + } + + Runnable POISON_PILL = new Runnable() { + + @Override + public void run() { + } + }; + + public void terminate() throws InterruptedException { + fQueue.put(POISON_PILL); + fThread.join(); + } + } + + private Executor fStreamConsumer; + /** * The StreamInput we are reading. */ @@ -97,6 +152,7 @@ public class CTFStreamInputReader implements AutoCloseable { public CTFStreamInputReader(CTFStreamInput streamInput) throws CTFException { fStreamInput = streamInput; fFile = fStreamInput.getFile(); + fStreamConsumer = new ExecutorClass(String.valueOf(fFile)); try { fFileChannel = FileChannel.open(fFile.toPath(), StandardOpenOption.READ); } catch (IOException e) { @@ -141,7 +197,7 @@ private IPacketReader getCurrentPacketReader(@Nullable ICTFPacketDescriptor pack bitBuffer.position(packet.getPayloadStartBits()); IDeclaration eventHeaderDeclaration = getStreamInput().getStream().getEventHeaderDeclaration(); CTFTrace trace = getStreamInput().getStream().getTrace(); - ctfPacketReader = new CTFPacketReader(bitBuffer, packet, getEventDeclarations(), eventHeaderDeclaration, getStreamEventContextDecl(), trace.getPacketHeaderDef(), trace); + ctfPacketReader = new CTFThreadedPacketReader(fStreamConsumer, bitBuffer, packet, getEventDeclarations(), eventHeaderDeclaration, getStreamEventContextDecl(), Objects.requireNonNull(trace.getPacketHeaderDef()), trace); } return ctfPacketReader; } @@ -179,6 +235,10 @@ public void close() throws IOException { if (fFileChannel != null) { fFileChannel.close(); } + try { + ((ExecutorClass) fStreamConsumer).terminate(); + } catch (InterruptedException e) { + } fPacketReader = NullPacketReader.INSTANCE; } diff --git a/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/internal/ctf/core/trace/CTFThreadedPacketReader.java b/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/internal/ctf/core/trace/CTFThreadedPacketReader.java new file mode 100644 index 0000000000..18559a66a7 --- /dev/null +++ b/ctf/org.eclipse.tracecompass.ctf.core/src/org/eclipse/tracecompass/internal/ctf/core/trace/CTFThreadedPacketReader.java @@ -0,0 +1,157 @@ +/******************************************************************************* + * Copyright (c) 2015 Ericsson + * + * All rights reserved. This program and the accompanying materials are made + * available under the terms of the Eclipse Public License v1.0 which + * accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + *******************************************************************************/ + +package org.eclipse.tracecompass.internal.ctf.core.trace; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; + +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; +import org.eclipse.tracecompass.ctf.core.CTFException; +import org.eclipse.tracecompass.ctf.core.event.IEventDeclaration; +import org.eclipse.tracecompass.ctf.core.event.IEventDefinition; +import org.eclipse.tracecompass.ctf.core.event.io.BitBuffer; +import org.eclipse.tracecompass.ctf.core.event.scope.IDefinitionScope; +import org.eclipse.tracecompass.ctf.core.event.types.ICompositeDefinition; +import org.eclipse.tracecompass.ctf.core.event.types.IDeclaration; +import org.eclipse.tracecompass.ctf.core.event.types.StructDeclaration; +import org.eclipse.tracecompass.ctf.core.trace.ICTFPacketDescriptor; +import org.eclipse.tracecompass.ctf.core.trace.IPacketReader; + +/** + * Threaded Packet Reader + * + * @author Matthew Khouzam - Initial API and implementation + * + */ +public class CTFThreadedPacketReader implements IPacketReader { + + private static final int QUEUE_LENGTH = 15; + + private static final @NonNull IEventDefinition[] POISON_PILL = new IEventDefinition[0]; + + private final CTFException[] fException = new CTFException[1]; + + private final CTFPacketReader fPacketReader; + + private final BlockingQueue fEvents = new ArrayBlockingQueue<>(QUEUE_LENGTH); + + private IEventDefinition[] fNextEvents; + private IEventDefinition fCurrentEvent; + private int fCurrentIndex = 0; + + private final Runnable fRunnable = new Runnable() { + private static final int CHUNK_SIZE = 1023; + + @Override + public void run() { + try { + IEventDefinition[] chunk = new IEventDefinition[CHUNK_SIZE]; + int index = 0; + while (fPacketReader.hasMoreEvents()) { + + chunk[index] = (fPacketReader.readNextEvent()); + index++; + if (index >= CHUNK_SIZE) { + fEvents.put(chunk); + index = 0; + chunk = new IEventDefinition[CHUNK_SIZE]; + } + } + if (index != 0) { + fEvents.put(Arrays.copyOf(chunk, index)); + } + fEvents.put(POISON_PILL); + } catch (CTFException | InterruptedException ex) { + fException[0] = new CTFException(ex); + } + } + }; + + /** + * Constructor + * + * @param executor + * Executor to enqueue packet reader + * @param input + * input {@link BitBuffer} + * @param packetContext + * packet_context where we get info like lost events and cpu_id + * @param declarations + * event declarations for this packet reader + * @param eventHeaderDeclaration + * event header declaration, what to read before any given event, + * to find it's id + * @param streamContext + * the context declaration + * @param packetHeader + * the header with the magic numbers and such + * @param packetScope + * the scope of the packetHeader + * @throws CTFException + * A ctf error or a timeout + */ + public CTFThreadedPacketReader(Executor executor, @NonNull BitBuffer input, @NonNull ICTFPacketDescriptor packetContext, @NonNull List<@Nullable IEventDeclaration> declarations, @Nullable IDeclaration eventHeaderDeclaration, + @Nullable StructDeclaration streamContext, + @NonNull ICompositeDefinition packetHeader, + @NonNull IDefinitionScope packetScope) throws CTFException { + fPacketReader = new CTFPacketReader(input, packetContext, declarations, eventHeaderDeclaration, streamContext, packetHeader, packetScope); + if (input.canRead(1)) { + fNextEvents = new IEventDefinition[1]; + fNextEvents[0] = fPacketReader.readNextEvent(); + } else { + fNextEvents = POISON_PILL; + } + executor.execute(fRunnable); + } + + @Override + public int getCPU() { + return fCurrentEvent.getCPU(); + } + + @Override + public boolean hasMoreEvents() { + return fNextEvents != POISON_PILL; + } + + @Override + public IEventDefinition readNextEvent() throws CTFException { + fCurrentEvent = fNextEvents[fCurrentIndex]; + fCurrentIndex++; + if (fCurrentIndex == fNextEvents.length) { + try { + fNextEvents = fEvents.take(); + fCurrentIndex = 0; + } catch (InterruptedException e) { + throw new CTFException(e); + } + } + if (fException[0] != null) { + throw fException[0]; + } + return fCurrentEvent; + + } + + @Override + public ICTFPacketDescriptor getCurrentPacket() { + return fPacketReader.getCurrentPacket(); + } + + @Override + public ICompositeDefinition getCurrentPacketEventHeader() { + return fPacketReader.getCurrentPacketEventHeader(); + } + +}