Cassandra Database(三)Hector Scala Class

Cassandra Database(3)Hector Scala Class

Cassandra Database(3)Hector Scala Class

I am using the latest Cassandra from the 1.2 branch, which I built myself.

1. Creating the Schema and Insert/Get
The case class is as follows:
package com.sillyat.easycassandraserver.models

import org.joda.time.DateTime

/**
 * Immutable product record used by the Cassandra demo applications.
 *
 * @param id           optional numeric identifier
 * @param productName  display name of the product
 * @param create       date-time attached to the record (presumably its creation time)
 * @param productPrice price of the product
 * @param productCode  product code; the demo applications use it as the row key
 * @param country      country associated with the product
 */
case class Product(
  id: Option[Long],
  productName: String,
  create: DateTime,
  productPrice: BigDecimal,
  productCode: String,
  country: String
)

A sample that creates the schema and then inserts and reads data:

package com.sillycat.easycassandraserver.apps

import me.prettyprint.hector.api.{Keyspace, Cluster}
import me.prettyprint.hector.api.factory.HFactory
import me.prettyprint.cassandra.model.BasicColumnFamilyDefinition
import me.prettyprint.hector.api.ddl.{KeyspaceDefinition, ColumnFamilyDefinition, ComparatorType}
import me.prettyprint.cassandra.service.ThriftCfDef
import java.util.Arrays
import me.prettyprint.hector.api.mutation.Mutator
import me.prettyprint.cassandra.serializers.{LongSerializer, StringSerializer, BigDecimalSerializer}
import com.sillyat.easycassandraserver.models.Product
import org.joda.time.DateTime
import me.prettyprint.hector.api.query.{QueryResult, ColumnQuery}
import me.prettyprint.hector.api.beans.HColumn

/**
 * Demo application that recreates the `sillycat` keyspace with a single `products`
 * column family, writes three sample products through Hector (one with individual
 * inserts, two as a batch) and reads a few columns back.
 *
 * Connects to the cluster at `host:port`; the keyspace is dropped first if it exists.
 */
object CassandraShemaDataDemoApp extends App{
  val clusterName = "TestCluster"
  val host = "localhost"
  val port = 9160

  val keyspaceName = "sillycat"
  val columnFamilyName = "products"

  // Use the settings declared above rather than repeating the literals.
  val cluster: Cluster = HFactory.getOrCreateCluster(clusterName, host + ":" + port)

  //drop the schema
  // The lookup result is null when the keyspace is missing (the case this demo
  // originally matched on); Option(...) turns that null into None.
  Option(cluster.describeKeyspace(keyspaceName)) match {
    case Some(_) => cluster.dropKeyspace(keyspaceName)
    case None    => println("There is no keyspace yet.")
  }

  //define the schema
  val columnFamilyProducts: BasicColumnFamilyDefinition = new BasicColumnFamilyDefinition()
  columnFamilyProducts.setKeyspaceName(keyspaceName)
  columnFamilyProducts.setName(columnFamilyName)

  // Row keys and column names are both UTF-8 strings.
  columnFamilyProducts.setKeyValidationClass(ComparatorType.UTF8TYPE.getClassName())
  columnFamilyProducts.setComparatorType(ComparatorType.UTF8TYPE)

  // No default validation class is set because the columns hold mixed value types
  // (strings and decimals). Alternatives that were tried:
  //columnFamilyProducts.setDefaultValidationClass(ComparatorType.UTF8TYPE.getClassName())
  //columnFamilyProducts.setDefaultValidationClass(ComparatorType.DECIMALTYPE.getClassName())
  //columnFamilyProducts.setDefaultValidationClass(ComparatorType.ASCIITYPE.getClassName())
  //columnFamilyProducts.setDefaultValidationClass(ComparatorType.COMPOSITETYPE.getClassName())

  val productsFamily: ColumnFamilyDefinition = new ThriftCfDef(columnFamilyProducts)

  // SimpleStrategy with a replication factor of 1.
  val keyspaceDefinition: KeyspaceDefinition =
    HFactory.createKeyspaceDefinition(keyspaceName, "org.apache.cassandra.locator.SimpleStrategy",
      1, Arrays.asList(productsFamily))

