bleve搜索引擎源码分析之索引——mapping真复杂啊

接下来看看下面index部分的源码实现:

    // Sample document: an anonymous struct value with two exported
    // string fields, Name and Des.
    data := struct {
        Name string
        Des  string
    }{
        Name: "hello world this is bone",
        Des:  "this is a good time",
    }

    // index some data
    // The document is indexed under the ID "id".
    // NOTE(review): whatever Index returns is discarded here — confirm
    // whether an error should be checked.
    index.Index("id", data)

其中,

index.Index("id", data)

实现代码(这里贴出的是 Batch 上的 Index 方法:它先按 mapping 把 data 映射成文档,再把文档加入批次,批次执行之后索引才会真正更新):

// Index maps data into a new document identified by id and queues that
// document as an update on the batch.  NOTE: the bleve Index is not
// updated until the batch is executed.
//
// An empty id yields ErrorEmptyID.  An error from mapping the document
// is returned unchanged, and nothing is queued in that case.
func (b *Batch) Index(id string, data interface{}) error {
    if len(id) == 0 {
        return ErrorEmptyID
    }

    doc := document.NewDocument(id)
    if mapErr := b.index.Mapping().MapDocument(doc, data); mapErr != nil {
        return mapErr
    }

    b.internal.Update(doc)
    return nil
}

根据mapping来映射文档,

 b.index.Mapping().MapDocument(doc, data)

该代码的实现:

