取得個股交易資訊:Local_Stock_Quote.py

一開始會先依 [dbo].[Stock_Table_Status] 判斷 Table 是否已準備完成。

當準備完成後,透過 API 取得資料並解譯 JSON 存進 DataFrame 中,調整 Table 狀態後寫進 Stage Table。因部分資料並未符合 MS-SQL 規範,呼叫 SP [dbo].[usp_MERGE_STG_PRESTAGE] 執行資料清洗後寫入實際 Table。

Local_Stock_Quote.py
from datetime import datetime
import requests
import datetime
from datetime import datetime
import pytz
# 以下 package 需事先使用 pip install 安裝
import pandas
import pyodbc

# SQL Server table names: the target table and its "_Stage" pre-stage companion.
STAGE_TABLE = "Stock_Quote"
PRESTAGE_TABLE = f"{STAGE_TABLE}_Stage"

def main_quote(server_name, database, account, pwd, api_token):
    """Run the quote import if the pre-stage table is flagged as ready.

    Reads the ``Status`` of ``PRESTAGE_TABLE`` from
    ``[Stock_API].[dbo].[Stock_Table_Status]``. For every returned row whose
    status is ``"Y"`` it calls ``get_stock_quote``; for any other status it
    prints a "data not complete" notice.

    Args:
        server_name: SQL Server host placed in the ODBC connection string.
        database: Database name placed in the ODBC connection string.
        account: Login name (``UID``).
        pwd: Password (``PWD``).
        api_token: API token forwarded unchanged to ``get_stock_quote``.
    """
    # Open the SQL Server connection.
    cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER='+server_name+';DATABASE='+database+';UID='+account+';PWD='+ pwd)
    try:
        # Check whether the pre-stage table is ready.
        query_status = f"SELECT [Status] FROM [Stock_API].[dbo].[Stock_Table_Status] WHERE TABLE_NAME = '{PRESTAGE_TABLE}';"
        df_status = pandas.read_sql(query_status, cnxn)
    finally:
        # Close exactly once, on every path: get_stock_quote opens its own
        # connection, so this one is no longer needed after the status read.
        cnxn.close()

    for status in df_status['Status']:
        if status == "Y":
            get_stock_quote(server_name, database, account, pwd, api_token)
        else:
            print("資料未齊全")
           

# Endpoint queried for each watched symbol.
_QUOTE_URL = "https://api.fugle.tw/realtime/v0.3/intraday/quote"
# Seconds to wait on the quote API so a stalled request cannot hang the job.
_QUOTE_TIMEOUT = 10
# Record fields in the positional order used by the INSERT into the
# pre-stage table (24 values, symbolId first).
_QUOTE_INSERT_COLUMNS = (
    "symbolId",
    "date",
    "type",
    "exchange",
    "market",
    "countryCode",
    "timeZone",
    "lastUpdatedAt",
    "trade_at",
    "trade_bid",
    "trade_ask",
    "trade_price",
    "trade_volume",
    "priceHigh_at",
    "priceHigh_price",
    "priceLow_at",
    "priceLow_price",
    "priceOpen_at",
    "priceOpen_price",
    "priceAvg_at",
    "priceAvg_price",
    "change",
    "changePercent",
    "api_datetime",
)


def _fetch_quote(symbol_id, api_token):
    """Request the intraday quote for one symbol and return the decoded JSON."""
    # Log the request without the apiToken so the credential does not end up
    # in console output or log files.
    print(f"{_QUOTE_URL}?symbolId={symbol_id}&oddLot=true")
    response = requests.get(
        _QUOTE_URL,
        params={"symbolId": symbol_id, "apiToken": api_token, "oddLot": "true"},
        timeout=_QUOTE_TIMEOUT,
    )
    # Fail with the HTTP error itself instead of a later KeyError on "data".
    response.raise_for_status()
    return response.json()


def _build_quote_record(quote_data):
    """Flatten one quote API response into a dict of string values."""
    data_info = quote_data["data"]["info"]
    data_quote = quote_data["data"]["quote"]
    data_quote_trade = data_quote["trade"]
    data_quote_priceHigh = data_quote["priceHigh"]
    data_quote_priceLow = data_quote["priceLow"]
    data_quote_priceOpen = data_quote["priceOpen"]
    data_quote_priceAvg = data_quote["priceAvg"]

    now = datetime.now(pytz.timezone('Asia/Taipei'))
    # Every value is stringified (a missing key becomes the text "None").
    return {
        "date"           : str(data_info.get("date")),              # date this record belongs to
        "type"           : str(data_info.get("type")),              # category
        "exchange"       : str(data_info.get("exchange")),          # exchange
        "market"         : str(data_info.get("market")),            # market
        "symbolId"       : str(data_info.get("symbolId")),          # stock symbol
        "countryCode"    : str(data_info.get("countryCode")),       # ISO2 country code of the stock
        "timeZone"       : str(data_info.get("timeZone")),          # time zone of the stock
        "lastUpdatedAt"  : str(data_info.get("lastUpdatedAt")),     # last update time of this record
        "trade_at"       : str(data_quote_trade.get("at")),         # time of the latest trade
        "trade_bid"      : str(data_quote_trade.get("bid")),        # bid price of the latest trade
        "trade_ask"      : str(data_quote_trade.get("ask")),        # ask price of the latest trade
        "trade_price"    : str(data_quote_trade.get("price")),      # price of the latest trade
        "trade_volume"   : str(data_quote_trade.get("volume")),     # volume of the latest trade
        "priceHigh_at"   : str(data_quote_priceHigh.get("at")),     # first time the day's high was reached
        "priceHigh_price": str(data_quote_priceHigh.get("price")),  # day's high price
        "priceLow_at"    : str(data_quote_priceLow.get("at")),      # first time the day's low was reached
        "priceLow_price" : str(data_quote_priceLow.get("price")),   # day's low price
        "priceOpen_at"   : str(data_quote_priceOpen.get("at")),     # time of the day's first trade
        "priceOpen_price": str(data_quote_priceOpen.get("price")),  # day's opening price
        "priceAvg_at"    : str(data_quote_priceAvg.get("at")),      # time of the day's last trade
        "priceAvg_price" : str(data_quote_priceAvg.get("price")),   # day's average trade price
        "change"         : str(data_quote.get("change")),           # day's price change
        "changePercent"  : str(data_quote.get("changePercent")),    # day's price change percentage
        # Taipei wall-clock time of the API call, truncated to milliseconds.
        "api_datetime"   : now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    }


