Skip to content

Commit

Permalink
fixed #43
Browse files Browse the repository at this point in the history
  • Loading branch information
mgeipel committed Mar 20, 2013
1 parent f47c291 commit 43ed872
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 44 deletions.
20 changes: 10 additions & 10 deletions examples/sort/sort-gnd.flux
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
default fileName = FLUX_DIR + "gnd-sample.pica";
fileName|
open-file|
as-lines|
decode-pica|
morph(FLUX_DIR + "gnd-pref-label.xml")|
stream-to-triples|
sort-triples(sortBy="object")|
template("${s}\t${o}")|
default fileName = FLUX_DIR + "gnd-sample.pica";

fileName|
open-file|
as-lines|
decode-pica|
morph(FLUX_DIR + "gnd-pref-label.xml")|
stream-to-triples|
sort-triples(by="object")|
template("${s}\t${o}")|
write("stdout");
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,39 @@ public abstract class AbstractTripleSort extends DefaultObjectPipe<Triple, Objec
/**
* Specifies which part of a triple is compared when sorting: the subject, the predicate, the object, or the whole triple.
*/
public enum CompareBy {
public enum Compare {
    /** Compare triples by their subject only. */
    SUBJECT,
    /** Compare triples by their predicate only. */
    PREDICATE,
    /** Compare triples by their object only. */
    OBJECT,
    /** Compare whole triples using the triple's own {@code compareTo}. */
    ALL
}

/**
 * Sort order. Each constant maps the result of an ascending comparison
 * (negative, zero or positive) to the result for the desired direction.
 */
public enum Order {
    /** Ascending order: the comparison result is passed through unchanged. */
    INCREASING {
        @Override
        public int order(final int indicator) {
            return indicator;
        }
    },
    /** Descending order: the sign of the comparison result is flipped. */
    DECREASING {
        @Override
        public int order(final int indicator) {
            // -Integer.MIN_VALUE overflows back to Integer.MIN_VALUE, which
            // would leave the sign unchanged; map it to the largest positive
            // value so the direction is still reversed.
            if (indicator == Integer.MIN_VALUE) {
                return Integer.MAX_VALUE;
            }
            return -indicator;
        }
    };

