spark shell --driver-class-path /usr/share/java/postgresql93-jdbc.jar
/*
/*
DOMAIN_NAME : ip 혹은 도메인
DATABASE_NAME : 접속할 Database 이름
USER_NAME : 유저 명
PASSWORD : 비밀번호
변수를 만들어서 입력 / 하드코딩
*/
*/
val url = "jdbc:postgresql://DOMAIN_NAME/DATABASE_NAME?user=USER_NAME&password=PASSWORD"
/*
TABLE_NAME : 데이터를 가져올 테이블 명
rows : RDD 형태로 반환 (FROM 절을 구현한것)
*/
*/
val rows = sqlContext.load("jdbc", Map("url" -> url ,"dbtable"->"TABLE_NAME"))
/*
filter 함수 에서 WHERE 절을 구현
filter 함수 에서 WHERE 절을 구현
select 함수 에서 SELECT 절을 구현
*/
*/
row.filter("COLUMN_NAME like 'test'").select("col1","col2")
/*
다시 테이블에 저장하는 소스
*/
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType,StructField,StringType};
/*
저장 할 테이블에 컬럼 명을 공백을 두고 생성
schema에서 StructType 타입으로 변경 됨
*/
val schemaString = "col1 col2 col3"
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
/*
데이터 (rdd) Row 형태로 변환
테스트에서는 "data1_data2_data3" 을 하나의 로우로 가진 것을 이용
*/
val rowRDD = rdd.map(_.split("_")).map(p => Row(p(0),p(1),p(2)))
/*
dfr : Row로 변환된 rdd와 schema로 Data Frame 생성 (Spark DF의 연산들을 모두 사용 가능)
insertIntoJDBC : postgreSQL에 저장하는 함수, 세번째 인자로 true를 주면 기존 테이블 삭
제 하고 다시 생성 함
*/
val dfr = sqlContext.createDataFrame(rowRDD, schema)
dfr.insertIntoJDBC(url, "TABLE_NAME", true)
참조 : https://eradiating.wordpress.com/2015/04/17/using-spark-data-sources-to-load-data-from-postgresql/
/*
다시 테이블에 저장하는 소스
*/
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType,StructField,StringType};
/*
저장 할 테이블에 컬럼 명을 공백을 두고 생성
schema에서 StructType 타입으로 변경 됨
*/
val schemaString = "col1 col2 col3"
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
/*
데이터 (rdd) Row 형태로 변환
테스트에서는 "data1_data2_data3" 을 하나의 로우로 가진 것을 이용
*/
val rowRDD = rdd.map(_.split("_")).map(p => Row(p(0),p(1),p(2)))
/*
dfr : Row로 변환된 rdd와 schema로 Data Frame 생성 (Spark DF의 연산들을 모두 사용 가능)
insertIntoJDBC : postgreSQL에 저장하는 함수, 세번째 인자로 true를 주면 기존 테이블 삭
제 하고 다시 생성 함
*/
val dfr = sqlContext.createDataFrame(rowRDD, schema)
dfr.insertIntoJDBC(url, "TABLE_NAME", true)
참조 : https://eradiating.wordpress.com/2015/04/17/using-spark-data-sources-to-load-data-from-postgresql/
댓글 없음:
댓글 쓰기