appbase_import.go 9.3 KB
Newer Older
A
Avi Aryan 已提交
1 2
// +build !oss

A
Avi Aryan 已提交
3 4 5 6 7
package main

import (
	"encoding/json"
	"fmt"
8
	"io/ioutil"
A
Avi Aryan 已提交
9 10
	"os"
	"strconv"
11
	"strings"
A
Avi Aryan 已提交
12
	"time"
U
utsavoza 已提交
13 14 15 16 17 18

	"github.com/appbaseio/abc/appbase/app"
	"github.com/appbaseio/abc/appbase/common"
	"github.com/appbaseio/abc/imports/adaptor"
	"github.com/appbaseio/abc/log"
	"github.com/joho/godotenv"
A
Avi Aryan 已提交
19 20
)

A
Avi Aryan 已提交
21
// GLOBALS
22
// srcParamMap maps the source parameter names accepted in an external config
// file to the keys used in the source config handed to writeConfigFile.
var srcParamMap = map[string]string{
	// where and how to connect
	"src_uri":  "uri",
	"src_type": "_name_",
	"ssl":      "ssl",
	"sac_path": "sacPath",

	// what to read from the source
	"tail":             "tail",
	"replication_slot": "replication_slot",
	"typename":         "typeName",
	"src_filter":       "srcRegex",
	// "timeout":          "timeout",

	// pipeline level options
	"transform_file": "_transform_",
	"log_dir":        "log_dir",
}

// destParamMap maps the destination parameter names accepted in an external
// config file to the keys used in the sink config handed to writeConfigFile.
var destParamMap = map[string]string{
	// where to write
	"dest_uri":  "uri",
	"dest_type": "_name_",

	// how to write
	"tail":          "tail",
	"request_size":  "request_size",
	"bulk_requests": "bulk_requests",
}

45
// basicUsage is the one-line usage summary of the import command. runImport
// passes it to usageFor for the flag set's usage text and to showShortHelp
// when the positional arguments are not usable.
const basicUsage string = `abc import --src_type={SourceDatabase} --src_uri={SourceURI} [-t|--tail] [Cluster URL|App Name]`
A
Avi Aryan 已提交
46 47 48 49

