Contents

  1. Spark SQL, DataFrame, and Spark Streaming

1. Spark SQL, DataFrame, and Spark Streaming

The source code is taken directly from: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel

object SqlNetworkWordCount {
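  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: SqlNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // StreamingExamples is a helper object from the Spark examples package;
    // it only adjusts the log level, so drop this call when compiling outside that package
    StreamingExamples.setStreamingLogLevels()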

    // Create the context with a 2 second batch size
    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount").setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create a socket stream on target ip:port and count the
    // words in an input stream of \n-delimited text (e.g. generated by 'nc').
    // Note: a storage level without replication is used only because this runs locally;
    // replication is necessary in a distributed scenario for fault tolerance.
    // Use a socket as the data source
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    // Split each line on spaces to get the words DStream
    val words = lines.flatMap(_.split(" "))

    // Convert RDDs of the words DStream to DataFrame and run SQL query.
    // foreachRDD is called once per batch, with the RDD of that batch
    words.foreachRDD((rdd: RDD[String], time: Time) => {
      // Get the singleton instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Register as table
      wordsDataFrame.registerTempTable("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        sqlContext.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    })

    ssc.start()
    ssc.awaitTermination()
  }
}


/** Case class for converting RDD to DataFrame */
case class Record(word: String)


/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {

  @transient  private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
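
The lazily instantiated singleton above means that every batch handled by foreachRDD reuses one SQLContext instead of building a new one. The pattern can be sketched in plain Scala without Spark. ExpensiveContext and the created counter below are hypothetical stand-ins introduced only for illustration, and the synchronized guard is an addition that the original example does not have.

```scala
// Hypothetical stand-in for an object that is costly to create, such as SQLContext.
final class ExpensiveContext(val name: String)

object ExpensiveContextSingleton {
  // Counts constructions so that the reuse is observable (illustration only).
  var created: Int = 0
  private var instance: ExpensiveContext = _

  // synchronized is an assumption added here so that two threads
  // cannot both see null and create separate instances.
  def getInstance(name: String): ExpensiveContext = synchronized {
    if (instance == null) {
      instance = new ExpensiveContext(name)
      created += 1
    }
    instance
  }
}

object SingletonDemo {
  def main(args: Array[String]): Unit = {
    // Simulate three micro-batches, each asking for the context.
    val contexts = (1 to 3).map(_ => ExpensiveContextSingleton.getInstance("demo"))
    println(s"instances created: ${ExpensiveContextSingleton.created}")
    println(s"same instance every time: ${contexts.forall(_ eq contexts.head)}")
  }
}
```

Only the first call constructs the object; later calls return the cached instance and ignore their argument, which is also how SQLContextSingleton treats the sparkContext passed in after the first batch.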

After starting the program, run the following command in another terminal and type some lines of text:

$ nc -lk 9999
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data

Output:


========= 1448783840000 ms =========
+---------+-----+
|     word|total|
+---------+-----+
|    Spark|   12|
|   system|   12|
|  general|   12|
|     fast|   12|
|      and|   12|
|computing|   12|
|        a|   12|
|       is|   12|
|      for|   12|
|      Big|   12|
|  cluster|   12|
|     Data|   12|
+---------+-----+

========= 1448783842000 ms =========
+----+-----+
|word|total|
+----+-----+
+----+-----+

========= 1448783844000 ms =========
+----+-----+
|word|total|
+----+-----+
+----+-----+
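
To make the per-batch query concrete, here is a minimal sketch of the same computation using plain Scala collections instead of Spark. It assumes a batch is simply a sequence of text lines. WordCountSketch and countWords are hypothetical names, and the batch of twelve identical lines is an illustrative input, not data taken from the run above.

```scala
object WordCountSketch {
  // Plain-collections equivalent of:
  //   select word, count(*) as total from words group by word
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))   // same tokenization as the words DStream
      .groupBy(identity)       // group by word
      .map { case (word, occurrences) => word -> occurrences.size } // count(*)

  def main(args: Array[String]): Unit = {
    val line = "Spark is a fast and general cluster computing system for Big Data"
    // Assumed batch: twelve copies of the sample sentence.
    val batch = Seq.fill(12)(line)
    countWords(batch).toSeq.sortBy(_._1).foreach { case (word, total) =>
      println(f"$word%-10s $total")
    }
  }
}
```

An empty batch gives an empty map, which corresponds to the header-only tables printed for the intervals in which no input arrived.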

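A WordPress site can be opened up to submissions from readers. WordPress itself does not provide a submission feature, but it is highly extensible, so we can add one ourselves.

There are two ways to implement user submissions:

One is to open up back-end registration: users who register are given the Contributor role by default and can add posts after logging in (saved as drafts by default);

The other is to provide a submission form on the front end that users fill in, as on the Mimvp blog: http://blog.mimvp.com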

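The first method is simple to implement and needs very little configuration, but some blog owners may feel uncomfortable letting other people see their blog's back end. The second method is much more convenient for contributors, and the owner does not have to worry about exposing the back end, but it is more work to implement and requires a lot of configuration. This article covers only the second method; I hope it helps, and in practice it comes down to copying and pasting code.

I. Add the submission form

1. First, create a new PHP file in the current theme directory (/wp-content/themes/your_theme/pages/), name it tougao.php, and copy all the code from page.php into it (depending on the theme, the file may live at your_theme/pages/tougao.php or at your_theme/template-tougao.php);

tougao.php code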

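An HP desktop that supports UEFI installation. The original mechanical hard disk once had Windows 2008 installed; the system partition was later deleted and the disk reformatted as a data disk, but it still uses a GPT partition table.

An SSD was added.

Windows 2012 was installed by booting from a USB stick. (This installation method does not support UEFI and cannot install to a GPT disk; it can only install to an MBR disk.)

On startup the machine reports error 0xc0000225. Pressing Enter then boots Windows 2012, but having to do that on every boot is a nuisance.

Troubleshooting:

With the old, large mechanical disk unplugged, the machine boots without any problem.

Solution:

It turned out that the BIOS was set to boot UEFI devices first rather than legacy MBR disks, which means it tried to boot from the old mechanical disk first.

Disabling the UEFI boot option in the BIOS boot order resolved the fault.

References: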

https://neosmart.net/wiki/0xc0000225/