2016년 5월 4일 수요일

Spark-postgreSQL 연동

//Spark 실행할때 jdbc jar를 지정

spark shell --driver-class-path /usr/share/java/postgresql93-jdbc.jar

/*
DOMAIN_NAME : ip 혹은 도메인

DATABASE_NAME : 접속할 Database 이름

USER_NAME : 유저 명

PASSWORD : 비밀번호

변수를 만들어서 입력 / 하드코딩
*/
val url = "jdbc:postgresql://DOMAIN_NAME/DATABASE_NAME?user=USER_NAME&password=PASSWORD"

/*
TABLE_NAME : 데이터를 가져올 테이블 명

rows : RDD 형태로 반환 (FROM 절을 구현한것)
*/
val rows = sqlContext.load("jdbc", Map("url" -> url ,"dbtable"->"TABLE_NAME"))

/*
filter 함수 에서 WHERE 절을 구현

select 함수 에서 SELECT 절을 구현
*/
row.filter("COLUMN_NAME like 'test'").select("col1","col2")

/*
다시 테이블에 저장하는  소스
*/
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType,StructField,StringType};

/*
저장 할  테이블에 컬럼 명을 공백을 두고 생성
schema에서 StructType 타입으로 변경 됨
*/
val schemaString = "col1 col2 col3"
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

/*
데이터 (rdd) Row 형태로 변환

테스트에서는 "data1_data2_data3" 을 하나의 로우로 가진 것을 이용
*/
val rowRDD = rdd.map(_.split("_")).map(p => Row(p(0),p(1),p(2)))

/*
dfr : Row로 변환된 rdd와 schema로 Data Frame 생성 (Spark DF의 연산들을 모두 사용 가능)

insertIntoJDBC : postgreSQL에 저장하는 함수, 세번째 인자로 true를 주면 기존 테이블 삭

제 하고 다시 생성 함
*/
val dfr = sqlContext.createDataFrame(rowRDD, schema)
dfr.insertIntoJDBC(url, "TABLE_NAME", true)

참조 : https://eradiating.wordpress.com/2015/04/17/using-spark-data-sources-to-load-data-from-postgresql/

댓글 없음:

댓글 쓰기