Merge modifications made by Aaron. (#3638)

* Add kingpin library for argument handling.
* Update hard-coded Version number in chunkstore.
* Store noms repo information in ipfs home directory.
This commit is contained in:
Dan Willhite
2017-08-24 17:50:15 -07:00
committed by GitHub
parent 23e0bb388b
commit 95aac7ca1e
12 changed files with 123 additions and 202 deletions
+17 -16
View File
@@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
"os"
"path"
"reflect"
"sync"
@@ -15,6 +16,7 @@ import (
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/hash"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/mitchellh/go-homedir"
"github.com/ipfs/go-ipfs/blocks/blockstore"
"github.com/ipfs/go-ipfs/blockservice"
"github.com/ipfs/go-ipfs/core"
@@ -43,14 +45,7 @@ var (
// write. If local is false, then reads will fall through to the network and
// blocks stored will be exposed to the entire IPFS network.
func NewChunkStore(name string, local bool) *chunkStore {
env := "IPFS_PATH"
if local {
env = "IPFS_LOCAL_PATH"
}
p := os.Getenv(env)
if p == "" {
p = "~/.ipfs"
}
p := getIPFSDir(local)
r, err := fsrepo.Open(p)
d.CheckError(err)
@@ -195,7 +190,7 @@ func (cs *chunkStore) Put(c chunks.Chunk) {
func (cs *chunkStore) Version() string {
// TODO: Store this someplace in the DB root
return "7.13"
return "7.14"
}
func (cs *chunkStore) Rebase() {
@@ -240,7 +235,7 @@ func (cs *chunkStore) Commit(current, last hash.Hash) bool {
// TODO: Optimistic concurrency?
cid := nomsHashToCID(current)
dir := cs.getLocalNamesDir()
dir := getIPFSDir(cs.local)
err := os.MkdirAll(dir, 0755)
d.PanicIfError(err)
err = ioutil.WriteFile(cs.getLocalNameFile(cs.name), []byte(cid.String()), 0644)
@@ -250,16 +245,22 @@ func (cs *chunkStore) Commit(current, last hash.Hash) bool {
return true
}
func (cs *chunkStore) getLocalNamesDir() string {
if cs.local {
return os.ExpandEnv("$HOME/.noms/ipfs-local")
} else {
return os.ExpandEnv("$HOME/.noms/ipfs")
func getIPFSDir(local bool) string {
env := "IPFS_PATH"
if local {
env = "IPFS_LOCAL_PATH"
}
p := os.Getenv(env)
if p == "" {
p = "~/.ipfs"
}
p, err := homedir.Expand(p)
d.Chk.NoError(err)
return p
}
func (cs *chunkStore) getLocalNameFile(name string) string {
return cs.getLocalNamesDir() + "/" + name
return path.Join(getIPFSDir(cs.local), name)
}
func (cs *chunkStore) Stats() interface{} {
+49
View File
@@ -0,0 +1,49 @@
// Copyright 2017 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/datas"
"github.com/attic-labs/noms/go/ipfs"
"github.com/attic-labs/noms/go/spec"
"github.com/attic-labs/noms/samples/go/ipfs-chat/dbg"
)
func runDaemon(topic string, interval time.Duration, networkDS, localDS string) {
dbg.SetLogger(log.New(os.Stdout, "", 0))
sourceSp, err := spec.ForDataset(networkDS)
d.CheckErrorNoUsage(err)
source := sourceSp.GetDataset()
source, err = InitDatabase(source)
d.PanicIfError(err)
destSp, err := spec.ForDataset(localDS)
d.CheckErrorNoUsage(err)
dest := destSp.GetDataset()
dest, err = InitDatabase(dest)
d.PanicIfError(err)
fmt.Printf("Replicating %s to %s...\n", sourceSp.String(), destSp.String())
sub, err := ipfs.CurrentNode.Floodsub.Subscribe(topic)
d.PanicIfError(err)
go Replicate(sub, source, dest, func(ds datas.Dataset) {
dest = ds
})
for {
s := dest.HeadRef().TargetHash().String()
fmt.Println("publishing: " + s)
Publish(sub, topic, s)
time.Sleep(interval)
}
}
+10 -12
View File
@@ -8,27 +8,25 @@ import (
"errors"
"fmt"
"os"
"regexp"
"strings"
"path/filepath"
"regexp"
"sort"
"strings"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/marshal"
"github.com/attic-labs/noms/go/spec"
"github.com/attic-labs/noms/go/types"
"github.com/attic-labs/noms/go/util/datetime"
"github.com/attic-labs/noms/samples/go/ipfs-chat/lib"
"golang.org/x/net/html"
"sort"
)
var (
character = ""
msgs = []lib.Message{}
msgs = []Message{}
)
func importScript(dir string, dsSpec string) error {
func runImport(dir, dsSpec string) error {
filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if path == dir {
return nil
@@ -54,7 +52,7 @@ func importScript(dir string, dsSpec string) error {
sp, err := spec.ForDataset(dsSpec)
d.CheckErrorNoUsage(err)
ds := sp.GetDataset()
ds, err = lib.InitDatabase(ds)
ds, err = InitDatabase(ds)
d.PanicIfError(err)
fmt.Println("Creating msg map")
@@ -65,9 +63,9 @@ func importScript(dir string, dsSpec string) error {
m := types.NewMap(kvPairs...)
fmt.Println("Creating index")
ti := lib.NewTermIndex(types.NewMap()).Edit()
ti := NewTermIndex(types.NewMap()).Edit()
for _, msg := range msgs {
terms := lib.GetTerms(msg)
terms := GetTerms(msg)
ti.InsertAll(terms, types.String(msg.ID()))
}
termDocs := ti.Value(nil).TermDocs
@@ -90,7 +88,7 @@ outer:
}
sort.Strings(users)
fmt.Println("Committing data")
root := lib.Root{Messages: m, Index: termDocs, Users: users}
root := Root{Messages: m, Index: termDocs, Users: users}
_, err = ds.Database().CommitValue(ds, marshal.MustMarshal(root))
return err
}
@@ -103,7 +101,7 @@ func extractDialog(n *html.Node) {
}
if character != "" && n.Type == html.TextNode {
//fmt.Println("Dialog:", strings.TrimSpace(n.Data))
msg := lib.Message{
msg := Message{
Ordinal: uint64(len(msgs)),
Author: character,
Body: strings.TrimSpace(n.Data),
@@ -1,8 +0,0 @@
This program is a hack to simulate filecoin. It guarantees that chats get
replicated somewhere on the network.
Once filecoin exists, this won't be necessary.
Another approach in a blockstack-type world would be to make each user
have their own noms backend that they own/pay for. Then when a user chats,
the data goes into their own db, and they sync between them.
-79
View File
@@ -1,79 +0,0 @@
// Copyright 2017 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package main
import (
"flag"
"fmt"
"log"
"os"
"time"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/datas"
"github.com/attic-labs/noms/go/ipfs"
"github.com/attic-labs/noms/go/spec"
"github.com/attic-labs/noms/samples/go/ipfs-chat/dbg"
"github.com/attic-labs/noms/samples/go/ipfs-chat/lib"
)
var (
topic = flag.String("topic", "ipfs-chat", "topic to subscribe to for notifications of changes")
interval = flag.Duration("interval", 5*time.Second, "rate to publish current head to network")
)
func main() {
flag.Parse()
flag.Usage = func() {
fmt.Println("ipfs-chatd [flags] <ipfs-dataset> <ipfs-local-dataset>")
flag.PrintDefaults()
return
}
if *topic == "" {
fmt.Fprintln(os.Stderr, "--topic cannot be empty")
return
}
if *interval == time.Duration(0) {
fmt.Fprintln(os.Stderr, "--interval cannot be empty")
return
}
if flag.NArg() < 2 {
fmt.Fprintln(os.Stderr, "Insufficient arguments")
return
}
dbg.SetLogger(log.New(os.Stdout, "", 0))
sourceSp, err := spec.ForDataset(flag.Arg(0))
d.CheckErrorNoUsage(err)
source := sourceSp.GetDataset()
source, err = lib.InitDatabase(source)
d.PanicIfError(err)
destSp, err := spec.ForDataset(flag.Arg(1))
d.CheckErrorNoUsage(err)
dest := destSp.GetDataset()
dest, err = lib.InitDatabase(dest)
d.PanicIfError(err)
fmt.Printf("Replicating %s to %s...\n", sourceSp.String(), destSp.String())
sub, err := ipfs.CurrentNode.Floodsub.Subscribe(*topic)
d.PanicIfError(err)
go lib.Replicate(sub, source, dest, func(ds datas.Dataset) {
dest = ds
})
for {
s := dest.HeadRef().TargetHash().String()
fmt.Println("publishing: " + s)
lib.Publish(sub, *topic, s)
time.Sleep(*interval)
}
}
@@ -1,55 +0,0 @@
// Copyright 2017 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package main
import (
"flag"
cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
mh "gx/ipfs/QmU9a9NV9RdPNwZQDYd5uKsm6N6LJLSvLbywDDYFbaaC6P/go-multihash"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/hash"
"fmt"
"os"
)
func main() {
nomsHash := flag.String("nh", "", "noms hash to translate to ipfs hash")
ipfsHash := flag.String("ih", "", "ipfs hash to translate to noms hash")
flag.Parse()
if *nomsHash != "" {
nh, ok := hash.MaybeParse(*nomsHash)
if !ok {
fmt.Printf("unable to parse noms hash: %s\n", *nomsHash)
os.Exit(1)
}
ih := nomsHashToCID(nh)
fmt.Println("ipfs hash:", ih.String())
return
}
ih, err := cid.Decode(*ipfsHash)
if err != nil {
fmt.Printf("unable to parse ipfs hash: %s, error: %s\n", *ipfsHash, err)
os.Exit(1)
}
nh := cidToNomsHash(ih)
fmt.Println("noms hash:", nh.String())
}
func nomsHashToCID(nh hash.Hash) *cid.Cid {
mhb, err := mh.Encode(nh[:], mh.SHA2_512)
d.PanicIfError(err)
return cid.NewCidV1(cid.Raw, mhb)
}
func cidToNomsHash(id *cid.Cid) (h hash.Hash) {
dmh, err := mh.Decode([]byte(id.Hash()))
d.PanicIfError(err)
copy(h[:], dmh.Digest)
return
}
+42 -27
View File
@@ -5,10 +5,8 @@
package main
import (
"flag"
"fmt"
"log"
"os"
"regexp"
"strings"
"time"
@@ -21,8 +19,8 @@ import (
"github.com/attic-labs/noms/go/types"
"github.com/attic-labs/noms/go/util/math"
"github.com/attic-labs/noms/samples/go/ipfs-chat/dbg"
"github.com/attic-labs/noms/samples/go/ipfs-chat/lib"
"github.com/jroimartin/gocui"
"gopkg.in/alecthomas/kingpin.v2"
)
const (
@@ -37,40 +35,57 @@ const (
var (
viewNames = []string{users, messages, input}
firstLayout = true
username = flag.String("username", "", "username to tag messages with")
topic = flag.String("pubsub-topic", "ipfs-chat", "topic to subscribe to for notifications of changes")
importDir = flag.String("import", "", "path to directory containing script files for import")
)
func main() {
flag.Parse()
// allow short (-h) help
kingpin.CommandLine.HelpFlag.Short('h')
if *importDir != "" {
importScript(*importDir, flag.Arg(0))
return
clientCmd := kingpin.Command("client", "runs the ipfs-chat client UI")
clientTopic := clientCmd.Flag("topic", "IPFS pubsub topic to publish and subscribe to").Default("ipfs-chat").String()
username := clientCmd.Flag("username", "username to sign in as").String()
clientDS := clientCmd.Arg("dataset", "the dataset spec to store chat data in").Required().String()
importCmd := kingpin.Command("import", "imports data into a chat")
importDir := importCmd.Flag("dir", "directory that contains data to import").Default("./data").ExistingDir()
importDS := importCmd.Arg("dataset", "the dataset spec to import chat data to").Required().String()
daemonCmd := kingpin.Command("daemon", "runs a daemon that simulates filecoin, eagerly storing all chunks for a chat")
daemonTopic := daemonCmd.Flag("topic", "IPFS pubsub topic to publish and subscribe to").Default("ipfs-chat").String()
daemonInterval := daemonCmd.Flag("interval", "amount of time to wait before publishing state to network").Default("5s").Duration()
daemonNetworkDS := daemonCmd.Arg("network-dataset", "the dataset spec to use to read and write data to the IPFS network").Required().String()
daemonLocalDS := daemonCmd.Arg("local-dataset", "the dataset spec to use to read and write data locally").Required().String()
kingpin.CommandLine.Help = "A demonstration of using Noms to build a scalable multiuser collaborative application."
switch kingpin.Parse() {
case "client":
runClient(*username, *clientTopic, *clientDS)
case "import":
runImport(*importDir, *importDS)
case "daemon":
runDaemon(*daemonTopic, *daemonInterval, *daemonNetworkDS, *daemonLocalDS)
}
}
func runClient(username, topic, clientDS string) {
var displayingSearchResults = false
if *username == "" {
fmt.Fprintln(os.Stderr, "--username required")
return
}
dsSpec := flag.Arg(0)
dsSpec := clientDS
sp, err := spec.ForDataset(dsSpec)
d.CheckErrorNoUsage(err)
ds := sp.GetDataset()
ds, err = lib.InitDatabase(ds)
ds, err = InitDatabase(ds)
d.PanicIfError(err)
g, err := gocui.NewGui(gocui.Output256)
d.PanicIfError(err)
defer g.Close()
sub, err := ipfs.CurrentNode.Floodsub.Subscribe(*topic)
sub, err := ipfs.CurrentNode.Floodsub.Subscribe(topic)
d.PanicIfError(err)
go lib.Replicate(sub, ds, ds, func(nds datas.Dataset) {
go Replicate(sub, ds, ds, func(nds datas.Dataset) {
ds = nds
if displayingSearchResults || !textScrolledToEnd(g) {
g.Execute(func(g *gocui.Gui) (err error) {
@@ -110,7 +125,7 @@ func main() {
updateMessages(g, msgView, ds, nil, nil)
return nil
}
sIds, sTerms, nds, err := handleEnter(buf, *username, time.Now(), ds)
sIds, sTerms, nds, err := handleEnter(buf, username, time.Now(), ds)
if err != nil {
return
}
@@ -121,7 +136,7 @@ func main() {
if displayingSearchResults {
return
}
lib.Publish(sub, *topic, ds.HeadRef().TargetHash().String())
Publish(sub, topic, ds.HeadRef().TargetHash().String())
return
}))
d.PanicIfError(g.SetKeybinding("", gocui.KeyF1, gocui.ModNone, debugInfo))
@@ -193,7 +208,7 @@ func (dp *dataPager) Next() (string, bool) {
}
nm := dp.msgMap.Get(msgKey)
var m lib.Message
var m Message
err := marshal.Unmarshal(nm, &m)
if err != nil {
return fmt.Sprintf("ERROR: %s", err.Error()), true
@@ -215,7 +230,7 @@ func updateMessages(g *gocui.Gui, v *gocui.View, ds datas.Dataset, filterIds *ty
}
doneChan := make(chan struct{})
msgMap, msgKeyChan, err := lib.ListMessages(ds, filterIds, doneChan)
msgMap, msgKeyChan, err := ListMessages(ds, filterIds, doneChan)
d.PanicIfError(err)
dp = dataPager{
dataset: ds,
@@ -244,7 +259,7 @@ func resetAuthors(g *gocui.Gui, ds datas.Dataset) {
v, err := g.View(users)
d.PanicIfError(err)
v.Clear()
for _, u := range lib.GetAuthors(ds) {
for _, u := range GetAuthors(ds) {
fmt.Fprintln(v, u)
}
}
@@ -309,8 +324,8 @@ func scrollView(v *gocui.View, dy, lineCnt int) {
func handleEnter(body string, author string, clientTime time.Time, ds datas.Dataset) (*types.Map, []string, datas.Dataset, error) {
if strings.HasPrefix(body, searchPrefix) {
st := lib.TermsFromString(body[len(searchPrefix):])
ids := lib.SearchIndex(ds, st)
st := TermsFromString(body[len(searchPrefix):])
ids := SearchIndex(ds, st)
return &ids, st, ds, nil
}
@@ -318,7 +333,7 @@ func handleEnter(body string, author string, clientTime time.Time, ds datas.Data
return nil, nil, ds, gocui.ErrQuit
}
ds, err := lib.AddMessage(body, author, clientTime, ds)
ds, err := AddMessage(body, author, clientTime, ds)
return nil, nil, ds, err
}
@@ -2,7 +2,7 @@
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package lib
package main
import (
"fmt"
@@ -2,7 +2,7 @@
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package lib
package main
import (
"testing"
@@ -2,7 +2,7 @@
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package lib
package main
import (
"context"
@@ -2,7 +2,7 @@
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package lib
package main
import (
"sync"
@@ -2,7 +2,7 @@
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package lib
package main
import (
"strings"