文章翻譯:王運里
發(fā)表時間:2015 年 7 月 17 日
原文作者:MANU MUKERJI
文章分類:大數(shù)據(jù)及商務(wù)智能
文章開頭介紹了 Cassandra 中表的創(chuàng)建等基本知識。隨后介紹了java 中對 Cassandra 的操作的庫 CassandraHelper.java,CassandraTester.java,MapReduceExample.java,還有其中的 getSession(),createConnection(String),closeConnection(),prepareQueries(),addKey(String) 等方法。文章最后,作者給出了一個他修改過的標準單詞計數(shù)的示例,他將計數(shù)的數(shù)據(jù)寫入到 Cassandra 中,并在 cqlsh 中查看了運行結(jié)果。作者在文章中詳細地給出了運行此示例的步驟。此外,作者還在文章中提供了他對該示例測試時所用的數(shù)據(jù)集,您可以從 git 上獲取它。
在這篇文章中我會講解如何設(shè)置允許您寫入 Cassandra 的 Map Reduce Job。這里介紹的用例將包括串流分析到 Cassandra 中。
我想在我們開始之前,你有可用的 Cassandra 集群和 Hadoop 集群,甚至單個實例或本地主機就足夠了。用于此示例的代碼可在 https://github.com/manum/mr-cassandra 中獲得。
讓我們創(chuàng)建我們將要使用的 Cassandra Keyspace 和 Table。您可以在 cqlsh 中運行以下代碼(命令行實用程序,可以讓你跟 Cassandra 交互)。
表 keytable 只有一列叫作 key;它將是我們存儲數(shù)據(jù)的地方。
CREATE KEYSPACE keytest WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 3 };
CREATE TABLE keytable (
key varchar,
PRIMARY KEY (key)
);
這是它在運行之后的樣子:
cqlsh> USE keytest;
cqlsh:keytest> select * from keytable;
key
----------
test1234
(1 rows)
我們可以從看 CassandraHelper.java 和 CassandraTester.java 開始。
CassandraHelper 方法:
getSession():檢索當前會話對象以確保沒有其他會話對象被創(chuàng)建。
public Session getSession() {
LOG.info("Starting getSession()");
if (this.session == null && (this.cluster == null || this.cluster.isClosed())) {
LOG.info("Cluster not started or closed");
} else if (this.session.isClosed()) {
LOG.info("session is closed. Creating a session");
this.session = this.cluster.connect();
}
return this.session;
}
createConnection(String): 為 Cassandra 服務(wù)器傳遞 host。
public void createConnection(String node) {
this.cluster = Cluster.builder().addContactPoint(node).build();
Metadata metadata = cluster.getMetadata();
System.out.printf("Connected to cluster: %s\n",metadata.getClusterName());
for ( Host host : metadata.getAllHosts() ) {
System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
}
this.session = cluster.connect();
this.prepareQueries();
}
closeConnection(): 在一切都完成之后關(guān)閉連接。
public void closeConnection() {
cluster.close();
}
prepareQueries():此方法準備的查詢在服務(wù)器端進行了優(yōu)化。如果您經(jīng)常執(zhí)行相同的查詢或查詢不會更改但數(shù)據(jù)可能改變,例如在插入操作時,它推薦您使用預(yù)查詢。
private void prepareQueries() {
LOG.info("Starting prepareQueries()");
this.preparedStatement = this.session.prepare(this.query);
}
addKey(String):該方法將數(shù)據(jù)添加到群集,它還有try catch塊捕獲異常并告訴你正在發(fā)生什么。
public void addKey(String key) {
Session session = this.getSession();
if(key.length()>0) {
try {
session.execute(this.preparedStatement.bind(key) );
//session.executeAsync(this.preparedStatement.bind(key));
} catch (NoHostAvailableException e) {
System.out.printf("No host in the %s cluster can be contacted to execute the query.\n",
session.getCluster());
Session.State st = session.getState();
for ( Host host : st.getConnectedHosts() ) {
System.out.println("In flight queries::"+st.getInFlightQueries(host));
System.out.println("open connections::"+st.getOpenConnections(host));
}
} catch (QueryExecutionException e) {
System.out.println("An exception was thrown by Cassandra because it cannot " +
"successfully execute the query with the specified consistency level.");
} catch (IllegalStateException e) {
System.out.println("The BoundStatement is not ready.");
}
}
}
CassandraTester: 該類有一個 void main 方法,您需要提供想要連接的主機并且它會將值 "test1234" 寫入到 Cassandra 中。
MapReduceExample.java 是這里的感興趣的文件。它有一個 Mapper 類和 Reducer 類和 main 方法來初始化工作。在 Mapper 下,你會發(fā)現(xiàn) setup() 和 cleanup() 方法——它們會被 Map Reduce 框架自動調(diào)用來處理設(shè)置和清理操作——您將使用連接到 Cassandra 和之后的清理連接。
我修改了標準的單詞計數(shù)的例子,現(xiàn)在的方案對行計數(shù),并且會將它們都寫入 Cassandra。reducer 的輸出基本上是各行和行數(shù)。
若要運行本示例,您需要做以下幾點:
從 https://github.com/manum/mr-cassandra 復(fù)制 repo。
運行 mvn install 來在目標的 /folder 中創(chuàng)建一個 jar。
用 scp 命令將 jar 復(fù)制到您的分布式集群中。
復(fù)制測試輸入(對于這個測試,我使用了 git 上莎士比亞的所有作品 all-shakespeare.txt)
如果您運行上述步驟,它應(yīng)該啟動這份工作。在工作完成之后,轉(zhuǎn)到 cqlsh 并運行 select * from keytable limit 10;
cqlsh:keytest> select * from keytable limit 10;
key
----------------------------------------------------------------
REGAN\tGood sir, no more; these are unsightly tricks:
KING\tWe lost a jewel of her; and our esteem
ROSALIND\tAy, but when?
\tNow leaves him.
\tThy brother by decree is banished:
DUCHESS OF YORK\tI had a Richard too, and thou didst kill him;
JULIET\tWho is't that calls? is it my lady mother?
ARTHUR\tO, save me, Hubert, save me! my eyes are out
\tFull of high feeding, madly hath broke loose
\tSwift-winged with desire to get a grave,
(10 rows)
cqlsh:keytest>
Manu Mukerji 有云計算和大數(shù)據(jù)方面的背景,實時處理數(shù)以億計的每天的交易。他喜歡建筑和設(shè)計可擴展、高可用性數(shù)據(jù)的解決方案,并在網(wǎng)絡(luò)廣告和社交媒體方面有豐富的工作經(jīng)驗。
LinkedIn: http://www.linkedin.com/in/manumukerji/
更多IT技術(shù)干貨: wiki.jikexueyuan.com
加入極客星球翻譯團隊: http://wiki.jikexueyuan.com/project/wiki-editors-guidelines/translators.html版權(quán)聲明:
本譯文僅用于學習和交流目的。非商業(yè)轉(zhuǎn)載請注明譯者、出處,并保留文章在極客學院的完整鏈接
商業(yè)合作請聯(lián)系 wiki@jikexueyuan.com
原文地址:https://www.packtpub.com/books/content/writing-cassandra-hdfs-using-hadoop-map-reduce-job