package serve import ( "encoding/json" "fmt" "io" "orm/DB" "os" "strings" "sync" "time" ) func getParams(fieldName []string, queryParam map[string]string) { // 获取参数 } func Serve() { table := "tongbai_wt16" selectToCsvFile(table) // 创建数据库 //createTable() // 通过csv文件插入数据 //allStart := time.Now() //for i := 0; i < 100; i++ { //insertByCsv("./test.csv") //} //allSince := time.Since(allStart) //fmt.Println("总时间", allSince) } // 读取数据库数据并写入文件 func selectToCsvFile(table string) { // 获取sql语句 sql := fmt.Sprintf("select * from `lxc_db`.`%s`", table) //sql := "select * from `lxc_db`.`tongbai_wt11` limit 100" //sql = "select * from `test`.`meters` limit 10000" // 执行select语句 mData := selectExec(sql) // 写入文件 writeToFile(mData, "./test.csv") } // 执行select语句 func selectExec(sql string) []map[string]interface{} { //now := time.Now() // 执行sql语句 query, err := DB.Db.Query(sql) if err != nil { panic(err) } //since := time.Since(now) // 数据列 columns, err := query.Columns() if err != nil { panic(err) } // 数据个数 count := len(columns) // 返回值 Map切片 mData := make([]map[string]interface{}, 0) // 一条数据的各列的值(需要指定长度为列的个数,以便获取地址) values := make([]interface{}, count) // 一条数据的各列的值的地址 valPointers := make([]interface{}, count) // 处理查询结果 for query.Next() { // 获取各列的值的地址 for i := 0; i < count; i++ { valPointers[i] = &values[i] } // 获取各列的值,放到对应的地址中 query.Scan(valPointers...) // 一条数据的Map (列名和值的键值对) entry := make(map[string]interface{}) // Map 赋值 for i, col := range columns { var v interface{} // 值复制给val(所以Scan时指定的地址可重复使用) val := values[i] b, ok := val.([]byte) if ok { // 字符切片转为字符串 v = string(b) } else { v = val } entry[col] = v } mData = append(mData, entry) } return mData // 循环打印mData //for _, v := range mData { // fmt.Println(v) //} } // 判断文件是否存在 func fileExist(path string) bool { // 判断文件是否存在 _, err := os.Stat(path) if err != nil { return false } return true } // 将数据写入文件 func writeToFile(mData []map[string]interface{}, path string) { // 写入文件的字符串 var fileStr string // 创建表头 var fileHead []string // 创建文件 var csvFile *os.File var err error defer csvFile.Close() // 判断文件是否存在 if fileExist(path) { file := readCsvFile(path) if file == "" || file == "\n" { } else { csvFile, err = os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666) if err != nil { panic(err) } fileLine := strings.Split(file, "\n") fileHead = strings.Split(strings.ReplaceAll(fileLine[0], "\"", ""), ",") fmt.Println("fileHead", fileHead) } } else { csvFile, err = os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666) if err != nil { panic(err) } for k := range mData[0] { // 将key转为字符串 key, err := json.Marshal(k) if err != nil { panic(err) } // 将key转为字符串后,拼接到fileStr fileStr += string(key) fileStr += "," // 向fileHead添加key fileHead = append(fileHead, k) fmt.Println("fileHead", fileHead) } // 去掉最后一个逗号,此时fileStr第一行为表头 fileStr = strings.TrimRight(fileStr, ",") fileStr += "\n" } // 根据表头读数据并写入文件 // 将mData写入文件 循环[]map[string]interface{} dataPoint := 0 go func() { // 写入文件 if _, err := io.WriteString(csvFile, fileStr); err != nil { fmt.Println("写入文件失败: ", err) os.Exit(1) } }() for { if len(mData)-dataPoint < 500 { fmt.Println("数据量小于500") fileStr += getStrHandle(mData[dataPoint:], fileHead) break } go func() { fileStr += getStrHandle(mData[dataPoint:dataPoint+500], fileHead) dataPoint += 500 }() } // 循环表头,根据表头读每一行的map的数据 // 删除最后一个换行符 //fileStr = strings.TrimRight(fileStr, "\n") //fmt.Println(fileStr) //getSqlByContent(fileStr) // 写入文件 if _, err := io.WriteString(csvFile, fileStr); err != nil { fmt.Println("写入文件失败: ", err) os.Exit(1) } } func getStrHandle(mData []map[string]interface{}, fileHead []string) (fileStr string) { now := time.Now() for _, mapEntry := range mData { // 循环表头,根据表头读每一行的map的数据 for _, key := range fileHead { if key == "ts" { now = now.Add(time.Hour * 1) fileStr += "'" + now.Format("2006-01-02 15:04:05") + "'" //value, err := json.Marshal(mapEntry[key]) //if err != nil { // panic(err) //} //fileStr += string(value) } else { value, err := json.Marshal(mapEntry[key]) if err != nil { panic(err) } fileStr += string(value) } fileStr += "," } fileStr = strings.TrimRight(fileStr, ",") fileStr += "\n" fmt.Println("写完一行数据") } // 删除最后一个换行符 //fileStr = strings.TrimRight(fileStr, "\n") //fmt.Println(fileStr) //getSqlByContent(fileStr) return } // 通过csv文件插入数据 func insertByCsv(filePath string) { // 将文件写入数据库, 试用版不支持 //sql := fmt.Sprintf("INSERT INTO `lxc_db`.`tongbai_wt12` FILE %s;", filePath) //_, err := DB.Db.Exec(sql) //if err != nil { // panic(err) //} // 读取文件冰生成sql语句 // 读取文件 readFront := time.Now() fileContent := readCsvFile(filePath) readSince := time.Since(readFront) fmt.Println("读文件所用时间", readSince) // 生成写入数据库的sql语句 sqlFront := time.Now() sqlList := getSqlByContent(fileContent) sqlSince := time.Since(sqlFront) fmt.Println("生成sql语句所用时间", sqlSince) // 执行sql语句 //fmt.Println("sql", sql) execFront := time.Now() wg := sync.WaitGroup{} for _, sql := range sqlList { wg.Add(1) go func() { _, err := DB.Db.Exec(sql) if err != nil { } wg.Done() }() //_, err := DB.Db.Exec(sql) //if err != nil { // panic(err) //} else { // //fmt.Println("插入成功") //} } wg.Wait() execSince := time.Since(execFront) fmt.Println("执行sql语句所用时间", execSince) } // 读取csv文件 func readCsvFile(filePath string) string { // 打开文件 csvFile, err := os.OpenFile(filePath, os.O_RDONLY, 0666) if err != nil { panic(err) } defer csvFile.Close() // 读取CSV文件 fileContent, err := io.ReadAll(csvFile) if err != nil { panic(err) } return string(fileContent) } // 根据文件内容生成sql语句 func getSqlByContent(fileContent string) []string { // 将文件内容按行分割 dataLine := strings.Split(fileContent, "\n") // 第一行为表头 head := dataLine[0] // 剩余行为数据 content := dataLine[1:] var data string var sqlList []string var l int for i, v := range content { content[i] = "(" + v + ")" data += content[i] l++ // 每500条数据生成一条sql语句 if l > 500 { l = 0 sql := fmt.Sprintf("INSERT INTO `lxc_db`.`tongbai_wt16` (%s) VALUES %s;", head, data) sqlList = append(sqlList, sql) data = "" } } // 生成sql语句 sql := fmt.Sprintf("INSERT INTO `lxc_db`.`tongbai_wt16` (%s) VALUES %s;", head, data) sqlList = append(sqlList, sql) return sqlList } // 创建表 func createTable() { // 创建表 sql := "CREATE TABLE `lxc_db`.`tongbai_wt16` USING `lxc_db`.`meters` (`groupid`, `ip_addr`) TAGS (10011, NULL)" exec, err := DB.Db.Exec(sql) if err != nil { panic(err) } id, err := exec.RowsAffected() if err != nil { panic(err) } fmt.Println("exec.RowsAffected", id) }