diff --git a/cmd/timeliner/main.go b/cmd/timeliner/main.go index c5a05ec..de13da4 100644 --- a/cmd/timeliner/main.go +++ b/cmd/timeliner/main.go @@ -34,6 +34,8 @@ func init() { flag.BoolVar(&prune, "prune", prune, "When finishing, delete items not found on remote (download-all or import only)") flag.BoolVar(&integrity, "integrity", integrity, "Perform integrity check on existing items and reprocess if needed (download-all or import only)") flag.BoolVar(&reprocess, "reprocess", reprocess, "Reprocess every item that has not been modified locally (download-all or import only)") + flag.BoolVar(&softMerge, "softmerge", softMerge, "Merge incoming data with existing row using 'soft' keys (account ID + item timestamp + one of text, filename, and hash)") + flag.StringVar(&keep, "keep", keep, "Comma-separated list of existing values to keep if merge is performed (preferring existing value): id,ts,text,file") flag.StringVar(&tfStartInput, "start", "", "Timeframe start (relative=duration, absolute=YYYY/MM/DD)") flag.StringVar(&tfEndInput, "end", "", "Timeframe end (relative=duration, absolute=YYYY/MM/DD)") @@ -111,6 +113,37 @@ func main() { return } + // get the timeframe within which to constrain item processing (multiple commands use this) + tf, err := parseTimeframe() + if err != nil { + log.Fatalf("[FATAL] %v", err) + } + + // make the processing options + mergeOptions := timeliner.MergeOptions{SoftMerge: softMerge} + keepFields := strings.FieldsFunc(keep, func(c rune) bool { return c == ',' }) + for _, val := range keepFields { + switch val { + case "id": + mergeOptions.PreferExistingID = true + case "ts": + mergeOptions.PreferExistingTimestamp = true + case "text": + mergeOptions.PreferExistingDataText = true + case "file": + mergeOptions.PreferExistingDataFile = true + default: + log.Fatalf("[FATAL] Unrecognized value for 'keep' argument: '%s'", val) + } + } + procOpt := timeliner.ProcessingOptions{ + Reprocess: reprocess, + Prune: prune, + Integrity: integrity, + Timeframe: tf, + Merge: mergeOptions, + } + 
// make a client for each account var clients []timeliner.WrappedClient for _, a := range accounts { @@ -133,15 +166,10 @@ func main() { switch subcmd { case "get-latest": - if reprocess || prune || integrity || tfStartInput != "" { + if procOpt.Reprocess || procOpt.Prune || procOpt.Integrity || procOpt.Timeframe.Since != nil { log.Fatalf("[FATAL] The get-latest subcommand does not support -reprocess, -prune, -integrity, or -start") } - _, tfEnd, err := parseTimeframe() - if err != nil { - log.Fatalf("[FATAL] %v", err) - } - var wg sync.WaitGroup for _, wc := range clients { wg.Add(1) @@ -152,7 +180,7 @@ func main() { if retryNum > 0 { log.Println("[INFO] Retrying command") } - err := wc.GetLatest(ctx, tfEnd) + err := wc.GetLatest(ctx, tf.Until) if err != nil { log.Printf("[ERROR][%s/%s] Getting latest: %v", wc.DataSourceID(), wc.UserID(), err) @@ -169,11 +197,6 @@ func main() { wg.Wait() case "get-all": - tfStart, tfEnd, err := parseTimeframe() - if err != nil { - log.Fatalf("[FATAL] %v", err) - } - var wg sync.WaitGroup for _, wc := range clients { wg.Add(1) @@ -184,7 +207,7 @@ func main() { if retryNum > 0 { log.Println("[INFO] Retrying command") } - err := wc.GetAll(ctx, reprocess, prune, integrity, timeliner.Timeframe{Since: tfStart, Until: tfEnd}) + err := wc.GetAll(ctx, procOpt) if err != nil { log.Printf("[ERROR][%s/%s] Downloading all: %v", wc.DataSourceID(), wc.UserID(), err) @@ -205,7 +228,7 @@ func main() { wc := clients[0] ctx, cancel := context.WithCancel(context.Background()) - err = wc.Import(ctx, file, reprocess, prune, integrity) + err = wc.Import(ctx, file, procOpt) if err != nil { log.Printf("[ERROR][%s/%s] Importing: %v", wc.DataSourceID(), wc.UserID(), err) @@ -218,44 +241,42 @@ func main() { } // parseTimeframe parses tfStartInput and/or tfEndInput and returns -// the resulting start and end times (may be nil), or an error. 
-func parseTimeframe() (start, end *time.Time, err error) { - var tfStart, tfEnd time.Time +// the resulting timeframe or an error. +func parseTimeframe() (timeliner.Timeframe, error) { + var tf timeliner.Timeframe + var timeStart, timeEnd time.Time + if tfStartInput != "" { - var tfStartRel time.Duration - tfStartRel, err = time.ParseDuration(tfStartInput) + tfStartRel, err := time.ParseDuration(tfStartInput) if err == nil { - tfStart = time.Now().Add(tfStartRel) + timeStart = time.Now().Add(tfStartRel) } else { - tfStart, err = time.Parse(dateFormat, tfStartInput) + timeStart, err = time.Parse(dateFormat, tfStartInput) if err != nil { - err = fmt.Errorf("bad timeframe start value '%s': %v", tfStartInput, err) - return + return tf, fmt.Errorf("bad timeframe start value '%s': %v", tfStartInput, err) } } - start = &tfStart + tf.Since = &timeStart } if tfEndInput != "" { - var tfEndRel time.Duration - tfEndRel, err = time.ParseDuration(tfEndInput) + tfEndRel, err := time.ParseDuration(tfEndInput) if err == nil { - tfEnd = time.Now().Add(tfEndRel) + timeEnd = time.Now().Add(tfEndRel) } else { - tfEnd, err = time.Parse(dateFormat, tfEndInput) + timeEnd, err = time.Parse(dateFormat, tfEndInput) if err != nil { - err = fmt.Errorf("bad timeframe end value '%s': %v", tfEndInput, err) - return + return tf, fmt.Errorf("bad timeframe end value '%s': %v", tfEndInput, err) } } - end = &tfEnd + tf.Until = &timeEnd } - if start != nil && end != nil && end.Before(*start) { - err = fmt.Errorf("end time must be after start time (start=%s end=%s)", start, end) + if tf.Since != nil && tf.Until != nil && tf.Until.Before(*tf.Since) { + return tf, fmt.Errorf("end time must be after start time (start=%s end=%s)", tf.Since, tf.Until) } - return + return tf, nil } func loadConfig() error { @@ -353,6 +374,8 @@ var ( integrity bool prune bool reprocess bool + softMerge bool + keep string tfStartInput, tfEndInput string diff --git a/datasource.go b/datasource.go index 39048ec..86196bd 100644 
--- a/datasource.go +++ b/datasource.go @@ -107,7 +107,6 @@ type DataSource struct { // authFunc gets the authentication function for this // service. If s.Authenticate is set, it returns that; // if s.OAuth2 is set, it uses a standard OAuth2 func. -// TODO: update godoc func (ds DataSource) authFunc() AuthenticateFn { if ds.Authenticate != nil { return ds.Authenticate @@ -191,7 +190,7 @@ type Client interface { // timeliner.Checkpoint to set a checkpoint. Checkpoints are not // required, but if the implementation sets checkpoints, it // should be able to resume from one, too. - ListItems(ctx context.Context, itemChan chan<- *ItemGraph, opt Options) error + ListItems(ctx context.Context, itemChan chan<- *ItemGraph, opt ListingOptions) error } // Timeframe represents a start and end time and/or diff --git a/datasources/facebook/facebook.go b/datasources/facebook/facebook.go index 4877316..8164151 100644 --- a/datasources/facebook/facebook.go +++ b/datasources/facebook/facebook.go @@ -67,7 +67,7 @@ type Client struct { } // ListItems lists the items on the Facebook account. -func (c *Client) ListItems(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.Options) error { +func (c *Client) ListItems(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.ListingOptions) error { defer close(itemChan) if opt.Filename != "" { diff --git a/datasources/googlelocation/googlelocation.go b/datasources/googlelocation/googlelocation.go index 8233807..f167260 100644 --- a/datasources/googlelocation/googlelocation.go +++ b/datasources/googlelocation/googlelocation.go @@ -43,7 +43,7 @@ func init() { type Client struct{} // ListItems lists items from the data source. opt.Filename must be non-empty. 
-func (c *Client) ListItems(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.Options) error { +func (c *Client) ListItems(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.ListingOptions) error { defer close(itemChan) if opt.Filename == "" { diff --git a/datasources/googlephotos/googlephotos.go b/datasources/googlephotos/googlephotos.go index a2d57be..708a171 100644 --- a/datasources/googlephotos/googlephotos.go +++ b/datasources/googlephotos/googlephotos.go @@ -71,7 +71,7 @@ type Client struct { // ListItems lists items from the data source. // opt.Timeframe precision is day-level at best. -func (c *Client) ListItems(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.Options) error { +func (c *Client) ListItems(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.ListingOptions) error { defer close(itemChan) if opt.Filename != "" { diff --git a/datasources/googlephotos/takeoutarchive.go b/datasources/googlephotos/takeoutarchive.go index a0f7dc5..4fbacf0 100644 --- a/datasources/googlephotos/takeoutarchive.go +++ b/datasources/googlephotos/takeoutarchive.go @@ -17,7 +17,7 @@ import ( "github.com/mholt/timeliner" ) -func (c *Client) listFromTakeoutArchive(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.Options) error { +func (c *Client) listFromTakeoutArchive(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.ListingOptions) error { err := archiver.Walk(opt.Filename, func(f archiver.File) error { pathInArchive := getPathInArchive(f) // TODO: maybe this should be a function in the archiver lib @@ -69,10 +69,15 @@ func (c *Client) listFromTakeoutArchive(ctx context.Context, itemChan chan<- *ti itemMeta.pathInArchive = strings.TrimSuffix(pathInArchive, ".json") itemMeta.archiveFilename = opt.Filename - collection.Items = append(collection.Items, timeliner.CollectionItem{ - Item: itemMeta, - Position: len(collection.Items), - }) + 
withinTimeframe := (opt.Timeframe.Since == nil || itemMeta.parsedPhotoTakenTime.After(*opt.Timeframe.Since)) && + (opt.Timeframe.Until == nil || itemMeta.parsedPhotoTakenTime.Before(*opt.Timeframe.Until)) + + if withinTimeframe { + collection.Items = append(collection.Items, timeliner.CollectionItem{ + Item: itemMeta, + Position: len(collection.Items), + }) + } return nil }) @@ -80,9 +85,11 @@ func (c *Client) listFromTakeoutArchive(ctx context.Context, itemChan chan<- *ti return err } - ig := timeliner.NewItemGraph(nil) - ig.Collections = append(ig.Collections, collection) - itemChan <- ig + if len(collection.Items) > 0 { + ig := timeliner.NewItemGraph(nil) + ig.Collections = append(ig.Collections, collection) + itemChan <- ig + } return nil }) @@ -99,7 +106,7 @@ func getPathInArchive(f archiver.File) string { switch hdr := f.Header.(type) { case zip.FileHeader: return hdr.Name - case tar.Header: + case *tar.Header: return hdr.Name } return "" @@ -187,8 +194,10 @@ func (m mediaArchiveMetadata) timestamp() (time.Time, error) { return time.Unix(parsed, 0), nil } +// ID does NOT return the same ID as from the API. Takeout archives do NOT +// have an ID associated with each item, so we do our best by making up +// an ID using the timestamp and the filename. func (m mediaArchiveMetadata) ID() string { - // TODO: THIS IS NOT THE SAME AS THE ID FROM THE API return m.PhotoTakenTime.Timestamp + "_" + m.Title } diff --git a/datasources/instagram/instagram.go b/datasources/instagram/instagram.go index 0e3f08c..7ffbab9 100644 --- a/datasources/instagram/instagram.go +++ b/datasources/instagram/instagram.go @@ -38,7 +38,7 @@ func init() { type Client struct{} // ListItems lists items from the data source. opt.Filename must be non-empty. 
-func (c *Client) ListItems(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.Options) error { +func (c *Client) ListItems(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.ListingOptions) error { defer close(itemChan) if opt.Filename == "" { diff --git a/datasources/smsbackuprestore/smsbackuprestore.go b/datasources/smsbackuprestore/smsbackuprestore.go index 4be465d..07436ac 100644 --- a/datasources/smsbackuprestore/smsbackuprestore.go +++ b/datasources/smsbackuprestore/smsbackuprestore.go @@ -48,7 +48,7 @@ type Client struct { } // ListItems lists items from the data source. -func (c *Client) ListItems(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.Options) error { +func (c *Client) ListItems(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.ListingOptions) error { defer close(itemChan) if opt.Filename == "" { diff --git a/datasources/twitter/api.go b/datasources/twitter/api.go index 621fa57..3425eb1 100644 --- a/datasources/twitter/api.go +++ b/datasources/twitter/api.go @@ -12,7 +12,7 @@ import ( "github.com/mholt/timeliner" ) -func (c *Client) getFromAPI(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.Options) error { +func (c *Client) getFromAPI(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.ListingOptions) error { // load any previous checkpoint c.checkpoint.load(opt.Checkpoint) diff --git a/datasources/twitter/archives.go b/datasources/twitter/archives.go index 4f78dd4..292b8bf 100644 --- a/datasources/twitter/archives.go +++ b/datasources/twitter/archives.go @@ -9,7 +9,7 @@ import ( "github.com/mholt/timeliner" ) -func (c *Client) getFromArchiveFile(itemChan chan<- *timeliner.ItemGraph, opt timeliner.Options) error { +func (c *Client) getFromArchiveFile(itemChan chan<- *timeliner.ItemGraph, opt timeliner.ListingOptions) error { // load the user's account ID var err error c.ownerAccount, err = 
c.getOwnerAccountFromArchive(opt.Filename) diff --git a/datasources/twitter/twitter.go b/datasources/twitter/twitter.go index 11d7714..d3dfb3f 100644 --- a/datasources/twitter/twitter.go +++ b/datasources/twitter/twitter.go @@ -72,7 +72,7 @@ type Client struct { } // ListItems lists items from opt.Filename if specified, or from the API otherwise. -func (c *Client) ListItems(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.Options) error { +func (c *Client) ListItems(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.ListingOptions) error { defer close(itemChan) if opt.Filename != "" { diff --git a/db.go b/db.go index bf50e40..63e01c1 100644 --- a/db.go +++ b/db.go @@ -88,8 +88,8 @@ CREATE TABLE IF NOT EXISTS "items" ( "account_id" INTEGER NOT NULL, "original_id" TEXT NOT NULL, -- ID provided by the data source "person_id" INTEGER NOT NULL, - "timestamp" INTEGER, -- timestamp when item content was originally created - "stored" INTEGER NOT NULL DEFAULT (strftime('%s', CURRENT_TIME)), -- timestamp row was created - TODO not sure if needed + "timestamp" INTEGER, -- timestamp when item content was originally created (NOT when the database row was created) + "stored" INTEGER NOT NULL DEFAULT (strftime('%s', CURRENT_TIMESTAMP)), -- timestamp row was created or last updated from source "modified" INTEGER, -- timestamp when item was locally modified; if not null, then item is "not clean" "class" INTEGER, "mime_type" TEXT, @@ -106,6 +106,7 @@ CREATE TABLE IF NOT EXISTS "items" ( CREATE INDEX IF NOT EXISTS "idx_items_timestamp" ON "items"("timestamp"); CREATE INDEX IF NOT EXISTS "idx_items_data_text" ON "items"("data_text"); +CREATE INDEX IF NOT EXISTS "idx_items_data_file" ON "items"("data_file"); CREATE INDEX IF NOT EXISTS "idx_items_data_hash" ON "items"("data_hash"); -- Relationships draws relationships between and across items and persons. 
diff --git a/itemfiles.go b/itemfiles.go index 4bee2e3..b619961 100644 --- a/itemfiles.go +++ b/itemfiles.go @@ -75,7 +75,6 @@ func (t *Timeline) openUniqueCanonicalItemDataFile(it Item, dataSourceID string) tryPath = strings.TrimSuffix(tryPath, lastAppend) lastAppend = fmt.Sprintf("_%d%s", i+1, ext) // start at 1, but actually 2 because existing file is "1" tryPath += lastAppend - continue } if err != nil { diff --git a/itemgraph.go b/itemgraph.go index 8768c2f..40df649 100644 --- a/itemgraph.go +++ b/itemgraph.go @@ -173,6 +173,7 @@ type ItemRow struct { Location metaGob []byte // use Metadata.(encode/decode) + item Item } // Location contains location information. @@ -302,7 +303,7 @@ type Relation struct { Bidirectional bool } -// Collection represents a group of items. +// Collection represents a group of items, like an album. type Collection struct { // The ID of the collection as given // by the service; for example, the diff --git a/processing.go b/processing.go index e6b058a..4b20390 100644 --- a/processing.go +++ b/processing.go @@ -17,7 +17,7 @@ import ( // obtained from ac. It returns a WaitGroup which blocks until // all workers have finished, and a channel into which the // service should pipe its items. 
-func (wc *WrappedClient) beginProcessing(cc concurrentCuckoo, reprocess, integrity bool) (*sync.WaitGroup, chan<- *ItemGraph) { +func (wc *WrappedClient) beginProcessing(cc concurrentCuckoo, po ProcessingOptions) (*sync.WaitGroup, chan<- *ItemGraph) { wg := new(sync.WaitGroup) ch := make(chan *ItemGraph) @@ -31,12 +31,11 @@ func (wc *WrappedClient) beginProcessing(cc concurrentCuckoo, reprocess, integri continue } _, err := wc.processItemGraph(ig, &recursiveState{ - timestamp: time.Now(), - reprocess: reprocess, - integrityCheck: integrity, - seen: make(map[*ItemGraph]int64), - idmap: make(map[string]int64), - cuckoo: cc, + timestamp: time.Now(), + procOpt: po, + seen: make(map[*ItemGraph]int64), + idmap: make(map[string]int64), + cuckoo: cc, }) if err != nil { log.Printf("[ERROR][%s/%s] Processing item graph: %v", @@ -50,11 +49,10 @@ func (wc *WrappedClient) beginProcessing(cc concurrentCuckoo, reprocess, integri } type recursiveState struct { - timestamp time.Time - reprocess bool - integrityCheck bool - seen map[*ItemGraph]int64 // value is the item's row ID - idmap map[string]int64 // map an item's service ID to the row ID -- TODO: I don't love this... any better way? + timestamp time.Time + procOpt ProcessingOptions + seen map[*ItemGraph]int64 // value is the item's row ID + idmap map[string]int64 // map an item's service ID to the row ID -- TODO: I don't love this... any better way? 
// the cuckoo filter pointer lives for // the duration of the entire operation; @@ -75,7 +73,7 @@ func (wc *WrappedClient) processItemGraph(ig *ItemGraph, state *recursiveState) var igRowID int64 if ig.Node == nil { - // mark this node as visited + // mark this node as visited (no ID) state.seen[ig] = 0 } else { // process root node @@ -131,7 +129,7 @@ func (wc *WrappedClient) processItemGraph(ig *ItemGraph, state *recursiveState) coll.Items[i].itemRowID = state.idmap[it.Item.ID()] } - err := wc.processCollection(coll, state.timestamp) + err := wc.processCollection(coll, state.timestamp, state.procOpt) if err != nil { return 0, fmt.Errorf("processing collection: %v (original_id=%s)", err, coll.OriginalID) } @@ -200,7 +198,7 @@ func (wc *WrappedClient) processSingleItemGraphNode(it Item, state *recursiveSta state.cuckoo.Unlock() } - itemRowID, err := wc.storeItemFromService(it, state.timestamp, state.reprocess, state.integrityCheck) + itemRowID, err := wc.storeItemFromService(it, state.timestamp, state.procOpt) if err != nil { return itemRowID, err } @@ -220,13 +218,30 @@ func (wc *WrappedClient) processSingleItemGraphNode(it Item, state *recursiveSta return itemRowID, nil } -func (wc *WrappedClient) storeItemFromService(it Item, timestamp time.Time, reprocess, integrity bool) (int64, error) { +func (wc *WrappedClient) storeItemFromService(it Item, timestamp time.Time, procOpt ProcessingOptions) (int64, error) { if it == nil { return 0, nil } - // process this item only one at a time itemOriginalID := it.ID() + + // if enabled, prepare a "soft merge" - this operation finds an existing row that + // matches properties of an item that are LIKELY unique if they are, in fact, the + // same item, without relying on the item's original_id alone (which might not be + // consistent depending on the method used, for example importing from Google + // Takeout for Google Photos has different IDs than using their API) -- and sets + // the original_id field of the candidate row 
to that of the incoming row; this + // will trigger a conflict in the query below that will cause a graceful update + var doingSoftMerge bool + if procOpt.Merge.SoftMerge { + var err error + itemOriginalID, doingSoftMerge, err = wc.softMerge(it, procOpt) + if err != nil { + return 0, fmt.Errorf("soft merge: %v", err) + } + } + + // process this item only one at a time itemLockID := fmt.Sprintf("%s_%d_%s", wc.ds.ID, wc.acc.ID, itemOriginalID) itemLocks.Lock(itemLockID) defer itemLocks.Unlock(itemLockID) @@ -247,6 +262,12 @@ func (wc *WrappedClient) storeItemFromService(it Item, timestamp time.Time, repr defer rc.Close() } + // as we go, we'll decide whether we are to process and store this item's data + // file or not, as it depends on a few factors; our default behavior is that + // data in a new item is preferred and thus assimilated into the timeline, + // so we begin by deciding simply based on whether this item has a file reader + processDataFile := rc != nil + // if the item is already in our DB, load it var ir ItemRow if itemOriginalID != "" { @@ -257,16 +278,22 @@ func (wc *WrappedClient) storeItemFromService(it Item, timestamp time.Time, repr if ir.ID > 0 { // already have it - if !wc.shouldProcessExistingItem(it, ir, reprocess, integrity) { + if !wc.shouldProcessExistingItem(it, ir, doingSoftMerge, procOpt) { return ir.ID, nil } - // at this point, we will be replacing the existing - // file, so move it temporarily as a safe measure, - // and also because our filename-generator will not - // allow a file to be overwritten, but we want to - // replace the existing file in this case - if ir.DataFile != nil && rc != nil { + // update our decision to process the data file; we know we already have this item + // so a merge is taking place; so now this depends on the merge configuration and + // whether the existing item has a data file + processDataFile = processDataFile && (ir.DataFile == nil || !procOpt.Merge.PreferExistingDataFile) + + // if there is an 
existing data file, and this new item has a data file, + and the merge configuration does NOT prefer existing file over new one, + then we know we will be replacing the existing file, so move it out of + the way temporarily as a safe measure, and also because our + filename-generator will not allow a file to be overwritten, but we want + to replace the existing file in this case... + if processDataFile && ir.DataFile != nil { origFile := wc.tl.fullpath(*ir.DataFile) bakFile := wc.tl.fullpath(*ir.DataFile + ".bak") err = os.Rename(origFile, bakFile) @@ -297,7 +324,7 @@ func (wc *WrappedClient) storeItemFromService(it Item, timestamp time.Time, repr var dataFileName *string var datafile *os.File - if rc != nil { + if processDataFile { datafile, dataFileName, err = wc.tl.openUniqueCanonicalItemDataFile(it, wc.ds.ID) if err != nil { return 0, fmt.Errorf("opening output data file: %v", err) @@ -306,21 +333,44 @@ func (wc *WrappedClient) storeItemFromService(it Item, timestamp time.Time, repr } // prepare the item's DB row values - err = wc.fillItemRow(&ir, it, timestamp, dataFileName) + err = wc.fillItemRow(&ir, it, itemOriginalID, timestamp, dataFileName) if err != nil { return 0, fmt.Errorf("assembling item for storage: %v", err) } - // TODO: Insert modified time too, if edited locally? - // TODO: On conflict, maybe we just want to ignore -- make this configurable... 
+ // honor relevant merge options + timestampCoal, dataTextCoal := "COALESCE(?, timestamp)", "COALESCE(?, data_text)" + if procOpt.Merge.PreferExistingTimestamp { + timestampCoal = "COALESCE(timestamp, ?)" + } + if procOpt.Merge.PreferExistingDataText { + dataTextCoal = "COALESCE(data_text, ?)" + } + + // insert into the DB if it does not exist, and if it does, we update the existing + // row such that we usually prefer the new value, but if the new value is nil, keep + // the existing value (this is mostly an additive "merge" of the stored row with + // the incoming row, except that if both values are not null, we overwrite existing + // value with the new one); 'coalesce(?, field)' means "store new value if not null, + // otherwise keep existing value"; i.e. the incoming data is authoritative unless it + // is missing, in which case we keep what we have _, err = wc.tl.db.Exec(`INSERT INTO items (account_id, original_id, person_id, timestamp, stored, class, mime_type, data_text, data_file, data_hash, metadata, latitude, longitude) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
ON CONFLICT (account_id, original_id) DO UPDATE - SET person_id=?, timestamp=?, stored=?, class=?, mime_type=?, data_text=?, - data_file=?, data_hash=?, metadata=?, latitude=?, longitude=?`, + SET person_id=COALESCE(?, person_id), + timestamp=`+timestampCoal+`, + stored=COALESCE(?, stored), + class=COALESCE(?, class), + mime_type=COALESCE(?, mime_type), + data_text=`+dataTextCoal+`, + data_file=COALESCE(?, data_file), + data_hash=COALESCE(?, data_hash), + metadata=COALESCE(?, metadata), + latitude=COALESCE(?, latitude), + longitude=COALESCE(?, longitude)`, ir.AccountID, ir.OriginalID, ir.PersonID, ir.Timestamp.Unix(), ir.Stored.Unix(), ir.Class, ir.MIMEType, ir.DataText, ir.DataFile, ir.DataHash, ir.metaGob, ir.Latitude, ir.Longitude, @@ -342,7 +392,7 @@ func (wc *WrappedClient) storeItemFromService(it Item, timestamp time.Time, repr // if there is a data file, download it and compute its checksum; // then update the item's row in the DB with its name and checksum - if rc != nil && dataFileName != nil { + if processDataFile { h := sha256.New() err := wc.tl.downloadItemFile(rc, datafile, h) if err != nil { @@ -361,7 +411,7 @@ func (wc *WrappedClient) storeItemFromService(it Item, timestamp time.Time, repr } // save the file's name and hash to confirm it was downloaded successfully - _, err = wc.tl.db.Exec(`UPDATE items SET data_hash=? WHERE id=?`, // TODO: LIMIT 1... + _, err = wc.tl.db.Exec(`UPDATE items SET data_hash=? WHERE id=?`, // TODO: LIMIT 1... 
(see https://github.com/mattn/go-sqlite3/pull/802) b64hash, itemRowID) if err != nil { log.Printf("[ERROR][%s/%s] Updating item's data file hash in DB: %v; cleaning up data file: %s (item_id=%d)", @@ -373,9 +423,9 @@ func (wc *WrappedClient) storeItemFromService(it Item, timestamp time.Time, repr return itemRowID, nil } -func (wc *WrappedClient) shouldProcessExistingItem(it Item, dbItem ItemRow, reprocess, integrity bool) bool { +func (wc *WrappedClient) shouldProcessExistingItem(it Item, dbItem ItemRow, doingSoftMerge bool, procOpt ProcessingOptions) bool { // if integrity check is enabled and checksum mismatches, always reprocess - if integrity && dbItem.DataFile != nil && dbItem.DataHash != nil { + if procOpt.Integrity && dbItem.DataFile != nil && dbItem.DataHash != nil { datafile, err := os.Open(wc.tl.fullpath(*dbItem.DataFile)) if err != nil { log.Printf("[ERROR][%s/%s] Integrity check: opening existing data file: %v; reprocessing (item_id=%d)", @@ -419,11 +469,14 @@ func (wc *WrappedClient) shouldProcessExistingItem(it Item, dbItem ItemRow, repr return true } - // finally, if the user wants to reprocess anyway, then do so - return reprocess + // finally, if the user wants to reprocess anyway, then do so; + // or if we are doing a soft merge (merging one identical item + // into an existing one), always reprocess so that the merge + // will actually take place + return procOpt.Reprocess || doingSoftMerge } -func (wc *WrappedClient) fillItemRow(ir *ItemRow, it Item, timestamp time.Time, canonicalDataFileName *string) error { +func (wc *WrappedClient) fillItemRow(ir *ItemRow, it Item, itemOriginalID string, timestamp time.Time, canonicalDataFileName *string) error { // unpack the item's information into values to use in the row ownerID, ownerName := it.Owner() @@ -469,7 +522,7 @@ func (wc *WrappedClient) fillItemRow(ir *ItemRow, it Item, timestamp time.Time, } ir.AccountID = wc.acc.ID - ir.OriginalID = it.ID() + ir.OriginalID = itemOriginalID ir.PersonID = 
person.ID ir.Timestamp = it.Timestamp() ir.Stored = timestamp @@ -481,10 +534,91 @@ func (wc *WrappedClient) fillItemRow(ir *ItemRow, it Item, timestamp time.Time, ir.metaGob = metaGob ir.Location = *loc + // not used in the DB, but if we need to get the item's + // original file name, for example, rather than the + // unique filename to be used on disk... + ir.item = it + return nil } -func (wc *WrappedClient) processCollection(coll Collection, timestamp time.Time) error { +// softMerge finds a candidate row that already exists in the DB that is likely to be identical +// to it, even if the original_id does not match, and updates the original_id field to that of it +// if there is exactly 1 matching row and if procOpt permits. This will allow the existing row +// to be merged with the incoming row even though their original_ids do not match. This is a +// best-guess effort based on timestamp and data_text/data_file/data_hash (the hash must be one +// that is offered by the data source, as the post-download hashing is not known until after +// downloading the file, obviously; since most data sources don't offer one, in practice soft +// merges happen over timestamp plus filename or text data only). It returns the ID that must +// be used when processing the item, and whether a soft merge is being performed or not. 
+func (wc *WrappedClient) softMerge(it Item, procOpt ProcessingOptions) (string, bool, error) { + var filenameLikePattern *string + if dataFileName := it.DataFileName(); dataFileName != nil { + temp := "%/" + *dataFileName + filenameLikePattern = &temp + } + + newOriginalID := it.ID() + dataText, err := it.DataText() + if err != nil { + return newOriginalID, false, fmt.Errorf("getting item text: %v", err) + } + // NOTE: DataFileHash returns no error (unlike DataText above), + // so there is no error to check here; a nil hash simply will + // not match any candidate row in the query below + dataHash := it.DataFileHash() + + // make sure there is exactly 1 matching row; any more is ambiguous and too risky to merge + // (also make sure the existing original_id does not match the new one; that would be a regular merge) + var numMatches int + var rowID *int + var oldOriginalID *string + err = wc.tl.db.QueryRow(`SELECT COUNT(1), id, original_id + FROM items + WHERE account_id=? AND timestamp=? AND (data_text=? OR data_file LIKE ? OR data_hash=?) AND original_id != ? + LIMIT 1`, + wc.acc.ID, it.Timestamp().Unix(), dataText, filenameLikePattern, dataHash, newOriginalID).Scan(&numMatches, &rowID, &oldOriginalID) + if err != nil && err != sql.ErrNoRows { + return newOriginalID, false, fmt.Errorf("querying for candidate row: %v", err) + } + if err == sql.ErrNoRows || numMatches == 0 { + return newOriginalID, false, nil + } + if numMatches > 1 { + return newOriginalID, false, fmt.Errorf("ambiguous match with %d existing items (account_id=%d timestamp=%d data_text=%p data_file=%p) - unable to merge, skipping item with ID: %s", + numMatches, wc.acc.ID, it.Timestamp().Unix(), dataText, filenameLikePattern, newOriginalID) + } + + // now we know there is exactly one match, so we are to perform a soft merge; + // we must honor the configured merge preferences especially regarding ID + + // if configured to keep existing ID, make sure the caller knows to use the + // existing/old ID rather than the ID associated with the current/new item + if procOpt.Merge.PreferExistingID { + 
log.Printf("[INFO] Soft merging new item with id=%s into row %d with item id=%s (preserved item ID)", newOriginalID, *rowID, *oldOriginalID) + return *oldOriginalID, true, nil + } + + // now we know there is exactly 1 match and we are to use the new item's ID; set up merge by + // updating the item's original_id to the incoming item's ID value; this will cause the + // imminent INSERT query to find a conflict and perform a graceful merge with the incoming data + _, err = wc.tl.db.Exec(`UPDATE items SET original_id=? WHERE id=?`, newOriginalID, rowID) // TODO: limit 1 (see https://github.com/mattn/go-sqlite3/pull/802) + if err != nil && err != sql.ErrNoRows { + return newOriginalID, false, fmt.Errorf("updating candidate row's original_id in DB: %v (id=%d old_original_id=%s new_original_id=%s)", + err, *rowID, *oldOriginalID, newOriginalID) + } + + log.Printf("[INFO] Soft merging new item with id=%s into row %d with item id=%s (changed item ID)", newOriginalID, *rowID, *oldOriginalID) + + return newOriginalID, true, nil +} + +func (wc *WrappedClient) processCollection(coll Collection, timestamp time.Time, procOpt ProcessingOptions) error { + // never reprocess or check integrity when storing items in collections since the main processing handles that + procOpt.Reprocess = false + procOpt.Integrity = false + + // TODO: support soft merge (based on name, I guess) _, err := wc.tl.db.Exec(`INSERT INTO collections (account_id, original_id, name) VALUES (?, ?, ?) 
ON CONFLICT (account_id, original_id) @@ -508,7 +642,7 @@ func (wc *WrappedClient) processCollection(coll Collection, timestamp time.Time) // (TODO: could batch this for faster inserts) for _, cit := range coll.Items { if cit.itemRowID == 0 { - itID, err := wc.storeItemFromService(cit.Item, timestamp, false, false) // never reprocess or check integrity here + itID, err := wc.storeItemFromService(cit.Item, timestamp, procOpt) if err != nil { return fmt.Errorf("adding item from collection to storage: %v", err) } diff --git a/timeliner.go b/timeliner.go index 54a0c5f..03d4cff 100644 --- a/timeliner.go +++ b/timeliner.go @@ -79,23 +79,6 @@ type concurrentCuckoo struct { *sync.Mutex } -// Options specifies parameters for listing items -// from a data source. Some data sources might not -// be able to honor all fields. -type Options struct { - // A file from which to read the data. - Filename string - - // Time bounds on which data to retrieve. - // The respective time and item ID fields - // which are set must never conflict. - Timeframe Timeframe - - // A checkpoint from which to resume - // item retrieval. - Checkpoint []byte -} - // FakeCloser turns an io.Reader into an io.ReadCloser // where the Close() method does nothing. func FakeCloser(r io.Reader) io.ReadCloser { @@ -157,3 +140,72 @@ type checkpointWrapper struct { Params string Data []byte } + +// ProcessingOptions configures how item processing is carried out. +type ProcessingOptions struct { + Reprocess bool + Prune bool + Integrity bool + Timeframe Timeframe + Merge MergeOptions +} + +// MergeOptions configures how items are merged. By +// default, newly listed items will be combined with +// existing ones that have the same ID by filling in +// values that are missing in the existing item. +// +// These options allow customizing that behavior. For +// example, merges can be performed even if IDs aren't +// the same but other properties are. 
Or properties in +// the new item may override values in the existing +// item, even if the existing item has a non-nil value. +// This is useful if you want to prefer the form of +// the item in the current listing over that of the +// earlier listing. +type MergeOptions struct { + // If true, an item may be merged if it is likely + // to be the same as an existing item, even if the + // item's ID is different. For example, if a + // service has multiple ways of listing items, but + // does not provide a consistent ID for the same + // item across listing methods, a soft merge will + // allow the processing to treat them as the same + // as long as other fields match: timestamp, and + // either data text or data filename. + SoftMerge bool + + // Keep existing item's ID. This option only has + // any effect if SoftMerge is true, since merges + // are only performed on items with the same ID + // by default. (If SoftMerge is true, the item IDs + // may differ, thus this option takes effect.) + PreferExistingID bool + + // Keep existing item's timestamp. + PreferExistingTimestamp bool + + // Keep existing item's text data. + PreferExistingDataText bool + + // Keep existing item's data file. + PreferExistingDataFile bool +} + +// ListingOptions specifies parameters for listing items +// from a data source. Some data sources might not be +// able to honor all fields. +// TODO: maybe ListOptions instead? +type ListingOptions struct { + // A file from which to read the data. + Filename string + + // Time bounds on which data to retrieve. + // The respective time and item ID fields + // which are set must never conflict. + Timeframe Timeframe + + // A checkpoint from which to resume + // item retrieval. 
+ Checkpoint []byte +} diff --git a/wrappedclient.go b/wrappedclient.go index f85cd6d..765dedc 100644 --- a/wrappedclient.go +++ b/wrappedclient.go @@ -72,9 +72,9 @@ func (wc *WrappedClient) GetLatest(ctx context.Context, until *time.Time) error checkpoint := wc.prepareCheckpoint(timeframe) - wg, ch := wc.beginProcessing(concurrentCuckoo{}, false, false) + wg, ch := wc.beginProcessing(concurrentCuckoo{}, ProcessingOptions{}) - err := wc.Client.ListItems(ctx, ch, Options{ + err := wc.Client.ListItems(ctx, ch, ListingOptions{ Timeframe: timeframe, Checkpoint: checkpoint, }) @@ -93,14 +93,14 @@ func (wc *WrappedClient) GetLatest(ctx context.Context, until *time.Time) error return nil } -// GetAll gets all the items using wc. If reprocess is true, items that -// are already in the timeline will be re-processed. If prune is true, +// GetAll gets all the items using wc. If procOpt.Reprocess is true, items that +// are already in the timeline will be re-processed. If procOpt.Prune is true, // items that are not listed on the data source by wc will be removed -// from the timeline at the end of the listing. If integrity is true, +// from the timeline at the end of the listing. If procOpt.Integrity is true, // all items that are listed by wc that exist in the timeline and which // consist of a data file will be opened and checked for integrity; if // the file has changed, it will be reprocessed. 
-func (wc *WrappedClient) GetAll(ctx context.Context, reprocess, prune, integrity bool, tf Timeframe) error { +func (wc *WrappedClient) GetAll(ctx context.Context, procOpt ProcessingOptions) error { if wc.Client == nil { return fmt.Errorf("no client") } @@ -110,16 +110,16 @@ func (wc *WrappedClient) GetAll(ctx context.Context, reprocess, prune, integrity ctx = context.WithValue(ctx, wrappedClientCtxKey, wc) var cc concurrentCuckoo - if prune { + if procOpt.Prune { cc.Filter = cuckoo.NewFilter(10000000) // 10mil = ~16 MB on 64-bit cc.Mutex = new(sync.Mutex) } - checkpoint := wc.prepareCheckpoint(tf) + checkpoint := wc.prepareCheckpoint(procOpt.Timeframe) - wg, ch := wc.beginProcessing(cc, reprocess, integrity) + wg, ch := wc.beginProcessing(cc, procOpt) - err := wc.Client.ListItems(ctx, ch, Options{Checkpoint: checkpoint, Timeframe: tf}) + err := wc.Client.ListItems(ctx, ch, ListingOptions{Checkpoint: checkpoint, Timeframe: procOpt.Timeframe}) if err != nil { return fmt.Errorf("getting items from service: %v", err) } @@ -133,7 +133,7 @@ func (wc *WrappedClient) GetAll(ctx context.Context, reprocess, prune, integrity } // commence prune, if requested - if prune { + if procOpt.Prune { err := wc.doPrune(cc) if err != nil { return fmt.Errorf("processing completed, but error pruning: %v", err) @@ -158,7 +158,7 @@ func (wc *WrappedClient) prepareCheckpoint(tf Timeframe) []byte { func (wc *WrappedClient) successCleanup() error { // clear checkpoint - _, err := wc.tl.db.Exec(`UPDATE accounts SET checkpoint=NULL WHERE id=?`, wc.acc.ID) // TODO: limit 1 + _, err := wc.tl.db.Exec(`UPDATE accounts SET checkpoint=NULL WHERE id=?`, wc.acc.ID) // TODO: limit 1 (see https://github.com/mattn/go-sqlite3/pull/802) if err != nil { return fmt.Errorf("clearing checkpoint: %v", err) } @@ -181,22 +181,23 @@ func (wc *WrappedClient) successCleanup() error { // Import is like GetAll but for a locally-stored archive or export file that can // simply be opened and processed, rather than 
needing to run over a network. See // the godoc for GetAll. This is only for data sources that support Import. -func (wc *WrappedClient) Import(ctx context.Context, filename string, reprocess, prune, integrity bool) error { +func (wc *WrappedClient) Import(ctx context.Context, filename string, procOpt ProcessingOptions) error { if wc.Client == nil { return fmt.Errorf("no client") } var cc concurrentCuckoo - if prune { + if procOpt.Prune { cc.Filter = cuckoo.NewFilter(10000000) // 10mil = ~16 MB on 64-bit cc.Mutex = new(sync.Mutex) } - wg, ch := wc.beginProcessing(cc, reprocess, integrity) + wg, ch := wc.beginProcessing(cc, procOpt) - err := wc.Client.ListItems(ctx, ch, Options{ + err := wc.Client.ListItems(ctx, ch, ListingOptions{ Filename: filename, Checkpoint: wc.acc.checkpoint, + Timeframe: procOpt.Timeframe, }) if err != nil { return fmt.Errorf("importing: %v", err) @@ -211,7 +212,7 @@ func (wc *WrappedClient) Import(ctx context.Context, filename string, reprocess, } // commence prune, if requested - if prune { + if procOpt.Prune { err := wc.doPrune(cc) if err != nil { return fmt.Errorf("processing completed, but error pruning: %v", err) @@ -306,7 +307,7 @@ func (wc *WrappedClient) deleteItem(rowID int64) error { return fmt.Errorf("querying count of rows sharing data file: %v", err) } - _, err = wc.tl.db.Exec(`DELETE FROM items WHERE id=?`, rowID) // TODO: limit 1 + _, err = wc.tl.db.Exec(`DELETE FROM items WHERE id=?`, rowID) // TODO: limit 1 (see https://github.com/mattn/go-sqlite3/pull/802) if err != nil { return fmt.Errorf("deleting item from DB: %v", err) }