// runImport runs the import command
func runImport(args []string) error {
	flagset := baseFlagSet("import")
50
	flagset.Usage = usageFor(flagset, basicUsage)
A
Avi Aryan 已提交
51 52 53

	// custom flags
	tail := flagset.BoolP("tail", "t", false, "allow tail feature")
54 55
	srcType := flagset.String("src_type", "postgres", "type of source database")
	srcURL := flagset.String("src_uri", "http://user:pass@host:port/db", "url of source database")
A
Avi Aryan 已提交
56
	typeName := flagset.String("typename", "mytype", "[csv] typeName to use")
57
	replicationSlot := flagset.String("replication_slot", "standby_replication_slot",
A
Avi Aryan 已提交
58
		"[postgres] replication slot to use")
A
Avi Aryan 已提交
59
	// timeout := flagset.String("timeout", "10s", "source timeout")
60
	srcRegex := flagset.String("src_filter", ".*", "Namespace filter for source")
A
Avi Aryan 已提交
61 62
	test := flagset.Bool("test", false, `if set to true, only pipeline is created and sync is not started. 
		Useful for checking your configuration`)
63
	sacPath := flagset.String("sac_path", "./ServiceAccountKey.json", "Path to firebase service account credentials file")
U
utsavoza 已提交
64
	ssl := flagset.Bool("ssl", false, "Enable SSL connection to the source.")
65 66
	requestSize := flagset.Int64("request_size", 2<<19, "Http request size in bytes, specifically for bulk requests to ES.")
	bulkRequests := flagset.Int("bulk_requests", 1000, "Number of bulk requests to send during a network request to ES.")
A
Avi Aryan 已提交
67

J
patch  
Jeet Parekh 已提交
68 69
	logDir := flagset.String("log_dir", "", "used for storing commit logs")

A
Avi Aryan 已提交
70
	transformFile := flagset.String("transform_file", "", "transform file to use")
71

72 73
	verify := flagset.Bool("verify", false, "verify the source and destination connections")

J
Jeet Parekh 已提交
74 75 76 77
	srcUsername := flagset.String("src_username", "", "source username")
	srcPassword := flagset.String("src_password", "", "source password")
	srcRealm := flagset.String("src_realm", "", "source realm")

A
Avi Aryan 已提交
78 79 80 81
	// use external config
	config := flagset.String("config", "", "Path to external config file, if specified, only that is used")

	var destURL string
A
Avi Aryan 已提交
82 83 84 85
	// parse args
	if err := flagset.Parse(args); err != nil {
		return err
	}
A
Avi Aryan 已提交
86 87 88

	// use the config file
	if *config != "" {
J
Jeet Parekh 已提交
89 90
		file, configuredAdaptors, err := genPipelineFromEnv(*config)

A
Avi Aryan 已提交
91 92 93
		if err != nil {
			return err
		}
J
Jeet Parekh 已提交
94

J
Jeet Parekh 已提交
95 96 97 98
		if *verify {
			return verifyConnections(configuredAdaptors)
		}

A
Avi Aryan 已提交
99
		return execBuilder(file, *test)
A
Avi Aryan 已提交
100 101
	}

A
Avi Aryan 已提交
102 103
	// create source config
	var srcConfig = map[string]interface{}{
104
		"_name_":           *srcType,
A
Avi Aryan 已提交
105 106 107 108
		"uri":              *srcURL,
		"tail":             *tail,
		"typeName":         *typeName,
		"replication_slot": *replicationSlot,
U
utsavoza 已提交
109 110 111 112
		"srcRegex":         *srcRegex,
		"sacPath":          *sacPath,
		"ssl":              *ssl,
		"_transform_":      *transformFile,
J
Jeet Parekh 已提交
113
		"log_dir":          *logDir,
J
Jeet Parekh 已提交
114 115 116
		"username":         *srcUsername,
		"password":         *srcPassword,
		"realm":            *srcRealm,
A
Avi Aryan 已提交
117 118
	}

119 120 121 122 123 124 125 126 127 128 129 130 131 132
	// use command line params
	args = flagset.Args()
	if len(args) == 1 {
		destURL = args[0]
	} else {
		if *verify {
			return verifyConnectionsWithoutDestination(srcConfig)
		} else {
			showShortHelp(basicUsage)
			return nil
		}
	}

	// create destination config
133
	var destConfig = map[string]interface{}{
U
utsavoza 已提交
134 135 136
		"uri":           destURL,
		"_name_":        "elasticsearch",
		"request_size":  *requestSize,
137
		"bulk_requests": *bulkRequests,
U
utsavoza 已提交
138
		"tail":          *tail,
139 140
	}

A
Avi Aryan 已提交
141
	// write config file
J
Jeet Parekh 已提交
142
	file, configuredAdaptors, err := writeConfigFile(srcConfig, destConfig)
A
Avi Aryan 已提交
143 144 145 146
	if err != nil {
		return err
	}

J
Jeet Parekh 已提交
147 148 149 150
	if *verify {
		return verifyConnections(configuredAdaptors)
	}

A
Avi Aryan 已提交
151 152 153 154
	log.Infof("Created temp file %s", file)
	// return nil

	// run config file
A
Avi Aryan 已提交
155
	return execBuilder(file, *test)
A
Avi Aryan 已提交
156 157
}

A
Avi Aryan 已提交
158
// execBuilder executes a pipeline file
A
Avi Aryan 已提交
159
func execBuilder(file string, isTest bool) error {
A
Avi Aryan 已提交
160 161 162 163
	builder, err := newBuilder(file)
	if err != nil {
		return err
	}
A
Avi Aryan 已提交
164 165 166 167
	if isTest {
		fmt.Println(builder)
		return nil
	}
A
Avi Aryan 已提交
168 169 170 171 172 173 174
	// delete if not a devBuild
	if !common.DevBuild {
		err = os.Remove(file)
		if err != nil {
			return err
		}
	}
A
Avi Aryan 已提交
175 176 177
	return builder.run()
}