    /**
     * Adjusts a comparison result to this sort order.
     *
     * @param indicator result of an ascending comparison
     * @return a value whose sign reflects this sort order
     */
    public abstract int order(int indicator);
}


//public static final String OBJECT = "Object";
//public static final String PREDICATE = "Predicate";
//public static final String SUBJECT = "Subject";

private static final int KILO = 1024;
private static final int DEFUALT_BLOCKSIZE = 128 * KILO * KILO;
private static final int STRING_OVERHEAD = 124;

private final List<Triple> buffer = new ArrayList<Triple>();
private final List<File> tempFiles = new ArrayList<File>();
private CompareBy compareBy = CompareBy.SUBJECT;
private Comparator<Triple> comparator = createComparator(compareBy);
private Compare compare = Compare.SUBJECT;
private Order order = Order.INCREASING;
//private Comparator<Triple> comparator = createComparator(compareBy, order);
private long bufferSizeEstimate;

private long blockSize = DEFUALT_BLOCKSIZE;
Expand All @@ -67,16 +83,20 @@ public final void setBlockSize(final int blockSize) {
this.blockSize = blockSize * KILO * KILO;
}

protected final void setComparator(final CompareBy compareBy) {
comparator = createComparator(compareBy);
this.compareBy = compareBy;
/**
 * Stores the triple component to compare by. The comparator itself is
 * built from this value when a sort is performed.
 *
 * @param compare the component (or whole triple) to compare by
 */
protected final void setCompare(final Compare compare) {
    this.compare = compare;
}

protected final CompareBy getComparatorType() {
return compareBy;
/**
 * Returns the triple component currently used for comparison.
 *
 * @return the configured compare mode
 */
protected final Compare getCompare() {
    return this.compare;
}

/**
 * Stores the sort direction. It is applied when the comparator is built
 * for a sort.
 *
 * @param order the sort direction
 */
protected final void setSortOrder(final Order order) {
    this.order = order;
}



@Override
public final void process(final Triple namedValue) {

Expand All @@ -96,7 +116,7 @@ public final void process(final Triple namedValue) {
}

private void nextBatch() throws IOException {
Collections.sort(buffer, comparator);
Collections.sort(buffer, createComparator(compare, order));
final File tempFile = File.createTempFile("sort", "namedValues", null);
tempFile.deleteOnExit();
final ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(tempFile));
Expand All @@ -112,24 +132,22 @@ private void nextBatch() throws IOException {
tempFiles.add(tempFile);
}

protected final Comparator<Triple> getComparator() {
return comparator;
}

@Override
public final void onCloseStream() {


if (tempFiles.isEmpty()) {
Collections.sort(buffer, comparator);
Collections.sort(buffer, createComparator(compare, order));
for (Triple triple : buffer) {
sortedTriple(triple);
}
onFinished();
} else {
final Comparator<Triple> comparator = createComparator(compare, order);
final PriorityQueue<SortedTripleFileFacade> queue = new PriorityQueue<SortedTripleFileFacade>(11,
new Comparator<SortedTripleFileFacade>() {
private final Comparator<Triple> comparator = getComparator();
// private final Comparator<Triple> comparator = getComparator();

@Override
public int compare(final SortedTripleFileFacade o1, final SortedTripleFileFacade o2) {
Expand Down Expand Up @@ -170,30 +188,34 @@ protected void onFinished() {

protected abstract void sortedTriple(Triple namedValue);

public static Comparator<Triple> createComparator(final CompareBy compareBy) {
/**
 * Builds a comparator from the currently configured compare mode and
 * sort order.
 *
 * @return a new comparator reflecting the current settings
 */
public final Comparator<Triple> createComparator() {
    final Comparator<Triple> configured = createComparator(this.compare, this.order);
    return configured;
}

public static Comparator<Triple> createComparator(final Compare compareBy, final Order order) {
final Comparator<Triple> comparator;
switch (compareBy) {
case ALL:
comparator = new Comparator<Triple>() {
@Override
public int compare(final Triple o1, final Triple o2) {
return o1.compareTo(o2);
return order.order(o1.compareTo(o2));
}
};
break;
case OBJECT:
comparator = new Comparator<Triple>() {
@Override
public int compare(final Triple o1, final Triple o2) {
return o1.getObject().compareTo(o2.getObject());
return order.order(o1.getObject().compareTo(o2.getObject()));
}
};
break;
case SUBJECT:
comparator = new Comparator<Triple>() {
@Override
public int compare(final Triple o1, final Triple o2) {
return o1.getSubject().compareTo(o2.getSubject());
return order.order(o1.getSubject().compareTo(o2.getSubject()));
}
};
break;
Expand All @@ -202,7 +224,7 @@ public int compare(final Triple o1, final Triple o2) {
comparator = new Comparator<Triple>() {
@Override
public int compare(final Triple o1, final Triple o2) {
return o1.getPredicate().compareTo(o2.getPredicate());
return order.order(o1.getPredicate().compareTo(o2.getPredicate()));
}
};
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.culturegraph.mf.stream.pipe.sort;

import java.util.Comparator;

import org.culturegraph.mf.framework.annotations.Description;
import org.culturegraph.mf.framework.annotations.In;
import org.culturegraph.mf.framework.annotations.Out;
Expand All @@ -29,20 +31,25 @@
@In(NamedValue.class)
@Out(NamedValue.class)
public final class TripleCount extends AbstractTripleSort {

public static final String DEFAULT_COUNTP_REDICATE = "count";
private Triple current;

private static final Triple INIT = new Triple("", "", "");

private Triple current = INIT;
private int count;
private String countPredicate = DEFAULT_COUNTP_REDICATE;
private Comparator<Triple> comparator;

@Override
protected void sortedTriple(final Triple triple) {

if(current==null){
if(current==INIT){
current = triple;
comparator = createComparator();
}

if(getComparator().compare(current, triple)==0){
if(comparator.compare(current, triple)==0){
++count;
}else{
writeResult();
Expand All @@ -61,7 +68,7 @@ protected void onFinished() {
}

private void writeResult() {
final CompareBy compareBy = getComparatorType();
final Compare compareBy = getCompare();
switch (compareBy) {
case ALL:
getReceiver().process(new Triple(current.toString(), countPredicate , String.valueOf(count)));
Expand All @@ -79,8 +86,7 @@ private void writeResult() {
}
}

public void setCountBy(final CompareBy countBy){
setComparator(countBy);
/**
 * Selects the triple component by which triples are grouped for
 * counting. Delegates to {@code setCompare}.
 *
 * @param countBy the component (or whole triple) to count by
 */
public void setCountBy(final Compare countBy) {
    this.setCompare(countBy);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@ protected void sortedTriple(final Triple triple) {
getReceiver().process(triple);
}

public void setSortBy(final CompareBy sortBy){
setComparator(sortBy);
/**
 * Selects the triple component to sort by. Delegates to
 * {@code setCompare}.
 *
 * @param compare the component (or whole triple) to sort by
 */
public void setBy(final Compare compare) {
    this.setCompare(compare);
}


/**
 * Selects the sort direction. Delegates to {@code setSortOrder}.
 *
 * @param order the sort direction
 */
public void setOrder(final Order order) {
    this.setSortOrder(order);
}


}

0 comments on commit 43ed872

Please sign in to comment.