Merge branch 'master' into SPARK-29375-SPARK-28940-whole-plan-reuse
# Conflicts:
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14a.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14a.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14a/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14a/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24a.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24a.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24a/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24a/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24b.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24b.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24b/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24b/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q35.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q35/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14a.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14a.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14a/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14a/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q24.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q24.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q24/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q24/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q64/explain.txt
peter-toth committed Jun 17, 2021
2 parents bf29f1a + abf9675 commit c346387
Showing 140 changed files with 8,276 additions and 8,188 deletions.
@@ -85,6 +85,17 @@ void putAll(BitArray array) {
    this.bitCount = bitCount;
  }

  /** Combines the two BitArrays using bitwise AND. */
  void and(BitArray array) {
    assert data.length == array.data.length : "BitArrays must be of equal length when merging";
    long bitCount = 0;
    for (int i = 0; i < data.length; i++) {
      data[i] &= array.data[i];
      bitCount += Long.bitCount(data[i]);
    }
    this.bitCount = bitCount;
  }

  void writeTo(DataOutputStream out) throws IOException {
    out.writeInt(data.length);
    for (long datum : data) {
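For intuition, here is a small sketch (not part of this commit) of the word-wise AND and bit-count recomputation that and() performs on each long word of the two bit arrays:

    // Hypothetical illustration of what BitArray.and does for a single 64-bit word.
    val a = java.lang.Long.parseLong("1100", 2)   // bits {2, 3} set
    val b = java.lang.Long.parseLong("1010", 2)   // bits {1, 3} set
    val intersected = a & b                       // only bit {3} survives the AND
    assert(java.lang.Long.bitCount(intersected) == 1)  // bitCount is re-accumulated per word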
@@ -126,6 +126,16 @@ int getVersionNumber() {
   */
  public abstract BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeException;

  /**
   * Combines this bloom filter with another bloom filter by performing a bitwise AND of the
   * underlying data. The mutations happen to <b>this</b> instance. Callers must ensure the
   * bloom filters are appropriately sized to avoid saturating them.
   *
   * @param other The bloom filter to combine this bloom filter with. It is not mutated.
   * @throws IncompatibleMergeException if {@code isCompatible(other) == false}
   */
  public abstract BloomFilter intersectInPlace(BloomFilter other) throws IncompatibleMergeException;

  /**
   * Returns {@code true} if the element <i>might</i> have been put in this Bloom filter,
   * {@code false} if this is <i>definitely</i> not the case.
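To make the intent of the new API concrete, here is a minimal usage sketch (not code from this commit; it mirrors the test added further below and relies only on the existing create, put, and mightContain APIs):

    import org.apache.spark.util.sketch.BloomFilter

    val filter1 = BloomFilter.create(1000)   // expectedNumItems = 1000
    val filter2 = BloomFilter.create(1000)

    Seq("a", "b", "c").foreach(filter1.put)
    Seq("b", "c", "d").foreach(filter2.put)

    // Mutates filter1 so that only bits set in both filters remain.
    filter1.intersectInPlace(filter2)

    assert(filter1.mightContain("b"))   // items put into both filters are still reported
    assert(filter1.mightContain("c"))
    // Items put into only one filter *may* still be reported: ANDing the bit arrays
    // over-approximates the intersection of the underlying item sets.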
@@ -193,6 +193,22 @@ public boolean isCompatible(BloomFilter other) {

  @Override
  public BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeException {
    BloomFilterImpl otherImplInstance = checkCompatibilityForMerge(other);

    this.bits.putAll(otherImplInstance.bits);
    return this;
  }

  @Override
  public BloomFilter intersectInPlace(BloomFilter other) throws IncompatibleMergeException {
    BloomFilterImpl otherImplInstance = checkCompatibilityForMerge(other);

    this.bits.and(otherImplInstance.bits);
    return this;
  }

  private BloomFilterImpl checkCompatibilityForMerge(BloomFilter other)
      throws IncompatibleMergeException {
    // Duplicates the logic of `isCompatible` here to provide better error message.
    if (other == null) {
      throw new IncompatibleMergeException("Cannot merge null bloom filter");
@@ -215,9 +231,7 @@ public BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeException
        "Cannot merge bloom filters with different number of hash functions"
      );
    }

    this.bits.putAll(that.bits);
    return this;
    return that;
  }

  @Override
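One behavioral note, with a hedged sketch (not from this commit): because intersectInPlace goes through the same checkCompatibilityForMerge path as mergeInPlace, intersecting incompatible filters fails fast with IncompatibleMergeException instead of silently producing a corrupted filter. For example, filters sized for very different expected item counts will generally have different bit-array lengths and therefore be rejected:

    import org.apache.spark.util.sketch.{BloomFilter, IncompatibleMergeException}

    val small = BloomFilter.create(100)       // expectedNumItems = 100
    val large = BloomFilter.create(1000000)   // much larger sizing => different bit-array length

    try {
      small.intersectInPlace(large)
    } catch {
      case e: IncompatibleMergeException =>
        // isCompatible(other) is false for these two filters, so the call is rejected.
        println(s"cannot intersect: ${e.getMessage}")
    }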
@@ -99,9 +99,39 @@ class BloomFilterSuite extends AnyFunSuite { // scalastyle:ignore funsuite
    }
  }

  def testIntersectInPlace[T: ClassTag]
    (typeName: String, numItems: Int)(itemGen: Random => T): Unit = {
    test(s"intersectInPlace - $typeName") {
      // use a fixed seed to make the test predictable.
      val r = new Random(37)

      val items1 = Array.fill(numItems / 2)(itemGen(r))
      val items2 = Array.fill(numItems / 2)(itemGen(r))

      val filter1 = BloomFilter.create(numItems / 2)
      items1.foreach(filter1.put)

      val filter2 = BloomFilter.create(numItems / 2)
      items2.foreach(filter2.put)

      filter1.intersectInPlace(filter2)

      val common_items = items1.intersect(items2)
      common_items.foreach(i => assert(filter1.mightContain(i)))

      // After intersect, `filter1` still has `numItems/2` items
      // which doesn't exceed `expectedNumItems`,
      // so the `expectedFpp` should not be higher than the default one.
      assert(filter1.expectedFpp() - BloomFilter.DEFAULT_FPP < EPSILON)

      checkSerDe(filter1)
    }
  }

  def testItemType[T: ClassTag](typeName: String, numItems: Int)(itemGen: Random => T): Unit = {
    testAccuracy[T](typeName, numItems)(itemGen)
    testMergeInPlace[T](typeName, numItems)(itemGen)
    testIntersectInPlace[T](typeName, numItems)(itemGen)
  }

  testItemType[Byte]("Byte", 160) { _.nextInt().toByte }
7 changes: 4 additions & 3 deletions dev/deps/spark-deps-hadoop-3.2-hive-2.3
@@ -57,9 +57,10 @@ flatbuffers-java/1.9.0//flatbuffers-java-1.9.0.jar
generex/1.0.2//generex-1.0.2.jar
gson/2.2.4//gson-2.2.4.jar
guava/14.0.1//guava-14.0.1.jar
hadoop-client-api/3.2.2//hadoop-client-api-3.2.2.jar
hadoop-client-runtime/3.2.2//hadoop-client-runtime-3.2.2.jar
hadoop-yarn-server-web-proxy/3.2.2//hadoop-yarn-server-web-proxy-3.2.2.jar
hadoop-client-api/3.3.1//hadoop-client-api-3.3.1.jar
hadoop-client-runtime/3.3.1//hadoop-client-runtime-3.3.1.jar
hadoop-shaded-guava/1.1.1//hadoop-shaded-guava-1.1.1.jar
hadoop-yarn-server-web-proxy/3.3.1//hadoop-yarn-server-web-proxy-3.3.1.jar
hive-beeline/2.3.9//hive-beeline-2.3.9.jar
hive-cli/2.3.9//hive-cli-2.3.9.jar
hive-common/2.3.9//hive-common-2.3.9.jar
2 changes: 1 addition & 1 deletion docs/sql-migration-guide.md
@@ -95,7 +95,7 @@ license: |

- In Spark 3.2, `FloatType` is mapped to `FLOAT` in MySQL. Prior to this, it used to be mapped to `REAL`, which is by default a synonym to `DOUBLE PRECISION` in MySQL.

- In Spark 3.2, the query executions triggered by `DataFrameWriter` are always named `command` when being sent to `QueryExecutionListener`. In Spark 3.1 and earlier, the name is one of `save`, `insertInto`, `saveAsTable`, `create`, `append`, `overwrite`, `overwritePartitions`, `replace`.
- In Spark 3.2, the query executions triggered by `DataFrameWriter` are always named `command` when being sent to `QueryExecutionListener`. In Spark 3.1 and earlier, the name is one of `save`, `insertInto`, `saveAsTable`.

## Upgrading from Spark SQL 3.0 to 3.1

15 changes: 14 additions & 1 deletion pom.xml
@@ -120,7 +120,7 @@
    <sbt.project.name>spark</sbt.project.name>
    <slf4j.version>1.7.30</slf4j.version>
    <log4j.version>1.2.17</log4j.version>
    <hadoop.version>3.2.2</hadoop.version>
    <hadoop.version>3.3.1</hadoop.version>
    <protobuf.version>2.5.0</protobuf.version>
    <yarn.version>${hadoop.version}</yarn.version>
    <zookeeper.version>3.6.2</zookeeper.version>
@@ -195,6 +195,7 @@
    <maven-antrun.version>1.8</maven-antrun.version>
    <commons-crypto.version>1.1.0</commons-crypto.version>
    <commons-cli.version>1.2</commons-cli.version>
    <bouncycastle.version>1.60</bouncycastle.version>
    <!--
      If you are changing Arrow version specification, please check
      ./python/pyspark/sql/pandas/utils.py, and ./python/setup.py too.
@@ -1192,6 +1193,18 @@
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.bouncycastle</groupId>
      <artifactId>bcprov-jdk15on</artifactId>
      <version>${bouncycastle.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.bouncycastle</groupId>
      <artifactId>bcpkix-jdk15on</artifactId>
      <version>${bouncycastle.version}</version>
      <scope>test</scope>
    </dependency>
    <!-- Managed up to match Hadoop in HADOOP-16530 -->
    <dependency>
      <groupId>xerces</groupId>
5 changes: 4 additions & 1 deletion project/MimaExcludes.scala
@@ -58,7 +58,10 @@ object MimaExcludes {
    ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getChild"),

    // [SPARK-35135][CORE] Turn WritablePartitionedIterator from trait into a default implementation class
    ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.WritablePartitionedIterator")
    ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.WritablePartitionedIterator"),

    // [SPARK-35757][CORE] Add bitwise AND operation and functionality for intersecting bloom filters
    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.util.sketch.BloomFilter.intersectInPlace")
  )

// Exclude rules for 3.1.x
123 changes: 111 additions & 12 deletions python/docs/source/development/contributing.rst
@@ -72,17 +72,94 @@ Preparing to Contribute Code Changes
------------------------------------

Before starting to work on codes in PySpark, it is recommended to read `the general guidelines <https://spark.apache.org/contributing.html>`_.
There are a couple of additional notes to keep in mind when contributing to codes in PySpark:
Additionally, there are a couple of additional notes to keep in mind when contributing to codes in PySpark:

* Be Pythonic
See `The Zen of Python <https://www.python.org/dev/peps/pep-0020/>`_.

* Match APIs with Scala and Java sides
Apache Spark is an unified engine that provides a consistent API layer. In general, the APIs are consistently supported across other languages.

* PySpark-specific APIs can be accepted
As long as they are Pythonic and do not conflict with other existent APIs, it is fine to raise a API request, for example, decorator usage of UDFs.

* Adjust the corresponding type hints if you extend or modify public API
See `Contributing and Maintaining Type Hints`_ for details.

If you are fixing pandas API on Spark (``pyspark.pandas``) package, please consider the design principles below:

* Return pandas-on-Spark data structure for big data, and pandas data structure for small data
Often developers face the question whether a particular function should return a pandas-on-Spark DataFrame/Series, or a pandas DataFrame/Series. The principle is: if the returned object can be large, use a pandas-on-Spark DataFrame/Series. If the data is bound to be small, use a pandas DataFrame/Series. For example, ``DataFrame.dtypes`` return a pandas Series, because the number of columns in a DataFrame is bounded and small, whereas ``DataFrame.head()`` or ``Series.unique()`` returns a pandas-on-Spark DataFrame/Series, because the resulting object can be large.

* Provide discoverable APIs for common data science tasks
At the risk of overgeneralization, there are two API design approaches: the first focuses on providing APIs for common tasks; the second starts with abstractions, and enables users to accomplish their tasks by composing primitives. While the world is not black and white, pandas takes more of the former approach, while Spark has taken more of the latter.

One example is value count (count by some key column), one of the most common operations in data science. pandas ``DataFrame.value_count`` returns the result in sorted order, which in 90% of the cases is what users prefer when exploring data, whereas Spark's does not sort, which is more desirable when building data pipelines, as users can accomplish the pandas behavior by adding an explicit ``orderBy``.

Similar to pandas, pandas API on Spark should also lean more towards the former, providing discoverable APIs for common data science tasks. In most cases, this principle is well taken care of by simply implementing pandas' APIs. However, there will be circumstances in which pandas' APIs don't address a specific need, e.g. plotting for big data.

* Guardrails to prevent users from shooting themselves in the foot
Certain operations in pandas are prohibitively expensive as data scales, and we don't want to give users the illusion that they can rely on such operations in pandas API on Spark. That is to say, methods implemented in pandas API on Spark should be safe to perform by default on large datasets. As a result, the following capabilities are not implemented in pandas API on Spark:

* Capabilities that are fundamentally not parallelizable: e.g. imperatively looping over each element
* Capabilities that require materializing the entire working set in a single node's memory. This is why we do not implement `pandas.DataFrame.to_xarray <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_xarray.html>`_. Another example is the ``_repr_html_`` call caps the total number of records shown to a maximum of 1000, to prevent users from blowing up their driver node simply by typing the name of the DataFrame in a notebook.

A few exceptions, however, exist. One common pattern with "big data science" is that while the initial dataset is large, the working set becomes smaller as the analysis goes deeper. For example, data scientists often perform aggregation on datasets and want to then convert the aggregated dataset to some local data structure. To help data scientists, we offer the following:

* ``DataFrame.to_pandas``: returns a pandas DataFrame (pandas-on-Spark only)
* ``DataFrame.to_numpy``: returns a numpy array, works with both pandas and pandas API on Spark

Note that it is clear from the names that these functions return some local data structure that would require materializing data in a single node's memory. For these functions, we also explicitly document them with a warning note that the resulting data structure must be small.


Environment Setup
-----------------

Prerequisite
~~~~~~~~~~~~

PySpark development requires to build Spark that needs a proper JDK installed, etc. See `Building Spark <https://spark.apache.org/docs/latest/building-spark.html>`_ for more details.

Conda
~~~~~

If you are using Conda, the development environment can be set as follows.

.. code-block:: bash

    # Python 3.6+ is required
    conda create --name pyspark-dev-env python=3.9
    conda activate pyspark-dev-env
    pip install -r dev/requirements.txt

Once it is set up, make sure you switch to `pyspark-dev-env` before starting the development:

.. code-block:: bash

    conda activate pyspark-dev-env

Now, you can start developing and `running the tests <testing.rst>`_.

pip
~~~

With Python 3.6+, pip can be used as below to install and set up the development environment.

.. code-block:: bash

    pip install -r dev/requirements.txt

Now, you can start developing and `running the tests <testing.rst>`_.

* Be Pythonic.
* APIs are matched with Scala and Java sides in general.
* PySpark specific APIs can still be considered as long as they are Pythonic and do not conflict with other existent APIs, for example, decorator usage of UDFs.
* If you extend or modify public API, please adjust corresponding type hints. See `Contributing and Maintaining Type Hints`_ for details.

Contributing and Maintaining Type Hints
----------------------------------------

PySpark type hints are provided using stub files, placed in the same directory as the annotated module, with exception to ``# type: ignore`` in modules which don't have their own stubs (tests, examples and non-public API).
PySpark type hints are provided using stub files, placed in the same directory as the annotated module, with exception to:

* ``# type: ignore`` in modules which don't have their own stubs (tests, examples and non-public API).
* pandas API on Spark (``pyspark.pandas`` package) where the type hints are inlined.

As a rule of thumb, only public API is annotated.

Annotations should, when possible:
@@ -122,16 +199,38 @@ Annotations can be validated using ``dev/lint-python`` script or by invoking mypy

    mypy --config python/mypy.ini python/pyspark

Code and Docstring Guide
----------------------------------
------------------------

Code Conventions
~~~~~~~~~~~~~~~~

Please follow the style of the existing codebase as is, which is virtually PEP 8 with one exception: lines can be up
to 100 characters in length, not 79.
For the docstring style, PySpark follows `NumPy documentation style <https://numpydoc.readthedocs.io/en/latest/format.html>`_.

Note that the method and variable names in PySpark are the similar case is ``threading`` library in Python itself where
the APIs were inspired by Java. PySpark also follows `camelCase` for exposed APIs that match with Scala and Java.
There is an exception ``functions.py`` that uses `snake_case`. It was in order to make APIs SQL (and Python) friendly.
Note that:

* the method and variable names in PySpark are the similar case is ``threading`` library in Python itself where the APIs were inspired by Java. PySpark also follows `camelCase` for exposed APIs that match with Scala and Java.

* In contrast, ``functions.py`` uses `snake_case` in order to make APIs SQL (and Python) friendly.

* In addition, pandas-on-Spark (``pyspark.pandas``) also uses `snake_case` because this package is free from API consistency with other languages.

PySpark leverages linters such as `pycodestyle <https://pycodestyle.pycqa.org/en/latest/>`_ and `flake8 <https://flake8.pycqa.org/en/latest/>`_, which ``dev/lint-python`` runs. Therefore, make sure to run that script to double check.


Docstring Conventions
~~~~~~~~~~~~~~~~~~~~~

PySpark follows `NumPy documentation style <https://numpydoc.readthedocs.io/en/latest/format.html>`_.


Doctest Conventions
~~~~~~~~~~~~~~~~~~~

In general, doctests should be grouped logically by separating a newline.

For instance, the first block is for the statements for preparation, the second block is for using the function with a specific argument,
and third block is for another argument. As a example, please refer `DataFrame.rsub <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.rsub.html#pandas.DataFrame.rsub>`_ in pandas.

These blocks should be consistently separated in PySpark doctests, and more doctests should be added if the coverage of the doctests or the number of examples to show is not enough.
7 changes: 0 additions & 7 deletions python/docs/source/development/index.rst
@@ -27,10 +27,3 @@ Development
debugging
setting_ide

For pandas API on Spark:

.. toctree::
:maxdepth: 2

ps_contributing
ps_design