A
Avi Aryan 已提交
178
// writeConfigFile writes config information in a pipeline file
J
Jeet Parekh 已提交
179
func writeConfigFile(srcConfig map[string]interface{}, destConfig map[string]interface{}) (string, map[string]adaptor.Adaptor, error) {
180
	fname := "pipeline_" + strconv.FormatInt(time.Now().UnixNano(), 10) + ".js"
A
Avi Aryan 已提交
181 182 183 184 185 186

	if _, err := os.Stat(fname); err == nil {
		log.Errorf("File %s exists, will be overwritten", fname)
	}
	appFileHandle, err := os.Create(fname)
	if err != nil {
J
Jeet Parekh 已提交
187
		return "", nil, err
A
Avi Aryan 已提交
188 189 190
	}
	defer appFileHandle.Close()

191
	args := []string{srcConfig["_name_"].(string), destConfig["_name_"].(string)}
A
Avi Aryan 已提交
192 193
	var config = make(map[string]interface{})

194 195
	// check appname as destination uri
	if !strings.Contains(destConfig["uri"].(string), "/") {
A
Avi Aryan 已提交
196
		destConfig["uri"], err = app.GetAppURL(destConfig["uri"].(string))
197
		if err != nil {
J
Jeet Parekh 已提交
198
			return "", nil, err
199 200
		}
	}
201 202
	// check appname as source uri
	if (!strings.Contains(srcConfig["uri"].(string), "/")) && srcConfig["_name_"].(string) == "elasticsearch" {
A
Avi Aryan 已提交
203
		srcConfig["uri"], err = app.GetAppURL(srcConfig["uri"].(string))
204
		if err != nil {
J
Jeet Parekh 已提交
205
			return "", nil, err
206 207
		}
	}
208 209 210 211
	// check file path as source [json, csv]
	if common.StringInSlice(srcConfig["_name_"].(string), []string{"json", "csv"}) {
		err = common.IsFileValid(srcConfig["uri"].(string))
		if err != nil {
J
Jeet Parekh 已提交
212
			return "", nil, err
213 214
		}
	}
215

J
Jeet Parekh 已提交
216 217
	configuredAdaptors := make(map[string]adaptor.Adaptor)

A
Avi Aryan 已提交
218 219 220 221 222 223 224 225 226
	nodeName := "source"
	for _, name := range args {
		// set config
		if nodeName == "source" {
			for k, v := range srcConfig {
				config[k] = v
			}
		} else {
			config = map[string]interface{}{}
227 228 229
			for k, v := range destConfig {
				config[k] = v
			}
A
Avi Aryan 已提交
230 231 232
		}
		// get adaptor
		a, _ := adaptor.GetAdaptor(name, config)
J
Jeet Parekh 已提交
233
		configuredAdaptors[nodeName] = a
A
Avi Aryan 已提交
234 235 236
		// get config json
		b, err := json.Marshal(a)
		if err != nil {
J
Jeet Parekh 已提交
237
			return "", nil, err
A
Avi Aryan 已提交
238 239 240 241 242 243
		}
		confJSON := string(b)
		// save to file
		appFileHandle.WriteString(fmt.Sprintf("var %s = %s(%s)\n\n", nodeName, name, confJSON))
		nodeName = "sink"
	}
244 245 246 247
	// custom transform file
	if srcConfig["_transform_"] != "" {
		dat, err := ioutil.ReadFile(srcConfig["_transform_"].(string))
		if err != nil {
J
Jeet Parekh 已提交
248
			return "", nil, err
249 250 251 252
		}
		appFileHandle.WriteString(string(dat))
	} else {
		// no transform file
J
patch  
Jeet Parekh 已提交
253

J
Jeet Parekh 已提交
254 255 256
		// set Config({log_dir})
		if srcConfig["log_dir"] != "" {
			confStr := fmt.Sprintf(`t.Config({"log_dir":"%s"}).Source("source", source, "/%s/").Save("sink", sink, "/.*/")`, srcConfig["log_dir"], srcConfig["srcRegex"])
J
patch  
Jeet Parekh 已提交
257 258 259 260 261 262 263 264 265 266

			fmt.Println(confStr)

			appFileHandle.WriteString(confStr)
		} else {
			appFileHandle.WriteString(
				fmt.Sprintf(`t.Source("source", source, "/%s/").Save("sink", sink, "/.*/")`,
					srcConfig["srcRegex"]),
			)
		}
267
	}
A
Avi Aryan 已提交
268 269
	appFileHandle.WriteString("\n")

J
Jeet Parekh 已提交
270
	return fname, configuredAdaptors, nil
A
Avi Aryan 已提交
271
}
A
Avi Aryan 已提交
272

