1
0
mirror of https://github.com/sjwhitworth/golearn.git synced 2025-05-09 19:29:30 +08:00

Merge pull request #82 from Sentimentron/edf-remove

Edf remove
This commit is contained in:
Stephen Whitworth 2014-09-25 07:32:27 +01:00
commit fc5eafd82b
15 changed files with 5603 additions and 1244 deletions

View File

@ -1,25 +1,17 @@
package base
import (
"fmt"
)
// BinaryAttributeGroup contains only BinaryAttributes.
// Each Attribute is compacted to a single bit for better storage;
// see RowSize for the resulting per-row byte count.
type BinaryAttributeGroup struct {
// Embeds FixedAttributeGroup for shared bookkeeping
// (attributes, threadNo, size, alloc — see String/RowSize).
FixedAttributeGroup
}
func (b *BinaryAttributeGroup) RowSize() int {
return (len(b.attributes) + 7) / 8
// String returns a short human-readable name for this group.
func (b *BinaryAttributeGroup) String() string {
return "BinaryAttributeGroup"
}
// String gets a human-readable view of this group
func (b *BinaryAttributeGroup) String() string {
if len(b.alloc) > 1 {
return fmt.Sprintf("BinaryAttributeGroup(%d attributes\n thread: %d\n size: %d\n)", len(b.attributes), b.threadNo, b.size)
}
return fmt.Sprintf("BinaryAttributeGroup(%d attributes\n thread: %d\n size: %d\n)", len(b.attributes), b.threadNo, b.size)
// RowSize returns the number of bytes required to store one row:
// one bit per Attribute, rounded up to the next whole byte.
func (b *BinaryAttributeGroup) RowSize() int {
return (len(b.attributes) + 7) / 8
}
func (b *BinaryAttributeGroup) getByteOffset(col, row int) int {

View File

@ -3,33 +3,31 @@ package base
import (
"bytes"
"fmt"
"github.com/sjwhitworth/golearn/base/edf"
"math"
"os"
"sync"
)
// DenseInstances stores each Attribute value explicitly
// in a large grid.
type DenseInstances struct {
storage *edf.EdfFile
agMap map[string]int
ags []AttributeGroup
lock sync.Mutex
fixed bool
classAttrs map[AttributeSpec]bool
maxRow int
attributes []Attribute
agMap map[string]int
ags []AttributeGroup
lock sync.Mutex
fixed bool
classAttrs map[AttributeSpec]bool
maxRow int
attributes []Attribute
tmpAttrAgMap map[Attribute]string
// Counters for each AttributeGroup type
floatRowSizeBytes int
catRowSizeBytes int
binRowSizeBits int
}
// NewDenseInstances generates a new DenseInstances set
// with an anonymous EDF mapping and default settings.
func NewDenseInstances() *DenseInstances {
storage, err := edf.EdfAnonMap()
if err != nil {
panic(err)
}
return &DenseInstances{
storage,
make(map[string]int),
make([]AttributeGroup, 0),
sync.Mutex{},
@ -37,6 +35,10 @@ func NewDenseInstances() *DenseInstances {
make(map[AttributeSpec]bool),
0,
make([]Attribute, 0),
make(map[Attribute]string),
0,
0,
0,
}
}
@ -54,34 +56,9 @@ func (inst *DenseInstances) createAttributeGroup(name string, size int) {
panic("Can't add additional Attributes")
}
// Resolve or create thread
threads, err := inst.storage.GetThreads()
if err != nil {
panic(err)
}
ok := false
for i := range threads {
if threads[i] == name {
ok = true
break
}
}
if ok {
panic("Can't create AttributeGroup: thread already exists")
}
// Write the pool's thread into the file
thread := edf.NewThread(inst.storage, name)
err = inst.storage.WriteThread(thread)
if err != nil {
panic(fmt.Sprintf("Can't write thread: %s", err))
}
// Create the AttributeGroup information
if size != 0 {
ag := new(FixedAttributeGroup)
ag.threadNo = thread.GetId()
ag.parent = inst
ag.attributes = make([]Attribute, 0)
ag.size = size
@ -89,7 +66,6 @@ func (inst *DenseInstances) createAttributeGroup(name string, size int) {
agAdd = ag
} else {
ag := new(BinaryAttributeGroup)
ag.threadNo = thread.GetId()
ag.parent = inst
ag.attributes = make([]Attribute, 0)
ag.size = size
@ -145,6 +121,7 @@ func (inst *DenseInstances) GetAttributeGroup(name string) (AttributeGroup, erro
//
// IMPORTANT: will panic if storage has been allocated via Extend.
func (inst *DenseInstances) AddAttribute(a Attribute) AttributeSpec {
var ok bool
inst.lock.Lock()
defer inst.lock.Unlock()
@ -152,21 +129,32 @@ func (inst *DenseInstances) AddAttribute(a Attribute) AttributeSpec {
panic("Can't add additional Attributes")
}
cur := 0
// Generate a default AttributeGroup name
ag := "FLOAT"
if _, ok := a.(*CategoricalAttribute); ok {
ag = "CAT"
generatingBinClass := false
if ag, ok = inst.tmpAttrAgMap[a]; ok {
// Retrieved the group id
} else if _, ok := a.(*CategoricalAttribute); ok {
inst.catRowSizeBytes += 8
cur = inst.catRowSizeBytes / os.Getpagesize()
ag = fmt.Sprintf("CAT%d", cur)
} else if _, ok := a.(*FloatAttribute); ok {
ag = "FLOAT"
inst.floatRowSizeBytes += 8
cur = inst.floatRowSizeBytes / os.Getpagesize()
ag = fmt.Sprintf("FLOAT%d", cur)
} else if _, ok := a.(*BinaryAttribute); ok {
ag = "BIN"
inst.binRowSizeBits++
cur = (inst.binRowSizeBits / 8) / os.Getpagesize()
ag = fmt.Sprintf("BIN%d", cur)
generatingBinClass = true
} else {
panic("Unrecognised Attribute type")
}
// Create the ag if it doesn't exist
if _, ok := inst.agMap[ag]; !ok {
if ag != "BIN" {
if !generatingBinClass {
inst.createAttributeGroup(ag, 8)
} else {
inst.createAttributeGroup(ag, 0)
@ -179,8 +167,8 @@ func (inst *DenseInstances) AddAttribute(a Attribute) AttributeSpec {
return AttributeSpec{id, len(p.Attributes()) - 1, a}
}
// AddAttributeToAttributeGroup adds an Attribute to a given ag
func (inst *DenseInstances) AddAttributeToAttributeGroup(newAttribute Attribute, ag string) (AttributeSpec, error) {
// addAttributeToAttributeGroup adds an Attribute to a given ag
func (inst *DenseInstances) addAttributeToAttributeGroup(newAttribute Attribute, ag string) (AttributeSpec, error) {
inst.lock.Lock()
defer inst.lock.Unlock()
@ -272,9 +260,15 @@ func (inst *DenseInstances) RemoveClassAttribute(a Attribute) error {
// AllClassAttributes returns a slice of Attributes which have
// been designated class Attributes. It locks the instance set
// and delegates to the unlocked allClassAttributes helper.
func (inst *DenseInstances) AllClassAttributes() []Attribute {
// BUG FIX: removed unused `var ret []Attribute`, which is a
// compile error in Go (declared and not used).
inst.lock.Lock()
defer inst.lock.Unlock()
return inst.allClassAttributes()
}
// allClassAttributes returns a slice of Attributes which have
// been designated class Attributes (doesn't lock)
func (inst *DenseInstances) allClassAttributes() []Attribute {
var ret []Attribute
for a := range inst.classAttrs {
if inst.classAttrs[a] {
ret = append(ret, a.attr)
@ -287,6 +281,47 @@ func (inst *DenseInstances) AllClassAttributes() []Attribute {
// Allocation functions
//
// realiseAttributeGroups decides which Attributes are going
// to be stored in which AttributeGroups, creating any groups
// which don't exist yet. Called before storage is fixed (Extend).
func (inst *DenseInstances) realiseAttributeGroups() error {
for a := range inst.tmpAttrAgMap {
// Look up the AttributeGroup name assigned by AddAttribute
ag := inst.tmpAttrAgMap[a]
// Find out whether this Attribute is also a class Attribute
classAttribute := false
for _, c := range inst.allClassAttributes() {
if c.Equals(a) {
classAttribute = true
break
}
}
// This might result in multiple ClassAttribute groups
// but hopefully nothing too crazy
if classAttribute {
// ag = fmt.Sprintf("CLASS_%s", ag)
}
// Create the ag if it doesn't exist.
// BUG FIX: the original read inst.ags[agId] with the zero-value
// id when the lookup failed (panicking on an empty ags slice, or
// inspecting an unrelated group otherwise). Since the group does
// not exist yet, decide its kind from the Attribute's concrete
// type instead, mirroring AddAttribute: BinaryAttributes get a
// bit-packed group (size 0), everything else gets 8-byte rows.
if _, ok := inst.agMap[ag]; !ok {
if _, isBinary := a.(*BinaryAttribute); isBinary {
inst.createAttributeGroup(ag, 0)
} else {
inst.createAttributeGroup(ag, 8)
}
}
// Add the Attribute to its (now-existing) group
id := inst.agMap[ag]
p := inst.ags[id]
err := p.AddAttribute(a)
if err != nil {
return err
}
}
return nil
}
// Extend extends this set of Instances to store rows additional rows.
// It's recommended to set rows to something quite large.
//
@ -296,30 +331,25 @@ func (inst *DenseInstances) Extend(rows int) error {
inst.lock.Lock()
defer inst.lock.Unlock()
// Get the size of each page
pageSize := inst.storage.GetPageSize()
if !inst.fixed {
err := inst.realiseAttributeGroups()
if err != nil {
return err
}
}
for _, p := range inst.ags {
// Compute ag row storage requirements
rowSize := p.RowSize()
// How many rows can we store per page?
rowsPerPage := float64(pageSize) / float64(rowSize)
// How many bytes?
allocSize := rows * rowSize
// How many pages?
pagesNeeded := uint32(math.Ceil(float64(rows) / rowsPerPage))
bytes := make([]byte, allocSize)
p.addStorage(bytes)
// Allocate those pages
r, err := inst.storage.AllocPages(pagesNeeded, p.getThreadNo())
if err != nil {
panic(fmt.Sprintf("Allocation error: %s (rowSize %d, pageSize %d, rowsPerPage %.2f, tried to allocate %d page(s) and extend by %d row(s))", err, rowSize, pageSize, rowsPerPage, pagesNeeded, rows))
}
// Resolve and assign those pages
byteBlock := inst.storage.ResolveRange(r)
for _, block := range byteBlock {
p.addStorage(block)
}
}
inst.fixed = true
inst.maxRow += rows

View File

@ -1,254 +0,0 @@
package edf
import (
"fmt"
)
// contentEntry structs are stored in contentEntry blocks,
// the first of which always lives at page 2. Each serialized
// entry occupies 12 bytes: thread id, start page, end page.
type contentEntry struct {
// Which thread this entry is assigned to
thread uint32
// Which page this block starts at
Start uint32
// The page up to and including which the block ends
End uint32
}
// extend grows the backing file by additionalPages pages.
// The address-space mapping itself is not changed — see edfMap,
// which maps the full EDF_SIZE range up front.
func (e *EdfFile) extend(additionalPages uint32) error {
fileInfo, err := e.f.Stat()
if err != nil {
panic(err)
}
// New length in pages = current length in pages + extra pages;
// truncate takes a size expressed in pages, not bytes.
newSize := uint64(fileInfo.Size())/e.pageSize + uint64(additionalPages)
return e.truncate(int64(newSize))
}
// getFreeMapSize reports how many pages the free bitmap must
// cover: the file's current length in pages when backed by a
// real file, or the fixed maximum mapping size otherwise.
func (e *EdfFile) getFreeMapSize() uint64 {
if e.f == nil {
// Anonymous mapping: no backing file to measure.
return uint64(EDF_SIZE) / e.pageSize
}
info, err := e.f.Stat()
if err != nil {
panic(err)
}
return uint64(info.Size()) / e.pageSize
}
// getContiguousOffset builds a page-occupancy bitmap from the
// contents table and returns the first offset with a contiguous
// free run for pagesRequested pages. Returns (0, nil) when no gap
// is found, which callers interpret as "extend the file and retry".
func (e *EdfFile) getContiguousOffset(pagesRequested uint32) (uint32, error) {
// Create the free bitmap
bitmap := make([]bool, e.getFreeMapSize())
// The first 4 pages (header and system blocks) are always occupied
for i := 0; i < 4; i++ {
bitmap[i] = true
}
// Traverse the contents table and build a free bitmap
block := uint64(2)
for {
// Get the range for this block
r := e.getPageRange(block, block)
if r.Start.Segment != r.End.Segment {
return 0, fmt.Errorf("Contents block split across segments")
}
bytes := e.m[r.Start.Segment]
bytes = bytes[r.Start.Byte : r.End.Byte+1]
// Get the address of the next contents block
block = uint64FromBytes(bytes)
if block != 0 {
// No point in checking this block for free space
// NOTE(review): this also skips marking THIS block's entries
// in the bitmap, so only the final contents block contributes
// to occupancy — confirm that is intentional.
continue
}
bytes = bytes[8:]
// Look for a blank entry in the table; entries are 12 bytes:
// thread id, start page, end page (all uint32)
for i := 0; i < len(bytes); i += 12 {
threadID := uint32FromBytes(bytes[i:])
if threadID == 0 {
continue
}
start := uint32FromBytes(bytes[i+4:])
end := uint32FromBytes(bytes[i+8:])
// Mark the (inclusive) page range as occupied
for j := start; j <= end; j++ {
if int(j) >= len(bitmap) {
break
}
bitmap[j] = true
}
}
break
}
// Look through the freemap and find a good spot
for i := 0; i < len(bitmap); i++ {
if bitmap[i] {
continue
}
for j := i; j < len(bitmap); j++ {
if !bitmap[j] {
diff := j - 1 - i
// NOTE(review): strictly-greater test looks off-by-one —
// a gap of exactly pagesRequested pages is rejected;
// confirm intended semantics before changing.
if diff > int(pagesRequested) {
return uint32(i), nil
}
}
}
}
return 0, nil
}
// addNewContentsBlock adds a new contents block in the next available
// space and links it into the contents-block chain, then records the
// new block itself in the TOC under the SYSTEM thread.
func (e *EdfFile) addNewContentsBlock() error {
var toc contentEntry
// Find the next available offset
startBlock, err := e.getContiguousOffset(1)
if startBlock == 0 && err == nil {
// Increase the size of the file if necessary
// NOTE(review): the error from extend is ignored here, and the
// argument (pageSize pages) looks much larger than the single
// page requested — confirm both against the on-disk format.
e.extend(uint32(e.pageSize))
} else if err != nil {
return err
}
// Traverse the contents blocks looking for one with a blank NEXT pointer
block := uint64(2)
for {
// Get the range for this block
r := e.getPageRange(block, block)
if r.Start.Segment != r.End.Segment {
return fmt.Errorf("Contents block split across segments")
}
bytes := e.m[r.Start.Segment]
bytes = bytes[r.Start.Byte : r.End.Byte+1]
// Get the address of the next contents block
block = uint64FromBytes(bytes)
if block == 0 {
// Found the chain tail: point it at the new block
uint64ToBytes(uint64(startBlock), bytes)
break
}
}
// Add to the next available TOC space
toc.Start = startBlock
toc.End = startBlock + 1
toc.thread = 1 // SYSTEM thread
return e.addToTOC(&toc, false)
}
// addToTOC writes a contentEntry structure into the next available
// 12-byte slot of the contents table. When extend is true and the
// table is full, a new contents block is appended and the insertion
// retried once (with extend=false to prevent unbounded recursion).
func (e *EdfFile) addToTOC(c *contentEntry, extend bool) error {
// Traverse the contents table looking for a free spot
block := uint64(2)
for {
// Get the range for this block
r := e.getPageRange(block, block)
if r.Start.Segment != r.End.Segment {
return fmt.Errorf("Contents block split across segments")
}
bytes := e.m[r.Start.Segment]
bytes = bytes[r.Start.Byte : r.End.Byte+1]
// Get the address of the next contents block
block = uint64FromBytes(bytes)
if block != 0 {
// No point in checking this block for free space
continue
}
// Skip the 8-byte NEXT pointer at the head of the block
bytes = bytes[8:]
// Look for a blank entry in the table (thread id 0 marks a free slot)
cur := 0
for {
threadID := uint32FromBytes(bytes)
if threadID == 0 {
break
}
cur += 12
bytes = bytes[12:]
if len(bytes) < 12 {
if extend {
// Append a new contents block and try again
// NOTE(review): the error from addNewContentsBlock is ignored.
e.addNewContentsBlock()
return e.addToTOC(c, false)
}
return fmt.Errorf("Can't add to contents: no space available")
}
}
// Write the contents information into this block:
// thread id, start page, end page (uint32 each)
uint32ToBytes(c.thread, bytes)
bytes = bytes[4:]
uint32ToBytes(c.Start, bytes)
bytes = bytes[4:]
uint32ToBytes(c.End, bytes)
break
}
return nil
}
// AllocPages allocates a |pagesRequested| chunk of pages on the thread
// with the given identifier. Returns an edfRange describing the result.
// If no contiguous gap exists, the file is extended and the allocation
// retried recursively.
func (e *EdfFile) AllocPages(pagesRequested uint32, thread uint32) (edfRange, error) {
var ret edfRange
var toc contentEntry
// Parameter check
if pagesRequested == 0 {
return ret, fmt.Errorf("Must request some pages")
}
if thread == 0 {
return ret, fmt.Errorf("Need a valid page identifier")
}
// Find the next available offset
startBlock, err := e.getContiguousOffset(pagesRequested)
if startBlock == 0 && err == nil {
// Increase the size of the file if necessary
// NOTE(review): the error from extend is ignored before retrying.
e.extend(pagesRequested)
return e.AllocPages(pagesRequested, thread)
} else if err != nil {
return ret, err
}
// Add to the table of contents
toc.thread = thread
toc.Start = startBlock
// NOTE(review): End = Start + pagesRequested, but addNewContentsBlock
// records End = Start + 1 for a single page — confirm whether End is
// inclusive or exclusive in the on-disk format.
toc.End = startBlock + pagesRequested
err = e.addToTOC(&toc, true)
// Compute the range
ret = e.getPageRange(uint64(startBlock), uint64(startBlock+pagesRequested))
return ret, err
}
// getThreadBlocks returns edfRanges covering every block in the
// contents table that is assigned to the given thread, walking the
// chain of contents blocks until a zero NEXT pointer is found.
func (e *EdfFile) getThreadBlocks(thread uint32) ([]edfRange, error) {
var ret []edfRange
// Traverse the contents table
block := uint64(2)
for {
// Get the range for this block
r := e.getPageRange(block, block)
if r.Start.Segment != r.End.Segment {
return nil, fmt.Errorf("Contents block split across segments")
}
bytes := e.m[r.Start.Segment]
bytes = bytes[r.Start.Byte : r.End.Byte+1]
// Get the address of the next contents block
block = uint64FromBytes(bytes)
// Skip the 8-byte NEXT pointer
bytes = bytes[8:]
// Look for matching contents entries (12 bytes each)
for {
threadID := uint32FromBytes(bytes)
if threadID == thread {
blockStart := uint32FromBytes(bytes[4:])
blockEnd := uint32FromBytes(bytes[8:])
r = e.getPageRange(uint64(blockStart), uint64(blockEnd))
ret = append(ret, r)
}
bytes = bytes[12:]
if len(bytes) < 12 {
break
}
}
// Time to stop: no further contents block in the chain
if block == 0 {
break
}
}
return ret, nil
}

View File

@ -1,70 +0,0 @@
package edf
import (
. "github.com/smartystreets/goconvey/convey"
"io/ioutil"
"os"
"testing"
)
// TestAllocFixed verifies that a single-page allocation on thread 2
// lands immediately after the 4 reserved header pages, and that the
// allocation survives an unmap/remap round trip.
func TestAllocFixed(t *testing.T) {
Convey("Creating a non-existent file should succeed", t, func() {
tempFile, err := ioutil.TempFile(os.TempDir(), "TestFileCreate")
So(err, ShouldEqual, nil)
Convey("Mapping the file should succeed", func() {
mapping, err := edfMap(tempFile, EDF_CREATE)
So(err, ShouldEqual, nil)
Convey("Allocation should succeed", func() {
r, err := mapping.AllocPages(1, 2)
So(err, ShouldEqual, nil)
// Pages 0-3 are reserved, so the allocation starts at page 4
So(r.Start.Byte, ShouldEqual, 4*os.Getpagesize())
So(r.Start.Segment, ShouldEqual, 0)
Convey("Unmapping the file should succeed", func() {
err = mapping.unmap(EDF_UNMAP_SYNC)
So(err, ShouldEqual, nil)
Convey("Remapping the file should succeed", func() {
// NOTE(review): the error from edfMap is not asserted here
mapping, err = edfMap(tempFile, EDF_READ_ONLY)
Convey("Should get the same allocations back", func() {
rr, err := mapping.getThreadBlocks(2)
So(err, ShouldEqual, nil)
So(len(rr), ShouldEqual, 1)
So(rr[0], ShouldResemble, r)
})
})
})
})
})
})
}
// TestAllocWithExtraContentsBlock allocates enough separate pages to
// overflow the first contents block, then verifies all allocations
// are recovered intact after an unmap/remap round trip.
func TestAllocWithExtraContentsBlock(t *testing.T) {
Convey("Creating a non-existent file should succeed", t, func() {
tempFile, err := ioutil.TempFile(os.TempDir(), "TestFileCreate")
So(err, ShouldEqual, nil)
Convey("Mapping the file should succeed", func() {
mapping, err := edfMap(tempFile, EDF_CREATE)
So(err, ShouldEqual, nil)
Convey("Allocation of 10 pages should succeed", func() {
allocated := make([]edfRange, 10)
for i := 0; i < 10; i++ {
r, err := mapping.AllocPages(1, 2)
So(err, ShouldEqual, nil)
allocated[i] = r
}
Convey("Unmapping the file should succeed", func() {
err = mapping.unmap(EDF_UNMAP_SYNC)
So(err, ShouldEqual, nil)
Convey("Remapping the file should succeed", func() {
// NOTE(review): the error from edfMap is not asserted here
mapping, err = edfMap(tempFile, EDF_READ_ONLY)
Convey("Should get the same allocations back", func() {
rr, err := mapping.getThreadBlocks(2)
So(err, ShouldEqual, nil)
So(len(rr), ShouldEqual, 10)
So(rr, ShouldResemble, allocated)
})
})
})
})
})
})
}

View File

@ -1,40 +0,0 @@
package edf
// map.go: handles mmaping, truncation, header creation, verification,
// creation of initial thread contents block (todo)
// creation of initial thread metadata block (todo)
// thread.go: handles extending thread contents block (todo)
// extending thread metadata block (todo), adding threads (todo),
// retrieving the segments and offsets relevant to a thread (todo)
// resolution of threads by name (todo)
// appending data to a thread (todo)
// deleting threads (todo)
const (
// EDF_VERSION is the file format version
EDF_VERSION = 1
// EDF_LENGTH is the number of OS pages in each slice
EDF_LENGTH = 32
// EDF_SIZE sets the maximum size of the mapping, represented with
// EDF_LENGTH segments
// Currently set arbitrarily to 128 MiB
EDF_SIZE = 128 * (1024 * 1024)
)

// Mapping modes accepted by edfMap.
const (
// EDF_READ_ONLY means the file will only be read, modifications fail
EDF_READ_ONLY = iota
// EDF_READ_WRITE specifies that the file will be read and written
EDF_READ_WRITE
// EDF_CREATE means the file will be created and opened with EDF_READ_WRITE
EDF_CREATE
)

// Flags accepted by unmap.
const (
// EDF_UNMAP_NOSYNC means the file won't be
// Sync'd to disk before unmapping
EDF_UNMAP_NOSYNC = iota
// EDF_UNMAP_SYNC synchronises the EDF file to disk
// during unmapping
EDF_UNMAP_SYNC
)

View File

@ -1,424 +0,0 @@
package edf
import (
"fmt"
mmap "github.com/Sentimentron/go-mmap"
"os"
"runtime"
"runtime/debug"
)
// edfMode records how an EdfFile was created and therefore how
// truncate/sync/unmap must behave.
type edfMode int

const (
// Backed by a real file through mmap
edfFileMode edfMode = iota
// Anonymous in-memory mapping; nothing is persisted
edfAnonMode
// Already unmapped; further use is invalid
edfFreedMode
)

// EdfFile represents a mapped file on disk or
// an anonymous mapping for instance storage
type EdfFile struct {
// Backing file; nil for anonymous mappings
f *os.File
// One mapped segment per EDF_LENGTH pages of address space
m []mmap.Mmap
// Size of each mapped segment in bytes
segmentSize uint64
// OS page size in bytes
pageSize uint64
// How this mapping was created (file/anonymous/freed)
mode edfMode
}
// GetPageSize returns the page size (in bytes) of an EdfFile.
func (e *EdfFile) GetPageSize() uint64 {
return e.pageSize
}
// edfPosition represents a single location within the mapping,
// as a segment index plus a byte offset within that segment.
type edfPosition struct {
Segment uint64
Byte uint64
}

// edfRange represents a start and an end position
// mapped in an EdfFile and also the byte offsets
// within those segments.
type edfRange struct {
Start edfPosition
End edfPosition
// Copied from the owning EdfFile so Size can be computed
// without a reference back to it
segmentSize uint64
}

// Size returns the size (in bytes) of a given edfRange.
// The unsigned subtraction is safe even when End.Byte < Start.Byte
// (different segments): the wrap-around cancels in the addition.
func (r *edfRange) Size() uint64 {
ret := uint64(r.End.Segment-r.Start.Segment) * r.segmentSize
ret += uint64(r.End.Byte - r.Start.Byte)
return ret
}
// edfCallFree is a finalizer (see runtime.SetFinalizer callers)
// ensuring the mapping is released on garbage collection.
// Any unmap error is deliberately ignored — there is no caller
// to report it to at GC time.
func edfCallFree(e *EdfFile) {
e.unmap(EDF_UNMAP_NOSYNC)
}
// EdfAnonMap maps the EdfFile structure into RAM.
// IMPORTANT: everything's lost if unmapped.
func EdfAnonMap() (*EdfFile, error) {
// Allocate return structure
ret := new(EdfFile)
// Create mapping references
ret.m = make([]mmap.Mmap, 0)
// Get the page size
pageSize := int64(os.Getpagesize())
// Segment size is the size of each mapped region
ret.pageSize = uint64(pageSize)
ret.segmentSize = uint64(EDF_LENGTH) * uint64(os.Getpagesize())
// Set the mode
ret.mode = edfAnonMode
// Allocate 4 pages initially.
// BUG FIX: the error from truncate was previously ignored,
// which could leave an unusable mapping on allocation failure.
if err := ret.truncate(4); err != nil {
return nil, err
}
// Generate the header
ret.createHeader()
err := ret.writeInitialData()
// Make sure this gets unmapped on garbage collection
runtime.SetFinalizer(ret, edfCallFree)
return ret, err
}
// edfMap takes an os.File and returns an EdfMappedFile
// structure, which represents the mmap'd underlying file
//
// The `mode` parameter takes the following values
// EDF_CREATE: edfMap will truncate the file to the right length and write the correct header information
// EDF_READ_WRITE: edfMap will verify header information
// EDF_READ_ONLY: edfMap will verify header information
// IMPORTANT: EDF_LENGTH (edf.go) controls the size of the address
// space mapping. This means that the file can be truncated to the
// correct size without remapping. On 32-bit systems, this
// is set to 2GiB.
func (unused) func edfMap(f *os.File, mode int) (*EdfFile, error) {
// getByteRange converts two absolute byte offsets into an edfRange:
// a (segment, byte-within-segment) pair for both start and end.
func (e *EdfFile) getByteRange(byteStart uint64, byteEnd uint64) edfRange {
seg := e.segmentSize
return edfRange{
Start: edfPosition{
Segment: byteStart / seg,
Byte: byteStart % seg,
},
End: edfPosition{
Segment: byteEnd / seg,
Byte: byteEnd % seg,
},
segmentSize: seg,
}
}
// getPageRange returns the segment offset and range of
// two pages in the file. The end offset is inclusive:
// it points at the last byte of pageEnd.
func (e *EdfFile) getPageRange(pageStart uint64, pageEnd uint64) edfRange {
return e.getByteRange(pageStart*e.pageSize, pageEnd*e.pageSize+e.pageSize-1)
}
// verifyHeader checks that this version of Golearn can
// read the file presented: magic bytes "GOLN", a supported
// format version, and a page size matching the current system.
func (e *EdfFile) verifyHeader() error {
// Check the magic bytes: XOR-and-OR folds all four byte
// comparisons into one branch
diff := (e.m[0][0] ^ byte('G')) | (e.m[0][1] ^ byte('O'))
diff |= (e.m[0][2] ^ byte('L')) | (e.m[0][3] ^ byte('N'))
if diff != 0 {
return fmt.Errorf("Invalid magic bytes")
}
// Check the file version
version := uint32FromBytes(e.m[0][4:8])
if version != EDF_VERSION {
return fmt.Errorf("Unsupported version: %d", version)
}
// Check the page size matches the system which wrote the file
pageSize := uint32FromBytes(e.m[0][8:12])
if pageSize != uint32(os.Getpagesize()) {
// BUG FIX: error message was missing its closing parenthesis
return fmt.Errorf("Unsupported page size: (file: %d, system: %d)", pageSize, os.Getpagesize())
}
return nil
}
// createHeader writes a valid header into the file: the "GOLN"
// magic bytes, the format version and the system page size.
// Unexported since it can cause data loss on an existing file.
func (e *EdfFile) createHeader() {
e.m[0][0] = byte('G')
e.m[0][1] = byte('O')
e.m[0][2] = byte('L')
e.m[0][3] = byte('N')
uint32ToBytes(EDF_VERSION, e.m[0][4:8])
uint32ToBytes(uint32(os.Getpagesize()), e.m[0][8:12])
// Flush to disk (no-op for anonymous mappings)
e.sync()
}
// writeInitialData registers the two built-in threads:
// SYSTEM (id 1) and FIXED (id 2).
func (e *EdfFile) writeInitialData() error {
var t thread
t.name = "SYSTEM"
t.id = 1
err := e.WriteThread(&t)
if err != nil {
return err
}
// Reuse the same struct for the second built-in thread
t.name = "FIXED"
t.id = 2
err = e.WriteThread(&t)
return err
}
// getThreadCount returns the number of threads in this file.
func (e *EdfFile) getThreadCount() uint32 {
// The number of threads is stored in bytes 12-16 in the header
return uint32FromBytes(e.m[0][12:])
}
// incrementThreadCount increments the record of the number
// of threads in this file (header bytes 12-16) and returns
// the new count. It does NOT write any thread data itself.
func (e *EdfFile) incrementThreadCount() uint32 {
cur := e.getThreadCount()
cur++
uint32ToBytes(cur, e.m[0][12:])
return cur
}
// GetThreads returns the thread identifier -> name map, walking the
// chain of thread blocks starting at page 1. Returns an error if the
// number of threads read disagrees with the header count (possible
// corruption).
func (e *EdfFile) GetThreads() (map[uint32]string, error) {
ret := make(map[uint32]string)
count := e.getThreadCount()
// The starting block
block := uint64(1)
for {
// Decode the block offset
r := e.getPageRange(block, block)
if r.Start.Segment != r.End.Segment {
return nil, fmt.Errorf("thread range split across segments")
}
bytes := e.m[r.Start.Segment]
bytes = bytes[r.Start.Byte : r.End.Byte+1]
// The first 8 bytes say where to go next
block = uint64FromBytes(bytes)
bytes = bytes[8:]
// Deserialize threads until a zero length terminator
for {
length := uint32FromBytes(bytes)
if length == 0 {
break
}
t := &thread{}
size := t.Deserialize(bytes)
bytes = bytes[size:]
// NOTE: t.name[0:len(t.name)] is a redundant full slice of
// the string; it is equivalent to t.name
ret[t.id] = t.name[0:len(t.name)]
}
// If next block offset is zero, no more threads to read
if block == 0 {
break
}
}
// Hey? What's wrong with you!
if len(ret) != int(count) {
return ret, fmt.Errorf("thread mismatch: %d/%d, indicates possible corruption", len(ret), count)
}
return ret, nil
}
// sync writes every mapped segment back to physical storage.
// It is a no-op for anonymous mappings (nothing to persist).
func (e *EdfFile) sync() error {
// Do nothing if we're mapped anonymously
if e.mode == edfAnonMode {
return nil
}
for _, m := range e.m {
err := m.Sync(mmap.MS_SYNC)
if err != nil {
return err
}
}
return nil
}
// truncateFile changes the size of the underlying file to `size`
// pages. The size of the address-space mapping doesn't change.
// Shrinking is refused; growth is verified after the fact.
func (e *EdfFile) truncateFile(size int64) error {
pageSize := int64(os.Getpagesize())
// size is expressed in pages; convert to bytes
newSize := pageSize * size
// Synchronise
// NOTE(review): the pre-truncate sync is commented out — confirm
// whether it is safe to skip on all platforms.
// e.sync()
// Double-check that we're not reducing file size
fileInfo, err := e.f.Stat()
if err != nil {
return err
}
if fileInfo.Size() > newSize {
return fmt.Errorf("Can't reduce file size!")
}
// Truncate the file
err = e.f.Truncate(newSize)
if err != nil {
return err
}
// Verify that the file is larger now than it was
fileInfo, err = e.f.Stat()
if err != nil {
return err
}
if fileInfo.Size() != newSize {
return fmt.Errorf("Truncation failed: %d, %d", fileInfo.Size(), newSize)
}
return err
}
// truncateMem grows an anonymous mapping to `size` pages by
// appending segment-sized byte slices. Shrinking is refused.
func (e *EdfFile) truncateMem(size int64) error {
pageSize := int64(os.Getpagesize())
// size is expressed in pages; convert to bytes
newSize := pageSize * size
// Current capacity is the sum of all existing segments
currentSize := 0
for _, m := range e.m {
currentSize += len(m)
}
if int64(currentSize) > newSize {
return fmt.Errorf("Can't reduce size")
}
// Allocate some more memory, one segment at a time
for i := uint64(currentSize); i < uint64(newSize); i += e.segmentSize {
newMap := make([]byte, e.segmentSize)
e.m = append(e.m, newMap)
}
return nil
}
// truncate resizes the underlying storage to `size` pages,
// dispatching on the mapping mode. It panics for freed or
// unrecognised modes.
func (e *EdfFile) truncate(size int64) error {
switch e.mode {
case edfAnonMode:
return e.truncateMem(size)
case edfFileMode:
return e.truncateFile(size)
default:
panic("Unsupported")
}
}
// unmap unlinks the EdfFile from the address space.
// EDF_UNMAP_NOSYNC skips calling Sync() on the underlying
// file before this happens.
// IMPORTANT: attempts to use this mapping after unmap() is
// called will result in crashes.
func (e *EdfFile) unmap(flags int) error {
var ret error
// Check if the file has already been freed: warn (with a stack
// trace) rather than crash, since the finalizer may race a
// manual unmap
if e.mode == edfFreedMode {
fmt.Fprintln(os.Stderr, "Potential double-free")
debug.PrintStack()
return nil
} else if e.mode == edfAnonMode {
// If it's anonymous, just drop the slices and let GC reclaim them
e.m = nil
e.mode = edfFreedMode
return nil
}
// Sync the file
if flags != EDF_UNMAP_NOSYNC {
e.sync()
}
e.mode = edfFreedMode
// Unmap the file; keep going on error and report the last one
for _, m := range e.m {
err := m.Unmap()
if err != nil {
ret = err
}
}
return ret
}
// ResolveRange returns a slice of byte slices representing
// the underlying memory referenced by edfRange: the tail of the
// first segment, any whole middle segments, and the head of the
// last segment.
//
// NOTE(review): when Start.Segment == End.Segment the first branch
// wins and the full tail of the segment is returned rather than
// stopping at End.Byte — confirm callers rely on this before changing.
//
// WARNING: slow.
func (e *EdfFile) ResolveRange(r edfRange) [][]byte {
var ret [][]byte
// FIX: removed segCounter, which was incremented but never read
for segment := r.Start.Segment; segment <= r.End.Segment; segment++ {
if segment == r.Start.Segment {
ret = append(ret, e.m[segment][r.Start.Byte:])
} else if segment == r.End.Segment {
ret = append(ret, e.m[segment][:r.End.Byte+1])
} else {
ret = append(ret, e.m[segment])
}
}
return ret
}

View File

@ -1,118 +0,0 @@
package edf
import (
. "github.com/smartystreets/goconvey/convey"
"io/ioutil"
"os"
"testing"
)
// TestAnonMap checks that an anonymous mapping is created with a
// valid header: "GOLN" magic, the current format version and the
// system page size.
func TestAnonMap(t *testing.T) {
Convey("Anonymous mapping should succeed", t, func() {
mapping, err := EdfAnonMap()
So(err, ShouldEqual, nil)
bytes := mapping.m[0]
// Read the magic bytes
magic := bytes[0:4]
Convey("Magic bytes should be correct", func() {
So(magic[0], ShouldEqual, byte('G'))
So(magic[1], ShouldEqual, byte('O'))
So(magic[2], ShouldEqual, byte('L'))
So(magic[3], ShouldEqual, byte('N'))
})
// Read the file version
versionBytes := bytes[4:8]
Convey("Version should be correct", func() {
version := uint32FromBytes(versionBytes)
So(version, ShouldEqual, EDF_VERSION)
})
// Read the block size
blockBytes := bytes[8:12]
Convey("Page size should be correct", func() {
pageSize := uint32FromBytes(blockBytes)
So(pageSize, ShouldEqual, os.Getpagesize())
})
})
}
// TestFileCreate maps a fresh temp file with EDF_CREATE and checks
// the persisted header (magic, version, page size) plus the minimum
// 4-page file size, by reading the raw file after unmapping.
func TestFileCreate(t *testing.T) {
Convey("Creating a non-existent file should succeed", t, func() {
tempFile, err := ioutil.TempFile(os.TempDir(), "TestFileCreate")
So(err, ShouldEqual, nil)
Convey("Mapping the file should succeed", func() {
mapping, err := edfMap(tempFile, EDF_CREATE)
So(err, ShouldEqual, nil)
Convey("Unmapping the file should succeed", func() {
err = mapping.unmap(EDF_UNMAP_SYNC)
So(err, ShouldEqual, nil)
})
// Read the magic bytes
magic := make([]byte, 4)
read, err := tempFile.ReadAt(magic, 0)
Convey("Magic bytes should be correct", func() {
So(err, ShouldEqual, nil)
So(read, ShouldEqual, 4)
So(magic[0], ShouldEqual, byte('G'))
So(magic[1], ShouldEqual, byte('O'))
So(magic[2], ShouldEqual, byte('L'))
So(magic[3], ShouldEqual, byte('N'))
})
// Read the file version
versionBytes := make([]byte, 4)
read, err = tempFile.ReadAt(versionBytes, 4)
Convey("Version should be correct", func() {
So(err, ShouldEqual, nil)
So(read, ShouldEqual, 4)
version := uint32FromBytes(versionBytes)
So(version, ShouldEqual, EDF_VERSION)
})
// Read the block size
blockBytes := make([]byte, 4)
read, err = tempFile.ReadAt(blockBytes, 8)
Convey("Page size should be correct", func() {
So(err, ShouldEqual, nil)
So(read, ShouldEqual, 4)
pageSize := uint32FromBytes(blockBytes)
So(pageSize, ShouldEqual, os.Getpagesize())
})
// Check the file size is at least four * page size
info, err := tempFile.Stat()
Convey("File should be the right size", func() {
So(err, ShouldEqual, nil)
So(info.Size(), ShouldBeGreaterThanOrEqualTo, 4*os.Getpagesize())
})
})
})
}
// TestFileThreadCounter checks that a new file starts with the two
// built-in threads (SYSTEM, FIXED), and that bumping the header
// counter without writing a thread makes GetThreads report corruption.
func TestFileThreadCounter(t *testing.T) {
Convey("Creating a non-existent file should succeed", t, func() {
tempFile, err := ioutil.TempFile(os.TempDir(), "TestFileCreate")
So(err, ShouldEqual, nil)
Convey("Mapping the file should succeed", func() {
mapping, err := edfMap(tempFile, EDF_CREATE)
So(err, ShouldEqual, nil)
Convey("The file should have two threads to start with", func() {
count := mapping.getThreadCount()
So(count, ShouldEqual, 2)
Convey("They should be SYSTEM and FIXED", func() {
threads, err := mapping.GetThreads()
So(err, ShouldEqual, nil)
So(len(threads), ShouldEqual, 2)
So(threads[1], ShouldEqual, "SYSTEM")
So(threads[2], ShouldEqual, "FIXED")
})
})
Convey("Incrementing the threadcount should result in three threads", func() {
mapping.incrementThreadCount()
count := mapping.getThreadCount()
So(count, ShouldEqual, 3)
Convey("thread information should indicate corruption", func() {
_, err := mapping.GetThreads()
So(err, ShouldNotEqual, nil)
})
})
})
})
}

View File

@ -1,137 +0,0 @@
package edf
import (
"fmt"
)
// Threads are streams of data encapsulated within the file.
// A thread is serialized as: name length (uint32), name bytes,
// thread id (uint32) — see Serialize/Deserialize.
type thread struct {
// Human-readable thread name (e.g. "SYSTEM", "FIXED")
name string
// 1-based thread identifier; 0 marks an empty slot on disk
id uint32
}
// NewThread returns a new thread named `name`, assigning it the
// next free identifier (current thread count + 1). The thread is
// not registered in the file until WriteThread is called.
func NewThread(e *EdfFile, name string) *thread {
return &thread{name, e.getThreadCount() + 1}
}
// getSpaceNeeded returns the number of bytes needed to serialize
// this thread: 4 (name length) + name bytes + 4 (thread id).
func (t *thread) getSpaceNeeded() int {
return 8 + len(t.name)
}
// Serialize copies this thread to the output byte slice in the
// layout: name length (uint32), name bytes, thread id (uint32).
// Returns the number of bytes used (== getSpaceNeeded()).
func (t *thread) Serialize(out []byte) int {
// ret keeps track of written bytes
ret := 0
// Write the length of the name first
nameLength := len(t.name)
uint32ToBytes(uint32(nameLength), out)
out = out[4:]
ret += 4
// Then write the string
copy(out, t.name)
out = out[nameLength:]
ret += nameLength
// Then the thread number
uint32ToBytes(t.id, out)
ret += 4
return ret
}
// Deserialize copies the input byte slice into a thread, reading
// the same layout Serialize writes. Returns the number of bytes
// consumed so the caller can advance to the next record.
func (t *thread) Deserialize(out []byte) int {
ret := 0
// Read the length of the thread's name
nameLength := uint32FromBytes(out)
ret += 4
out = out[4:]
// Copy out the string
t.name = string(out[:nameLength])
ret += int(nameLength)
out = out[nameLength:]
// Read the identifier
t.id = uint32FromBytes(out)
ret += 4
return ret
}
// FindThread obtains the index of a thread in the EdfFile.
// It scans the thread block for a record whose name matches targetName
// and returns its 1-based position; an error is returned if the thread
// block is split across segments or no thread with that name exists.
func (e *EdfFile) FindThread(targetName string) (uint32, error) {
	var offset uint32
	var counter uint32
	// Resolve the initial thread block
	blockRange := e.getPageRange(1, 1)
	if blockRange.Start.Segment != blockRange.End.Segment {
		return 0, fmt.Errorf("thread block split across segments!")
	}
	bytes := e.m[blockRange.Start.Segment][blockRange.Start.Byte:blockRange.End.Byte]
	// Skip the first 8 bytes, since we don't support multiple thread blocks yet
	// TODO: fix that
	bytes = bytes[8:]
	counter = 1
	for {
		// Each record is: 4-byte name length, name bytes, 4-byte id
		// (see thread.Serialize); a zero length marks the end of the
		// used region.
		length := uint32FromBytes(bytes)
		if length == 0 {
			return 0, fmt.Errorf("No matching threads")
		}
		name := string(bytes[4 : 4+length])
		if name == targetName {
			offset = counter
			break
		}
		bytes = bytes[8+length:]
		counter++
	}
	return offset, nil
}
// WriteThread inserts a new thread into the EdfFile.
// Returns an error if a thread with the same name already exists, if
// the thread block is split across segments, or if there is not enough
// free space left in the block for the serialized record.
func (e *EdfFile) WriteThread(t *thread) error {
	// Duplicate names are rejected; FindThread's "no matching threads"
	// error is the expected outcome here and is deliberately ignored.
	offset, _ := e.FindThread(t.name)
	if offset != 0 {
		return fmt.Errorf("Writing a duplicate thread")
	}
	// Resolve the initial thread block
	blockRange := e.getPageRange(1, 1)
	if blockRange.Start.Segment != blockRange.End.Segment {
		return fmt.Errorf("thread block split across segments!")
	}
	bytes := e.m[blockRange.Start.Segment][blockRange.Start.Byte:blockRange.End.Byte]
	// Skip the first 8 bytes, since we don't support multiple thread blocks yet
	// TODO: fix that
	bytes = bytes[8:]
	cur := 0
	// Walk existing records (4-byte length, name, 4-byte id) until a
	// zero length marks the first free slot.
	for {
		length := uint32FromBytes(bytes)
		if length == 0 {
			break
		}
		cur += 8 + int(length)
		bytes = bytes[8+length:]
	}
	// cur should have now found an empty offset
	// Check that we have enough room left to insert
	roomLeft := len(bytes)
	roomNeeded := t.getSpaceNeeded()
	if roomLeft < roomNeeded {
		return fmt.Errorf("Not enough space available")
	}
	// If everything's fine, serialise
	t.Serialize(bytes)
	// Increment thread count
	e.incrementThreadCount()
	return nil
}
// GetId returns this thread's identifier.
// NOTE(review): Go convention would drop the Get prefix (Id/ID), but
// the name is part of the public interface and is kept unchanged.
func (t *thread) GetId() uint32 {
	return t.id
}

View File

@ -1,60 +0,0 @@
package edf
import (
. "github.com/smartystreets/goconvey/convey"
"os"
"testing"
)
// TestThreadDeserialize checks that a serialized SYSTEM thread record
// decodes back to its name and that the full buffer is consumed.
func TestThreadDeserialize(t *testing.T) {
	raw := []byte{0, 0, 0, 6, 83, 89, 83, 84, 69, 77, 0, 0, 0, 1}
	Convey("Given a byte slice", t, func() {
		var th thread
		consumed := th.Deserialize(raw)
		Convey("Decoded name should be SYSTEM", func() {
			So(th.name, ShouldEqual, "SYSTEM")
		})
		Convey("Size should be the same as the array", func() {
			So(consumed, ShouldEqual, len(raw))
		})
	})
}
// TestThreadSerialize checks that a SYSTEM thread with id 1 serializes
// to the expected reference byte sequence.
func TestThreadSerialize(t *testing.T) {
	want := []byte{0, 0, 0, 6, 83, 89, 83, 84, 69, 77, 0, 0, 0, 1}
	var th thread
	th.name = "SYSTEM"
	th.id = 1
	got := make([]byte, len(want))
	Convey("Should serialize correctly", t, func() {
		th.Serialize(got)
		So(got, ShouldResemble, want)
	})
}
// TestThreadFindAndWrite checks that a new thread can be appended to a
// freshly created EDF mapping and located again by name.
// NOTE(review): uses a fixed file "hello.db" in the working directory
// rather than ioutil.TempFile; it is removed at the end of the test.
func TestThreadFindAndWrite(t *testing.T) {
	Convey("Creating a non-existent file should succeed", t, func() {
		tempFile, err := os.OpenFile("hello.db", os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0700) //ioutil.TempFile(os.TempDir(), "TestFileCreate")
		So(err, ShouldEqual, nil)
		Convey("Mapping the file should succeed", func() {
			mapping, err := edfMap(tempFile, EDF_CREATE)
			So(err, ShouldEqual, nil)
			Convey("Writing the thread should succeed", func() {
				// A new file already has SYSTEM (1) and FIXED (2), so
				// the next thread gets identifier 3.
				// (t shadows the *testing.T parameter here.)
				t := NewThread(mapping, "MyNameISWhat")
				Convey("thread number should be 3", func() {
					So(t.id, ShouldEqual, 3)
				})
				Convey("Writing the thread should succeed", func() {
					err := mapping.WriteThread(t)
					So(err, ShouldEqual, nil)
					Convey("Should be able to find the thread again later", func() {
						id, err := mapping.FindThread("MyNameISWhat")
						So(err, ShouldEqual, nil)
						So(id, ShouldEqual, 3)
					})
				})
			})
		})
		os.Remove("hello.db")
	})
}

View File

@ -1,32 +0,0 @@
package edf
// uint64ToBytes encodes in as eight big-endian bytes in out.
// out must have room for at least 8 bytes.
//
// Fixes an operator-precedence bug in the previous version:
// `0xFF << i * 8` parses as `(0xFF << i) * 8` because << and * share
// precedence in Go, so the mask selected the wrong bits for every byte.
// Shifting the value down and truncating with byte() avoids masks
// entirely.
func uint64ToBytes(in uint64, out []byte) {
	var i uint64
	for i = 0; i < 8; i++ {
		// Big-endian: most significant byte first.
		out[7-i] = byte(in >> (8 * i))
	}
}
// uint64FromBytes decodes a big-endian uint64 from the first eight
// bytes of in.
//
// Fixes a truncation bug in the previous version: `in[7-i] << ...`
// shifted the byte BEFORE widening it to uint64, so every byte except
// the least significant overflowed to zero. The byte must be converted
// to uint64 first, then shifted into position.
func uint64FromBytes(in []byte) uint64 {
	var out uint64
	var i uint64
	for i = 0; i < 8; i++ {
		out |= uint64(in[7-i]) << (8 * i)
	}
	return out
}
// uint32FromBytes decodes a big-endian uint32 from the first four
// bytes of in.
func uint32FromBytes(in []byte) uint32 {
	return uint32(in[0])<<24 |
		uint32(in[1])<<16 |
		uint32(in[2])<<8 |
		uint32(in[3])
}
// uint32ToBytes encodes in as four big-endian bytes in out.
// out must have room for at least 4 bytes.
func uint32ToBytes(in uint32, out []byte) {
	for i := uint(0); i < 4; i++ {
		// Most significant byte first.
		out[i] = byte(in >> (24 - 8*i))
	}
}

View File

@ -1,20 +0,0 @@
package edf
// Utility function tests
import (
. "github.com/smartystreets/goconvey/convey"
"testing"
)
// TestInt32Conversion checks that a uint32 survives a round trip
// through uint32ToBytes and uint32FromBytes.
func TestInt32Conversion(t *testing.T) {
	Convey("Given deadbeef", t, func() {
		scratch := make([]byte, 4)
		want := uint32(0xDEAD)
		uint32ToBytes(want, scratch)
		got := uint32FromBytes(scratch)
		Convey("Decoded value should be the original...", func() {
			So(got, ShouldEqual, want)
		})
	})
}

View File

@ -8,7 +8,6 @@ import (
// FixedAttributeGroups contain a particular number of rows of
// a particular number of Attributes, all of a given type.
type FixedAttributeGroup struct {
threadNo uint32
parent DataGrid
attributes []Attribute
size int
@ -16,11 +15,9 @@ type FixedAttributeGroup struct {
maxRow int
}
// String gets a human-readable summary
func (f *FixedAttributeGroup) String() string {
if len(f.alloc) > 1 {
return fmt.Sprintf("FixedAttributeGroup(%d attributes\n thread: %d\n size: %d\n)", len(f.attributes), f.threadNo, f.size)
}
return fmt.Sprintf("FixedAttributeGroup(%d attributes\n thread: %d\n size: %d\n %d \n)", len(f.attributes), f.threadNo, f.size, f.alloc[0][0:60])
return "FixedAttributeGroup"
}
// RowSize returns the size of each row in bytes
@ -39,11 +36,6 @@ func (f *FixedAttributeGroup) AddAttribute(a Attribute) error {
return nil
}
// getThreadNo returns the ThreadNo assigned to this FixedAttributeGroup
func (f *FixedAttributeGroup) getThreadNo() uint32 {
return f.threadNo
}
// addStorage appends the given storage reference to this FixedAttributeGroup
func (f *FixedAttributeGroup) addStorage(a []byte) {
f.alloc = append(f.alloc, a)

View File

@ -7,8 +7,6 @@ import (
// AttributeGroups store related sequences of system values
// in memory for the DenseInstances structure.
type AttributeGroup interface {
// Returns an EDF thread number
getThreadNo() uint32
addStorage(a []byte)
// Used for printing
appendToRowBuf(row int, buffer *bytes.Buffer)

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long