From ab87c6986818c1b949e95db81a806984e13f33e2 Mon Sep 17 00:00:00 2001 From: Alejandro Revilla Date: Thu, 2 Mar 2017 18:37:45 -0300 Subject: [PATCH] added 'binlog' module --- doc/src/asciidoc/book.adoc | 9 +- doc/src/asciidoc/module_binlog.adoc | 176 +++++++ modules/binlog/build.gradle | 6 + .../src/main/java/org/jpos/binlog/BinLog.java | 434 ++++++++++++++++++ .../java/org/jpos/binlog/BinLogReader.java | 157 +++++++ .../java/org/jpos/binlog/BinLogWriter.java | 95 ++++ .../java/org/jpos/binlog/package-info.java | 22 + .../src/main/java/org/jpos/q2/cli/BINLOG.java | 29 ++ .../java/org/jpos/q2/cli/binlog/CUTOVER.java | 40 ++ .../java/org/jpos/q2/cli/binlog/MONITOR.java | 48 ++ .../test/java/org/jpos/binlog/BinLogTest.java | 97 ++++ settings.gradle | 3 +- 12 files changed, 1112 insertions(+), 4 deletions(-) create mode 100644 doc/src/asciidoc/module_binlog.adoc create mode 100644 modules/binlog/build.gradle create mode 100644 modules/binlog/src/main/java/org/jpos/binlog/BinLog.java create mode 100644 modules/binlog/src/main/java/org/jpos/binlog/BinLogReader.java create mode 100644 modules/binlog/src/main/java/org/jpos/binlog/BinLogWriter.java create mode 100644 modules/binlog/src/main/java/org/jpos/binlog/package-info.java create mode 100644 modules/binlog/src/main/java/org/jpos/q2/cli/BINLOG.java create mode 100644 modules/binlog/src/main/java/org/jpos/q2/cli/binlog/CUTOVER.java create mode 100644 modules/binlog/src/main/java/org/jpos/q2/cli/binlog/MONITOR.java create mode 100644 modules/binlog/src/test/java/org/jpos/binlog/BinLogTest.java diff --git a/doc/src/asciidoc/book.adoc b/doc/src/asciidoc/book.adoc index dac6e1c8e6..68a94092d5 100644 --- a/doc/src/asciidoc/book.adoc +++ b/doc/src/asciidoc/book.adoc @@ -2,8 +2,8 @@ jPOS Extended Edition ===================== :author: Alejandro Revilla :email: apr@jpos.org -:revnumber: 2.0.9 -:jposee_version: 2.2.3 +:revnumber: 2.0.10 +:jposee_version: 2.2.4 :toc: Introduction @@ -24,7 +24,7 @@ Modules include::introduction_module.adoc[] -== Essensial Modules +== Core Modules include::module_core.adoc[] include::module_txn.adoc[] @@ -33,6 +33,9 @@ include::module_txn.adoc[] include::module_database_support.adoc[] +== Binary Log +include::module_binlog.adoc[] + == Tools include::module_freemarker_decorator.adoc[] diff --git a/doc/src/asciidoc/module_binlog.adoc b/doc/src/asciidoc/module_binlog.adoc new file mode 100644 index 0000000000..d276df2cd6 --- /dev/null +++ b/doc/src/asciidoc/module_binlog.adoc @@ -0,0 +1,176 @@ +=== BinLog + +[frame="none",cols="20%,80%"] +|================================================================= +| *What* | General purpose binary log +| *When* | Implemented during 2.2.4 +| *Who* | The jPOS Software team. +| *Where* | Directory modules/binlog +| *Why* | Used by local Q2 nodes as audit trail or to SAF its transactions +| *Status* | Experimental +| *License* | <> +|================================================================= + +.Maven Coordinates +[source,xml] +---- + + org.jpos.ee + jposee-binlog + ${jposee.version} + +---- + +The jPOS BinLog has the following features: + +* multiple readers and writers can be used from the same JVM +* multiple readers and writers can be used from different JVMs + +[TIP] +===== +Make sure you read and understand the implementation notes at the end of this +section before you attempt to use it. +===== + +Here is a sample Writer: + +[source,java] +---------------------------------------------------------------------------------------- + File dir = new File("/tmp/binlog"); + try (BinLogWriter bl = new BinLogWriter(dir)) { <1> + bl.add( ... ); // byte array + bl.add( ... ); // byte array + bl.add( ... ); // byte array + } +---------------------------------------------------------------------------------------- +<1> The BinLogWriter implements `AutoCloseable` so `try-with-resources` can be used + +A reader would look like this: + +[source,java] +---------------------------------------------------------------------------------------- + File dir = new File("/tmp/binlog"); + try (BinLogReader bl = new BinLogReader(dir)) { + while (bl.hasNext()) { + byte[] b = bl.next().get(); + // do something with the byte[] + } + } +---------------------------------------------------------------------------------------- + +The `BinLogReader` implements an `Iterator`. Each `BinLog.Entry` has two +main methods: + +* `BinLog.Rer ref()` +* `byte[] get()` + +While iterating over a BinLog, it might make sense to persistently store its `BinLog.Ref` +in order to be able to restart the iterator at a given point if required (this is useful +if using the BinLog to implement a Store and Forward. + +The `BinLogReader` has two constructors: + +* `BinLogReader(File dir)` +* `BinLogReader(File dir, BinLog.Ref ref)` + +the latter can be used to restart the iterator at a given reference point obtained from a previous run. + +In addition to the standard `hasNext()` method required by the `Iterator` implementation, +`BinLogReader` also has a `hasNext(long millis)` method that waits a given number of +milliseconds once it reaches the end of the log, attempting to wait for a new entry +to be available. + + +==== Implementation notes + +The goal behind the BinLog implementation is to have a future proof file format easy to +read from any language, 10 years down the road. We found that the Mastercard simple IPM +file format, that's basically a two-byte message length followed by the message itself +was suitable for that. The payload on each record can be ISO-8583 (like Mastercard), JSON, +FSDMsg based, Protocol buffers or whatever format the user choose. + +But that format isn't crash proof. If a system crashes while a record is being written to +disk, the file can get easily corrupted. So we picked some ideas from Square's _tape_ +project that implements a highly crash proof on-disk persistent circular queue using +a very small header. Tape is great and we encourage you to consider it instead of this binlog +for some use cases, but we didn't want a circular queue, we wanted a place to securely store +events for audit or store and forward purposes, and we also wanted to be able to access the +same binlog from multiple JVMs with access to the same file-system, so we had to write our own. + +The on-disk file format looks like this: + +``` +Format: + 256 bytes Header + ... Data + ... Data + +Header format (256 bytes): + 4 bytes header length + 2 bytes version + 2 bytes Status (00=open, 01=closed) + 8 bytes Last element position + 4 bytes this log number + 4 bytes next log number +232 bytes reserved + +Element: + 4 bytes Data length + ... Data + +``` + +Each record has a length prefix (four bytes in network byte order) followed by +its data. The header has a fixed length of 256 bytes but we found useful to +make it look like a regular record too by providing its length at the very +beginning. An implementation in any language reading a jPOS binlog can just +be programmed to skip the first record. + +At any given time (usually at end of day), a process can request a *cut-over* +by calling the `BinLogWriter.cutover()` method in that case, all writers and +readers will close the current file and move to the next one (Readers can +choose to not-follow to the next file, for example while producing daily +extracts). + +In order to achieve file crash resilience, each write does the following: + +* Lock the file +* Write the record's length and data +* Sync to disc +* Write the last element position to the header +* Sync to disc +* Unlock the file + +[NOTE] +====== +In an MBP with SDRAM we've managed to achieve approximately 6000 writes per +second. On an iMac with regular disck the numbers go down to approximately 1500 +writes per second for regular ISO-8583 message lengths (500..1000 bytes per +record). +====== + +Due to the fact that the header is small enough to fit in an operating +system block, the second write where we place the last element position happens +to be atomic. While this works OK for readers and writers reading the file from +different JVMs, that's not the case for readers and writers running on the same +JVM, even if they use a different file descriptor to open the file, the operating +system stack has early access to the header that under high concurrency can lead +to garbage values, that's the reason the code synchronizes on a `mutex` object +at specific places. + +==== Supporting CLI commands + +The `binlog` CLI command is a subsystem that currently have two commands: + +* monitor (to visually monitor a binlog) +* cutover (to force a cutover) +* exit (builtin command) + +`binlog` accepts a parameter with the binlog's path, i.e: `binlog /tmp/binlog` + +So a cutover can be triggered from cron using the following command: + +``` +q2 --command="binlog /tmp/binlog; cutover; exit; shutdown --force" +``` + diff --git a/modules/binlog/build.gradle b/modules/binlog/build.gradle new file mode 100644 index 0000000000..41c6259f55 --- /dev/null +++ b/modules/binlog/build.gradle @@ -0,0 +1,6 @@ +description = 'jPOS-EE :: BinLog Module' + +dependencies { + compile libraries.jpos +} + diff --git a/modules/binlog/src/main/java/org/jpos/binlog/BinLog.java b/modules/binlog/src/main/java/org/jpos/binlog/BinLog.java new file mode 100644 index 0000000000..64306cbb58 --- /dev/null +++ b/modules/binlog/src/main/java/org/jpos/binlog/BinLog.java @@ -0,0 +1,434 @@ +/* + * jPOS Project [http://jpos.org] + * Copyright (C) 2000-2017 jPOS Software SRL + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jpos.binlog; + +import org.jpos.iso.ISOUtil; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.io.Serializable; +import java.nio.channels.FileLock; +import java.security.SecureRandom; +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * General purpose binary log + *
+ *
+ * Format:
+ *   256 bytes Header
+ *   ... Data
+ *   ... Data
+ *
+ * Header format (256 bytes):
+ *   4 bytes header length
+ *   2 bytes version
+ *   2 bytes Status (@see Status enum for possible values)
+ *   8 bytes Last element position
+ *   4 bytes this log number
+ *   4 bytes next log number
+ * 232 bytes reserved
+ *
+ * Element:
+ *   4 bytes Data length
+ *   ...     Data
+ * 
+ */ +@SuppressWarnings("unused") + +/** + * Abstract base clase used by BinLogReader and BinLogWriter + */ +public abstract class BinLog implements AutoCloseable { + private static final int FILE_MAGIC = 0x100; + private static final int VERSION = 0x0001; + private static final int RESERVED_LEN = 232; + private static final int MAX_CREATE_ATTEMPTS = 100; + protected static final int STATUS_OFFSET = Integer.BYTES + Short.BYTES; + private static final int TAIL_OFFSET = STATUS_OFFSET + Short.BYTES; + private static final int THIS_LOG_INDEX_OFFSET = TAIL_OFFSET + Long.BYTES; + protected static final int NEXT_LOG_INDEX_OFFSET = THIS_LOG_INDEX_OFFSET + Integer.BYTES; + protected static final int INITIAL_INDEX = 1; + private static final long CREATE_DELAY = 100L; + protected static final long FIRST_EVENT_OFFSET = TAIL_OFFSET + RESERVED_LEN + Long.BYTES; + private static SecureRandom numberGenerator = new SecureRandom(); + private static Pattern filePattern = Pattern.compile("^[\\d]{8}.dat$"); + private String mode; + private static Map mutexs = Collections.synchronizedMap(new HashMap<>()); + protected File dir; + protected int fileNumber; + protected RandomAccessFile raf; + protected final Object mutex; + + /** + * Creates a new FileBinLog instance + * + * @param dir BinLog directory (created if not present) + * @param create create directory if not exists + * @throws IOException on error + */ + protected BinLog(File dir, boolean create) throws IOException { + mutexs.putIfAbsent(dir.getAbsolutePath(), new Object()); + mutex = mutexs.get(dir.getAbsolutePath()); + if ((dir.exists() && !dir.isDirectory())|| (!dir.exists() && !create)) + throw new IOException ("Invalid directory '" + dir.toString() + "'"); + else + dir.mkdirs(); + this.dir = dir; + mode = create ? "rw" : "r"; + } + + /** + * Opens a binlog file, or creates it if necessary + * @param dir binlog directory + * @param fileNumber binlog file number + * @return a RandomAccessFile + * @throws IOException on error + */ + protected RandomAccessFile openOrCreateFile(File dir, int fileNumber) throws IOException { + File file = new File (dir, toFileName(fileNumber)); + for (int i=0; !file.exists() && i pos) + throw new IOException ("Invalid tailoffset " + fileNumber + "/" + pos + "/" + currentTailOffset); + raf.seek(TAIL_OFFSET); + raf.writeLong(pos); + } + } + + protected int getFileNumber(String s) { + return s != null ? Integer.parseInt(s.substring(0,8)) : 0; + } + + protected String getLastClosed (File dir) throws IOException { + for (String s : getFilesReversed(dir)) { + if (isClosed(new File(dir, s))) + return s; + } + return null; + } + + protected String getFirst (File dir) { + return Arrays.stream(dir.list()) + .filter(filePattern.asPredicate()) + .sorted(String::compareTo) + .findFirst() + .orElse(null); + } + + private void verifyHeader(RandomAccessFile raf) throws IOException { + synchronized(mutex) { + if (raf.length() < TAIL_OFFSET + Long.BYTES) + throw new IOException ("Invalid jPOS BinLog file " + fileNumber); + raf.seek(0); + int magic = raf.readInt(); + if (!(FILE_MAGIC == magic)) + throw new IOException ("Invalid jPOS BinLog version " + fileNumber); + long pos = readTailOffset(raf); + if (pos < TAIL_OFFSET + Long.BYTES) + throw new IOException ("Invalid jPOS BinLog header " + fileNumber); + long rafLength = raf.length(); + if (pos > rafLength) + throw new IOException ("Truncated jPOS BinLog file " + fileNumber + " (" + pos + "/" + rafLength + ")"); + } + } + + private void lock (long timeout) throws IOException, InterruptedException { + long end = System.currentTimeMillis() + timeout; + FileLock lock = raf.getChannel().lock(); + while (System.currentTimeMillis() < end) { + Thread.sleep (10); + } + lock.release(); + } + + private List getFiles(File dir) { + return Arrays.stream(dir.list()) + .filter(filePattern.asPredicate()) + .sorted(String::compareTo) + .collect(Collectors.toList()); + } + private List getFilesReversed(File dir) { + return Arrays.stream(dir.list()) + .filter(filePattern.asPredicate()) + .sorted((s1, s2) -> -s1.compareTo(s2)) + .collect(Collectors.toList()); + } + + private boolean isClosed (File f) throws IOException { + if (f.exists()) { + try (RandomAccessFile raf = new RandomAccessFile(f, "r")) { + raf.seek(STATUS_OFFSET); + return Status.valueOf(raf.readShort()) == Status.CLOSED; + } + } + return false; + } + + private void writeHeader (RandomAccessFile r, int i) throws IOException { + r.seek(0); + r.writeInt (FILE_MAGIC); + r.writeShort (VERSION); + r.writeShort (Status.OPEN.intValue()); + r.writeLong(FIRST_EVENT_OFFSET); + r.writeInt(i); // this Log Number + r.writeInt(0); // next Log Number + r.write (new byte[RESERVED_LEN]); + } + + public enum Status { + OPEN(0), + CLOSED(1); + + private int val; + private static Map map = new HashMap<>(); + static { + for (Status s : Status.values()) { + map.put (s.intValue(), s); + } + } + Status (int val) { + this.val = val; + } + public int intValue() { + return val; + } + public static Status valueOf (int i) { + return map.get(i); + } + } + + /** + * Reference to a BinLog entry + */ + public static class Ref implements Serializable { + private static final long serialVersionUID = 4201380716050124987L; + private int fileNumber; + private long offset; + + @SuppressWarnings("unused") + private Ref() { } + + /** + * Creates a BinLog reference to the first entry in a given binlog file + * @param fileNumber binlog file number + */ + public Ref(int fileNumber) { + this.fileNumber = fileNumber; + this.offset = BinLog.FIRST_EVENT_OFFSET; + } + + /** + * Creates a BinLog reference to a given entry in a given binlog file + * @param fileNumber binlog file number + * @param offset binlog entry offset + */ + + public Ref(int fileNumber, long offset) { + this.fileNumber = fileNumber; + this.offset = offset; + } + + /** + * @return this reference's binlog file number + */ + public int getFileNumber() { + return fileNumber; + } + + /** + * @return this reference's offset + */ + public long getOffset() { + return offset; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Ref that = (Ref) o; + + if (fileNumber != that.fileNumber) return false; + return offset == that.offset; + } + + @Override + public int hashCode() { + int result = fileNumber; + result = 31 * result + (int) (offset ^ (offset >>> 32)); + return result; + } + + @Override + public String toString() { + return "Ref{" + + "fileNumber=" + fileNumber + + ", offset=" + offset + + '}'; + } + } + /** + * Provides access to a binlog entry + */ + public static class Entry implements Serializable { + private static final long serialVersionUID = 4841830838031550274L; + private Ref ref; + private byte[] data; + + @SuppressWarnings("unused") + private Entry() { } + + /** + * @param ref reference to this entry + * @param data entry's data + */ + public Entry(Ref ref, byte[] data) { + this.ref = ref; + this.data = data; + } + + /** + * @return this entry's binlog reference + */ + public Ref ref() { + return ref; + } + + /** + * @return this entry's data + */ + public byte[] get() { + return data; + } + } +} diff --git a/modules/binlog/src/main/java/org/jpos/binlog/BinLogReader.java b/modules/binlog/src/main/java/org/jpos/binlog/BinLogReader.java new file mode 100644 index 0000000000..c522d99133 --- /dev/null +++ b/modules/binlog/src/main/java/org/jpos/binlog/BinLogReader.java @@ -0,0 +1,157 @@ +/* + * jPOS Project [http://jpos.org] + * Copyright (C) 2000-2017 jPOS Software SRL + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jpos.binlog; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Used to iterate over a binlog + */ +public class BinLogReader extends BinLog implements Iterator { + private long iteratorPos; + private boolean follow = true; + private long cachedTailOffset = 0L; + + /** + * Instantiates a BinLogReader. + * + * It is the responsibility of the caller to close this reader when done with it. + * + * @param dir this Binlog's directory + * @throws IOException on error + */ + public BinLogReader (File dir) throws IOException { + super (dir, false); + int first = getFileNumber(getFirst(dir)); + this.iteratorPos = FIRST_EVENT_OFFSET; + raf = open (dir, fileNumber = (first == 0 ? INITIAL_INDEX : first)); + } + + /** + * Instantiaates a BinLogReader at a specific reference position. + * + * It is the responsibility of the caller to close this reader when done with it. + * + * @param dir this Binlog's directory + * @param ref reference to a given entry + * @throws IOException on error + */ + public BinLogReader (File dir, BinLog.Ref ref) throws IOException { + super (dir, false); + this.iteratorPos = ref.getOffset(); + raf = open (dir, fileNumber = ref.getFileNumber()); + } + + /** + * BinLog reader automatically follow BinLog's cutovers, unless we + * setFollow(false) + * @param follow value + */ + public void setFollow(boolean follow) { + this.follow = follow; + } + + /** + * @return true if this reader automatically follows cutovers (default) + */ + public boolean isFollow() { + return follow; + } + + /** + * @return reference to this reader's next binlog entry + */ + public synchronized BinLog.Ref getNextRef() { + return new BinLog.Ref (fileNumber, iteratorPos); + } + + @Override + public boolean hasNext() { + try { + if (iteratorPos >= cachedTailOffset) { + cachedTailOffset = readTailOffset(raf); + } + if (iteratorPos >= cachedTailOffset && follow && checkCutover(false)) { + iteratorPos = FIRST_EVENT_OFFSET; + cachedTailOffset = readTailOffset(raf); + } + return iteratorPos < cachedTailOffset; + } catch (IOException e) { + throw new IllegalStateException("Invalid jPOS BinLog header", e); + } + } + + /** + * + * @param timeout in millis + * @return true if this binlog has a next entry + */ + public boolean hasNext(long timeout) { + long end = System.currentTimeMillis() + timeout; + while (System.currentTimeMillis() < end) { + if (hasNext()) { + return true; + } + try { + Thread.sleep(50L); + } catch (InterruptedException e) { + break; + } + } + return false; + } + + @Override + public BinLog.Entry next() { + byte[] ev; + long pos = iteratorPos; + try { + ev = read(iteratorPos); + iteratorPos += Integer.BYTES + ev.length; + } catch (IOException e) { + long actualTailOffset = 0L; + long size = 0L; + try { + actualTailOffset = readTailOffset(raf); + size = raf.getChannel().size(); + } catch (IOException ignored) { + } + throw new NoSuchElementException( + String.format("Invalid jPOS BinLog content @%d:%d (%d/%d)", fileNumber, pos, actualTailOffset, size) + ); + } + return new BinLog.Entry(new BinLog.Ref(fileNumber, pos), ev); + } + + private byte[] read (long pos) throws IOException { + int len = 0; + try { + raf.seek(pos); + len = raf.readInt(); + byte[] buf = new byte[len]; + raf.read(buf); + return buf; + } catch (IOException e) { + throw new IOException ("Error reading position " + pos + " length " + len, e); + } + } +} diff --git a/modules/binlog/src/main/java/org/jpos/binlog/BinLogWriter.java b/modules/binlog/src/main/java/org/jpos/binlog/BinLogWriter.java new file mode 100644 index 0000000000..ab8e3dbfcd --- /dev/null +++ b/modules/binlog/src/main/java/org/jpos/binlog/BinLogWriter.java @@ -0,0 +1,95 @@ +/* + * jPOS Project [http://jpos.org] + * Copyright (C) 2000-2017 jPOS Software SRL + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jpos.binlog; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; + +/** + * Used to append add records to a BinLog + */ +public class BinLogWriter extends BinLog { + /** + * Instantiates a BinLogWriter. Creates directory if necessary. + * + * It is the responsibility of the caller to close this writer when done with it. + * + * @param dir pointer to this Binlog's directory + * @throws IOException on error + */ + public BinLogWriter(File dir) throws IOException { + super (dir, true); + int last = getFileNumber(getLastClosed(dir)); + if (last == 0) { + last = getFileNumber(getFirst(dir)); + if (last == 0) + last = INITIAL_INDEX; + } + raf = openOrCreateFile(dir, fileNumber = last); + } + + /** + * Adds an entry to the BinLog + * @param record entry's binary image + * @return reference to this entry + * @throws IOException on error + */ + public BinLog.Ref add(byte[] record) throws IOException { + synchronized(mutex) { + checkCutover(true); + FileChannel channel = raf.getChannel(); + try (FileLock lock = channel.lock()) { + long pos = readTailOffset(raf); + raf.seek(pos); + raf.writeInt(record.length); + raf.write(record); + channel.force(true); + writeTailOffset(pos + Integer.BYTES + record.length); + channel.force(false); + return new BinLog.Ref(fileNumber, pos); + } + } + } + + /** + * The cutover method closes the current binlog file and creates the next one (in sequencial order) + * @throws IOException on error + */ + public void cutover () throws IOException { + synchronized(mutex) { + checkCutover(true); + FileChannel channel = raf.getChannel(); + try (FileLock lock = channel.lock()) { + if (readStatus() != Status.OPEN) + throw new IOException ("BinLog not open"); + RandomAccessFile newRaf = openOrCreateFile(dir, ++fileNumber); + raf.seek(NEXT_LOG_INDEX_OFFSET); + raf.writeInt(fileNumber); + channel.force(false); + raf.seek(STATUS_OFFSET); + raf.writeShort(Status.CLOSED.intValue()); + channel.force(false); + raf = newRaf; + } + } + } +} diff --git a/modules/binlog/src/main/java/org/jpos/binlog/package-info.java b/modules/binlog/src/main/java/org/jpos/binlog/package-info.java new file mode 100644 index 0000000000..873b32fd7e --- /dev/null +++ b/modules/binlog/src/main/java/org/jpos/binlog/package-info.java @@ -0,0 +1,22 @@ +/* + * jPOS Project [http://jpos.org] + * Copyright (C) 2000-2017 jPOS Software SRL + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +/** + * General purpose multi-process binary log + */ +package org.jpos.binlog; diff --git a/modules/binlog/src/main/java/org/jpos/q2/cli/BINLOG.java b/modules/binlog/src/main/java/org/jpos/q2/cli/BINLOG.java new file mode 100644 index 0000000000..61c7a5f640 --- /dev/null +++ b/modules/binlog/src/main/java/org/jpos/q2/cli/BINLOG.java @@ -0,0 +1,29 @@ +package org.jpos.q2.cli; + +import org.jpos.q2.CLIContext; +import org.jpos.q2.CLISubSystem; + +import java.io.File; + +/** + * CLI implementation - Binlog subsystem + */ +public class BINLOG implements CLISubSystem { + @Override + public String getPrompt(CLIContext ctx, String[] args) { + if (args.length < 2) { + ctx.println ("Usage: binlog directory-name"); + return null; + } + ctx.getUserData().put("binlog", args[1]); + return String.format("(%s)> ", new File(args[1]).getName()); + } + + @Override + public String[] getCompletionPrefixes(CLIContext ctx, String[] args) { + if (args.length < 2) + return null; + return new String[] { "org.jpos.q2.cli.binlog." }; + } +} + diff --git a/modules/binlog/src/main/java/org/jpos/q2/cli/binlog/CUTOVER.java b/modules/binlog/src/main/java/org/jpos/q2/cli/binlog/CUTOVER.java new file mode 100644 index 0000000000..2c83e98e74 --- /dev/null +++ b/modules/binlog/src/main/java/org/jpos/q2/cli/binlog/CUTOVER.java @@ -0,0 +1,40 @@ +/* + * jPOS Project [http://jpos.org] + * Copyright (C) 2000-2017 jPOS Software SRL + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jpos.q2.cli.binlog; + +import org.jpos.binlog.BinLogWriter; +import org.jpos.q2.CLICommand; +import org.jpos.q2.CLIContext; +import java.io.File; + +@SuppressWarnings("unused") +public class CUTOVER implements CLICommand { + CLIContext cli; + boolean ansi; + + public void exec(CLIContext cli, String[] args) throws Exception { + boolean quiet = args.length > 1 && "-q".equals (args[1]); + try (BinLogWriter bl = new BinLogWriter(new File((String) cli.getUserData().get("binlog")))) { + int oldFile = bl.getFileNumber(); + bl.cutover(); + if (!quiet) + cli.print (String.format ("cutover %06d -> %06d%n", oldFile, bl.getFileNumber())); + } + } +} diff --git a/modules/binlog/src/main/java/org/jpos/q2/cli/binlog/MONITOR.java b/modules/binlog/src/main/java/org/jpos/q2/cli/binlog/MONITOR.java new file mode 100644 index 0000000000..a214e72fa3 --- /dev/null +++ b/modules/binlog/src/main/java/org/jpos/q2/cli/binlog/MONITOR.java @@ -0,0 +1,48 @@ +/* + * jPOS Project [http://jpos.org] + * Copyright (C) 2000-2017 jPOS Software SRL + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jpos.q2.cli.binlog; + +import org.jpos.binlog.BinLog; +import org.jpos.binlog.BinLogReader; +import org.jpos.iso.ISOUtil; +import org.jpos.q2.CLICommand; +import org.jpos.q2.CLIContext; +import java.io.File; + +@SuppressWarnings("unused") +public class MONITOR implements CLICommand { + CLIContext cli; + boolean ansi; + + public void exec(CLIContext cli, String[] args) throws Exception { + try (BinLogReader bl = new BinLogReader(new File((String) cli.getUserData().get("binlog")))) { + while (bl.hasNext(10000L)) { + BinLog.Entry ref = bl.next(); + cli.print(String.format("%06d@%08d %.70s", + ref.ref().getFileNumber(), + ref.ref().getOffset(), + ISOUtil.hexdump(ref.get()))); + cli.getReader().getTerminal().flush(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } +} + diff --git a/modules/binlog/src/test/java/org/jpos/binlog/BinLogTest.java b/modules/binlog/src/test/java/org/jpos/binlog/BinLogTest.java new file mode 100644 index 0000000000..ecf150176b --- /dev/null +++ b/modules/binlog/src/test/java/org/jpos/binlog/BinLogTest.java @@ -0,0 +1,97 @@ + +/* + * jPOS Project [http://jpos.org] + * Copyright (C) 2000-2017 jPOS Software SRL + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jpos.binlog; + +import org.jpos.iso.ISOUtil; +import org.jpos.util.TPS; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class BinLogTest implements Runnable { + public static File dir; + private AtomicLong cnt = new AtomicLong(); + + @BeforeClass + public static void setup () throws IOException { + dir = File.createTempFile("binlog-", ""); + dir.delete(); + System.out.println ("TEMP=" + dir); + // dir = new File("/tmp/binlog"); + } + @Test + public void test000_Write() throws IOException { + try (BinLogWriter w = new BinLogWriter(dir)) { } + for (int i=0; i<50; i++) { + new Thread(this).start(); + } + try (BinLogReader bl = new BinLogReader(dir)) { + int i = 0; + while (bl.hasNext(10000L)) { + i++; + byte[] b = bl.next().get(); + if ((i % 1000) == 0) + System.out.println(i + " " + new String(b)); + } + assertEquals("Invalid number of entries", 500000, i); + } + } + + public void run() { + TPS tps = new TPS(); + try (BinLogWriter bl = new BinLogWriter(dir)) { + for (int i = 1; i <= 10000; i++) { + long l = cnt.incrementAndGet(); + if (i % 500 == 0) { + bl.cutover(); + } + bl.add(ISOUtil.zeropad(l, 12).getBytes()); + tps.tick(); + if (i % 1000 == 0) + Thread.sleep(100); + } + tps.dump(System.out, ""); + } catch (Throwable e) { + e.printStackTrace(System.err); + } + } + + @AfterClass + public static void cleanup() throws IOException { + for (File f : dir.listFiles()) { + if (f.toString().endsWith(".dat")) { + System.out.println ("Deleting " + f.toString()); + f.delete(); + } + } + System.out.println ("Deleting " + dir); + dir.delete(); + } +} diff --git a/settings.gradle b/settings.gradle index 567d318954..09a1aee1f2 100644 --- a/settings.gradle +++ b/settings.gradle @@ -27,7 +27,8 @@ include ':modules:core', ':modules:groovy', ':modules:qi-core', ':modules:qi-eeuser', - ':modules:qi-sysconfig' + ':modules:qi-sysconfig', + ':modules:binlog' rootProject.name = 'jposee'