From d1f52b50f7fb5547e803fc62ca7724b1c972ccf5 Mon Sep 17 00:00:00 2001 From: JackH Date: Sun, 21 Jun 2020 16:40:55 +0000 Subject: [PATCH] Add xGPU calls; beginnings of an LWA352 pipeline --- .gitmodules | 3 + bifrost | 2 +- test-scripts/pipeline.py | 501 ++++++++++++++++++++++++++++++++++++++ test-scripts/tweak.sh | 3 + test-scripts/xgpu_test.py | 110 +++++++++ xgpu | 1 + 6 files changed, 619 insertions(+), 1 deletion(-) create mode 100644 test-scripts/pipeline.py create mode 100644 test-scripts/tweak.sh create mode 100644 test-scripts/xgpu_test.py create mode 160000 xgpu diff --git a/.gitmodules b/.gitmodules index c524564..c48a82a 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "bifrost"] path = bifrost url = https://github.com/realtimeradio/bifrost +[submodule "xgpu"] + path = xgpu + url = https://github.com/realtimeradio/xgpu diff --git a/bifrost b/bifrost index 357283e..6d088a2 160000 --- a/bifrost +++ b/bifrost @@ -1 +1 @@ -Subproject commit 357283edddf7a74fc0c4264ba7296f297ea0f133 +Subproject commit 6d088a2aba4a9280fe52de0f7fac7571c45ac5a0 diff --git a/test-scripts/pipeline.py b/test-scripts/pipeline.py new file mode 100644 index 0000000..de4f75d --- /dev/null +++ b/test-scripts/pipeline.py @@ -0,0 +1,501 @@ +from bifrost.address import Address +from bifrost.udp_socket import UDPSocket +from bifrost.packet_capture import PacketCaptureCallback, UDPCapture +from bifrost.packet_writer import HeaderInfo, DiskWriter, UDPTransmit +from bifrost.ring import Ring, WriteSpan +import bifrost.affinity as cpu_affinity +import bifrost.ndarray as BFArray +from bifrost.ndarray import copy_array +from bifrost.unpack import unpack as Unpack +from bifrost.libbifrost import bf +from bifrost.proclog import ProcLog +from bifrost.memory import memcpy as BFMemCopy, memset as BFMemSet +from bifrost.linalg import LinAlg +from bifrost import map as BFMap, asarray as BFAsArray +from bifrost.device import set_device as BFSetGPU, get_device as BFGetGPU, stream_synchronize as BFSync, set_devices_no_spin_cpu as BFNoSpinZone +BFNoSpinZone() + +from bifrost.libbifrost import _bf + +#import numpy as np +import signal +import logging +import time +import os +import argparse +import ctypes +import threading +import json +import socket +import struct +#import time +import datetime +from collections import deque +import numpy as np + +ACTIVE_COR_CONFIG = threading.Event() + +__version__ = "0.2" +__date__ = '$LastChangedDate: 2016-08-09 15:44:00 -0600 (Fri, 25 Jul 2014) $' +__author__ = "Ben Barsdell, Daniel Price, Jayce Dowell" +__copyright__ = "Copyright 2016, The LWA-SV Project" +__credits__ = ["Ben Barsdell", "Daniel Price", "Jayce Dowell"] +__license__ = "Apache v2" +__maintainer__ = "Jayce Dowell" +__email__ = "jdowell at unm" +__status__ = "Development" + +class CaptureOp(object): + time_tag = 0 + def __init__(self, log, *args, **kwargs): + self.log = log + self.args = args + self.kwargs = kwargs + self.utc_start = self.kwargs['utc_start'] + del self.kwargs['utc_start'] + self.shutdown_event = threading.Event() + ## HACK TESTING + #self.seq_callback = None + def shutdown(self): + self.shutdown_event.set() + def seq_callback(self, seq0, chan0, nchan, nsrc, + time_tag_ptr, hdr_ptr, hdr_size_ptr): + timestamp0 = 0 + time_tag0 = 0 + self.time_tag += 1 + time_tag = self.time_tag + #print("++++++++++++++++ seq0 =", seq0) + #print(" time_tag =", time_tag) + time_tag_ptr[0] = time_tag + hdr = {'time_tag': time_tag, + 'seq0': seq0, + 'chan0': chan0, + 'nchan': nchan, + 'cfreq': (chan0 + 0.5*(nchan-1))*1, + 'bw': nchan*1, + 'nstand': nsrc*16, + #'stand0': src0*16, # TODO: Pass src0 to the callback too(?) + 'npol': 2, + 'complex': True, + 'nbit': 4} + hdr_str = json.dumps(hdr).encode() + # TODO: Can't pad with NULL because returned as C-string + #hdr_str = json.dumps(hdr).ljust(4096, '\0') + #hdr_str = json.dumps(hdr).ljust(4096, ' ') + self.header_buf = ctypes.create_string_buffer(hdr_str) + hdr_ptr[0] = ctypes.cast(self.header_buf, ctypes.c_void_p) + hdr_size_ptr[0] = len(hdr_str) + return 0 + def main(self): + seq_callback = PacketCaptureCallback() + seq_callback.set_snap2(self.seq_callback) + with UDPCapture(*self.args, + sequence_callback=seq_callback, + **self.kwargs) as capture: + while not self.shutdown_event.is_set(): + status = capture.recv() + #print status + del capture + +class CopyOp(object): + """ + Copy data from one buffer to another. + """ + def __init__(self, log, iring, oring, ntime_gulp=2500, + guarantee=True, core=-1, nchans=192, npols=704): + self.log = log + self.iring = iring + self.oring = oring + self.ntime_gulp = ntime_gulp + self.guarantee = guarantee + self.core = core + + self.bind_proclog = ProcLog(type(self).__name__+"/bind") + self.in_proclog = ProcLog(type(self).__name__+"/in") + self.out_proclog = ProcLog(type(self).__name__+"/out") + self.size_proclog = ProcLog(type(self).__name__+"/size") + self.sequence_proclog = ProcLog(type(self).__name__+"/sequence0") + self.perf_proclog = ProcLog(type(self).__name__+"/perf") + + self.in_proclog.update( {'nring':1, 'ring0':self.iring.name}) + self.out_proclog.update( {'nring':1, 'ring0':self.oring.name}) + self.size_proclog.update({'nseq_per_gulp': self.ntime_gulp}) + self.igulp_size = self.ntime_gulp*nchans*npols*1 # complex8 + + def main(self): + cpu_affinity.set_core(self.core) + self.bind_proclog.update({'ncore': 1, + 'core0': cpu_affinity.get_core(),}) + + self.oring.resize(self.igulp_size) + with self.oring.begin_writing() as oring: + for iseq in self.iring.read(guarantee=self.guarantee): + ihdr = json.loads(iseq.header.tostring()) + ohdr = ihdr.copy() + # Mash header in here if you want + ohdr_str = json.dumps(ohdr) + with oring.begin_sequence(time_tag=iseq.time_tag, header=ohdr_str, nringlet=iseq.nringlet) as oseq: + for ispan in iseq.read(self.igulp_size): + with oseq.reserve(self.igulp_size) as ospan: + self.log.info("Copying output") + copy_array(ospan.data, ispan.data) + +class Corr(object): + """ + Perform cross-correlation using xGPU + """ + def __init__(self, log, iring, oring, ntime_gulp=2500, + guarantee=True, core=-1, nchans=192, npols=704, acc_len=2400): + assert (acc_len % ntime_gulp == 0), "Acculmulation length must be a multiple of gulp size" + # TODO: Other things we could check: + # - that nchans/pols/gulp_size matches XGPU compilation + self.log = log + self.iring = iring + self.oring = oring + self.ntime_gulp = ntime_gulp + self.guarantee = guarantee + self.core = core + self.acc_len = acc_len + + self.bind_proclog = ProcLog(type(self).__name__+"/bind") + self.in_proclog = ProcLog(type(self).__name__+"/in") + self.out_proclog = ProcLog(type(self).__name__+"/out") + self.size_proclog = ProcLog(type(self).__name__+"/size") + self.sequence_proclog = ProcLog(type(self).__name__+"/sequence0") + self.perf_proclog = ProcLog(type(self).__name__+"/perf") + + self.in_proclog.update( {'nring':1, 'ring0':self.iring.name}) + self.out_proclog.update( {'nring':1, 'ring0':self.oring.name}) + self.size_proclog.update({'nseq_per_gulp': self.ntime_gulp}) + self.igulp_size = self.ntime_gulp*nchans*npols*1 # complex8 + self.ogulp_size = 47849472 * 8 # complex64 + + # initialize xGPU. Arrays passed as inputs don't really matter here + # but passing something prevents xGPU from trying to allocate + # host memory + ibuf = BFArray(self.igulp_size, dtype='ci4') + obuf = BFArray(self.ogulp_size // 8, dtype='ci32') + _bf.xgpuInitialize(ibuf.as_BFarray(), obuf.as_BFarray(), 0) + + def main(self): + cpu_affinity.set_core(self.core) + self.bind_proclog.update({'ncore': 1, + 'core0': cpu_affinity.get_core(),}) + + self.oring.resize(self.ogulp_size) + with self.oring.begin_writing() as oring: + for iseq in self.iring.read(guarantee=self.guarantee): + self.log.info("Correlating output") + ihdr = json.loads(iseq.header.tostring()) + subacc_id = ihdr['seq0'] % self.acc_len + first = subacc_id == 0 + last = subacc_id == self.acc_len - self.ntime_gulp + ohdr = ihdr.copy() + # Mash header in here if you want + ohdr_str = json.dumps(ohdr) + for ispan in iseq.read(self.igulp_size): + if first: + oseq = oring.begin_sequence(time_tag=iseq.time_tag, header=ohdr_str, nringlet=iseq.nringlet) + ospan = WriteSpan(oseq.ring, self.ogulp_size, nonblocking=False) + _bf.xgpuCorrelate(ispan.data.as_BFarray(), ospan.data.as_BFarray(), int(last)) + if last: + ospan.close() + oseq.end() + +class CorrAcc(object): + """ + Perform GPU side accumulation and then transfer to CPU + """ + def __init__(self, log, iring, oring, ntime_gulp=2400, + guarantee=True, core=-1, nchans=192, npols=704, acc_len=24000): + # TODO: Other things we could check: + # - that nchans/pols/gulp_size matches XGPU compilation + self.log = log + self.iring = iring + self.oring = oring + self.guarantee = guarantee + self.core = core + self.ntime_gulp = ntime_gulp + self.acc_len = acc_len + + self.bind_proclog = ProcLog(type(self).__name__+"/bind") + self.in_proclog = ProcLog(type(self).__name__+"/in") + self.out_proclog = ProcLog(type(self).__name__+"/out") + self.size_proclog = ProcLog(type(self).__name__+"/size") + self.sequence_proclog = ProcLog(type(self).__name__+"/sequence0") + self.perf_proclog = ProcLog(type(self).__name__+"/perf") + + self.in_proclog.update( {'nring':1, 'ring0':self.iring.name}) + self.out_proclog.update( {'nring':1, 'ring0':self.oring.name}) + self.size_proclog.update({'nseq_per_gulp': self.ntime_gulp}) + self.igulp_size = 47849472 * 8 # complex64 + self.ogulp_size = self.igulp_size + # integration buffer + self.accdata = BFArray(shape=(self.igulp_size // 4), dtype='i32', space='cuda') + self.zeros = BFArray(np.zeros(self.igulp_size // 4), dtype='i32', space='cuda') + + self.bfbf = LinAlg() + + + def main(self): + cpu_affinity.set_core(self.core) + self.bind_proclog.update({'ncore': 1, + 'core0': cpu_affinity.get_core(),}) + + self.oring.resize(self.ogulp_size) + oseq = None + with self.oring.begin_writing() as oring: + for iseq in self.iring.read(guarantee=self.guarantee): + ihdr = json.loads(iseq.header.tostring()) + subacc_id = ihdr['seq0'] % self.acc_len + first = subacc_id == 0 + last = subacc_id == self.acc_len - self.ntime_gulp + ohdr = ihdr.copy() + # Mash header in here if you want + ohdr_str = json.dumps(ohdr) + for ispan in iseq.read(self.igulp_size): + self.log.info("Accumulating correlation") + if first: + oseq = oring.begin_sequence(time_tag=iseq.time_tag, header=ohdr_str, nringlet=iseq.nringlet) + ospan = WriteSpan(oseq.ring, self.ogulp_size, nonblocking=False) + self.accdata = self.zeros + # do accumulation + idata = ispan.data_view('i32') + BFMap("a += b", data={'a': self.accdata, 'b': idata}) + if last: + if oseq is None: + print("Skipping output because oseq isn't open") + else: + # copy to CPU + odata = ospan.data_view('i32') + print(self.accdata.shape, odata.shape) + odata = self.accdata + ospan.close() + oseq.end() + oseq = None + +class CorrSub(object): + """ + Subselect entries from a full visitibility matrix and copy these + from GPU to CPU + """ + def __init__(self, log, iring, oring, ntime_gulp=2500, + guarantee=True, core=-1, nchans=192, npols=704): + self.log = log + self.iring = iring + self.oring = oring + self.ntime_gulp = ntime_gulp + self.guarantee = guarantee + self.core = core + + self.bind_proclog = ProcLog(type(self).__name__+"/bind") + self.in_proclog = ProcLog(type(self).__name__+"/in") + self.out_proclog = ProcLog(type(self).__name__+"/out") + self.size_proclog = ProcLog(type(self).__name__+"/size") + self.sequence_proclog = ProcLog(type(self).__name__+"/sequence0") + self.perf_proclog = ProcLog(type(self).__name__+"/perf") + + self.in_proclog.update( {'nring':1, 'ring0':self.iring.name}) + self.out_proclog.update( {'nring':1, 'ring0':self.oring.name}) + self.size_proclog.update({'nseq_per_gulp': self.ntime_gulp}) + self.igulp_size = self.ntime_gulp*nchans*npols*1 # complex8 + self.ogulp_size = 47849472 * 8 # complex64 + + # initialize xGPU. Arrays passed as inputs don't really matter here + # but passing something prevents xGPU from trying to allocate + # host memory + ibuf = BFArray(self.igulp_size, dtype='ci4') + obuf = BFArray(self.ogulp_size // 8, dtype='ci32') + _bf.xgpuInitialize(ibuf.as_BFarray(), obuf.as_BFarray(), 0) + + def main(self): + cpu_affinity.set_core(self.core) + self.bind_proclog.update({'ncore': 1, + 'core0': cpu_affinity.get_core(),}) + + self.oring.resize(self.ogulp_size) + #with self.oring.begin_writing() as oring: + # for iseq in self.iring.read(guarantee=self.guarantee): + # self.log.info("Correlating output") + # ihdr = json.loads(iseq.header.tostring()) + # subacc_id = ihdr['seq0'] % self.acc_len + # first = subacc_id == 0 + # last = subacc_id == self.acc_len - self.ntime_gulp + # ohdr = ihdr.copy() + # # Mash header in here if you want + # ohdr_str = json.dumps(ohdr) + # for ispan in iseq.read(self.igulp_size): + # if first: + # oseq = oring.begin_sequence(time_tag=iseq.time_tag, header=ohdr_str, nringlet=iseq.nringlet) + # ospan = WriteSpan(oseq.ring, self.ogulp_size, nonblocking=False) + # _bf.xgpuCorrelate(ispan.data.as_BFarray(), ospan.data.as_BFarray(), int(last)) + # if last: + # ospan.close() + # oseq.end() + +class DummyOp(object): + def __init__(self, log, iring, guarantee=True, core=-1, ntime_gulp=128): + self.log = log + self.iring = iring + self.guarantee = guarantee + self.core = core + self.ntime_gulp = ntime_gulp + + self.bind_proclog = ProcLog(type(self).__name__+"/bind") + self.in_proclog = ProcLog(type(self).__name__+"/in") + self.size_proclog = ProcLog(type(self).__name__+"/size") + self.sequence_proclog = ProcLog(type(self).__name__+"/sequence0") + self.perf_proclog = ProcLog(type(self).__name__+"/perf") + + self.in_proclog.update( {'nring':1, 'ring0':self.iring.name}) + + def main(self): + cpu_affinity.set_core(self.core) + self.bind_proclog.update({'ncore': 1, + 'core0': cpu_affinity.get_core(),}) + + igulp_size = self.ntime_gulp*64*704*1 # complex8 + for iseq in self.iring.read(guarantee=True): + self.log.info("Dumping output") + #for ispan in iseq.read(igulp_size): + # if ispan.size < igulp_size: + # print('ignoring final gulp') + # continue # Ignore final gulp + + + +def main(argv): + parser = argparse.ArgumentParser(description='LWA-SV ADP DRX Service') + parser.add_argument('-f', '--fork', action='store_true', help='Fork and run in the background') + parser.add_argument('-c', '--configfile', default='adp_config.json', help='Specify config file') + parser.add_argument('-d', '--dryrun', action='store_true', help='Test without acting') + parser.add_argument('-l', '--logfile', default=None, help='Specify log file') + parser.add_argument('-v', '--verbose', action='count', default=0, help='Increase verbosity') + parser.add_argument('-q', '--quiet', action='count', default=0, help='Decrease verbosity') + args = parser.parse_args() + + # Fork, if requested + tuning = 0 + if args.fork: + stderr = '/tmp/%s_%i.stderr' % (os.path.splitext(os.path.basename(__file__))[0], tuning) + daemonize(stdin='/dev/null', stdout='/dev/null', stderr=stderr) + + log = logging.getLogger(__name__) + logFormat = logging.Formatter('%(asctime)s [%(levelname)-8s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + logFormat.converter = time.gmtime + if args.logfile is None: + logHandler = logging.StreamHandler(sys.stdout) + else: + logHandler = Adp.AdpFileHandler(config, args.logfile) + logHandler.setFormatter(logFormat) + log.addHandler(logHandler) + verbosity = args.verbose - args.quiet + if verbosity > 0: log.setLevel(logging.DEBUG) + elif verbosity == 0: log.setLevel(logging.INFO) + elif verbosity < 0: log.setLevel(logging.WARNING) + + + short_date = ' '.join(__date__.split()[1:4]) + log.info("Starting %s with PID %i", argv[0], os.getpid()) + log.info("Cmdline args: \"%s\"", ' '.join(argv[1:])) + log.info("Version: %s", __version__) + log.info("Last changed: %s", short_date) + log.info("Config file: %s", args.configfile) + log.info("Log file: %s", args.logfile) + log.info("Dry run: %r", args.dryrun) + + ops = [] + shutdown_event = threading.Event() + def handle_signal_terminate(signum, frame): + SIGNAL_NAMES = dict((k, v) for v, k in \ + reversed(sorted(signal.__dict__.items())) + if v.startswith('SIG') and \ + not v.startswith('SIG_')) + log.warning("Received signal %i %s", signum, SIGNAL_NAMES[signum]) + try: + ops[0].shutdown() + except IndexError: + pass + shutdown_event.set() + for sig in [signal.SIGHUP, + signal.SIGINT, + signal.SIGQUIT, + signal.SIGTERM, + signal.SIGTSTP]: + signal.signal(sig, handle_signal_terminate) + + + hostname = socket.gethostname() + server_idx = 0 # HACK to allow testing on head node "adp" + log.info("Hostname: %s", hostname) + log.info("Server index: %i", server_idx) + + iaddr = Address('100.100.100.101', 10000) + isock = UDPSocket() + isock.bind(iaddr) + isock.timeout = 0.5 + + print("binding input to", iaddr) + + capture_ring = Ring(name="capture", space='cuda_host') + gpu_input_ring = Ring(name="gpu-input", space='cuda') + corr_output_ring = Ring(name="corr-output", space='cuda') + corr_slow_output_ring = Ring(name="corr-output", space='cuda_host') + + # TODO: Figure out what to do with this resize + GSIZE = 480#1200 + SLOT_NTIME = 2*GSIZE + + cores = list(range(8)) + + nroach = 11 + nfreqblocks = 2 + roach0 = 0 + ops.append(CaptureOp(log, fmt="snap2", sock=isock, ring=capture_ring, + nsrc=nroach*nfreqblocks, src0=0, max_payload_size=9000, + buffer_ntime=GSIZE, slot_ntime=SLOT_NTIME, core=cores.pop(0), + utc_start=datetime.datetime.now())) + + # capture_ring -> triggered buffer + + ops.append(CopyOp(log, iring=capture_ring, oring=gpu_input_ring, ntime_gulp=GSIZE, + core=cores.pop(0), guarantee=True)) + + # gpu_input_ring -> beamformer + # beamformer -> UDP + + ops.append(Corr(log, iring=gpu_input_ring, oring=corr_output_ring, ntime_gulp=GSIZE, + core=cores.pop(0), guarantee=True)) + + #ops.append(CorrSubOutput(log, iring=corr_output_ring, oring=corr_fast_output_ring, + # core=cores.pop(0), guarantee=True)) + + ops.append(CorrAcc(log, iring=corr_output_ring, oring=corr_slow_output_ring, + core=cores.pop(0), guarantee=True, acc_len=24000)) + + # corr_slow_output -> UDP + # corr_fast_output -> UDP + + final_ring = corr_slow_output_ring + + ops.append(DummyOp(log=log, iring=final_ring, + core=cores.pop(0), ntime_gulp=GSIZE)) + + threads = [threading.Thread(target=op.main) for op in ops] + + log.info("Launching %i thread(s)", len(threads)) + for thread in threads: + #thread.daemon = True + thread.start() + while not shutdown_event.is_set(): + signal.pause() + log.info("Shutdown, waiting for threads to join") + for thread in threads: + thread.join() + log.info("All done") + return 0 + +if __name__ == '__main__': + import sys + sys.exit(main(sys.argv)) diff --git a/test-scripts/tweak.sh b/test-scripts/tweak.sh new file mode 100644 index 0000000..b407018 --- /dev/null +++ b/test-scripts/tweak.sh @@ -0,0 +1,3 @@ +sudo ifconfig ens1f1 100.100.100.101/24 mtu 9000 +sudo sysctl -w net.core.rmem_max=26214400 +sudo sysctl -w net.core.rmem_default=26214400 diff --git a/test-scripts/xgpu_test.py b/test-scripts/xgpu_test.py new file mode 100644 index 0000000..0fcd790 --- /dev/null +++ b/test-scripts/xgpu_test.py @@ -0,0 +1,110 @@ +import bifrost as bf +from bifrost.libbifrost import _bf +import time +import numpy as np + +NSTATION=352; NFREQUENCY=192; NTIME=512; +DOSIM=False +#NSTATION=32; NFREQUENCY=32; NTIME=32; +MATLEN = 47849472 + +## Computes the triangular index of an (i,j) pair as shown here... +## NB: Output is valid only if i >= j. +## +## i=0 1 2 3 4.. +## +--------------- +## j=0 | 00 01 03 06 10 +## 1 | 02 04 07 11 +## 2 | 05 08 12 +## 3 | 09 13 +## 4 | 14 +## : +def tri_index(i, j): + return (i * (i+1))//2 + j; + +## Returns index into the GPU's register tile ordered output buffer for the +## real component of the cross product of inputs in0 and in1. Note that in0 +## and in1 are input indexes (i.e. 0 based) and often represent antenna and +## polarization by passing (2*ant_idx+pol_idx) as the input number (NB: ant_idx +## and pol_idx are also 0 based). Return value is valid if in1 >= in0. The +## corresponding imaginary component is located xgpu_info.matLength words after +## the real component. +def regtile_index(in0, in1): + a0 = in0 >> 1; + a1 = in1 >> 1; + p0 = in0 & 1; + p1 = in1 & 1; + num_words_per_cell = 4; + + # Index within a quadrant + quadrant_index = tri_index(a1//2, a0//2); + # Quadrant for this input pair + quadrant = 2*(a0&1) + (a1&1); + # Size of quadrant + quadrant_size = (NSTATION//2 + 1) * NSTATION//4; + # Index of cell (in units of cells) + cell_index = quadrant*quadrant_size + quadrant_index; + #printf("%s: in0=%d, in1=%d, a0=%d, a1=%d, cell_index=%d\n", __FUNCTION__, in0, in1, a0, a1, cell_index); + # Pol offset + pol_offset = 2*p1 + p0; + # Word index (in units of words (i.e. floats) of real component + index = (cell_index * num_words_per_cell) + pol_offset; + return index; + + +SPACE='cuda' + +invec = np.ones([NTIME, NFREQUENCY, NSTATION, 2]) +if DOSIM: + print("Polulating test vectors") + for t in range(NTIME): + for s in range(NSTATION): + invec[:,:,s,:] = (s+1)%7 +else: + print("Not populating test vectors") + +print('allocating input') +ibuf = bf.ndarray(invec, dtype='ci4', space=SPACE) +print('allocating output') +obuf = bf.ndarray(np.zeros([MATLEN], dtype=np.int32), dtype='ci32', space=SPACE) +#obuf = bf.ndarray(np.zeros([NFREQUENCY, MATLEN//NFREQUENCY], dtype=np.int32), dtype='ci32', space=SPACE) +print(obuf[0:10]) + +if SPACE == 'cuda': + print('running kernel as_GPUarray') + _bf.xgpuInitialize(ibuf.as_BFarray(), obuf.as_BFarray(), 0) + for i in range(4): + _bf.xgpuKernel(ibuf.as_BFarray(), obuf.as_BFarray(), 0) + _bf.xgpuKernel(ibuf.as_BFarray(), obuf.as_BFarray(), 1) + print(obuf[0:10]) +else: + print('running kernel as_BFarray') + _bf.xgpuInitialize(ibuf.as_BFarray(), obuf.as_BFarray(), 0) + for i in range(4): + _bf.xgpuCorrelate(ibuf.as_BFarray(), obuf.as_BFarray(), 0) + _bf.xgpuCorrelate(ibuf.as_BFarray(), obuf.as_BFarray(), 1) + print(obuf[0:10]) + +obuf_cpu = obuf.copy(space='system') +print('copied') +#view as real/imag x chan x station +o = obuf_cpu.view(dtype=np.int32).reshape(2, NFREQUENCY, MATLEN//NFREQUENCY) +oc = o[0,0,:] + 1j*o[1,0,:] + +acc_len = 5 * NTIME +for s0 in range(5): + for s1 in range(s0, 5): + ar = ibuf[0,0,s0,0].real[0] + ai = ibuf[0,0,s0,0].imag[0] + a = ar + 1j*ai + br = ibuf[0,0,s1,0].real[0] + bi = ibuf[0,0,s1,0].imag[0] + b = br + 1j*bi + v = a * np.conj(b) + v *= acc_len + print(s0, s1, v, oc[regtile_index(2*s0,2*s1)]) + +#from matplotlib import pyplot as plt +#plt.plot(o[0,0,:]) +#plt.plot(o[1,0,:]) +#plt.show() diff --git a/xgpu b/xgpu new file mode 160000 index 0000000..3e0eb1c --- /dev/null +++ b/xgpu @@ -0,0 +1 @@ +Subproject commit 3e0eb1c6667d8e0747fd2a3ff74c60204729f14d