提交 eeaf93ad 编写于 作者: Z zengbin93

add 线段划分

上级 2650f6e1
# coding: utf-8
"""
step 1. 去除包含关系
step 2. 找出全部分型,并验证有效性
step 3. 标记线段
step 4. 标记中枢
====================================================================
"""
def find_xd(kline):
"""线段查找。输入:确定了分型的 K 线;输出:加入线段查找结果的 K 线"""
def preprocess(kline):
"""去除包含关系"""
kline['high_m'] = None
kline['low_m'] = None
# 找出所有可能的线段终点
gd1 = kline[kline['bi_mark'] == 0].iloc[0]
gd2 = kline[kline['bi_mark'] == 2].iloc[0]
# 首行处理
last_h = kline.loc[0, 'high']
last_l = kline.loc[0, 'low']
if last_h >= kline.loc[1, 'high']:
direction = 0 # 下跌
if gd1['high'] < gd2['high']:
direction = "向上"
else:
direction = 1 # 上涨
for i, row in kline.iterrows():
cur_h, cur_l = row['high'], row['low']
# 左包含 or 右包含
if (cur_h <= last_h and cur_l >= last_l) or (cur_h >= last_h and cur_l <= last_l):
if direction == 0:
last_h = min(last_h, cur_h)
last_l = min(last_l, cur_l)
elif direction == 1:
last_h = max(last_h, cur_h)
last_l = max(last_l, cur_l)
else:
raise ValueError
continue
kline.loc[i-1, 'high_m'] = last_h
kline.loc[i-1, 'low_m'] = last_l
direction = "向下"
# 更新 direction, last_h, last_l
if last_h >= cur_h:
direction = 0 # 下跌
else:
direction = 1 # 上涨
i = 4
mark = 0
kline['xd_mark'] = None
last_h = cur_h
last_l = cur_l
return kline
while i <= kline['bi_mark'].max():
gd1 = kline[kline['bi_mark'] == i-3].iloc[0]
dd1 = kline[kline['bi_mark'] == i-2].iloc[0]
gd2 = kline[kline['bi_mark'] == i-1].iloc[0]
dd2 = kline[kline['bi_mark'] == i].iloc[0]
# 第二个顶分型的最高价小于或等于第一个顶分型的最高价,向上过程有可能结束
if direction == "向上" and gd2['high'] <= gd1['high']:
kline.loc[gd1.name, 'xd_mark'] = mark
mark += 1
direction = "向下"
def find_fx(kline):
"""找出全部分型,并验证有效性
# 第二个底分型的最低价大于或等于第一个底分型的最低价,向下过程有可能结束
elif direction == "向下" and dd2['low'] >= dd1['low']:
kline.loc[dd1.name, 'xd_mark'] = mark
mark += 1
direction = "向上"
0 - 顶分型
1 - 底分型
i += 2
:param kline: pd.DataFrame
经过预处理,去除了包含关系的 K 线
:return:
"""
kline_new = kline.dropna()
kline['fx'] = None
for i in range(1, len(kline_new)-1):
data = kline_new.iloc[i-1: i+2]
row = kline_new.iloc[i]
if max(data['high_m']) == row['high_m']:
kline.loc[row.name, 'fx'] = 0
elif min(data['low_m']) == row['low_m']:
kline.loc[row.name, 'fx'] = 1
else:
continue
# TODO(zengbin): 检查线段的有效性
return kline
......@@ -109,6 +109,20 @@ def find_fx(kline):
last_index = None
else:
last_index = curr_index
# 添加 笔标记 - 从第一个有效顶分型开始标记
kline['bi_mark'] = None
mark = 0
for i, row in kline.iterrows():
if mark == 0 and row['fx'] == 0:
kline.loc[i, 'bi_mark'] = mark
mark += 1
continue
if mark > 0 and row['fx'] in [0, 1]:
kline.loc[i, 'bi_mark'] = mark
mark += 1
return kline
......@@ -2,15 +2,13 @@
import unittest
from chan.a import get_kline
from chan.analyze import *
from chan.utils import preprocess, find_fx
class TestAnalyze(unittest.TestCase):
def test_analyze(self):
kline = get_kline(ts_code='600122.SH', start_date='20190501', end_date='20190620', freq='5min')
kline = preprocess(kline)
kline = find_fx(kline)
kline = get_kline(ts_code='600977.SH', start_date='20190501', end_date='20190725', freq='5min')
kline_new = preprocess(kline)
kline_fx = find_fx(kline_new)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册