IT

Spark에서 출력 디렉터리를 사용하는 방법

lottoking 2020. 9. 7. 08:24
반응형

Spark에서 출력 디렉터리를 사용하는 방법


매분 데이터 세트를 생성하는 스파크 스트리밍 응용 프로그램이 있습니다. 처리 된 데이터의 결과를 저장 / 기록해야합니다.

org.apache.hadoop.mapred.FileAlreadyExistsException 데이터 세트를 계속 쓰려고하면 실행이 중지됩니다.

Spark 속성을 설정 set("spark.files.overwrite","true")했지만 운이 없습니다.

Spark에서 파일을 사용하거나 미리 삭제하는 방법은 무엇입니까?


업데이트 : 사용 제안 Dataframes... .write.mode(SaveMode.Overwrite) ....

이전 버전의 경우

yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = SparkContext(yourSparkConf)

1.1.0에서는 --conf 플래그와 함께 spark-submit 펼쳐보기를 사용하여 conf 설정을 이용할 수 있습니다.

경고 (이전 버전) : @piggybox에 따르면 Spark에는 파일을 작성하는 데 필요한 파일 만 프로그램 작성되는 버그 part-가있는 다른 파일은 제거되지 않은 상태로 유지됩니다.


이후 df.save(path, source, mode)사용되지 않습니다, ( http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame )

사용 df.write.format(source).mode("overwrite").save(path)
df.write이 DataFrameWriter입니다

'source'는 ( "com.databricks.spark.avro"| "parquet"| "json") 일 수 있습니다.


매개 변수에 대한 문서는 spark.files.overwrite" SparkContext.addFile()대상 파일이 존재하고 그 내용이 소스의 내용과 일치하지 않을 때 추가 된 파일을 쓸지 여부"라고 사실 . 따라서 saveAsTextFiles 메서드는 영향을주지 않습니다.

파일을 저장하기 전에 다음을 수행 할 수 있습니다.

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }

Aas는 http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696에 설명되어 있습니다. 있습니다 . HTML


pyspark.sql.DataFrame.save 문서 (현재 1.3.1에서)를 사용할 수 있습니다 mode='overwrite'.

myDataFrame.save(path='myPath', source='parquet', mode='overwrite')

나는 이것이 남은 파티션 파일도 있다는 것을 확인했습니다. 따라서 원래 10 개의 파일 / 파일이 존재하는 파일이 6 개의 파티션 만있는 DataFrame으로 파일 경우 결과 폴더가 6 개의 파티션 / 파일이 있습니다.

참고 항목 스파크 SQL 설명서를 모드 옵션에 대한 자세한 내용은.


df.write.mode('overwrite').parquet("/output/folder/path")사용하여 마루 파일을 사용하려면 작동합니다. 이 스파크 1.6.2에 있습니다. API는 이후 버전에서 다를 수 있습니다.


  val jobName = "WordCount";
  //overwrite the output directory in spark  set("spark.hadoop.validateOutputSpecs", "false")
  val conf = new 
  SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false");
  val sc = new SparkContext(conf)

이 오버로드 된 버전의 저장 기능은 저에게입니다.

yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf ( "덮어 쓰기"))

위의 예는 기존 폴더를 덮어 씁니다. savemode는 다음 매개 변수도 사용할 수 있습니다 ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ) :

Append : Append 모드는 DataFrame을 데이터 소스에 저장할 때 데이터 / 테이블이 이미 존재하는 경우 DataFrame의 내용이 기존 데이터에 추가되는 것을 의미합니다.

ErrorIfExists : ErrorIfExists 모드는 DataFrame을 데이터 소스에 저장할 때 데이터가 이미있는 경우 예외가 throw 될 것으로 예상됨을 의미합니다.

Ignore : Ignore 모드는 DataFrame을 데이터 소스에 저장할 때 데이터가 이미 존재하는 경우 저장 작업이 DataFrame의 내용을 저장하지 않고 기존 데이터를 변경하지 않을 것으로 예상됨을 의미합니다.


사용자 지정 출력 형식을 기꺼이 사용하려는 경우 RDD에서도 원하는 동작을 얻을 수 있습니다.

다음 클래스를 살펴보십시오 : FileOutputFormat , FileOutputCommitter

파일 출력 형식에는 출력 디렉토리가 존재하는지 확인하는 checkOutputSpecs라는 메소드가 있습니다. FileOutputCommitter에는 일반적으로 임시 디렉토리에서 최종 위치로 데이터를 전송하는 commitJob이 있습니다.

아직 확인할 수 없었지만 (자유 시간이 얼마 남지 않으면 바로 할 것입니다) 이론적으로 : FileOutputFormat을 확장하고 checkOutputSpecs를 디렉터리에 예외를 발생시키지 않는 메서드로 재정의하면 이미 존재하고 사용자 정의 출력 커미터의 commitJob 메소드는 RDD로 원하는 동작을 달성 할 수있는 것보다 내가 원하는 로직 (예 : 일부 파일 재정의, 다른 파일 추가)을 수행합니다.

출력 형식은 saveAsNewAPIHadoopFile (실제로 파일을 저장하기 위해 호출되는 saveAsTextFile 메소드)에 전달됩니다. 그리고 출력 커미터는 애플리케이션 수준에서 구성됩니다.

참고 URL : https://stackoverflow.com/questions/27033823/how-to-overwrite-the-output-directory-in-spark

반응형