Spark에서 출력 디렉터리를 사용하는 방법
매분 데이터 세트를 생성하는 스파크 스트리밍 응용 프로그램이 있습니다. 처리 된 데이터의 결과를 저장 / 기록해야합니다.
org.apache.hadoop.mapred.FileAlreadyExistsException 데이터 세트를 계속 쓰려고하면 실행이 중지됩니다.
Spark 속성을 설정 set("spark.files.overwrite","true")
했지만 운이 없습니다.
Spark에서 파일을 사용하거나 미리 삭제하는 방법은 무엇입니까?
업데이트 : 사용 제안 Dataframes
및 ... .write.mode(SaveMode.Overwrite) ...
.
이전 버전의 경우
yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = SparkContext(yourSparkConf)
1.1.0에서는 --conf 플래그와 함께 spark-submit 펼쳐보기를 사용하여 conf 설정을 이용할 수 있습니다.
경고 (이전 버전) : @piggybox에 따르면 Spark에는 파일을 작성하는 데 필요한 파일 만 프로그램 작성되는 버그 part-
가있는 다른 파일은 제거되지 않은 상태로 유지됩니다.
이후 df.save(path, source, mode)
사용되지 않습니다, ( http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame )
사용 df.write.format(source).mode("overwrite").save(path)
df.write이 DataFrameWriter입니다
'source'는 ( "com.databricks.spark.avro"| "parquet"| "json") 일 수 있습니다.
매개 변수에 대한 문서는 spark.files.overwrite
" SparkContext.addFile()
대상 파일이 존재하고 그 내용이 소스의 내용과 일치하지 않을 때 추가 된 파일을 쓸지 여부"라고 사실 . 따라서 saveAsTextFiles 메서드는 영향을주지 않습니다.
파일을 저장하기 전에 다음을 수행 할 수 있습니다.
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }
Aas는 http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696에 설명되어 있습니다. 있습니다 . HTML
pyspark.sql.DataFrame.save 의 문서 (현재 1.3.1에서)를 사용할 수 있습니다 mode='overwrite'
.
myDataFrame.save(path='myPath', source='parquet', mode='overwrite')
나는 이것이 남은 파티션 파일도 있다는 것을 확인했습니다. 따라서 원래 10 개의 파일 / 파일이 존재하는 파일이 6 개의 파티션 만있는 DataFrame으로 파일 경우 결과 폴더가 6 개의 파티션 / 파일이 있습니다.
참고 항목 스파크 SQL 설명서를 모드 옵션에 대한 자세한 내용은.
df.write.mode('overwrite').parquet("/output/folder/path")
사용하여 마루 파일을 사용하려면 작동합니다. 이 스파크 1.6.2에 있습니다. API는 이후 버전에서 다를 수 있습니다.
val jobName = "WordCount";
//overwrite the output directory in spark set("spark.hadoop.validateOutputSpecs", "false")
val conf = new
SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false");
val sc = new SparkContext(conf)
이 오버로드 된 버전의 저장 기능은 저에게입니다.
yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf ( "덮어 쓰기"))
위의 예는 기존 폴더를 덮어 씁니다. savemode는 다음 매개 변수도 사용할 수 있습니다 ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ) :
Append : Append 모드는 DataFrame을 데이터 소스에 저장할 때 데이터 / 테이블이 이미 존재하는 경우 DataFrame의 내용이 기존 데이터에 추가되는 것을 의미합니다.
ErrorIfExists : ErrorIfExists 모드는 DataFrame을 데이터 소스에 저장할 때 데이터가 이미있는 경우 예외가 throw 될 것으로 예상됨을 의미합니다.
Ignore : Ignore 모드는 DataFrame을 데이터 소스에 저장할 때 데이터가 이미 존재하는 경우 저장 작업이 DataFrame의 내용을 저장하지 않고 기존 데이터를 변경하지 않을 것으로 예상됨을 의미합니다.
사용자 지정 출력 형식을 기꺼이 사용하려는 경우 RDD에서도 원하는 동작을 얻을 수 있습니다.
다음 클래스를 살펴보십시오 : FileOutputFormat , FileOutputCommitter
파일 출력 형식에는 출력 디렉토리가 존재하는지 확인하는 checkOutputSpecs라는 메소드가 있습니다. FileOutputCommitter에는 일반적으로 임시 디렉토리에서 최종 위치로 데이터를 전송하는 commitJob이 있습니다.
아직 확인할 수 없었지만 (자유 시간이 얼마 남지 않으면 바로 할 것입니다) 이론적으로 : FileOutputFormat을 확장하고 checkOutputSpecs를 디렉터리에 예외를 발생시키지 않는 메서드로 재정의하면 이미 존재하고 사용자 정의 출력 커미터의 commitJob 메소드는 RDD로 원하는 동작을 달성 할 수있는 것보다 내가 원하는 로직 (예 : 일부 파일 재정의, 다른 파일 추가)을 수행합니다.
출력 형식은 saveAsNewAPIHadoopFile (실제로 파일을 저장하기 위해 호출되는 saveAsTextFile 메소드)에 전달됩니다. 그리고 출력 커미터는 애플리케이션 수준에서 구성됩니다.
참고 URL : https://stackoverflow.com/questions/27033823/how-to-overwrite-the-output-directory-in-spark
'IT' 카테고리의 다른 글
Node.js / Express.js 앱은 포트 3000에서만 작동합니다. (0) | 2020.09.07 |
---|---|
NullPointerException이 발생하지 않는 이유는 무엇입니까? (0) | 2020.09.07 |
"… 모듈이 아닌 것으로 해석되고 해석이 구성을 사용하여 수 없음"은 무엇을 의미합니까? (0) | 2020.09.07 |
Eclipse에서 테스트하는 동안 -D 시스템 속성을 전달하는 방법은 무엇입니까? (0) | 2020.09.07 |
Objective-C 메소드 이름의 마지막 부분이 인수를 가져와야하는 이유는 무엇입니까? (0) | 2020.09.07 |