Merge pull request #361 from rafael-atticlabs/sfcrime

Make order of crime data deterministic
This commit is contained in:
Aaron Boodman
2015-09-28 16:19:22 -07:00
+114 -89
View File
@@ -4,31 +4,49 @@ import (
"encoding/csv"
"flag"
"fmt"
"github.com/attic-labs/noms/chunks"
"github.com/attic-labs/noms/clients/util"
"math"
"os"
"sort"
"strconv"
"sync"
"time"
"github.com/attic-labs/noms/chunks"
"github.com/attic-labs/noms/clients/util"
"github.com/attic-labs/noms/d"
"github.com/attic-labs/noms/dataset"
"github.com/attic-labs/noms/types"
"math"
"os"
"strconv"
"sync"
"time"
)
// data can be obtained using:
// data can be obtained using:
// wget --output-document=sfcrime.csv https://data.sfgov.org/api/views/tmnf-yvry/rows.csv?accessType=DOWNLOAD
var (
limitFlag = flag.Int("limit", math.MaxInt32, "limit number of rows that are imported")
inputFlag = flag.String("input-file", "", "path to .csv file containing sfcrime data")
quietFlag = flag.Bool("quiet", false, "suppress printing of messages")
numIncidents = 0
rowsRead = 0
start = time.Now()
limitFlag = flag.Int("limit", math.MaxInt32, "limit number of rows that are imported")
inputFlag = flag.String("input-file", "", "path to .csv file containing sfcrime data")
quietFlag = flag.Bool("quiet", false, "suppress printing of messages")
numIncidents = 0
rowsRead = 0
start = time.Now()
)
const maxListSize = 1e5
type incidentWithIndex struct {
incident *IncidentDef
index int
}
type refIndex struct {
ref types.Ref
index int
}
type refIndexList []refIndex
func (a refIndexList) Len() int { return len(a) }
func (a refIndexList) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a refIndexList) Less(i, j int) bool { return a[i].index < a[j].index }
func main() {
dsFlags := dataset.NewFlags()
flag.Parse()
@@ -46,111 +64,118 @@ func main() {
}
defer csvfile.Close()
if util.MaybeStartCPUProfile() {
defer util.StopCPUProfile()
}
if util.MaybeStartCPUProfile() {
defer util.StopCPUProfile()
}
reader := csv.NewReader(csvfile)
minLon := float32(180)
minLat := float32(90)
maxLat := float32(-90)
maxLon := float32(-180)
// read the header row and discard it
_, err = reader.Read()
outLiers := 0
limitExceeded := false
iChan, rChan := getNomsWriter(ds.Store())
refs := []types.Value{}
maxLon := float32(-180)
// Start a go routine to add incident refs to the list as they are ready
var wg sync.WaitGroup
go func() {
wg.Add(1)
for ref := range rChan {
refs = append(refs, ref)
}
wg.Done()
}()
// read the header row and discard it
_, err = reader.Read()
outLiers := 0
limitExceeded := false
iChan, rChan := getNomsWriter(ds.Store())
refList := refIndexList{}
refs := []types.Value{}
// Start a go routine to add incident refs to the list as they are ready
var wg sync.WaitGroup
go func() {
wg.Add(1)
for ref := range rChan {
refList = append(refList, ref)
}
sort.Sort(refList)
for _, r := range refList {
refs = append(refs, r.ref)
}
wg.Done()
}()
index := 0
for r, err := reader.Read(); !limitExceeded && err == nil; r, err = reader.Read() {
rowsRead++
rowsRead++
id, _ := strconv.ParseInt(r[0], 10, 64)
lon64, _ := strconv.ParseFloat(r[9], 32)
lat64, _ := strconv.ParseFloat(r[10], 32)
geopos := GeopositionDef{Latitude: float32(lat64), Longitude: float32(lon64)}
incident := IncidentDef{
ID: id,
Category: r[1],
Description: r[2],
DayOfWeek: r[3],
Date: r[4],
Time: r[5],
PdDistrict: r[6],
Resolution: r[7],
Address: r[8],
Geoposition: geopos,
PdID: r[12],
}
ID: id,
Category: r[1],
Description: r[2],
DayOfWeek: r[3],
Date: r[4],
Time: r[5],
PdDistrict: r[6],
Resolution: r[7],
Address: r[8],
Geoposition: geopos,
PdID: r[12],
}
if geopos.Latitude > 35 && geopos.Latitude < 40 && geopos.Longitude > -125 && geopos.Longitude < 120 {
minLat = min(minLat, geopos.Latitude)
maxLat = max(maxLat, geopos.Latitude)
minLon = min(minLon, geopos.Longitude)
maxLon = max(maxLon, geopos.Longitude)
iChan <- &incident
iChan <- incidentWithIndex{&incident, index}
index++
} else {
outLiers++
}
if !*quietFlag && rowsRead%maxListSize == 0 {
fmt.Printf("Processed %d rows, %d incidents, elapsed time: %.2f secs\n", rowsRead, numIncidents, time.Now().Sub(start).Seconds())
}
fmt.Printf("Processed %d rows, %d incidents, elapsed time: %.2f secs\n", rowsRead, numIncidents, time.Now().Sub(start).Seconds())
}
if rowsRead >= *limitFlag {
limitExceeded = true
limitExceeded = true
}
}
close(iChan)
wg.Wait()
incidentRefs := types.NewList(refs...)
if !*quietFlag {
fmt.Printf("Converting refs list to noms list: %.2f secs\n", time.Now().Sub(start).Seconds())
}
_, ok := ds.Commit(incidentRefs)
d.Exp.True(ok, "Could not commit due to conflicting edit")
if !*quietFlag {
fmt.Printf("Commit completed, elaspsed time: %.2f secs\n", time.Now().Sub(start).Seconds())
printDataStats(rowsRead, numIncidents, outLiers, minLat, minLon, maxLat, maxLon)
}
close(iChan)
wg.Wait()
fmt.Printf("Ref of list containing Incidents: %s, , elaspsed time: %.2f secs\n", incidentRefs.Ref(), time.Now().Sub(start).Seconds())
incidentRefs := types.NewList(refs...)
if !*quietFlag {
fmt.Printf("Converting refs list to noms list: %.2f secs\n", time.Now().Sub(start).Seconds())
}
_, ok := ds.Commit(incidentRefs)
d.Exp.True(ok, "Could not commit due to conflicting edit")
if !*quietFlag {
fmt.Printf("Commit completed, elaspsed time: %.2f secs\n", time.Now().Sub(start).Seconds())
printDataStats(rowsRead, numIncidents, outLiers, minLat, minLon, maxLat, maxLon)
}
fmt.Printf("Ref of list containing Incidents: %s, , elaspsed time: %.2f secs\n", incidentRefs.Ref(), time.Now().Sub(start).Seconds())
}
func getNomsWriter(cs chunks.ChunkSink) (iChan chan *IncidentDef, rChan chan types.Ref) {
iChan = make(chan *IncidentDef, 3000)
rChan = make(chan types.Ref, 3000)
var wg sync.WaitGroup
for i := 0; i < 32; i++ {
wg.Add(1)
go func() {
for incidentDef := range iChan {
v := incidentDef.New()
r := types.WriteValue(v.NomsValue(), cs)
rChan <- types.Ref{R: r}
}
wg.Done()
}()
}
go func() {
wg.Wait()
close(rChan)
}()
return
func getNomsWriter(cs chunks.ChunkSink) (iChan chan incidentWithIndex, rChan chan refIndex) {
iChan = make(chan incidentWithIndex, 3000)
rChan = make(chan refIndex, 3000)
var wg sync.WaitGroup
for i := 0; i < 32; i++ {
wg.Add(1)
go func() {
for incidentRecord := range iChan {
v := incidentRecord.incident.New()
r := types.WriteValue(v.NomsValue(), cs)
rChan <- refIndex{types.Ref{R: r}, incidentRecord.index}
}
wg.Done()
}()
}
go func() {
wg.Wait()
close(rChan)
}()
return
}
func printDataStats(rows, rowsInserted, badRows int, minLat, minLon, maxLat, maxLon float32) {