signals.py 17.1 KB
Newer Older
Z
zengbin93 已提交
1 2 3
# coding: utf-8
from .analyze import KlineAnalyze, find_zs

Z
zengbin93 已提交
4 5
def check_jing(fd1, fd2, fd3, fd4, fd5):
    """检查最近5个分段走势是否构成井
Z
zengbin93 已提交
6

Z
zengbin93 已提交
7
    井的定义:
Z
zengbin93 已提交
8 9 10
        12345,五段,是构造井的基本形态,形成井的位置肯定是5,而5出井的
        前提条件是对于向上5至少比3和1其中之一高,向下反过来; 并且,234
        构成一个中枢。
Z
zengbin93 已提交
11

Z
zengbin93 已提交
12 13 14 15 16 17
        井只有两类,大井和小井(以向上为例):
        大井对应的形式是:12345向上,5最高3次之1最低,力度上1大于3,3大于5;
        小井对应的形式是:
            1:12345向上,3最高5次之1最低,力度上5的力度比1小,注意这时候
               不需要再考虑5和3的关系了,因为5比3低,所以不需要考虑力度。
            2:12345向上,5最高3次之1最低,力度上1大于5,5大于3。
Z
zengbin93 已提交
18

Z
zengbin93 已提交
19 20 21
        小井的构造,关键是满足5一定至少大于1、3中的一个。
        注意有一种情况不归为井:就是12345向上,1的力度最小,5的力度次之,3的力度最大此类不算井,
        因为345后面必然还有走势在67的时候才能再判断,个中道理各位好好体会。
Z
zengbin93 已提交
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75


    fd 为 dict 对象,表示一段走势,可以是笔、线段,样例如下:

    fd = {
        "start_dt": "",
        "end_dt": "",
        "power": 0,         # 力度
        "direction": "up",
        "high": 0,
        "low": 0,
        "mode": "bi"
    }

    """
    assert fd1['direction'] == fd3['direction'] == fd5['direction']
    assert fd2['direction'] == fd4['direction']
    direction = fd1['direction']

    zs_g = min(fd2['high'], fd3['high'], fd4['high'])
    zs_d = max(fd2['low'], fd3['low'], fd4['low'])

    jing = "没有出井"
    if zs_d < zs_g:
        if direction == 'up' and fd5["high"] > min(fd3['high'], fd1['high']):

            # 大井对应的形式是:12345向上,5最高3次之1最低,力度上1大于3,3大于5
            if fd5["high"] > fd3['high'] > fd1['high'] and fd5['power'] < fd3['power'] < fd1['power']:
                jing = "向上大井"

            # 第一种小井:12345向上,3最高5次之1最低,力度上5的力度比1小
            if fd1['high'] < fd5['high'] < fd3['high'] and fd5['power'] < fd1['power']:
                jing = "向上小井"

            # 第二种小井:12345向上,5最高3次之1最低,力度上1大于5,5大于3
            if fd5["high"] > fd3['high'] > fd1['high'] and fd1['power'] > fd5['power'] > fd3['power']:
                jing = "向上小井"

        if direction == 'down' and fd5["low"] < max(fd3['low'], fd1['low']):

            # 大井对应的形式是:12345向下,5最低3次之1最高,力度上1大于3,3大于5;
            if fd5['low'] < fd3['low'] < fd1['low'] and fd5['power'] < fd3['power'] < fd1['power']:
                jing = "向下大井"

            # 第一种小井:12345向下,3最低5次之1最高,力度上5的力度比1小
            if fd1["low"] > fd5['low'] > fd3['low'] and fd5['power'] < fd1['power']:
                jing = "向下小井"

            # 第二种小井:12345向下,5最低3次之1最高,力度上1大于5,5大于3
            if fd3["low"] > fd5['low'] > fd1['low'] and fd1['power'] > fd5['power'] > fd3['power']:
                jing = "向下小井"

    return jing

Z
zengbin93 已提交
76 77 78 79

def get_fx_signals(ka):
    """计算分型特征"""
    s = {
Z
zengbin93 已提交
80 81 82 83 84 85 86 87 88 89
        "收于MA5上方": False,
        "收于MA5下方": False,
        "收于MA20上方": False,
        "收于MA20下方": False,
        "收于MA120上方": False,
        "收于MA120下方": False,
        "最后一个分型为顶": False,
        "最后一个分型为底": False,
        "顶分型后有效跌破MA5": False,
        "底分型后有效升破MA5": False,
Z
zengbin93 已提交
90
        "最近三K线形态": None,
Z
zengbin93 已提交
91 92
    }

