With Thrift, we can access HBase from Python.

About Thrift

Thrift is a software framework for scalable cross-language services development.

Its official website is: http://incubator.apache.org/thrift/

 

Downloading Thrift

svn co http://svn.apache.org/repos/asf/incubator/thrift/trunk thrift

 

Installing Thrift (Linux)

cd thrift

./bootstrap.sh

./configure

make

make install

 

Generating the HBase client code

cd $HBASE_HOME/src/java/org/apache/hadoop/hbase/thrift

thrift --gen py Hbase.thrift

Then copy the hbase folder from the generated gen-py folder to

/usr/lib/python2.5/site-packages/
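If copying into the system site-packages directory is not convenient, a script could instead put the generated gen-py directory on the import path at run time. This is only a minimal sketch: the GEN_PY_DIR name and the assumption that gen-py sits in the current working directory are illustrative, not part of the setup described above.

```python
import os
import sys

# Hypothetical location: a "gen-py" folder in the current working directory.
GEN_PY_DIR = os.path.abspath("gen-py")


def add_gen_py_to_path(path=GEN_PY_DIR):
    """Put the generated Thrift bindings directory on sys.path (only once)."""
    if path not in sys.path:
        sys.path.insert(0, path)
    return path


add_gen_py_to_path()
# After this call, "from hbase import Hbase" resolves if gen-py/hbase exists.
```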

 

Preparing HBase

First make sure HBase is working properly, then start the HBase Thrift service:

$HBASE_HOME/bin/hbase-daemon.sh start thrift

 

OK, that completes the preparation. Now we can start writing the Python client.

 

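Suppose we need a table that stores web pages crawled from the internet.

The table is named "webpages".

It uses each page's reversed URL as the row key (the domain components in reverse order, for example com.a.www for www.a.com), and stores the page content in the column family "contents:" (note the trailing colon).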
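To see why a reversed URL makes a useful row key, recall that HBase keeps rows sorted by key, so pages from the same domain end up next to each other. The sketch below is a standalone illustration: reverse_host is a hypothetical helper that mirrors the reverseUrl method from the complete listing, and the sample URLs are made up.

```python
def reverse_host(url):
    """Reverse the host part of a URL: http://www.a.com/x -> com.a.www/x."""
    rest = [part for part in url.split("//") if part][-1]  # drop the scheme, if any
    hops = [hop for hop in rest.split("/") if hop]         # host followed by path segments
    hops[0] = ".".join(reversed(hops[0].split(".")))
    return "/".join(hops)


urls = [
    "http://www.a.com/page1",
    "http://www.b.com/",
    "http://news.a.com/",
    "http://www.a.com/page2",
]

# Sorting the keys here mimics the order in which HBase would store the rows.
for key in sorted(reverse_host(u) for u in urls):
    print(key)
# com.a.news
# com.a.www/page1
# com.a.www/page2
# com.b.www
```

All keys for a.com are adjacent, so a scan starting at "com.a." would read them in one contiguous range.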

 

Import the required modules:

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

from hbase import Hbase
from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation, NotFound

 

Open a connection to HBase (netloc and port are the host and port of the Thrift service):

transport = TTransport.TBufferedTransport(
    TSocket.TSocket(netloc, port))
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Hbase.Client(protocol)
transport.open()
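The connection above is never closed explicitly. One way to guarantee that close() runs even when an error occurs is a small context manager. This is a sketch rather than part of the Thrift API: opened and FakeTransport are hypothetical names, and FakeTransport only stands in for a real transport so the example runs without an HBase server. On Python 2.5 the with statement also needs `from __future__ import with_statement`.

```python
from contextlib import contextmanager


@contextmanager
def opened(transport):
    """Open a transport-like object and always close it afterwards."""
    transport.open()
    try:
        yield transport
    finally:
        transport.close()


class FakeTransport(object):
    """Stand-in for a Thrift transport; it only records open/close calls."""

    def __init__(self):
        self.calls = []

    def open(self):
        self.calls.append("open")

    def close(self):
        self.calls.append("close")


fake = FakeTransport()
with opened(fake):
    fake.calls.append("work")  # real code would call the HBase client here
print(fake.calls)  # ['open', 'work', 'close']
```

With a real connection, the same helper would wrap the TBufferedTransport object created above, so that transport.open() and transport.close() are handled in one place.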

Create the table:

# Keep only one version and use BLOCK compression.
# See the HBase API for the other parameters.
contents = ColumnDescriptor(name="contents:", maxVersions=1, compression="BLOCK")
client.createTable("webpages", [contents])

Write data:

def write(url, content):
    # reverseUrl is defined in the complete listing below
    # (there it is a method of HbaseWriter).
    row = reverseUrl(url)
    mutations = [Mutation(column="contents:", value=content)]
    client.mutateRow("webpages", row, mutations)

The complete code and unit tests follow:

from unittest import TestCase, main
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

from hbase import Hbase
from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation, NotFound


class HbaseWriter:

    def __init__(self, netloc, port, table="webpages"):
        self.tableName = table

        self.transport = TTransport.TBufferedTransport(
            TSocket.TSocket(netloc, port))
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.client = Hbase.Client(self.protocol)
        self.transport.open()

        # Create the table on first use.
        tables = self.client.getTableNames()
        if self.tableName not in tables:
            self.__createTable()

    def __del__(self):
        self.transport.close()

    def __createTable(self):
        self.client.createTable(self.tableName,
            [ColumnDescriptor(name="contents:", maxVersions=1, compression="BLOCK")])

    def reverseUrl(self, url):
        # Drop the scheme and empty path segments, then reverse the domain parts.
        # List comprehensions are used instead of filter() so the result is always a list.
        link = [part for part in url.split("//") if part][-1]
        hops = [hop for hop in link.split("/") if hop]
        domain = hops[0].split(".")
        domain.reverse()
        hops[0] = ".".join(domain)
        return "/".join(hops)

    def write(self, url, content):
        row = self.reverseUrl(url)
        mutations = [Mutation(column="contents:", value=content)]
        self.client.mutateRow(self.tableName, row, mutations)


class TestHbaseWriter(TestCase):

    def setUp(self):
        self.writer = HbaseWriter("192.168.1.103", 9090, "test")

    def tearDown(self):
        # A table must be disabled before it can be deleted.
        name = self.writer.tableName
        client = self.writer.client
        client.disableTable(name)
        client.deleteTable(name)

    def testReverseUrl(self):
        self.assertEqual(self.writer.reverseUrl("http://www.a.com"), "com.a.www")
        self.assertEqual(self.writer.reverseUrl("http://www.a.com/"), "com.a.www")
        self.assertEqual(self.writer.reverseUrl("http://a.com"), "com.a")
        self.assertEqual(self.writer.reverseUrl("http://www.b.com/foo"), "com.b.www/foo")
        self.assertEqual(self.writer.reverseUrl("aaa.bbb.ccc.com.cn/foo1/foo2"), "cn.com.ccc.bbb.aaa/foo1/foo2")

    def testCreate(self):
        tableName = self.writer.tableName
        client = self.writer.client
        self.assertTrue(self.writer.tableName in client.getTableNames())
        columns = dict()
        columns["contents"] = ColumnDescriptor(name="contents", maxVersions=1, compression="BLOCK")
        cds = client.getColumnDescriptors(tableName)
        for name, column in cds.items():
            self.assertTrue(column.name in columns)

    def testWrite(self):
        tableName = self.writer.tableName
        client = self.writer.client
        data = {"http://www.a.com": "com.a.www",
                "http://www.a.com/bbb": "com.a.www/bbb",
                "http://www.foo.com/foo": "foo"}
        for url, content in data.items():
            self.writer.write(url, content)

        # Scan the whole table and check every row against the data written above.
        scannerId = client.scannerOpen(tableName, "", ["contents:"])
        while True:
            try:
                result = client.scannerGet(scannerId)
            except NotFound:
                break
            row = result.row
            contents = result.columns["contents:"].value
            # Reversing the row key again restores the original host order.
            url = "http://" + self.writer.reverseUrl(row)
            self.assertTrue(url in data)
            self.assertEqual(data[url], contents)
        client.scannerClose(scannerId)


if __name__ == "__main__":
    main()

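About Zeno Chen

I work across many areas, broadly rather than deeply. Programming languages: Perl, Java, PHP, Python. Database systems: MySQL, Oracle. I occasionally do circuit board development, focusing mainly on STM32 microcontrollers.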