make xml importer parallel

This commit is contained in:
Rafael Weinstein
2015-09-09 18:28:23 -07:00
parent 19cb987761
commit 60851d03dc
+80 -19
View File
@@ -6,6 +6,9 @@ import (
"log"
"os"
"path/filepath"
"runtime"
"sort"
"sync"
"github.com/attic-labs/noms/Godeps/_workspace/src/github.com/clbanning/mxj"
"github.com/attic-labs/noms/clients/util"
@@ -24,6 +27,22 @@ var (
}
)
type fileIndex struct {
path string
index int
}
type refIndex struct {
ref types.Ref
index int
}
type refIndexList []refIndex
func (a refIndexList) Len() int { return len(a) }
func (a refIndexList) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a refIndexList) Less(i, j int) bool { return a[i].index < a[j].index }
func main() {
err := d.Try(func() {
dsFlags := dataset.NewFlags()
@@ -41,31 +60,73 @@ func main() {
defer util.StopCPUProfile()
}
list := types.NewList()
cpuCount := runtime.NumCPU()
runtime.GOMAXPROCS(cpuCount)
filesChan := make(chan fileIndex, 1024)
refsChan := make(chan refIndex, 1024)
getFilePaths := func() {
index := 0
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
d.Exp.NoError(err, "Cannot traverse directories")
if !info.IsDir() && filepath.Ext(path) == ".xml" {
filesChan <- fileIndex{path, index}
index++
}
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
d.Exp.NoError(err, "Cannot traverse directories")
if info.IsDir() || filepath.Ext(path) != ".xml" {
return nil
}
file, err := os.Open(path)
d.Exp.NoError(err, "Error getting XML")
defer file.Close()
})
d.Exp.NoError(err)
close(filesChan)
}
xmlObject, err := mxj.NewMapXmlReader(file)
d.Exp.NoError(err, "Error decoding XML")
object := xmlObject.Old()
wg := sync.WaitGroup{}
importXml := func() {
for f := range filesChan {
file, err := os.Open(f.path)
d.Exp.NoError(err, "Error getting XML")
nomsObj := util.NomsValueFromDecodedJSON(object)
if *noIO {
return nil
xmlObject, err := mxj.NewMapXmlReader(file)
d.Exp.NoError(err, "Error decoding XML")
object := xmlObject.Old()
file.Close()
nomsObj := util.NomsValueFromDecodedJSON(object)
r := types.Ref{}
if !*noIO {
r = types.Ref{types.WriteValue(nomsObj, ds.Store())}
}
refsChan <- refIndex{r, f.index}
}
ref := types.WriteValue(nomsObj, ds.Store())
list = list.Append(types.Ref{R: ref})
return nil
})
d.Exp.NoError(err)
wg.Done()
}
go getFilePaths()
for i := 0; i < cpuCount*8; i++ {
wg.Add(1)
go importXml()
}
go func() {
wg.Wait()
close(refsChan) // done converting xml to noms
}()
refList := refIndexList{}
for r := range refsChan {
refList = append(refList, r)
}
sort.Sort(refList)
refs := make([]types.Value, 0, len(refList))
for _, r := range refList {
refs = append(refs, r.ref)
}
list := types.NewList(refs...)
if !*noIO {
_, ok := ds.Commit(list)