4 min read

[Airflow] 주가 데이터 수집 파이프라인 3 (비트코인 편)

이번에는 비트코인 파이프라인을 구성해보자. 아주 지겨운 반복 작업이다. 이제 수집하는 방식과, 수집 날짜와 Airflow DAG만 조금씩 바꿔주면 된다.

1. Airflow pipeline 구축

그럼 이제부터 본격적으로 파이프라인을 구축 해보자. 참고해야할 사항이 있다면 위의 가이드에서 ‘Step 4’ 부분을 참고 하면 된다. 우선 파이썬 파일을 하나 생성해주자. python_file이라는 폴더에 bitcoin_cralwer.py 라는 파일을 생성 해주었다.

1-1. 라이브러리 호출

#!/usr/bin/env python
# coding: utf-8

import pandas as pd
import pandas_gbq
from pykrx import stock
from pykrx import bond
import FinanceDataReader as fdr
from datetime import timedelta

from time import sleep

import psycopg2 as pg2
from sqlalchemy import create_engine

from datetime import datetime
import os
import time

import glob
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud import storage

1-2. 기본 세팅(Bigquery, Cloud Storage, Cloud SQL)

이제 기본 세팅을 해주자. 파이썬에서 각각 Bigquery, Cloud Storage, Cloud SQL에 접근 하기 위한 장치 이다.

# 경로 변경
os.chdir('/home/owenchoi07/finance_mlops')

# 서비스 계정 키 JSON 파일 경로
key_path = glob.glob("key_value/*.json")[0]

# Credentials 객체 생성
credentials = service_account.Credentials.from_service_account_file(key_path)

# 빅쿼리 정보
project_id = 'owen-389015'
dataset_id = 'finance_mlops'

# GCP 클라이언트 객체 생성
storage_client = storage.Client(credentials = credentials, 
                         project = credentials.project_id)
bucket_name = 'finance-mlops'    # 서비스 계정 생성한 bucket 이름 입력

# Postgresql 연결
db_connect_info = pd.read_csv('key_value/db_connect_info.csv')
username = db_connect_info['username'][0]
password = db_connect_info['password'][0]
host = db_connect_info['host'][0]
database = db_connect_info['database'][0]
engine = create_engine(f'postgresql+psycopg2://{username}:{password}@{host}:5432/{database}')

1-3. 업로드 함수 생성

그리고 나서 이번에는 업로드 함수를 만들어 주자. 다음의 코드는 각각 Bigquery, Cloud Storage, Cloud SQL에 우리가 수집한 데이터를 업로드 하기위한 함수이다. 이는 지난 블로그를 참고 하고 응용한 것이며 앞으로 일별로 수집한 객체는 다음의 함수로 한꺼번에 업로드 될 예정이다.

def upload_df(data, file_name, project_id, dataset_id, time_line):
    if not os.path.exists(f'data_crawler/{file_name}'):
        os.makedirs(f'data_crawler/{file_name}')

    try:
        if not os.path.exists(f'data_crawler/{file_name}/{file_name}_{today_date1}.csv'):
            data.to_csv(f'data_crawler/{file_name}/{file_name}_{today_date1}.csv', index=False, mode='w')
        else:
            data.to_csv(f'data_crawler/{file_name}/{file_name}_{today_date1}.csv', index=False, mode='a', header=False)
        print(f'{file_name}_로컬CSV저장_success_{time_line}')    
    except:
        print(f'{file_name}_로컬CSV저장_fail_{time_line}')
    
    
    # Google Storage 적재
    source_file_name = f'data_crawler/{file_name}/{file_name}_{today_date1}.csv'    # GCP에 업로드할 파일 절대경로
    destination_blob_name = f'data_crawler/{file_name}/{file_name}_{today_date1}.csv'    # 업로드할 파일을 GCP에 저장할 때의 이름
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)      
    
    try:
        # 빅쿼리 데이터 적재
        data.to_gbq(destination_table=f'{project_id}.{dataset_id}.{file_name}',
          project_id=project_id,
          if_exists='append',
          credentials=credentials)
        print(f'{file_name}_빅쿼리저장_success_{time_line}')    
    except:
        print(f'{file_name}_빅쿼리저장_fail_{time_line}')  
    
    
    try:
        # Postgresql 적재
        data.to_sql(f'{file_name}',if_exists='append', con=engine,  index=False)
        print(f'{file_name}_Postgresql저장_success_{time_line}')    
    except:
        print(f'{file_name}_Postgresql저장_fail_{time_line}')

1-4. 날짜 옵션

이번에는 날짜에 대한 세팅을 해주도록 하자. 비트코인은 월~일요일 모든 요일 오전 9시에 수집이 될 예정이다. 그리고 start_date2를 today_date2와 동일하게 해주었다. 이는 fdr.DataReader() 함수의 경우 시작날짜와 종료날짜가 모두 있어야 하기 때문이다.

# ### 날짜 설정
now = datetime.datetime.now()

today_date1 = now.strftime('%Y%m%d')
today_date2 = now.strftime('%Y-%m-%d')
start_date2 = today_date2
today_date_time_csv = now.strftime("%Y%m%d_%H%M")

2. 데이터 수집

그럼 이제부터 그동안 수집했던 데이터 수집 코드를 만들어 보자.

2-1. 비트코인 수집

비트코인 데이터를 수집 해주자. fdr의 fdr.DataReader()함수를 사용 했다. 기존 S&P 500 데이터를 수집할 때 fdr.DataReader()에서 ticker를 넣어 주었지만 이번에는 ‘BTC/KRW’ 옵션을 넣어 주면 된다.

file_name = 'bitcoin'
try:
    now1 = datetime.now()

    time_line = now1.strftime("%Y%m%d_%H:%M:%S")
    df_raw = fdr.DataReader('BTC/KRW', start_date2,today_date2)
    df_raw['ticker'] = 'btc_krw'
    df_raw = df_raw.reset_index()
    df_raw.columns = ['date', 'open','high','low','close','adj_close','volume','ticker']

    print(f'{ticker_nm} success_{time_line}')   
except:
    print(f'{ticker_nm} fail_{time_line}')
        
df_raw.columns['date'] = pd.to_datetime(df_raw.columns['date'])
now1 = datetime.now()
time_line = now1.strftime("%Y%m%d_%H:%M:%S")
upload_df(df_raw, file_name, project_id, dataset_id, time_line, today_date1)        

최종 코드

최종 코드는 다음의 git 링크를 참고 하자.

3. Airflow DAG

이렇게 python_file 경로에 bitcoin_crawler.py를 잘 생성 했으면 이제 Airflow DAG를 추가 하자. 그리고 이를 일주일 전부 09시 1분에 시작하도록 설정 해주었다. 추가로 t1이라는 DAG는 앞으로 t2, t3으로 파이프라인이 추가될 예정이다.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

import pendulum
## 로컬 타임존 생성
local_tz = pendulum.timezone("Asia/Seoul")

python_dir = '/home/owenchoi07/anaconda3/bin/python3'
file_dir = '/home/owenchoi07/finance_mlops'

default_args = {
	'owner': 'airflow',
	'start_date': datetime(2023, 7, 11, tzinfo=local_tz),
	'retries': 0,
	'catchup': False
}

with DAG(
	'bitcoin',
	default_args=default_args,
	description='bitcoin crawler',
	schedule_interval = '01 09 * * *',
	tags=['bitcoin'],
) as dag:


    t1 = BashOperator(
        task_id='bitcoin_crawler',
        bash_command = f'{python_dir} {file_dir}/python_file/bitcoin_crawler.py'
    )

    t1