A
Avi Aryan 已提交
273
// genPipelineFromEnv generates a pipeline file from config file
J
Jeet Parekh 已提交
274
func genPipelineFromEnv(filename string) (string, map[string]adaptor.Adaptor, error) {
A
Avi Aryan 已提交
275 276 277
	var config map[string]string
	config, err := godotenv.Read(filename)
	if err != nil {
J
Jeet Parekh 已提交
278
		return "", nil, err
A
Avi Aryan 已提交
279
	}
280 281 282 283
	// save keys as small
	for k := range config {
		config[strings.ToLower(k)] = config[k]
	}
A
Avi Aryan 已提交
284
	// source
285
	src := map[string]interface{}{
286 287
		"srcRegex":    ".*", // custom param defaults
		"_transform_": "",
288
	}
A
Avi Aryan 已提交
289 290 291
	for k, v := range srcParamMap {
		if val, ok := config[k]; ok {
			src[v] = val
292 293 294 295 296 297 298 299
			// tail should be boolean
			if k == "tail" {
				if val == "true" {
					src[v] = true
				} else {
					src[v] = false
				}
			}
300 301 302 303 304 305 306 307
			// ssl should be boolean
			if k == "ssl" {
				if val == "true" {
					src[v] = true
				} else {
					src[v] = false
				}
			}
A
Avi Aryan 已提交
308 309 310 311 312 313 314
		}
	}
	// sink
	dest := map[string]interface{}{}
	for k, v := range destParamMap {
		if val, ok := config[k]; ok {
			dest[v] = val
U
utsavoza 已提交
315 316 317 318 319 320 321
			if k == "tail" {
				if val == "true" {
					dest[v] = true
				} else {
					dest[v] = false
				}
			}
A
Avi Aryan 已提交
322 323 324
		}
	}
	// generate file
J
Jeet Parekh 已提交
325
	file, configuredAdaptors, err := writeConfigFile(src, dest)
A
Avi Aryan 已提交
326
	if err != nil {
J
Jeet Parekh 已提交
327
		return "", nil, err
A
Avi Aryan 已提交
328 329
	}
	fmt.Printf("Writing %s...\n", file)
J
Jeet Parekh 已提交
330 331 332 333 334 335 336 337 338 339 340
	return file, configuredAdaptors, nil
}

func verifyConnections(adaptors map[string]adaptor.Adaptor) error {
	for _, ad := range adaptors {
		err := ad.Verify()
		if err != nil {
			return err
		}
	}
	return nil
A
Avi Aryan 已提交
341
}
342 343 344 345 346 347

func verifyConnectionsWithoutDestination(srcConfig map[string]interface{}) error {
	var config = make(map[string]interface{})
	for k, v := range srcConfig {
		config[k] = v
	}
348 349 350 351
	ad, adaptorErr := adaptor.GetAdaptor(srcConfig["_name_"].(string), config)
	if adaptorErr != nil {
		return adaptorErr
	}
352 353 354 355 356 357 358
	err := ad.Verify()
	if err != nil {
		return err
	}
	return nil
}