  cluster.addKeyspace(keyspaceDefinition)

  //insert data
  val keyspaceOperator: Keyspace = HFactory.createKeyspace(keyspaceName, cluster)

  val mutator: Mutator[String]  = HFactory.createMutator(keyspaceOperator, StringSerializer.get())

  val p1 = Product(Some(1), "IPHONE5", DateTime.now, 499.99, "iphone5", "China")
  val p2 = Product(Some(2), "IPHONE4", DateTime.now, 199.99, "iphone4", "US")
  val p3 = Product(Some(3), "IPHONE4S", DateTime.now, 299.99, "iphone4s", "TW")

  /** Builds a column whose name and value are both serialized as strings. */
  private def stringColumn(name: String, value: String): HColumn[String, String] =
    HFactory.createColumn(name, value, StringSerializer.get(), StringSerializer.get())

  /**
   * Builds the "ProductPrice" column.
   *
   * Hector's serializer works on java.math.BigDecimal, so the Java value backing the
   * Scala BigDecimal is passed as-is; going through a Double could lose precision.
   */
  private def priceColumn(price: BigDecimal): HColumn[String, java.math.BigDecimal] =
    HFactory.createColumn("ProductPrice", price.bigDecimal, StringSerializer.get(), BigDecimalSerializer.get())

  /** Queues the three columns of `product` on the mutator; nothing is sent until `execute()`. */
  private def queueInsertions(product: Product): Unit = {
    mutator.addInsertion(product.productCode, columnFamilyName, stringColumn("ProductName", product.productName))
    mutator.addInsertion(product.productCode, columnFamilyName, stringColumn("Country", product.country))
    mutator.addInsertion(product.productCode, columnFamilyName, priceColumn(product.productPrice))
  }

  //single insert p1
  mutator.insert(p1.productCode, columnFamilyName, stringColumn("ProductName", p1.productName))
  mutator.insert(p1.productCode, columnFamilyName, stringColumn("Country", p1.country))
  mutator.insert(p1.productCode, columnFamilyName, priceColumn(p1.productPrice))

  //get p1
  val bigdecimalColumnQuery: ColumnQuery[String, String, java.math.BigDecimal] = HFactory.createColumnQuery(keyspaceOperator,
    StringSerializer.get(),
    StringSerializer.get(),
    BigDecimalSerializer.get())
  bigdecimalColumnQuery.setColumnFamily(columnFamilyName).setKey(p1.productCode).setName("ProductPrice")
  val result1: QueryResult[HColumn[String, java.math.BigDecimal]]  = bigdecimalColumnQuery.execute()

  // The same string query object is reused: it is executed once per column name.
  val stringColumnQuery: ColumnQuery[String, String, String] = HFactory.createColumnQuery(keyspaceOperator,
    StringSerializer.get(),
    StringSerializer.get(),
    StringSerializer.get())
  stringColumnQuery.setColumnFamily(columnFamilyName).setKey(p1.productCode).setName("ProductName")
  val result2: QueryResult[HColumn[String, String]]  = stringColumnQuery.execute()

  stringColumnQuery.setColumnFamily(columnFamilyName).setKey(p1.productCode).setName("Country")
  val result3: QueryResult[HColumn[String, String]]  = stringColumnQuery.execute()

  println("Read HColumn from cassandra: " + result1.get())
  println("Read HColumn from cassandra: " + result2.get())
  println("Read HColumn from cassandra: " + result3.get())
  println("Verify on CLI with:  get products['iphone5']; ")

  //multiple insert p2,p3
  queueInsertions(p2)
  queueInsertions(p3)

  mutator.execute()

  bigdecimalColumnQuery.setColumnFamily(columnFamilyName).setKey(p2.productCode).setName("ProductPrice")
  val result4: QueryResult[HColumn[String, java.math.BigDecimal]]  = bigdecimalColumnQuery.execute()

  println("Read HColumn from cassandra: " + result4.get())
  println("Verify on CLI with:  get products['iphone4s']; ")

  cluster.getConnectionManager.shutdown()
}