Z
zengbin93 已提交
93 94 95
    last_tri = ka.kline_new[-3:]
    if len(last_tri) == 3:
        if last_tri[-3]['high'] < last_tri[-2]['high'] > last_tri[-1]['high']:
Z
zengbin93 已提交
96
            s["最近三K线形态"] = "g"
Z
zengbin93 已提交
97
        elif last_tri[-3]['low'] > last_tri[-2]['low'] < last_tri[-1]['low']:
Z
zengbin93 已提交
98
            s["最近三K线形态"] = "d"
Z
zengbin93 已提交
99
        elif last_tri[-3]['low'] > last_tri[-2]['low'] > last_tri[-1]['low']:
Z
zengbin93 已提交
100
            s["最近三K线形态"] = "down"
Z
zengbin93 已提交
101
        elif last_tri[-3]['high'] < last_tri[-2]['high'] < last_tri[-1]['high']:
Z
zengbin93 已提交
102
            s["最近三K线形态"] = "up"
Z
zengbin93 已提交
103

Z
zengbin93 已提交
104 105 106 107 108
    last_klines_ = [dict(x) for x in ka.kline_raw[-10:]]
    if len(last_klines_) != 10:
        return s

    last_ma_ = ka.ma[-10:]
Z
zengbin93 已提交
109
    for i, x in enumerate(last_klines_):
Z
zengbin93 已提交
110
        assert last_ma_[i]['dt'] == x['dt'], "{}:计算均线错误".format(ka.name)
Z
zengbin93 已提交
111 112 113 114
        last_klines_[i].update(last_ma_[i])

    last_k = last_klines_[-1]
    if last_k['close'] >= last_k['ma5']:
Z
zengbin93 已提交
115
        s["收于MA5上方"] = True
Z
zengbin93 已提交
116
    else:
Z
zengbin93 已提交
117
        s["收于MA5下方"] = True
Z
zengbin93 已提交
118 119

    if last_k['close'] >= last_k['ma20']:
Z
zengbin93 已提交
120
        s["收于MA20上方"] = True
Z
zengbin93 已提交
121
    else:
Z
zengbin93 已提交
122
        s["收于MA20下方"] = True
Z
zengbin93 已提交
123 124

    if last_k['close'] >= last_k['ma120']:
Z
zengbin93 已提交
125
        s["收于MA120上方"] = True
Z
zengbin93 已提交
126
    else:
Z
zengbin93 已提交
127
        s["收于MA120下方"] = True
Z
zengbin93 已提交
128 129 130 131

    last_fx = ka.fx_list[-1]
    after_klines = [x for x in last_klines_ if x['dt'] >= last_fx['dt']]
    if last_fx['fx_mark'] == 'g':
Z
zengbin93 已提交
132
        s["最后一个分型为顶"] = True
Z
zengbin93 已提交
133 134 135 136
        # 顶分型有效跌破MA5的三种情况:1)分型右侧第一根K线收于MA5下方;2)连续5根K线最低价下穿MA5;3)连续3根K线收盘价收于MA5下方
        if after_klines[1]['close'] < after_klines[1]['ma5'] \
                or sum([1 for x in after_klines[-5:] if x['low'] < x['ma5']]) >= 5 \
                or sum([1 for x in after_klines[-3:] if x['close'] < x['ma5']]) >= 3:
Z
zengbin93 已提交
137
            s['顶分型后有效跌破MA5'] = True
Z
zengbin93 已提交
138 139

    if last_fx['fx_mark'] == 'd':
Z
zengbin93 已提交
140
        s["最后一个分型为底"] = True
Z
zengbin93 已提交
141 142 143 144
        # 底分型后有效升破MA5的三种情况:1)分型右侧第一根K线收于MA5上方;2)连续5根K线最高价上穿MA5;3)连续3根K线收盘价收于MA5上方
        if after_klines[1]['close'] > after_klines[1]['ma5'] \
                or sum([1 for x in after_klines[-5:] if x['high'] > x['ma5']]) >= 5 \
                or sum([1 for x in after_klines[-3:] if x['close'] > x['ma5']]) >= 3:
Z
zengbin93 已提交
145 146 147
            s['底分型后有效升破MA5'] = True
    freq = ka.name
    return {freq + k: v for k, v in s.items()}
Z
zengbin93 已提交
148 149 150 151 152


