IT

DataFrame의 파티셔닝을 정의하는 방법은 무엇입니까?

lottoking 2020. 7. 22. 07:47
반응형

DataFrame의 파티셔닝을 정의하는 방법은 무엇입니까?


Spark 1.4.0에서 Spark SQL 및 DataFrames를 사용하기 시작했습니다. 스칼라의 DataFrames에서 사용자 정의 파티 셔 너를 정의하고 싶지만 어떻게해야합니까?

내가 작업하고있는 데이터 테이블 중 하나에는 계정별로 다음 예제의 silimar 트랜잭션 목록이 포함되어 있습니다.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

대부분의 계산이 계정 거래간에 발생합니다. 따라서 계정의 모든 트랜잭션이 동일한 Spark 파티션에 기능 데이터를 분할하고 싶습니다.

그러나 저희가 제시하는 방법을 묻습니다. DataFrame 클래스에는 만들 파티션 수를 수있는 'repartition (Int)'이라는 메서드가 있습니다. 그러나 RDD에 대한 수있는 것과 같이 DataFrame에 대한 사용자 정의 파티 셔 너를 정의하는 데 사용할 수있는 방법이 없습니다.

소스 데이터는 Parquet에 저장됩니다. Parquet에 DataFrame을 쓸 때 분할 할 열을 수 있으므로 Parquet에게 '계정 열로 데이터를 분할하도록 지시 할 수 있습니다. 그러나 수백만 개의 개의 계정이있을 수 있으며 개별적으로 이해하면 각 계정에 대해 말할 수 있습니다.

계정에 대한 모든 데이터가 동일한 파티션에 Spark 가이 DataFrame을 분할하는 방법이 있습니까?


스파크> = 2.3.0

SPARK- 22614는 범위 분할을 노출합니다.

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389Data Source API v2 에서 외부 형식 파티셔닝을 제공합니다 .

스파크> = 1.6.0

Spark> = 1.6에서는 쿼리 및 캐싱에 열을 기준으로 파티셔닝을 사용할 수 있습니다. 방법을 사용하여 SPARK-11410SPARK-4849참조하십시오 repartition.

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

RDDs스파크 Dataset( Dataset[Row]일명 포함 DataFrame) 와 달리 현재는 user-정의 파티 셔 너를 사용할 수 없습니다. 일반적으로 인공적인 파티셔닝 장소를 작성하여 이러한 것을 제공합니다.

스파크 <1.6.0 :

할 수있는 전에 한 가지 입력 데이터를 생성하기 사전 파티션하는 것입니다. DataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

이후 DataFrame에서 작성 RDD파티션 레이아웃 기존 단지 간단한지도 상 * 보존한다 :

assert(df.rdd.partitions == partitioned.partitions)

기존 파티션을 다시 파티션 할 수있는 것과 같은 방법 DataFrame:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

그래서 불가능하지 않은 것 같습니다. 그것이 의미가 있다면 문제는 남아 있습니다. 나는 대부분의 경우 다음과 같지 않다고 주장 할 것입니다.

  1. 재 파티셔닝은 비용이 많이 드는 프로세스입니다. 일반적인 시나리오에서 대부분의 데이터는 직렬화, 셔플 및 역 직렬화되어야합니다. 반면에 사전 분할 된 데이터의 이점을 얻을 수있는 작업 수는 상대적으로 적으며 내부 API가이 속성을 활용하도록 설계되지 않은 경우 더 제한됩니다.

    • 일부 시나리오에서는 조인하지만 내부 지원이 필요합니다.
    • 창 함수는 일치하는 파티 셔너로 호출합니다. 위와 동일하며 단일 창 정의로 제한됩니다. 하지만 이미 내부적으로 분할되어 있으므로 사전 분할이 중복 될 수 있습니다.
    • 간단한 집계 GROUP BY-임시 버퍼 **의 메모리 사용량을 줄일 수 있지만 전체 비용은 훨씬 더 높습니다. groupByKey.mapValues(_.reduce)(현재 동작) 대 reduceByKey(사전 분할) 과 다소 동일합니다 . 실제로 유용 할 것 같지 않습니다.
    • 로 데이터 압축 SqlContext.cacheTable. 실행 길이 인코딩을 사용하는 것처럼 보이므로 적용 OrderedRDDFunctions.repartitionAndSortWithinPartitions하면 압축률이 향상 될 수 있습니다.
  2. 성능은 키 배포에 크게 좌우됩니다. 치우친 경우 리소스 사용률이 최적화되지 않습니다. 최악의 시나리오에서는 작업을 전혀 완료 할 수 없습니다.

  3. 높은 수준의 선언적 API 사용의 요점은 낮은 수준의 구현 세부 정보에서 자신을 격리하는 것입니다. @dwysakowicz@RomiKuntsman이 이미 언급했듯이 최적화는 Catalyst Optimizer 의 작업입니다 . 그것은 매우 정교한 짐승이며 내부에 훨씬 더 깊이 들어 가지 않고도 쉽게 향상시킬 수 있을지 의심 스럽습니다.

