스파크 -CSV 파일을 DataFrame으로로드 하시겠습니까?
스파크에서 CSV를 읽고 DataFrame으로 변환하여 HDFS에 저장하고 싶습니다. df.registerTempTable("table_name")
나는 시도했다 :
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
내가 얻은 오류 :
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Apache Spark에서 CSV 파일을 DataFrame으로로드하는 올바른 명령은 무엇입니까?
spark-csv는 핵심 Spark 기능의 일부이며 별도의 라이브러리가 필요하지 않습니다. 예를 들어
df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
scala에서 (이 csv의 경우 ",", tsv의 경우 "\ t"등의 모든 형식 구분 기호에 적용됩니다) val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .load("csvfile.csv")
CSV 구문 분석 및 Spark 2.x를 사용하여 DataFrame / DataSet으로로드
먼저 기본적으로SparkSession
object- 를 초기화 합니다.spark
val spark = org.apache.spark.sql.SparkSession.builder
.master("local")
.appName("Spark CSV Reader")
.getOrCreate;
다음 방법 중 하나를 사용하여 CSV를
DataFrame/DataSet
1. 프로그래밍 방식으로 수행
val df = spark.read
.format("csv")
.option("header", "true") //first line in file has headers
.option("mode", "DROPMALFORMED")
.load("hdfs:///csv/file/dir/file.csv")
2. 이 SQL 방식으로 할 수 있습니다.
val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")
의존성 :
"org.apache.spark" % "spark-core_2.11" % 2.0.0,
"org.apache.spark" % "spark-sql_2.11" % 2.0.0,
스파크 버전 <2.0
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("csv/file/path");
대표 :
"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,
Hadoop은 2.6이고 Spark는 1.6이며 "데이터 브릭"패키지가 없습니다.
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;
val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0),row(1).toInt))
val schema = new StructType()
.add(StructField("id", StringType, true))
.add(StructField("val", IntegerType, true))
val df = sqlContext.createDataFrame(rdd, schema)
Spark 2.0에서 CSV를 읽는 방법은 다음과 같습니다.
val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
.config(conf = conf)
.appName("spark session example")
.getOrCreate()
val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header","true").
csv(path)
Java 1.8 에서이 코드는 CSV 파일을 읽기 위해 완벽하게 작동합니다.
POM.xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.4.0</version>
</dependency>
자바
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);
Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
//("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();
Penny의 Spark 2 예제는 spark2에서 수행하는 방법입니다. 한 가지 더 트릭이 있습니다. 옵션 inferSchema
을 다음과 같이 설정하여 데이터를 초기 스캔하여 헤더를 생성하십시오.true
여기에서 spark
설정 한 Spark 세션이라고 가정하면 S3에서 호스트하는 모든 Landsat 이미지의 CSV 파일에로드하는 작업입니다.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
val csvdata = spark.read.options(Map(
"header" -> "true",
"ignoreLeadingWhiteSpace" -> "true",
"ignoreTrailingWhiteSpace" -> "true",
"timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
"inferSchema" -> "true",
"mode" -> "FAILFAST"))
.csv("s3a://landsat-pds/scene_list.gz")
나쁜 소식은 다음과 같습니다. 이것은 파일을 통해 스캔을 트리거합니다. 이 20MB 이상의 압축 CSV 파일과 같은 큰 파일의 경우 장거리 연결시 30 초가 걸릴 수 있습니다. 명심하십시오. 스키마가 들어 오면 수동으로 코딩하는 것이 좋습니다.
(코드 스 니펫 Apache Software License 2.0은 모든 모호함을 피하기 위해 라이선스가 부여되었습니다. S3 통합의 데모 / 통합 테스트로 수행 한 작업)
CSV 파일을 구문 분석하는 데 많은 어려움이 있습니다. 파일 크기가 더 크면 계속 합산되고 열 값에 영어가 아닌 / 이스케이프 / 구분자 / 기타 문자가 있으면 구문 분석 오류가 발생할 수 있습니다.
마법은 사용되는 옵션에 있습니다. 나를 위해 일했고 대부분의 가장자리 사례를 다루기를 희망하는 것은 아래 코드에 있습니다.
### Create a Spark Session
spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()
### Note the options that are used. You may have to tweak these in case of error
html_df = spark.read.csv(html_csv_file_path,
header=True,
multiLine=True,
ignoreLeadingWhiteSpace=True,
ignoreTrailingWhiteSpace=True,
encoding="UTF-8",
sep=',',
quote='"',
escape='"',
maxColumns=2,
inferSchema=True)
도움이 되었기를 바랍니다. 자세한 내용은 다음을 참조하십시오 : PySpark 2를 사용하여 HTML 소스 코드가있는 CSV 읽기
참고 : 위의 코드는 Spark 2 API에서 가져온 것으로 CSV 파일 읽기 API는 Spark 설치 가능 패키지와 함께 제공됩니다.
참고 : PySpark는 Spark 용 Python 래퍼이며 Scala / Java와 동일한 API를 공유합니다.
scala 2.11 및 Apache 2.0 이상으로 jar를 빌드하는 경우.
sqlContext
또는 sparkContext
개체 를 만들 필요가 없습니다 . SparkSession
객체 하나만으로 모든 요구 사항을 충족 할 수 있습니다.
다음은 잘 작동하는 mycode입니다.
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.log4j.{Level, LogManager, Logger}
object driver {
def main(args: Array[String]) {
val log = LogManager.getRootLogger
log.info("**********JAR EXECUTION STARTED**********")
val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
val df = spark.read.format("csv")
.option("header", "true")
.option("delimiter","|")
.option("inferSchema","true")
.load("d:/small_projects/spark/test.pos")
df.show()
}
}
당신이 클러스터에서 실행중인 경우 단지 변경 .master("local")
을 .master("yarn")
정의하는 동안 sparkBuilder
객체를
Spark Doc은이를 다룹니다 : https://spark.apache.org/docs/2.2.0/sql-programming-guide.html
기본 파일 형식은 parquet with spark.read .. 및 파일 읽기 csv로 인해 예외가 발생합니다. 사용하려는 API로 csv 형식을 지정하십시오.
CSV 파일을로드하고 결과를 DataFrame으로 반환합니다.
df=sparksession.read.option("header", true).csv("file_name.csv")
Dataframe은 파일을 csv 형식으로 처리했습니다.
Spark 2.0 이상을 사용하는 경우 이것을 시도하십시오.
For non-hdfs file:
df = spark.read.csv("file:///csvfile.csv")
For hdfs file:
df = spark.read.csv("hdfs:///csvfile.csv")
For hdfs file (with different delimiter than comma:
df = spark.read.option("delimiter","|")csv("hdfs:///csvfile.csv")
참고 :-구분 된 파일에 대해 작동합니다. 옵션 ( "구분자",)을 사용하여 값을 변경하십시오.
이것이 도움이되기를 바랍니다.
참고 URL : https://stackoverflow.com/questions/29704333/spark-load-csv-file-as-dataframe
'IT' 카테고리의 다른 글
사용자를 위해 mysql 서버에 원격 액세스 권한을 부여하는 방법은 무엇입니까? (0) | 2020.07.21 |
---|---|
TeX / LaTeX의 후임자가 보입니까? (0) | 2020.07.21 |
MATLAB 함수의 일부 반환 값을 우아하게 무시하는 방법은 무엇입니까? (0) | 2020.07.21 |
지원 (v21) 사용하여 환경 설정 화면 만들기 (0) | 2020.07.21 |
hashCode는 무엇에 사용인가? (0) | 2020.07.21 |