Fix parquet import to succeed with a warning when processing a non-matching schema

This commit is contained in:
elianddb
2026-03-02 13:42:12 -08:00
parent 74916eb467
commit 86a1bd49cd
2 changed files with 162 additions and 10 deletions

View File

@@ -69,8 +69,11 @@ func OpenParquetReader(vrw types.ValueReadWriter, path string, sch schema.Schema
return NewParquetReader(vrw, fr, sch)
}
// NewParquetReader creates a ParquetReader from a given fileReader.
// The ParquetFileInfo should describe the parquet file being read.
// NewParquetReader creates a [ParquetReader] from |fr| and |sche|.
//
// Columns in |sche| that are not present in the parquet file are skipped so
// import callers can report schema mismatch warnings and continue processing.
// This mirrors the update-import behavior used for other flat-file readers.
func NewParquetReader(vrw types.ValueReadWriter, fr source.ParquetFile, sche schema.Schema) (*ParquetReader, error) {
pr, err := reader.NewParquetColumnReader(fr, 4)
if err != nil {
@@ -87,6 +90,7 @@ func NewParquetReader(vrw types.ValueReadWriter, fr source.ParquetFile, sche sch
rLevels := make(map[string][]int32)
dLevels := make(map[string][]int32)
rowReadCounters := make(map[string]int)
chosenColumns := make([]schema.Column, 0, len(columns))
var colName []string
for _, col := range columns {
pathName := common.ReformPathStr(fmt.Sprintf("%s.%s", rootName, col.Name))
@@ -95,10 +99,12 @@ func NewParquetReader(vrw types.ValueReadWriter, fr source.ParquetFile, sche sch
return nil, fmt.Errorf("cannot read column: %s", err.Error())
}
if !found {
// Missing columns are ignored so downstream import code can handle
// schema differences uniformly across file formats.
if resolvedColumnName != "" {
return nil, fmt.Errorf("cannot read column: %s is ambiguous", resolvedColumnName)
}
return nil, fmt.Errorf("cannot read column: %s Column not found", col.Name)
continue
}
colData, rLevel, dLevel, cErr := pr.ReadColumnByPath(resolvedColumnName, num)
if cErr != nil {
@@ -111,12 +117,23 @@ func NewParquetReader(vrw types.ValueReadWriter, fr source.ParquetFile, sche sch
dLevels[col.Name] = dLevel
rowReadCounters[col.Name] = 0
colName = append(colName, col.Name)
chosenColumns = append(chosenColumns, col)
}
if len(chosenColumns) == 0 {
return nil, fmt.Errorf("cannot read column: no matching columns found in parquet file")
}
chosenColumnCollection := schema.NewColCollection(chosenColumns...)
chosenSchema, schErr := schema.SchemaFromCols(chosenColumnCollection)
if schErr != nil {
chosenSchema = schema.UnkeyedSchemaFromCols(chosenColumnCollection)
}
return &ParquetReader{
fileReader: fr,
pReader: pr,
sch: sche,
sch: chosenSchema,
vrw: vrw,
numRow: int(num),
rowsRead: 0,
@@ -128,18 +145,25 @@ func NewParquetReader(vrw types.ValueReadWriter, fr source.ParquetFile, sche sch
}, nil
}
// resolveColumnPrefix takes a path into a parquet schema and determines:
// - whether there is exactly one leaf column corresponding to that path
// - whether any of the types after the prefix are repeated.
// resolveColumnPrefix reports whether |columnPrefix| resolves to exactly one
// leaf parquet column and whether the resolved path includes repeated types.
//
// If |columnPrefix| is not present in the parquet schema, it returns
// |found| == false and a nil error. Parse and traversal failures return a
// non-nil error.
func resolveColumnPrefix(pr *reader.ParquetReader, columnPrefix string) (columnName string, found bool, isRepeated bool, err error) {
inPath, err := pr.SchemaHandler.ConvertToInPathStr(columnPrefix)
if err != nil {
return "", false, false, err
return "", false, false, nil
}
segments := strings.Split(inPath, "\x01")
pathSegments := strings.Split(inPath, "\x01")
pathMapType := pr.SchemaHandler.PathMap
for _, segment := range segments[1:] {
if len(pathSegments) < 2 || pathSegments[0] != pathMapType.Path {
return "", false, false, nil
}
for _, segment := range pathSegments[1:] {
pathMapType, found = pathMapType.Children[segment]
if !found {
return "", false, isRepeated, nil

View File

@@ -1486,3 +1486,131 @@ DELIM
[ "$status" -eq 1 ]
[[ "$output" =~ "fatal: --all-text is only supported for create operations" ]] || false
}
#https://github.com/dolthub/dolt/issues/10589
# Verifies that `dolt table import -u` succeeds with a schema-mismatch warning
# (instead of erroring) when the import file contains only a subset of the
# table's columns, for both CSV and Parquet sources.
@test "import-update-tables: table has more columns than flat file warns and continues for CSV and Parquet" {
dolt sql <<SQL
CREATE TABLE with_created_at_csv (
id INT PRIMARY KEY,
name VARCHAR(100),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE with_created_at_parquet (
id INT PRIMARY KEY,
name VARCHAR(100),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO with_created_at_csv (id, name) VALUES (1, 'alice');
INSERT INTO with_created_at_parquet (id, name) VALUES (1, 'alice');
SQL
# The CSV flat file omits the table's created_at column on purpose.
cat <<DELIM > subset.csv
id,name
2,bob
DELIM
# Import must succeed (status 0) while warning that created_at is missing
# from the file.
run dolt table import -u with_created_at_csv subset.csv
[ "$status" -eq 0 ]
[[ "$output" =~ "Warning: The import file's schema does not match the table's schema" ]] || false
[[ "$output" =~ "Missing columns in with_created_at_csv:" ]] || false
[[ "$output" =~ "Rows Processed: 1, Additions: 1, Modifications: 0, Had No Effect: 0" ]] || false
[[ "$output" =~ "Import completed successfully." ]] || false
# Both the pre-existing row and the newly imported row should be present.
run dolt sql -r csv -q "SELECT id, name FROM with_created_at_csv ORDER BY id"
[ "${lines[1]}" = "1,alice" ]
[ "${lines[2]}" = "2,bob" ]
# Create a parquet flat file with the same two columns / row values as subset.csv.
dolt sql -q "CREATE TABLE parquet_subset (id INT PRIMARY KEY, name VARCHAR(100));"
dolt sql -q "INSERT INTO parquet_subset VALUES (2, 'bob');"
dolt table export parquet_subset subset.parquet
dolt sql -q "DROP TABLE parquet_subset;"
# The Parquet path must behave the same as CSV: succeed with a warning.
run dolt table import -u with_created_at_parquet subset.parquet
[ "$status" -eq 0 ]
[[ "$output" =~ "Warning: The import file's schema does not match the table's schema" ]] || false
[[ "$output" =~ "Missing columns in with_created_at_parquet:" ]] || false
[[ "$output" =~ "Rows Processed: 1, Additions: 1, Modifications: 0, Had No Effect: 0" ]] || false
[[ "$output" =~ "Import completed successfully." ]] || false
run dolt sql -r csv -q "SELECT id, name FROM with_created_at_parquet ORDER BY id"
[ "${lines[1]}" = "1,alice" ]
[ "${lines[2]}" = "2,bob" ]
}
# A parquet file that omits a NOT NULL column with no default cannot be
# imported: there is no value the engine can supply for the missing column,
# so the update import is expected to fail.
@test "import-update-tables: parquet subsetting throws error with not null column" {
cat <<SQL > 1pk5col-ints-def-sch.sql
CREATE TABLE test (
pk int NOT NULL COMMENT 'tag:0',
c1 int,
c2 int,
c3 int,
c4 int NOT NULL,
c5 int,
PRIMARY KEY (pk)
);
SQL
dolt sql < 1pk5col-ints-def-sch.sql
# parquet_subset deliberately leaves out c4 (NOT NULL, no default).
dolt sql <<SQL
CREATE TABLE parquet_subset (
pk int PRIMARY KEY,
c1 int,
c2 int,
c5 int,
c3 int
);
INSERT INTO parquet_subset VALUES (0,1,2,6,3);
SQL
dolt table export parquet_subset 1pk5col-ints-updt.parquet
dolt sql -q "DROP TABLE parquet_subset;"
# Expect a hard failure because c4 has no default to fall back on.
run dolt table import -u test 1pk5col-ints-updt.parquet
[ "$status" -eq 1 ]
[[ "$output" =~ "Field 'c4' doesn't have a default value" ]] || false
}
# Same column-subset scenario as the NOT NULL test above, but here the
# missing column (c4) has DEFAULT 42, so the import should succeed with a
# schema-mismatch warning and fill in the default.
@test "import-update-tables: parquet subsetting but with defaults" {
cat <<SQL > 1pk5col-ints-def-sch.sql
CREATE TABLE test (
pk int NOT NULL COMMENT 'tag:0',
c1 int,
c2 int,
c3 int,
c4 int DEFAULT 42,
c5 int,
PRIMARY KEY (pk)
);
SQL
dolt sql < 1pk5col-ints-def-sch.sql
# Export columns in a shuffled order (pk,c1,c2,c5,c3) to confirm the import
# matches by column name, not by position.
dolt sql <<SQL
CREATE TABLE parquet_subset (
pk int PRIMARY KEY,
c1 int,
c2 int,
c5 int,
c3 int
);
INSERT INTO parquet_subset VALUES (0,1,2,6,3);
SQL
dolt table export parquet_subset 1pk5col-ints-updt.parquet
dolt sql -q "DROP TABLE parquet_subset;"
run dolt table import -u test 1pk5col-ints-updt.parquet
[ "$status" -eq 0 ]
[[ "$output" =~ "Warning: The import file's schema does not match the table's schema" ]] || false
[[ "$output" =~ "Missing columns in test:" ]] || false
[[ "$output" =~ "Rows Processed: 1, Additions: 1, Modifications: 0, Had No Effect: 0" ]] || false
[[ "$output" =~ "Import completed successfully." ]] || false
# Row in table column order (pk,c1,c2,c3,c4,c5): c4 gets its default 42.
run dolt sql -r csv -q "select * from test"
[ "${lines[1]}" = "0,1,2,3,42,6" ]
run dolt sql -q "select count(*) from test"
[[ "$output" =~ "1" ]] || false
}