The most important thing to remember is that Hector is a Java library, so we sometimes need to use
java.math.BigDecimal instead of Scala's BigDecimal.

2. The Queries
package com.sillycat.easycassandraserver.apps

import me.prettyprint.hector.api.{Keyspace, Cluster}
import me.prettyprint.hector.api.factory.HFactory
import me.prettyprint.hector.api.query.{MultigetSliceQuery, QueryResult, RangeSlicesQuery}
import com.sillyat.easycassandraserver.models.Product
import scala.Product
import org.joda.time.DateTime
import me.prettyprint.hector.api.beans.{Row, Rows, OrderedRows}
import me.prettyprint.cassandra.serializers.{BigDecimalSerializer, StringSerializer}
import scala.collection.JavaConversions._

/**
 * Demo application that runs several Hector read queries against the `products`
 * column family of the `sillycat` keyspace: range slices, a multiget and a
 * two-step paginated range scan.
 *
 * The sample products are only used for their row keys; this application does not
 * write any data itself.
 */
object CassandraQueryDemoApp extends App{
  val clusterName = "TestCluster"
  val host = "localhost"
  val port = 9160

  val keyspaceName = "sillycat"
  val columnFamilyName = "products"

  // Use the settings declared above rather than repeating the literals.
  val cluster: Cluster = HFactory.getOrCreateCluster(clusterName, host + ":" + port)

  val p1 = Product(Some(1), "IPHONE5", DateTime.now, 499.99, "iphone5", "China")
  val p2 = Product(Some(2), "IPHONE4", DateTime.now, 199.99, "iphone4", "US")
  val p3 = Product(Some(3), "IPHONE4S", DateTime.now, 299.99, "iphone4s", "TW")

  val keyspaceOperator: Keyspace = HFactory.createKeyspace(keyspaceName, cluster)

  /** Prints the column slice of every row, each line prefixed with `label` and ": ". */
  private def printRows[V](label: String, rows: Rows[String, String, V]): Unit =
    rows.foreach(row => println(label + ": " + row.getColumnSlice))

  //range query
  // Start and end key are the same, so the key range is limited to p2's row.
  val bigDecimalRangeSlicesQuery: RangeSlicesQuery[String, String, java.math.BigDecimal] =
    HFactory.createRangeSlicesQuery(keyspaceOperator, StringSerializer.get(), StringSerializer.get(), BigDecimalSerializer.get())
  bigDecimalRangeSlicesQuery.setColumnFamily(columnFamilyName)
  bigDecimalRangeSlicesQuery.setKeys(p2.productCode, p2.productCode)
  bigDecimalRangeSlicesQuery.setColumnNames("ProductPrice")

  val result1: QueryResult[OrderedRows[String, String, java.math.BigDecimal]] = bigDecimalRangeSlicesQuery.execute()
  val orderedRows1: OrderedRows[String, String, java.math.BigDecimal]= result1.get()

  printRows("result1", orderedRows1)

  // Same key range, this time reading the string-valued columns.
  val stringRangeSlicesQuery: RangeSlicesQuery[String, String, String] =
    HFactory.createRangeSlicesQuery(keyspaceOperator, StringSerializer.get(), StringSerializer.get(), StringSerializer.get())
  stringRangeSlicesQuery.setColumnFamily(columnFamilyName)
  stringRangeSlicesQuery.setKeys(p2.productCode, p2.productCode)
  stringRangeSlicesQuery.setColumnNames("ProductName", "Country")

  val result2: QueryResult[OrderedRows[String, String, String]] = stringRangeSlicesQuery.execute()
  val orderedRows2: OrderedRows[String, String, String]= result2.get()

  printRows("result2", orderedRows2)

//  createIndexedSlicesQuery   time between some date
//  indexedSlicesQuery.addEqualsExpression("birthdate", 1975L);
//  indexedSlicesQuery.addGtExpression("birthmonth", 6L);
//  indexedSlicesQuery.addLtExpression("birthmonth", 8L);
//  indexedSlicesQuery.setColumnNames("birthdate","birthmonth");
//  indexedSlicesQuery.setColumnFamily("Indexed1");
//  indexedSlicesQuery.setStartKey("");