def _load_prestage(cnxn, dataframe):
    """Reload the pre-stage table from *dataframe* and run the merge procedure."""
    cursor = cnxn.cursor()
    try:
        # Mark the pre-stage table as not ready and empty it.
        cursor.execute(f"UPDATE [Stock_API].[dbo].[Stock_Table_Status] SET Status = 'N', Update_Datetime = GETDATE() WHERE Table_Name = '{PRESTAGE_TABLE}'; TRUNCATE TABLE [Stock_API].[dbo].[{PRESTAGE_TABLE}];")
        cnxn.commit()

        # Write every record into the pre-stage table, then commit once.
        placeholders = ",".join("?" * len(_QUOTE_INSERT_COLUMNS))
        insert_sql = f"INSERT INTO [Stock_API].[dbo].[{PRESTAGE_TABLE}] VALUES ({placeholders})"
        for values in dataframe.itertuples(index=False, name=None):
            cursor.execute(insert_sql, *values)
        cnxn.commit()

        # Mark the pre-stage table as ready again.
        cursor.execute(f"UPDATE [Stock_API].[dbo].[Stock_Table_Status] SET Status = 'Y', Update_Datetime = GETDATE() WHERE Table_Name = '{PRESTAGE_TABLE}'")
        # Merge into the actual table, passing the current date instead of a
        # fixed past date.
        today = datetime.today().strftime('%Y-%m-%d')
        cursor.execute(f"EXEC [Stock_API].[dbo].[usp_MERGE_STG_PRESTAGE] '{STAGE_TABLE}', '{today}'")
        cnxn.commit()
    except pyodbc.DataError as err:
        print("Encountered errors while inserting rows: {}".format(err))
    finally:
        cursor.close()


def get_stock_quote(server_name, database, account, pwd, api_token):
    """Fetch intraday quotes for the watched symbols and load them into SQL Server.

    Steps:
      1. Read the symbols from ``[Stock_API].[dbo].[Stock_Get]`` where
         ``isGet = 'TRUE'``.
      2. Call the quote API once per symbol and flatten each response into a
         record of string values.
      3. Set the pre-stage status to ``'N'``, truncate the pre-stage table,
         insert the records, set the status back to ``'Y'`` and execute
         ``usp_MERGE_STG_PRESTAGE`` with ``STAGE_TABLE`` and today's date.

    A ``pyodbc.DataError`` raised while writing is printed and swallowed;
    any other exception propagates. The connection is closed on every path.

    Args:
        server_name: SQL Server host placed in the ODBC connection string.
        database: Database name placed in the ODBC connection string.
        account: Login name (``UID``).
        pwd: Password (``PWD``).
        api_token: Token sent to the quote API as ``apiToken``.
    """
    # One connection serves both the symbol lookup and the load.
    cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER='+server_name+';DATABASE='+database+';UID='+account+';PWD='+ pwd)
    try:
        # Look up which symbols are currently being watched.
        query = "SELECT [stock_id] FROM [Stock_API].[dbo].[Stock_Get] WHERE isGet = 'TRUE';"
        df = pandas.read_sql(query, cnxn)

        records = [
            _build_quote_record(_fetch_quote(symbolId, api_token))
            for symbolId in df['stock_id']
        ]

        # Collect the records in a DataFrame whose column order matches the
        # positional INSERT.
        dataframe = pandas.DataFrame(records, columns=list(_QUOTE_INSERT_COLUMNS))

        _load_prestage(cnxn, dataframe)
    finally:
        # Close the SQL Server connection.
        cnxn.close()

        

def convertDatetime(time_string):
    """Convert an ISO-format time string to an Asia/Taipei timestamp string.

    Args:
        time_string: A string accepted by ``datetime.fromisoformat``.

    Returns:
        The same instant expressed in the Asia/Taipei zone, formatted as
        ``%Y-%m-%dT%H:%M:%S.%f`` (no UTC offset in the output).
    """
    parsed = datetime.fromisoformat(time_string)

    # astimezone() without an argument yields an aware value in the system
    # local zone; a naive input is taken to be system local time.
    local_aware = parsed.astimezone()

    # Re-express the instant in the +8 target zone and format it.
    taipei = pytz.timezone('Asia/Taipei')
    return local_aware.astimezone(taipei).strftime('%Y-%m-%dT%H:%M:%S.%f')

# 測試用
#main_quote()

Last updated