def get_bi_signals(ka):
    """计算笔信号"""
    s = {
Z
zengbin93 已提交
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
        "最后一个未确认的笔标记为底": False,
        "最后一个未确认的笔标记为顶": False,
        "最后一个已确认的笔标记为底": False,
        "最后一个已确认的笔标记为顶": False,
        "向上笔走势延伸": False,
        "向上笔现顶分型": False,
        "向下笔走势延伸": False,
        "向下笔现底分型": False,

        "最后一个笔中枢上沿": 0,
        "最后一个笔中枢下沿": 0,
        "收于笔中枢上方且有三买": False,
        "收于笔中枢上方且无三买": False,
        "收于笔中枢下方且有三卖": False,
        "收于笔中枢下方且无三卖": False,

        '笔同级别分解买': False,
        '笔同级别分解卖': False,

        '类趋势顶背驰(笔)': False,
        '类趋势底背驰(笔)': False,
        '类盘整顶背驰(笔)': False,
        '类盘整底背驰(笔)': False,

        # '趋势顶背驰(笔)': False,
        # '趋势底背驰(笔)': False,
        # '盘整顶背驰(笔)': False,
        # '盘整底背驰(笔)': False,
Z
zengbin93 已提交
181 182 183
    }

    # ------------------------------------------------------------------------------------------------------------------
Z
zengbin93 已提交
184 185 186 187 188 189 190 191 192 193 194 195
    if len(ka.bi_list) > 2:
        assert ka.bi_list[-1]['fx_mark'] in ['d', 'g']
        if ka.bi_list[-1]['fx_mark'] == 'd':
            s["最后一个未确认的笔标记为底"] = True
        else:
            s["最后一个未确认的笔标记为顶"] = True

        assert ka.bi_list[-2]['fx_mark'] in ['d', 'g']
        if ka.bi_list[-2]['fx_mark'] == 'd':
            s["最后一个已确认的笔标记为底"] = True
        else:
            s["最后一个已确认的笔标记为顶"] = True
Z
zengbin93 已提交
196 197 198 199

    # ------------------------------------------------------------------------------------------------------------------
    last_bi = ka.bi_list[-1]
    if last_bi['fx_mark'] == 'd' and ka.fx_list[-1]['fx_mark'] == 'd':
Z
zengbin93 已提交
200
        s["向上笔走势延伸"] = True
Z
zengbin93 已提交
201
    elif last_bi['fx_mark'] == 'd' and ka.fx_list[-1]['fx_mark'] == 'g':
Z
zengbin93 已提交
202
        s["向上笔现顶分型"] = True
Z
zengbin93 已提交
203
    elif last_bi['fx_mark'] == 'g' and ka.fx_list[-1]['fx_mark'] == 'g':
Z
zengbin93 已提交
204
        s['向下笔走势延伸'] = True
Z
zengbin93 已提交
205
    elif last_bi['fx_mark'] == 'g' and ka.fx_list[-1]['fx_mark'] == 'd':
Z
zengbin93 已提交
206
        s['向下笔现底分型'] = True
Z
zengbin93 已提交
207 208 209 210 211 212
    else:
        raise ValueError("笔状态识别错误,最后一个笔标记:%s,"
                         "最后一个分型标记%s" % (str(last_bi), str(ka.fx_list[-1])))

    # ------------------------------------------------------------------------------------------------------------------
    bis = ka.bi_list[-30:]
Z
zengbin93 已提交
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
    if len(bis) >= 6:
        if bis[-1]['fx_mark'] == 'd' and bis[-1]['bi'] < bis[-3]['bi'] and bis[-2]['bi'] < bis[-4]['bi']:
            zs1 = {"start_dt": bis[-2]['dt'], "end_dt": bis[-1]['dt'], "direction": "down"}
            zs2 = {"start_dt": bis[-4]['dt'], "end_dt": bis[-3]['dt'], "direction": "down"}
            if ka.is_bei_chi(zs1, zs2, mode="bi", adjust=0.9):
                # 类趋势
                if bis[-2]['bi'] < bis[-5]['bi']:
                    s['类趋势底背驰(笔)'] = True
                else:
                    s['类盘整底背驰(笔)'] = True

        if bis[-1]['fx_mark'] == 'g' and bis[-1]['bi'] > bis[-3]['bi'] and bis[-2]['bi'] > bis[-4]['bi']:
            zs1 = {"start_dt": bis[-2]['dt'], "end_dt": bis[-1]['dt'], "direction": "up"}
            zs2 = {"start_dt": bis[-4]['dt'], "end_dt": bis[-3]['dt'], "direction": "up"}
            if ka.is_bei_chi(zs1, zs2, mode="bi", adjust=0.9):
                # 类趋势
                if bis[-2]['bi'] > bis[-5]['bi']:
                    s['类趋势顶背驰(笔)'] = True
                else:
                    s['类盘整顶背驰(笔)'] = True
