SPARK를 사용하여 대용량 데이터셋의 평균과 표준편차 구하기
메모리에 한번에 올리기 힘든 데이터셋에 표준화(standardization)를 적용하기 위해 평균 및 표준 편차를 계산하기 위해 사용한 방법입니다.
2021. 12. 12 최초작성
1. JDK가 필요합니다.
안드로이드 스튜디오를 사용중이라면 이미 OpenJDK가 설치되어 있으므로 바로 4번을 확인해보세요.
아래 링크에서 11 GA 윈도우 버전을 다운로드합니다.
2. 압축을 풀은 후, jdk-11 폴더를 C:\에 복사합니다.
3. 윈도우 키 + R을 누른 후, sysdm.cpl를 실행하여 시스템 환경 변수 path에 아래 경로를 추가합니다.
C:\jdk-11\bin
4. 명령 프롬프트에서 java 실행 가능 여부를 확인합니다.
C:\Users\webnautes> java -version
openjdk version "11" 2018-09-25
OpenJDK Runtime Environment 18.9 (build 11+28)
OpenJDK 64-Bit Server VM 18.9 (build 11+28, mixed mode)
5. 아래 링크에서 spark-3.2.0-bin-hadoop3.2.tgz를 다운로드합니다.
https://spark.apache.org/downloads.html
뒤에서 hadoop 다운로드시 버전을 맞춰야 하므로 버전 확인을 합니다. 다운로드 받은 파일 이름에 hadoop 3.2가 명시 되어 있습니다.
6. 압축을 풀어서 spark로 폴더 이름을 바꾸고 c:\에 복사합니다.
7. 아래 링크에서 앞에서 확인한 hadoop 버전 3.2를 위한 hadoop-3.2.0/bin를 클릭합니다.
https://github.com/cdarlint/winutils
8. hadoop.dll와 winutils.exe를 다운로드하여 C:\hadoop\bin에 저장합니다.
9. 윈도우 키 + R을 누른 후, sysdm.cpl를 실행하여 다음을 시스템 환경 변수에 추가합니다.
변수이름 : SPARK_HOME
변수 값 : C:\spark
변수이름 : HADOOP_HOME
변수 값 : c:\hadoop
시스템 변수 path에 다음 경로를 추가합니다.
%SPARK_HOME%\bin
%HADOOP_HOME%\bin
11. Python이 설치안되어 있다면 설치를 한 후 진행합니다.
pip 명령을 사용하여 pyspark를 설치합니다.
pip install pyspark
12. 아래 코드를 사용하여 지정한 디렉토리에 있는 csv 파일들로부터 평균과 표준 편차를 구할 수 있습니다.
포스트에선 아래 링크에 있는 csv 파일 하나(goldx.csv)를 사용하여 진행했지만 여러 경로에 있는 다수의 csv 파일로도 가능합니다.
https://www.kaggle.com/omdatas/historic-gold-prices
실행결과입니다. 첫번째 줄은 구한 평균과 표준 편차이며 두번쨰 줄은 72초 걸렸다는 의미입니다.
마지막줄에 보이듯이 표준화 적용후 평균은 0, 표준편차는 1에 가까워진 것을 볼 수 있습니다.
mean = 638.9222209267894, std = 419.776825663592
time : 72.82766556739807
mean = 2.1044867710002667e-15, std = 0.9999999999999943
from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, FloatType, StringType from pyspark.sql import functions as F from pyspark.sql.functions import col, mean as _mean, stddev as _stddev import time import numpy as np start = time.time() # csv 파일의 구조를 지정해줍니다. schema = StructType([ StructField("Date", StringType(), True), StructField("Price", FloatType(), True), StructField("Open", FloatType(), True), StructField("High", FloatType(), True), StructField("Low", FloatType(), True)]) spark = SparkSession.builder.getOrCreate() dataframe = spark.read.schema(schema).option("sep", ",").csv(".\\goldx.csv") # 하나의 datafrmae으로 결합하여 진행하고 있습니다. # dataframe1 = spark.read.schema(schema).option("sep", ",").csv("D:\\dataset1\\Train\\*.csv") # dataframe2 = spark.read.schema(schema).option("sep", ",").csv("D:\\dataset2\\Train\\*.csv") # dataframe3 = spark.read.schema(schema).option("sep", ",").csv("D:\\dataset3\\Train\\*.csv") # dataframe4 = spark.read.schema(schema).option("sep", ",").csv("D:\\dataset4\\Train\\*.csv") # dataframe = dataframe1.union(dataframe2).union(dataframe3).union(dataframe4) # dataframe = dataframe.filter(F.col("time").cast("float").isNotNull()) # dataframe = dataframe.drop("time") # time 컬럼을 제거합니다. # 평균과 표준 편차를 구합니다. df_stats = dataframe.select( _mean(col('Price')).alias('mean'), _stddev(col('Price')).alias('std') ).collect() mean = df_stats[0]['mean'] std = df_stats[0]['std'] print('mean = {}, std = {}'.format(mean, std)) print("time :", time.time() - start) print() # 구한 평균과 표준편차를 파일로 저장합니다. np.save('train_value.npy', [mean, std]) # 구한 평균과 표준편차로 표준화를 적용한 후, 데이터셋의 평균과 표준편차를 구해봅니다. def normalize(df, mean, std): return df.select( _mean((col('Price')-mean)/std).alias('mean'), _stddev((col('Price')-mean)/std).alias('std') ).collect() df_stats = normalize(dataframe, mean, std) mean = df_stats[0]['mean'] std = df_stats[0]['std'] print('mean = {}, std = {}'.format(mean, std)) |