Skip to content

Commit

Permalink
Introduced FixedArrayFIFOQueue
Browse files Browse the repository at this point in the history
This is a MPMC (Multi Producer Multi Consumer), wait-free and fixed size
FIFO/LILO queue with one limitation that using it as FILO or LIFO queue
leads to a swallow elements.
  • Loading branch information
catap committed Apr 19, 2023
1 parent 566da9d commit 9f1bf5b
Show file tree
Hide file tree
Showing 4 changed files with 804 additions and 0 deletions.
346 changes: 346 additions & 0 deletions drv/FixedArrayFIFOQueue.drv
Original file line number Diff line number Diff line change
@@ -0,0 +1,346 @@
/*
* Copyright (C) 2010-2023 Sebastiano Vigna
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package PACKAGE;

#if KEY_CLASS_Object
import java.util.Arrays;
import java.util.Comparator;
#endif

import java.io.Serializable;
import it.unimi.dsi.fastutil.PriorityQueue;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.NoSuchElementException;

/** This is a MPMC (Multi Producer Multi Consumer), wait-free and fixed size FIFO/LILO queue.
*
* <p>This algorithm has limitation that using it as FILO or LIFO queue leads to a swallow elements.
*/
public class FIXED_ARRAY_FIFO_QUEUE KEY_GENERIC implements PRIORITY_QUEUE KEY_GENERIC, Serializable {
private static final long serialVersionUID = 0L;

/** The backing array of sequence IDs. */
protected transient int[] seqs;

/** The backing array of elements. */
protected transient KEY_GENERIC_TYPE[] elements;

/** a bit mask to emulate modulo operation */
protected transient int mask;

/** the head of the used sequence */
protected final AtomicInteger headSeq = new AtomicInteger();

/** the tail of the used sequence */
protected final AtomicInteger tailSeq = new AtomicInteger();

/** Creates a new empty queue with given capacity. Keep in mind that you may insert
* up to {@param capacity} &minus; 1 element into the queue.
*
* @implNote This is fast implementation of an algorithm which avoids {@code mod} operation,
* by using {@code & mask} instead that adds a restrictions that {@param capacity}
* should be power of 2.
*
* @implNote Because of inner limitations of the JVM, the
* capacity cannot exceed {@link it.unimi.dsi.fastutil.Arrays#MAX_ARRAY_SIZE} &minus; 1.
*
* @param capacity the capacity of this queue.
*/
SUPPRESS_WARNINGS_KEY_UNCHECKED
public FIXED_ARRAY_FIFO_QUEUE(final int capacity) {
if (capacity > it.unimi.dsi.fastutil.Arrays.MAX_ARRAY_SIZE - 1) throw new IllegalArgumentException("Capacity (" + capacity + ") exceeds " + (it.unimi.dsi.fastutil.Arrays.MAX_ARRAY_SIZE - 1));
if (capacity <= 1) throw new IllegalArgumentException("Capacity (" + capacity + ") should be greater than 1");
if ((capacity & (~capacity + 1)) != capacity) throw new IllegalArgumentException("Capacity (" + capacity + ") should be power of 2");

mask = capacity - 1;

seqs = new int[capacity];
elements = KEY_GENERIC_ARRAY_CAST new KEY_TYPE[seqs.length];

// fill the sequence by initial value
for (int i = 0; i < seqs.length; i++) {
seqs[i] = i;
}
}

/** {@inheritDoc}
* @implSpec This implementation returns {@code null} (FIFO queues have no comparator). */
@Override
public KEY_COMPARATOR KEY_SUPER_GENERIC comparator() {
return null;
}

@Override
public KEY_GENERIC_TYPE DEQUEUE() {
int diff = 0;
int lastDiff;

do {
lastDiff = diff;
final int s = tailSeq.get();
final int idx = s & mask;
diff = (seqs[idx] - s - 1) & mask;

if (diff == mask) break;
if (diff > 0 || !tailSeq.compareAndSet(s, s + 1)) continue;

seqs[idx] = s + mask + 1;
#if KEYS_PRIMITIVE
return elements[idx];
#else
KEY_GENERIC_TYPE x = elements[idx];
elements[idx] = null;
return x;
#endif
} while (diff != lastDiff);

throw new NoSuchElementException();
}

@Override
public KEY_GENERIC_TYPE DEQUEUE(KEY_GENERIC_TYPE x) {
int diff = 0;
int lastDiff;

do {
lastDiff = diff;
final int s = tailSeq.get();
final int idx = s & mask;
diff = (seqs[idx] - s - 1) & mask;

if (diff == mask) break;
if (diff > 0 || !tailSeq.compareAndSet(s, s + 1)) continue;

seqs[idx] = s + mask + 1;
#if KEYS_PRIMITIVE
return elements[idx];
#else
x = elements[idx];
elements[idx] = null;
return x;
#endif
} while (diff != lastDiff);

return x;
}

@Override
public KEY_GENERIC_TYPE DEQUEUE_LAST() {
int diff = 0;
int lastDiff;

do {
lastDiff = diff;
final int s = headSeq.get() - 1;
final int idx = s & mask;
diff = (seqs[idx] - s - 1) & mask;

if (diff == mask) throw new NoSuchElementException();
if (diff > 0 || !headSeq.compareAndSet(s + 1, s)) continue;

seqs[idx] = s + mask + 1;
#if KEYS_PRIMITIVE
return elements[idx];
#else
KEY_GENERIC_TYPE x = elements[idx];
elements[idx] = null;
return x;
#endif
} while (diff != lastDiff);

throw new NoSuchElementException();
}

@Override
public KEY_GENERIC_TYPE DEQUEUE_LAST(KEY_GENERIC_TYPE x) {
int diff = 0;
int lastDiff;

do {
lastDiff = diff;
final int s = headSeq.get() - 1;
final int idx = s & mask;
diff = (seqs[idx] - s - 1) & mask;

if (diff == mask) break;
if (diff > 0 || !headSeq.compareAndSet(s + 1, s)) continue;
seqs[idx] = s + mask + 1;
#if KEYS_PRIMITIVE
return elements[idx];
#else
x = elements[idx];
elements[idx] = null;
return x;
#endif
} while (diff != lastDiff);

return x;
}

@Override
public boolean enqueue(KEY_GENERIC_TYPE x) {
int diff = 0;
int lastDiff;

do {
lastDiff = diff;
final int s = headSeq.get();
final int idx = s & mask;
diff = (seqs[idx] - s) & mask;

if (diff == mask) break;
if (diff > 0 || !headSeq.compareAndSet(s, s + 1)) continue;

seqs[idx]++;
elements[idx] = x;
return true;
} while (diff != lastDiff);

return false;
}

@Override
public boolean enqueueFirst(KEY_GENERIC_TYPE x) {
int diff = 0;
int lastDiff;

do {
lastDiff = diff;
final int s = tailSeq.get() - 1;
final int idx = s & mask;
diff = (seqs[idx] - s - 1) & mask;

if (diff == 0) break;
if (diff < mask || !tailSeq.compareAndSet(s + 1, s)) continue;

seqs[idx] -= mask;
elements[idx] = x;
return true;
} while (diff != lastDiff);

return false;
}

@Override
public KEY_GENERIC_TYPE FIRST() {
int diff = 0;
int lastDiff;

do {
lastDiff = diff;
final int s = tailSeq.get();
final int idx = s & mask;
diff = (seqs[idx] - s - 1) & mask;

if (diff == mask) break;
if (diff > 0) continue;

return elements[idx];
} while (diff != lastDiff);

throw new NoSuchElementException();
}


@Override
public KEY_GENERIC_TYPE FIRST(KEY_GENERIC_TYPE x) {
int diff = 0;
int lastDiff;

do {
lastDiff = diff;
final int s = tailSeq.get();
final int idx = s & mask;
diff = (seqs[idx] - s - 1) & mask;

if (diff == mask) break;
if (diff > 0) continue;

return elements[idx];
} while (diff != lastDiff);

return x;
}


@Override
public KEY_GENERIC_TYPE LAST() {
int diff = 0;
int lastDiff;

do {
lastDiff = diff;
final int s = headSeq.get() - 1;
final int idx = s & mask;
diff = (seqs[idx] - s - 1) & mask;

if (diff == mask) break;
if (diff > 0) continue;

return elements[idx];
} while (diff != lastDiff);

throw new NoSuchElementException();
}

@Override
public KEY_GENERIC_TYPE LAST(KEY_GENERIC_TYPE x) {
int diff = 0;
int lastDiff;

do {
lastDiff = diff;
final int s = headSeq.get() - 1;
final int idx = s & mask;
diff = (seqs[idx] - s - 1) & mask;

if (diff == mask) break;
if (diff > 0) continue;

return elements[idx];
} while (diff != lastDiff);

return x;
}

@Override
public void clear() {
headSeq.set(Integer.MAX_VALUE);
tailSeq.set(Integer.MAX_VALUE);

for (int i = 0; i < seqs.length; i++) {
seqs[i] = i;
#if !KEYS_PRIMITIVE
elements[i] = null;
#endif
}

headSeq.set(0);
tailSeq.set(0);
}

@Override
public int size() {
return Math.abs((int) headSeq.get() - tailSeq.get());
}

public int capacity() {
return mask;
}
}
1 change: 1 addition & 0 deletions gencsource.sh
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ $(if [[ "${CLASS[$k]}" != "" && "${CLASS[$v]}" != "" ]]; then\
"#define HEAP_SESQUI_INDIRECT_DOUBLE_PRIORITY_QUEUE ${TYPE_CAP2[$k]}HeapSesquiIndirectDoublePriorityQueue\n"\
"#define HEAP_INDIRECT_DOUBLE_PRIORITY_QUEUE ${TYPE_CAP2[$k]}HeapIndirectDoublePriorityQueue\n"\
"#define ARRAY_FIFO_QUEUE ${TYPE_CAP2[$k]}ArrayFIFOQueue\n"\
"#define FIXED_ARRAY_FIFO_QUEUE ${TYPE_CAP2[$k]}FixedArrayFIFOQueue\n"\
"#define ARRAY_PRIORITY_QUEUE ${TYPE_CAP2[$k]}ArrayPriorityQueue\n"\
"#define ARRAY_INDIRECT_PRIORITY_QUEUE ${TYPE_CAP2[$k]}ArrayIndirectPriorityQueue\n"\
"#define ARRAY_INDIRECT_DOUBLE_PRIORITY_QUEUE ${TYPE_CAP2[$k]}ArrayIndirectDoublePriorityQueue\n"\
Expand Down
5 changes: 5 additions & 0 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,11 @@ $(ARRAY_FIFO_QUEUES): drv/ArrayFIFOQueue.drv; ./gencsource.sh $< $@ >$@

CSOURCES += $(ARRAY_FIFO_QUEUES)

FIXED_ARRAY_FIFO_QUEUES := $(foreach k,$(TYPE_NOBOOL_NOREF), $(GEN_SRCDIR)/$(PKG_PATH)/$(PACKAGE_$(k))/$(k)FixedArrayFIFOQueue.c)
$(FIXED_ARRAY_FIFO_QUEUES): drv/FixedArrayFIFOQueue.drv; ./gencsource.sh $< $@ >$@

CSOURCES += $(FIXED_ARRAY_FIFO_QUEUES)

HEAP_SEMI_INDIRECT_PRIORITY_QUEUES := $(foreach k, $(TYPE_NOBOOL_NOREF), $(GEN_SRCDIR)/$(PKG_PATH)/$(PACKAGE_$(k))/$(k)HeapSemiIndirectPriorityQueue.c)
$(HEAP_SEMI_INDIRECT_PRIORITY_QUEUES): drv/HeapSemiIndirectPriorityQueue.drv; ./gencsource.sh $< $@ >$@

Expand Down
Loading

0 comments on commit 9f1bf5b

Please sign in to comment.