// MapDocument fills doc with fields derived from data, using the
// document mapping chosen for data's type.
//
// When that mapping is disabled, doc is left untouched.  Otherwise data
// is walked, and a composite "_all" field is appended to doc unless a
// mapping for the "_all" path exists and is disabled.  The function
// always returns nil.
func (im *IndexMappingImpl) MapDocument(doc *document.Document, data interface{}) error {
    mapping := im.mappingForType(im.determineType(data))
    wc := im.newWalkContext(doc, mapping)
    if !mapping.Enabled {
        return nil
    }

    mapping.walkDocument(data, []string{}, []uint64{}, wc)

    // see if the _all field was disabled
    if all := mapping.documentMappingForPath("_all"); all != nil && !all.Enabled {
        return nil
    }

    // the composite field carries the names collected as excluded
    // from _all during the walk
    allField := document.NewCompositeFieldWithIndexingOptions(
        "_all",
        true,
        []string{},
        wc.excludedFromAll,
        document.IndexField|document.IncludeTermVectors,
    )
    doc.AddField(allField)
    return nil
}
// walkDocument reflects over data and passes each value it contains to
// processProperty, extending path with field/key names and indexes with
// array positions as it descends.
//
//   - maps with string keys: every entry, keyed by the map key
//   - structs: every field whose value can be read via Interface(),
//     named by the struct tag (dm.StructTagKey, defaulting to "json")
//     when present, else by the Go field name; a tag name of "-" skips
//     the field
//   - slices and arrays: every element, with its position appended to
//     indexes
//   - pointers: the pointed-to value, if any
//   - strings, integers, floats and bools: the value itself (integers
//     are converted to float64)
//
// A nil data value, and any other kind, is ignored.
func (dm *DocumentMapping) walkDocument(data interface{}, path []string, indexes []uint64, context *walkContext) {
    // allow default "json" tag to be overridden
    structTagKey := dm.StructTagKey
    if structTagKey == "" {
        structTagKey = "json"
    }

    val := reflect.ValueOf(data)
    if !val.IsValid() {
        // data is a nil interface: reflect.ValueOf yields the zero Value
        // and calling Type() on it would panic. There is nothing to walk.
        return
    }
    typ := val.Type()
    switch typ.Kind() {
    case reflect.Map:
        // FIXME can add support for other map keys in the future
        if typ.Key().Kind() == reflect.String {
            for _, key := range val.MapKeys() {
                fieldName := key.String()
                fieldVal := val.MapIndex(key).Interface()
                dm.processProperty(fieldVal, append(path, fieldName), indexes, context)
            }
        }
    case reflect.Struct:
        for i := 0; i < val.NumField(); i++ {
            field := typ.Field(i)
            fieldName := field.Name
            // anonymous fields of type struct can elide the type name
            if field.Anonymous && field.Type.Kind() == reflect.Struct {
                fieldName = ""
            }

            // if the field has a name under the specified tag, prefer that
            tag := field.Tag.Get(structTagKey)
            tagFieldName := parseTagName(tag)
            if tagFieldName == "-" {
                continue
            }
            // allow tag to set field name to empty, only if anonymous
            if field.Tag != "" && (tagFieldName != "" || field.Anonymous) {
                fieldName = tagFieldName
            }

            if val.Field(i).CanInterface() {
                fieldVal := val.Field(i).Interface()
                newpath := path
                if fieldName != "" {
                    newpath = append(path, fieldName)
                }
                dm.processProperty(fieldVal, newpath, indexes, context)
            }
        }
    case reflect.Slice, reflect.Array:
        // Clip the capacity so that every append below allocates a fresh
        // backing array. Without this, sibling elements could share one
        // array whenever indexes has spare capacity, and a later element
        // would overwrite the position seen by an earlier one if a callee
        // keeps the slice it was given.
        parent := indexes[:len(indexes):len(indexes)]
        for i := 0; i < val.Len(); i++ {
            if val.Index(i).CanInterface() {
                fieldVal := val.Index(i).Interface()
                dm.processProperty(fieldVal, path, append(parent, uint64(i)), context)
            }
        }
    case reflect.Ptr:
        // Elem() of a nil pointer is the zero Value, which IsValid rejects
        ptrElem := val.Elem()
        if ptrElem.IsValid() && ptrElem.CanInterface() {
            dm.processProperty(ptrElem.Interface(), path, indexes, context)
        }
    case reflect.String:
        dm.processProperty(val.String(), path, indexes, context)
    case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
        dm.processProperty(float64(val.Int()), path, indexes, context)
    case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
        dm.processProperty(float64(val.Uint()), path, indexes, context)
    case reflect.Float32, reflect.Float64:
        dm.processProperty(val.Float(), path, indexes, context)
    case reflect.Bool:
        dm.processProperty(val.Bool(), path, indexes, context)
    }

}
func (dm *DocumentMapping) processProperty(property interface{}, path []string, indexes []uint64, context *walkContext) {
    pathString := encodePath(path)
    // look to see if there is a mapping for this field
    subDocMapping := dm.documentMappingForPath(pathString)
    closestDocMapping := dm.closestDocMapping(pathString)

    // check to see if we even need to do further processing
    if subDocMapping != nil && !subDocMapping.Enabled {
        return
    }

    propertyValue := reflect.ValueOf(property)
    if !propertyValue.IsValid() {
        // cannot do anything with the zero value
        return
    }
    propertyType := propertyValue.Type()
    switch propertyType.Kind() {
    case reflect.String:
        propertyValueString := propertyValue.String()
        if subDocMapping != nil {
            // index by explicit mapping
            for _, fieldMapping := range subDocMapping.Fields {
                fieldMapping.processString(propertyValueString, pathString, path, indexes, context)
            }
        } else if closestDocMapping.Dynamic {
            // automatic indexing behavior

            // first see if it can be parsed by the default date parser
            dateTimeParser := context.im.DateTimeParserNamed(context.im.DefaultDateTimeParser)
            if dateTimeParser != nil {
                parsedDateTime, err := dateTimeParser.ParseDateTime(propertyValueString)
                if err != nil {
                    // index as text
                    fieldMapping := newTextFieldMappingDynamic(context.im)
                    fieldMapping.processString(propertyValueString, pathString, path, indexes, context)
                } else {
                    // index as datetime
                    fieldMapping := newDateTimeFieldMappingDynamic(context.im)
                    fieldMapping.processTime(parsedDateTime, pathString, path, indexes, context)
                }
            }
    case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
        dm.processProperty(float64(propertyValue.Int()), path, indexes, context)
        return
    case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
        dm.processProperty(float64(propertyValue.Uint()), path, indexes, context)
        return
    case reflect.Float64, reflect.Float32:
        propertyValFloat := propertyValue.Float()
        if subDocMapping != nil {
            // index by explicit mapping
            for _, fieldMapping := range subDocMapping.Fields {
                fieldMapping.processFloat64(propertyValFloat, pathString, path, indexes, context)
            }
        } else if closestDocMapping.Dynamic {
            // automatic indexing behavior
            fieldMapping := newNumericFieldMappingDynamic(context.im)
            fieldMapping.processFloat64(propertyValFloat, pathString, path, indexes, context)
        }
    case reflect.Bool:
        propertyValBool := propertyValue.Bool()
        if subDocMapping != nil {
            // index by explicit mapping
            for _, fieldMapping := range subDocMapping.Fields {
                fieldMapping.processBoolean(propertyValBool, pathString, path, indexes, context)
            }
        } else if closestDocMapping.Dynamic {
            // automatic indexing behavior
            fieldMapping := newBooleanFieldMappingDynamic(context.im)
            fieldMapping.processBoolean(propertyValBool, pathString, path, indexes, context)
        }
    case reflect.Struct:
        switch property := property.(type) {
        case time.Time:
            // don't descend into the time struct
            if subDocMapping != nil {
                // index by explicit mapping
                for _, fieldMapping := range subDocMapping.Fields {
                    fieldMapping.processTime(property, pathString, path, indexes, context)
                }
            } else if closestDocMapping.Dynamic {
                fieldMapping := newDateTimeFieldMappingDynamic(context.im)
                fieldMapping.processTime(property, pathString, path, indexes, context)
            }
        default:
            dm.walkDocument(property, path, indexes, context)
        }
    default:
        dm.walkDocument(property, path, indexes, context)
    }
}

分词的部分终于来了!下面的 processString 会为 text 类型的字段取得对应的 analyzer(分词器),并用它创建文本字段加入文档:

// processString turns a string property into a document field according
// to this field mapping's type.
//
// For "text" mappings a text field is built with the analyzer resolved
// for the field and added to the document; if the mapping is not
// included in _all, the field name is recorded in
// context.excludedFromAll.
//
// For "datetime" mappings the string is parsed with the parser named by
// fm.DateFormat, falling back to the index mapping's default parser when
// DateFormat is empty. A successfully parsed value is passed on to
// processTime; a missing parser or a parse failure drops the value.
//
// Any other mapping type is ignored.
func (fm *FieldMapping) processString(propertyValueString string, pathString string, path []string, indexes []uint64, context *walkContext) {
    name := getFieldName(pathString, path, fm)
    opts := fm.Options()

    switch fm.Type {
    case "text":
        fieldAnalyzer := fm.analyzerForField(path, context)
        textField := document.NewTextFieldCustom(name, indexes, []byte(propertyValueString), opts, fieldAnalyzer)
        context.doc.AddField(textField)

        if !fm.IncludeInAll {
            context.excludedFromAll = append(context.excludedFromAll, name)
        }
    case "datetime":
        parserName := context.im.DefaultDateTimeParser
        if fm.DateFormat != "" {
            parserName = fm.DateFormat
        }

        parser := context.im.DateTimeParserNamed(parserName)
        if parser == nil {
            return
        }
        if parsed, err := parser.ParseDateTime(propertyValueString); err == nil {
            fm.processTime(parsed, pathString, path, indexes, context)
        }
    }
}

// processFloat64 adds a numeric field for propertyValFloat to the
// document when this field mapping's type is "number"; other mapping
// types are ignored. If the mapping is not included in _all, the field
// name is recorded in context.excludedFromAll.
func (fm *FieldMapping) processFloat64(propertyValFloat float64, pathString string, path []string, indexes []uint64, context *walkContext) {
    name := getFieldName(pathString, path, fm)
    if fm.Type != "number" {
        return
    }

    numField := document.NewNumericFieldWithIndexingOptions(name, indexes, propertyValFloat, fm.Options())
    context.doc.AddField(numField)

    if fm.IncludeInAll {
        return
    }
    context.excludedFromAll = append(context.excludedFromAll, name)
}