diff --git a/examples/use_czsc_with_jq.py b/examples/use_czsc_with_jq.py new file mode 100644 index 0000000000000000000000000000000000000000..28b92dc62da560e1b36327c465773389cb9ccac2 --- /dev/null +++ b/examples/use_czsc_with_jq.py @@ -0,0 +1,78 @@ +# coding: utf-8 +import json +import requests +import pandas as pd +from datetime import datetime +from czsc import KlineAnalyze + + +# 聚宽数据 API +url = "https://dataapi.joinquant.com/apis" +mob = "******" # mob是申请JQData时所填写的手机号 +pwd = "******" # Password为聚宽官网登录密码,新申请用户默认为手机号后6位 + + +def get_token(): + """获取调用凭证""" + body = { + "method": "get_current_token", + "mob": mob, # mob是申请JQData时所填写的手机号 + "pwd": pwd, # Password为聚宽官网登录密码,新申请用户默认为手机号后6位 + } + response = requests.post(url, data=json.dumps(body)) + token = response.text + return token + + +def text2df(text): + rows = [x.split(",") for x in text.strip().split('\n')] + df = pd.DataFrame(rows[1:], columns=rows[0]) + return df + + +def get_kline(symbol, end_date, freq, count=3000): + # 1m, 5m, 15m, 30m, 60m, 120m, 1d, 1w, 1M + freq_convert = {"1min": "1m", "5min": '5m', '15min': '15m', + "30min": "30m", "60min": '60m', "D": "1d", "W": '1w'} + if "-" not in end_date: + end_date = datetime.strptime(end_date, "%Y%m%d").strftime("%Y-%m-%d") + + data = { + "method": "get_price", + "token": get_token(), + "code": symbol, + "count": count, + "unit": freq_convert[freq], + "end_date": end_date, + "fq_ref_date": "2010-01-01" + } + r = requests.post(url, data=json.dumps(data)) + df = text2df(r.text) + df['symbol'] = symbol + df.rename({'date': 'dt', 'volume': 'vol'}, axis=1, inplace=True) + df = df[['symbol', 'dt', 'open', 'close', 'high', 'low', 'vol']] + for col in ['open', 'close', 'high', 'low', 'vol']: + df.loc[:, col] = df[col].apply(lambda x: round(float(x), 2)) + return df + + +def use_kline_analyze(): + kline = get_kline(symbol="000001.XSHG", end_date="20200616", freq="D", count=5000) + + # 输入K线即完成分析 + ka = KlineAnalyze(kline, name="日线") + + # 查看结果 + print("分型识别结果:", ka.fx[-3:]) + print("笔识别结果:", ka.bi[-3:]) + print("线段识别结果:", ka.xd[-3:]) + print("中枢识别结果:", ka.zs[-1]) + + # 用图片或者HTML可视化 + ka.to_image("test.png") + ka.to_html("test.html") + + +if __name__ == '__main__': + use_kline_analyze() + diff --git a/examples/use_czsc_with_tq.py b/examples/use_czsc_with_tq.py new file mode 100644 index 0000000000000000000000000000000000000000..cb4c42007ce8ea8963ed74b52118af8a2c1404e5 --- /dev/null +++ b/examples/use_czsc_with_tq.py @@ -0,0 +1,234 @@ +# coding: utf-8 +import os +from tqsdk import TqApi, TqBacktest, TqSim +from datetime import date, datetime, timedelta +from copy import deepcopy +from pathlib import Path +import traceback +from czsc import KlineAnalyze +from czsc.analyze import create_df +from czsc.solid import is_xd_buy, is_xd_sell +from zb.utils import create_logger + +# 环境准备: pip install tqsdk zb czsc + + +class TradeAnalyze: + """5分钟第三类买卖点 + 1分钟线段买卖点 + 日线分型""" + def __init__(self, klines): + self.klines = klines + self.ka_1min = KlineAnalyze(self.klines['1分钟'], name='1分钟') + self.ka_5min = KlineAnalyze(self.klines['5分钟'], name="5分钟") + self.ka_D = KlineAnalyze(self.klines['日线'], name='日线') + self.symbol = self.ka_1min.symbol + self.end_dt = self.ka_1min.end_dt + self.latest_price = self.ka_1min.latest_price + self.s = self.signals() + self.desc = self.__doc__ + + def signals(self): + """计算交易决策需要的状态信息""" + s = {"symbol": self.symbol, + "dt": self.end_dt, + "base_price": self.ka_5min.xd[-1]['xd'], + "latest_price": self.latest_price, + "5分钟顶分型后有效跌破MA5": False, + "5分钟底分型后有效升破MA5": False, + "5分钟三买": False, + "5分钟三卖": False, + "5分钟线段标记": self.ka_5min.xd[-1]['fx_mark'], + "5分钟笔标记": self.ka_5min.bi[-1]['fx_mark'], + "日线最后一个分型": self.ka_D.fx[-1]['fx_mark'], + "1分钟有线买": False, + "1分钟有线卖": False, + } + + b1 = is_xd_buy(self.ka_1min, self.ka_5min, pf=True) + if b1["操作提示"] == "线买": + s['1分钟有线买'] = True + + s1 = is_xd_sell(self.ka_1min, self.ka_5min, pf=True) + if s1["操作提示"] == "线卖": + s['1分钟有线卖'] = True + + ka = self.ka_5min + xds = ka.xd[-6:] + + # 至少需要6个线段标记 + if len(xds) < 6: + return s + + zs_d = max([x['xd'] for x in xds[:4] if x['fx_mark'] == 'd']) + zs_g = min([x['xd'] for x in xds[:4] if x['fx_mark'] == 'g']) + if zs_g > zs_d: + if xds[-1]['fx_mark'] == 'd' and xds[-1]['xd'] > zs_g: + s['5分钟三买'] = True + + if xds[-1]['fx_mark'] == 'g' and xds[-1]['xd'] < zs_d: + s['5分钟三卖'] = True + + df = create_df(ka, ma_params=(5,)) + last_fx = ka.fx[-1] + df_last = df[df.dt >= last_fx['dt']] + + if last_fx['fx_mark'] == 'g' and df_last.iloc[1]['close'] < df_last.iloc[1]['ma5']: + s['5分钟顶分型后有效跌破MA5'] = True + + if last_fx['fx_mark'] == 'd' and df_last.iloc[1]['close'] > df_last.iloc[1]['ma5']: + s['5分钟底分型后有效升破MA5'] = True + + return {k: v for k, v in s.items()} + + def long_open(self): + s = self.s + if s['日线最后一个分型'] == "d" and s['5分钟三买'] and s['5分钟底分型后有效升破MA5'] and s['1分钟有线买']: + return True + else: + return False + + def long_close(self): + s = self.s + if s['5分钟线段标记'] == "g" and s['5分钟顶分型后有效跌破MA5'] and s['1分钟有线卖']: + return True + else: + return False + + def short_open(self): + s = self.s + if s['日线最后一个分型'] == "g" and s['5分钟三卖'] and s['5分钟顶分型后有效跌破MA5'] and s['1分钟有线卖']: + return True + else: + return False + + def short_close(self): + s = self.s + if s['5分钟线段标记'] == 'd' and s['5分钟底分型后有效升破MA5'] and s['1分钟有线买']: + return True + else: + return False + + +def format_kline(kline): + """格式化K线""" + def __convert_time(t): + try: + dt = datetime.utcfromtimestamp(t/1000000000) + dt = dt + timedelta(hours=8) # 中国默认时区 + return dt.strftime("%Y-%m-%d %H:%M:%S") + except: + return "" + + kline['dt'] = kline['datetime'].apply(__convert_time) + kline['vol'] = kline['volume'] + columns = ['symbol', 'dt', 'open', 'close', 'high', 'low', 'vol'] + df = kline[columns] + df = df.dropna(axis=0) + df.sort_values('dt', inplace=True, ascending=True) + df.reset_index(drop=True, inplace=True) + return df + + +if __name__ == '__main__': + start_dt = date(2020, 6, 16) + end_dt = date(2020, 6, 16) + init_balance = 100000 + port = '53318' + freqs_k_count = {"1分钟": 1000, "5分钟": 1000, "日线": 200} + + max_positions = { + "KQ.i@SHFE.au": 1, + "KQ.i@DCE.jd": 4, + } + + data_path = f"./logs/S05_{datetime.now().strftime('%Y%m%d%H%M%S')}" + Path(data_path).mkdir(parents=True, exist_ok=False) + file_log = os.path.join(data_path, "backtest.log") + file_signals = os.path.join(data_path, "signals.txt") + + logger = create_logger(log_file=file_log, cmd=True, name="S") + logger.info(f"标的配置:{max_positions}") + logger.info(f"前端地址:http://127.0.0.1:{port}") + logger.info(f"策略描述:{TradeAnalyze.__doc__}") + + account = TqSim(init_balance=init_balance) + backtest = TqBacktest(start_dt=start_dt, end_dt=end_dt) + api = TqApi(account=account, backtest=backtest, web_gui=f":{port}") + symbols = list(max_positions.keys()) + freqs = list(freqs_k_count.keys()) + + freq_seconds = {"1分钟": 60, "5分钟": 60 * 5, "15分钟": 60 * 15, + "30分钟": 60 * 30, "60分钟": 60 * 60, "日线": 3600 * 24} + + # 订阅K线 + symbols_klines = {s: dict() for s in symbols} + for symbol in symbols: + for freq in freqs: + symbols_klines[symbol][freq] = api.get_kline_serial(symbol, + freq_seconds[freq], + data_length=freqs_k_count[freq]) + + account = api.get_account() + positions = api.get_position() + + while True: + api.wait_update() + for symbol in symbols: + if api.is_changing(symbols_klines[symbol]["5分钟"]): + klines = {k: format_kline(deepcopy(symbols_klines[symbol][k])) for k in freqs} + try: + ta = TradeAnalyze(klines) + with open(file_signals, 'a', encoding='utf-8') as f: + f.write(str(ta.s) + "\n") + except: + traceback.print_exc() + continue + + cur_pos = positions.get(symbol, None) + if cur_pos: + long_pos = { + "dt": ta.end_dt, + "volume": cur_pos.pos_long, + "td_volume": cur_pos.pos_long_today, + "yd_volume": cur_pos.pos_long_his, + } + short_pos = { + "dt": ta.end_dt, + "volume": cur_pos.pos_short, + "td_volume": cur_pos.pos_short_today, + "yd_volume": cur_pos.pos_short_his, + } + else: + long_pos = {"dt": ta.end_dt, "volume": 0, "td_volume": 0, "yd_volume": 0} + short_pos = {"dt": ta.end_dt, "volume": 0, "td_volume": 0, "yd_volume": 0} + + logger.info(f"{symbol} - 当前多仓持仓情况:{long_pos}") + logger.info(f"{symbol} - 当前空仓持仓情况:{short_pos}") + + # 下单 + if ta.long_close() and long_pos.get('volume', 0) > 0: + if long_pos.get("td_volume", 0): + order = api.insert_order(symbol=symbol, direction="SELL", offset="CLOSETODAY", + volume=long_pos['td_volume']) + + if long_pos.get("yd_volume", 0): + order = api.insert_order(symbol=symbol, direction="SELL", offset="CLOSE", + volume=long_pos['yd_volume']) + + if ta.long_open() and long_pos.get('volume', 0) == 0: + order = api.insert_order(symbol=symbol, direction="BUY", offset="OPEN", + volume=max_positions[symbol]) + + # 平空仓 + if ta.short_close() and short_pos.get('volume', 0) > 0: + if short_pos.get("td_volume", 0) > 0: + order = api.insert_order(symbol=symbol, direction="BUY", offset="CLOSETODAY", + volume=short_pos['td_volume']) + + if short_pos.get("yd_volume", 0) > 0: + order = api.insert_order(symbol=symbol, direction="BUY", offset="CLOSE", + volume=short_pos['yd_volume']) + + # 开空仓 + if ta.short_open() and short_pos.get('volume', 0) == 0: + order = api.insert_order(symbol=symbol, direction="SELL", offset="OPEN", + volume=max_positions[symbol])