diff --git a/docs/sphinx/api.rst b/docs/sphinx/api.rst
index 1c70205e0..ec4ac29fb 100644
--- a/docs/sphinx/api.rst
+++ b/docs/sphinx/api.rst
@@ -9,59 +9,9 @@
 or ``Context.textFile("path/to/textfile.txt")``. These two methods return
 an ``RDD`` which can then be processed with the methods below.
 
-RDD
----
+.. toctree::
+   :maxdepth: 2
 
-.. autoclass:: pysparkling.RDD
-    :members:
-
-.. autoclass:: pysparkling.StatCounter
-    :members:
-
-
-Context
--------
-
-A ``Context`` describes the setup. Instantiating a Context with the default
-arguments using ``Context()`` is the most lightweight setup. All data is just
-in the local thread and is never serialized or deserialized.
-
-If you want to process the data in parallel, you can use the ``multiprocessing``
-module. Given the limitations of the default ``pickle`` serializer, you can
-specify to serialize all methods with ``cloudpickle`` instead. For example,
-a common instantiation with ``multiprocessing`` looks like this:
-
-.. code-block:: python
-
-    c = Context(
-        multiprocessing.Pool(4),
-        serializer=cloudpickle.dumps,
-        deserializer=pickle.loads,
-    )
-
-This assumes that your data is serializable with ``pickle`` which is generally
-faster. You can also specify a custom serializer/deserializer for data.
-
-.. autoclass:: pysparkling.Context
-    :members:
-
-
-fileio
-------
-
-The functionality provided by this module is used in ``Context.textFile()``
-for reading and in ``RDD.saveAsTextFile()`` for writing. You can use this
-submodule for writing files directly with ``File(filename).dump(some_data)``,
-``File(filename).load()`` and ``File.exists(path)`` to read, write and check
-for existance of a file. All methods transparently handle various schemas
-(for example ``http://``, ``s3://`` and ``file://``) and
-compression/decompression of ``.gz`` and ``.bz2`` files (among others).
-
-Use environment variables ``AWS_SECRET_ACCESS_KEY`` and ``AWS_ACCESS_KEY_ID``
-for auth and use file paths of the form ``s3://bucket_name/filename.txt``.
-
-.. autoclass:: pysparkling.fileio.File
-    :members:
-
-.. autoclass:: pysparkling.fileio.TextFile
-    :members:
+   api_rdd
+   api_context
+   api_fileio
diff --git a/docs/sphinx/api_context.rst b/docs/sphinx/api_context.rst
new file mode 100644
index 000000000..0525f72db
--- /dev/null
+++ b/docs/sphinx/api_context.rst
@@ -0,0 +1,29 @@
+.. _api_context:
+
+.. currentmodule:: pysparkling
+
+Context
+-------
+
+A :class:`Context` describes the setup. Instantiating a Context with the default
+arguments using ``Context()`` is the most lightweight setup. All data is just
+in the local thread and is never serialized or deserialized.
+
+If you want to process the data in parallel, you can use the ``multiprocessing``
+module. Given the limitations of the default ``pickle`` serializer, you can
+serialize all methods with ``cloudpickle`` instead. For example, a common
+instantiation with ``multiprocessing`` looks like this:
+
+.. code-block:: python
+
+    c = Context(
+        multiprocessing.Pool(4),
+        serializer=cloudpickle.dumps,
+        deserializer=pickle.loads,
+    )
+
+This assumes that your data is serializable with ``pickle``, which is generally
+faster than ``cloudpickle``. You can also specify a custom serializer and
+deserializer for data.
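+
+If the data elements themselves are not picklable (for example, if they
+contain lambdas), a hypothetical all-``cloudpickle`` setup using the
+``data_serializer`` and ``data_deserializer`` arguments (see the class
+documentation below) might look like this:
+
+.. code-block:: python
+
+    c = Context(
+        multiprocessing.Pool(4),
+        serializer=cloudpickle.dumps,
+        deserializer=pickle.loads,
+        data_serializer=cloudpickle.dumps,
+        data_deserializer=pickle.loads,
+    )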
+
+.. autoclass:: pysparkling.Context
+    :members:
diff --git a/docs/sphinx/api_fileio.rst b/docs/sphinx/api_fileio.rst
new file mode 100644
index 000000000..9aee45475
--- /dev/null
+++ b/docs/sphinx/api_fileio.rst
@@ -0,0 +1,43 @@
+.. _api_fileio:
+
+
+fileio
+------
+
+.. currentmodule:: pysparkling
+
+The functionality provided by this module is used in :func:`Context.textFile`
+for reading and in :func:`RDD.saveAsTextFile` for writing.
+
+.. currentmodule:: pysparkling.fileio
+
+You can use this submodule directly with :func:`File.dump`,
+:func:`File.load` and :func:`File.exists` to write, read and check
+for existence of a file. All methods transparently handle various schemes
+(for example ``http://``, ``s3://`` and ``file://``) and
+compression/decompression of ``.gz`` and ``.bz2`` files (among others).
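+
+A minimal sketch of direct use (the file name is made up; the ``.gz``
+suffix selects gzip compression transparently):
+
+.. code-block:: python
+
+    import io
+    from pysparkling.fileio import TextFile
+
+    f = TextFile('myfile.txt.gz')
+    f.dump(io.StringIO('hello\nworld\n'))
+    print(f.load().read())
+    print(TextFile.exists('myfile.txt.gz'))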
+
+
+.. autoclass:: pysparkling.fileio.File
+    :members:
+
+.. autoclass:: pysparkling.fileio.TextFile
+    :members:
+
+.. autoclass:: pysparkling.fileio.fs.FileSystem
+    :members:
+
+.. autoclass:: pysparkling.fileio.fs.S3
+    :members:
+
+.. autoclass:: pysparkling.fileio.fs.GS
+    :members:
+
+.. autoclass:: pysparkling.fileio.fs.Hdfs
+    :members:
+
+.. autoclass:: pysparkling.fileio.fs.Http
+    :members:
+
+.. autoclass:: pysparkling.fileio.fs.Local
+    :members:
diff --git a/docs/sphinx/api_rdd.rst b/docs/sphinx/api_rdd.rst
new file mode 100644
index 000000000..f9d36e4f9
--- /dev/null
+++ b/docs/sphinx/api_rdd.rst
@@ -0,0 +1,10 @@
+.. _api_rdd:
+
+RDD
+---
+
+.. autoclass:: pysparkling.RDD
+    :members:
+
+.. autoclass:: pysparkling.StatCounter
+    :members:
diff --git a/pysparkling/fileio/codec/codec.py b/pysparkling/fileio/codec/codec.py
index c057cfaa4..74dad25d2 100644
--- a/pysparkling/fileio/codec/codec.py
+++ b/pysparkling/fileio/codec/codec.py
@@ -4,11 +4,22 @@
 
 
 class Codec(object):
+    """Codec base class; this default implementation passes data through."""
     def __init__(self):
         pass
 
     def compress(self, stream):
+        """Compress.
+
+        :param io.BytesIO stream: Uncompressed input stream.
+        :rtype: io.BytesIO
+        """
         return stream
 
     def decompress(self, stream):
+        """Decompress.
+
+        :param io.BytesIO stream: Compressed input stream.
+        :rtype: io.BytesIO
+        """
         return stream
diff --git a/pysparkling/fileio/file.py b/pysparkling/fileio/file.py
index 268a7f6cf..1fb7c0cb0 100644
--- a/pysparkling/fileio/file.py
+++ b/pysparkling/fileio/file.py
@@ -10,7 +10,7 @@
 
 
 class File(object):
-    """file object
+    """File object.
 
     :param file_name: Any file name.
     """
diff --git a/pysparkling/fileio/fs/file_system.py b/pysparkling/fileio/fs/file_system.py
index 422b30f89..07f607656 100644
--- a/pysparkling/fileio/fs/file_system.py
+++ b/pysparkling/fileio/fs/file_system.py
@@ -4,22 +4,57 @@
 
 
 class FileSystem(object):
+    """Interface class for the file system.
+
+    :param str file_name: File name.
+    """
     def __init__(self, file_name):
         self.file_name = file_name
 
     @staticmethod
     def resolve_filenames(expr):
+        """Resolve the given glob-like expression to filenames.
+
+        :rtype: list
+        """
         log.error('Cannot resolve: {0}'.format(expr))
 
     def exists(self):
+        """Check whether the given file_name exists.
+
+        :rtype: bool
+        """
         log.warning('Could not determine whether {0} exists due to '
                     'unhandled scheme.'.format(self.file_name))
 
     def load(self):
+        """Load a file to a binary stream.
+
+        :rtype: io.BytesIO
+        """
+        log.error('Cannot load: {0}'.format(self.file_name))
+
+    def load_text(self, encoding='utf8', encoding_errors='ignore'):
+        """Load a file to a text stream.
+
+        :param str encoding: Text encoding.
+        :param str encoding_errors: How to handle encoding errors.
+
+        :rtype: io.StringIO
+        """
         log.error('Cannot load: {0}'.format(self.file_name))
 
     def dump(self, stream):
+        """Dump a stream to a file.
+
+        :param io.BytesIO stream: Input stream.
+        """
         log.error('Cannot dump: {0}'.format(self.file_name))
 
     def make_public(self, recursive=False):
+        """Make the file public (only on some file systems).
+
+        :param bool recursive: Also make all files within the path public.
+        :rtype: FileSystem
+        """
         log.warning('Cannot make {0} public.'.format(self.file_name))
diff --git a/pysparkling/fileio/fs/gs.py b/pysparkling/fileio/fs/gs.py
index 165e1f6c9..3797a10f0 100644
--- a/pysparkling/fileio/fs/gs.py
+++ b/pysparkling/fileio/fs/gs.py
@@ -17,7 +17,18 @@
 
 
 class GS(FileSystem):
+    """:class:`.FileSystem` implementation for Google Storage.
+
+    Paths are of the form ``gs://bucket_name/file_path`` or
+    ``gs://project_name:bucket_name/file_path``.
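+
+    A hypothetical setup (the project name is a placeholder) sets the
+    default project once, before any file is opened::
+
+        GS.project_name = 'my-project'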
+    """
+
+    #: Set a default project name.
     project_name = None
+
+    #: Default mime type.
+    mime_type = 'text/plain'
+
     _clients = {}
 
     def __init__(self, file_name):
@@ -101,7 +112,7 @@ def load_text(self, encoding='utf8', encoding_errors='ignore'):
 
     def dump(self, stream):
         log.debug('Dumping to {0}.'.format(self.blob.name))
-        self.blob.upload_from_string(stream.read())
+        self.blob.upload_from_string(stream.read(), mime_type=self.mime_type)
         return self
 
     def make_public(self, recursive=False):
diff --git a/pysparkling/fileio/fs/hdfs.py b/pysparkling/fileio/fs/hdfs.py
index 2d3b83b7b..717d81562 100644
--- a/pysparkling/fileio/fs/hdfs.py
+++ b/pysparkling/fileio/fs/hdfs.py
@@ -17,6 +17,8 @@
 
 
 class Hdfs(FileSystem):
+    """:class:`.FileSystem` implementation for HDFS."""
+
     _conn = {}
 
     def __init__(self, file_name):
diff --git a/pysparkling/fileio/fs/http.py b/pysparkling/fileio/fs/http.py
index 7fa25f9a4..75c1be87b 100644
--- a/pysparkling/fileio/fs/http.py
+++ b/pysparkling/fileio/fs/http.py
@@ -15,6 +15,8 @@
 
 
 class Http(FileSystem):
+    """:class:`.FileSystem` implementation for HTTP."""
+
     def __init__(self, file_name):
         if requests is None:
             raise FileSystemNotSupported(
diff --git a/pysparkling/fileio/fs/local.py b/pysparkling/fileio/fs/local.py
index e349197ed..fd4b97044 100644
--- a/pysparkling/fileio/fs/local.py
+++ b/pysparkling/fileio/fs/local.py
@@ -12,6 +12,8 @@
 
 
 class Local(FileSystem):
+    """:class:`.FileSystem` implementation for the local file system."""
+
     def __init__(self, file_name):
         super(Local, self).__init__(file_name)
 
diff --git a/pysparkling/fileio/fs/s3.py b/pysparkling/fileio/fs/s3.py
index fc27fe1ff..aef829128 100644
--- a/pysparkling/fileio/fs/s3.py
+++ b/pysparkling/fileio/fs/s3.py
@@ -17,6 +17,17 @@
 
 
 class S3(FileSystem):
+    """:class:`.FileSystem` implementation for S3.
+
+    Use environment variables ``AWS_SECRET_ACCESS_KEY`` and
+    ``AWS_ACCESS_KEY_ID`` for auth and use file paths of the form
+    ``s3://bucket_name/filename.txt``.
+    """
+
+    #: Keyword arguments for new connections.
+    #: Example: set to ``{'anon': True}`` for anonymous connections.
+    connection_kwargs = {}
+
     _conn = None
 
     def __init__(self, file_name):
@@ -30,20 +41,20 @@ def __init__(self, file_name):
         t.next('://')  # skip scheme
         bucket_name = t.next('/')
         key_name = t.next()
-        conn = S3._get_conn()
+        conn = self._get_conn()
         bucket = conn.get_bucket(bucket_name, validate=False)
         self.key = bucket.get_key(key_name)
         if not self.key:
             self.key = bucket.new_key(key_name)
 
-    @staticmethod
-    def _get_conn():
-        if not S3._conn:
-            S3._conn = boto.connect_s3()
-        return S3._conn
+    @classmethod
+    def _get_conn(cls):
+        if not cls._conn:
+            cls._conn = boto.connect_s3(**cls.connection_kwargs)
+        return cls._conn
 
-    @staticmethod
-    def resolve_filenames(expr):
+    @classmethod
+    def resolve_filenames(cls, expr):
         files = []
 
         t = Tokenizer(expr)
@@ -51,7 +62,7 @@ def resolve_filenames(expr):
         bucket_name = t.next('/')
         prefix = t.next(['*', '?'])
 
-        bucket = S3._get_conn().get_bucket(
+        bucket = cls._get_conn().get_bucket(
             bucket_name,
             validate=False
         )
@@ -70,7 +81,7 @@ def exists(self):
         t.next('//')  # skip scheme
         bucket_name = t.next('/')
         key_name = t.next()
-        conn = S3._get_conn()
+        conn = self._get_conn()
         bucket = conn.get_bucket(bucket_name, validate=False)
         return (bucket.get_key(key_name) or
                 bucket.list(prefix='{}/'.format(key_name)))
diff --git a/pysparkling/fileio/textfile.py b/pysparkling/fileio/textfile.py
index 09fb6f3f4..935f75934 100644
--- a/pysparkling/fileio/textfile.py
+++ b/pysparkling/fileio/textfile.py
@@ -5,6 +5,7 @@
 
 from . import codec
 from .file import File
+from .fs.file_system import FileSystem
 
 log = logging.getLogger(__name__)
 
@@ -16,7 +17,7 @@
 
 
 class TextFile(File):
-    """Derived from :class:`pysparkling.fileio.File`.
+    """Derived from :class:`File`.
 
     :param file_name: Any text file name.
     """
@@ -27,13 +28,12 @@ def __init__(self, file_name):
     def load(self, encoding='utf8', encoding_errors='ignore'):
         """Load the data from a file.
 
-        :param encoding: (optional)
-            The character encoding of the file.
-
+        :param str encoding: The character encoding of the file.
+        :param str encoding_errors: How to handle encoding errors.
         :rtype: io.StringIO
         """
         if type(self.codec) == codec.Codec and \
-           hasattr(self.fs, 'load_text'):
+           type(self.fs).load_text != FileSystem.load_text:
             stream = self.fs.load_text(encoding, encoding_errors)
         else:
             stream = self.fs.load()
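
The pieces above combine as in this sketch (the bucket and file names are
hypothetical; an anonymous read assumes a public bucket and ``boto`` installed):

    from pysparkling.fileio import TextFile
    from pysparkling.fileio.fs import S3

    S3.connection_kwargs = {'anon': True}  # picked up by the new classmethod _get_conn
    text = TextFile('s3://bucket_name/filename.txt').load().read()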