Add ability to import csv in column major order (#3742)

Added --invert argument to import data in column-major order
Added --append argument to append imported data to the current head of the dataset
Added --limit-records argument to cap the number of records imported
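
For illustration, the new flags compose like this (the csv-import binary name and the paths below are hypothetical; per the validation added in this commit, --append and --invert are rejected for map destinations):

    # column-major import of at most 1000 records
    csv-import --invert --limit-records 1000 data.csv /path/to/db::pets

    # append a second file to the columnar head written above
    csv-import --invert --append more.csv /path/to/db::pets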
This commit is contained in:
Dan Willhite
2017-09-28 11:22:46 -07:00
committed by GitHub
parent ff6efa73e5
commit 9405ea41d4
5 changed files with 201 additions and 30 deletions

View File

@@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"strings"
"time"
@@ -45,7 +46,10 @@ func main() {
noProgress := flag.Bool("no-progress", false, "prevents progress from being output if true")
destType := flag.String("dest-type", "list", "the destination type to import to. can be 'list' or 'map:<pk>', where <pk> is a list of comma-delimited column headers or indexes (0-based) used to uniquely identify a row")
skipRecords := flag.Uint("skip-records", 0, "number of records to skip at beginning of file")
limit := flag.Uint64("limit-records", math.MaxUint64, "maximum number of records to process")
performCommit := flag.Bool("commit", true, "commit the data to head of the dataset (otherwise only write the data to the dataset)")
append := flag.Bool("append", false, "append new data to list at head of specified dataset.")
invert := flag.Bool("invert", false, "import rows in column major format rather than row major")
spec.RegisterCommitMetaFlags(flag.CommandLine)
verbose.RegisterVerboseFlags(flag.CommandLine)
profile.RegisterProfileFlags(flag.CommandLine)
@@ -67,6 +71,10 @@ func main() {
err = errors.New("Cannot specify both <csvfile> and a noms path with -p")
case flag.NArg() > 2:
err = errors.New("Too many arguments")
case strings.HasPrefix(*destType, "map") && *append:
err = errors.New("--append is only compatible with list imports")
case strings.HasPrefix(*destType, "map") && *invert:
err = errors.New("--invert is only compatible with list imports")
}
d.CheckError(err)
@@ -174,15 +182,43 @@ func main() {
defer db.Close()
var value types.Value
-if dest == destList {
-value = csv.ReadToList(cr, *name, headers, kinds, db)
+if dest == destMap {
+value = csv.ReadToMap(cr, *name, headers, strPks, kinds, db, *limit)
+} else if *invert {
+value = csv.ReadToColumnar(cr, *name, headers, kinds, db, *limit)
} else {
-value = csv.ReadToMap(cr, *name, headers, strPks, kinds, db)
+value = csv.ReadToList(cr, *name, headers, kinds, db, *limit)
}
if *performCommit {
meta, err := spec.CreateCommitMetaStruct(ds.Database(), "", "", additionalMetaInfo(filePath, *path), nil)
d.CheckErrorNoUsage(err)
if *append {
if headVal, present := ds.MaybeHeadValue(); present {
switch headVal.Kind() {
case types.ListKind:
l, isList := headVal.(types.List)
d.PanicIfFalse(isList)
ref := db.WriteValue(value)
value = l.Concat(ref.TargetValue(db).(types.List))
case types.StructKind:
hstr, isStruct := headVal.(types.Struct)
d.PanicIfFalse(isStruct)
d.PanicIfFalse(hstr.Name() == "Columnar")
str := value.(types.Struct)
hstr.IterFields(func(fieldname string, v types.Value) {
hl := v.(types.Ref).TargetValue(db).(types.List)
nl := str.Get(fieldname).(types.Ref).TargetValue(db).(types.List)
l := hl.Concat(nl)
r := db.WriteValue(l)
str = str.Set(fieldname, r)
})
value = str
default:
d.Panic("append can only be used with list or columnar")
}
}
}
_, err = db.Commit(ds, value, datas.CommitOptions{Meta: meta})
if !*noProgress {
status.Clear()
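
The columnar branch of the append logic above concatenates, field by field, the list behind each Ref in the head struct with the matching list from the newly imported struct. Below is a minimal standalone sketch of that step, not part of the commit: the field name and values are made up, and it assumes the collection constructors of this revision (types.NewList taking a ValueReadWriter as its first argument).

package main

import (
    "fmt"

    "github.com/attic-labs/noms/go/chunks"
    "github.com/attic-labs/noms/go/datas"
    "github.com/attic-labs/noms/go/types"
)

func main() {
    storage := &chunks.MemoryStorage{}
    db := datas.NewDatabase(storage.NewView())
    defer db.Close()

    // Stand-ins for the current head value and the freshly imported value;
    // each "Columnar" field holds a Ref to the list of one column's values.
    head := types.NewStruct("Columnar", types.StructData{
        "a": db.WriteValue(types.NewList(db, types.Number(1), types.Number(2))),
    })
    imported := types.NewStruct("Columnar", types.StructData{
        "a": db.WriteValue(types.NewList(db, types.Number(3))),
    })

    // Per-field concat, mirroring the IterFields loop in the commit:
    // head's list comes first, the imported list is appended after it.
    head.IterFields(func(name string, v types.Value) {
        hl := v.(types.Ref).TargetValue(db).(types.List)
        nl := imported.Get(name).(types.Ref).TargetValue(db).(types.List)
        imported = imported.Set(name, db.WriteValue(hl.Concat(nl)))
    })

    merged := imported.Get("a").(types.Ref).TargetValue(db).(types.List)
    fmt.Println(merged.Len()) // 3, i.e. [1 2 3]
}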

View File

@@ -49,14 +49,15 @@ func (s *testSuite) TearDownTest() {
}
func writeCSV(w io.Writer) {
writeCSVWithHeader(w, "year,a,b,c\n")
writeCSVWithHeader(w, "year,a,b,c\n", 0)
}
func writeCSVWithHeader(w io.Writer, header string) {
func writeCSVWithHeader(w io.Writer, header string, startingValue int) {
_, err := io.WriteString(w, header)
d.Chk.NoError(err)
for i := 0; i < TEST_DATA_SIZE; i++ {
-_, err = io.WriteString(w, fmt.Sprintf("%d,a%d,%d,%d\n", TEST_YEAR+i%3, i, i, i*2))
+j := i + startingValue
+_, err = io.WriteString(w, fmt.Sprintf("%d,a%d,%d,%d\n", TEST_YEAR+j%3, j, j, j*2))
d.Chk.NoError(err)
}
}
@@ -108,6 +109,24 @@ func (s *testSuite) validateNestedMap(vrw types.ValueReadWriter, m types.Map) {
}
}
func (s *testSuite) validateColumnar(vrw types.ValueReadWriter, str types.Struct, reps int) {
s.Equal("Columnar", str.Name())
lists := map[string]types.List{}
for _, nm := range []string{"year", "a", "b", "c"} {
l := str.Get(nm).(types.Ref).TargetValue(vrw).(types.List)
s.Equal(uint64(reps*TEST_DATA_SIZE), l.Len())
lists[nm] = l
}
for i := 0; i < reps*TEST_DATA_SIZE; i++ {
s.Equal(types.Number(TEST_YEAR+i%3), lists["year"].Get(uint64(i)))
s.Equal(types.String(fmt.Sprintf("a%d", i)), lists["a"].Get(uint64(i)))
s.Equal(types.Number(i), lists["b"].Get(uint64(i)))
s.Equal(types.Number(i*2), lists["c"].Get(uint64(i)))
}
}
func (s *testSuite) TestCSVImporter() {
setName := "csv"
dataspec := spec.CreateValueSpecString("nbs", s.DBDir, setName)
@@ -127,7 +146,7 @@ func (s *testSuite) TestCSVImporterLowercase() {
input, err := ioutil.TempFile(s.TempDir, "")
d.Chk.NoError(err)
defer input.Close()
writeCSVWithHeader(input, "YeAr,a,B,c\n")
writeCSVWithHeader(input, "YeAr,a,B,c\n", 0)
defer os.Remove(input.Name())
setName := "csv"
@@ -148,7 +167,7 @@ func (s *testSuite) TestCSVImporterLowercaseDuplicate() {
input, err := ioutil.TempFile(s.TempDir, "")
d.Chk.NoError(err)
defer input.Close()
writeCSVWithHeader(input, "YeAr,a,B,year\n")
writeCSVWithHeader(input, "YeAr,a,B,year\n", 0)
defer os.Remove(input.Name())
setName := "csv"
@@ -239,6 +258,48 @@ func (s *testSuite) TestCSVImporterToNestedMapByName() {
s.validateNestedMap(db, m)
}
func (s *testSuite) TestCSVImporterToColumnar() {
setName := "csv"
dataspec := spec.CreateValueSpecString("nbs", s.DBDir, setName)
stdout, stderr := s.MustRun(main, []string{"--no-progress", "--invert", "--column-types", TEST_FIELDS, s.tmpFileName, dataspec})
s.Equal("", stdout)
s.Equal("", stderr)
db := datas.NewDatabase(nbs.NewLocalStore(s.DBDir, clienttest.DefaultMemTableSize))
defer os.RemoveAll(s.DBDir)
defer db.Close()
ds := db.GetDataset(setName)
str := ds.HeadValue().(types.Struct)
s.validateColumnar(db, str, 1)
}
func (s *testSuite) TestCSVImporterToColumnarAppend() {
setName := "csv"
dataspec := spec.CreateValueSpecString("nbs", s.DBDir, setName)
stdout, stderr := s.MustRun(main, []string{"--no-progress", "--invert", "--column-types", TEST_FIELDS, s.tmpFileName, dataspec})
s.Equal("", stdout)
s.Equal("", stderr)
input, err := ioutil.TempFile(s.TempDir, "")
d.Chk.NoError(err)
defer input.Close()
writeCSVWithHeader(input, "year,a,b,c\n", 100)
defer os.Remove(input.Name())
stdout, stderr = s.MustRun(main, []string{"--no-progress", "--invert", "--append", "--column-types", TEST_FIELDS, input.Name(), dataspec})
s.Equal("", stdout)
s.Equal("", stderr)
db := datas.NewDatabase(nbs.NewLocalStore(s.DBDir, clienttest.DefaultMemTableSize))
defer os.RemoveAll(s.DBDir)
defer db.Close()
ds := db.GetDataset(setName)
str := ds.HeadValue().(types.Struct)
s.validateColumnar(db, str, 2)
}
func (s *testSuite) TestCSVImporterWithPipe() {
input, err := ioutil.TempFile(s.TempDir, "")
d.Chk.NoError(err)

View File

@@ -92,22 +92,29 @@ func MakeStructTemplateFromHeaders(headers []string, structName string, kinds Ki
return
}
-// ReadToList takes a CSV reader and reads data into a typed List of structs. Each row gets read into a struct named structName, described by headers. If the original data contained headers it is expected that the input reader has already read those and are pointing at the first data row.
-// If kinds is non-empty, it will be used to type the fields in the generated structs; otherwise, they will be left as string-fields.
-// In addition to the list, ReadToList returns the typeDef of the structs in the list.
-func ReadToList(r *csv.Reader, structName string, headers []string, kinds KindSlice, vrw types.ValueReadWriter) (l types.List) {
+// ReadToList takes a CSV reader and reads data into a typed List of structs.
+// Each row gets read into a struct named structName, described by headers. If
+// the original data contained headers, it is expected that the input reader
+// has already read those and is pointing at the first data row.
+// If kinds is non-empty, it will be used to type the fields in the generated
+// structs; otherwise, they will be left as string-fields.
+func ReadToList(r *csv.Reader, structName string, headers []string, kinds KindSlice, vrw types.ValueReadWriter, limit uint64) (l types.List) {
temp, fieldOrder, kindMap := MakeStructTemplateFromHeaders(headers, structName, kinds)
valueChan := make(chan types.Value, 128) // TODO: Make this a function param?
listChan := types.NewStreamingList(vrw, valueChan)
cnt := uint64(0)
for {
row, err := r.Read()
-if err == io.EOF {
+if cnt >= limit || err == io.EOF {
close(valueChan)
break
} else if err != nil {
panic(err)
}
cnt++
fields := readFieldsFromRow(row, headers, fieldOrder, kindMap)
valueChan <- temp.NewStruct(fields)
@@ -116,6 +123,64 @@ func ReadToList(r *csv.Reader, structName string, headers []string, kinds KindSl
return <-listChan
}
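
As a hedged usage sketch of the new limit parameter, here is an example-style test that would sit in this csv package (the input data is made up); only the first two rows end up in the list, though the loop does consume one row past the limit before breaking:

package csv

import (
    "bytes"
    "fmt"

    "github.com/attic-labs/noms/go/chunks"
    "github.com/attic-labs/noms/go/datas"
    "github.com/attic-labs/noms/go/types"
)

func ExampleReadToList() {
    storage := &chunks.MemoryStorage{}
    db := datas.NewDatabase(storage.NewView())
    defer db.Close()

    r := NewCSVReader(bytes.NewBufferString("a,1\nb,2\nc,3\n"), ',')
    headers := []string{"A", "B"}
    kinds := KindSlice{types.StringKind, types.NumberKind}

    l := ReadToList(r, "test", headers, kinds, db, 2) // limit = 2
    fmt.Println(l.Len())
    // Output: 2
}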
type column struct {
ch chan types.Value
list <-chan types.List
hdr string
}
// ReadToColumnar takes a CSV reader and reads data from each column into a
// separate list. Values from the columns of each successive row are appended
// to the corresponding per-column lists, which are named by headers. Finally,
// a new "Columnar" struct is created, with one field per column; each field
// holds a Ref to the list of that column's values.
// If the original data contained headers, it is expected that the input reader
// has already read those and is pointing at the first data row.
// If kinds is non-empty, it will be used to type the values in the generated
// lists; otherwise, they will be left as strings.
// ReadToColumnar returns the resulting "Columnar" struct.
func ReadToColumnar(r *csv.Reader, structName string, headers []string, kinds KindSlice, vrw types.ValueReadWriter, limit uint64) (s types.Struct) {
cols := []column{}
fieldOrder := []int{}
for i, hdr := range headers {
ch := make(chan types.Value, 1024)
cols = append(cols, column{
ch: ch,
list: types.NewStreamingList(vrw, ch),
hdr: hdr,
})
fieldOrder = append(fieldOrder, i)
}
cnt := uint64(0)
for {
row, err := r.Read()
if cnt >= limit || err == io.EOF {
break
} else if err != nil {
panic(err)
}
cnt++
fields := readFieldsFromRow(row, headers, fieldOrder, kinds)
for i, v := range fields {
cols[i].ch <- v
}
}
sd := types.StructData{}
for _, col := range cols {
close(col.ch)
r := vrw.WriteValue(<-col.list)
sd[col.hdr] = r
}
return types.NewStruct("Columnar", sd)
}
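
A sketch of the resulting shape, assuming the same package setup and imports as the ReadToList example above plus "math": each field of the returned struct is a Ref that must be resolved to get the column's List.

func ExampleReadToColumnar() {
    storage := &chunks.MemoryStorage{}
    db := datas.NewDatabase(storage.NewView())
    defer db.Close()

    r := NewCSVReader(bytes.NewBufferString("a,1\nb,2\n"), ',')
    headers := []string{"A", "B"}
    kinds := KindSlice{types.StringKind, types.NumberKind}

    str := ReadToColumnar(r, "test", headers, kinds, db, math.MaxUint64)
    colA := str.Get("A").(types.Ref).TargetValue(db).(types.List) // ["a", "b"]
    fmt.Println(colA.Len())
    // Output: 2
}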
// getFieldIndexByHeaderName takes the collection of headers and the name to search for and returns the index of name within the headers or -1 if not found
func getFieldIndexByHeaderName(headers []string, name string) int {
for i, header := range headers {
@@ -184,21 +249,27 @@ func primaryKeyValuesFromFields(fields types.ValueSlice, fieldOrder, pkIndices [
return keys, value
}
-// ReadToMap takes a CSV reader and reads data into a typed Map of structs. Each row gets read into a struct named structName, described by headers. If the original data contained headers it is expected that the input reader has already read those and are pointing at the first data row.
-// If kinds is non-empty, it will be used to type the fields in the generated structs; otherwise, they will be left as string-fields.
-func ReadToMap(r *csv.Reader, structName string, headersRaw []string, primaryKeys []string, kinds KindSlice, vrw types.ValueReadWriter) types.Map {
+// ReadToMap takes a CSV reader and reads data into a typed Map of structs.
+// Each row gets read into a struct named structName, described by headers. If
+// the original data contained headers, it is expected that the input reader
+// has already read those and is pointing at the first data row.
+// If kinds is non-empty, it will be used to type the fields in the generated
+// structs; otherwise, they will be left as string-fields.
+func ReadToMap(r *csv.Reader, structName string, headersRaw []string, primaryKeys []string, kinds KindSlice, vrw types.ValueReadWriter, limit uint64) types.Map {
temp, fieldOrder, kindMap := MakeStructTemplateFromHeaders(headersRaw, structName, kinds)
pkIndices := getPkIndices(primaryKeys, headersRaw)
d.Chk.True(len(pkIndices) >= 1, "No primary key defined when reading into map")
gb := types.NewGraphBuilder(vrw, types.MapKind)
cnt := uint64(0)
for {
row, err := r.Read()
-if err == io.EOF {
+if cnt >= limit || err == io.EOF {
break
} else if err != nil {
panic(err)
}
cnt++
fields := readFieldsFromRow(row, headersRaw, fieldOrder, kindMap)
graphKeys, mapKey := primaryKeyValuesFromFields(fields, fieldOrder, pkIndices)
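
And the map counterpart with the threaded-through limit, under the same hypothetical setup as the sketches above; the map is keyed on column 0 and stops after two records:

func ExampleReadToMap() {
    storage := &chunks.MemoryStorage{}
    db := datas.NewDatabase(storage.NewView())
    defer db.Close()

    r := NewCSVReader(bytes.NewBufferString("a,1\nb,2\nc,3\n"), ',')
    headers := []string{"A", "B"}
    kinds := KindSlice{types.StringKind, types.NumberKind}

    m := ReadToMap(r, "test", headers, []string{"0"}, kinds, db, 2)
    fmt.Println(m.Len())
    // Output: 2
}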

View File

@@ -7,6 +7,7 @@ package csv
import (
"bytes"
"encoding/csv"
"math"
"testing"
"github.com/attic-labs/noms/go/chunks"
@@ -15,6 +16,8 @@ import (
"github.com/stretchr/testify/assert"
)
var LIMIT = uint64(math.MaxUint64)
func TestReadToList(t *testing.T) {
assert := assert.New(t)
storage := &chunks.MemoryStorage{}
@@ -27,7 +30,7 @@ b,2,false
headers := []string{"A", "B", "C"}
kinds := KindSlice{types.StringKind, types.NumberKind, types.BoolKind}
l := ReadToList(r, "test", headers, kinds, db)
l := ReadToList(r, "test", headers, kinds, db, LIMIT)
assert.Equal(uint64(2), l.Len())
@@ -53,7 +56,7 @@ b,2,false
headers := []string{"A", "B", "C"}
kinds := KindSlice{types.StringKind, types.NumberKind, types.BoolKind}
m := ReadToMap(r, "test", headers, []string{"0"}, kinds, db)
m := ReadToMap(r, "test", headers, []string{"0"}, kinds, db, LIMIT)
assert.Equal(uint64(2), m.Len())
assert.True(types.TypeOf(m).Equals(
@@ -85,14 +88,14 @@ func testTrailingHelper(t *testing.T, dataString string) {
headers := []string{"A", "B"}
kinds := KindSlice{types.StringKind, types.StringKind}
l := ReadToList(r, "test", headers, kinds, db1)
l := ReadToList(r, "test", headers, kinds, db1, LIMIT)
assert.Equal(uint64(3), l.Len())
storage = &chunks.MemoryStorage{}
db2 := datas.NewDatabase(storage.NewView())
defer db2.Close()
r = NewCSVReader(bytes.NewBufferString(dataString), ',')
m := ReadToMap(r, "test", headers, []string{"0"}, kinds, db2)
m := ReadToMap(r, "test", headers, []string{"0"}, kinds, db2, LIMIT)
assert.Equal(uint64(3), m.Len())
}
@@ -162,7 +165,7 @@ func TestReadParseError(t *testing.T) {
_, ok := r.(*csv.ParseError)
assert.True(ok, "Should be a ParseError")
}()
ReadToList(r, "test", headers, kinds, db)
ReadToList(r, "test", headers, kinds, db, LIMIT)
}()
}
@@ -174,7 +177,7 @@ func TestDuplicateHeaderName(t *testing.T) {
r := NewCSVReader(bytes.NewBufferString(dataString), ',')
headers := []string{"A", "A"}
kinds := KindSlice{types.StringKind, types.StringKind}
-assert.Panics(func() { ReadToList(r, "test", headers, kinds, db) })
+assert.Panics(func() { ReadToList(r, "test", headers, kinds, db, LIMIT) })
}
func TestEscapeFieldNames(t *testing.T) {
@@ -186,12 +189,12 @@ func TestEscapeFieldNames(t *testing.T) {
headers := []string{"A A", "B"}
kinds := KindSlice{types.NumberKind, types.NumberKind}
l := ReadToList(r, "test", headers, kinds, db)
l := ReadToList(r, "test", headers, kinds, db, LIMIT)
assert.Equal(uint64(1), l.Len())
assert.Equal(types.Number(1), l.Get(0).(types.Struct).Get(EscapeStructFieldFromCSV("A A")))
r = NewCSVReader(bytes.NewBufferString(dataString), ',')
m := ReadToMap(r, "test", headers, []string{"1"}, kinds, db)
m := ReadToMap(r, "test", headers, []string{"1"}, kinds, db, LIMIT)
assert.Equal(uint64(1), m.Len())
assert.Equal(types.Number(1), m.Get(types.Number(2)).(types.Struct).Get(EscapeStructFieldFromCSV("A A")))
}
@@ -205,7 +208,7 @@ func TestDefaults(t *testing.T) {
headers := []string{"A", "B", "C", "D"}
kinds := KindSlice{types.NumberKind, types.NumberKind, types.BoolKind, types.StringKind}
l := ReadToList(r, "test", headers, kinds, db)
l := ReadToList(r, "test", headers, kinds, db, LIMIT)
assert.Equal(uint64(1), l.Len())
row := l.Get(0).(types.Struct)
assert.Equal(types.Number(42), row.Get("A"))
@@ -223,7 +226,7 @@ func TestBooleanStrings(t *testing.T) {
headers := []string{"T", "F"}
kinds := KindSlice{types.BoolKind, types.BoolKind}
l := ReadToList(r, "test", headers, kinds, db)
l := ReadToList(r, "test", headers, kinds, db, LIMIT)
assert.Equal(uint64(5), l.Len())
for i := uint64(0); i < l.Len(); i++ {
row := l.Get(i).(types.Struct)

View File

@@ -98,7 +98,7 @@ func createTestList(s *csvWriteTestSuite) types.List {
storage := &chunks.MemoryStorage{}
db := datas.NewDatabase(storage.NewView())
cr, headers := startReadingCsvTestExpectationFile(s)
-l := ReadToList(cr, TEST_ROW_STRUCT_NAME, headers, typesToKinds(s.fieldTypes), db)
+l := ReadToList(cr, TEST_ROW_STRUCT_NAME, headers, typesToKinds(s.fieldTypes), db, LIMIT)
return l
}
@@ -106,14 +106,14 @@ func createTestMap(s *csvWriteTestSuite) types.Map {
storage := &chunks.MemoryStorage{}
db := datas.NewDatabase(storage.NewView())
cr, headers := startReadingCsvTestExpectationFile(s)
-return ReadToMap(cr, TEST_ROW_STRUCT_NAME, headers, []string{"anid"}, typesToKinds(s.fieldTypes), db)
+return ReadToMap(cr, TEST_ROW_STRUCT_NAME, headers, []string{"anid"}, typesToKinds(s.fieldTypes), db, LIMIT)
}
func createTestNestedMap(s *csvWriteTestSuite) types.Map {
storage := &chunks.MemoryStorage{}
db := datas.NewDatabase(storage.NewView())
cr, headers := startReadingCsvTestExpectationFile(s)
-return ReadToMap(cr, TEST_ROW_STRUCT_NAME, headers, []string{"anid", "year"}, typesToKinds(s.fieldTypes), db)
+return ReadToMap(cr, TEST_ROW_STRUCT_NAME, headers, []string{"anid", "year"}, typesToKinds(s.fieldTypes), db, LIMIT)
}
func verifyOutput(s *csvWriteTestSuite, r io.Reader) {