Skip to content

Commit

Permalink
BinLog gets 'queue' functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
ar committed Mar 13, 2024
1 parent 2b6d4b5 commit 14a84ca
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 67 deletions.
50 changes: 43 additions & 7 deletions modules/binlog/src/main/java/org/jpos/binlog/BinLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@

import java.io.EOFException;
import java.io.IOException;
import java.io.Serial;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.FileLock;
import java.nio.file.*;
import java.security.SecureRandom;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -171,8 +174,7 @@ protected boolean checkCutover (boolean findLast) throws IOException {

private int getNextFileIndex() throws IOException {
ByteBuffer index = readBuffer(raf, "NEXT_LOG_INDEX_OFFSET", 4, NEXT_LOG_INDEX_OFFSET);
int next = index.getInt();
return next;
return index.getInt();
}

/**
Expand Down Expand Up @@ -237,9 +239,7 @@ private ByteBuffer readBuffer(AsynchronousFileChannel raf, String name, int buff
if (read != bufferSize) {
throw new IOException("Failed to read " + bufferSize + " byte " + name + ", return: " + read);
}
} catch (InterruptedException e) {
throw new IOException (e.getMessage());
} catch (ExecutionException e) {
} catch (InterruptedException | ExecutionException e) {
throw new IOException (e.getMessage());
}
buffer.flip();
Expand Down Expand Up @@ -387,8 +387,8 @@ public enum Status {
OPEN((short)0),
CLOSED((short)1);

private short val;
private static Map<Integer,Status> map = new HashMap<>();
private final short val;
private static final Map<Integer,Status> map = new HashMap<>();
static {
for (Status s : Status.values()) {
map.put (s.intValue(), s);
Expand All @@ -412,6 +412,7 @@ public static Status valueOf (int i) {
* Reference to a BinLog entry
*/
public static class Ref implements Serializable {
@Serial
private static final long serialVersionUID = 4201380716050124987L;
private int fileNumber;
private long offset;
Expand Down Expand Up @@ -483,6 +484,7 @@ public String toString() {
* Provides access to a binlog entry
*/
public static class Entry implements Serializable {
@Serial
private static final long serialVersionUID = 4841830838031550274L;
private Ref ref;
private byte[] data;
Expand Down Expand Up @@ -513,4 +515,38 @@ public byte[] get() {
return data;
}
}

public static class QueueEntry {
private byte[] record;
private Ref ref;
private CompletableFuture<BinLog.Ref> future;

public QueueEntry(byte[] record) {
this.record = record;
this.future = new CompletableFuture<>();
}

public byte[] record () {
return record;
}
public Ref ref(Ref ref) {
this.ref = ref;
return ref;
}
public void complete () {
future.complete(ref);
}

public Future<BinLog.Ref> future() {
return future;
}

@Override
public String toString() {
return "QueueEntry{" +
"ref=" + ref +
", future=" + future +
'}';
}
}
}
122 changes: 71 additions & 51 deletions modules/binlog/src/main/java/org/jpos/binlog/BinLogWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
* Used to append add records to a BinLog
*/
public class BinLogWriter extends BinLog {
private ConcurrentLinkedQueue <QueueEntry> queue = new ConcurrentLinkedQueue<>();
/**
* Instantiates a BinLogWriter. Creates directory if necessary.
*
Expand Down Expand Up @@ -68,49 +70,26 @@ public BinLogWriter(String dir) throws IOException {
* @return reference to this entry
* @throws IOException on error
*/
public BinLog.Ref add(byte[] record) throws IOException {
public BinLog.Ref add(byte[] record) throws IOException, ExecutionException, InterruptedException {
var entry = queue (record);
try {
mutex.lock();
checkCutover(true);
AsynchronousFileChannel channel = raf;
Future<FileLock> lockfut = channel.lock();
FileLock lock;
try {
lock = lockfut.get();
} catch (InterruptedException e) {
throw new IOException (e.getMessage());
} catch (ExecutionException e) {
throw new IOException (e.getMessage());
}
if (lock.isValid()) {
long pos = readTailOffset(raf);
int length = 4 + record.length;
ByteBuffer tail = ByteBuffer.allocate(length);
tail.putInt(record.length);
tail.put(record);
tail.flip();
try {
int write = raf.write(tail, pos).get();
if (write != length) {
throw new IOException ("Failed to write " + length + " byte record, return: " + write);
}
} catch (InterruptedException e) {
throw new IOException (e.getMessage());
} catch (ExecutionException e) {
throw new IOException (e.getMessage());
}
writeTailOffset(pos + Integer.BYTES + record.length);
channel.force(true);
lock.release();
return new BinLog.Ref(fileNumber, pos);
} else {
throw new IOException ("Failed to acquire file lock");
}
} finally {
mutex.unlock();
flush();
return entry.get();
} catch (ExecutionException | InterruptedException e) {
throw new IOException (e);
}
}

public Future<BinLog.Ref> queue (byte[] record) {
QueueEntry entry = new QueueEntry(record);
queue.add(entry);
return entry.future();
}

public BinLog.Ref addSync (byte[] record) throws ExecutionException, InterruptedException {
return queue (record).get();
}

/**
* The cutover method closes the current binlog file and creates the next one (in sequencial order)
* @throws IOException on error
Expand All @@ -121,12 +100,10 @@ public void cutover () throws IOException {
checkCutover(true);
AsynchronousFileChannel channel = raf;
Future<FileLock> lockfut = channel.lock();
FileLock lock = null;
FileLock lock;
try {
lock = lockfut.get();
} catch (InterruptedException e) {
throw new IOException (e.getMessage());
} catch (ExecutionException e) {
} catch (InterruptedException | ExecutionException e) {
throw new IOException (e.getMessage());
}
if (lock.isValid()) {
Expand All @@ -142,9 +119,7 @@ public void cutover () throws IOException {
if (write != 4) {
throw new IOException ("Failed to write 4 byte NEXT_LOG_INDEX_OFFSET, return: " + write);
}
} catch (InterruptedException e) {
throw new IOException (e.getMessage());
} catch (ExecutionException e) {
} catch (InterruptedException | ExecutionException e) {
throw new IOException (e.getMessage());
}
ByteBuffer status = ByteBuffer.allocate(2);
Expand All @@ -155,9 +130,7 @@ public void cutover () throws IOException {
if (write != 2) {
throw new IOException ("Failed to write 2 byte STATUS_OFFSET, return: " + write);
}
} catch (InterruptedException e) {
throw new IOException (e.getMessage());
} catch (ExecutionException e) {
} catch (InterruptedException | ExecutionException e) {
throw new IOException (e.getMessage());
}
channel.force(false);
Expand All @@ -177,4 +150,51 @@ public void cutover () throws IOException {
mutex.unlock();
}
}

public void flush () throws IOException {
try {
mutex.lock();
checkCutover(true);
FileLock lock = raf.lock().get();
if (lock.isValid()) {
var flushed = flushQueue();
raf.force(false);
lock.release();
flushed.forEach(QueueEntry::complete);
} else {
throw new IOException("Failed to acquire file lock");
}
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e.getMessage());
} finally {
mutex.unlock();
}
}
private long addRecord (byte[] record) throws IOException {
long pos = readTailOffset(raf);
int length = 4 + record.length;
ByteBuffer tail = ByteBuffer.allocate(length);
tail.putInt(record.length);
tail.put(record);
tail.flip();
try {
int write = raf.write(tail, pos).get();
if (write != length) {
throw new IOException ("Failed to write " + length + " byte record, return: " + write);
}
} catch (InterruptedException | ExecutionException e) {
throw new IOException (e.getMessage());
}
writeTailOffset(pos + Integer.BYTES + record.length);
return pos;
}
private List<QueueEntry> flushQueue () throws IOException {
List<QueueEntry> list = new ArrayList<>();
QueueEntry entry;
while ((entry = queue.poll()) != null) {
entry.ref(new Ref(fileNumber, addRecord(entry.record())));
list.add(entry);
}
return list;
}
}
31 changes: 22 additions & 9 deletions modules/binlog/src/test/java/org/jpos/binlog/BinLogTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -42,10 +47,13 @@
public class BinLogTest implements Runnable {
public static Path dir;
private AtomicLong cnt = new AtomicLong();
private static ExecutorService executor;
private static TPS tps = new TPS();

@BeforeAll
public static void setup () throws IOException {
dir = Files.createTempFile("binlog-", "");
executor = Executors.newVirtualThreadPerTaskExecutor();
Files.delete(dir);
System.out.println ("TEMP=" + dir);
}
Expand All @@ -57,8 +65,8 @@ public void test000_Write() throws IOException {
assertNull(w.getLastClosed(dir), "Found last closed file");
assertEquals(1, w.getFileNumber(w.getFirst(dir)), "Invalid first file");
}
for (int i=0; i<10; i++) {
new Thread(this).start();
for (int i=0; i<100; i++) {
executor.submit(this);
}
try (BinLogReader bl = new BinLogReader(dir)) {
int i = 0;
Expand All @@ -68,24 +76,29 @@ public void test000_Write() throws IOException {
if ((i % 100) == 0)
System.out.println(i + " " + new String(b));
}
assertEquals(1000, i, "Invalid number of entries");
assertEquals(10000, i, "Invalid number of entries");
assertEquals(1, bl.getFileNumber(bl.getFirst(dir)), "Invalid first file");
assertEquals(i/50, bl.getFileNumber(bl.getLastClosed(dir)), "Invalid last closed file");
assertEquals(100, bl.getFileNumber(bl.getLastClosed(dir)), "Invalid last closed file");
}
}

public void run() {
TPS tps = new TPS();
List<Future<BinLog.Ref>> futures = new ArrayList<>();
try (BinLogWriter bl = new BinLogWriter(dir)) {
for (int i = 1; i <= 100; i++) {
long l = cnt.incrementAndGet();
if (i % 50 == 0) {
bl.cutover();
}
bl.add(ISOUtil.zeropad(l, 12).getBytes());
futures.add (
bl.queue(ISOUtil.zeropad(l, 12).getBytes())
);
tps.tick();
}
bl.flush();
for (Future<BinLog.Ref> f : futures) {
if (!f.isDone())
throw new IllegalStateException("Future " + f + " !done");
}
tps.dump(System.out, "");
bl.cutover();
} catch (Throwable e) {
e.printStackTrace(System.err);
}
Expand Down

0 comments on commit 14a84ca

Please sign in to comment.