//
// socketMultithread.cpp
// Created by wanhui on 10/12/19.
//

#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <vector>

#include <netinet/in.h>
#include <pthread.h>
#include <sys/socket.h>
#include <unistd.h>

#include <jni.h>

//#include "tpool.h"

// Number of entries in the global worker-thread handle array.
#define NUM_THREADS 6

// TCP port the server socket is bound to.
#define PORT 8080

X
XWHQSJ 已提交
22 23 24 25 26

// Wrapper around the JavaVM pointer that create_vm() fills in.
struct JVM
{
    JavaVM* jvm;  // set by JNI_CreateJavaVM
};

X
update  
xwhqsj 已提交
27 28 29 30 31 32
// Argument bundle passed (as void*) to the handle_stream thread entry point.
struct ARGS {
    struct JVM* jvm;  // JVM wrapper the worker hands on to jvmThreads()
    int socket;       // descriptor the worker reads the request from
};

X
XWHQSJ 已提交
33

X
xwhqsj 已提交
34 35 36 37 38
// Mutex held around the JVM attach / invoke / detach sequence in jvmThreads().
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

// Storage for worker thread handles; holds NUM_THREADS entries.
pthread_t threads[NUM_THREADS];


X
xwhqsj 已提交
39 40 41 42
void *jvmThreads(void *myJvm, char* plainsql, char* dbname);

JNIEnv *create_vm(struct JVM *jvm);

X
update  
xwhqsj 已提交
43
void invoke_class(JNIEnv* env, char* plainsql, char* dbname);
X
xwhqsj 已提交
44 45 46

int socket_init();

X
update  
xwhqsj 已提交
47
void* handle_stream(void* arg);
X
xwhqsj 已提交
48

X
xwhqsj 已提交
49 50 51 52


int socket_init()
{
X
update  
xwhqsj 已提交
53
    int server_fd, new_socket;
X
xwhqsj 已提交
54 55
    struct sockaddr_in address;

X
update  
xwhqsj 已提交
56 57
    int opt = 1;
    int addrlen = sizeof(address);
X
xwhqsj 已提交
58

X
update  
xwhqsj 已提交
59
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
X
xwhqsj 已提交
60 61 62 63
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

X
update  
xwhqsj 已提交
64 65
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt");
X
xwhqsj 已提交
66 67 68 69
        exit(EXIT_FAILURE);
    }

    address.sin_family = AF_INET;
X
update  
xwhqsj 已提交
70
    address.sin_addr.s_addr = htonl(INADDR_ANY);
X
xwhqsj 已提交
71 72
    address.sin_port = htons(PORT);

X
update  
xwhqsj 已提交
73
    if (bind(server_fd, (struct sockaddr *) &address, sizeof(address)) < 0) {
X
xwhqsj 已提交
74 75 76 77
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

X
update  
xwhqsj 已提交
78 79
    if (listen(server_fd, 10) < 0) {
        perror("listen failed");
X
xwhqsj 已提交
80 81 82
        exit(EXIT_FAILURE);
    }

X
update  
xwhqsj 已提交
83
    printf("listening...\n");
X
XWHQSJ 已提交
84

X
xwhqsj 已提交
85
    return server_fd;
X
XWHQSJ 已提交
86 87 88 89 90 91 92 93
}

// Creates the Java VM used by the server.
//
// jvm: wrapper whose `jvm` member receives the created JavaVM pointer.
//
// Returns the JNIEnv of the calling thread, or nullptr when `jvm` is null or
// JNI_CreateJavaVM fails (the status code is printed in the latter case).
JNIEnv *create_vm (struct JVM *jvm) {
    if (jvm == nullptr)
        return nullptr;

    // Value-initialise so that extraInfo and any unused fields are null.
    JavaVMOption options[3] = {};

    // optionString is declared as non-const char*, so string literals need an
    // explicit const_cast to be valid C++11.
    options[0].optionString = const_cast<char *>("-Djava.compiler=NONE");
    options[1].optionString = const_cast<char *>(
        "-Djava.class.path=.:/home/wanhui/CallJvm/callJvmThreadpool/qin_test1.jar");
    options[2].optionString = const_cast<char *>("-verbose:jni");

    JavaVMInitArgs vm_args = {};
    vm_args.version = JNI_VERSION_1_8;
    vm_args.options = options;
    vm_args.nOptions = 3;
    vm_args.ignoreUnrecognized = JNI_TRUE;

    JNIEnv *env = nullptr;
    const int status = JNI_CreateJavaVM (&jvm->jvm, reinterpret_cast<void **>(&env), &vm_args);
    if (status < 0 || !env) {
        printf ("Error: %d\n", status);
        return nullptr;
    }
    return env;
}

