Local_Stock_Quote.py
Last updated
Last updated
一開始會先判斷依 [dbo].[Stock_Table_Status]
判斷 Table 是否已準備完成。
當準備完成後,透過 API 取得資料存進 DataFrame 中,調整 Table 狀態後寫進 Stage Table。因部分資料並未符合 MS-SQL 規範,呼叫 SP [dbo].[usp_MERGE_STG_PRESTAGE_v2]
執行資料清洗後寫入實際 Table。
相關 API 欄位參考。
from datetime import datetime
import requests
import datetime
from datetime import datetime
import pytz
# 以下 package 需事先使用 pip install 安裝
import pandas
import pyodbc
from fugle_marketdata import RestClient # 富果 API Package
'''
==================================================================================================================
Author: Skylar Ma
------------------------------------------------------------------------------------------------------------------
Description: Get the stock main information.
------------------------------------------------------------------------------------------------------------------
Version Date Note
1.0 2023-05-27 Init.
2.0 2024-01-27 Use Fugle package instead of web api get function
==================================================================================================================
'''
# For MS-SQL
STAGE_TABLE = "Stock_Meta"
PRESTAGE_TABLE = STAGE_TABLE + "_Stage"
def main_meta(server_name, database, account, pwd, api_token):
# 開啟 SQL Server 連線
cnxn = pyodbc.connect("DRIVER={0};SERVER={1};DATABASE={2};UID={3};PWD={4}".format('SQL Server', server_name, database, account, pwd))
cursor = cnxn.cursor()
# 確認哪些 stage table 已準備好
query_status = "SELECT [Status] FROM [Stock_API].[dbo].[Stock_Table_Status] WHERE TABLE_NAME = '{0}';".format(PRESTAGE_TABLE)
df_status = pandas.read_sql(query_status, cnxn)
for index, item in df_status.iterrows():
if (item['Status'] == "Y"):
# 繼續取得資料
get_stock_meta(server_name, database, account, pwd, api_token)
else:
print("資料未齊全")
# 關閉 SQL Server 連線
cursor.close()
cnxn.close()
def get_stock_meta(server_name, database, account, pwd, api_token):
# 連線 API
client = RestClient(api_key = api_token)
stock = client.stock
cnxn = pyodbc.connect("DRIVER={0};SERVER={1};DATABASE={2};UID={3};PWD={4}".format('SQL Server', server_name, database, account, pwd))
cursor = cnxn.cursor()
# 取得現在時間
now = datetime.now(pytz.timezone('Asia/Taipei'))
# 查詢現在關注哪些標的,並將股票代號寫入 Dataframe
query = "SELECT [stock_id] FROM [Stock_API].[dbo].[Stock_Get] WHERE isGet = 'TRUE';"
df = pandas.read_sql(query, cnxn)
# 新增陣列
records = []
# 取得每項股票代號,取得股票基本資訊
# https://developer.fugle.tw/docs/data/http-api/intraday/ticker
for index, item in df.iterrows():
symbolId = item['stock_id']
content = stock.intraday.ticker(symbol = symbolId)
# 將資料加入字典
record_item = {
# 抓取欄位
"symbolId" : content['symbol'] # 股票代碼
,"date" : content['date'] # 日期
,"type" : content['type'] # Ticker 類型
,"exchange" : content['exchange'] # 交易所
,"market" : content['market'] # 市場別
,"countryCode" : 'TW' # 交易國家
,"timeZone" : 'Asia/Taipei' # 交易時區
,"lastUpdatedAt" : '' # 已無使用
,"nameZhTw" : content['name'] # 股票簡稱
,"industryZhTw" : content['industry'] # 產業別
,"previousClose" : content['previousClose'] # 前一交易日收盤價
,"priceReference" : content['referencePrice'] # 參考價
,"priceHighLimit" : content['limitUpPrice'] # 漲停價
,"priceLowLimit" : content['limitDownPrice'] # 跌停價
,"canDayBuySell" : content['canBuyDayTrade'] # 可先買現沖
,"canDaySellBuy" : content['canDayTrade'] # 可買賣現沖
,"canShortMargin" : content['canBelowFlatMarginShortSell'] # 平盤下得融券賣出
,"canShortLend" : content['canBelowFlatSBLShortSell'] # 平盤下得借券賣出
,"tradingUnit" : content['boardLot'] # 交易單位
,"currency" : content['tradingCurrency'] # 交易幣別
,"isTerminated" : '' # 已無使用
,"isSuspended" : '' # 已無使用
,"typeZhTw" : '' # 已無使用
,"abnormal" : '' # 已無使用
,"isUnusuallyRecommended" : content['isUnusuallyRecommended'] # 投資理財節目異常推介個股
,"isNewlyCompiled" : '' # 已無使用
,"api_datetime" : now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] # 取得時間
}
records.append(record_item)
# 將字典加入 Dataframe 中
dataframe = pandas.DataFrame(
records,
# 定義 Dataframe 每個欄位名稱
columns = [
"date",
"type",
"exchange",
"market",
"symbolId",
"countryCode",
"timeZone",
"lastUpdatedAt",
"nameZhTw",
"industryZhTw",
"previousClose",
"priceReference",
"priceHighLimit",
"priceLowLimit",
"canDayBuySell",
"canDaySellBuy",
"canShortMargin",
"canShortLend",
"tradingUnit",
"currency",
"isTerminated",
"isSuspended",
"typeZhTw",
"abnormal",
"isUnusuallyRecommended",
"isNewlyCompiled",
"api_datetime",
],
)
try:
# 連接 SQL Server
cnxn = pyodbc.connect("DRIVER={0};SERVER={1};DATABASE={2};UID={3};PWD={4}".format('SQL Server', server_name, database, account, pwd))
cursor = cnxn.cursor()
# 變更 prestage 狀態為未處理
cursor.execute("UPDATE [Stock_API].[dbo].[Stock_Table_Status] SET Status = 'N', Update_Datetime = GETDATE() WHERE Table_Name = '{0}'; TRUNCATE TABLE [Stock_API].[dbo].[{0}];".format(PRESTAGE_TABLE))
cnxn.commit()
# 將 Dataframe 每筆資訊寫入 prestage table
for index, row in dataframe.iterrows():
cursor.execute(
"INSERT INTO [Stock_API].[dbo].[{0}] VALUES ('{1}', '{2}', '{3}', '{4}', '{5}', '{6}', '{7}', '{8}', '{9}', '{10}', {11}, {12}, {13}, '{14}', '{15}', '{16}', '{17}', '{18}', '{19}', '{20}', '{21}', '{22}', '{23}', '{24}', '{25}', '{26}', '{27}')".format(
PRESTAGE_TABLE
,row.symbolId
,row.date
,row.type
,row.exchange
,row.market
,row.countryCode
,row.timeZone
,row.lastUpdatedAt
,row.nameZhTw
,row.industryZhTw
,row.previousClose
,row.priceReference
,row.priceHighLimit
,row.priceLowLimit
,row.canDayBuySell
,row.canDaySellBuy
,row.canShortMargin
,row.canShortLend
,row.tradingUnit
,row.currency
,row.isTerminated
,row.isSuspended
,row.typeZhTw
,row.abnormal
,row.isUnusuallyRecommended
,row.isNewlyCompiled
,row.api_datetime)
)
cnxn.commit()
# 取得 prestage 的資料日期
cursor.execute("SELECT DISTINCT date FROM [Stock_API].[dbo].[{0}]".format(PRESTAGE_TABLE))
result_date = cursor.fetchone()
# 變更 prestage 狀態為已處理
cursor.execute("UPDATE [Stock_API].[dbo].[Stock_Table_Status] SET Status = 'Y', Update_Datetime = GETDATE() WHERE Table_Name = '{0}'".format(PRESTAGE_TABLE))
# 寫入實際 Table
cursor.execute("EXEC [Stock_API].[dbo].[usp_MERGE_STG_PRESTAGE_v2] '{0}', '{1}'".format(STAGE_TABLE, result_date[0]))
cnxn.commit()
# 關閉 SQL Server 連線
cursor.close()
cnxn.close()
except pyodbc.DataError as err:
print("Encountered errors while inserting rows: {}".format(err))
# 測試用
#get_stock_meta()