Z
zengbin93 已提交
233 234 235 236 237

    # ------------------------------------------------------------------------------------------------------------------
    bi_zs = find_zs(bis)
    if bi_zs:
        last_bi_zs = bi_zs[-1]
Z
zengbin93 已提交
238 239
        s["最后一个笔中枢上沿"] = last_bi_zs['ZG']
        s["最后一个笔中枢下沿"] = last_bi_zs['ZD']
Z
zengbin93 已提交
240 241 242 243

        last_k = ka.kline_new[-1]
        if last_k['close'] > last_bi_zs['ZG']:
            if last_bi_zs.get("third_buy", 0):
Z
zengbin93 已提交
244
                s["收于笔中枢上方且有三买"] = True
Z
zengbin93 已提交
245
            else:
Z
zengbin93 已提交
246
                s["收于笔中枢上方且无三买"] = True
Z
zengbin93 已提交
247 248 249

        if last_k['close'] < last_bi_zs['ZD']:
            if last_bi_zs.get("third_sell", 0):
Z
zengbin93 已提交
250
                s["收于笔中枢下方且有三卖"] = True
Z
zengbin93 已提交
251
            else:
Z
zengbin93 已提交
252
                s["收于笔中枢下方且无三卖"] = True
Z
zengbin93 已提交
253 254 255 256

    # ------------------------------------------------------------------------------------------------------------------
    if len(bis) >= 6:
        if bis[-1]['fx_mark'] == 'd' and bis[-2]['bi'] > bis[-5]['bi']:
Z
zengbin93 已提交
257 258
            if bis[-1]['bi'] > bis[-3]['bi'] or s['类盘整底背驰(笔)']:
                s['笔同级别分解买'] = True
Z
zengbin93 已提交
259 260

        if bis[-1]['fx_mark'] == 'g' and bis[-2]['bi'] < bis[-5]['bi']:
Z
zengbin93 已提交
261 262 263 264 265
            if bis[-1]['bi'] < bis[-3]['bi'] or s['类盘整顶背驰(笔)']:
                s['笔同级别分解卖'] = True

    freq = ka.name
    return {freq + k: v for k, v in s.items()}
Z
zengbin93 已提交
266 267 268 269 270