관련 개념

JDBC 소스로 파티션하기 :

JDBC 데이터 소스는 predicates인수를 지원 합니다. 다음과 같이 사용할 수 있습니다.

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

술어 당 하나의 JDBC 파티션을 작성합니다. 개별 조건자를 사용하여 만든 집합이 분리되지 않은 경우 결과 테이블에 중복 항목이 표시됩니다.

partitionBy방법DataFrameWriter :

Spark DataFrameWriterpartitionBy쓰기시 데이터를 "분할"하는 데 사용할 수있는 방법을 제공합니다 . 제공된 열 세트를 사용하여 쓰기시 데이터를 분리합니다.

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

이를 통해 키 기반 쿼리에 대해 읽기시 술어 푸시 다운을 사용할 수 있습니다.

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

그러나 DataFrame.repartition. 특히 다음과 같은 집계 :

val cnts = df1.groupBy($"k").sum()

여전히 다음이 필요합니다 TungstenExchange.

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketBy메소드DataFrameWriter (스파크> = 2.0) :

bucketBy와 유사한 응용 프로그램이 partitionBy있지만 테이블 ( saveAsTable) 에만 사용할 수 있습니다 . 버킷 정보는 조인을 최적화하는 데 사용할 수 있습니다.

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* 파티션 레이아웃 이란 데이터 배포만을 의미합니다. partitionedRDD에는 더 이상 파티 셔 너가 없습니다. ** 초기 예측이 없다고 가정합니다. 집계가 열의 작은 하위 집합 만 포함하는 경우 이득이 전혀 없습니다.


Spark <1.6 HiveContext에서 일반 오래된 SqlContext것이 아닌를 생성하는 경우 HiveQL을 사용할 수 있습니다 DISTRIBUTE BY colX...(N 감속기 각각이 x의 겹치지 않는 범위를 가져옴) & CLUSTER BY colX...(배포 기준 및 정렬 기준의 단축키).

df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

이것이 Spark DF api와 어떻게 맞는지 잘 모르겠습니다. 이러한 키워드는 일반 SqlContext에서 지원되지 않습니다 (HiveContext를 사용하기 위해 하이브 메타 저장소가 필요하지 않음).

편집 : Spark 1.6 이상은 이제 네이티브 DataFrame API에 있습니다.


다음에서 반환 된 DataFrame을 사용합니다.

yourDF.orderBy(account)

partitionByDataFrame에서 사용 하는 명시적인 방법은없고 PairRDD에서만 사용할 수 있지만 DataFrame을 정렬 할 때 LogicalPlan에서이를 사용하므로 각 계정에 대해 계산해야 할 때 도움이됩니다.

계정별로 분할하려는 데이터 프레임으로 똑같은 문제를 발견했습니다. "계정에 대한 모든 트랜잭션이 동일한 Spark 파티션에 있도록 데이터를 분할하고 싶다"고 말할 때 규모와 성능을 위해이를 원하지만 코드는 이에 의존하지 않는다고 가정합니다 (예 : mapPartitions()등), 맞습니까?


그래서 어떤 종류의 대답으로 시작하려면 :)-당신은 할 수 없습니다

나는 전문가는 아니지만 DataFrames를 이해하는 한 rdd와 같지 않으며 DataFrame에는 Partitioner와 같은 것이 없습니다.

일반적으로 DataFrame의 아이디어는 이러한 문제 자체를 처리하는 또 다른 수준의 추상화를 제공하는 것입니다. DataFrame에 대한 쿼리는 RDD에 대한 작업으로 추가 변환되는 논리적 계획으로 변환됩니다. 제안한 파티셔닝은 아마도 자동으로 적용되거나 적어도 적용되어야합니다.

SparkSQL이 어떤 종류의 최적의 작업을 제공 할 것이라고 신뢰하지 않는 경우 의견에서 제안한대로 DataFrame을 RDD [Row]로 변환 할 수 있습니다.


나는 RDD를 사용하여 이것을 할 수 있었다. 그러나 이것이 당신에게 적합한 해결책인지 모르겠습니다. DF를 RDD로 사용할 수 있으면 신청 repartitionAndSortWithinPartitions하여 데이터의 사용자 지정 재 파티션을 수행 할 수 있습니다 .

다음은 내가 사용한 샘플입니다.

class DatePartitioner(partitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = {
    val start_time: Long = key.asInstanceOf[Long]
    Objects.hash(Array(start_time)) % partitions
  }

  override def numPartitions: Int = partitions
}

myRDD
  .repartitionAndSortWithinPartitions(new DatePartitioner(24))
  .map { v => v._2 }
  .toDF()
  .write.mode(SaveMode.Overwrite)

참고 URL : https://stackoverflow.com/questions/30995699/how-to-define-partitioning-of-dataframe

반응형