关于thrift
thrift是一个跨语言服务的软件开发框架(Thrift is a software framework for scalable cross-language services development.)。
它的官方网站是:http://incubator.apache.org/thrift/
下载thrift
svn co http://svn.apache.org/repos/asf/incubator/thrift/trunk thrift
安装thrift(Linux)
cd thrift
./bootstrap.sh
./configure
make
make install
生成hbase的client代码
cd $HBASE_HOME/src/java/org/apache/hadoop/hbase/thrift
thrift --gen py Hbase.thrift
然后将生成的gen-py文件夹下的hbase文件夹拷贝到
/usr/lib/python2.5/site-packages/
准备hbase
首先确认hbase正常工作,然后启动hbase的thrift服务:
$HBASE_HOME/bin/hbase-daemon.sh start thrift
OK,准备工作到此为止,我们开始编写python客户程序。
假设我们需要一个表保存从网上抓取下来的网页。
表命名为“webpages”
它使用网页的url反转后作为行标识符,使用列组“contents:”(注意结尾的冒号)保存网页的内容。
导入需要的模块:
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase
from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation, NotFound
建立与hbase的连接:
# Wrap a socket to the HBase Thrift server (netloc, port) in a buffered
# transport, build a binary-protocol client on top of it, then open the link.
transport = TTransport.TBufferedTransport(
    TSocket.TSocket(netloc, port))
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Hbase.Client(protocol)
transport.open()
创建表:
# Keep only one version per cell and use BLOCK compression.
# See the HBase API for the other ColumnDescriptor parameters.
contents = ColumnDescriptor(name="contents:", maxVersions=1, compression="BLOCK")
client.createTable("webpages", [contents])
写入数据:
# The row key is the reversed URL; the page body is stored in "contents:".
row = self.reverseUrl(url)
mutations = [Mutation(column="contents:", value=content)]
client.mutateRow("webpages", row, mutations)
完整的代码和单元测试如下:
from unittest import TestCase, main
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase
from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation, NotFound
class HbaseWriter(object):
    """Write fetched web pages into an HBase table through the Thrift gateway.

    Every page is stored in the ``contents:`` column of a row whose key is
    the page URL with its host name reversed (see ``reverseUrl``).
    """

    def __init__(self, netloc, port, table="webpages"):
        """Connect to the HBase Thrift server and ensure the table exists.

        Args:
            netloc: Host name or IP address of the HBase Thrift server.
            port: TCP port of the HBase Thrift server.
            table: Name of the table to write to. It is created when
                ``getTableNames()`` does not list it.
        """
        self.tableName = table
        self.transport = TTransport.TBufferedTransport(
            TSocket.TSocket(netloc, port))
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.client = Hbase.Client(self.protocol)
        self.transport.open()
        if self.tableName not in self.client.getTableNames():
            self.__createTable()

    def __del__(self):
        """Close the transport when the writer is garbage collected."""
        self.close()

    def close(self):
        """Close the Thrift transport, if one was ever created."""
        # __init__ may have failed before self.transport was assigned, and
        # __del__ still runs in that case, so look the attribute up defensively.
        transport = getattr(self, "transport", None)
        if transport is not None:
            transport.close()

    def __createTable(self):
        """Create the table with a single ``contents:`` column family."""
        # One version per cell, BLOCK compression.
        self.client.createTable(
            self.tableName,
            [ColumnDescriptor(name="contents:", maxVersions=1,
                              compression="BLOCK")])

    def reverseUrl(self, url):
        """Return ``url`` without its scheme and with the host name reversed.

        ``"http://www.b.com/foo"`` becomes ``"com.b.www/foo"``. Empty path
        segments (trailing or doubled slashes) are dropped. Applying the
        method to its own result restores the original host order.

        Args:
            url: The URL, with or without a ``scheme://`` prefix.

        Returns:
            The reversed host name followed by the remaining path segments,
            joined with ``/``.

        Raises:
            IndexError: If ``url`` contains no host part at all.
        """
        # Only strip the leading scheme; splitting on every "//" would throw
        # away the host whenever the path itself contains a double slash.
        _scheme, sep, rest = url.partition("://")
        link = rest if sep else url
        # A list comprehension (rather than filter()) stays indexable on
        # every Python version.
        hops = [hop for hop in link.split("/") if hop]
        hops[0] = ".".join(reversed(hops[0].split(".")))
        return "/".join(hops)

    def write(self, url, content):
        """Store ``content`` under the row keyed by the reversed ``url``.

        Args:
            url: URL of the page; converted to a row key by ``reverseUrl``.
            content: Page body to store in the ``contents:`` column.
        """
        row = self.reverseUrl(url)
        mutations = [Mutation(column="contents:", value=content)]
        self.client.mutateRow(self.tableName, row, mutations)
class TestHbaseWriter(TestCase):
    """Integration tests for HbaseWriter; they talk to a live HBase Thrift server."""

    # Address of the HBase Thrift server the tests connect to.
    host = "192.168.1.103"
    port = 9090

    def setUp(self):
        """Open a writer on the scratch table named "test"."""
        self.writer = HbaseWriter(self.host, self.port, "test")

    def tearDown(self):
        """Disable and then delete the scratch table used by the test."""
        name = self.writer.tableName
        client = self.writer.client
        client.disableTable(name)
        client.deleteTable(name)

    def testReverseUrl(self):
        """reverseUrl strips the scheme and reverses the host labels."""
        reverse = self.writer.reverseUrl
        self.assertEqual(reverse("http://www.a.com"), "com.a.www")
        self.assertEqual(reverse("http://www.a.com/"), "com.a.www")
        self.assertEqual(reverse("http://a.com"), "com.a")
        self.assertEqual(reverse("http://www.b.com/foo"), "com.b.www/foo")
        self.assertEqual(reverse("aaa.bbb.ccc.com.cn/foo1/foo2"),
                         "cn.com.ccc.bbb.aaa/foo1/foo2")

    def testCreate(self):
        """Creating a writer creates the table with the expected column."""
        tableName = self.writer.tableName
        client = self.writer.client
        self.assertTrue(tableName in client.getTableNames())
        # NOTE(review): the expected name has no trailing colon; confirm this
        # matches what the server reports for the "contents:" family.
        expected = ("contents",)
        cds = client.getColumnDescriptors(tableName)
        for column in cds.values():
            self.assertTrue(column.name in expected)

    def testWrite(self):
        """Rows written through the writer can be read back with a scanner."""
        tableName = self.writer.tableName
        client = self.writer.client
        data = {"http://www.a.com": "com.a.www",
                "http://www.a.com/bbb": "com.a.www/bbb",
                "http://www.foo.com/foo": "foo"}
        for url, content in data.items():
            self.writer.write(url, content)
        seen = []
        scannerId = client.scannerOpen(tableName, "", ["contents:"])
        try:
            while True:
                # The loop ends when scannerGet raises NotFound.
                try:
                    result = client.scannerGet(scannerId)
                except NotFound:
                    break
                row = result.row
                contents = result.columns["contents:"].value
                # Reversing the row key again restores the original host order.
                url = "http://" + self.writer.reverseUrl(row)
                self.assertTrue(url in data)
                self.assertEqual(data[url], contents)
                seen.append(url)
        finally:
            # Release the scanner even when an assertion above fails.
            client.scannerClose(scannerId)
        # Guard against the loop passing vacuously on an empty scan.
        self.assertEqual(sorted(seen), sorted(data))
if __name__ == "__main__":
    # Run the unit tests when the file is executed as a script.
    main()