Use a channel for the crunchbase indexer

... instead of a mutex
This commit is contained in:
Erik Arvidsson
2015-11-25 13:05:47 -05:00
parent a2ff860194
commit 7dcf3b88dd

View File

@@ -6,7 +6,6 @@ import (
"log"
"os"
"runtime"
"sync"
"time"
"github.com/attic-labs/noms/clients/util"
@@ -52,71 +51,67 @@ func main() {
v, ok := tv.(MapOfStringToRefOfCompany)
d.Exp.True(ok, "Unexpected data in dataset. Found %T", tv)
l := v.Len()
i := 0
mu := sync.Mutex{}
type entry struct {
key Key
roundRaiseDef RoundRaiseDef
}
c := make(chan entry, 1024)
mapOfRoundsDef := MapOfRefOfKeyToSetOfRoundRaiseDef{}
addRound := func(key Key, roundRaiseDef RoundRaiseDef) {
keyRef := key.Ref()
mu.Lock()
defer mu.Unlock()
var setDef SetOfRoundRaiseDef
setDef, ok := mapOfRoundsDef[keyRef]
if !ok {
setDef = SetOfRoundRaiseDef{}
}
setDef[roundRaiseDef] = true
mapOfRoundsDef[keyRef] = setDef
}
addTimeRounds := func(tn int64, roundRaiseDef RoundRaiseDef) {
t := time.Unix(tn, 0)
year := int32(t.Year())
yk := NewKey().SetYear(year)
addRound(yk, roundRaiseDef)
c <- entry{yk, roundRaiseDef}
q := timeToQuarter(t)
qk := NewKey().SetQuarter(QuarterDef{Year: year, Quarter: q}.New())
addRound(qk, roundRaiseDef)
c <- entry{qk, roundRaiseDef}
}
v.IterAllP(64, func(permalink string, r RefOfCompany) {
mu.Lock()
i++
fmt.Printf("\rIndexing companies: %d/%d (%.f%%)", i, l, float64(i)/float64(l)*100)
mu.Unlock()
company := r.TargetValue(ds)
categoryList := company.CategoryList()
regionKey := NewKey().SetRegion(company.Region())
company.Rounds().IterAll(func(r RefOfRound) {
round := r.TargetValue(ds)
roundRaiseDef := RoundRaiseDef{
Raised: round.RaisedAmountUsd(),
Details: r.TargetRef(),
}
categoryList.IterAllP(64, func(category string) {
key := NewKey().SetCategory(category)
addRound(key, roundRaiseDef)
go func() {
v.IterAllP(64, func(permalink string, r RefOfCompany) {
company := r.TargetValue(ds)
categoryList := company.CategoryList()
regionKey := NewKey().SetRegion(company.Region())
company.Rounds().IterAll(func(r RefOfRound) {
round := r.TargetValue(ds)
roundRaiseDef := RoundRaiseDef{
Raised: round.RaisedAmountUsd(),
Details: r.TargetRef(),
}
categoryList.IterAllP(64, func(category string) {
key := NewKey().SetCategory(category)
c <- entry{key, roundRaiseDef}
})
c <- entry{regionKey, roundRaiseDef}
addTimeRounds(round.FundedAt(), roundRaiseDef)
roundType := classifyRoundType(round)
roundTypeKey := NewKey().SetRoundType(roundType)
c <- entry{roundTypeKey, roundRaiseDef}
})
addRound(regionKey, roundRaiseDef)
addTimeRounds(round.FundedAt(), roundRaiseDef)
roundType := classifyRoundType(round)
roundTypeKey := NewKey().SetRoundType(roundType)
addRound(roundTypeKey, roundRaiseDef)
})
})
fmt.Printf("\r\033[KConverting to Noms...")
close(c)
}()
for e := range c {
key := e.key
roundRaiseDef := e.roundRaiseDef
keyRef := key.Ref()
setDef := mapOfRoundsDef[keyRef]
if setDef == nil {
setDef = SetOfRoundRaiseDef{}
}
setDef[roundRaiseDef] = true
mapOfRoundsDef[keyRef] = setDef
}
output := mapOfRoundsDef.New()
fmt.Printf("\r\033[KCommitting...\n")
_, ok = outputDataset.Commit(output)
d.Exp.True(ok, "Could not commit due to conflicting edit")