def get_xd_signals(ka, use_zs=False):
    """计算线段方向特征"""
    s = {
Z
zengbin93 已提交
271 272 273 274 275 276 277 278 279 280 281 282
        "最后一个未确认的线段标记为底": False,
        "最后一个未确认的线段标记为顶": False,
        "最后一个已确认的线段标记为底": False,
        "最后一个已确认的线段标记为顶": False,

        "最后一个线段内部笔标记数量": 0,
        "最近上一线段内部笔标记数量": 0,

        '类趋势顶背驰(段)': False,
        '类趋势底背驰(段)': False,
        '类盘整顶背驰(段)': False,
        '类盘整底背驰(段)': False,
Z
zengbin93 已提交
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352

        "同级别分解买": False,
        "同级别分解卖": False,

        # '趋势顶背驰(段)': False,
        # '趋势底背驰(段)': False,
        # '盘整顶背驰(段)': False,
        # '盘整底背驰(段)': False,
        # "最后一个中枢上沿": 0,
        # "最后一个中枢下沿": 0,
    }

    # ------------------------------------------------------------------------------------------------------------------
    assert ka.xd_list[-1]['fx_mark'] in ['g', 'd']
    if ka.xd_list[-1]['fx_mark'] == 'd':
        s["最后一个未确认的线段标记为底"] = True
    else:
        s["最后一个未确认的线段标记为顶"] = True

    assert ka.xd_list[-2]['fx_mark'] in ['g', 'd']
    if ka.xd_list[-2]['fx_mark'] == 'd':
        s["最后一个已确认的线段标记为底"] = True
    else:
        s["最后一个已确认的线段标记为顶"] = True

    # ------------------------------------------------------------------------------------------------------------------
    bi_after = [x for x in ka.bi_list[-60:] if x['dt'] >= ka.xd_list[-1]['dt']]
    s["最后一个线段内部笔标记数量"] = len(bi_after)
    s["最近上一线段内部笔标记数量"] = len([x for x in ka.bi_list[-100:]
                              if ka.xd_list[-2]['dt'] <= x['dt'] <= ka.xd_list[-1]['dt']])

    # ------------------------------------------------------------------------------------------------------------------
    xds = ka.xd_list[-50:]
    if len(xds) >= 6:
        if xds[-1]['fx_mark'] == 'd' and xds[-1]['xd'] < xds[-3]['xd'] and xds[-2]['xd'] < xds[-4]['xd']:
            zs1 = {"start_dt": xds[-2]['dt'], "end_dt": xds[-1]['dt'], "direction": "down"}
            zs2 = {"start_dt": xds[-4]['dt'], "end_dt": xds[-3]['dt'], "direction": "down"}
            if ka.is_bei_chi(zs1, zs2, mode="xd", adjust=0.9):
                # 类趋势
                if xds[-2]['xd'] < xds[-5]['xd']:
                    s['类趋势底背驰(段)'] = True
                else:
                    s['类盘整底背驰(段)'] = True

        if xds[-1]['fx_mark'] == 'g' and xds[-1]['xd'] > xds[-3]['xd'] and xds[-2]['xd'] > xds[-4]['xd']:
            zs1 = {"start_dt": xds[-2]['dt'], "end_dt": xds[-1]['dt'], "direction": "up"}
            zs2 = {"start_dt": xds[-4]['dt'], "end_dt": xds[-3]['dt'], "direction": "up"}
            if ka.is_bei_chi(zs1, zs2, mode="xd", adjust=0.9):
                # 类趋势
                if xds[-2]['xd'] > xds[-5]['xd']:
                    s['类趋势顶背驰(段)'] = True
                else:
                    s['类盘整顶背驰(段)'] = True

    # ------------------------------------------------------------------------------------------------------------------
    last_xd_inside = [x for x in ka.bi_list[-60:] if x['dt'] >= xds[-1]['dt']]
    if len(xds) >= 6 and len(last_xd_inside) >= 6:
        if xds[-1]['fx_mark'] == 'g' and xds[-2]['xd'] < xds[-5]['xd']:
            if xds[-1]['xd'] < xds[-3]['xd'] or s['类盘整底背驰(段)']:
                s['同级别分解买'] = True

        if xds[-1]['fx_mark'] == 'd' and xds[-2]['xd'] > xds[-5]['xd']:
            if xds[-1]['xd'] > xds[-3]['xd'] or s['类盘整顶背驰(段)']:
                s['同级别分解卖'] = True

    # ------------------------------------------------------------------------------------------------------------------
    freq = ka.name
    return {freq + k: v for k, v in s.items()}


Z
zengbin93 已提交
353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368
class Signals:
    def __init__(self, klines):
        """
        :param klines: dict
            K线数据
        """
        self.klines = klines
        self.kas = {k: KlineAnalyze(self.klines[k], name=k, min_bi_k=5,
                                    ma_params=(5, 20, 120),
                                    max_raw_len=5000, verbose=False)
                    for k in self.klines.keys()}
        self.symbol = self.kas["1分钟"].symbol
        self.end_dt = self.kas["1分钟"].end_dt
        self.latest_price = self.kas["1分钟"].latest_price

    def __repr__(self):
Z
zengbin93 已提交
369
        return "<Signals of {}>".format(self.symbol)
Z
zengbin93 已提交
370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393

    def signals(self):
        """计算交易决策需要的状态信息"""
        s = {"symbol": self.symbol}

        for k in self.kas.keys():
            if k in ['周线', '日线', '60分钟', '30分钟', '15分钟', '5分钟', '1分钟']:
                s.update(get_fx_signals(self.kas[k]))

            if k in ['周线', '日线', '60分钟', '30分钟', '15分钟', '5分钟', '1分钟']:
                s.update(get_bi_signals(self.kas[k]))

            if k in ['日线', '60分钟', '30分钟', '15分钟', '5分钟', '1分钟']:
                s.update(get_xd_signals(self.kas[k]))
        return s

    def update_kas(self, klines_one):
        for freq, klines_ in klines_one.items():
            k = klines_[-1]
            self.kas[freq].update(k)

        self.symbol = self.kas["1分钟"].symbol
        self.end_dt = self.kas["1分钟"].end_dt
        self.latest_price = self.kas["1分钟"].latest_price