1. 22 September 2021 (9 commits)
    • [SPARK-36753][SQL] ArrayExcept handle duplicated Double.NaN and Float.NaN · a7cbe699
      Committed by Angerszhuuuu
      ### What changes were proposed in this pull request?
      For query
      ```
      select array_except(array(cast('nan' as double), 1d), array(cast('nan' as double)))
      ```
      This returns [NaN, 1d], but it should return [1d].
      The issue is that `OpenHashSet` can't handle `Double.NaN` and `Float.NaN` either.
      This PR fixes it based on https://github.com/apache/spark/pull/33955.
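      
      To see why a hash-set based de-duplication misses `NaN`, note that primitive and boxed equality disagree on it (a standalone sketch, not the Spark internals):
      ```scala
      object NaNEqualityDemo {
        def main(args: Array[String]): Unit = {
          // IEEE-754 primitive comparison: NaN is never equal to itself, so an
          // equality-keyed lookup never "finds" an existing NaN entry.
          println(Double.NaN == Double.NaN)                    // false
          // Boxed equality treats NaN as equal to NaN, which is the set
          // semantics users expect from array_except.
          println(java.lang.Double.valueOf(Double.NaN)
            .equals(java.lang.Double.valueOf(Double.NaN)))     // true
        }
      }
      ```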
      
      ### Why are the changes needed?
      Fix bug
      
      ### Does this PR introduce _any_ user-facing change?
      Yes, ArrayExcept now handles equal `NaN` values.
      
      ### How was this patch tested?
      Added UT
      
      Closes #33994 from AngersZhuuuu/SPARK-36753.
      Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
      Signed-off-by: Wenchen Fan <wenchen@databricks.com>
      a7cbe699
    • [SPARK-36803][SQL] Fix ArrayType conversion when reading Parquet files written in legacy mode · ec26d94e
      Committed by Ivan Sadikov
      ### What changes were proposed in this pull request?
      
      This PR fixes an issue where reading a Parquet file written in legacy mode would fail due to an incorrect Parquet LIST to ArrayType conversion.
      
      The issue arises when using schema evolution and utilising the parquet-mr reader. 2-level LIST annotated types could be parsed incorrectly as 3-level LIST annotated types because their underlying element type does not match the full inferred Catalyst schema.
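      
      For context, legacy-mode files come from the `spark.sql.parquet.writeLegacyFormat` option; a minimal round-trip sketch (the path is a placeholder, and this simple case alone does not reproduce the failure, which also needs schema evolution with the parquet-mr reader):
      ```scala
      import org.apache.spark.sql.SparkSession
      
      object LegacyListRoundTrip {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().master("local[*]").getOrCreate()
          import spark.implicits._
      
          // Write an array column using the legacy (2-level) Parquet LIST layout.
          spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")
          Seq(Seq(1, 2, 3), Seq(4, 5)).toDF("values")
            .write.mode("overwrite").parquet("/tmp/legacy_list")
      
          // Reading such files back goes through the LIST -> ArrayType conversion
          // in ParquetRowConverter that this PR fixes.
          spark.read.parquet("/tmp/legacy_list").show()
        }
      }
      ```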
      
      ### Why are the changes needed?
      
      It appears to be a long-standing issue with the legacy mode due to the imprecise check in ParquetRowConverter that was trying to determine Parquet backward compatibility using Catalyst schema: `DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)` in https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala#L606.
      
      ### Does this PR introduce _any_ user-facing change?
      No
      
      ### How was this patch tested?
      
      Added a new test case in ParquetInteroperabilitySuite.scala.
      
      Closes #34044 from sadikovi/parquet-legacy-write-mode-list-issue.
      Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
      Signed-off-by: Wenchen Fan <wenchen@databricks.com>
      ec26d94e
    • [SPARK-36822][SQL] BroadcastNestedLoopJoinExec should use all condition... · d90b4791
      Committed by ulysses-you
      [SPARK-36822][SQL] BroadcastNestedLoopJoinExec should use all condition instead of non-equi condition
      
      ### What changes were proposed in this pull request?
      
      Change `nonEquiCond` to all join condition at `JoinSelection.ExtractEquiJoinKeys` pattern.
      
      ### Why are the changes needed?
      
      At `JoinSelection`, with `ExtractEquiJoinKeys`, we use `nonEquiCond` as the join condition. That is wrong, since some equi-join conditions should exist as well.
      ```
      Seq(joins.BroadcastNestedLoopJoinExec(
        planLater(left), planLater(right), buildSide, joinType, nonEquiCond))
      ```
      However, it should not surface as a bug today, since we always use sort-merge join as the default join strategy for ExtractEquiJoinKeys.
      
      ### Does this PR introduce _any_ user-facing change?
      
      no
      
      ### How was this patch tested?
      
      It is not a bug for now, but we should fix it in case this code path is used in the future.
      
      Closes #34065 from ulysses-you/join-condition.
      Authored-by: ulysses-you <ulyssesyou18@gmail.com>
      Signed-off-by: Wenchen Fan <wenchen@databricks.com>
      d90b4791
    • [SPARK-36760][SQL] Add interface SupportsPushDownV2Filters · 1d7d9720
      Committed by Huaxin Gao
      Co-Authored-By: DB Tsai <d_tsai@apple.com>
      Co-Authored-By: Huaxin Gao <huaxin_gao@apple.com>
      ### What changes were proposed in this pull request?
      This is the 2nd PR for V2 Filter support. This PR does the following:
      
      - Add interface SupportsPushDownV2Filters
      
      Future work:
      - refactor `OrcFilters`, `ParquetFilters`, `JacksonParser`, `UnivocityParser` so both V1 file source and V2 file source can use them
      - For V2 file source: implement  v2 filter -> parquet/orc filter. csv and Json don't have real filters, but also need to change the current code to have v2 filter -> `JacksonParser`/`UnivocityParser`
      - For V1 file source, keep what we currently have: v1 filter -> parquet/orc filter
      - We don't need v1filter.toV2 and v2filter.toV1 since we have two separate paths
      
      The reasons that we have reached the above conclusion:
      - The major motivation to implement V2Filter is to eliminate the unnecessary conversion between Catalyst types and Scala types when using Filters.
      - We provide this `SupportsPushDownV2Filters` in this PR so V2 data source (e.g. iceberg) can implement it and use V2 Filters
      - There is a lot of work needed to implement v2 filters in the V2 file sources, for the following reasons:
      
      possible approaches for implementing V2Filter:
      1. keep what we have for file source v1: v1 filter -> parquet/orc filter
          file source v2 we will implement v2 filter -> parquet/orc filter
          We don't need v1->v2 and v2->v1
          problem with this approach: there is a lot of code duplication
      
      2.  We will implement v2 filter -> parquet/orc filter
           file source v1: v1 filter -> v2 filter -> parquet/orc filter
           We will need V1 -> V2
           This is the approach I am using in https://github.com/apache/spark/pull/33973
           In that PR, I have
           v2 orc: v2 filter -> orc filter
           V1 orc: v1 -> v2 -> orc filter
      
           v2 csv: v2->v1, new UnivocityParser
           v1 csv: new UnivocityParser
      
          v2 Json: v2->v1, new JacksonParser
          v1 Json: new JacksonParser
      
          csv and Json don't have real filters, they just use filter references, should be OK to use either v1 and v2. Easier to use
          v1 because no need to change.
      
          I haven't finished parquet yet. The PR doesn't have the parquet V2Filter implementation, but I plan to have
          v2 parquet: v2 filter -> parquet filter
          v1 parquet: v1 -> v2 -> parquet filter
      
          Problem with this approach:
          1. It's not easy to implement V1->V2 because V2 filters have `LiteralValue` and need type info. We already lost the type information when converting the Expression filter to a v1 filter.
          2. parquet is OK
              Use Timestamp as example, parquet filter takes long for timestamp
              v2 parquet: v2 filter -> parquet filter
             timestamp
             Expression (Long) -> v2 filter (LiteralValue  Long)-> parquet filter (Long)
      
             V1 parquet: v1 -> v2 -> parquet filter
             timestamp
             Expression (Long) -> v1 filter (timestamp) -> v2 filter (LiteralValue  Long)-> parquet filter (Long)
      
             but we have a problem for ORC because the ORC filter takes a java Timestamp
             v2 orc: v2 filter -> orc filter
             timestamp
             Expression (Long) -> v2 filter (LiteralValue Long) -> orc filter (Timestamp)
      
             V1 orc: v1 -> v2 -> orc filter
             Expression (Long) -> v1 filter (timestamp) -> v2 filter (LiteralValue Long) -> orc filter (Timestamp)
            This defeats the purpose of implementing v2 filters.
      3.  keep what we have for file source v1: v1 filter -> parquet/orc filter
           file source v2: v2 filter -> v1 filter -> parquet/orc filter
           We will need V2 -> V1
           we have similar problem as approach 2.
      
      So the conclusion is: approach 1 (keep what we have for file source v1: v1 filter -> parquet/orc filter;
          for file source v2, implement v2 filter -> parquet/orc filter) is better, but it involves a lot of code duplication. We will need to refactor `OrcFilters`, `ParquetFilters`, `JacksonParser` and `UnivocityParser` so that both the V1 and V2 file sources can use them.
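      
      A rough sketch of what a V2 data source's `ScanBuilder` might look like with the new interface (the class below is hypothetical and `build()` is stubbed; method names follow this PR, so verify the exact signatures against your Spark version):
      ```scala
      import org.apache.spark.sql.connector.expressions.filter.Predicate
      import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownV2Filters}
      
      // Hypothetical ScanBuilder for a V2 source (e.g. Iceberg-like) accepting v2 predicates.
      class MyScanBuilder extends ScanBuilder with SupportsPushDownV2Filters {
        private var pushed: Array[Predicate] = Array.empty
      
        // Remember every predicate we can evaluate at scan time; whatever is returned
        // here must still be re-evaluated by Spark (empty in this toy example).
        override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
          pushed = predicates
          Array.empty
        }
      
        override def pushedPredicates(): Array[Predicate] = pushed
      
        override def build(): Scan = ??? // build a Scan that applies `pushed` while reading
      }
      ```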
      
      ### Why are the changes needed?
      Use V2Filters to eliminate the unnecessary conversion between Catalyst types and Scala types.
      
      ### Does this PR introduce _any_ user-facing change?
      no
      
      ### How was this patch tested?
      Added new UT
      
      Closes #34001 from huaxingao/v2filter.
      Lead-authored-by: Huaxin Gao <huaxin_gao@apple.com>
      Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
      Signed-off-by: Wenchen Fan <wenchen@databricks.com>
      1d7d9720
    • [SPARK-36820][SQL] Disable tests related to LZ4 for Hadoop 2.7 profile · 6eb75599
      Committed by Chao Sun
      ### What changes were proposed in this pull request?
      
      Disable tests related to LZ4 in `FileSourceCodecSuite`, `FileSuite` and `ParquetCompressionCodecPrecedenceSuite` when using `hadoop-2.7` profile.
      ### Why are the changes needed?
      
      At the moment, parquet-mr uses the LZ4 compression codec provided by Hadoop, and only since HADOOP-17292 (in 3.3.1/3.4.0) has the latter added `lz4-java` to remove the restriction that the codec can only run with the native library. As a consequence, the tests fail when using the `hadoop-2.7` profile.
      
      ### Does this PR introduce _any_ user-facing change?
      
      No, it only affects tests.
      
      ### How was this patch tested?
      
      Existing test
      
      Closes #34064 from sunchao/SPARK-36820.
      Authored-by: Chao Sun <sunchao@apple.com>
      Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
      6eb75599
    • [SPARK-36771][PYTHON] Fix `pop` of Categorical Series · 079a9c52
      Committed by Xinrong Meng
      ### What changes were proposed in this pull request?
      Fix `pop` of Categorical Series to be consistent with the latest pandas (1.3.2) behavior.
      
      ### Why are the changes needed?
      As https://github.com/databricks/koalas/issues/2198, pandas API on Spark behaves differently from pandas on `pop` of Categorical Series.
      
      ### Does this PR introduce _any_ user-facing change?
      Yes, results of `pop` of Categorical Series change.
      
      #### From
      ```py
      >>> psser = ps.Series(["a", "b", "c", "a"], dtype="category")
      >>> psser
      0    a
      1    b
      2    c
      3    a
      dtype: category
      Categories (3, object): ['a', 'b', 'c']
      >>> psser.pop(0)
      0
      >>> psser
      1    b
      2    c
      3    a
      dtype: category
      Categories (3, object): ['a', 'b', 'c']
      >>> psser.pop(3)
      0
      >>> psser
      1    b
      2    c
      dtype: category
      Categories (3, object): ['a', 'b', 'c']
      ```
      
      #### To
      ```py
      >>> psser = ps.Series(["a", "b", "c", "a"], dtype="category")
      >>> psser
      0    a
      1    b
      2    c
      3    a
      dtype: category
      Categories (3, object): ['a', 'b', 'c']
      >>> psser.pop(0)
      'a'
      >>> psser
      1    b
      2    c
      3    a
      dtype: category
      Categories (3, object): ['a', 'b', 'c']
      >>> psser.pop(3)
      'a'
      >>> psser
      1    b
      2    c
      dtype: category
      Categories (3, object): ['a', 'b', 'c']
      
      ```
      
      ### How was this patch tested?
      Unit tests.
      
      Closes #34052 from xinrong-databricks/cat_pop.
      Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
      Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
      079a9c52
    • [SPARK-36615][CORE] Register shutdown hook earlier when start SC · b7d99e3e
      Committed by Angerszhuuuu
      ### What changes were proposed in this pull request?
      Users often press Ctrl+C to stop a SparkContext that is still starting up, e.g. while it is registering with YARN in client mode and resources are tight.
      
      At that point the SparkContext has not yet registered its shutdown hook, so `sc.stop()` is never invoked when the application exits.
      We should register the shutdown hook earlier when starting a SparkContext.
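      
      The ordering concern can be illustrated outside Spark with a plain JVM shutdown hook (illustration only, not the SparkContext internals):
      ```scala
      object EarlyShutdownHookSketch {
        def main(args: Array[String]): Unit = {
          // A hook registered *before* the slow startup step still runs on Ctrl+C;
          // a hook registered only after the step would never be installed.
          sys.addShutdownHook {
            println("cleanup here (this is where sc.stop() would run)")
          }
          println("pretend we are registering with YARN; press Ctrl+C to see the hook fire")
          Thread.sleep(60000) // stands in for the blocking resource-allocation phase
        }
      }
      ```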
      
      ### Why are the changes needed?
      
      Make sure `sc.stop()` is invoked when a starting SparkContext application is killed.
      
      ### Does this PR introduce _any_ user-facing change?
      No
      
      ### How was this patch tested?
      Not needed.
      
      Closes #33869 from AngersZhuuuu/SPARK-36615.
      Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
      Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
      b7d99e3e
    • [SPARK-36670][FOLLOWUP][TEST] Remove brotli-codec dependency · ba5708d9
      Committed by Gengliang Wang
      ### What changes were proposed in this pull request?
      
      Remove `com.github.rdblue:brotli-codec:0.1.1` dependency.
      
      ### Why are the changes needed?
      
      As Stephen Coy pointed out in the dev list, we should not have `com.github.rdblue:brotli-codec:0.1.1` dependency which is not available on Maven Central. This is to avoid possible artifact changes on `Jitpack.io`.
      Also, the dependency is for tests only. I suggest that we remove it now to unblock the 3.2.0 release ASAP.
      
      ### Does this PR introduce _any_ user-facing change?
      
      No
      
      ### How was this patch tested?
      
      GA tests.
      
      Closes #34059 from gengliangwang/removeDeps.
      Authored-by: Gengliang Wang <gengliang@apache.org>
      Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
      ba5708d9
    • [SPARK-36769][PYTHON] Improve `filter` of single-indexed DataFrame · 33e463cc
      Committed by Xinrong Meng
      ### What changes were proposed in this pull request?
      Improve `filter` of single-indexed DataFrame by replacing a long Project with Filter or Join.
      
      ### Why are the changes needed?
      When the given `items` have too many elements, a long Project is introduced.
      We may replace that with `Column.isin` or joining depending on the length of `items` for better performance.
      
      ### Does this PR introduce _any_ user-facing change?
      No.
      
      ### How was this patch tested?
      Unit tests.
      
      Closes #33998 from xinrong-databricks/impr_filter.
      Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
      Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
      33e463cc
  2. 21 September 2021 (8 commits)
    • [SPARK-36814][SQL] Make class ColumnarBatch extendable · 688b95b1
      Committed by Yufei Gu
      ### What changes were proposed in this pull request?
      Change class ColumnarBatch to a non-final class
      
      ### Why are the changes needed?
      To support better vectorized reading in multiple data sources, ColumnarBatch needs to be extendable. For example, to support row-level deletes (https://github.com/apache/iceberg/issues/3141) in Iceberg's vectorized read, we need to filter out deleted rows in a batch, which requires ColumnarBatch to be extendable.
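      
      For illustration, once the class is non-final a reader could subclass it to carry source-specific state (the subclass and its delete-tracking field below are hypothetical):
      ```scala
      import java.util.BitSet
      
      import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
      
      // Hypothetical subclass: a batch that also knows which row positions were
      // deleted (as in Iceberg's row-level deletes), so callers can skip them.
      class DeleteAwareColumnarBatch(
          columns: Array[ColumnVector],
          deletedRows: BitSet) extends ColumnarBatch(columns) {
      
        def isRowDeleted(rowId: Int): Boolean = deletedRows.get(rowId)
      }
      ```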
      
      ### Does this PR introduce _any_ user-facing change?
      No
      
      ### How was this patch tested?
      No test needed.
      
      Closes #34054 from flyrain/columnarbatch-extendable.
      Authored-by: Yufei Gu <yufei_gu@apple.com>
      Signed-off-by: DB Tsai <d_tsai@apple.com>
      688b95b1
    • [SPARK-36807][SQL] Merge ANSI interval types to a tightest common type · d2340f8e
      Committed by Max Gekk
      ### What changes were proposed in this pull request?
      In the PR, I propose to modify `StructType` to support merging of ANSI interval types with different fields.
      
      ### Why are the changes needed?
      This will allow merging of schemas from different datasource files.
      
      ### Does this PR introduce _any_ user-facing change?
      No, the ANSI interval types haven't been released yet.
      
      ### How was this patch tested?
      Added new test to `StructTypeSuite`.
      
      Closes #34049 from MaxGekk/merge-ansi-interval-types.
      Authored-by: Max Gekk <max.gekk@gmail.com>
      Signed-off-by: Max Gekk <max.gekk@gmail.com>
      d2340f8e
    • [SPARK-36785][PYTHON] Fix DataFrame.isin when DataFrame has NaN value · cc182fe6
      Committed by dgd-contributor
      ### What changes were proposed in this pull request?
      Fix DataFrame.isin when DataFrame has NaN value
      
      ### Why are the changes needed?
      Fix DataFrame.isin when DataFrame has NaN value
      
      ``` python
      >>> psdf = ps.DataFrame(
      ...     {"a": [None, 2, 3, 4, 5, 6, 7, 8, None], "b": [None, 5, None, 3, 2, 1, None, 0, 0], "c": [1, 5, 1, 3, 2, 1, 1, 0, 0]},
      ... )
      >>> psdf
           a    b  c
      0  NaN  NaN  1
      1  2.0  5.0  5
      2  3.0  NaN  1
      3  4.0  3.0  3
      4  5.0  2.0  2
      5  6.0  1.0  1
      6  7.0  NaN  1
      7  8.0  0.0  0
      8  NaN  0.0  0
      >>> other = [1, 2, None]
      
      >>> psdf.isin(other)
            a     b     c
      0  None  None  True
      1  True  None  None
      2  None  None  True
      3  None  None  None
      4  None  True  True
      5  None  True  True
      6  None  None  True
      7  None  None  None
      8  None  None  None
      
      >>> psdf.to_pandas().isin(other)
             a      b      c
      0  False  False   True
      1   True  False  False
      2  False  False   True
      3  False  False  False
      4  False   True   True
      5  False   True   True
      6  False  False   True
      7  False  False  False
      8  False  False  False
      ```
      
      ### Does this PR introduce _any_ user-facing change?
      After this PR
      
      ``` python
      >>> psdf = ps.DataFrame(
      ...     {"a": [None, 2, 3, 4, 5, 6, 7, 8, None], "b": [None, 5, None, 3, 2, 1, None, 0, 0], "c": [1, 5, 1, 3, 2, 1, 1, 0, 0]},
      ... )
      >>> psdf
           a    b  c
      0  NaN  NaN  1
      1  2.0  5.0  5
      2  3.0  NaN  1
      3  4.0  3.0  3
      4  5.0  2.0  2
      5  6.0  1.0  1
      6  7.0  NaN  1
      7  8.0  0.0  0
      8  NaN  0.0  0
      >>> other = [1, 2, None]
      
      >>> psdf.isin(other)
             a      b      c
      0  False  False   True
      1   True  False  False
      2  False  False   True
      3  False  False  False
      4  False   True   True
      5  False   True   True
      6  False  False   True
      7  False  False  False
      8  False  False  False
      ```
      
      ### How was this patch tested?
      Unit tests
      
      Closes #34040 from dgd-contributor/SPARK-36785_dataframe.isin_fix.
      Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
      Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
      cc182fe6
    • [SPARK-36746][PYTHON] Refactor `_select_rows_by_iterable` in `iLocIndexer` to use `Column.isin` · 4b61c623
      Committed by Xinrong Meng
      ### What changes were proposed in this pull request?
      Refactor `_select_rows_by_iterable` in `iLocIndexer` to use `Column.isin`.
      
      ### Why are the changes needed?
      For better performance.
      
      After a rough benchmark, a long projection performs worse than `Column.isin`, even when the length of the filtering conditions exceeds `compute.isin_limit`.
      
      So we use `Column.isin` instead.
      
      ### Does this PR introduce _any_ user-facing change?
      No.
      
      ### How was this patch tested?
      Existing tests.
      
      Closes #33964 from xinrong-databricks/iloc_select.
      Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
      Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
      4b61c623
    • [SPARK-36618][PYTHON] Support dropping rows of a single-indexed DataFrame · 4cf86d33
      Committed by Xinrong Meng
      ### What changes were proposed in this pull request?
      Support dropping rows of a single-indexed DataFrame.
      
      Dropping rows and columns at the same time is supported in this PR as well.
      
      ### Why are the changes needed?
      To increase pandas API coverage.
      
      ### Does this PR introduce _any_ user-facing change?
      Yes, dropping rows of a single-indexed DataFrame is supported now.
      
      ```py
      >>> df = ps.DataFrame(np.arange(12).reshape(3, 4), columns=['A', 'B', 'C', 'D'])
      >>> df
         A  B   C   D
      0  0  1   2   3
      1  4  5   6   7
      2  8  9  10  11
      ```
      #### From
      ```py
      >>> df.drop([0, 1])
      Traceback (most recent call last):
      ...
      KeyError: [(0,), (1,)]
      
      >>> df.drop([0, 1], axis=0)
      Traceback (most recent call last):
      ...
      NotImplementedError: Drop currently only works for axis=1
      
      >>> df.drop(1)
      Traceback (most recent call last):
      ...
      KeyError: [(1,)]
      
      >>> df.drop(index=1)
      Traceback (most recent call last):
      ...
      TypeError: drop() got an unexpected keyword argument 'index'
      
      >>> df.drop(index=[0, 1], columns='A')
      Traceback (most recent call last):
      ...
      TypeError: drop() got an unexpected keyword argument 'index'
      ```
      #### To
      ```py
      >>> df.drop([0, 1])
         A  B   C   D
      2  8  9  10  11
      
      >>> df.drop([0, 1], axis=0)
         A  B   C   D
      2  8  9  10  11
      
      >>> df.drop(1)
         A  B   C   D
      0  0  1   2   3
      2  8  9  10  11
      
      >>> df.drop(index=1)
         A  B   C   D
      0  0  1   2   3
      2  8  9  10  11
      
      >>> df.drop(index=[0, 1], columns='A')
         B   C   D
      2  9  10  11
      ```
      
      ### How was this patch tested?
      Unit tests.
      
      Closes #33929 from xinrong-databricks/frame_drop.
      Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
      Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
      4cf86d33
    • [SPARK-36806][K8S][R] Use R 4.0.4 in K8s R image · a1787525
      Committed by Dongjoon Hyun
      ### What changes were proposed in this pull request?
      
      This PR aims to upgrade R from 3.6.3 to 4.0.4 in K8s R Docker image.
      
      ### Why are the changes needed?
      
      `openjdk:11-jre-slim` image is upgraded to `Debian 11`.
      
      ```
      $ docker run -it openjdk:11-jre-slim cat /etc/os-release
      PRETTY_NAME="Debian GNU/Linux 11 (bullseye)"
      NAME="Debian GNU/Linux"
      VERSION_ID="11"
      VERSION="11 (bullseye)"
      VERSION_CODENAME=bullseye
      ID=debian
      HOME_URL="https://www.debian.org/"
      SUPPORT_URL="https://www.debian.org/support"
      BUG_REPORT_URL="https://bugs.debian.org/"
      ```
      
      It causes `R 3.5` installation failures in our K8s integration test environment.
      - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47953/
      ```
      The following packages have unmet dependencies:
       r-base-core : Depends: libicu63 (>= 63.1-1~) but it is not installable
                     Depends: libreadline7 (>= 6.0) but it is not installable
      E: Unable to correct problems, you have held broken packages.
      The command '/bin/sh -c apt-get update &&   apt install -y gnupg &&   echo "deb http://cloud.r-project.org/bin/linux/debian buster-cran35/" >> /etc/apt/sources.list &&   apt-key adv --keyserver keyserver.ubuntu.com --recv-key 'E19F5F87128899B192B1A2C2AD5F960A256A04AF' &&   apt-get update &&
      apt install -y -t buster-cran35 r-base r-base-dev &&   rm -rf
      ```
      
      ### Does this PR introduce _any_ user-facing change?
      
      Yes, this will recover the installation.
      
      ### How was this patch tested?
      
      Succeed to build SparkR docker image in the K8s integration test in Jenkins CI.
      
      - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47959/
      ```
      Successfully built 32e1a0cd5ff8
      Successfully tagged kubespark/spark-r:3.3.0-SNAPSHOT_6e4f7e2d-054d-4978-812f-4f32fc546b51
      ```
      
      Closes #34048 from dongjoon-hyun/SPARK-36806.
      Authored-by: Dongjoon Hyun <dongjoon@apache.org>
      Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
      a1787525
    • [SPARK-36808][BUILD] Upgrade Kafka to 2.8.1 · 75e71ef7
      Committed by Kousuke Saruta
      ### What changes were proposed in this pull request?
      
      This PR upgrades Kafka from `2.8.0` to `2.8.1`.
      
      ### Why are the changes needed?
      
      Kafka `2.8.1` was released a few hours ago, and it includes a bunch of bug fixes.
      https://downloads.apache.org/kafka/2.8.1/RELEASE_NOTES.html
      
      ### Does this PR introduce _any_ user-facing change?
      
      No.
      
      ### How was this patch tested?
      
      CIs.
      
      Closes #34050 from sarutak/upgrade-kafka-2.8.1.
      Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
      Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
      75e71ef7
    • [SPARK-36805][BUILD][K8S] Upgrade kubernetes-client to 5.7.3 · 3b560390
      Committed by Dongjoon Hyun
      ### What changes were proposed in this pull request?
      
      This PR aims to upgrade `kubernetes-client` from 5.6.0 to 5.7.3
      
      ### Why are the changes needed?
      
      This will bring the latest improvements and bug fixes.
      - https://github.com/fabric8io/kubernetes-client/releases/tag/v5.7.3
      - https://github.com/fabric8io/kubernetes-client/releases/tag/v5.7.2
      - https://github.com/fabric8io/kubernetes-client/releases/tag/v5.7.1
      - https://github.com/fabric8io/kubernetes-client/releases/tag/v5.7.0
      
      ### Does this PR introduce _any_ user-facing change?
      
      No
      
      ### How was this patch tested?
      
      Pass the CIs.
      
      Closes #34047 from dongjoon-hyun/SPARK-36805.
      Authored-by: Dongjoon Hyun <dongjoon@apache.org>
      Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
      3b560390
  3. 20 September 2021 (6 commits)
    • [SPARK-36683][SQL] Add new built-in SQL functions: SEC and CSC · 30d17b63
      Committed by Yuto Akutsu
      ### What changes were proposed in this pull request?
      
      Add new built-in SQL functions: secant and cosecant, and add them as Scala and Python functions.
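      
      A quick usage sketch once this change is in (function names as added here; exact availability depends on the Spark version):
      ```scala
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.{csc, sec}
      
      object SecCscExample {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().master("local[*]").getOrCreate()
          import spark.implicits._
      
          // SQL form: sec(x) = 1 / cos(x), csc(x) = 1 / sin(x); both are ~1.0 here.
          spark.sql("SELECT sec(0), csc(radians(90))").show()
      
          // Scala DataFrame form, using the functions added by the same change.
          Seq(0.0).toDF("x").select(sec($"x"), csc($"x" + math.Pi / 2)).show()
        }
      }
      ```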
      
      ### Why are the changes needed?
      
      Cotangent is already supported in Spark SQL, but secant and cosecant are missing, even though I believe they can be used as much as cot.
      Related Links: [SPARK-20751](https://github.com/apache/spark/pull/17999) [SPARK-36660](https://github.com/apache/spark/pull/33906)
      
      ### Does this PR introduce _any_ user-facing change?
      
      Yes, users can now use these functions.
      
      ### How was this patch tested?
      
      Unit tests
      
      Closes #33988 from yutoacts/SPARK-36683.
      Authored-by: Yuto Akutsu <yuto.akutsu@oss.nttdata.com>
      Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
      30d17b63
    • [SPARK-36101][CORE] Grouping exception in core/api · 4cc39cfe
      Committed by dgd-contributor
      ### What changes were proposed in this pull request?
      This PR groups exception messages in core/src/main/scala/org/apache/spark/api.
      
      ### Why are the changes needed?
      It will largely help with the standardization of error messages and their maintenance.
      
      ### Does this PR introduce _any_ user-facing change?
      No. Error messages remain unchanged.
      
      ### How was this patch tested?
      No new tests - pass all original tests to make sure it doesn't break any existing behavior.
      
      Closes #33536 from dgd-contributor/SPARK-36101.
      Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
      Signed-off-by: Wenchen Fan <wenchen@databricks.com>
      4cc39cfe
    • [SPARK-36754][SQL] ArrayIntersect handle duplicated Double.NaN and Float.NaN · 2fc7f2f7
      Committed by Angerszhuuuu
      ### What changes were proposed in this pull request?
      For query
      ```
      select array_intersect(array(cast('nan' as double), 1d), array(cast('nan' as double)))
      ```
      This returns [NaN], but it should return [].
      The issue is that `OpenHashSet` can't handle `Double.NaN` and `Float.NaN` either.
      This PR fixes it based on https://github.com/apache/spark/pull/33955.
      
      ### Why are the changes needed?
      Fix bug
      
      ### Does this PR introduce _any_ user-facing change?
      ArrayIntersect won't show equal `NaN` value
      
      ### How was this patch tested?
      Added UT
      
      Closes #33995 from AngersZhuuuu/SPARK-36754.
      Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
      Signed-off-by: Wenchen Fan <wenchen@databricks.com>
      2fc7f2f7
    • [SPARK-34112][BUILD] Upgrade ORC to 1.7.0 · a396dd62
      Committed by Dongjoon Hyun
      ### What changes were proposed in this pull request?
      
      This PR aims to upgrade Apache ORC from 1.6.11 to 1.7.0 for Apache Spark 3.3.0.
      
      ### Why are the changes needed?
      
      [Apache ORC 1.7.0](https://orc.apache.org/news/2021/09/15/ORC-1.7.0/) is a new release with the following new features and improvements.
        - ORC-377 Support Snappy compression in C++ Writer
        - ORC-577 Support row-level filtering
        - ORC-716 Build and test on Java 17-EA
        - ORC-731 Improve Java Tools
        - ORC-742 LazyIO of non-filter columns
        - ORC-751 Implement Predicate Pushdown in C++ Reader
        - ORC-755 Introduce OrcFilterContext
        - ORC-757 Add Hashtable implementation for dictionary
        - ORC-780 Support LZ4 Compression in C++ Writer
        - ORC-797 Allow writers to get the stripe information
        - ORC-818 Build and test in Apple Silicon
        - ORC-861 Bump CMake minimum requirement to 2.8.12
        - ORC-867 Upgrade hive-storage-api to 2.8.1
        - ORC-984 Save the software version that wrote each ORC file
      
      ### Does this PR introduce _any_ user-facing change?
      
      No.
      
      ### How was this patch tested?
      
      Pass the existing CIs because this is a dependency change.
      
      Closes #34045 from dongjoon-hyun/SPARK-34112.
      Authored-by: Dongjoon Hyun <dongjoon@apache.org>
      Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
      a396dd62
    • [SPARK-36710][PYTHON] Support new typing syntax in function apply APIs in pandas API on Spark · 8d8b4aab
      Committed by Hyukjin Kwon
      ### What changes were proposed in this pull request?
      
      This PR proposes the new syntax introduced in https://github.com/apache/spark/pull/33954. Namely, users now can specify the index type and name as below:
      
      ```python
      import pandas as pd
      import pyspark.pandas as ps
      def transform(pdf) -> pd.DataFrame[int, [int, int]]:
          pdf['A'] = pdf.id + 1
          return pdf
      
      ps.range(5).koalas.apply_batch(transform)
      ```
      
      ```
         c0  c1
      0   0   1
      1   1   2
      2   2   3
      3   3   4
      4   4   5
      ```
      
      ```python
      import pandas as pd
      import pyspark.pandas as ps
      def transform(pdf) -> pd.DataFrame[("index", int), [("a", int), ("b", int)]]:
          pdf['A'] = pdf.id * pdf.id
          return pdf
      
      ps.range(5).koalas.apply_batch(transform)
      ```
      
      ```
             a   b
      index
      0      0   0
      1      1   1
      2      2   4
      3      3   9
      4      4  16
      ```
      
      Again, this syntax remains experimental and is a non-standard extension beyond standard Python typing. We should migrate to proper typing once pandas supports it, like `numpy.typing`.
      
      ### Why are the changes needed?
      
      The rationale is described in https://github.com/apache/spark/pull/33954. In order to avoid unnecessary computation for default index or schema inference.
      
      ### Does this PR introduce _any_ user-facing change?
      
      Yes, this PR affects the following APIs:
      
      - `DataFrame.apply(..., axis=1)`
      - `DataFrame.groupby.apply(...)`
      - `DataFrame.pandas_on_spark.transform_batch(...)`
      - `DataFrame.pandas_on_spark.apply_batch(...)`
      
      Now they can specify the index type with the new syntax below:
      
      ```
      DataFrame[index_type, [type, ...]]
      DataFrame[(index_name, index_type), [(name, type), ...]]
      DataFrame[dtype instance, dtypes instance]
      DataFrame[(index_name, index_type), zip(names, types)]
      ```
      
      ### How was this patch tested?
      
      Manually tested, and unittests were added.
      
      Closes #34007 from HyukjinKwon/SPARK-36710.
      Authored-by: Hyukjin Kwon <gurwls223@apache.org>
      Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
      8d8b4aab
    • [SPARK-36107][SQL] Refactor first set of 20 query execution errors to use error classes · c2881c5e
      Committed by PengLei
      ### What changes were proposed in this pull request?
      Refactor some exceptions in QueryExecutionErrors to use error classes, as follows:
      ```
      columnChangeUnsupportedError
      logicalHintOperatorNotRemovedDuringAnalysisError
      cannotEvaluateExpressionError
      cannotGenerateCodeForExpressionError
      cannotTerminateGeneratorError
      castingCauseOverflowError
      cannotChangeDecimalPrecisionError
      invalidInputSyntaxForNumericError
      cannotCastFromNullTypeError
      cannotCastError
      cannotParseDecimalError
      simpleStringWithNodeIdUnsupportedError
      evaluateUnevaluableAggregateUnsupportedError
      dataTypeUnsupportedError
      dataTypeUnsupportedError
      failedExecuteUserDefinedFunctionError
      divideByZeroError
      invalidArrayIndexError
      mapKeyNotExistError
      rowFromCSVParserNotExpectedError
      ```
      
      ### Why are the changes needed?
      [SPARK-36107](https://issues.apache.org/jira/browse/SPARK-36107)
      
      ### Does this PR introduce _any_ user-facing change?
      No
      
      ### How was this patch tested?
      Existing UT test cases.
      
      Closes #33538 from Peng-Lei/SPARK-36017.
      Lead-authored-by: PengLei <peng.8lei@gmail.com>
      Co-authored-by: Lei Peng <peng.8lei@gmail.com>
      Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
      c2881c5e
  4. 18 September 2021 (3 commits)
    • [SPARK-36772] FinalizeShuffleMerge fails with an exception due to attempt id not matching · cabc36b5
      Committed by Ye Zhou
      ### What changes were proposed in this pull request?
      Remove the appAttemptId from TransportConf, and instead pass it through SparkEnv.
      
      ### Why are the changes needed?
      Push-based shuffle will fail if any attemptId is set in the SparkConf, as the attemptId is not set correctly in the Driver.
      
      ### Does this PR introduce _any_ user-facing change?
      No
      
      ### How was this patch tested?
      Tested within our Yarn cluster. Without this PR, the Driver will fail to finalize the shuffle merge on all the mergers. After the patch, Driver can successfully finalize the shuffle merge and the push based shuffle can work fine.
      Also with unit test to verify the attemptId is being set in the BlockStoreClient in Driver.
      
      Closes #34018 from zhouyejoe/SPARK-36772.
      Authored-by: Ye Zhou <yezhou@linkedin.com>
      Signed-off-by: Gengliang Wang <gengliang@apache.org>
      cabc36b5
    • [SPARK-36762][PYTHON] Fix Series.isin when Series has NaN values · 32b85129
      Committed by dgd-contributor
      ### What changes were proposed in this pull request?
      Fix Series.isin when Series has NaN values
      
      ### Why are the changes needed?
      Fix Series.isin when Series has NaN values
      ``` python
      >>> pser = pd.Series([None, 5, None, 3, 2, 1, None, 0, 0])
      >>> psser = ps.from_pandas(pser)
      >>> pser.isin([1, 3, 5, None])
      0    False
      1     True
      2    False
      3     True
      4    False
      5     True
      6    False
      7    False
      8    False
      dtype: bool
      >>> psser.isin([1, 3, 5, None])
      0    None
      1    True
      2    None
      3    True
      4    None
      5    True
      6    None
      7    None
      8    None
      dtype: object
      ```
      
      ### Does this PR introduce _any_ user-facing change?
      After this PR
      ``` python
      >>> pser = pd.Series([None, 5, None, 3, 2, 1, None, 0, 0])
      >>> psser = ps.from_pandas(pser)
      >>> psser.isin([1, 3, 5, None])
      0    False
      1     True
      2    False
      3     True
      4    False
      5     True
      6    False
      7    False
      8    False
      dtype: bool
      
      ```
      
      ### How was this patch tested?
      unit tests
      
      Closes #34005 from dgd-contributor/SPARK-36762_fix_series.isin_when_values_have_NaN.
      Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
      Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
      32b85129
    • [SPARK-36673][SQL][FOLLOWUP] Remove duplicate test in DataFrameSetOperationsSuite · f9644cc2
      Committed by Liang-Chi Hsieh
      ### What changes were proposed in this pull request?
      
      As a followup of #34025 to remove duplicate test.
      
      ### Why are the changes needed?
      
      To remove duplicate test.
      
      ### Does this PR introduce _any_ user-facing change?
      
      No
      
      ### How was this patch tested?
      
      Existing test.
      
      Closes #34032 from viirya/remove.
      Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
      Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
      f9644cc2
  5. 17 September 2021 (14 commits)
    • [SPARK-36780][BUILD] Make `dev/mima` runs on Java 17 · 5d0889bf
      Committed by Yang He
      ### What changes were proposed in this pull request?
      
      Java 17 has been officially released. This PR makes `dev/mima` runs on Java 17.
      
      ### Why are the changes needed?
      
      To make tests pass on Java 17.
      
      ### Does this PR introduce _any_ user-facing change?
      
      No.
      
      ### How was this patch tested?
      
      Manual test.
      
      Closes #34022 from RabbidHY/SPARK-36780.
      Lead-authored-by: Yang He <stitch106hy@gmail.com>
      Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
      Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
      5d0889bf
    • [SPARK-36663][SQL] Support number-only column names in ORC data sources · cd1b7e17
      Committed by Kousuke Saruta
      ### What changes were proposed in this pull request?
      
      This PR aims to support number-only column names in ORC data sources.
      In the current master, with ORC datasource, we can write a DataFrame which contains such columns into ORC files.
      ```
      spark.sql("SELECT 'a' as `1`").write.orc("/path/to/dest")
      ```
      
      But reading the ORC files will fail.
      ```
      spark.read.orc("/path/to/dest")
      ...
      == SQL ==
      struct<1:string>
      -------^^^
      
        at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:265)
        at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:126)
        at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseDataType(ParseDriver.scala:40)
        at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.org$apache$spark$sql$execution$datasources$orc$OrcUtils$$toCatalystSchema(OrcUtils.scala:91)
      ```
      The cause is that `OrcUtils.toCatalystSchema` fails to handle such column names.
      In `OrcUtils.toCatalystSchema`, `CatalystSqlParser` is used to create an instance of `StructType` representing the schema, but `CatalystSqlParser.parseDataType` fails to parse it if a column name (or nested field) consists of only numbers.
      
      ### Why are the changes needed?
      
      For better usability.
      
      ### Does this PR introduce _any_ user-facing change?
      
      No.
      
      ### How was this patch tested?
      
      New tests.
      
      Closes #33915 from sarutak/fix-orc-schema-issue.
      Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
      Signed-off-by: Wenchen Fan <wenchen@databricks.com>
      cd1b7e17
    • [SPARK-36767][SQL] ArrayMin/ArrayMax/SortArray/ArraySort add comment and Unit test · 69e006dd
      Committed by Angerszhuuuu
      ### What changes were proposed in this pull request?
      Add comment about how ArrayMin/ArrayMax/SortArray/ArraySort handle NaN and add Unit test for this
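      
      For reference, Spark SQL orders NaN above any other numeric value, which is the behaviour the added comments and tests pin down (a small sketch to check it):
      ```scala
      import org.apache.spark.sql.SparkSession
      
      object ArrayNaNOrdering {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().master("local[*]").getOrCreate()
      
          // NaN sorts as the largest value, so it wins array_max, loses array_min,
          // and sort_array (ascending) places it last.
          spark.sql("SELECT array_max(array(1D, cast('nan' as double)))").show()       // NaN
          spark.sql("SELECT array_min(array(1D, cast('nan' as double)))").show()       // 1.0
          spark.sql("SELECT sort_array(array(cast('nan' as double), 2D, 1D))").show()  // [1.0, 2.0, NaN]
        }
      }
      ```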
      
      ### Why are the changes needed?
      Add Unit test
      
      ### Does this PR introduce _any_ user-facing change?
      No
      
      ### How was this patch tested?
      Added UT
      
      Closes #34008 from AngersZhuuuu/SPARK-36740.
      Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
      Signed-off-by: Wenchen Fan <wenchen@databricks.com>
      69e006dd
    • [SPARK-36673][SQL] Fix incorrect schema of nested types of union · cdd7ae93
      Committed by Liang-Chi Hsieh
      ### What changes were proposed in this pull request?
      
      This patch proposes to fix incorrect schema of `union`.
      
      ### Why are the changes needed?
      
      The current `union` result for nested struct columns is incorrect. By the definition of the `union` API, it should resolve columns by position, not by name. Right now, when determining the `output` (i.e. the schema) of a union plan, we use the `merge` API, which actually merges two structs (think of it as concatenating the fields of the two structs where they don't overlap). That merging behavior doesn't match the `union` definition.
      
      So currently we get incorrect schema but the query result is correct. We should fix the incorrect schema.
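      
      A small sketch of the by-position expectation with nested structs (column names are made up):
      ```scala
      import org.apache.spark.sql.SparkSession
      
      object UnionByPositionSchema {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().master("local[*]").getOrCreate()
      
          // Same shape, but the nested fields are named in a different order.
          val df1 = spark.sql("SELECT named_struct('x', 1, 'y', 2) AS s")
          val df2 = spark.sql("SELECT named_struct('y', 3, 'x', 4) AS s")
      
          // union resolves by position, so the result schema should simply be df1's
          // schema (struct<x:int, y:int>), not a name-merged struct.
          df1.union(df2).printSchema()
        }
      }
      ```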
      
      ### Does this PR introduce _any_ user-facing change?
      
      Yes, fixing a bug of incorrect schema.
      
      ### How was this patch tested?
      
      Added unit test.
      
      Closes #34025 from viirya/SPARK-36673.
      Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
      Signed-off-by: Wenchen Fan <wenchen@databricks.com>
      cdd7ae93
    • [SPARK-36718][SQL] Only collapse projects if we don't duplicate expensive expressions · 651904a2
      Committed by Wenchen Fan
      ### What changes were proposed in this pull request?
      
      The `CollapseProject` rule can combine adjacent projects and merge the project lists. The key idea behind this rule is that the evaluation of project is relatively expensive, and that expression evaluation is cheap and that the expression duplication caused by this rule is not a problem. This last assumption is, unfortunately, not always true:
      - A user can invoke some expensive UDF, this now gets invoked more often than originally intended.
      - A projection is very cheap in whole stage code generation. The duplication caused by `CollapseProject` does more harm than good here.
      
      This PR addresses this problem, by only collapsing projects when it does not duplicate expensive expressions. In practice this means an input reference may only be consumed once, or when its evaluation does not incur significant overhead (currently attributes, nested column access, aliases & literals fall in this category).
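      
      A sketch of the kind of query where unconditional collapsing hurts (the UDF below is just a stand-in for an expensive function):
      ```scala
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.{col, udf}
      
      object CollapseProjectCost {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().master("local[*]").getOrCreate()
          import spark.implicits._
      
          val expensive = udf((x: Int) => { Thread.sleep(10); x * 2 }) // stand-in for a costly UDF
      
          val df = Seq(1, 2, 3).toDF("x")
            .select(expensive(col("x")).as("y"))                       // project 1: one UDF call per row
            .select((col("y") + 1).as("p"), (col("y") * 2).as("q"))    // project 2 references y twice
      
          // If the two projects were always collapsed, `expensive` would be inlined into
          // both p and q and run twice per row; with this change they stay separate.
          df.explain(true)
        }
      }
      ```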
      
      ### Why are the changes needed?
      
      We have seen multiple complains about `CollapseProject` in the past, due to it may duplicate expensive expressions. The most recent one is https://github.com/apache/spark/pull/33903 .
      
      ### Does this PR introduce _any_ user-facing change?
      
      no
      
      ### How was this patch tested?
      
      a new UT and existing test
      
      Closes #33958 from cloud-fan/collapse.
      Authored-by: Wenchen Fan <wenchen@databricks.com>
      Signed-off-by: Wenchen Fan <wenchen@databricks.com>
      651904a2
    • [SPARK-36764][SS][TEST] Fix race-condition on "ensure continuous stream is... · 6099edc6
      Committed by Jungtaek Lim
      [SPARK-36764][SS][TEST] Fix race-condition on "ensure continuous stream is being used" in KafkaContinuousTest
      
      ### What changes were proposed in this pull request?
      
      The test "ensure continuous stream is being used" in KafkaContinuousTest quickly checks the actual type of the execution and then stops the query. Stopping a streaming query in continuous mode is done by interrupting the query execution thread and joining it with an indefinite timeout.
      
      In parallel, started streaming query is going to generate execution plan, including running optimizer. Some parts of SessionState can be built at that time, as they are defined as lazy. The problem is, some of them seem to “swallow” the InterruptedException and let the thread run continuously.
      
      As a result, the query cannot tell that a stop has been requested, so it won't stop.
      
      This PR fixes such scenario via ensuring that streaming query has started before the test stops the query.
      
      ### Why are the changes needed?
      
      Race-condition could end up with test hang till test framework marks it as timed-out.
      
      ### Does this PR introduce _any_ user-facing change?
      
      No.
      
      ### How was this patch tested?
      
      Existing tests.
      
      Closes #34004 from HeartSaVioR/SPARK-36764.
      Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
      Signed-off-by: Wenchen Fan <wenchen@databricks.com>
      6099edc6
    • [SPARK-36741][SQL] ArrayDistinct handle duplicated Double.NaN and Float.Nan · e356f6aa
      Committed by Angerszhuuuu
      ### What changes were proposed in this pull request?
      For query
      ```
      select array_distinct(array(cast('nan' as double), cast('nan' as double)))
      ```
      This returns [NaN, NaN], but it should return [NaN].
      The issue is that `OpenHashSet` can't handle `Double.NaN` and `Float.NaN` either.
      This PR fixes it based on https://github.com/apache/spark/pull/33955.
      
      ### Why are the changes needed?
      Fix bug
      
      ### Does this PR introduce _any_ user-facing change?
      ArrayDistinct no longer returns duplicated `NaN` values.
      
      ### How was this patch tested?
      Added UT
      
      Closes #33993 from AngersZhuuuu/SPARK-36741.
      Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
      Signed-off-by: Wenchen Fan <wenchen@databricks.com>
      e356f6aa
    • [SPARK-36778][SQL] Support ILIKE API on Scala(dataframe) · 1312a873
      Committed by Leona Yoda
      ### What changes were proposed in this pull request?
      
      Support ILIKE (case insensitive LIKE) API on Scala.
      
      ### Why are the changes needed?
      
      The ILIKE statement on the SQL interface is supported by SPARK-36674.
      This PR adds the Scala (DataFrame) API for it.
      
      ### Does this PR introduce _any_ user-facing change?
      
      Yes. Users can call `ilike` from the DataFrame API.
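      
      A minimal usage sketch (assuming the `ilike` Column method added by this PR):
      ```scala
      import org.apache.spark.sql.SparkSession
      
      object IlikeExample {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().master("local[*]").getOrCreate()
          import spark.implicits._
      
          val df = Seq("Spark", "spark", "flink").toDF("name")
      
          // Case-insensitive LIKE on the Column API: keeps both "Spark" and "spark".
          df.filter($"name".ilike("spar%")).show()
      
          // Equivalent SQL form, available since SPARK-36674.
          df.createOrReplaceTempView("t")
          spark.sql("SELECT * FROM t WHERE name ILIKE 'spar%'").show()
        }
      }
      ```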
      
      ### How was this patch tested?
      
      unit tests.
      
      Closes #34027 from yoda-mon/scala-ilike.
      Authored-by: Leona Yoda <yodal@oss.nttdata.com>
      Signed-off-by: Max Gekk <max.gekk@gmail.com>
      1312a873
    • [SPARK-36789][SQL] Use the correct constant type as the null value holder in array functions · 41454988
      Committed by Wenchen Fan
      ### What changes were proposed in this pull request?
      
      In array functions, we use constant 0 as the placeholder when adding a null value to an array buffer. This PR makes sure the constant 0 matches the type of the array element.
      
      ### Why are the changes needed?
      
      Fix a potential bug. Somehow we can hit this bug sometimes after https://github.com/apache/spark/pull/33955 .
      
      ### Does this PR introduce _any_ user-facing change?
      
      No
      
      ### How was this patch tested?
      
      existing tests
      
      Closes #34029 from cloud-fan/minor.
      Authored-by: Wenchen Fan <wenchen@databricks.com>
      Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
      41454988
    • [SPARK-32709][SQL] Support writing Hive bucketed table (Parquet/ORC format with Hive hash) · 4a34db9a
      Committed by Cheng Su
      ### What changes were proposed in this pull request?
      
      This is a re-work of https://github.com/apache/spark/pull/30003. Here we add support for writing Hive bucketed tables with the Parquet/ORC file formats (data source v1 write path and Hive hash as the hash function). Support for Hive's other file formats will be added in a follow-up PR. A minimal write-path sketch is shown after the list of changes below.
      
      The changes are mostly on:
      
      * `HiveMetastoreCatalog.scala`: When converting hive table relation to data source relation, pass bucket info (BucketSpec) and other hive related info as options into `HadoopFsRelation` and `LogicalRelation`, which can be later accessed by `FileFormatWriter` to customize bucket id and file name.
      
      * `FileFormatWriter.scala`: Use `HiveHash` for `bucketIdExpression` if it's writing to Hive bucketed table. In addition, Spark output file name should follow Hive/Presto/Trino bucketed file naming convention. Introduce another parameter `bucketFileNamePrefix` and it introduces subsequent change in `FileFormatDataWriter`.
      
      * `HadoopMapReduceCommitProtocol`: Implement the new file name APIs introduced in https://github.com/apache/spark/pull/33012, and change its sub-class `PathOutputCommitProtocol`, to make Hive bucketed table writing work with all commit protocol (including S3A commit protocol).
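      
      For illustration, a minimal write path that this change targets (table and column names are made up; requires Hive support):
      ```scala
      import org.apache.spark.sql.SparkSession
      
      object HiveBucketedWriteSketch {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder()
            .master("local[*]")
            .enableHiveSupport()
            .getOrCreate()
      
          // A Hive-defined bucketed table stored as Parquet.
          spark.sql(
            """CREATE TABLE IF NOT EXISTS sales (id INT, amount DOUBLE)
              |CLUSTERED BY (id) INTO 8 BUCKETS
              |STORED AS PARQUET""".stripMargin)
      
          // With this change, the inserted files use Hive hash and Hive-style bucket
          // file names, so Hive/Presto/Trino can prune and join on the buckets.
          spark.sql("INSERT INTO sales SELECT CAST(id AS INT), CAST(id AS DOUBLE) FROM range(1000)")
        }
      }
      ```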
      
      ### Why are the changes needed?
      
      To make Spark write bucketed tables that are compatible with other SQL engines. Currently a Spark bucketed table cannot be leveraged by other SQL engines like Hive and Presto, because Spark uses a different hash function (Spark murmur3hash) and a different file name scheme. With this PR, a Spark-written Hive bucketed table can be efficiently read by Presto and Hive for bucket filter pruning, joins, group-by, etc. This was and is blocking several companies (confirmed with Facebook, Lyft, etc.) from migrating bucketing workloads from Hive to Spark.
      
      ### Does this PR introduce _any_ user-facing change?
      
      Yes, any Hive bucketed table (with Parquet/ORC format) written by Spark, is properly bucketed and can be efficiently processed by Hive and Presto/Trino.
      
      ### How was this patch tested?
      
      * Added unit test in BucketedWriteWithHiveSupportSuite.scala, to verify bucket file names and each row in each bucket is written properly.
      * Tested by Lyft Spark team (Shashank Pedamallu) to read Spark-written bucketed table from Trino, Spark and Hive.
      
      Closes #33432 from c21/hive-bucket-v1.
      Authored-by: Cheng Su <chengsu@fb.com>
      Signed-off-by: Wenchen Fan <wenchen@databricks.com>
      4a34db9a
    • [SPARK-36788][SQL] Change log level of AQE for non-supported plans from warning to debug · 917d7dad
      Committed by Hyukjin Kwon
      ### What changes were proposed in this pull request?
      
      This PR suppresses the warnings for plans where AQE is not supported. Currently we show the warnings such as:
      
      ```
      org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: Sort [a#324881 DESC NULLS FIRST], true, 23
      +- Scan ExistingRDD[a#324881]
      ```
      
      for every plan that AQE is not supported.
      
      ### Why are the changes needed?
      
      It's too noisy now. Below is the example of `SortSuite` run:
      
      ```
      14:51:40.675 WARN org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: Sort [a#324881 DESC NULLS FIRST], true, 23
      +- Scan ExistingRDD[a#324881]
      .
      [info] - sorting on DayTimeIntervalType(0,1) with nullable=true, sortOrder=List('a DESC NULLS FIRST) (785 milliseconds)
      14:51:41.416 WARN org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: ReferenceSort [a#324884 ASC NULLS FIRST], true
      +- Scan ExistingRDD[a#324884]
      .
      14:51:41.467 WARN org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: Sort [a#324884 ASC NULLS FIRST], true, 23
      +- Scan ExistingRDD[a#324884]
      .
      [info] - sorting on DayTimeIntervalType(0,1) with nullable=false, sortOrder=List('a ASC NULLS FIRST) (796 milliseconds)
      14:51:42.210 WARN org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: ReferenceSort [a#324887 ASC NULLS LAST], true
      +- Scan ExistingRDD[a#324887]
      .
      14:51:42.259 WARN org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: Sort [a#324887 ASC NULLS LAST], true, 23
      +- Scan ExistingRDD[a#324887]
      .
      [info] - sorting on DayTimeIntervalType(0,1) with nullable=false, sortOrder=List('a ASC NULLS LAST) (797 milliseconds)
      14:51:43.009 WARN org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: ReferenceSort [a#324890 DESC NULLS LAST], true
      +- Scan ExistingRDD[a#324890]
      .
      14:51:43.061 WARN org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: Sort [a#324890 DESC NULLS LAST], true, 23
      +- Scan ExistingRDD[a#324890]
      .
      [info] - sorting on DayTimeIntervalType(0,1) with nullable=false, sortOrder=List('a DESC NULLS LAST) (848 milliseconds)
      14:51:43.857 WARN org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: ReferenceSort [a#324893 DESC NULLS FIRST], true
      +- Scan ExistingRDD[a#324893]
      .
      14:51:43.903 WARN org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: Sort [a#324893 DESC NULLS FIRST], true, 23
      +- Scan ExistingRDD[a#324893]
      .
      [info] - sorting on DayTimeIntervalType(0,1) with nullable=false, sortOrder=List('a DESC NULLS FIRST) (827 milliseconds)
      14:51:44.682 WARN org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: ReferenceSort [a#324896 ASC NULLS FIRST], true
      +- Scan ExistingRDD[a#324896]
      .
      14:51:44.748 WARN org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: Sort [a#324896 ASC NULLS FIRST], true, 23
      +- Scan ExistingRDD[a#324896]
      .
      [info] - sorting on YearMonthIntervalType(0,1) with nullable=true, sortOrder=List('a ASC NULLS FIRST) (565 milliseconds)
      14:51:45.248 WARN org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: ReferenceSort [a#324899 ASC NULLS LAST], true
      +- Scan ExistingRDD[a#324899]
      .
      14:51:45.312 WARN org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: Sort [a#324899 ASC NULLS LAST], true, 23
      +- Scan ExistingRDD[a#324899]
      .
      [info] - sorting on YearMonthIntervalType(0,1) with nullable=true, sortOrder=List('a ASC NULLS LAST) (591 milliseconds)
      14:51:45.841 WARN org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: ReferenceSort [a#324902 DESC NULLS LAST], true
      +- Scan ExistingRDD[a#324902]
      .
      14:51:45.905 WARN org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: Sort [a#324902 DESC NULLS LAST], true, 23
      +- Scan ExistingRDD[a#324902]
      .
      ```
      
      ### Does this PR introduce _any_ user-facing change?
      
      Yes, it will show less warnings to users. Note that AQE is enabled by default from Spark 3.2, see SPARK-33679
      
      ### How was this patch tested?
      
      Manually tested via unittests.
      
      Closes #34026 from HyukjinKwon/minor-log-level.
      Authored-by: Hyukjin Kwon <gurwls223@apache.org>
      Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
      917d7dad
    • [SPARK-36783][SQL] ScanOperation should not push Filter through nondeterministic Project · dfd5237c
      Committed by Wenchen Fan
      ### What changes were proposed in this pull request?
      
      `ScanOperation` collects adjacent Projects and Filters. The caller side always assume that the collected Filters should run before collected Projects, which means `ScanOperation` effectively pushes Filter through Project.
      
      Following `PushPredicateThroughNonJoin`, we should not push Filter through nondeterministic Project. This PR fixes `ScanOperation` to follow this rule.
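      
      A small example of why the ordering matters (rand() as the nondeterministic expression):
      ```scala
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.rand
      
      object NondeterministicPushdown {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().master("local[*]").getOrCreate()
      
          // The filter must see the same `r` values the projection produced; pushing it
          // below the nondeterministic project would evaluate rand() again and change
          // which rows survive.
          val df = spark.range(100)
            .select((rand() * 10).as("r"))
            .filter("r > 5")
      
          df.explain(true)
        }
      }
      ```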
      
      ### Why are the changes needed?
      
      Fix a bug that violates the semantic of nondeterministic expressions.
      
      ### Does this PR introduce _any_ user-facing change?
      
      Most likely no change, but in some cases, this is a correctness bug fix which changes the query result.
      
      ### How was this patch tested?
      
      existing tests
      
      Closes #34023 from cloud-fan/scan.
      Authored-by: Wenchen Fan <wenchen@databricks.com>
      Signed-off-by: Wenchen Fan <wenchen@databricks.com>
      dfd5237c
    • [SPARK-36773][SQL][TEST] Fixed unit test to check the compression for parquet · 3712502d
      Committed by BelodengKlaus
      ### What changes were proposed in this pull request?
      Change the unit test for parquet compression
      
      ### Why are the changes needed?
      To check the compression for parquet
      
      ### Does this PR introduce _any_ user-facing change?
      no
      
      ### How was this patch tested?
      change unit test
      
      Closes #34012 from BelodengKlaus/spark36773.
      Authored-by: BelodengKlaus <jp.xiong520@gmail.com>
      Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
      3712502d
    • [SPARK-36779][PYTHON] Fix when list of data type tuples has len = 1 · 8f895e9e
      Committed by dgd-contributor
      ### What changes were proposed in this pull request?
      
      Fix when list of data type tuples has len = 1
      
      ### Why are the changes needed?
      Fix when list of data type tuples has len = 1
      
      ``` python
      >>> ps.DataFrame[("a", int), [int]]
      typing.Tuple[pyspark.pandas.typedef.typehints.IndexNameType, int]
      
      >>> ps.DataFrame[("a", int), [("b", int)]]
      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
        File "/Users/dgd/spark/python/pyspark/pandas/frame.py", line 11998, in __class_getitem__
          return create_tuple_for_frame_type(params)
        File "/Users/dgd/spark/python/pyspark/pandas/typedef/typehints.py", line 685, in create_tuple_for_frame_type
          return Tuple[extract_types(params)]
        File "/Users/dgd/spark/python/pyspark/pandas/typedef/typehints.py", line 755, in extract_types
          return (index_type,) + extract_types(data_types)
        File "/Users/dgd/spark/python/pyspark/pandas/typedef/typehints.py", line 770, in extract_types
          raise TypeError(
      TypeError: Type hints should be specified as one of:
        - DataFrame[type, type, ...]
        - DataFrame[name: type, name: type, ...]
        - DataFrame[dtypes instance]
        - DataFrame[zip(names, types)]
        - DataFrame[index_type, [type, ...]]
        - DataFrame[(index_name, index_type), [(name, type), ...]]
        - DataFrame[dtype instance, dtypes instance]
        - DataFrame[(index_name, index_type), zip(names, types)]
      However, got [('b', <class 'int'>)].
      ```
      
      ### Does this PR introduce _any_ user-facing change?
      
      After:
      ``` python
      >>> ps.DataFrame[("a", int), [("b", int)]]
      typing.Tuple[pyspark.pandas.typedef.typehints.IndexNameType, pyspark.pandas.typedef.typehints.NameType]
      
      ```
      
      ### How was this patch tested?
      exist test
      
      Closes #34019 from dgd-contributor/fix_when_list_of_tuple_data_type_have_len=1.
      Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
      Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
      8f895e9e