提交 8b7b33b7 编写于 作者: Z zengbin93

modify

上级 b5af7d34
......@@ -32,14 +32,27 @@ pip install git+git://github.com/zengbin93/chan.git -U
## 使用方法
### 应用缠论对 K 线进行分析,并将分析结果可视化
目前已经实现了缠论的 笔、线段、中枢 的自动识别,核心代码在 `chan.analyze` 中;
此外,基于这个库,开发了一个web页面,关联 tushare.pro 的数据,输入相应的交易代码等就可以直接查看对应的分析结果。
访问地址:http://103.235.232.44:8012/
```python
import chan
from chan.analyze import KlineAnalyze
chan.kline_viewer(ts_code='000001.SH', freq='W', end_date='20190809', asset='I', show=True)
```
ka = KlineAnalyze(kline) # kline 的格式见K先数据样例
# 笔的识别结果
ka.k_bi
# 线段的识别结果
ka.k_xd
# 中枢的识别结果
ka.k_zs
```
# coding: utf-8
from . import a
from .web.vis_kline import kline_viewer
from .utils import cache_path, clean_cache
from .utils import kline_status, ma, macd
from .analyze import preprocess, find_fx, find_bi, find_xd
author = "zengbin93"
version = "0.0.3"
email = "zeng_bin8888@163.com"
from .analyze import KlineAnalyze
# coding: utf-8
"""
缠论中仅适用与 A股 的方法
"""
from .daily_classfier import daily_classifier
from .utils import get_kline, get_realtime_kline
from .status import share_status, index_status
# coding: utf-8
import os
import time
import functools
import tushare as ts
import pandas as pd
import warnings
from datetime import datetime
from chan import cache_path
# A股交易日历
# --------------------------------------------------------------------
FILE_CALENDAR = os.path.join(cache_path, 'calendar.csv')
if os.path.exists(FILE_CALENDAR) and \
time.time() - os.path.getmtime(FILE_CALENDAR) < 3600 * 24 * 300:
trade_calendar = pd.read_csv(FILE_CALENDAR, encoding='utf-8')
else:
trade_calendar = ts.trade_cal() # tushare提供的交易日历
trade_calendar.to_csv(FILE_CALENDAR, index=False, encoding='utf-8')
def is_trade_day(date):
"""判断date日期是不是交易日
:param date: str or datetime.date, 如 2018-03-15
:return: Bool
"""
trade_day = trade_calendar[trade_calendar["isOpen"] == 1]
trade_day_list = list(trade_day['calendarDate'])
if isinstance(date, datetime):
date = str(date.date())
if date in trade_day_list:
return True
else:
return False
def in_trade_time():
"""判断当前是否是交易时间"""
today = str(datetime.now().date())
if not is_trade_day(today): # 如果今天不是交易日,返回False
return False
t1 = today + " " + "09:30:00"
t1 = datetime.strptime(t1, "%Y-%m-%d %H:%M:%S")
t2 = today + " " + "11:30:00"
t2 = datetime.strptime(t2, "%Y-%m-%d %H:%M:%S")
t3 = today + " " + "13:00:00"
t3 = datetime.strptime(t3, "%Y-%m-%d %H:%M:%S")
t4 = today + " " + "15:00:00"
t4 = datetime.strptime(t4, "%Y-%m-%d %H:%M:%S")
if t1 <= datetime.now() <= t2 or t3 <= datetime.now() <= t4:
return True
else:
return False
def recent_trade_days(date, n=10):
"""返回任一日期date前后的n个交易日日期
:param n: int
返回交易日的数量n。
如果 n > 0,则返回date日期未来的n个交易日日期;
如果 n < 0,则返回date日期过去的n个交易日日期。
:param date: str 默认值 today
:return: list
n个交易日日期列表
"""
cal_date = list(trade_calendar['calendarDate'])
date_index = cal_date.index(date)
if n >= 0:
recent_days = cal_date[date_index:date_index + n + 20]
else:
recent_days = cal_date[date_index + n - 20:date_index + 1]
recent_days = recent_days[::-1]
res = []
for d in recent_days:
if is_trade_day(d):
res.append(d)
if len(res) == abs(n):
break
return res
def run_at_trade_time(func):
"""控制函数func仅在交易时间段运行"""
@functools.wraps(func)
def wrapper(*args, **kw):
if in_trade_time():
res = func(*args, **kw)
else:
msg = "%s 不是交易时间,函数 %s 仅在交易时间执行" % (
datetime.now().__str__(), func.__name__
)
warnings.warn(msg)
res = None
return res
return wrapper
# coding: utf-8
from datetime import datetime, timedelta
import tushare as ts
def daily_classifier(ts_code, trade_date, asset='E', return_central=False):
""" A 股每日走势的分类
asset 交易资产类型,可选值 E股票 I沪深指数
使用该方法前,请仔细阅读:http://blog.sina.com.cn/s/blog_486e105c010009uy.html
每个交易日产生 8 根 30 分钟 K 线,分别在前三根和最后三根中计算中枢,其结果分以下情况:
1)没有中枢;
2)仅有一个中枢,或计算得到的两个中枢区间有重叠;
3)有两个中枢,且没有重叠。
example:
>>> kind, central = daily_classifier('600122.SH', "20190613", return_central=True)
>>> print(kind, central)
"""
start_date = datetime.strptime(trade_date, '%Y%m%d')
end_date = start_date + timedelta(days=1)
end_date = end_date.date().__str__().replace("-", "")
df = ts.pro_bar(ts_code=ts_code, freq='30min', asset=asset,
start_date=trade_date, end_date=end_date)
df.sort_values('trade_time', inplace=True)
data = df[['ts_code', 'trade_time', 'high', 'low', 'close']].iloc[1:, :]
data = data.reset_index(drop=True)
assert len(data) == 8, "每个交易日,A股有且只有8跟30分钟K线"
def _central(tri):
c_low = max(tri['low'])
c_high = min(tri['high'])
if c_low >= c_high:
# None means no central found
central = None
else:
central = {
"time_span": "%s - %s" % (tri.iloc[0, 1], tri.iloc[2, 1]),
"price_span": (c_low, c_high)
}
return central
first_central = _central(data.iloc[:3, :])
last_central = _central(data.iloc[-3:, :])
# 若两个中枢之间存在价格重叠部分,则第二个中枢不存在
if first_central and last_central:
fp = first_central['price_span']
lp = last_central["price_span"]
if fp[1] >= lp[0] >= fp[0] or fp[1] >= lp[1] >= fp[0]:
last_central = None
# 没有中枢的情况
if first_central is None and last_central is None:
kind = "最强单边走势"
# 一个中枢的情况(平衡市)
elif (first_central is None and last_central) or (first_central and last_central is None):
max_p = max(data.iloc[:3, :]['high'])
min_p = min(data.iloc[:3, :]['low'])
# 1、在前三根30分钟K线出现当天高点
if max(data['high']) == max_p:
kind = "弱平衡市"
# 2、在前三根30分钟K线出现当天低点
elif min(data['low']) == min_p:
kind = "强平衡市"
# 3、在前三根30分钟K线不出现当天高低点
else:
kind = "转折平衡市"
# 两个中枢的情况
elif first_central and last_central:
if first_central['price_span'][0] > last_central['price_span'][0]:
kind = "向下两中枢走势"
elif first_central['price_span'][0] < last_central['price_span'][0]:
kind = "向上两中枢走势"
else:
raise ValueError("两中枢的最低价不可以相等")
else:
raise ValueError('中枢计算错误')
if return_central:
return kind, (first_central, last_central)
else:
return kind
# coding:utf-8
from collections import OrderedDict
from chan import a
from chan.utils import kline_status
#
# def kline_status(ts_code, trade_date, freq='D', asset="I"):
# kline = a.get_kline(ts_code, freq=freq, end_date=trade_date, asset=asset, indicators=('ma', 'macd'))
#
# # MACD 多空状态
# last_raw = kline.iloc[-1]
# if last_raw['diff'] < 0 and last_raw['dea'] < 0:
# macd_status = '空头行情'
# elif last_raw['diff'] > 0 and last_raw['dea'] > 0:
# macd_status = '多头行情'
# else:
# macd_status = '转折行情'
#
# # 最近三根K线状态
# pred = preprocess(kline)
# last_three = pred.iloc[-3:]
#
# # 笔状态:最近三根 K 线的走势状态
# if min(last_three['low']) == last_three.iloc[-1]['low']:
# bi_status = "向下笔延伸中"
# elif min(last_three['low']) == last_three.iloc[-2]['low']:
# bi_status = "底分型构造中"
# elif max(last_three['high']) == last_three.iloc[-1]['high']:
# bi_status = "向上笔延伸中"
# elif max(last_three['high']) == last_three.iloc[-2]['high']:
# bi_status = "顶分型构造中"
# else:
# raise ValueError("kline 数据出错")
#
# return OrderedDict(macd_status=macd_status, bi_status=bi_status)
#
def _status(ts_code, trade_date, asset="I"):
status = OrderedDict()
status['上一交易日状态'] = a.daily_classifier(ts_code, trade_date, asset=asset)
for freq in ['30min', 'D', 'W']:
kline = a.get_kline(ts_code, freq=freq, end_date=trade_date, asset=asset, indicators=None)
status[freq] = kline_status(kline)
status_msg = '%s(%s) :%s;' % (ts_code, trade_date, status['上一交易日状态'])
status_msg += "30分钟线%s,MACD呈现%s;" % (status['30min']['bi_status'], status['30min']['macd_status'])
status_msg += "日线%s,MACD呈现%s;" % (status['D']['bi_status'], status['D']['macd_status'])
status_msg += "周线%s,MACD呈现%s;" % (status['W']['bi_status'], status['W']['macd_status'])
return status, status_msg
def share_status(ts_code, trade_date):
return _status(ts_code, trade_date, asset="E")
def index_status(ts_code, trade_date):
return _status(ts_code, trade_date, asset="I")
# coding: utf-8
from datetime import datetime, timedelta
import tushare as ts
from ..utils import ma, macd, boll
def _cal_indicators(kline, indicators):
if "ma" in indicators:
kline = ma(kline)
if "macd" in indicators:
kline = macd(kline)
if "boll" in indicators:
kline = boll(kline)
return kline
def get_kline(ts_code, end_date, start_date=None, freq='30min', asset='E', indicators=("ma", "macd")):
"""获取指定级别的前复权K线
:param ts_code: str
股票代码,如 600122.SH
:param freq: str
K线级别,可选值 [1min, 5min, 15min, 30min, 60min, D, M, Y]
:param start_date: str
日期,如 20190601
:param end_date: str
日期,如 20190610
:param indicators: tuple
可选值 ma macd boll
:param asset: str
交易资产类型,可选值 E股票 I沪深指数 C数字货币 FT期货 FD基金 O期权 CB可转债(v1.2.39),默认E
:return: pd.DataFrame
columns = ["symbol", "dt", "open", "close", "high", "low", "vol"]
example:
>>> from chan.a.utils import get_kline
>>> get_kline(ts_code='600122.SH', start_date='20190601', end_date='20190610', freq='30min')
"""
if indicators is None:
indicators = []
if start_date is None:
end_date = datetime.strptime(end_date, '%Y%m%d')
if freq == '1min':
start_date = end_date - timedelta(days=10)
elif freq == '5min':
start_date = end_date - timedelta(days=20)
elif freq == '30min':
start_date = end_date - timedelta(days=100)
elif freq == '60min':
start_date = end_date - timedelta(days=200)
elif freq == 'D':
start_date = end_date - timedelta(weeks=100)
elif freq == 'W':
start_date = end_date - timedelta(weeks=500)
elif freq == 'M':
start_date = end_date - timedelta(weeks=4*500)
elif freq == 'Y':
start_date = end_date - timedelta(weeks=52*30)
else:
raise ValueError("'freq' value error, current value is %s, "
"optional valid values are ['1min', '5min', '30min', "
"'60min', 'D', 'W', 'M', 'Y']" % freq)
start_date = start_date.date().__str__().replace("-", "")
end_date = end_date.date().__str__().replace("-", "")
# https://tushare.pro/document/2?doc_id=109
df = ts.pro_bar(ts_code=ts_code, freq=freq, start_date=start_date, end_date=end_date,
adj='qfq', asset=asset)
# 统一 k 线数据格式为 6 列,分别是 ["symbol", "dt", "open", "close", "high", "low", "vr"]
if "min" in freq:
df.rename(columns={'ts_code': "symbol", "trade_time": "dt"}, inplace=True)
else:
df.rename(columns={'ts_code': "symbol", "trade_date": "dt"}, inplace=True)
df.sort_values('dt', inplace=True)
df.reset_index(drop=True, inplace=True)
# 计算量比
# df['vr'] = 0
# for i in range(5, len(df)):
# df.loc[i, 'vr'] = round(df.loc[i, 'vol'] / df.loc[i-5:i-1, 'vol'].mean(), 4)
kline = df[['symbol', 'dt', 'open', 'close', 'high', 'low', 'vol']]
return _cal_indicators(kline, indicators)
def get_realtime_kline(ts_code, freq="5min", indicators=("ma", "macd")):
"""实时获取分钟K线(仅适用于A股股票)
:param ts_code: str
tushare 股票代码,如 600122.SH
:param freq: str
K线周期,分钟级别,可选值 5min 15min 30min 60min
:param indicators: tuple
可选值 ma macd boll
:return: pd.DataFrame
columns = ["symbol", "dt", "open", "close", "high", "low", "vol"]
"""
code = ts_code[:6]
df = ts.get_k_data(code=code, ktype=freq.replace("min", ""))
df['symbol'] = ts_code
df.rename(columns={'date': 'dt', 'volume': 'vol'}, inplace=True)
df.sort_values('dt', inplace=True)
df.reset_index(drop=True, inplace=True)
kline = df[['symbol', 'dt', 'open', 'close', 'high', 'low', 'vol']]
return _cal_indicators(kline, indicators)
......@@ -56,8 +56,9 @@ def preprocess(kline):
k_new.append(last)
last = deepcopy(row)
if i == len(kline):
k_new.append(last)
if i == len(kline)-1:
if row['dt'] != k_new[-1]['dt']:
k_new.append(row)
# print('k_new nums:', len(k_new))
return k_new
......@@ -131,10 +132,12 @@ def find_bi(k_fx):
def find_xd(k_bi):
"""查找线段标记"""
k_xd = []
i = 0
potential = deepcopy(k_bi[i])
# 依据不创新高、新低的近似标准找出所有潜在线段标记
while i < len(k_bi)-3:
k1, k2, k3 = k_bi[i+1], k_bi[i+2], k_bi[i+3]
......@@ -160,14 +163,83 @@ def find_xd(k_bi):
else:
raise ValueError
potential['xd'] = potential['bi']
k_xd.append(potential)
# print("xd nums:", len(k_xd))
# 待完善:如果两个线段标记之间无有效笔标记,则这两个线段标记无效。
return k_xd
# # 在向下笔构成的标准特征序列中找出所有顶分型;在向上笔构成的标准特征序列中找出所有底分型;
# def _bi_fx(bi_list, mode):
#
# if mode == 'g':
# direction = 'up'
# elif mode == 'd':
# direction = 'down'
# else:
# raise ValueError
#
# bi_pred = []
# last = bi_list[0]
# for bi in bi_list[1:]:
# # 包含关系处理
# cur_h, cur_l = bi['high'], bi['low']
# last_h, last_l = last['high'], last['low']
#
# # 左包含 or 右包含
# if (cur_h <= last_h and cur_l >= last_l) or (cur_h >= last_h and cur_l <= last_l):
# # 有包含关系,按方向分别处理
# if direction == "up":
# last_h = max(last_h, cur_h)
# last_l = max(last_l, cur_l)
# if last_h != last['high']:
# last = deepcopy(bi)
# last['low'] = last_l
# elif direction == "down":
# last_h = min(last_h, cur_h)
# last_l = min(last_l, cur_l)
# if last_l != last['low']:
# last = deepcopy(bi)
# last['high'] = last_h
# else:
# raise ValueError
# else:
# # 无包含关系,更新 K 线
# bi_pred.append(last)
# last = deepcopy(bi)
#
# return [x for x in find_fx(bi_pred) if x.get('fx_mark', None) == mode]
#
# bi_down = [{"dt": x['dt'], "high": x['fx'], "low": k_bi[i+1]['fx']}
# for i, x in enumerate(k_bi[:-1]) if x['fx_mark'] == "g"]
# bi_g = _bi_fx(bi_down, mode='g')
# bi_up = [{"dt": x['dt'], "high": k_bi[i+1]['fx'], "low": x['fx']}
# for i, x in enumerate(k_bi[:-1]) if x['fx_mark'] == "d"]
# bi_d = _bi_fx(bi_up, mode='d')
# valid_fx = [x['low'] for x in bi_d] + [x['high'] for x in bi_g]
# # valid_dt = [x['dt'] for x in bi_d] + [x['dt'] for x in bi_g]
#
# # 利用特征序列的顶底分型确认线段标记的有效性,随后,对连续两个相同标记进行处理。
# k_xd_valid = [k_xd[0]]
# for x in k_xd:
# if x['fx'] not in valid_fx:
# continue
# else:
# if x['fx_mark'] == k_xd_valid[-1]['fx_mark']:
# if x['fx_mark'] == 'g' and x['fx'] > k_xd_valid[-1]['fx']:
# k_xd_valid[-1] = x
# elif x['fx_mark'] == 'd' and x['fx'] < k_xd_valid[-1]['fx']:
# k_xd_valid[-1] = x
# else:
# continue
# else:
# k_xd_valid.append(x)
#
# # 对最后一个线段标记使用近似定义
# if k_xd[-1] not in k_xd_valid and k_xd[-1]['fx_mark'] != k_xd_valid[-1]['fx_mark']:
# # print("add last xd")
# k_xd_valid.append(k_xd[-1])
#
# return k_xd_valid
def find_zs(k_xd):
"""查找中枢"""
......@@ -226,6 +298,7 @@ class KlineAnalyze(object):
self.k_fx = find_fx(self.k_new)
self.k_bi = find_bi(self.k_fx)
self.k_xd = find_xd(self.k_bi)
self.k_zs = find_zs(self.k_xd)
@property
def chan_result(self):
......@@ -281,6 +354,7 @@ class KlineAnalyze(object):
# 找第三卖点是否形成
s = '中枢下移'
elif xd1[0]['fx_mark'] == "d" and xd4[1]['fx'] > zs[1]:
# 找第三买点是否形成
s = '中枢上移'
else:
s = '中枢震荡'
......
# coding: utf-8
"""
import os
import shutil
from collections import OrderedDict
from .analyze import preprocess
cache_path = os.path.join(os.path.expanduser('~'), ".chan")
if not os.path.exists(cache_path):
os.mkdir(cache_path)
def clean_cache():
shutil.rmtree(cache_path)
os.mkdir(cache_path)
常用技术分析指标:MA, MACD, BOLL
"""
def ma(kline, params=(5, 10, 20, 60, 120, 250)):
......@@ -72,40 +61,3 @@ def boll(kline):
kline['boll-bottom'] = kline['boll-bottom'].apply(round, args=(2,))
return kline
def kline_status(kline):
"""计算 kline 的当下状态
:param kline: pd.DataFrame
K线,columns = ["symbol", "dt", "open", "close", "high", "low", "vol"]
:return: OrderedDict
"""
kline = ma(kline)
kline = macd(kline)
# MACD 多空状态
last_raw = kline.iloc[-1]
if last_raw['diff'] < 0 and last_raw['dea'] < 0:
macd_status = '空头行情'
elif last_raw['diff'] > 0 and last_raw['dea'] > 0:
macd_status = '多头行情'
else:
macd_status = '转折行情'
# 最近三根K线状态
pred = preprocess(kline)
last_three = pred.iloc[-3:]
# 笔状态:最近三根 K 线的走势状态
if min(last_three['low']) == last_three.iloc[-1]['low']:
bi_status = "向下笔延伸中"
elif min(last_three['low']) == last_three.iloc[-2]['low']:
bi_status = "底分型构造中"
elif max(last_three['high']) == last_three.iloc[-1]['high']:
bi_status = "向上笔延伸中"
elif max(last_three['high']) == last_three.iloc[-2]['high']:
bi_status = "顶分型构造中"
else:
raise ValueError("kline 数据出错")
return OrderedDict(macd_status=macd_status, bi_status=bi_status)
# coding: utf-8
"""
https://pyecharts.org/#/zh-cn/rectangular_charts?id=klinecandlestick%ef%bc%9ak%e7%ba%bf%e5%9b%be
"""
import os
import webbrowser
from pyecharts import options as opts
from pyecharts.charts import Kline, Grid, Line, Bar, Scatter
from pyecharts.globals import ThemeType, CurrentConfig
from chan.a import get_kline
from chan.utils import cache_path
from chan.analyze import preprocess, find_fx, find_bi, find_xd
CurrentConfig.PAGE_TITLE = "chan - 缠论分析"
def kline_viewer(ts_code, freq, end_date, asset='E', show=True):
"""
:param show:
:param ts_code:
:param freq:
:param end_date:
:param asset:
:return:
example:
>>> from chan.web.vis_kline import kline_viewer
>>> kline_viewer(ts_code='002739.SZ', freq='1min', end_date="20190809", asset='E')
"""
kline_raw = get_kline(ts_code, freq=freq, end_date=end_date, asset=asset, indicators=('ma', 'macd'))
for col in ['open', 'close', 'high', 'low']:
kline_raw[col] = kline_raw[col].apply(round, args=(2,))
kline_chan = find_xd(find_bi(find_fx(preprocess(kline_raw))))
kline_chan = kline_chan[['dt', 'fx', 'fx_mark', 'bi', 'bi_mark', 'xd', 'xd_mark']]
kline_raw = kline_raw.merge(kline_chan, how='left', on='dt')
start_dt = kline_raw.iloc[0]["dt"]
end_dt = kline_raw.iloc[-1]["dt"]
x_data = kline_raw.dt.values.tolist()
oclh = kline_raw[['open', 'close', 'low', 'high']].values.tolist()
symbol = kline_raw.iloc[0]['symbol']
kline = (
Kline(init_opts=opts.InitOpts(theme=ThemeType.WHITE))
.add_xaxis(xaxis_data=x_data)
.add_yaxis(
series_name=symbol,
y_axis=oclh,
itemstyle_opts=opts.ItemStyleOpts(color="#ec0000", color0="#00da3c"),
)
.set_global_opts(
title_opts=opts.TitleOpts(
title="缠论分析结果:%s-%s" % (symbol, freq),
subtitle="时间区间:%s 至 %s" % (start_dt, end_dt)
),
xaxis_opts=opts.AxisOpts(type_="category"),
yaxis_opts=opts.AxisOpts(
is_scale=True,
splitarea_opts=opts.SplitAreaOpts(
is_show=True, areastyle_opts=opts.AreaStyleOpts(opacity=1)
),
),
legend_opts=opts.LegendOpts(
is_show=True, pos_top=5, pos_left="center"
),
datazoom_opts=[
opts.DataZoomOpts(
is_show=False,
type_="inside",
xaxis_index=[0, 1],
range_start=0,
range_end=100,
),
opts.DataZoomOpts(
is_show=True,
xaxis_index=[0, 1],
type_="slider",
pos_top="60%",
range_start=0,
range_end=100,
),
],
tooltip_opts=opts.TooltipOpts(
trigger="axis",
axis_pointer_type="cross",
background_color="rgba(245, 245, 245, 0.8)",
border_width=1,
border_color="#ccc",
textstyle_opts=opts.TextStyleOpts(color="#000"),
),
visualmap_opts=opts.VisualMapOpts(
is_show=False,
dimension=2,
series_index=5,
is_piecewise=True,
pieces=[
{"value": 1, "color": "#ec0000"},
{"value": -1, "color": "#00da3c"},
],
),
axispointer_opts=opts.AxisPointerOpts(
is_show=True,
link=[{"xAxisIndex": "all"}],
label=opts.LabelOpts(background_color="#777"),
),
brush_opts=opts.BrushOpts(
x_axis_index="all",
brush_link="all",
out_of_brush={"colorAlpha": 0.1},
brush_type="lineX",
),
)
)
line = (
Line(init_opts=opts.InitOpts(theme=ThemeType.WHITE))
.add_xaxis(xaxis_data=x_data)
.add_yaxis(
series_name="MA5",
y_axis=kline_raw.ma5.values.tolist(),
is_smooth=True,
is_symbol_show=False,
is_hover_animation=False,
linestyle_opts=opts.LineStyleOpts(width=2, opacity=0.5),
label_opts=opts.LabelOpts(is_show=False),
)
.add_yaxis(
series_name="MA10",
y_axis=kline_raw.ma10.values.tolist(),
is_smooth=True,
is_symbol_show=False,
is_hover_animation=False,
linestyle_opts=opts.LineStyleOpts(width=2, opacity=0.5),
label_opts=opts.LabelOpts(is_show=False),
)
.add_yaxis(
series_name="MA20",
y_axis=kline_raw.ma20.values.tolist(),
is_smooth=True,
is_symbol_show=False,
is_hover_animation=False,
linestyle_opts=opts.LineStyleOpts(width=2, opacity=0.5),
label_opts=opts.LabelOpts(is_show=False),
)
.add_yaxis(
series_name="MA60",
y_axis=kline_raw.ma60.values.tolist(),
is_smooth=True,
is_symbol_show=False,
is_hover_animation=False,
linestyle_opts=opts.LineStyleOpts(width=2, opacity=0.5),
label_opts=opts.LabelOpts(is_show=False),
)
.add_yaxis(
series_name="MA120",
y_axis=kline_raw.ma120.values.tolist(),
is_smooth=True,
is_symbol_show=False,
is_hover_animation=False,
linestyle_opts=opts.LineStyleOpts(width=2, opacity=0.5),
label_opts=opts.LabelOpts(is_show=False),
)
.add_yaxis(
series_name="MA250",
y_axis=kline_raw.ma250.values.tolist(),
is_smooth=True,
is_symbol_show=False,
is_hover_animation=False,
linestyle_opts=opts.LineStyleOpts(width=2, opacity=0.5),
label_opts=opts.LabelOpts(is_show=False),
)
.set_global_opts(xaxis_opts=opts.AxisOpts(type_="category"))
)
chan_fx = (
Scatter(init_opts=opts.InitOpts(theme=ThemeType.WHITE))
.add_xaxis(xaxis_data=x_data)
.add_yaxis("分型标记", kline_raw.fx.values.tolist())
.set_global_opts(
visualmap_opts=opts.VisualMapOpts(type_="size", max_=150, min_=20),
)
)
chan_bi = (
Scatter(init_opts=opts.InitOpts(theme=ThemeType.WHITE))
.add_xaxis(xaxis_data=x_data)
.add_yaxis("笔标记", kline_raw.bi.values.tolist())
.set_global_opts(
visualmap_opts=opts.VisualMapOpts(type_="size", max_=150, min_=20),
)
)
chan_xd = (
Scatter(init_opts=opts.InitOpts(theme=ThemeType.WHITE))
.add_xaxis(xaxis_data=x_data)
.add_yaxis("线段标记", kline_raw.xd.values.tolist())
.set_global_opts(
visualmap_opts=opts.VisualMapOpts(type_="size", max_=150, min_=20),
)
)
# Kline And Line
kline = kline.overlap(line)
kline = kline.overlap(chan_fx)
kline = kline.overlap(chan_bi)
kline = kline.overlap(chan_xd)
# ==========================================================
macd_line = (
Line(init_opts=opts.InitOpts(theme=ThemeType.WHITE))
.add_xaxis(xaxis_data=x_data)
.add_yaxis(
series_name="DEA",
y_axis=kline_raw['dea'].values.tolist(),
is_smooth=True,
is_symbol_show=False,
is_hover_animation=False,
linestyle_opts=opts.LineStyleOpts(width=2, opacity=0.5),
label_opts=opts.LabelOpts(is_show=False),
)
.add_yaxis(
series_name="DIFF",
y_axis=kline_raw['diff'].values.tolist(),
is_smooth=True,
is_symbol_show=False,
is_hover_animation=False,
linestyle_opts=opts.LineStyleOpts(width=2, opacity=0.5),
label_opts=opts.LabelOpts(is_show=False),
)
.set_global_opts(xaxis_opts=opts.AxisOpts(type_="category"))
)
macd_bar = (
Bar(init_opts=opts.InitOpts(theme=ThemeType.LIGHT))
.add_xaxis(xaxis_data=x_data)
.add_yaxis(
series_name="Volume",
yaxis_data=kline_raw.macd.values.tolist(),
xaxis_index=1,
yaxis_index=1,
label_opts=opts.LabelOpts(is_show=False),
)
.set_global_opts(
xaxis_opts=opts.AxisOpts(
type_="category",
is_scale=True,
grid_index=1,
boundary_gap=False,
axisline_opts=opts.AxisLineOpts(is_on_zero=False),
axistick_opts=opts.AxisTickOpts(is_show=False),
splitline_opts=opts.SplitLineOpts(is_show=False),
axislabel_opts=opts.LabelOpts(is_show=False),
split_number=20,
min_="dataMin",
max_="dataMax",
),
yaxis_opts=opts.AxisOpts(
grid_index=1,
is_scale=True,
split_number=2,
axislabel_opts=opts.LabelOpts(is_show=False),
axisline_opts=opts.AxisLineOpts(is_show=False),
axistick_opts=opts.AxisTickOpts(is_show=False),
splitline_opts=opts.SplitLineOpts(is_show=False),
),
legend_opts=opts.LegendOpts(is_show=False),
)
)
macd = macd_line.overlap(macd_bar)
# ==========================================================
bar = (
Bar(init_opts=opts.InitOpts(theme=ThemeType.LIGHT))
.add_xaxis(xaxis_data=x_data)
.add_yaxis(
series_name="Volume",
yaxis_data=kline_raw.vol.values.tolist(),
xaxis_index=1,
yaxis_index=1,
label_opts=opts.LabelOpts(is_show=False),
)
.set_global_opts(
xaxis_opts=opts.AxisOpts(
type_="category",
is_scale=True,
grid_index=1,
boundary_gap=False,
axisline_opts=opts.AxisLineOpts(is_on_zero=False),
axistick_opts=opts.AxisTickOpts(is_show=False),
splitline_opts=opts.SplitLineOpts(is_show=False),
axislabel_opts=opts.LabelOpts(is_show=False),
split_number=20,
min_="dataMin",
max_="dataMax",
),
yaxis_opts=opts.AxisOpts(
grid_index=1,
is_scale=True,
split_number=2,
axislabel_opts=opts.LabelOpts(is_show=False),
axisline_opts=opts.AxisLineOpts(is_show=False),
axistick_opts=opts.AxisTickOpts(is_show=False),
splitline_opts=opts.SplitLineOpts(is_show=False),
),
legend_opts=opts.LegendOpts(is_show=False),
)
)
# Grid Overlap + Bar
grid_chart = Grid(opts.InitOpts(width="1500px", height="800px", theme=ThemeType.WHITE))
grid_chart.add(
kline,
grid_opts=opts.GridOpts(
pos_left="8%", pos_right="8%", height="50%"
),
)
grid_chart.add(
macd,
grid_opts=opts.GridOpts(
pos_left="8%", pos_right="8%", pos_top="60%", height="16%"
),
)
grid_chart.add(
bar,
grid_opts=opts.GridOpts(
pos_left="8%", pos_right="8%", pos_top="70%", height="16%"
),
)
# 调用浏览器打开可视化结果
if show:
graph_path = os.path.join(cache_path, "%s_kline_%s.html" % (symbol, freq))
grid_chart.render(path=graph_path)
webbrowser.open(graph_path)
return grid_chart
# coding: utf-8
from setuptools import setup, find_packages
import chan
setup(
name="chan",
version=chan.version,
version="V 0.0.1",
keywords=("缠论", "技术分析"),
description="缠论技术分析工具",
long_description="缠论技术分析工具",
license="MIT",
url="https://github.com/zengbin93/chan",
author=chan.author,
author_email=chan.email,
author="zengbin93",
author_email="zeng_bin8888@163.com",
packages=find_packages(exclude=['test', 'images']),
include_package_data=True,
......
此差异已折叠。
# coding: utf-8
import sys
sys.path.insert(0, "..")
import os
from datetime import datetime
import pandas as pd
import chan
from tqdm import tqdm
shares = pd.read_excel(os.path.join(chan.cache_path, 'shares.xlsx'), sheets='pool')
today = datetime.now().date().__str__().replace("-", '')
chan.kline_viewer(ts_code='000001.SH', freq='5min', end_date=today, asset='I', show=True)
# coding: utf-8
import unittest
from chan.a import get_kline
import chan.utils as u
class TestAnalyze(unittest.TestCase):
def setUp(self):
self.kline = get_kline(ts_code='600977.SH', start_date='20190501', end_date='20190725', freq='5min')
def test_utils(self):
kline_new = u.preprocess(self.kline)
self.assertTrue(len(kline_new) < len(self.kline))
kline_bi = u.find_bi(kline_new)
self.assertTrue("bi_mark" in kline_bi.columns)
kline_xd = u.find_xd(kline_bi)
self.assertTrue("xd_mark" in kline_xd.columns)
kline_macd = u.macd(kline_xd)
self.assertTrue("macd" in kline_macd.columns)
kline_boll = u.boll(kline_macd)
self.assertTrue("boll-top" in kline_boll.columns)
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册