diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index ab255849d6f..fd613c26442 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -88,6 +88,11 @@ "ImportPath": "github.com/jlhawn/go-crypto", "Rev": "cd738dde20f0b3782516181b0866c9bb9db47401" }, + { + "ImportPath": "github.com/noahdesu/go-ceph/rados", + "Comment": "v.0.3.0-28-gc7a0450", + "Rev": "c7a04507129089fbc6a994b57b4feee4ef22c64c" + }, { "ImportPath": "github.com/yvasiyarov/go-metrics", "Rev": "57bccd1ccd43f94bb17fdd8bf3007059b802f85e" diff --git a/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/conn.go b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/conn.go new file mode 100644 index 00000000000..af3cfebe182 --- /dev/null +++ b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/conn.go @@ -0,0 +1,300 @@ +package rados + +// #cgo LDFLAGS: -lrados +// #include +// #include +import "C" + +import "unsafe" +import "bytes" + +// ClusterStat represents Ceph cluster statistics. +type ClusterStat struct { + Kb uint64 + Kb_used uint64 + Kb_avail uint64 + Num_objects uint64 +} + +// Conn is a connection handle to a Ceph cluster. +type Conn struct { + cluster C.rados_t +} + +// PingMonitor sends a ping to a monitor and returns the reply. +func (c *Conn) PingMonitor(id string) (string, error) { + c_id := C.CString(id) + defer C.free(unsafe.Pointer(c_id)) + + var strlen C.size_t + var strout *C.char + + ret := C.rados_ping_monitor(c.cluster, c_id, &strout, &strlen) + defer C.rados_buffer_free(strout) + + if ret == 0 { + reply := C.GoStringN(strout, (C.int)(strlen)) + return reply, nil + } else { + return "", RadosError(int(ret)) + } +} + +// Connect establishes a connection to a RADOS cluster. It returns an error, +// if any. +func (c *Conn) Connect() error { + ret := C.rados_connect(c.cluster) + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// Shutdown disconnects from the cluster. +func (c *Conn) Shutdown() { + C.rados_shutdown(c.cluster) +} + +// ReadConfigFile configures the connection using a Ceph configuration file. +func (c *Conn) ReadConfigFile(path string) error { + c_path := C.CString(path) + defer C.free(unsafe.Pointer(c_path)) + ret := C.rados_conf_read_file(c.cluster, c_path) + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// ReadDefaultConfigFile configures the connection using a Ceph configuration +// file located at default locations. +func (c *Conn) ReadDefaultConfigFile() error { + ret := C.rados_conf_read_file(c.cluster, nil) + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +func (c *Conn) OpenIOContext(pool string) (*IOContext, error) { + c_pool := C.CString(pool) + defer C.free(unsafe.Pointer(c_pool)) + ioctx := &IOContext{} + ret := C.rados_ioctx_create(c.cluster, c_pool, &ioctx.ioctx) + if ret == 0 { + return ioctx, nil + } else { + return nil, RadosError(int(ret)) + } +} + +// ListPools returns the names of all existing pools. +func (c *Conn) ListPools() (names []string, err error) { + buf := make([]byte, 4096) + for { + ret := int(C.rados_pool_list(c.cluster, + (*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf)))) + if ret < 0 { + return nil, RadosError(int(ret)) + } + + if ret > len(buf) { + buf = make([]byte, ret) + continue + } + + tmp := bytes.SplitAfter(buf[:ret-1], []byte{0}) + for _, s := range tmp { + if len(s) > 0 { + name := C.GoString((*C.char)(unsafe.Pointer(&s[0]))) + names = append(names, name) + } + } + + return names, nil + } +} + +// SetConfigOption sets the value of the configuration option identified by +// the given name. +func (c *Conn) SetConfigOption(option, value string) error { + c_opt, c_val := C.CString(option), C.CString(value) + defer C.free(unsafe.Pointer(c_opt)) + defer C.free(unsafe.Pointer(c_val)) + ret := C.rados_conf_set(c.cluster, c_opt, c_val) + if ret < 0 { + return RadosError(int(ret)) + } else { + return nil + } +} + +// GetConfigOption returns the value of the Ceph configuration option +// identified by the given name. +func (c *Conn) GetConfigOption(name string) (value string, err error) { + buf := make([]byte, 4096) + c_name := C.CString(name) + defer C.free(unsafe.Pointer(c_name)) + ret := int(C.rados_conf_get(c.cluster, c_name, + (*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf)))) + // FIXME: ret may be -ENAMETOOLONG if the buffer is not large enough. We + // can handle this case, but we need a reliable way to test for + // -ENAMETOOLONG constant. Will the syscall/Errno stuff in Go help? + if ret == 0 { + value = C.GoString((*C.char)(unsafe.Pointer(&buf[0]))) + return value, nil + } else { + return "", RadosError(ret) + } +} + +// WaitForLatestOSDMap blocks the caller until the latest OSD map has been +// retrieved. +func (c *Conn) WaitForLatestOSDMap() error { + ret := C.rados_wait_for_latest_osdmap(c.cluster) + if ret < 0 { + return RadosError(int(ret)) + } else { + return nil + } +} + +// GetClusterStat returns statistics about the cluster associated with the +// connection. +func (c *Conn) GetClusterStats() (stat ClusterStat, err error) { + c_stat := C.struct_rados_cluster_stat_t{} + ret := C.rados_cluster_stat(c.cluster, &c_stat) + if ret < 0 { + return ClusterStat{}, RadosError(int(ret)) + } else { + return ClusterStat{ + Kb: uint64(c_stat.kb), + Kb_used: uint64(c_stat.kb_used), + Kb_avail: uint64(c_stat.kb_avail), + Num_objects: uint64(c_stat.num_objects), + }, nil + } +} + +// ParseCmdLineArgs configures the connection from command line arguments. +func (c *Conn) ParseCmdLineArgs(args []string) error { + // add an empty element 0 -- Ceph treats the array as the actual contents + // of argv and skips the first element (the executable name) + argc := C.int(len(args) + 1) + argv := make([]*C.char, argc) + + // make the first element a string just in case it is ever examined + argv[0] = C.CString("placeholder") + defer C.free(unsafe.Pointer(argv[0])) + + for i, arg := range args { + argv[i+1] = C.CString(arg) + defer C.free(unsafe.Pointer(argv[i+1])) + } + + ret := C.rados_conf_parse_argv(c.cluster, argc, &argv[0]) + if ret < 0 { + return RadosError(int(ret)) + } else { + return nil + } +} + +// ParseDefaultConfigEnv configures the connection from the default Ceph +// environment variable(s). +func (c *Conn) ParseDefaultConfigEnv() error { + ret := C.rados_conf_parse_env(c.cluster, nil) + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// GetFSID returns the fsid of the cluster as a hexadecimal string. The fsid +// is a unique identifier of an entire Ceph cluster. +func (c *Conn) GetFSID() (fsid string, err error) { + buf := make([]byte, 37) + ret := int(C.rados_cluster_fsid(c.cluster, + (*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf)))) + // FIXME: the success case isn't documented correctly in librados.h + if ret == 36 { + fsid = C.GoString((*C.char)(unsafe.Pointer(&buf[0]))) + return fsid, nil + } else { + return "", RadosError(int(ret)) + } +} + +// GetInstanceID returns a globally unique identifier for the cluster +// connection instance. +func (c *Conn) GetInstanceID() uint64 { + // FIXME: are there any error cases for this? + return uint64(C.rados_get_instance_id(c.cluster)) +} + +// MakePool creates a new pool with default settings. +func (c *Conn) MakePool(name string) error { + c_name := C.CString(name) + defer C.free(unsafe.Pointer(c_name)) + ret := int(C.rados_pool_create(c.cluster, c_name)) + if ret == 0 { + return nil + } else { + return RadosError(ret) + } +} + +// DeletePool deletes a pool and all the data inside the pool. +func (c *Conn) DeletePool(name string) error { + c_name := C.CString(name) + defer C.free(unsafe.Pointer(c_name)) + ret := int(C.rados_pool_delete(c.cluster, c_name)) + if ret == 0 { + return nil + } else { + return RadosError(ret) + } +} + +// MonCommand sends a command to one of the monitors +func (c *Conn) MonCommand(args []byte) (buffer []byte, info string, err error) { + argv := make([]*C.char, len(args)) + for i, _ := range args { + argv[i] = (*C.char)(unsafe.Pointer(&args[i])) + } + + var ( + outs, outbuf *C.char + outslen, outbuflen C.size_t + ) + inbuf := C.CString("") + defer C.free(unsafe.Pointer(inbuf)) + + ret := C.rados_mon_command(c.cluster, + &argv[0], C.size_t(len(args)), + inbuf, // bulk input (e.g. crush map) + C.size_t(0), // length inbuf + &outbuf, // buffer + &outbuflen, // buffer length + &outs, // status string + &outslen) + + if outslen > 0 { + info = C.GoStringN(outs, C.int(outslen)) + C.free(unsafe.Pointer(outs)) + } + if outbuflen > 0 { + buffer = C.GoBytes(unsafe.Pointer(outbuf), C.int(outbuflen)) + C.free(unsafe.Pointer(outbuf)) + } + if ret != 0 { + err = RadosError(int(ret)) + return nil, info, err + } + + return +} diff --git a/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/doc.go b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/doc.go new file mode 100644 index 00000000000..14babe93a71 --- /dev/null +++ b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/doc.go @@ -0,0 +1,4 @@ +/* +Set of wrappers around librados API. +*/ +package rados diff --git a/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/ioctx.go b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/ioctx.go new file mode 100644 index 00000000000..161d2912d8d --- /dev/null +++ b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/ioctx.go @@ -0,0 +1,494 @@ +package rados + +// #cgo LDFLAGS: -lrados +// #include +// #include +import "C" + +import "unsafe" +import "time" + +// PoolStat represents Ceph pool statistics. +type PoolStat struct { + // space used in bytes + Num_bytes uint64 + // space used in KB + Num_kb uint64 + // number of objects in the pool + Num_objects uint64 + // number of clones of objects + Num_object_clones uint64 + // num_objects * num_replicas + Num_object_copies uint64 + Num_objects_missing_on_primary uint64 + // number of objects found on no OSDs + Num_objects_unfound uint64 + // number of objects replicated fewer times than they should be + // (but found on at least one OSD) + Num_objects_degraded uint64 + Num_rd uint64 + Num_rd_kb uint64 + Num_wr uint64 + Num_wr_kb uint64 +} + +// ObjectStat represents an object stat information +type ObjectStat struct { + // current length in bytes + Size uint64 + // last modification time + ModTime time.Time +} + +// IOContext represents a context for performing I/O within a pool. +type IOContext struct { + ioctx C.rados_ioctx_t +} + +// Pointer returns a uintptr representation of the IOContext. +func (ioctx *IOContext) Pointer() uintptr { + return uintptr(ioctx.ioctx) +} + +// Write writes len(data) bytes to the object with key oid starting at byte +// offset offset. It returns an error, if any. +func (ioctx *IOContext) Write(oid string, data []byte, offset uint64) error { + c_oid := C.CString(oid) + defer C.free(unsafe.Pointer(c_oid)) + + ret := C.rados_write(ioctx.ioctx, c_oid, + (*C.char)(unsafe.Pointer(&data[0])), + (C.size_t)(len(data)), + (C.uint64_t)(offset)) + + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// Read reads up to len(data) bytes from the object with key oid starting at byte +// offset offset. It returns the number of bytes read and an error, if any. +func (ioctx *IOContext) Read(oid string, data []byte, offset uint64) (int, error) { + if len(data) == 0 { + return 0, nil + } + + c_oid := C.CString(oid) + defer C.free(unsafe.Pointer(c_oid)) + + ret := C.rados_read( + ioctx.ioctx, + c_oid, + (*C.char)(unsafe.Pointer(&data[0])), + (C.size_t)(len(data)), + (C.uint64_t)(offset)) + + if ret >= 0 { + return int(ret), nil + } else { + return 0, RadosError(int(ret)) + } +} + +// Delete deletes the object with key oid. It returns an error, if any. +func (ioctx *IOContext) Delete(oid string) error { + c_oid := C.CString(oid) + defer C.free(unsafe.Pointer(c_oid)) + + ret := C.rados_remove(ioctx.ioctx, c_oid) + + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// Truncate resizes the object with key oid to size size. If the operation +// enlarges the object, the new area is logically filled with zeroes. If the +// operation shrinks the object, the excess data is removed. It returns an +// error, if any. +func (ioctx *IOContext) Truncate(oid string, size uint64) error { + c_oid := C.CString(oid) + defer C.free(unsafe.Pointer(c_oid)) + + ret := C.rados_trunc(ioctx.ioctx, c_oid, (C.uint64_t)(size)) + + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// Destroy informs librados that the I/O context is no longer in use. +// Resources associated with the context may not be freed immediately, and the +// context should not be used again after calling this method. +func (ioctx *IOContext) Destroy() { + C.rados_ioctx_destroy(ioctx.ioctx) +} + +// Stat returns a set of statistics about the pool associated with this I/O +// context. +func (ioctx *IOContext) GetPoolStats() (stat PoolStat, err error) { + c_stat := C.struct_rados_pool_stat_t{} + ret := C.rados_ioctx_pool_stat(ioctx.ioctx, &c_stat) + if ret < 0 { + return PoolStat{}, RadosError(int(ret)) + } else { + return PoolStat{ + Num_bytes: uint64(c_stat.num_bytes), + Num_kb: uint64(c_stat.num_kb), + Num_objects: uint64(c_stat.num_objects), + Num_object_clones: uint64(c_stat.num_object_clones), + Num_object_copies: uint64(c_stat.num_object_copies), + Num_objects_missing_on_primary: uint64(c_stat.num_objects_missing_on_primary), + Num_objects_unfound: uint64(c_stat.num_objects_unfound), + Num_objects_degraded: uint64(c_stat.num_objects_degraded), + Num_rd: uint64(c_stat.num_rd), + Num_rd_kb: uint64(c_stat.num_rd_kb), + Num_wr: uint64(c_stat.num_wr), + Num_wr_kb: uint64(c_stat.num_wr_kb), + }, nil + } +} + +// GetPoolName returns the name of the pool associated with the I/O context. +func (ioctx *IOContext) GetPoolName() (name string, err error) { + buf := make([]byte, 128) + for { + ret := C.rados_ioctx_get_pool_name(ioctx.ioctx, + (*C.char)(unsafe.Pointer(&buf[0])), C.unsigned(len(buf))) + if ret == -34 { // FIXME + buf = make([]byte, len(buf)*2) + continue + } else if ret < 0 { + return "", RadosError(ret) + } + name = C.GoStringN((*C.char)(unsafe.Pointer(&buf[0])), ret) + return name, nil + } +} + +// ObjectListFunc is the type of the function called for each object visited +// by ListObjects. +type ObjectListFunc func(oid string) + +// ListObjects lists all of the objects in the pool associated with the I/O +// context, and called the provided listFn function for each object, passing +// to the function the name of the object. +func (ioctx *IOContext) ListObjects(listFn ObjectListFunc) error { + var ctx C.rados_list_ctx_t + ret := C.rados_objects_list_open(ioctx.ioctx, &ctx) + if ret < 0 { + return RadosError(ret) + } + defer func() { C.rados_objects_list_close(ctx) }() + + for { + var c_entry *C.char + ret := C.rados_objects_list_next(ctx, &c_entry, nil) + if ret == -2 { // FIXME + return nil + } else if ret < 0 { + return RadosError(ret) + } + listFn(C.GoString(c_entry)) + } + + panic("invalid state") +} + +// Stat returns the size of the object and its last modification time +func (ioctx *IOContext) Stat(object string) (stat ObjectStat, err error) { + var c_psize C.uint64_t + var c_pmtime C.time_t + c_object := C.CString(object) + defer C.free(unsafe.Pointer(c_object)) + + ret := C.rados_stat( + ioctx.ioctx, + c_object, + &c_psize, + &c_pmtime) + + if ret < 0 { + return ObjectStat{}, RadosError(int(ret)) + } else { + return ObjectStat{ + Size: uint64(c_psize), + ModTime: time.Unix(int64(c_pmtime), 0), + }, nil + } +} + +// GetXattr gets an xattr with key `name`, it returns the length of +// the key read or an error if not successful +func (ioctx *IOContext) GetXattr(object string, name string, data []byte) (int, error) { + c_object := C.CString(object) + c_name := C.CString(name) + defer C.free(unsafe.Pointer(c_object)) + defer C.free(unsafe.Pointer(c_name)) + + ret := C.rados_getxattr( + ioctx.ioctx, + c_object, + c_name, + (*C.char)(unsafe.Pointer(&data[0])), + (C.size_t)(len(data))) + + if ret >= 0 { + return int(ret), nil + } else { + return 0, RadosError(int(ret)) + } +} + +// Sets an xattr for an object with key `name` with value as `data` +func (ioctx *IOContext) SetXattr(object string, name string, data []byte) error { + c_object := C.CString(object) + c_name := C.CString(name) + defer C.free(unsafe.Pointer(c_object)) + defer C.free(unsafe.Pointer(c_name)) + + ret := C.rados_setxattr( + ioctx.ioctx, + c_object, + c_name, + (*C.char)(unsafe.Pointer(&data[0])), + (C.size_t)(len(data))) + + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// function that lists all the xattrs for an object, since xattrs are +// a k-v pair, this function returns a map of k-v pairs on +// success, error code on failure +func (ioctx *IOContext) ListXattrs(oid string) (map[string][]byte, error) { + c_oid := C.CString(oid) + defer C.free(unsafe.Pointer(c_oid)) + + var it C.rados_xattrs_iter_t + + ret := C.rados_getxattrs(ioctx.ioctx, c_oid, &it) + if ret < 0 { + return nil, RadosError(ret) + } + defer func() { C.rados_getxattrs_end(it) }() + m := make(map[string][]byte) + for { + var c_name, c_val *C.char + var c_len C.size_t + defer C.free(unsafe.Pointer(c_name)) + defer C.free(unsafe.Pointer(c_val)) + + ret := C.rados_getxattrs_next(it, &c_name, &c_val, &c_len) + if ret < 0 { + return nil, RadosError(int(ret)) + } + // rados api returns a null name,val & 0-length upon + // end of iteration + if c_name == nil { + return m, nil // stop iteration + } + m[C.GoString(c_name)] = C.GoBytes(unsafe.Pointer(c_val), (C.int)(c_len)) + } +} + +// Remove an xattr with key `name` from object `oid` +func (ioctx *IOContext) RmXattr(oid string, name string) error { + c_oid := C.CString(oid) + c_name := C.CString(name) + defer C.free(unsafe.Pointer(c_oid)) + defer C.free(unsafe.Pointer(c_name)) + + ret := C.rados_rmxattr( + ioctx.ioctx, + c_oid, + c_name) + + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// Append the map `pairs` to the omap `oid` +func (ioctx *IOContext) SetOmap(oid string, pairs map[string][]byte) error { + c_oid := C.CString(oid) + defer C.free(unsafe.Pointer(c_oid)) + + var s C.size_t + var c *C.char + ptrSize := unsafe.Sizeof(c) + + c_keys := C.malloc(C.size_t(len(pairs)) * C.size_t(ptrSize)) + c_values := C.malloc(C.size_t(len(pairs)) * C.size_t(ptrSize)) + c_lengths := C.malloc(C.size_t(len(pairs)) * C.size_t(unsafe.Sizeof(s))) + + defer C.free(unsafe.Pointer(c_keys)) + defer C.free(unsafe.Pointer(c_values)) + defer C.free(unsafe.Pointer(c_lengths)) + + i := 0 + for key, value := range pairs { + // key + c_key_ptr := (**C.char)(unsafe.Pointer(uintptr(c_keys) + uintptr(i) * ptrSize)) + *c_key_ptr = C.CString(key) + defer C.free(unsafe.Pointer(*c_key_ptr)) + + // value and its length + c_value_ptr := (**C.char)(unsafe.Pointer(uintptr(c_values) + uintptr(i) * ptrSize)) + + var c_length C.size_t + if len(value) > 0 { + *c_value_ptr = (*C.char)(unsafe.Pointer(&value[0])) + c_length = C.size_t(len(value)) + } else { + *c_value_ptr = nil + c_length = C.size_t(0) + } + + c_length_ptr := (*C.size_t)(unsafe.Pointer(uintptr(c_lengths) + uintptr(i) * ptrSize)) + *c_length_ptr = c_length + + i++ + } + + op := C.rados_create_write_op() + C.rados_write_op_omap_set( + op, + (**C.char)(c_keys), + (**C.char)(c_values), + (*C.size_t)(c_lengths), + C.size_t(len(pairs))) + + ret := C.rados_write_op_operate(op, ioctx.ioctx, c_oid, nil, 0) + C.rados_release_write_op(op) + + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// Fetch a set of keys and their values from an omap and returns then as a map +// `startAfter`: retrieve only the keys after this specified one +// `filterPrefix`: retrieve only the keys beginning with this prefix +// `maxReturn`: retrieve no more than `maxReturn` key/value pairs +func (ioctx *IOContext) GetOmapValues(oid string, startAfter string, filterPrefix string, maxReturn int64) (map[string][]byte, error) { + c_oid := C.CString(oid) + c_start_after := C.CString(startAfter) + c_filter_prefix := C.CString(filterPrefix) + c_max_return := C.uint64_t(maxReturn) + + defer C.free(unsafe.Pointer(c_oid)) + defer C.free(unsafe.Pointer(c_start_after)) + defer C.free(unsafe.Pointer(c_filter_prefix)) + + op := C.rados_create_read_op() + + var c_iter C.rados_omap_iter_t + var c_prval C.int + C.rados_read_op_omap_get_vals( + op, + c_start_after, + c_filter_prefix, + c_max_return, + &c_iter, + &c_prval, + ) + + ret := C.rados_read_op_operate(op, ioctx.ioctx, c_oid, 0) + omap := map[string][]byte{} + + if int(c_prval) != 0 { + return omap, RadosError(int(c_prval)) + } else if int(ret) != 0 { + return omap, RadosError(int(ret)) + } + + for { + var c_key *C.char + var c_val *C.char + var c_len C.size_t + + ret = C.rados_omap_get_next(c_iter, &c_key, &c_val, &c_len) + + if int(ret) != 0 { + return omap, RadosError(int(ret)) + } + + if c_key == nil { + break + } + + omap[C.GoString(c_key)] = C.GoBytes(unsafe.Pointer(c_val), C.int(c_len)) + } + + C.rados_omap_get_end(c_iter) + C.rados_release_read_op(op) + + return omap, nil +} + +// Remove the specified `keys` from the omap `oid` +func (ioctx *IOContext) RmOmapKeys(oid string, keys []string) error { + c_oid := C.CString(oid) + defer C.free(unsafe.Pointer(c_oid)) + + var c *C.char + ptrSize := unsafe.Sizeof(c) + + c_keys := C.malloc(C.size_t(len(keys)) * C.size_t(ptrSize)) + defer C.free(unsafe.Pointer(c_keys)) + + i := 0 + for _, key := range keys { + c_key_ptr := (**C.char)(unsafe.Pointer(uintptr(c_keys) + uintptr(i) * ptrSize)) + *c_key_ptr = C.CString(key) + defer C.free(unsafe.Pointer(*c_key_ptr)) + i++ + } + + op := C.rados_create_write_op() + C.rados_write_op_omap_rm_keys( + op, + (**C.char)(c_keys), + C.size_t(len(keys))) + + ret := C.rados_write_op_operate(op, ioctx.ioctx, c_oid, nil, 0) + C.rados_release_write_op(op) + + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} + +// Clear the omap `oid` +func (ioctx *IOContext) CleanOmap(oid string) error { + c_oid := C.CString(oid) + defer C.free(unsafe.Pointer(c_oid)) + + op := C.rados_create_write_op() + C.rados_write_op_omap_clear(op) + + ret := C.rados_write_op_operate(op, ioctx.ioctx, c_oid, nil, 0) + C.rados_release_write_op(op) + + if ret == 0 { + return nil + } else { + return RadosError(int(ret)) + } +} diff --git a/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/rados.go b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/rados.go new file mode 100644 index 00000000000..935bc248610 --- /dev/null +++ b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/rados.go @@ -0,0 +1,54 @@ +package rados + +// #cgo LDFLAGS: -lrados +// #include +// #include +import "C" + +import ( + "fmt" + "unsafe" +) + +type RadosError int + +func (e RadosError) Error() string { + return fmt.Sprintf("rados: ret=%d", e) +} + +// Version returns the major, minor, and patch components of the version of +// the RADOS library linked against. +func Version() (int, int, int) { + var c_major, c_minor, c_patch C.int + C.rados_version(&c_major, &c_minor, &c_patch) + return int(c_major), int(c_minor), int(c_patch) +} + +// NewConn creates a new connection object. It returns the connection and an +// error, if any. +func NewConn() (*Conn, error) { + conn := &Conn{} + ret := C.rados_create(&conn.cluster, nil) + + if ret == 0 { + return conn, nil + } else { + return nil, RadosError(int(ret)) + } +} + +// NewConnWithUser creates a new connection object with a custom username. +// It returns the connection and an error, if any. +func NewConnWithUser(user string) (*Conn, error) { + c_user := C.CString(user) + defer C.free(unsafe.Pointer(c_user)) + + conn := &Conn{} + ret := C.rados_create(&conn.cluster, c_user) + + if ret == 0 { + return conn, nil + } else { + return nil, RadosError(int(ret)) + } +} diff --git a/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/rados_test.go b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/rados_test.go new file mode 100644 index 00000000000..454da2ed8ad --- /dev/null +++ b/Godeps/_workspace/src/github.com/noahdesu/go-ceph/rados/rados_test.go @@ -0,0 +1,680 @@ +package rados_test + +import "testing" + +//import "bytes" +import "github.com/noahdesu/go-ceph/rados" +import "github.com/stretchr/testify/assert" +import "os" +import "os/exec" +import "io" +import "io/ioutil" +import "time" +import "net" +import "fmt" +import "sort" +import "encoding/json" + +func GetUUID() string { + out, _ := exec.Command("uuidgen").Output() + return string(out[:36]) +} + +func TestVersion(t *testing.T) { + var major, minor, patch = rados.Version() + assert.False(t, major < 0 || major > 1000, "invalid major") + assert.False(t, minor < 0 || minor > 1000, "invalid minor") + assert.False(t, patch < 0 || patch > 1000, "invalid patch") +} + +func TestGetSetConfigOption(t *testing.T) { + conn, _ := rados.NewConn() + + // rejects invalid options + err := conn.SetConfigOption("wefoijweojfiw", "welfkwjelkfj") + assert.Error(t, err, "Invalid option") + + // verify SetConfigOption changes a values + log_file_val, err := conn.GetConfigOption("log_file") + assert.NotEqual(t, log_file_val, "/dev/null") + + err = conn.SetConfigOption("log_file", "/dev/null") + assert.NoError(t, err, "Invalid option") + + log_file_val, err = conn.GetConfigOption("log_file") + assert.Equal(t, log_file_val, "/dev/null") +} + +func TestParseDefaultConfigEnv(t *testing.T) { + conn, _ := rados.NewConn() + + log_file_val, _ := conn.GetConfigOption("log_file") + assert.NotEqual(t, log_file_val, "/dev/null") + + err := os.Setenv("CEPH_ARGS", "--log-file /dev/null") + assert.NoError(t, err) + + err = conn.ParseDefaultConfigEnv() + assert.NoError(t, err) + + log_file_val, _ = conn.GetConfigOption("log_file") + assert.Equal(t, log_file_val, "/dev/null") +} + +func TestParseCmdLineArgs(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + + mon_host_val, _ := conn.GetConfigOption("mon_host") + assert.NotEqual(t, mon_host_val, "1.1.1.1") + + args := []string{"--mon-host", "1.1.1.1"} + err := conn.ParseCmdLineArgs(args) + assert.NoError(t, err) + + mon_host_val, _ = conn.GetConfigOption("mon_host") + assert.Equal(t, mon_host_val, "1.1.1.1") +} + +func TestGetClusterStats(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + poolname := GetUUID() + err := conn.MakePool(poolname) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(poolname) + assert.NoError(t, err) + + // grab current stats + prev_stat, err := conn.GetClusterStats() + fmt.Printf("prev_stat: %+v\n", prev_stat) + assert.NoError(t, err) + + // make some changes to the cluster + buf := make([]byte, 1<<20) + for i := 0; i < 10; i++ { + objname := GetUUID() + pool.Write(objname, buf, 0) + } + + // wait a while for the stats to change + for i := 0; i < 30; i++ { + stat, err := conn.GetClusterStats() + assert.NoError(t, err) + + // wait for something to change + if stat == prev_stat { + fmt.Printf("curr_stat: %+v (trying again...)\n", stat) + time.Sleep(time.Second) + } else { + // success + fmt.Printf("curr_stat: %+v (change detected)\n", stat) + conn.Shutdown() + return + } + } + + pool.Destroy() + conn.Shutdown() + t.Error("Cluster stats aren't changing") +} + +func TestGetFSID(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + fsid, err := conn.GetFSID() + assert.NoError(t, err) + assert.NotEqual(t, fsid, "") + + conn.Shutdown() +} + +func TestGetInstanceID(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + id := conn.GetInstanceID() + assert.NotEqual(t, id, 0) + + conn.Shutdown() +} + +func TestMakeDeletePool(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + // get current list of pool + pools, err := conn.ListPools() + assert.NoError(t, err) + + // check that new pool name is unique + new_name := GetUUID() + for _, poolname := range pools { + if new_name == poolname { + t.Error("Random pool name exists!") + return + } + } + + // create pool + err = conn.MakePool(new_name) + assert.NoError(t, err) + + // get updated list of pools + pools, err = conn.ListPools() + assert.NoError(t, err) + + // verify that the new pool name exists + found := false + for _, poolname := range pools { + if new_name == poolname { + found = true + } + } + + if !found { + t.Error("Cannot find newly created pool") + } + + // delete the pool + err = conn.DeletePool(new_name) + assert.NoError(t, err) + + // verify that it is gone + + // get updated list of pools + pools, err = conn.ListPools() + assert.NoError(t, err) + + // verify that the new pool name exists + found = false + for _, poolname := range pools { + if new_name == poolname { + found = true + } + } + + if found { + t.Error("Deleted pool still exists") + } + + conn.Shutdown() +} + +func TestPingMonitor(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + // mon id that should work with vstart.sh + reply, err := conn.PingMonitor("a") + if err == nil { + assert.NotEqual(t, reply, "") + return + } + + // mon id that should work with micro-osd.sh + reply, err = conn.PingMonitor("0") + if err == nil { + assert.NotEqual(t, reply, "") + return + } + + // try to use a hostname as the monitor id + mon_addr, _ := conn.GetConfigOption("mon_host") + hosts, _ := net.LookupAddr(mon_addr) + for _, host := range hosts { + reply, err := conn.PingMonitor(host) + if err == nil { + assert.NotEqual(t, reply, "") + return + } + } + + t.Error("Could not find a valid monitor id") + + conn.Shutdown() +} + +func TestReadConfigFile(t *testing.T) { + conn, _ := rados.NewConn() + + // check current log_file value + log_file_val, err := conn.GetConfigOption("log_file") + assert.NoError(t, err) + assert.NotEqual(t, log_file_val, "/dev/null") + + // create a temporary ceph.conf file that changes the log_file conf + // option. + file, err := ioutil.TempFile("/tmp", "go-rados") + assert.NoError(t, err) + + _, err = io.WriteString(file, "[global]\nlog_file = /dev/null\n") + assert.NoError(t, err) + + // parse the config file + err = conn.ReadConfigFile(file.Name()) + assert.NoError(t, err) + + // check current log_file value + log_file_val, err = conn.GetConfigOption("log_file") + assert.NoError(t, err) + assert.Equal(t, log_file_val, "/dev/null") + + // cleanup + file.Close() + os.Remove(file.Name()) +} + +func TestWaitForLatestOSDMap(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + err := conn.WaitForLatestOSDMap() + assert.NoError(t, err) + + conn.Shutdown() +} + +func TestReadWrite(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + // make pool + pool_name := GetUUID() + err := conn.MakePool(pool_name) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(pool_name) + assert.NoError(t, err) + + bytes_in := []byte("input data") + err = pool.Write("obj", bytes_in, 0) + assert.NoError(t, err) + + bytes_out := make([]byte, len(bytes_in)) + n_out, err := pool.Read("obj", bytes_out, 0) + + assert.Equal(t, n_out, len(bytes_in)) + assert.Equal(t, bytes_in, bytes_out) + + pool.Destroy() +} + +func TestObjectStat(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + pool_name := GetUUID() + err := conn.MakePool(pool_name) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(pool_name) + assert.NoError(t, err) + + bytes_in := []byte("input data") + err = pool.Write("obj", bytes_in, 0) + assert.NoError(t, err) + + stat, err := pool.Stat("obj") + assert.Equal(t, uint64(len(bytes_in)), stat.Size) + assert.NotNil(t, stat.ModTime) + + pool.Destroy() + conn.Shutdown() +} + +func TestGetPoolStats(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + poolname := GetUUID() + err := conn.MakePool(poolname) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(poolname) + assert.NoError(t, err) + + // grab current stats + prev_stat, err := pool.GetPoolStats() + fmt.Printf("prev_stat: %+v\n", prev_stat) + assert.NoError(t, err) + + // make some changes to the cluster + buf := make([]byte, 1<<20) + for i := 0; i < 10; i++ { + objname := GetUUID() + pool.Write(objname, buf, 0) + } + + // wait a while for the stats to change + for i := 0; i < 30; i++ { + stat, err := pool.GetPoolStats() + assert.NoError(t, err) + + // wait for something to change + if stat == prev_stat { + fmt.Printf("curr_stat: %+v (trying again...)\n", stat) + time.Sleep(time.Second) + } else { + // success + fmt.Printf("curr_stat: %+v (change detected)\n", stat) + conn.Shutdown() + return + } + } + + pool.Destroy() + conn.Shutdown() + t.Error("Pool stats aren't changing") +} + +func TestGetPoolName(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + poolname := GetUUID() + err := conn.MakePool(poolname) + assert.NoError(t, err) + + ioctx, err := conn.OpenIOContext(poolname) + assert.NoError(t, err) + + poolname_ret, err := ioctx.GetPoolName() + assert.NoError(t, err) + + assert.Equal(t, poolname, poolname_ret) + + ioctx.Destroy() + conn.Shutdown() +} + +func TestMonCommand(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + command, err := json.Marshal(map[string]string{"prefix": "df", "format": "json"}) + assert.NoError(t, err) + + buf, info, err := conn.MonCommand(command) + assert.NoError(t, err) + assert.Equal(t, info, "") + + var message map[string]interface{} + err = json.Unmarshal(buf, &message) + assert.NoError(t, err) + + conn.Shutdown() +} + +func TestObjectIterator(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + poolname := GetUUID() + err := conn.MakePool(poolname) + assert.NoError(t, err) + + ioctx, err := conn.OpenIOContext(poolname) + assert.NoError(t, err) + + objectList := []string{} + err = ioctx.ListObjects(func(oid string) { + objectList = append(objectList, oid) + }) + assert.NoError(t, err) + assert.True(t, len(objectList) == 0) + + createdList := []string{} + for i := 0; i < 200; i++ { + oid := GetUUID() + bytes_in := []byte("input data") + err = ioctx.Write(oid, bytes_in, 0) + assert.NoError(t, err) + createdList = append(createdList, oid) + } + assert.True(t, len(createdList) == 200) + + err = ioctx.ListObjects(func(oid string) { + objectList = append(objectList, oid) + }) + assert.NoError(t, err) + assert.Equal(t, len(objectList), len(createdList)) + + sort.Strings(objectList) + sort.Strings(createdList) + + assert.Equal(t, objectList, createdList) +} + +func TestNewConnWithUser(t *testing.T) { + _, err := rados.NewConnWithUser("admin") + assert.Equal(t, err, nil) +} + +func TestReadWriteXattr(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + // make pool + pool_name := GetUUID() + err := conn.MakePool(pool_name) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(pool_name) + assert.NoError(t, err) + + bytes_in := []byte("input data") + err = pool.Write("obj", bytes_in, 0) + assert.NoError(t, err) + + my_xattr_in := []byte("my_value") + err = pool.SetXattr("obj", "my_key", my_xattr_in) + assert.NoError(t, err) + + my_xattr_out := make([]byte, len(my_xattr_in)) + n_out, err := pool.GetXattr("obj", "my_key", my_xattr_out) + + assert.Equal(t, n_out, len(my_xattr_in)) + assert.Equal(t, my_xattr_in, my_xattr_out) + + pool.Destroy() +} + +func TestListXattrs(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + // make pool + pool_name := GetUUID() + err := conn.MakePool(pool_name) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(pool_name) + assert.NoError(t, err) + + bytes_in := []byte("input data") + err = pool.Write("obj", bytes_in, 0) + assert.NoError(t, err) + + input_xattrs := make(map[string][]byte) + for i := 0; i < 200; i++ { + name := fmt.Sprintf("key_%d", i) + data := []byte(GetUUID()) + err = pool.SetXattr("obj", name, data) + assert.NoError(t, err) + input_xattrs[name] = data + } + + output_xattrs := make(map[string][]byte) + output_xattrs, err = pool.ListXattrs("obj") + assert.NoError(t, err) + assert.Equal(t, len(input_xattrs), len(output_xattrs)) + assert.Equal(t, input_xattrs, output_xattrs) + + pool.Destroy() +} + +func TestRmXattr(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + pool_name := GetUUID() + err := conn.MakePool(pool_name) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(pool_name) + assert.NoError(t, err) + + bytes_in := []byte("input data") + err = pool.Write("obj", bytes_in, 0) + assert.NoError(t, err) + + key := "key1" + val := []byte("val1") + err = pool.SetXattr("obj", key, val) + assert.NoError(t, err) + + key = "key2" + val = []byte("val2") + err = pool.SetXattr("obj", key, val) + assert.NoError(t, err) + + xattr_list := make(map[string][]byte) + xattr_list, err = pool.ListXattrs("obj") + assert.NoError(t, err) + assert.Equal(t, len(xattr_list), 2) + + pool.RmXattr("obj", "key2") + xattr_list, err = pool.ListXattrs("obj") + assert.NoError(t, err) + assert.Equal(t, len(xattr_list), 1) + + found := false + for key, _ = range xattr_list { + if key == "key2" { + found = true + } + + } + + if found { + t.Error("Deleted pool still exists") + } + + pool.Destroy() +} + +func TestReadWriteOmap(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + pool_name := GetUUID() + err := conn.MakePool(pool_name) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(pool_name) + assert.NoError(t, err) + + // Set + orig := map[string][]byte{ + "key1": []byte("value1"), + "key2": []byte("value2"), + "prefixed-key3": []byte("value3"), + "empty": []byte(""), + } + + err = pool.SetOmap("obj", orig) + assert.NoError(t, err) + + // Get + fetched, err := pool.GetOmapValues("obj", "", "", 4) + assert.NoError(t, err) + assert.Equal(t, orig, fetched) + + // Remove + err = pool.RmOmapKeys("obj", []string{"key1", "prefixed-key3"}) + assert.NoError(t, err) + + fetched, err = pool.GetOmapValues("obj", "", "", 4) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{ + "key2": []byte("value2"), + "empty": []byte(""), + }, fetched) + + // Clear + err = pool.CleanOmap("obj") + assert.NoError(t, err) + + fetched, err = pool.GetOmapValues("obj", "", "", 4) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{}, fetched) + + pool.Destroy() +} + +func TestReadFilterOmap(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + pool_name := GetUUID() + err := conn.MakePool(pool_name) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(pool_name) + assert.NoError(t, err) + + orig := map[string][]byte{ + "key1": []byte("value1"), + "prefixed-key3": []byte("value3"), + "key2": []byte("value2"), + } + + err = pool.SetOmap("obj", orig) + assert.NoError(t, err) + + // filter by prefix + fetched, err := pool.GetOmapValues("obj", "", "prefixed", 4) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{ + "prefixed-key3": []byte("value3"), + }, fetched) + + // "start_after" a key + fetched, err = pool.GetOmapValues("obj", "key1", "", 4) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{ + "prefixed-key3": []byte("value3"), + "key2": []byte("value2"), + }, fetched) + + // maxReturn + fetched, err = pool.GetOmapValues("obj", "", "key", 1) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{ + "key1": []byte("value1"), + }, fetched) + + pool.Destroy() +} + diff --git a/circle.yml b/circle.yml index 656efa69302..472e5ea6077 100644 --- a/circle.yml +++ b/circle.yml @@ -3,6 +3,8 @@ machine: pre: # Install gvm - bash < <(curl -s -S -L https://raw.githubusercontent.com/moovweb/gvm/1.0.22/binscripts/gvm-installer) + # Install ceph to test rados driver & create pool + - bash contrib/ceph/micro-osd.sh /tmp/osd-data && ceph osd pool create docker-distribution 1 post: # Install many go versions @@ -20,6 +22,8 @@ machine: # BASE_BLEED: ../../../$HOME/.gvm/pkgsets/bleed/global/$BASE_DIR # Workaround Circle parsing dumb bugs and/or YAML wonkyness CIRCLE_PAIN: "mode: set" + # Ceph pool + RADOS_POOL: "docker-distribution" hosts: # Not used yet diff --git a/cmd/registry/main.go b/cmd/registry/main.go index 52eecf8f217..30112411fbf 100644 --- a/cmd/registry/main.go +++ b/cmd/registry/main.go @@ -25,6 +25,7 @@ import ( _ "github.com/docker/distribution/registry/storage/driver/filesystem" _ "github.com/docker/distribution/registry/storage/driver/inmemory" _ "github.com/docker/distribution/registry/storage/driver/middleware/cloudfront" + _ "github.com/docker/distribution/registry/storage/driver/rados" _ "github.com/docker/distribution/registry/storage/driver/s3" "github.com/docker/distribution/version" gorhandlers "github.com/gorilla/handlers" diff --git a/contrib/ceph/micro-osd.sh b/contrib/ceph/micro-osd.sh new file mode 100755 index 00000000000..8b1f2b8f936 --- /dev/null +++ b/contrib/ceph/micro-osd.sh @@ -0,0 +1,90 @@ +# +# Copyright (C) 2013,2014 Loic Dachary +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +set -e +set -u + +DIR=$1 + +if ! dpkg -l ceph ; then + wget -q -O- 'https://ceph.com/git/?p=ceph.git;a=blob_plain;f=keys/release.asc' | sudo apt-key add - + echo deb http://ceph.com/debian-dumpling/ $(lsb_release -sc) main | sudo tee /etc/apt/sources.list.d/ceph.list + sudo apt-get update + sudo apt-get --yes install ceph ceph-common +fi + +# get rid of process and directories leftovers +pkill ceph-mon || true +pkill ceph-osd || true +rm -fr $DIR + +# cluster wide parameters +mkdir -p ${DIR}/log +cat >> $DIR/ceph.conf <> $DIR/ceph.conf <> $DIR/ceph.conf < r.size-r.offset { + bufferSize = r.size - r.offset + } + + // Fill `b` + for bufferOffset < bufferSize { + // Get the offset in the object chunk + chunkedOid, chunkedOffset := r.driver.GetChunkNameFromOffset(r.oid, r.offset) + + // Determine the best size to read + bufferEndOffset := bufferSize + if bufferEndOffset-bufferOffset > r.driver.chunksize-chunkedOffset { + bufferEndOffset = bufferOffset + (r.driver.chunksize - chunkedOffset) + } + + // Read the chunk + n, err = r.driver.Ioctx.Read(chunkedOid, b[bufferOffset:bufferEndOffset], chunkedOffset) + + if err != nil { + return int(bufferOffset), err + } + + bufferOffset += uint64(n) + r.offset += uint64(n) + } + + // EOF if the offset is at the end of the object + if r.offset == r.size { + return int(bufferOffset), io.EOF + } + + return int(bufferOffset), nil +} + +func (r *readStreamReader) Close() error { + return nil +} + +func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { + // get oid from filename + oid, err := d.GetOid(path) + + if err != nil { + return nil, err + } + + // get object stat + stat, err := d.Stat(path) + + if err != nil { + return nil, err + } + + if offset > stat.Size() { + return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} + } + + return &readStreamReader{ + driver: d, + oid: oid, + size: uint64(stat.Size()), + offset: uint64(offset), + }, nil +} + +func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (totalRead int64, err error) { + buf := make([]byte, d.chunksize) + totalRead = 0 + + oid, err := d.GetOid(path) + if err != nil { + switch err.(type) { + // Trying to write new object, generate new blob identifier for it + case storagedriver.PathNotFoundError: + oid = d.GenerateOid() + err = d.PutOid(path, oid) + if err != nil { + return 0, err + } + default: + return 0, err + } + } else { + totalSize, err := d.GetXattrTotalSize(oid) + if err != nil { + return 0, err + } + + // If offset if after the current object size, fill the gap with zeros + for totalSize < uint64(offset) { + sizeToWrite := d.chunksize + if totalSize-uint64(offset) < sizeToWrite { + sizeToWrite = totalSize - uint64(offset) + } + + chunkName, chunkOffset := d.GetChunkNameFromOffset(oid, uint64(totalSize)) + err = d.Ioctx.Write(chunkName, buf[:sizeToWrite], uint64(chunkOffset)) + if err != nil { + return totalRead, err + } + + totalSize += sizeToWrite + } + } + + // Writer + for { + // Align to chunk size + sizeRead := uint64(0) + sizeToRead := uint64(offset+totalRead) % d.chunksize + if sizeToRead == 0 { + sizeToRead = d.chunksize + } + + // Read from `reader` + for sizeRead < sizeToRead { + nn, err := reader.Read(buf[sizeRead:sizeToRead]) + sizeRead += uint64(nn) + + if err != nil { + if err != io.EOF { + return totalRead, err + } + + break + } + } + + // End of file and nothing was read + if sizeRead == 0 { + break + } + + // Write chunk object + chunkName, chunkOffset := d.GetChunkNameFromOffset(oid, uint64(offset+totalRead)) + err = d.Ioctx.Write(chunkName, buf[:sizeRead], uint64(chunkOffset)) + + if err != nil { + return totalRead, err + } + + // Update total object size as xattr in the first chunk of the object + err = d.SetXattrTotalSize(oid, uint64(offset+totalRead)+sizeRead) + if err != nil { + return totalRead, err + } + + totalRead += int64(sizeRead) + + // End of file + if sizeRead < sizeToRead { + break + } + } + + return totalRead, nil +} + +// Stat retrieves the FileInfo for the given path, including the current size +func (d *driver) Stat(path string) (storagedriver.FileInfo, error) { + // get oid from filename + oid, err := d.GetOid(path) + + if err != nil { + return nil, err + } + + // the path is a virtual directory? + if oid == "" { + return storagedriver.FileInfoInternal{ + FileInfoFields: storagedriver.FileInfoFields{ + Path: path, + Size: 0, + IsDir: true, + }, + }, nil + } + + // stat first chunk + stat, err := d.Ioctx.Stat(oid + "-0") + + if err != nil { + return nil, err + } + + // get total size of chunked object + totalSize, err := d.GetXattrTotalSize(oid) + + if err != nil { + return nil, err + } + + return storagedriver.FileInfoInternal{ + FileInfoFields: storagedriver.FileInfoFields{ + Path: path, + Size: int64(totalSize), + ModTime: stat.ModTime, + }, + }, nil +} + +// List returns a list of the objects that are direct descendants of the given path. +func (d *driver) List(path string) ([]string, error) { + files, err := d.ListDirectoryOid(path) + + if err != nil { + return nil, err + } + + keys := make([]string, 0, len(files)) + for k := range files { + keys = append(keys, filepath.Join(path, k)) + } + + return keys, nil +} + +// Move moves an object stored at sourcePath to destPath, removing the original +// object. +func (d *driver) Move(sourcePath string, destPath string) error { + // Get oid + oid, err := d.GetOid(sourcePath) + + if err != nil { + return err + } + + // Move reference + err = d.PutOid(destPath, oid) + + if err != nil { + return err + } + + // Delete old reference + err = d.DeleteOid(sourcePath) + + if err != nil { + return err + } + + return nil +} + +// Delete recursively deletes all objects stored at "path" and its subpaths. +func (d *driver) Delete(path string) error { + // Get oid + oid, err := d.GetOid(path) + + if err != nil { + return err + } + + // Deleting virtual directory + if oid == "" { + objects, err := d.ListDirectoryOid(path) + if err != nil { + return err + } + + for object := range objects { + err = d.Delete(filepath.Join(path, object)) + if err != nil { + return err + } + } + } else { + // Delete object chunks + totalSize, err := d.GetXattrTotalSize(oid) + + if err != nil { + return err + } + + for offset := uint64(0); offset < totalSize; offset += d.chunksize { + chunkName, _ := d.GetChunkNameFromOffset(oid, offset) + + err = d.Ioctx.Delete(chunkName) + if err != nil { + return err + } + } + + // Delete reference + err = d.DeleteOid(path) + if err != nil { + return err + } + } + + return nil +} + +// URLFor returns a URL which may be used to retrieve the content stored at the given path. +// May return an UnsupportedMethodErr in certain StorageDriver implementations. +func (d *driver) URLFor(path string, options map[string]interface{}) (string, error) { + return "", storagedriver.ErrUnsupportedMethod +} + +// Generate a blob identifier +func (d *driver) GenerateOid() string { + return objectBlobPrefix + uuid.New() +} + +// Reference a object and its hierarchy +func (d *driver) PutOid(path string, oid string) error { + directory := filepath.Dir(path) + base := filepath.Base(path) + createParentReference := true + + // After creating this reference, skip the parents referencing since the + // hierarchy already exists + if oid == "" { + firstReference, err := d.Ioctx.GetOmapValues(directory, "", "", 1) + if (err == nil) && (len(firstReference) > 0) { + createParentReference = false + } + } + + oids := map[string][]byte{ + base: []byte(oid), + } + + // Reference object + err := d.Ioctx.SetOmap(directory, oids) + if err != nil { + return err + } + + // Esure parent virtual directories + if createParentReference && directory != "/" { + return d.PutOid(directory, "") + } + + return nil +} + +// Get the object identifier from an object name +func (d *driver) GetOid(path string) (string, error) { + directory := filepath.Dir(path) + base := filepath.Base(path) + + files, err := d.Ioctx.GetOmapValues(directory, "", base, 1) + + if (err != nil) || (files[base] == nil) { + return "", storagedriver.PathNotFoundError{Path: path} + } + + return string(files[base][:]), nil +} + +// List the objects of a virtual directory +func (d *driver) ListDirectoryOid(path string) (map[string][]byte, error) { + return d.Ioctx.GetOmapValues(path, "", "", defaultMaxKeysFetched) +} + +// Remove a file from the files hierarchy +func (d *driver) DeleteOid(path string) error { + // Remove object reference + directory := filepath.Dir(path) + base := filepath.Base(path) + err := d.Ioctx.RmOmapKeys(directory, []string{base}) + + if err != nil { + return err + } + + // Remove virtual directory if empty (no more references) + firstReference, err := d.Ioctx.GetOmapValues(directory, "", "", 1) + + if err != nil { + return err + } + + if len(firstReference) == 0 { + // Delete omap + err := d.Ioctx.Delete(directory) + + if err != nil { + return err + } + + // Remove reference on parent omaps + if directory != "/" { + return d.DeleteOid(directory) + } + } + + return nil +} + +// Takes an offset in an chunked object and return the chunk name and a new +// offset in this chunk object +func (d *driver) GetChunkNameFromOffset(oid string, offset uint64) (string, uint64) { + chunkID := offset / d.chunksize + chunkedOid := oid + "-" + strconv.FormatInt(int64(chunkID), 10) + chunkedOffset := offset % d.chunksize + return chunkedOid, chunkedOffset +} + +// Set the total size of a chunked object `oid` +func (d *driver) SetXattrTotalSize(oid string, size uint64) error { + // Convert uint64 `size` to []byte + xattr := make([]byte, binary.MaxVarintLen64) + binary.LittleEndian.PutUint64(xattr, size) + + // Save the total size as a xattr in the first chunk + return d.Ioctx.SetXattr(oid+"-0", defaultXattrTotalSizeName, xattr) +} + +// Get the total size of the chunked object `oid` stored as xattr +func (d *driver) GetXattrTotalSize(oid string) (uint64, error) { + // Fetch xattr as []byte + xattr := make([]byte, binary.MaxVarintLen64) + xattrLength, err := d.Ioctx.GetXattr(oid+"-0", defaultXattrTotalSizeName, xattr) + + if err != nil { + return 0, err + } + + if xattrLength != len(xattr) { + log.Errorf("object %s xattr length mismatch: %d != %d", oid, xattrLength, len(xattr)) + return 0, storagedriver.PathNotFoundError{Path: oid} + } + + // Convert []byte as uint64 + totalSize := binary.LittleEndian.Uint64(xattr) + + return totalSize, nil +} diff --git a/registry/storage/driver/rados/rados_test.go b/registry/storage/driver/rados/rados_test.go new file mode 100644 index 00000000000..29486e896f2 --- /dev/null +++ b/registry/storage/driver/rados/rados_test.go @@ -0,0 +1,38 @@ +package rados + +import ( + "os" + "testing" + + storagedriver "github.com/docker/distribution/registry/storage/driver" + "github.com/docker/distribution/registry/storage/driver/testsuites" + + "gopkg.in/check.v1" +) + +// Hook up gocheck into the "go test" runner. +func Test(t *testing.T) { check.TestingT(t) } + +func init() { + poolname := os.Getenv("RADOS_POOL") + username := os.Getenv("RADOS_USER") + + driverConstructor := func() (storagedriver.StorageDriver, error) { + parameters := DriverParameters{ + poolname, + username, + defaultChunkSize, + } + + return New(parameters) + } + + skipCheck := func() string { + if poolname == "" { + return "RADOS_POOL must be set to run Rado tests" + } + return "" + } + + testsuites.RegisterInProcessSuite(driverConstructor, skipCheck) +}