Skip to content

Commit

Permalink
Buffered io nonstriperbuffer (#43)
Browse files Browse the repository at this point in the history
* Add capability for buffer io raw to use striperless reads

* Add capability for buffer io raw to use striperless reads

* Add a maybe striper for reading in ceph posix

* Use striperless reads when bypassing the buffer
  • Loading branch information
snafus authored May 10, 2023
1 parent 6a168c6 commit 14bb81a
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 10 deletions.
13 changes: 9 additions & 4 deletions src/XrdCeph/XrdCephBuffers/CephIOAdapterRaw.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ using namespace XrdCephBuffer;
using myclock = std::chrono::steady_clock;
//using myseconds = std::chrono::duration<float,

CephIOAdapterRaw::CephIOAdapterRaw(IXrdCephBufferData * bufferdata, int fd) :
m_bufferdata(bufferdata),m_fd(fd) {
CephIOAdapterRaw::CephIOAdapterRaw(IXrdCephBufferData * bufferdata, int fd,
bool useStriperlessReads) :
m_bufferdata(bufferdata),m_fd(fd),
m_useStriperlessReads(useStriperlessReads) {
}

CephIOAdapterRaw::~CephIOAdapterRaw() {
Expand All @@ -30,7 +32,9 @@ CephIOAdapterRaw::~CephIOAdapterRaw() {
<< " write_MBs:" << write_speed
<< " nread:" << m_stats_read_req << " bytesread:" << m_stats_read_bytes << " read_s:"
<< m_stats_read_timer * 1e-3 << " readmax_s:" << m_stats_read_longest * 1e-3
<< " read_MBs:" << read_speed );
<< " read_MBs:" << read_speed
<< " striperlessRead: " << m_useStriperlessReads
);

}

Expand Down Expand Up @@ -60,10 +64,11 @@ ssize_t CephIOAdapterRaw::read(off64_t offset, size_t count) {
if (!buf) {
return -EINVAL;
}
ssize_t rc {0};

// no check is made whether the buffer has sufficient capacity
auto start = std::chrono::steady_clock::now();
ssize_t rc = ceph_posix_pread(m_fd,buf,count,offset);
rc = ceph_posix_maybestriper_pread(m_fd,buf,count,offset, m_useStriperlessReads);
auto end = std::chrono::steady_clock::now();
//auto elapsed = end-start;
auto int_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end-start);
Expand Down
4 changes: 3 additions & 1 deletion src/XrdCeph/XrdCephBuffers/CephIOAdapterRaw.hh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ namespace XrdCephBuffer {
*/
class CephIOAdapterRaw: public virtual ICephIOAdapter {
public:
CephIOAdapterRaw(IXrdCephBufferData * bufferdata, int fd);
CephIOAdapterRaw(IXrdCephBufferData * bufferdata, int fd,
bool useStriperlessReads);
virtual ~CephIOAdapterRaw();

/**
Expand Down Expand Up @@ -57,6 +58,7 @@ class CephIOAdapterRaw: public virtual ICephIOAdapter {
private:
IXrdCephBufferData * m_bufferdata; //!< no ownership of pointer (consider shared ptrs, etc)
int m_fd;
bool m_useStriperlessReads {true}; //!< use the striperless read code

// timer and counter info
std::atomic< long> m_stats_read_timer{0}, m_stats_write_timer{0};
Expand Down
9 changes: 6 additions & 3 deletions src/XrdCeph/XrdCephBuffers/XrdCephBufferAlgSimple.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ using namespace XrdCephBuffer;


XrdCephBufferAlgSimple::XrdCephBufferAlgSimple(std::unique_ptr<IXrdCephBufferData> buffer,
std::unique_ptr<ICephIOAdapter> cephio, int fd ):
m_bufferdata(std::move(buffer)), m_cephio(std::move(cephio)), m_fd(fd){
std::unique_ptr<ICephIOAdapter> cephio, int fd,
bool useStriperlessReads):
m_bufferdata(std::move(buffer)), m_cephio(std::move(cephio)), m_fd(fd),
m_useStriperlessReads(useStriperlessReads) {

}

Expand Down Expand Up @@ -111,7 +113,8 @@ ssize_t XrdCephBufferAlgSimple::read(volatile void *buf, off_t offset, size_t
m_bufferdata->invalidate();
m_bufferLength =0; // ensure cached data is set to zero length
// #FIXME JW: const_cast is probably a bit poor.
ssize_t rc = ceph_posix_pread(m_fd, const_cast<void*>(buf), blen, offset);

ssize_t rc = ceph_posix_maybestriper_pread (m_fd, const_cast<void*>(buf), blen, offset, m_useStriperlessReads);
if (rc > 0) {
m_stats_bytes_fromceph += rc;
m_stats_bytes_toclient += rc;
Expand Down
4 changes: 3 additions & 1 deletion src/XrdCeph/XrdCephBuffers/XrdCephBufferAlgSimple.hh
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ namespace XrdCephBuffer {

class XrdCephBufferAlgSimple : public virtual IXrdCephBufferAlg {
public:
XrdCephBufferAlgSimple(std::unique_ptr<IXrdCephBufferData> buffer, std::unique_ptr<ICephIOAdapter> cephio, int fd );
XrdCephBufferAlgSimple(std::unique_ptr<IXrdCephBufferData> buffer, std::unique_ptr<ICephIOAdapter> cephio, int fd,
bool useStriperlessReads = true );
virtual ~XrdCephBufferAlgSimple();

virtual ssize_t read_aio (XrdSfsAio *aoip) override;
Expand All @@ -49,6 +50,7 @@ class XrdCephBufferAlgSimple : public virtual IXrdCephBufferAlg {
std::unique_ptr<IXrdCephBufferData> m_bufferdata; //! this algorithm takes ownership of the buffer, and will delete it on destruction
std::unique_ptr<ICephIOAdapter> m_cephio ; // no ownership is taken here
int m_fd = -1;
bool m_useStriperlessReads {true};

off_t m_bufferStartingOffset = 0;
size_t m_bufferLength = 0;
Expand Down
3 changes: 2 additions & 1 deletion src/XrdCeph/XrdCephOssBufferedFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,8 @@ std::unique_ptr<XrdCephBuffer::IXrdCephBufferAlg> XrdCephOssBufferedFile::create
if (m_bufferIOmode == "aio") {
cephio = std::unique_ptr<ICephIOAdapter>(new CephIOAdapterAIORaw(cephbuffer.get(),m_fd));
} else if (m_bufferIOmode == "io") {
cephio = std::unique_ptr<ICephIOAdapter>(new CephIOAdapterRaw(cephbuffer.get(),m_fd));
cephio = std::unique_ptr<ICephIOAdapter>(new CephIOAdapterRaw(cephbuffer.get(),m_fd,
!m_cephoss->m_useDefaultPreadAlg));
} else {
BUFLOG("XrdCephOssBufferedFile: buffer mode needs to be one of aio|io " );
m_xrdOssDF->Close();
Expand Down
20 changes: 20 additions & 0 deletions src/XrdCeph/XrdCephPosix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,26 @@ ssize_t ceph_posix_pread(int fd, void *buf, size_t count, off64_t offset) {
}
}

ssize_t ceph_posix_maybestriper_pread(int fd, void *buf, size_t count, off64_t offset, bool allowStriper) {
ssize_t rc {0};
if (!allowStriper) {
rc = ceph_posix_pread(fd,buf,count,offset);
return rc;
}
rc = ceph_posix_nonstriper_pread(fd, buf, count,offset);
if (-ENOENT == rc || -ENOTSUP == rc) {
//This might be a sparse file or nbstripes > 1, so let's try striper read
rc = ceph_posix_pread(fd, buf, count,offset);
if (rc >= 0) {
char err_str[100]; //99 symbols should be enough for the short message
snprintf(err_str, 100, "WARNING! The file (fd %d) seem to be sparse, this is not expected", fd);
logwrapper(err_str);
}
}
return rc;
}


static void ceph_aio_read_complete(rados_completion_t c, void *arg) {
AioArgs *awa = reinterpret_cast<AioArgs*>(arg);
size_t rc = rados_aio_get_return_value(c);
Expand Down
2 changes: 2 additions & 0 deletions src/XrdCeph/XrdCephPosix.hh
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ ssize_t ceph_striper_readv(int fd, XrdOucIOVec *readV, int n);
ssize_t ceph_posix_read(int fd, void *buf, size_t count);
ssize_t ceph_posix_nonstriper_pread(int fd, void *buf, size_t count, off64_t offset);
ssize_t ceph_posix_pread(int fd, void *buf, size_t count, off64_t offset);
ssize_t ceph_posix_maybestriper_pread(int fd, void *buf, size_t count, off64_t offset, bool allowStriper=true);

ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb);
int ceph_posix_fstat(int fd, struct stat *buf);
int ceph_posix_stat(XrdOucEnv* env, const char *pathname, struct stat *buf);
Expand Down

0 comments on commit 14bb81a

Please sign in to comment.