WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content

Commit c317df1

Browse files
committed
feat(writer): support InsertIfAbsent functionality
Ensures documents are only inserted if their docIDs are not already present in the current index Signed-off-by: Gao Hongtao <[email protected]>
1 parent 04f5c70 commit c317df1

File tree

3 files changed

+192
-8
lines changed

3 files changed

+192
-8
lines changed

index/batch.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import segment "github.com/blugelabs/bluge_segment_api"
1919
type Batch struct {
2020
documents []segment.Document
2121
ids []segment.Term
22+
unparsedDocuments []segment.Document
23+
unparsedIDs []segment.Term
2224
persistedCallback func(error)
2325
}
2426

@@ -30,6 +32,11 @@ func (b *Batch) Insert(doc segment.Document) {
3032
b.documents = append(b.documents, doc)
3133
}
3234

35+
func (b *Batch) InsertIfAbsent(id segment.Term, doc segment.Document) {
36+
b.unparsedDocuments = append(b.unparsedDocuments, doc)
37+
b.unparsedIDs = append(b.unparsedIDs, id)
38+
}
39+
3340
func (b *Batch) Update(id segment.Term, doc segment.Document) {
3441
b.documents = append(b.documents, doc)
3542
b.ids = append(b.ids, id)
@@ -43,6 +50,8 @@ func (b *Batch) Reset() {
4350
b.documents = b.documents[:0]
4451
b.ids = b.ids[:0]
4552
b.persistedCallback = nil
53+
b.unparsedDocuments = b.unparsedDocuments[:0]
54+
b.unparsedIDs = b.unparsedIDs[:0]
4655
}
4756

4857
func (b *Batch) SetPersistedCallback(f func(error)) {

index/writer.go

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,15 @@ func (s *Writer) close() (err error) {
224224

225225
// Batch applies a batch of changes to the index atomically
226226
func (s *Writer) Batch(batch *Batch) (err error) {
227+
if len(batch.unparsedIDs) > 0 {
228+
if err := s.removeExistingDocuments(batch); err != nil {
229+
return err
230+
}
231+
if len(batch.documents) == 0 {
232+
return nil
233+
}
234+
}
235+
227236
start := time.Now()
228237

229238
defer func() {
@@ -287,6 +296,34 @@ func (s *Writer) Batch(batch *Batch) (err error) {
287296
return err
288297
}
289298

299+
func (s *Writer) removeExistingDocuments(batch *Batch) error {
300+
root := s.currentSnapshot()
301+
defer func() { _ = root.Close() }()
302+
303+
for _, seg := range root.segment {
304+
dict, err := seg.segment.Dictionary(batch.unparsedIDs[0].Field())
305+
if err != nil {
306+
return err
307+
}
308+
309+
for i := 0; i < len(batch.unparsedIDs); i++ {
310+
if ok, _ := dict.Contains(batch.unparsedIDs[i].Term()); ok {
311+
batch.unparsedDocuments = append(batch.unparsedDocuments[:i], batch.unparsedDocuments[i+1:]...)
312+
batch.unparsedIDs = append(batch.unparsedIDs[:i], batch.unparsedIDs[i+1:]...)
313+
i--
314+
if len(batch.unparsedDocuments) == 0 {
315+
return nil
316+
}
317+
}
318+
}
319+
}
320+
if len(batch.unparsedDocuments) > 0 {
321+
batch.documents = append(batch.documents, batch.unparsedDocuments...)
322+
batch.ids = append(batch.ids, batch.unparsedIDs...)
323+
}
324+
return nil
325+
}
326+
290327
func (s *Writer) prepareSegment(newSegment *segmentWrapper, idTerms []segment.Term,
291328
internalOps map[string][]byte, persistedCallback func(error)) error {
292329
// new introduction
@@ -304,16 +341,18 @@ func (s *Writer) prepareSegment(newSegment *segmentWrapper, idTerms []segment.Te
304341
introduction.persisted = make(chan error, 1)
305342
}
306343

307-
// optimistically prepare obsoletes outside of rootLock
308-
root := s.currentSnapshot()
309-
defer func() { _ = root.Close() }()
344+
if len(idTerms) > 0 {
345+
// optimistically prepare obsoletes outside of rootLock
346+
root := s.currentSnapshot()
347+
defer func() { _ = root.Close() }()
310348

311-
for _, seg := range root.segment {
312-
delta, err := seg.segment.DocsMatchingTerms(idTerms)
313-
if err != nil {
314-
return err
349+
for _, seg := range root.segment {
350+
delta, err := seg.segment.DocsMatchingTerms(idTerms)
351+
if err != nil {
352+
return err
353+
}
354+
introduction.obsoletes[seg.id] = delta
315355
}
316-
introduction.obsoletes[seg.id] = delta
317356
}
318357

319358
introStartTime := time.Now()

index/writer_test.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1631,3 +1631,139 @@ func TestIndexSeekBackwardsStats(t *testing.T) {
16311631
idx.stats.TotTermSearchersFinished)
16321632
}
16331633
}
1634+
1635+
func TestBatch_InsertIfAbsent(t *testing.T) {
1636+
cfg, cleanup := CreateConfig("TestBatch_InsertIfAbsent")
1637+
defer func() {
1638+
err := cleanup()
1639+
if err != nil {
1640+
t.Log(err)
1641+
}
1642+
}()
1643+
1644+
idx, err := OpenWriter(cfg)
1645+
if err != nil {
1646+
t.Fatal(err)
1647+
}
1648+
defer func() {
1649+
err := idx.Close()
1650+
if err != nil {
1651+
t.Fatal(err)
1652+
}
1653+
}()
1654+
1655+
var expectedCount uint64
1656+
1657+
// Verify initial document count is zero
1658+
reader, err := idx.Reader()
1659+
if err != nil {
1660+
t.Fatal(err)
1661+
}
1662+
docCount, err := reader.Count()
1663+
if err != nil {
1664+
t.Error(err)
1665+
}
1666+
if docCount != expectedCount {
1667+
t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
1668+
}
1669+
err = reader.Close()
1670+
if err != nil {
1671+
t.Fatal(err)
1672+
}
1673+
1674+
// Insert a document using InsertIfAbsent
1675+
docID := "doc-1"
1676+
doc := &FakeDocument{
1677+
NewFakeField("_id", docID, true, false, false),
1678+
NewFakeField("title", "mister", false, false, true),
1679+
}
1680+
batch := NewBatch()
1681+
batch.InsertIfAbsent(testIdentifier(docID), doc)
1682+
1683+
// Apply the batch
1684+
if err := idx.Batch(batch); err != nil {
1685+
t.Fatalf("failed to apply batch: %v", err)
1686+
}
1687+
expectedCount++
1688+
1689+
// Verify document count after insertion
1690+
reader, err = idx.Reader()
1691+
if err != nil {
1692+
t.Fatal(err)
1693+
}
1694+
docCount, err = reader.Count()
1695+
if err != nil {
1696+
t.Error(err)
1697+
}
1698+
if docCount != expectedCount {
1699+
t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
1700+
}
1701+
err = reader.Close()
1702+
if err != nil {
1703+
t.Fatal(err)
1704+
}
1705+
1706+
// Attempt to InsertIfAbsent with the same ID
1707+
docDuplicate := &FakeDocument{
1708+
NewFakeField("_id", docID, true, false, false),
1709+
NewFakeField("title", "mister2", true, false, true),
1710+
}
1711+
batchDuplicate := NewBatch()
1712+
batchDuplicate.InsertIfAbsent(testIdentifier(docID), docDuplicate)
1713+
1714+
// Apply the duplicate batch
1715+
if err := idx.Batch(batchDuplicate); err != nil {
1716+
t.Fatalf("failed to apply duplicate batch: %v", err)
1717+
}
1718+
1719+
// Since it's InsertIfAbsent, the document should not be duplicated
1720+
// Verify document count remains the same
1721+
reader, err = idx.Reader()
1722+
if err != nil {
1723+
t.Fatal(err)
1724+
}
1725+
docCount, err = reader.Count()
1726+
if err != nil {
1727+
t.Error(err)
1728+
}
1729+
if docCount != expectedCount {
1730+
t.Errorf("Expected document count to be %d after duplicate insert, got %d", expectedCount, docCount)
1731+
}
1732+
1733+
docNum1, err := findNumberByID(reader, docID)
1734+
if err != nil {
1735+
t.Fatal(err)
1736+
}
1737+
1738+
dvr, err := reader.DocumentValueReader([]string{"title"})
1739+
if err != nil {
1740+
t.Fatal(err)
1741+
}
1742+
err = dvr.VisitDocumentValues(docNum1, func(field string, term []byte) {
1743+
if field == "title" {
1744+
if string(term) != "mister" {
1745+
t.Errorf("expected title to be 'First Document', got '%s'", string(term))
1746+
}
1747+
}
1748+
})
1749+
if err != nil {
1750+
t.Fatal(err)
1751+
}
1752+
1753+
err = reader.VisitStoredFields(docNum1, func(field string, value []byte) bool {
1754+
if field == "title" {
1755+
if string(value) != "mister" {
1756+
t.Errorf("expected title to be 'mister', got '%s'", string(value))
1757+
}
1758+
}
1759+
return true
1760+
})
1761+
if err != nil {
1762+
t.Fatal(err)
1763+
}
1764+
1765+
err = reader.Close()
1766+
if err != nil {
1767+
t.Fatal(err)
1768+
}
1769+
}

0 commit comments

Comments
 (0)