  //multiget
  val multigetSliceQuery: MultigetSliceQuery[String, String, String]  =
    HFactory.createMultigetSliceQuery(keyspaceOperator, StringSerializer.get(), StringSerializer.get(), StringSerializer.get())
  multigetSliceQuery.setColumnFamily(columnFamilyName)
  multigetSliceQuery.setKeys(p1.productCode, p2.productCode, p3.productCode)

  multigetSliceQuery.setRange(null, null, false, 3)  //number 3 is the number of the columns

  println(multigetSliceQuery)
  val result3: QueryResult[Rows[String, String, String]]  = multigetSliceQuery.execute()
  val orderedRows3: Rows[String, String, String] = result3.get()

  printRows("result3", orderedRows3)

  //pagenation
  // First page: up to 3 rows with up to 3 columns each.
  val pagenationSliceQuery: RangeSlicesQuery[String, String, String]  =
    HFactory.createRangeSlicesQuery(keyspaceOperator, StringSerializer.get(), StringSerializer.get(), StringSerializer.get())
  pagenationSliceQuery.setColumnFamily(columnFamilyName)
  pagenationSliceQuery.setRange("", "", false, 3)
  pagenationSliceQuery.setRowCount(3)
  val result4: QueryResult[OrderedRows[String, String, String]]  = pagenationSliceQuery.execute()
  val orderedRows4: OrderedRows[String, String, String]  = result4.get()

  printRows("result4", orderedRows4)

  // Next page: restart the scan from the last key of the first page. The lookup is
  // wrapped in Option so that an empty first page does not end in a
  // NullPointerException when the key is read.
  Option(orderedRows4.peekLast()) match {
    case Some(lastRow) =>
      pagenationSliceQuery.setKeys(lastRow.getKey(), "")
      val orderedRows5 = pagenationSliceQuery.execute().get()
      printRows("result5", orderedRows5)
    case None =>
      println("result5: no rows in the first page, nothing more to fetch")
  }

  // Release the client's connections, as the schema/data demo does.
  cluster.getConnectionManager.shutdown()
}

How to convert Java collections to Scala collections:
import scala.collection.JavaConversions._

Tips:
1. Error Message:
type mismatch;
 found   : me.prettyprint.cassandra.serializers.BigDecimalSerializer
 required: me.prettyprint.hector.api.Serializer[Number]
Note: java.math.BigDecimal <: Number (and me.prettyprint.cassandra.serializers.BigDecimalSerializer <: me.prettyprint.cassandra.serializers.AbstractSerializer[java.math.BigDecimal]), but Java-defined trait Serializer is invariant in type T.
You may wish to investigate a wildcard type such as `_ <: Number`. (SLS 3.2.10)
    p1.productPrice, StringSerializer.get(),BigDecimalSerializer.get()))
                                                                    ^
Solution:
// Hector's Serializer is invariant and BigDecimalSerializer handles java.math.BigDecimal,
// so the value must be a java.math.BigDecimal. `.bigDecimal` exposes the Java value that
// backs the Scala BigDecimal without a lossy round trip through Double.
mutator.insert(
    p1.productCode,
    columnFamilyName,
    HFactory.createColumn("ProductPrice",
      p1.productPrice.bigDecimal,
      StringSerializer.get(),
      BigDecimalSerializer.get()
    )
  )

  // The query's value type is java.math.BigDecimal as well, matching the serializer.
  val bigdecimalColumnQuery: ColumnQuery[String, String, java.math.BigDecimal] = HFactory.createColumnQuery(keyspaceOperator,
    StringSerializer.get(),
    StringSerializer.get(),
    BigDecimalSerializer.get())
  bigdecimalColumnQuery.setColumnFamily(columnFamilyName).setKey(p1.productCode).setName("ProductPrice")
  val result1: QueryResult[HColumn[String, java.math.BigDecimal]]  = bigdecimalColumnQuery.execute()


References:
http://fbc.pionier.net.pl/pro/clepsydra/storage/clepsydra-nosql-driver/xref/pl/psnc/synat/a9/nosqldriver/cassandra/CassandraDataMutator.html