X
update  
xwhqsj 已提交
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
// pthread entry point that serves a single client connection.
//
// args: pointer to a struct ARGS carrying the JVM wrapper and the client
//       socket. The structure is only read here; the caller keeps ownership.
//
// The request is expected to look like "<plainsql>$<dbname>". Both fields are
// forwarded to jvmThreads(), after which "Hello send" is written back to the
// client. The client socket is always closed before returning.
//
// Returns nullptr.
void* handle_stream(void* args)
{
    struct ARGS *arg = static_cast<ARGS *>(args);
    if (arg == nullptr)
        return nullptr;

    struct JVM* myJvm = arg->jvm;
    const int client_fd = arg->socket;

    // Read at most sizeof(buf) - 1 bytes so there is always room for the
    // terminating NUL the tokenizer relies on.
    char buf[1024] = {0};
    ssize_t received;
    do {
        received = read(client_fd, buf, sizeof(buf) - 1);
    } while (received < 0 && errno == EINTR);

    if (received <= 0) {
        if (received < 0)
            perror("read failed");
        close(client_fd);
        return nullptr;
    }
    buf[received] = '\0';

    // strtok_r keeps its state in `save`, so concurrent workers cannot
    // corrupt each other's tokenizer state the way strtok() would.
    const char delims[] = "$";
    char *save = nullptr;
    char *psql = strtok_r(buf, delims, &save);
    char *dbn = (psql != nullptr) ? strtok_r(nullptr, delims, &save) : nullptr;

    if (psql == nullptr || dbn == nullptr) {
        // Fewer than two '$'-separated fields: nothing sensible to run.
        fprintf(stderr, "malformed request: expected \"<sql>$<dbname>\"\n");
        close(client_fd);
        return nullptr;
    }

    jvmThreads(myJvm, psql, dbn);

    const char hello[] = "Hello send";
    if (send(client_fd, hello, sizeof(hello) - 1, 0) < 0)
        perror("send failed");

    close(client_fd);
    // Returning from a thread start routine is equivalent to pthread_exit()
    // and also keeps the function usable when called directly.
    return nullptr;
}

void* jvmThreads(void *myJvm, char* plainsql, char* dbname)
{
    auto* myJvmptr = (struct JVM*) myJvm;
    JavaVM *jvmPtr = myJvmptr->jvm;

    JNIEnv* env = nullptr;

X
xwhqsj 已提交
150
    pthread_mutex_lock(&lock);
X
update  
xwhqsj 已提交
151 152 153
    jvmPtr->AttachCurrentThread((void**)&(env), nullptr);
    invoke_class(env, plainsql, dbname);
    jvmPtr->DetachCurrentThread();
X
xwhqsj 已提交
154
    pthread_mutex_unlock(&lock);
X
update  
xwhqsj 已提交
155 156 157

    return nullptr;
}
X
XWHQSJ 已提交
158

X
xwhqsj 已提交
159
// Prints and clears a pending Java exception, if any.
// Returns true when an exception was pending.
static bool clear_java_exception(JNIEnv *env)
{
    if (!env->ExceptionCheck())
        return false;
    env->ExceptionDescribe();
    env->ExceptionClear();
    return true;
}

