이번에는 환율 정보의 파이프라인을 구성해보자. 지난번 블로그의 비트코인과 조건 빼고 모두 동일하다.
전반적인 개요
GCP와 파이썬을 사용한 주식 데이터 파이프라인에 대한 전반적인 개요는 다음과 같다.
Step 4
Step 6
파이썬을 사용한 대시보드 구축
Step 2
Step 3
Step 5
파이썬을 사용한 데이터 시각화
1. Airflow pipeline 구축
그럼 이제부터 본격적으로 파이프라인을 구축 해보자. 참고해야할 사항이 있다면 위의 가이드에서 ‘Step 4’ 부분을 참고 하면 된다. 우선 파이썬 파일을 하나 생성해주자. python_file이라는 폴더에 usd_krw_cralwer.py 라는 파일을 생성 해주었다.
1-1. 라이브러리 호출
#!/usr/bin/env python
# coding: utf-8
import pandas as pd
import pandas_gbq
from pykrx import stock
from pykrx import bond
import FinanceDataReader as fdr
from datetime import timedelta
from time import sleep
import psycopg2 as pg2
from sqlalchemy import create_engine
from datetime import datetime
import os
import time
import glob
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud import storage
1-2. 기본 세팅(Bigquery, Cloud Storage, Cloud SQL)
이제 기본 세팅을 해주자. 파이썬에서 각각 Bigquery, Cloud Storage, Cloud SQL에 접근 하기 위한 장치 이다.
# 경로 변경
os.chdir('/home/owenchoi07/finance_mlops')
# 서비스 계정 키 JSON 파일 경로
key_path = glob.glob("key_value/*.json")[0]
# Credentials 객체 생성
credentials = service_account.Credentials.from_service_account_file(key_path)
# 빅쿼리 정보
project_id = 'owen-389015'
dataset_id = 'finance_mlops'
# GCP 클라이언트 객체 생성
storage_client = storage.Client(credentials = credentials,
project = credentials.project_id)
bucket_name = 'finance-mlops' # 서비스 계정 생성한 bucket 이름 입력
# Postgresql 연결
db_connect_info = pd.read_csv('key_value/db_connect_info.csv')
username = db_connect_info['username'][0]
password = db_connect_info['password'][0]
host = db_connect_info['host'][0]
database = db_connect_info['database'][0]
engine = create_engine(f'postgresql+psycopg2://{username}:{password}@{host}:5432/{database}')
1-3. 업로드 함수 생성
그리고 나서 이번에는 업로드 함수를 만들어 주자. 다음의 코드는 각각 Bigquery, Cloud Storage, Cloud SQL에 우리가 수집한 데이터를 업로드 하기위한 함수이다. 이는 지난 블로그를 참고 하고 응용한 것이며 앞으로 일별로 수집한 객체는 다음의 함수로 한꺼번에 업로드 될 예정이다.
def upload_df(data, file_name, project_id, dataset_id, time_line):
if not os.path.exists(f'data_crawler/{file_name}'):
os.makedirs(f'data_crawler/{file_name}')
try:
if not os.path.exists(f'data_crawler/{file_name}/{file_name}_{today_date1}.csv'):
data.to_csv(f'data_crawler/{file_name}/{file_name}_{today_date1}.csv', index=False, mode='w')
else:
data.to_csv(f'data_crawler/{file_name}/{file_name}_{today_date1}.csv', index=False, mode='a', header=False)
print(f'{file_name}_로컬CSV저장_success_{time_line}')
except:
print(f'{file_name}_로컬CSV저장_fail_{time_line}')
# Google Storage 적재
source_file_name = f'data_crawler/{file_name}/{file_name}_{today_date1}.csv' # GCP에 업로드할 파일 절대경로
destination_blob_name = f'data_crawler/{file_name}/{file_name}_{today_date1}.csv' # 업로드할 파일을 GCP에 저장할 때의 이름
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
try:
# 빅쿼리 데이터 적재
data.to_gbq(destination_table=f'{project_id}.{dataset_id}.{file_name}',
project_id=project_id,
if_exists='append',
credentials=credentials)
print(f'{file_name}_빅쿼리저장_success_{time_line}')
except:
print(f'{file_name}_빅쿼리저장_fail_{time_line}')
try:
# Postgresql 적재
data.to_sql(f'{file_name}',if_exists='append', con=engine, index=False)
print(f'{file_name}_Postgresql저장_success_{time_line}')
except:
print(f'{file_name}_Postgresql저장_fail_{time_line}')
1-4. 날짜 옵션
이번에는 날짜에 대한 세팅을 해주도록 하자. 환율 정보는 월~금요일 오전 9시에 수집이 될 예정이다. 그리고 start_date2를 today_date2와 동일하게 해주었다. 이는 fdr.DataReader()
함수의 경우 시작날짜와 종료날짜가 모두 있어야 하기 때문이다.
# ### 날짜 설정
now = datetime.datetime.now()
today_date1 = now.strftime('%Y%m%d')
today_date2 = now.strftime('%Y-%m-%d')
start_date2 = today_date2
today_date_time_csv = now.strftime("%Y%m%d_%H%M")
2. 데이터 수집
그럼 이제부터 그동안 수집했던 데이터 수집 코드를 만들어 보자.
2-1. 환율 수집
환율 데이터를 수집 해주자. fdr의 fdr.DataReader()함수를 사용 했다. 기존 S&P 500 데이터를 수집할 때 fdr.DataReader()
에서 ticker를 넣어 주었지만 이번에는 ‘USD/KRW’ 옵션을 넣어 주면 된다.
file_name = 'usd_krw'
try:
now1 = datetime.now()
time_line = now1.strftime("%Y%m%d_%H:%M:%S")
df_raw = fdr.DataReader('USD/KRW', start_date2,today_date2)
df_raw['ticker'] = 'usd_krw'
df_raw = df_raw.reset_index()
df_raw.columns = ['date', 'open','high','low','close','adj_close','volume','ticker']
print(f'{ticker_nm} success_{time_line}')
except:
print(f'{ticker_nm} fail_{time_line}')
df_raw.columns['date'] = pd.to_datetime(df_raw.columns['date'])
now1 = datetime.now()
time_line = now1.strftime("%Y%m%d_%H:%M:%S")
upload_df(df_raw, file_name, project_id, dataset_id, time_line, today_date1)
3. Airflow DAG
이렇게 python_file 경로에 usd_krw_crawler.py를 잘 생성 했으면 이제 Airflow DAG를 추가 하자. 그리고 이를 일주일 전부 09시 1분에 시작하도록 설정 해주었다. 추가로 t1이라는 DAG는 앞으로 t2, t3으로 파이프라인이 추가될 예정이다.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
import pendulum
## 로컬 타임존 생성
local_tz = pendulum.timezone("Asia/Seoul")
python_dir = '/home/owenchoi07/anaconda3/bin/python3'
file_dir = '/home/owenchoi07/finance_mlops'
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 7, 11, tzinfo=local_tz),
'retries': 0,
'catchup': False
}
with DAG(
'usd_krw',
default_args=default_args,
description='usd/krw crawler',
schedule_interval = '01 09 * * MON-FRI',
tags=['urd_krw'],
) as dag:
t1 = BashOperator(
task_id='usd_krw_crawler',
bash_command = f'{python_dir} {file_dir}/python_file/usd_krw_crawler.py'
)
t1