Using Spark to generate a JanusGraph adjacency list

The data loaded into a graph database usually originates from several plain tables in Hive or in a relational database. Take a person/city graph as an example: there are typically three tables, such as a person table (person), an address table (address), and a person-address relation table (person_address).
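As a quick illustration only (not part of the pipeline below), the three source tables can be sketched as Scala case classes; the class names are made up here, and the field names follow the CSV layouts listed in the next section.

// Illustrative sketch of the three source tables; class names are hypothetical.
case class Person(id: String, firstname: String, lastname: String)        // vertex table
case class Address(addressId: String, country: String, region: String)    // vertex table
case class PersonAddress(personId: String, addressId: String)             // relation table: person -> address edge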

1. Raw data

The raw person/address/person_address data is stored in the HDFS directory /user/janusgraph/source:

  • person.csv
    ID, firstname, lastname
hadoop fs -cat /user/janusgraph/source/person.csv
1,daniel,kuppitz
2,marko,rodriguez
3,matthias,bröcheler
  • address.csv
    addressId, Country, Region
hadoop fs -cat /user/janusgraph/source/address.csv
2,Germany,NRW
4,USA,NM
6,USA,WA
  • person_address.csv
    personId, addressId
hadoop fs -cat /user/janusgraph/source/person_address.csv
1,2
2,4
3,6

2. Converting the data into an adjacency list with Spark

Install the Spark client and configure the environment variables.

#Load the address data into a Spark RDD
[hadoop@bigdat-test-graph00.gz01 ~]$ spark-shell
scala> val address = sc.textFile("/user/janusgraph/source/address.csv").map(_.split(",")).keyBy(a => a(0)).cache()
address: org.apache.spark.rdd.RDD[(String, Array[String])] = MapPartitionsRDD[7] at keyBy at <console>:24
scala> address.collect
res1: Array[(String, Array[String])] = Array((2,Array(2, Germany, NRW)), (4,Array(4, USA, NM)), (6,Array(6, USA, WA)))
scala> address.values.collect
res2: Array[Array[String]] = Array(Array(2, Germany, NRW), Array(4, USA, NM), Array(6, USA, WA))
#Load the person data into a Spark RDD
scala> val person = sc.textFile("/user/janusgraph/source/person.csv").map(_.split(",")).keyBy(a => a(0)).cache()
person: org.apache.spark.rdd.RDD[(String, Array[String])] = MapPartitionsRDD[12] at keyBy at <console>:24
scala> person.collect
res3: Array[(String, Array[String])] = Array((1,Array(1, daniel, kuppitz)), (2,Array(2, marko, rodriguez)), (3,Array(3, matthias, bröcheler)))
scala> person.keys.collect.foreach(println)
1
2
3
scala> person.values.collect()
res5: Array[Array[String]] = Array(Array(1, daniel, kuppitz), Array(2, marko, rodriguez), Array(3, matthias, bröcheler))

Loading the out-edge information

#Load the person_address data into a Spark RDD
#key=personId, value=[personId, addressId], edge direction: person -> address
scala> val outE = sc.textFile("/user/janusgraph/source/person_address.csv").map(_.split(",")).keyBy(e => e(0)).cache()
scala> outE.collect
res6: Array[(String, Array[String])] = Array((1,Array(1, 2)), (2,Array(2, 4)), (3,Array(3, 6)))
scala> outE.keys.collect.foreach(println)
1
2
3

Loading the in-edge information

#Use addressId as the key
#key=addressId, value=[addressId, personId], edge direction: person <- address
scala> val inE = outE.map(x => Array(x._2(1), x._2(0))).keyBy(e => e(0)).cache()
inE: org.apache.spark.rdd.RDD[(String, Array[String])] = MapPartitionsRDD[21] at keyBy at <console>:25
scala> inE.collect
res8: Array[(String, Array[String])] = Array((2,Array(2, 1)), (4,Array(4, 2)), (6,Array(6, 3)))
scala> inE.keys.collect.foreach(println)
2
4
6

Joining the address and person vertices to their edges to produce the adjacency list

#Join the address vertices with their address-side edges (inE) using an inner join
#key=addressId, values=((country,region), personId)
scala> val addressOutE = address.join(inE).mapValues(x => (x._1.slice(1,3), x._2(1)))
addressOutE: org.apache.spark.rdd.RDD[(String, (Array[String], String))] = MapPartitionsRDD[26] at mapValues at <console>:27
scala> addressOutE.collect
res10: Array[(String, (Array[String], String))] = Array((4,(Array(USA, NM),2)), (6,(Array(USA, WA),3)), (2,(Array(Germany, NRW),1)))
#Join the person vertices with their person-side edges (outE) using an inner join
#key=personId, values=((firstname,lastname), addressId)
scala> val personInE = person.join(outE).mapValues(x => (x._1.slice(1,3), x._2(1)))
personInE: org.apache.spark.rdd.RDD[(String, (Array[String], String))] = MapPartitionsRDD[30] at mapValues at <console>:27
scala> personInE.collect
res11: Array[(String, (Array[String], String))] = Array((2,(Array(marko, rodriguez),4)), (3,(Array(matthias, bröcheler),6)), (1,(Array(daniel, kuppitz),2)))
#Attach the address vertex label and flatten each record into a single line; adapt the format to your own needs
#address:addressId:country,region:personId
#e.g. address:4:USA,NM:2
scala> val addressLines = addressOutE.map(a => Array("address", a._1, a._2._1.mkString(","), a._2._2).mkString(":"))
addressLines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[31] at map at <console>:25
scala> addressLines.collect
res3: Array[String] = Array(address:4:USA,NM:2, address:6:USA,WA:3, address:2:Germany,NRW:1)
#Attach the person vertex label and flatten each record into a single line; adapt the format to your own needs
#person:personId:firstname,lastname:addressId
#e.g. person:2:marko,rodriguez:4
scala> val personLines = personInE.map(a => Array("person", a._1, a._2._1.mkString(","), a._2._2).mkString(":"))
personLines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[32] at map at <console>:25
scala> personLines.collect
res4: Array[String] = Array(person:2:marko,rodriguez:4, person:3:matthias,bröcheler:6, person:1:daniel,kuppitz:2)

Saving the adjacency list to an HDFS path

scala> addressLines.union(personLines).saveAsTextFile("/user/janusgraph/input_adjacent")
[hadoop@bigdat-test-graph00.gz01 ~]$ hadoop fs -ls /user/janusgraph/input_adjacent/
Found 5 items
-rw-r--r--   3 jn rd          0 2020-03-21 12:33 /user/janusgraph/input_adjacent/_SUCCESS
-rw-r--r--   3 jn rd         62 2020-03-21 12:33 /user/janusgraph/input_adjacent/part-00000
-rw-r--r--   3 jn rd          0 2020-03-21 12:33 /user/janusgraph/input_adjacent/part-00001
-rw-r--r--   3 jn rd         27 2020-03-21 12:33 /user/janusgraph/input_adjacent/part-00002
-rw-r--r--   3 jn rd         57 2020-03-21 12:33 /user/janusgraph/input_adjacent/part-00003
[hadoop@bigdat-test-graph00.gz01 ~]$ hadoop fs -cat /user/janusgraph/input_adjacent/*
address:4:USA,NM:2
address:6:USA,WA:3
address:2:Germany,NRW:1
person:2:marko,rodriguez:4
person:3:matthias,bröcheler:6
person:1:daniel,kuppitz:2
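
For reference, the interactive session above can be collected into a single standalone Spark job. The following is a minimal sketch assuming the same source and output paths used in this article; the object name and the spark-submit packaging are illustrative and not part of the original walkthrough. It produces the same label:id:properties:neighborId lines shown above.

import org.apache.spark.{SparkConf, SparkContext}

// Standalone version of the spark-shell steps above (paths as used in this article).
object AdjacencyListBuilder {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("janusgraph-adjacency-list"))

    // Vertex tables, keyed by their own id column
    val address = sc.textFile("/user/janusgraph/source/address.csv").map(_.split(",")).keyBy(a => a(0)).cache()
    val person  = sc.textFile("/user/janusgraph/source/person.csv").map(_.split(",")).keyBy(a => a(0)).cache()

    // Edges: out-edges keyed by personId, in-edges keyed by addressId (columns swapped)
    val outE = sc.textFile("/user/janusgraph/source/person_address.csv").map(_.split(",")).keyBy(e => e(0)).cache()
    val inE  = outE.map(x => Array(x._2(1), x._2(0))).keyBy(e => e(0)).cache()

    // Join each vertex with its adjacent vertex id and flatten to label:id:properties:neighborId
    val addressLines = address.join(inE)
      .mapValues(x => (x._1.slice(1, 3), x._2(1)))
      .map(a => Array("address", a._1, a._2._1.mkString(","), a._2._2).mkString(":"))
    val personLines = person.join(outE)
      .mapValues(x => (x._1.slice(1, 3), x._2(1)))
      .map(a => Array("person", a._1, a._2._1.mkString(","), a._2._2).mkString(":"))

    addressLines.union(personLines).saveAsTextFile("/user/janusgraph/input_adjacent")
    sc.stop()
  }
}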
