Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
zengbin93
czsc
提交
b426fedc
C
czsc
项目概览
zengbin93
/
czsc
通知
23
Star
2
Fork
1
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
4
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
czsc
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
4
Issue
4
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
b426fedc
编写于
9月 02, 2020
作者:
Z
zengbin93
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
0.5.3 新增MACD和VOL的力度计算方法
上级
caf5bdb8
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
63 addition
and
15 deletion
+63
-15
czsc/analyze.py
czsc/analyze.py
+15
-1
test/test_analyze.py
test/test_analyze.py
+48
-14
未找到文件。
czsc/analyze.py
浏览文件 @
b426fedc
...
...
@@ -852,7 +852,7 @@ class KlineAnalyze:
return
[
x
for
x
in
points
if
end_dt
>=
x
[
'dt'
]
>=
start_dt
]
def
calculate_macd_power
(
self
,
start_dt
:
datetime
,
end_dt
:
datetime
,
mode
=
'bi'
,
direction
=
"up"
):
"""计算走势段(start_dt ~ end_dt)的力度
"""
用 MACD
计算走势段(start_dt ~ end_dt)的力度
:param start_dt: datetime
走势开始时间
...
...
@@ -881,4 +881,18 @@ class KlineAnalyze:
raise
ValueError
return
power
def
calculate_vol_power
(
self
,
start_dt
:
datetime
,
end_dt
:
datetime
):
"""用 VOL 计算走势段(start_dt ~ end_dt)的力度
:param start_dt: datetime
走势开始时间
:param end_dt: datetime
走势结束时间
:return: float
走势力度
"""
fd_vol
=
[
x
for
x
in
self
.
kline_raw
if
x
[
'dt'
]
>=
start_dt
]
fd_vol
=
[
x
for
x
in
fd_vol
if
end_dt
>=
x
[
'dt'
]]
power
=
sum
([
x
[
'vol'
]
for
x
in
fd_vol
])
return
int
(
power
)
test/test_analyze.py
浏览文件 @
b426fedc
...
...
@@ -8,20 +8,20 @@ sys.path.insert(0, '..')
import
os
import
pandas
as
pd
import
czsc
from
czsc.analyze
import
KlineAnalyze
,
find_zs
,
is_valid_xd
,
make_standard_seq
,
get_potential_xd
,
handle_last_xd
from
czsc.analyze
import
KlineAnalyze
,
find_zs
,
is_valid_xd
,
make_standard_seq
warnings
.
warn
(
"czsc version is {}"
.
format
(
czsc
.
__version__
))
cur_path
=
os
.
path
.
split
(
os
.
path
.
realpath
(
__file__
))[
0
]
# cur_path = "./test"
file_kline
=
os
.
path
.
join
(
cur_path
,
"data/000001.SH_D.csv"
)
kline
=
pd
.
read_csv
(
file_kline
,
encoding
=
"utf-8"
)
kline
.
loc
[:,
"dt"
]
=
pd
.
to_datetime
(
kline
.
dt
)
kline1
=
kline
.
iloc
[:
2000
]
kline2
=
kline
.
iloc
[
2000
:]
ka
=
KlineAnalyze
(
kline1
,
name
=
"日线"
,
max_raw_len
=
2000
,
verbose
=
False
)
def
test_ka_update
():
file_kline
=
os
.
path
.
join
(
cur_path
,
"data/000001.SH_D.csv"
)
kline
=
pd
.
read_csv
(
file_kline
,
encoding
=
"utf-8"
)
kline
.
loc
[:,
"dt"
]
=
pd
.
to_datetime
(
kline
.
dt
)
kline1
=
kline
.
iloc
[:
2000
]
kline2
=
kline
.
iloc
[
2000
:]
ka1
=
KlineAnalyze
(
kline
,
name
=
"日线"
,
max_raw_len
=
5000
,
verbose
=
False
)
ka2
=
KlineAnalyze
(
kline1
,
name
=
"日线"
,
max_raw_len
=
5000
,
verbose
=
False
)
...
...
@@ -32,8 +32,35 @@ def test_ka_update():
assert
len
(
ka1
.
fx_list
)
==
len
(
ka2
.
fx_list
)
assert
len
(
ka1
.
bi_list
)
==
len
(
ka2
.
bi_list
)
def
test_calculate_power
():
file_kline
=
os
.
path
.
join
(
cur_path
,
"data/000001.SH_D.csv"
)
kline
=
pd
.
read_csv
(
file_kline
,
encoding
=
"utf-8"
)
kline
.
loc
[:,
"dt"
]
=
pd
.
to_datetime
(
kline
.
dt
)
ka
=
KlineAnalyze
(
kline
,
name
=
"日线"
,
max_raw_len
=
5000
,
verbose
=
False
)
# 测试 macd 力度
last_xd_power
=
ka
.
calculate_macd_power
(
start_dt
=
ka
.
xd_list
[
-
2
][
'dt'
],
end_dt
=
ka
.
xd_list
[
-
1
][
'dt'
],
mode
=
'xd'
,
direction
=
"up"
if
ka
.
xd_list
[
-
1
][
'fx_mark'
]
==
'g'
else
"down"
)
last_bi_power
=
ka
.
calculate_macd_power
(
start_dt
=
ka
.
bi_list
[
-
2
][
'dt'
],
end_dt
=
ka
.
bi_list
[
-
1
][
'dt'
],
mode
=
'bi'
)
assert
int
(
last_xd_power
)
==
389
assert
int
(
last_bi_power
)
==
300
# 测试 vol 力度
last_xd_power
=
ka
.
calculate_vol_power
(
start_dt
=
ka
.
xd_list
[
-
2
][
'dt'
],
end_dt
=
ka
.
xd_list
[
-
1
][
'dt'
])
last_bi_power
=
ka
.
calculate_vol_power
(
start_dt
=
ka
.
bi_list
[
-
2
][
'dt'
],
end_dt
=
ka
.
bi_list
[
-
1
][
'dt'
])
assert
int
(
last_xd_power
)
==
13329239053
assert
int
(
last_bi_power
)
==
9291793337
def
test_get_sub_section
():
file_kline
=
os
.
path
.
join
(
cur_path
,
"data/000001.SH_D.csv"
)
kline
=
pd
.
read_csv
(
file_kline
,
encoding
=
"utf-8"
)
kline
.
loc
[:,
"dt"
]
=
pd
.
to_datetime
(
kline
.
dt
)
ka
=
KlineAnalyze
(
kline
,
name
=
"日线"
,
max_raw_len
=
2000
,
verbose
=
False
)
sub_kn
=
ka
.
get_sub_section
(
ka
.
fx_list
[
-
2
][
'dt'
],
ka
.
fx_list
[
-
1
][
'dt'
],
mode
=
'kn'
,
is_last
=
True
)
assert
sub_kn
[
0
][
'dt'
]
==
ka
.
fx_list
[
-
2
][
'dt'
]
and
sub_kn
[
-
1
][
'dt'
]
==
ka
.
fx_list
[
-
1
][
'dt'
]
...
...
@@ -48,15 +75,16 @@ def test_get_sub_section():
def
test_kline_analyze
():
file_kline
=
os
.
path
.
join
(
cur_path
,
"data/000001.SH_D.csv"
)
kline
=
pd
.
read_csv
(
file_kline
,
encoding
=
"utf-8"
)
kline
.
loc
[:,
"dt"
]
=
pd
.
to_datetime
(
kline
.
dt
)
ka
=
KlineAnalyze
(
kline
,
name
=
"日线"
,
max_raw_len
=
2000
,
verbose
=
False
)
# 测试绘图
file_img
=
"kline.png"
ka
.
to_image
(
file_img
,
max_k_count
=
5000
)
assert
os
.
path
.
exists
(
file_img
)
for
_
,
row
in
kline2
.
iterrows
():
ka
.
update
(
row
.
to_dict
())
assert
ka
.
kline_raw
[
-
1
][
'dt'
]
==
row
[
'dt'
]
# 测试分型识别结果
assert
ka
.
fx_list
[
-
1
][
'fx_mark'
]
==
'g'
assert
ka
.
fx_list
[
-
5
][
'fx_mark'
]
==
'g'
...
...
@@ -80,6 +108,11 @@ def test_kline_analyze():
def
test_bei_chi
():
file_kline
=
os
.
path
.
join
(
cur_path
,
"data/000001.SH_D.csv"
)
kline
=
pd
.
read_csv
(
file_kline
,
encoding
=
"utf-8"
)
kline
.
loc
[:,
"dt"
]
=
pd
.
to_datetime
(
kline
.
dt
)
ka
=
KlineAnalyze
(
kline
,
name
=
"日线"
,
max_raw_len
=
2000
,
verbose
=
False
)
bi1
=
{
"start_dt"
:
ka
.
bi_list
[
-
11
][
'dt'
],
"end_dt"
:
ka
.
bi_list
[
-
10
][
'dt'
],
"direction"
:
"down"
}
bi2
=
{
"start_dt"
:
ka
.
bi_list
[
-
13
][
'dt'
],
"end_dt"
:
ka
.
bi_list
[
-
12
][
'dt'
],
"direction"
:
"down"
}
x1
=
ka
.
is_bei_chi
(
bi1
,
bi2
,
mode
=
"bi"
,
adjust
=
0.9
)
...
...
@@ -91,7 +124,11 @@ def test_bei_chi():
def
test_update_ta
():
file_kline
=
os
.
path
.
join
(
cur_path
,
"data/000001.SH_D.csv"
)
kline
=
pd
.
read_csv
(
file_kline
,
encoding
=
"utf-8"
)
kline
.
loc
[:,
"dt"
]
=
pd
.
to_datetime
(
kline
.
dt
)
ka
=
KlineAnalyze
(
kline
,
name
=
"日线"
,
max_raw_len
=
2000
,
verbose
=
False
)
ma_x1
=
dict
(
ka
.
ma
[
-
1
])
macd_x1
=
dict
(
ka
.
macd
[
-
1
])
ka
.
update
(
kline
.
iloc
[
-
1
].
to_dict
())
...
...
@@ -107,9 +144,6 @@ def test_update_ta():
def
test_find_zs
():
bi_zs
=
find_zs
(
ka
.
bi_list
)
xd_zs
=
find_zs
(
ka
.
xd_list
)
# 造数测试
points
=
[
{
"dt"
:
0
,
"fx_mark"
:
"d"
,
"xd"
:
8
},
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录