Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
zengbin93
czsc
提交
19a8f038
C
czsc
项目概览
zengbin93
/
czsc
通知
23
Star
2
Fork
1
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
4
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
czsc
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
4
Issue
4
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
19a8f038
编写于
8月 29, 2020
作者:
Z
zengbin93
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
0.5.3 新增 get_potential_xd 方法
上级
2b14b514
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
70 addition
and
18 deletion
+70
-18
czsc/analyze.py
czsc/analyze.py
+49
-13
test/test_analyze.py
test/test_analyze.py
+21
-5
未找到文件。
czsc/analyze.py
浏览文件 @
19a8f038
...
...
@@ -137,7 +137,10 @@ def has_gap(k1, k2, min_gap=0.002):
def
make_standard_seq
(
bi_seq
):
"""计算标准特征序列
:param bi_seq: list of dict
笔标记序列
:return: list of dict
标准特征序列
"""
if
bi_seq
[
0
][
'fx_mark'
]
==
'd'
:
direction
=
"up"
...
...
@@ -245,6 +248,49 @@ def is_valid_xd(bi_seq1, bi_seq2, bi_seq3):
return
False
return
True
def
get_potential_xd
(
bi_points
):
"""获取潜在线段标记点
:param bi_points: list of dict
笔标记点
:return: list of dict
潜在线段标记点
"""
xd_p
=
[]
bi_d
=
[
x
for
x
in
bi_points
if
x
[
'fx_mark'
]
==
'd'
]
bi_g
=
[
x
for
x
in
bi_points
if
x
[
'fx_mark'
]
==
'g'
]
for
i
in
range
(
1
,
len
(
bi_d
)
-
1
):
d1
,
d2
,
d3
=
bi_d
[
i
-
1
:
i
+
2
]
if
d1
[
'bi'
]
>
d2
[
'bi'
]
<
d3
[
'bi'
]:
xd_p
.
append
(
d2
)
for
j
in
range
(
1
,
len
(
bi_g
)
-
1
):
g1
,
g2
,
g3
=
bi_g
[
j
-
1
:
j
+
2
]
if
g1
[
'bi'
]
<
g2
[
'bi'
]
>
g3
[
'bi'
]:
xd_p
.
append
(
g2
)
xd_p
=
sorted
(
xd_p
,
key
=
lambda
x
:
x
[
'dt'
],
reverse
=
False
)
return
xd_p
def
handle_last_xd
(
bi_points
):
"""处理当下段
当下段是指当下进行中的无法确认完成的线段,对于操作而言,必须在当下对其进行分析,判断是延续还是转折。
:param bi_points: list of dict
最近一个线段标记后面的全部笔标记。在这些笔标记中可能存在 1个、2个或3个需要需要确认的线段标记。
:return: list of dict
返回判断结果
"""
# step 1. 获取潜在分段标记点
xd_p
=
get_potential_xd
(
bi_points
)
if
len
(
xd_p
)
==
0
:
if
bi_points
[
0
][
'fx_mark'
]
!=
bi_points
[
-
1
][
'fx_mark'
]:
bi_points
.
pop
(
-
1
)
class
KlineAnalyze
:
def
__init__
(
self
,
kline
,
name
=
"本级别"
,
bi_mode
=
"old"
,
max_raw_len
=
10000
,
ma_params
=
(
5
,
20
,
120
),
verbose
=
False
):
"""
...
...
@@ -528,19 +574,8 @@ class KlineAnalyze:
right_bi
=
[
x
for
x
in
self
.
bi_list
if
x
[
'dt'
]
>=
self
.
xd_list
[
-
1
][
'dt'
]]
else
:
right_bi
=
[
x
for
x
in
self
.
bi_list
[
-
200
:]
if
x
[
'dt'
]
>=
self
.
xd_list
[
-
1
][
'dt'
]]
xd_p
=
[]
bi_d
=
[
x
for
x
in
right_bi
if
x
[
'fx_mark'
]
==
'd'
]
bi_g
=
[
x
for
x
in
right_bi
if
x
[
'fx_mark'
]
==
'g'
]
for
i
in
range
(
1
,
len
(
bi_d
)
-
2
):
d1
,
d2
,
d3
=
bi_d
[
i
-
1
:
i
+
2
]
if
d1
[
'bi'
]
>
d2
[
'bi'
]
<
d3
[
'bi'
]:
xd_p
.
append
(
d2
)
for
j
in
range
(
1
,
len
(
bi_g
)
-
2
):
g1
,
g2
,
g3
=
bi_g
[
j
-
1
:
j
+
2
]
if
g1
[
'bi'
]
<
g2
[
'bi'
]
>
g3
[
'bi'
]:
xd_p
.
append
(
g2
)
xd_p
=
sorted
(
xd_p
,
key
=
lambda
x
:
x
[
'dt'
],
reverse
=
False
)
xd_p
=
get_potential_xd
(
right_bi
)
for
xp
in
xd_p
:
xd
=
dict
(
xp
)
xd
[
'xd'
]
=
xd
.
pop
(
'bi'
)
...
...
@@ -584,6 +619,7 @@ class KlineAnalyze:
self
.
xd_list
.
append
(
xd
)
def
_xd_after_process
(
self
):
"""线段标记后处理,使用标准特征序列判断线段标记是否成立"""
if
not
len
(
self
.
xd_list
)
>
4
:
return
...
...
test/test_analyze.py
浏览文件 @
19a8f038
...
...
@@ -8,7 +8,7 @@ sys.path.insert(0, '..')
import
os
import
pandas
as
pd
import
czsc
from
czsc.analyze
import
KlineAnalyze
,
find_zs
,
is_valid_xd
,
make_standard_seq
from
czsc.analyze
import
KlineAnalyze
,
find_zs
,
is_valid_xd
,
make_standard_seq
,
get_potential_xd
,
handle_last_xd
warnings
.
warn
(
"czsc version is {}"
.
format
(
czsc
.
__version__
))
...
...
@@ -32,8 +32,8 @@ def test_get_sub_section():
sub_bi
=
ka
.
get_sub_section
(
ka
.
xd_list
[
-
2
][
'dt'
],
ka
.
xd_list
[
-
1
][
'dt'
],
mode
=
'bi'
,
is_last
=
True
)
assert
sub_bi
[
0
][
'dt'
]
==
ka
.
xd_list
[
-
2
][
'dt'
]
and
sub_bi
[
-
1
][
'dt'
]
==
ka
.
xd_list
[
-
1
][
'dt'
]
sub_xd
=
ka
.
get_sub_section
(
ka
.
xd_list
[
-
10
][
'dt'
],
ka
.
xd_list
[
-
1
][
'dt'
],
mode
=
'xd'
,
is_last
=
True
)
assert
sub_xd
[
0
][
'dt'
]
==
ka
.
xd_list
[
-
10
][
'dt'
]
and
sub_xd
[
-
1
][
'dt'
]
==
ka
.
xd_list
[
-
1
][
'dt'
]
sub_xd
=
ka
.
get_sub_section
(
ka
.
xd_list
[
-
4
][
'dt'
],
ka
.
xd_list
[
-
1
][
'dt'
],
mode
=
'xd'
,
is_last
=
True
)
assert
sub_xd
[
0
][
'dt'
]
==
ka
.
xd_list
[
-
4
][
'dt'
]
and
sub_xd
[
-
1
][
'dt'
]
==
ka
.
xd_list
[
-
1
][
'dt'
]
def
test_kline_analyze
():
...
...
@@ -73,8 +73,8 @@ def test_bei_chi():
bi2
=
{
"start_dt"
:
ka
.
bi_list
[
-
13
][
'dt'
],
"end_dt"
:
ka
.
bi_list
[
-
12
][
'dt'
],
"direction"
:
"down"
}
x1
=
ka
.
is_bei_chi
(
bi1
,
bi2
,
mode
=
"bi"
,
adjust
=
0.9
)
xd1
=
{
"start_dt"
:
ka
.
xd_list
[
-
4
][
'dt'
],
"end_dt"
:
ka
.
xd_list
[
-
3
][
'dt'
],
"direction"
:
"down"
}
xd2
=
{
"start_dt"
:
ka
.
xd_list
[
-
6
][
'dt'
],
"end_dt"
:
ka
.
xd_list
[
-
5
][
'dt'
],
"direction"
:
"down"
}
xd1
=
{
"start_dt"
:
ka
.
xd_list
[
-
2
][
'dt'
],
"end_dt"
:
ka
.
xd_list
[
-
1
][
'dt'
],
"direction"
:
"down"
}
xd2
=
{
"start_dt"
:
ka
.
xd_list
[
-
4
][
'dt'
],
"end_dt"
:
ka
.
xd_list
[
-
3
][
'dt'
],
"direction"
:
"down"
}
x2
=
ka
.
is_bei_chi
(
xd1
,
xd2
,
mode
=
'xd'
,
adjust
=
0.9
)
print
(
'背驰计算结果:{},{}'
.
format
(
x1
,
x2
))
...
...
@@ -276,3 +276,19 @@ def test_is_valid_xd():
assert
not
is_valid_xd
(
bi_seq1
,
bi_seq2
,
bi_seq3
)
def
test_handle_last_xd
():
# 0个需要确认的线段标记
bi_points
=
[
{
"dt"
:
1
,
"bi"
:
10
,
"fx_mark"
:
"d"
},
{
"dt"
:
2
,
"bi"
:
11
,
"fx_mark"
:
"g"
},
{
"dt"
:
3
,
"bi"
:
10.4
,
"fx_mark"
:
"d"
},
{
"dt"
:
4
,
"bi"
:
11.1
,
"fx_mark"
:
"g"
},
{
"dt"
:
5
,
"bi"
:
10.6
,
"fx_mark"
:
"d"
},
{
"dt"
:
6
,
"bi"
:
11.2
,
"fx_mark"
:
"g"
},
{
"dt"
:
7
,
"bi"
:
11
,
"fx_mark"
:
"d"
},
]
pass
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录