mirror of
https://github.com/dolthub/dolt.git
synced 2026-01-25 10:26:22 -06:00
fix for datetime and time types for exporting parquet file (#3743)
This commit is contained in:
@@ -28,6 +28,7 @@ import (
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/row"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/schema/typeinfo"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
@@ -102,11 +103,11 @@ func (pr *ParquetReader) ReadSqlRow(ctx context.Context) (sql.Row, error) {
|
||||
allCols.Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
|
||||
val := pr.fileData[col.Name][pr.rowReadCounter]
|
||||
if val != nil {
|
||||
sqlType := col.TypeInfo.ToSqlType()
|
||||
if _, ok := sqlType.(sql.DatetimeType); ok {
|
||||
val = time.Unix(val.(int64), 0)
|
||||
} else if _, ok := sqlType.(sql.TimeType); ok {
|
||||
val = sql.Timespan(val.(int64))
|
||||
switch col.TypeInfo.GetTypeIdentifier() {
|
||||
case typeinfo.DatetimeTypeIdentifier:
|
||||
val = time.UnixMicro(val.(int64))
|
||||
case typeinfo.TimeTypeIdentifier:
|
||||
val = sql.Timespan(time.Duration(val.(int64)).Microseconds())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -37,16 +37,16 @@ type ParquetWriter struct {
|
||||
}
|
||||
|
||||
var typeMap = map[typeinfo.Identifier]string{
|
||||
typeinfo.DatetimeTypeIdentifier: "type=INT64, convertedtype=TIME_MICROS",
|
||||
typeinfo.DatetimeTypeIdentifier: "type=INT64, convertedtype=TIMESTAMP_MICROS",
|
||||
typeinfo.DecimalTypeIdentifier: "type=BYTE_ARRAY, convertedtype=UTF8",
|
||||
typeinfo.EnumTypeIdentifier: "type=BYTE_ARRAY, convertedtype=UTF8",
|
||||
typeinfo.InlineBlobTypeIdentifier: "type=BYTE_ARRAY, convertedtype=UTF8",
|
||||
typeinfo.SetTypeIdentifier: "type=BYTE_ARRAY, convertedtype=UTF8",
|
||||
typeinfo.TimeTypeIdentifier: "type=INT64, convertedtype=TIME_MICROS",
|
||||
typeinfo.TimeTypeIdentifier: "type=INT64, convertedtype=TIMESPAN",
|
||||
typeinfo.TupleTypeIdentifier: "type=BYTE_ARRAY, convertedtype=UTF8",
|
||||
typeinfo.UuidTypeIdentifier: "type=BYTE_ARRAY, convertedtype=UTF8",
|
||||
typeinfo.VarBinaryTypeIdentifier: "type=BYTE_ARRAY, convertedtype=UTF8",
|
||||
typeinfo.YearTypeIdentifier: "type=INT32, convertedtype=DATE",
|
||||
typeinfo.YearTypeIdentifier: "type=INT32, convertedtype=INT_32",
|
||||
typeinfo.UnknownTypeIdentifier: "type=BYTE_ARRAY, convertedtype=UTF8",
|
||||
typeinfo.JSONTypeIdentifier: "type=BYTE_ARRAY, convertedtype=UTF8",
|
||||
typeinfo.BlobStringTypeIdentifier: "type=BYTE_ARRAY, convertedtype=UTF8",
|
||||
@@ -114,10 +114,10 @@ func (pwr *ParquetWriter) WriteSqlRow(ctx context.Context, r sql.Row) error {
|
||||
// convert datetime and time types to int64
|
||||
switch colT.TypeInfo.GetTypeIdentifier() {
|
||||
case typeinfo.DatetimeTypeIdentifier:
|
||||
val = val.(time.Time).Unix()
|
||||
val = val.(time.Time).UnixMicro()
|
||||
sqlType = sql.Int64
|
||||
case typeinfo.TimeTypeIdentifier:
|
||||
val = int64(val.(sql.Timespan))
|
||||
val = int64(val.(sql.Timespan).AsTimeDuration())
|
||||
sqlType = sql.Int64
|
||||
case typeinfo.BitTypeIdentifier:
|
||||
sqlType = sql.Uint64
|
||||
|
||||
@@ -373,7 +373,7 @@ CREATE TABLE diffTypes (
|
||||
v6 ENUM('one', 'two', 'three')
|
||||
);
|
||||
INSERT INTO diffTypes VALUES
|
||||
(1,'2020-04-08','11:11:11','2020','2020-04-08 11:11:11',true,'one'),
|
||||
(1,'2020-04-08','-11:11:11','2020','2020-04-08 11:11:11',true,'one'),
|
||||
(2,'2020-04-08','12:12:12','2020','2020-04-08 12:12:12',false,'three'),
|
||||
(3,'2021-10-09','04:12:34','2019','2019-10-09 04:12:34',true,NULL);
|
||||
SQL
|
||||
@@ -382,14 +382,39 @@ SQL
|
||||
[[ "$output" =~ "Successfully exported data." ]] || false
|
||||
[ -f dt.parquet ]
|
||||
|
||||
run parquet-tools cat --json dt.parquet > output.json
|
||||
run parquet-tools cat --json dt.parquet
|
||||
[ "$status" -eq 0 ]
|
||||
row1='{"pk":1,"v1":1586304000,"v2":40271000000,"v3":2020,"v4":1586344271,"v5":1,"v6":"one"}'
|
||||
row2='{"pk":2,"v1":1586304000,"v2":43932000000,"v3":2020,"v4":1586347932,"v5":0,"v6":"three"}'
|
||||
row3='{"pk":3,"v1":1633737600,"v2":15154000000,"v3":2019,"v4":1570594354,"v5":1}'
|
||||
[[ "$output" =~ "$row1" ]] || false
|
||||
[[ "$output" =~ "$row2" ]] || false
|
||||
[[ "$output" =~ "$row3" ]] || false
|
||||
[[ "$output" =~ '{"pk":1,"v1":1586304000000000,"v2":-40271000000000,"v3":2020,"v4":1586344271000000,"v5":1,"v6":"one"}' ]] || false
|
||||
[[ "$output" =~ '{"pk":2,"v1":1586304000000000,"v2":43932000000000,"v3":2020,"v4":1586347932000000,"v5":0,"v6":"three"}' ]] || false
|
||||
[[ "$output" =~ '{"pk":3,"v1":1633737600000000,"v2":15154000000000,"v3":2019,"v4":1570594354000000,"v5":1}' ]] || false
|
||||
|
||||
run dolt sql -q "SELECT * FROM diffTypes"
|
||||
result=$output
|
||||
|
||||
dolt table import -r diffTypes dt.parquet
|
||||
run dolt sql -q "SELECT * FROM diffTypes"
|
||||
[ "$output" = "$result" ]
|
||||
|
||||
echo "import pandas as pd
|
||||
df = pd.read_parquet('dt.parquet')
|
||||
print(df)
|
||||
" > pandas_test.py
|
||||
run python3 pandas_test.py
|
||||
panda_result=$output
|
||||
|
||||
echo "import pyarrow.parquet as pq
|
||||
table = pq.read_table('dt.parquet')
|
||||
print(table.to_pandas())
|
||||
" > arrow_test.py
|
||||
run python3 arrow_test.py
|
||||
[ "$output" = "$panda_result" ]
|
||||
|
||||
echo "import pandas as pd
|
||||
df = pd.read_parquet('dt.parquet')
|
||||
print(pd.to_timedelta(df.at[0, 'v2']))
|
||||
" > timespan_test.py
|
||||
run python3 timespan_test.py
|
||||
[[ "$output" =~ "-1 days +12:48:49" ]] || false
|
||||
}
|
||||
|
||||
@test "export-tables: table export more types to parquet" {
|
||||
|
||||
Reference in New Issue
Block a user