取得個股交易資訊:Local_Stock_Quote.py
一開始會先判斷依 [dbo].[Stock_Table_Status] 判斷 Table 是否已準備完成。
當準備完成後,透過 API 取得資料並解譯 JSON 存進 DataFrame 中,調整 Table 狀態後寫進 Stage Table。因部分資料並未符合 MS-SQL 規範,呼叫 SP [dbo].[usp_MERGE_STG_PRESTAGE] 執行資料清洗後寫入實際 Table。
from datetime import datetime
import requests
import datetime
from datetime import datetime
import pytz
# 以下 package 需事先使用 pip install 安裝
import pandas
import pyodbc
# For MS-SQL
STAGE_TABLE = "Stock_Quote"
PRESTAGE_TABLE = STAGE_TABLE + "_Stage"
def main_quote(server_name, database, account, pwd, api_token):
# 開啟 SQL Server 連線
cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER='+server_name+';DATABASE='+database+';UID='+account+';PWD='+ pwd)
cursor = cnxn.cursor()
# 確認哪些 stage table 已準備好
query_status = f"SELECT [Status] FROM [Stock_API].[dbo].[Stock_Table_Status] WHERE TABLE_NAME = '{PRESTAGE_TABLE}';"
df_status = pandas.read_sql(query_status, cnxn)
for index, item in df_status.iterrows():
if (item['Status'] == "Y"):
# 關閉 SQL Server 連線
get_stock_quote(server_name, database, account, pwd, api_token)
else:
print("資料未齊全")
# 關閉 SQL Server 連線
cursor.close()
cnxn.close()
def get_stock_quote(server_name, database, account, pwd, api_token):
cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER='+server_name+';DATABASE='+database+';UID='+account+';PWD='+ pwd)
cursor = cnxn.cursor()
# 查詢現在關注哪些標的,並將股票代號寫入 Dataframe
query = "SELECT [stock_id] FROM [Stock_API].[dbo].[Stock_Get] WHERE isGet = 'TRUE';"
df = pandas.read_sql(query, cnxn)
records = []
for index, item in df.iterrows():
symbolId = item['stock_id']
# API URL
url = f"https://api.fugle.tw/realtime/v0.3/intraday/quote?symbolId={symbolId}&apiToken={api_token}&oddLot=true"
print(url)
# 取得當日開盤、收盤的個股基本資訊並解析 json
quote_data = requests.get(url).json()
# 讀取資料
data_info = quote_data["data"]["info"]
data_quote = quote_data["data"]["quote"]
data_quote_trade = quote_data["data"]["quote"]["trade"]
data_quote_priceHigh = quote_data["data"]["quote"]["priceHigh"]
data_quote_priceLow = quote_data["data"]["quote"]["priceLow"]
data_quote_priceOpen = quote_data["data"]["quote"]["priceOpen"]
data_quote_priceAvg = quote_data["data"]["quote"]["priceAvg"]
now = datetime.now(pytz.timezone('Asia/Taipei'))
# 將資料加入字典
record_item = {
"date" : str(data_info.get("date")), # 本筆資料所屬日期
"type" : str(data_info.get("type")), # 類別
"exchange" : str(data_info.get("exchange")), # 交易所
"market" : str(data_info.get("market")), # 市場別
"symbolId" : str(data_info.get("symbolId")), # 股票代號
"countryCode" : str(data_info.get("countryCode")), # 股票所屬國家ISO2代碼
"timeZone" : str(data_info.get("timeZone")), # 股票所屬時區
"lastUpdatedAt" : str(data_info.get("lastUpdatedAt")), # 本筆資料最後更新時間
"trade_at" : str(data_quote_trade.get("at")), # 最新一筆成交時間
"trade_bid" : str(data_quote_trade.get("bid")), # 最新一筆成交買進價
"trade_ask" : str(data_quote_trade.get("ask")), # 最新一筆成交賣出價
"trade_price" : str(data_quote_trade.get("price")), # 最新一筆成交價
"trade_volume" : str(data_quote_trade.get("volume")), # 最新一筆成交量
"priceHigh_at" : str(data_quote_priceHigh.get("at")), # 第一次到達當日最高價之時間
"priceHigh_price": str(data_quote_priceHigh.get("price")), # 當日之最高價
"priceLow_at" : str(data_quote_priceLow.get("at")), # 第一次到達當日最低價之時間
"priceLow_price" : str(data_quote_priceLow.get("price")), # 當日之最低價
"priceOpen_at" : str(data_quote_priceOpen.get("at")), # 當日第一筆成交時間
"priceOpen_price": str(data_quote_priceOpen.get("price")), # 當日之開盤價
"priceAvg_at" : str(data_quote_priceAvg.get("at")), # 當日最後一筆成交時間
"priceAvg_price" : str(data_quote_priceAvg.get("price")), # 當日之成交均價
"change" : str(data_quote.get("change")), # 當日股價之漲跌
"changePercent" : str(data_quote.get("changePercent")), # 當日股價之漲跌幅
"api_datetime" : now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
}
records.append(record_item)
# 將字典加入 Dataframe 中
dataframe = pandas.DataFrame(
records,
# 定義 Dataframe 每個欄位名稱
columns = [
"date",
"type",
"exchange",
"market",
"symbolId",
"countryCode",
"timeZone",
"lastUpdatedAt",
"trade_at",
"trade_bid",
"trade_ask",
"trade_price",
"trade_volume",
"priceHigh_at",
"priceHigh_price",
"priceLow_at",
"priceLow_price",
"priceOpen_at",
"priceOpen_price",
"priceAvg_at",
"priceAvg_price",
"change",
"changePercent",
"api_datetime",
],
)
try:
# 連接 SQL Server
cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER='+server_name+';DATABASE='+database+';UID='+account+';PWD='+ pwd)
cursor = cnxn.cursor()
# 變更 prestage 狀態為未處理
cursor.execute(f"UPDATE [Stock_API].[dbo].[Stock_Table_Status] SET Status = 'N', Update_Datetime = GETDATE() WHERE Table_Name = '{PRESTAGE_TABLE}'; TRUNCATE TABLE [Stock_API].[dbo].[{PRESTAGE_TABLE}];")
cnxn.commit()
# 將每筆資訊寫入 prestage table
for index, row in dataframe.iterrows():
cursor.execute( f"INSERT INTO [Stock_API].[dbo].[{PRESTAGE_TABLE}] VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
,row.symbolId
,row.date
,row.type
,row.exchange
,row.market
,row.countryCode
,row.timeZone
,row.lastUpdatedAt
,row.trade_at
,row.trade_bid
,row.trade_ask
,row.trade_price
,row.trade_volume
,row.priceHigh_at
,row.priceHigh_price
,row.priceLow_at
,row.priceLow_price
,row.priceOpen_at
,row.priceOpen_price
,row.priceAvg_at
,row.priceAvg_price
,row.change
,row.changePercent
,row.api_datetime)
cnxn.commit()
# 變更 prestage 狀態為已處理
cursor.execute(f"UPDATE [Stock_API].[dbo].[Stock_Table_Status] SET Status = 'Y', Update_Datetime = GETDATE() WHERE Table_Name = '{PRESTAGE_TABLE}'")
# 寫入實際 Table
today = datetime.today().strftime('%Y-%m-%d')
cursor.execute(f"EXEC [Stock_API].[dbo].[usp_MERGE_STG_PRESTAGE] '{STAGE_TABLE}', '2023-05-26'")
cnxn.commit()
# 關閉 SQL Server 連線
cursor.close()
cnxn.close()
except pyodbc.DataError as err:
print("Encountered errors while inserting rows: {}".format(err))
def convertDatetime(time_string):
# 解析時間字符串
datetime_obj = datetime.fromisoformat(time_string)
# 設定原始時區
original_timezone = datetime_obj.astimezone()
# 轉換為目標時區(+8時區)
target_timezone = pytz.timezone('Asia/Taipei')
converted_datetime_obj = original_timezone.astimezone(target_timezone)
# 格式化為 BigQuery datetime 格式
bq_datetime = converted_datetime_obj.strftime('%Y-%m-%dT%H:%M:%S.%f')
return bq_datetime
# 測試用
#main_quote()Last updated