// Instantiates com/testjvm/Helloworld through its no-argument constructor and
// calls its student() method with `dbname`.
//
// env:      JNI environment of the (attached) calling thread.
// plainsql: accepted for interface compatibility; the current call sequence
//           does not forward it to Java.
// dbname:   NUL-terminated string passed to student().
//
// Every JNI lookup is checked. On failure the pending Java exception is
// printed and cleared, and the function returns without calling student().
void invoke_class (JNIEnv * env, char* plainsql, char* dbname) {
    (void) plainsql;

    if (env == nullptr || dbname == nullptr)
        return;

    jclass hello_class = env->FindClass ("com/testjvm/Helloworld");
    if (hello_class == nullptr) {
        clear_java_exception(env);
        return;
    }

    // Each step runs only if the previous one succeeded: a null result means
    // an exception is pending and no further regular JNI calls are allowed.
    jobject instance = nullptr;
    jstring dbname_str = nullptr;
    jmethodID student_id = nullptr;

    const jmethodID ctor_id = env->GetMethodID(hello_class, "<init>", "()V");
    if (ctor_id != nullptr)
        instance = env->NewObject(hello_class, ctor_id);
    if (instance != nullptr)
        dbname_str = env->NewStringUTF(dbname);
    if (dbname_str != nullptr)
        student_id = env->GetMethodID(hello_class, "student", "([Ljava/lang/String;)V");

    if (student_id != nullptr) {
        // NOTE(review): the descriptor declares a String[] parameter but a
        // single jstring is passed here - confirm against the Java class and
        // fix either the descriptor or the argument.
        // The descriptor returns void, so CallVoidMethod is the matching call.
        env->CallVoidMethod(instance, student_id, dbname_str);
    }

    // Report whatever went wrong (failed lookup or exception thrown by Java)
    // so the thread does not detach with an exception still pending.
    clear_java_exception(env);

    if (dbname_str != nullptr)
        env->DeleteLocalRef(dbname_str);
    if (instance != nullptr)
        env->DeleteLocalRef(instance);
    env->DeleteLocalRef(hello_class);
}

int main () {
    struct JVM myJvm{};
    JNIEnv *myEnv = create_vm (&myJvm);

    if (myEnv == nullptr)
X
xwhqsj 已提交
197 198 199 200
    {
        printf("create_vm failed\n");
        exit(1);
    }
X
XWHQSJ 已提交
201

X
xwhqsj 已提交
202 203 204 205 206
//    if(tpool_create(NUM_THREADS) != 0)
//    {
//        printf("tpool_create failed\n");
//        exit(1);
//    }
X
XWHQSJ 已提交
207

X
xwhqsj 已提交
208

X
xwhqsj 已提交
209 210 211 212 213
    int client_fd, new_socket;
    struct sockaddr_in address;
    int addrlen = sizeof(address);

    client_fd = socket_init();
X
xwhqsj 已提交
214

X
xwhqsj 已提交
215 216 217
    int i = 0;
    while (1){
        new_socket = accept(client_fd, (struct sockaddr *) &address, (socklen_t *) &addrlen);
X
update  
xwhqsj 已提交
218

X
xwhqsj 已提交
219 220 221 222
        struct ARGS *args;
        args = static_cast<ARGS *>(malloc(sizeof(struct args *)));
        args->jvm = &myJvm;
        args->socket = new_socket;
X
update  
xwhqsj 已提交
223

X
xwhqsj 已提交
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
        if(pthread_create(&threads[i], nullptr, handle_stream, args) != 0){
            perror("pthread_create failed");
            exit(EXIT_FAILURE);
        }
        pthread_join(threads[i++], nullptr);
    }

//    if ((new_socket = accept(client_fd, (struct sockaddr *) &address, (socklen_t *) &addrlen)) < 0) {
//        perror("accept failed");
//        exit(EXIT_FAILURE);
//    }
//
//    struct ARGS *args;
//    args = static_cast<ARGS *>(malloc(sizeof(struct args *)));
//    args->jvm = &myJvm;
//    args->socket = new_socket;

    // single thread
//    handle_stream(args);
X
update  
xwhqsj 已提交
243 244 245 246 247


//    while (1)
//    {
//        tpool_add_work(handle_stream, args);
X
xwhqsj 已提交
248
//        close(new_socket);
X
update  
xwhqsj 已提交
249
//    }
X
XWHQSJ 已提交
250

X
xwhqsj 已提交
251 252 253 254 255


//    int i;
//    for(i = 0; i < 10; i++)
//    {
X
update  
xwhqsj 已提交
256
//        tpool_add_work(handle_stream, args);
X
xwhqsj 已提交
257
//    }
X
update  
xwhqsj 已提交
258 259 260


    sleep(2);
X
xwhqsj 已提交
261
//    tpool_destroy();
X
update  
xwhqsj 已提交
262
    myJvm.jvm->DestroyJavaVM ();
X
xwhqsj 已提交
263

X
XWHQSJ 已提交
264 265

    return 0;
X
update  
XWHQSJ 已提交
266
}