Deep Learning & Machine Learning/강좌&예제 코드

SPARK를 사용하여 대용량 데이터셋의 평균과 표준편차 구하기

webnautes 2021. 12. 12. 12:39
반응형

메모리에 한번에 올리기 힘든 데이터셋에 표준화(standardization)를 적용하기 위해 평균 및 표준 편차를 계산하기 위해 사용한 방법입니다.



2021. 12. 12 최초작성

 

1. JDK가 필요합니다.

안드로이드 스튜디오를 사용중이라면 이미 OpenJDK가 설치되어 있으므로 바로 4번을 확인해보세요. 

 

아래 링크에서 11 GA 윈도우 버전을 다운로드합니다. 

https://jdk.java.net/archive/ 



2. 압축을 풀은 후, jdk-11 폴더를 C:\에 복사합니다.



3. 윈도우 키 + R을 누른 후, sysdm.cpl를 실행하여 시스템 환경 변수 path에 아래 경로를 추가합니다.

C:\jdk-11\bin



4. 명령 프롬프트에서 java 실행 가능 여부를 확인합니다.

 

C:\Users\webnautes> java -version

openjdk version "11" 2018-09-25

OpenJDK Runtime Environment 18.9 (build 11+28)

OpenJDK 64-Bit Server VM 18.9 (build 11+28, mixed mode)



5. 아래 링크에서 spark-3.2.0-bin-hadoop3.2.tgz를 다운로드합니다.

https://spark.apache.org/downloads.html

 

 

뒤에서 hadoop 다운로드시  버전을 맞춰야 하므로 버전 확인을 합니다. 다운로드 받은 파일 이름에  hadoop 3.2가 명시 되어 있습니다. 



6. 압축을 풀어서 spark로 폴더 이름을 바꾸고 c:\에 복사합니다. 

 



7. 아래 링크에서 앞에서 확인한 hadoop 버전 3.2를 위한 hadoop-3.2.0/bin를 클릭합니다.

https://github.com/cdarlint/winutils



8. hadoop.dll와 winutils.exe를 다운로드하여  C:\hadoop\bin에 저장합니다.

 



9. 윈도우 키 + R을 누른 후, sysdm.cpl를 실행하여 다음을 시스템 환경 변수에 추가합니다.

 

변수이름 : SPARK_HOME

변수 값 : C:\spark

 

변수이름 : HADOOP_HOME

변수 값 : c:\hadoop



시스템 변수 path에 다음 경로를 추가합니다.

 

%SPARK_HOME%\bin

%HADOOP_HOME%\bin



11. Python이 설치안되어 있다면 설치를 한 후 진행합니다.



pip 명령을 사용하여 pyspark를 설치합니다.

 

pip install pyspark



12. 아래 코드를 사용하여 지정한 디렉토리에 있는 csv 파일들로부터 평균과 표준 편차를 구할 수 있습니다. 

 

포스트에선 아래 링크에 있는 csv 파일 하나(goldx.csv)를 사용하여 진행했지만 여러 경로에 있는 다수의 csv 파일로도 가능합니다.

https://www.kaggle.com/omdatas/historic-gold-prices



실행결과입니다. 첫번째 줄은 구한 평균과 표준 편차이며 두번쨰 줄은 72초 걸렸다는 의미입니다.

마지막줄에 보이듯이 표준화 적용후 평균은 0, 표준편차는 1에 가까워진 것을 볼 수 있습니다.

 

mean = 638.9222209267894, std = 419.776825663592

time : 72.82766556739807

 

mean = 2.1044867710002667e-15, std = 0.9999999999999943



from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType, StringType
from pyspark.sql import functions as F
from pyspark.sql.functions import col, mean as _mean, stddev as _stddev
import time
import numpy as np


start = time.time() 

# csv 파일의 구조를 지정해줍니다.
schema = StructType([
                    StructField("Date", StringType(), True),
                    StructField("Price", FloatType(), True),
                    StructField("Open", FloatType(), True),
                    StructField("High", FloatType(), True),
                    StructField("Low", FloatType(), True)])


spark = SparkSession.builder.getOrCreate()


dataframe = spark.read.schema(schema).option("sep", ",").csv(".\\goldx.csv")

# 하나의 datafrmae으로 결합하여 진행하고 있습니다.
# dataframe1 = spark.read.schema(schema).option("sep", ",").csv("D:\\dataset1\\Train\\*.csv")
# dataframe2 = spark.read.schema(schema).option("sep", ",").csv("D:\\dataset2\\Train\\*.csv")
# dataframe3 = spark.read.schema(schema).option("sep", ",").csv("D:\\dataset3\\Train\\*.csv")
# dataframe4 = spark.read.schema(schema).option("sep", ",").csv("D:\\dataset4\\Train\\*.csv")
# dataframe = dataframe1.union(dataframe2).union(dataframe3).union(dataframe4)
# dataframe = dataframe.filter(F.col("time").cast("float").isNotNull())
# dataframe = dataframe.drop("time") # time 컬럼을 제거합니다.


# 평균과 표준 편차를 구합니다.
df_stats = dataframe.select(
                                _mean(col('Price')).alias('mean'),
                                _stddev(col('Price')).alias('std')
                            ).collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']

print('mean = {}, std = {}'.format(mean, std))
print("time :", time.time() - start) 
print()


# 구한 평균과 표준편차를 파일로 저장합니다.
np.save('train_value.npy', [mean, std])



# 구한 평균과 표준편차로 표준화를 적용한 후, 데이터셋의 평균과 표준편차를 구해봅니다.
def normalize(df, mean, std):

    return df.select(
                        _mean((col('Price')-mean)/std).alias('mean'),
                        _stddev((col('Price')-mean)/std).alias('std')
                    ).collect()

df_stats = normalize(dataframe, mean, std)

mean = df_stats[0]['mean']
std = df_stats[0]['std']

print('mean = {}, std = {}'.format(mean, std))












반응형